From a3649e03206a3596131413956ea7ecc18790ec73 Mon Sep 17 00:00:00 2001 From: Lars Kellogg-Stedman Date: Tue, 5 Sep 2017 11:03:59 -0600 Subject: relocate tests/unittests/helpers.py to cloudinit/tests This moves the base test case classes into into cloudinit/tests and updates all the corresponding imports. --- cloudinit/tests/helpers.py | 395 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 395 insertions(+) create mode 100644 cloudinit/tests/helpers.py (limited to 'cloudinit/tests/helpers.py') diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py new file mode 100644 index 00000000..28e26622 --- /dev/null +++ b/cloudinit/tests/helpers.py @@ -0,0 +1,395 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from __future__ import print_function + +import functools +import json +import logging +import os +import shutil +import sys +import tempfile +import unittest + +import mock +import six +import unittest2 + +try: + from contextlib import ExitStack +except ImportError: + from contextlib2 import ExitStack + +from cloudinit import helpers as ch +from cloudinit import util + +# Used for skipping tests +SkipTest = unittest2.SkipTest + +# Used for detecting different python versions +PY2 = False +PY26 = False +PY27 = False +PY3 = False + +_PY_VER = sys.version_info +_PY_MAJOR, _PY_MINOR, _PY_MICRO = _PY_VER[0:3] +if (_PY_MAJOR, _PY_MINOR) <= (2, 6): + if (_PY_MAJOR, _PY_MINOR) == (2, 6): + PY26 = True + if (_PY_MAJOR, _PY_MINOR) >= (2, 0): + PY2 = True +else: + if (_PY_MAJOR, _PY_MINOR) == (2, 7): + PY27 = True + PY2 = True + if (_PY_MAJOR, _PY_MINOR) >= (3, 0): + PY3 = True + + +# Makes the old path start +# with new base instead of whatever +# it previously had +def rebase_path(old_path, new_base): + if old_path.startswith(new_base): + # Already handled... + return old_path + # Retarget the base of that path + # to the new base instead of the + # old one... + path = os.path.join(new_base, old_path.lstrip("/")) + path = os.path.abspath(path) + return path + + +# Can work on anything that takes a path as arguments +def retarget_many_wrapper(new_base, am, old_func): + def wrapper(*args, **kwds): + n_args = list(args) + nam = am + if am == -1: + nam = len(n_args) + for i in range(0, nam): + path = args[i] + # patchOS() wraps various os and os.path functions, however in + # Python 3 some of these now accept file-descriptors (integers). + # That breaks rebase_path() so in lieu of a better solution, just + # don't rebase if we get a fd. + if isinstance(path, six.string_types): + n_args[i] = rebase_path(path, new_base) + return old_func(*n_args, **kwds) + return wrapper + + +class TestCase(unittest2.TestCase): + + def reset_global_state(self): + """Reset any global state to its original settings. + + cloudinit caches some values in cloudinit.util. Unit tests that + involved those cached paths were then subject to failure if the order + of invocation changed (LP: #1703697). + + This function resets any of these global state variables to their + initial state. + + In the future this should really be done with some registry that + can then be cleaned in a more obvious way. + """ + util.PROC_CMDLINE = None + util._DNS_REDIRECT_IP = None + util._LSB_RELEASE = {} + + def setUp(self): + super(TestCase, self).setUp() + self.reset_global_state() + + +class CiTestCase(TestCase): + """This is the preferred test case base class unless user + needs other test case classes below.""" + + # Subclass overrides for specific test behavior + # Whether or not a unit test needs logfile setup + with_logs = False + + def setUp(self): + super(CiTestCase, self).setUp() + if self.with_logs: + # Create a log handler so unit tests can search expected logs. + self.logger = logging.getLogger() + self.logs = six.StringIO() + formatter = logging.Formatter('%(levelname)s: %(message)s') + handler = logging.StreamHandler(self.logs) + handler.setFormatter(formatter) + self.old_handlers = self.logger.handlers + self.logger.handlers = [handler] + + def tearDown(self): + if self.with_logs: + # Remove the handler we setup + logging.getLogger().handlers = self.old_handlers + super(CiTestCase, self).tearDown() + + def tmp_dir(self, dir=None, cleanup=True): + # return a full path to a temporary directory that will be cleaned up. + if dir is None: + tmpd = tempfile.mkdtemp( + prefix="ci-%s." % self.__class__.__name__) + else: + tmpd = tempfile.mkdtemp(dir=dir) + self.addCleanup(functools.partial(shutil.rmtree, tmpd)) + return tmpd + + def tmp_path(self, path, dir=None): + # return an absolute path to 'path' under dir. + # if dir is None, one will be created with tmp_dir() + # the file is not created or modified. + if dir is None: + dir = self.tmp_dir() + return os.path.normpath(os.path.abspath(os.path.join(dir, path))) + + +class ResourceUsingTestCase(CiTestCase): + + def setUp(self): + super(ResourceUsingTestCase, self).setUp() + self.resource_path = None + + def resourceLocation(self, subname=None): + if self.resource_path is None: + paths = [ + os.path.join('tests', 'data'), + os.path.join('data'), + os.path.join(os.pardir, 'tests', 'data'), + os.path.join(os.pardir, 'data'), + ] + for p in paths: + if os.path.isdir(p): + self.resource_path = p + break + self.assertTrue((self.resource_path and + os.path.isdir(self.resource_path)), + msg="Unable to locate test resource data path!") + if not subname: + return self.resource_path + return os.path.join(self.resource_path, subname) + + def readResource(self, name): + where = self.resourceLocation(name) + with open(where, 'r') as fh: + return fh.read() + + def getCloudPaths(self, ds=None): + tmpdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmpdir) + cp = ch.Paths({'cloud_dir': tmpdir, + 'templates_dir': self.resourceLocation()}, + ds=ds) + return cp + + +class FilesystemMockingTestCase(ResourceUsingTestCase): + + def setUp(self): + super(FilesystemMockingTestCase, self).setUp() + self.patched_funcs = ExitStack() + + def tearDown(self): + self.patched_funcs.close() + ResourceUsingTestCase.tearDown(self) + + def replicateTestRoot(self, example_root, target_root): + real_root = self.resourceLocation() + real_root = os.path.join(real_root, 'roots', example_root) + for (dir_path, _dirnames, filenames) in os.walk(real_root): + real_path = dir_path + make_path = rebase_path(real_path[len(real_root):], target_root) + util.ensure_dir(make_path) + for f in filenames: + real_path = util.abs_join(real_path, f) + make_path = util.abs_join(make_path, f) + shutil.copy(real_path, make_path) + + def patchUtils(self, new_root): + patch_funcs = { + util: [('write_file', 1), + ('append_file', 1), + ('load_file', 1), + ('ensure_dir', 1), + ('chmod', 1), + ('delete_dir_contents', 1), + ('del_file', 1), + ('sym_link', -1), + ('copy', -1)], + } + for (mod, funcs) in patch_funcs.items(): + for (f, am) in funcs: + func = getattr(mod, f) + trap_func = retarget_many_wrapper(new_root, am, func) + self.patched_funcs.enter_context( + mock.patch.object(mod, f, trap_func)) + + # Handle subprocess calls + func = getattr(util, 'subp') + + def nsubp(*_args, **_kwargs): + return ('', '') + + self.patched_funcs.enter_context( + mock.patch.object(util, 'subp', nsubp)) + + def null_func(*_args, **_kwargs): + return None + + for f in ['chownbyid', 'chownbyname']: + self.patched_funcs.enter_context( + mock.patch.object(util, f, null_func)) + + def patchOS(self, new_root): + patch_funcs = { + os.path: [('isfile', 1), ('exists', 1), + ('islink', 1), ('isdir', 1)], + os: [('listdir', 1), ('mkdir', 1), + ('lstat', 1), ('symlink', 2)], + } + for (mod, funcs) in patch_funcs.items(): + for f, nargs in funcs: + func = getattr(mod, f) + trap_func = retarget_many_wrapper(new_root, nargs, func) + self.patched_funcs.enter_context( + mock.patch.object(mod, f, trap_func)) + + def patchOpen(self, new_root): + trap_func = retarget_many_wrapper(new_root, 1, open) + name = 'builtins.open' if PY3 else '__builtin__.open' + self.patched_funcs.enter_context(mock.patch(name, trap_func)) + + def patchStdoutAndStderr(self, stdout=None, stderr=None): + if stdout is not None: + self.patched_funcs.enter_context( + mock.patch.object(sys, 'stdout', stdout)) + if stderr is not None: + self.patched_funcs.enter_context( + mock.patch.object(sys, 'stderr', stderr)) + + def reRoot(self, root=None): + if root is None: + root = self.tmp_dir() + self.patchUtils(root) + self.patchOS(root) + return root + + +class HttprettyTestCase(CiTestCase): + # necessary as http_proxy gets in the way of httpretty + # https://github.com/gabrielfalcao/HTTPretty/issues/122 + + def setUp(self): + self.restore_proxy = os.environ.get('http_proxy') + if self.restore_proxy is not None: + del os.environ['http_proxy'] + super(HttprettyTestCase, self).setUp() + + def tearDown(self): + if self.restore_proxy: + os.environ['http_proxy'] = self.restore_proxy + super(HttprettyTestCase, self).tearDown() + + +def populate_dir(path, files): + if not os.path.exists(path): + os.makedirs(path) + ret = [] + for (name, content) in files.items(): + p = os.path.sep.join([path, name]) + util.ensure_dir(os.path.dirname(p)) + with open(p, "wb") as fp: + if isinstance(content, six.binary_type): + fp.write(content) + else: + fp.write(content.encode('utf-8')) + fp.close() + ret.append(p) + + return ret + + +def dir2dict(startdir, prefix=None): + flist = {} + if prefix is None: + prefix = startdir + for root, dirs, files in os.walk(startdir): + for fname in files: + fpath = os.path.join(root, fname) + key = fpath[len(prefix):] + flist[key] = util.load_file(fpath) + return flist + + +def json_dumps(data): + # print data in nicely formatted json. + return json.dumps(data, indent=1, sort_keys=True, + separators=(',', ': ')) + + +def wrap_and_call(prefix, mocks, func, *args, **kwargs): + """ + call func(args, **kwargs) with mocks applied, then unapplies mocks + nicer to read than repeating dectorators on each function + + prefix: prefix for mock names (e.g. 'cloudinit.stages.util') or None + mocks: dictionary of names (under 'prefix') to mock and either + a return value or a dictionary to pass to the mock.patch call + func: function to call with mocks applied + *args,**kwargs: arguments for 'func' + + return_value: return from 'func' + """ + delim = '.' + if prefix is None: + prefix = '' + prefix = prefix.rstrip(delim) + unwraps = [] + for fname, kw in mocks.items(): + if prefix: + fname = delim.join((prefix, fname)) + if not isinstance(kw, dict): + kw = {'return_value': kw} + p = mock.patch(fname, **kw) + p.start() + unwraps.append(p) + try: + return func(*args, **kwargs) + finally: + for p in unwraps: + p.stop() + + +try: + skipIf = unittest.skipIf +except AttributeError: + # Python 2.6. Doesn't have to be high fidelity. + def skipIf(condition, reason): + def decorator(func): + def wrapper(*args, **kws): + if condition: + return func(*args, **kws) + else: + print(reason, file=sys.stderr) + return wrapper + return decorator + + +# older versions of mock do not have the useful 'assert_not_called' +if not hasattr(mock.Mock, 'assert_not_called'): + def __mock_assert_not_called(mmock): + if mmock.call_count != 0: + msg = ("[citest] Expected '%s' to not have been called. " + "Called %s times." % + (mmock._mock_name or 'mock', mmock.call_count)) + raise AssertionError(msg) + mock.Mock.assert_not_called = __mock_assert_not_called + + +# vi: ts=4 expandtab -- cgit v1.2.3 From da6562e21d0b17a0957adc0c5a2c9da076e0d219 Mon Sep 17 00:00:00 2001 From: Ryan Harper Date: Tue, 19 Sep 2017 11:10:09 -0500 Subject: DataSourceOVF: use util.find_devs_with(TYPE=iso9660) DataSourceOVF attempts to find iso files via walking os.listdir('/dev/') which is far too wide. This approach is too invasive and can sometimes race with systemd attempting to fsck and mount devices. Instead, utilize cloudinit.util.find_devs_with to filter devices by criteria (which uses blkid under the covers). This results in fewer attempts to mount block devices which do not contain iso filesystems. Unittest changes include: - cloudinit.tests.helpers; introduce add_patch() helper - Add unittest coverage for DataSourceOVF use of transport_iso9660 LP: #1718287 --- cloudinit/sources/DataSourceOVF.py | 74 ++++++++----- cloudinit/tests/helpers.py | 10 ++ tests/unittests/test_datasource/test_ovf.py | 164 ++++++++++++++++++++++++++++ 3 files changed, 221 insertions(+), 27 deletions(-) (limited to 'cloudinit/tests/helpers.py') diff --git a/cloudinit/sources/DataSourceOVF.py b/cloudinit/sources/DataSourceOVF.py index 24b45d55..ccebf11a 100644 --- a/cloudinit/sources/DataSourceOVF.py +++ b/cloudinit/sources/DataSourceOVF.py @@ -375,26 +375,56 @@ def get_ovf_env(dirname): return (None, False) -# Transport functions take no input and return -# a 3 tuple of content, path, filename -def transport_iso9660(require_iso=True): +def maybe_cdrom_device(devname): + """Test if devname matches known list of devices which may contain iso9660 + filesystems. - # default_regex matches values in - # /lib/udev/rules.d/60-cdrom_id.rules - # KERNEL!="sr[0-9]*|hd[a-z]|xvd*", GOTO="cdrom_end" - envname = "CLOUD_INIT_CDROM_DEV_REGEX" - default_regex = "^(sr[0-9]+|hd[a-z]|xvd.*)" + Be helpful in accepting either knames (with no leading /dev/) or full path + names, but do not allow paths outside of /dev/, like /dev/foo/bar/xxx. + """ + if not devname: + return False + elif not isinstance(devname, util.string_types): + raise ValueError("Unexpected input for devname: %s" % devname) + + # resolve '..' and multi '/' elements + devname = os.path.normpath(devname) - devname_regex = os.environ.get(envname, default_regex) + # drop leading '/dev/' + if devname.startswith("/dev/"): + # partition returns tuple (before, partition, after) + devname = devname.partition("/dev/")[-1] + + # ignore leading slash (/sr0), else fail on / in name (foo/bar/xvdc) + if devname.startswith("/"): + devname = devname.split("/")[-1] + elif devname.count("/") > 0: + return False + + # if empty string + if not devname: + return False + + # default_regex matches values in /lib/udev/rules.d/60-cdrom_id.rules + # KERNEL!="sr[0-9]*|hd[a-z]|xvd*", GOTO="cdrom_end" + default_regex = r"^(sr[0-9]+|hd[a-z]|xvd.*)" + devname_regex = os.environ.get("CLOUD_INIT_CDROM_DEV_REGEX", default_regex) cdmatch = re.compile(devname_regex) + return cdmatch.match(devname) is not None + + +# Transport functions take no input and return +# a 3 tuple of content, path, filename +def transport_iso9660(require_iso=True): + # Go through mounts to see if it was already mounted mounts = util.mounts() for (dev, info) in mounts.items(): fstype = info['fstype'] if fstype != "iso9660" and require_iso: continue - if cdmatch.match(dev[5:]) is None: # take off '/dev/' + if not maybe_cdrom_device(dev): continue mp = info['mountpoint'] (fname, contents) = get_ovf_env(mp) @@ -406,29 +436,19 @@ def transport_iso9660(require_iso=True): else: mtype = None - devs = os.listdir("/dev/") - devs.sort() + # generate a list of devices with mtype filesystem, filter by regex + devs = [dev for dev in + util.find_devs_with("TYPE=%s" % mtype if mtype else None) + if maybe_cdrom_device(dev)] for dev in devs: - fullp = os.path.join("/dev/", dev) - - if (fullp in mounts or - not cdmatch.match(dev) or os.path.isdir(fullp)): - continue - - try: - # See if we can read anything at all...?? - util.peek_file(fullp, 512) - except IOError: - continue - try: - (fname, contents) = util.mount_cb(fullp, get_ovf_env, mtype=mtype) + (fname, contents) = util.mount_cb(dev, get_ovf_env, mtype=mtype) except util.MountFailedError: - LOG.debug("%s not mountable as iso9660", fullp) + LOG.debug("%s not mountable as iso9660", dev) continue if contents is not False: - return (contents, fullp, fname) + return (contents, dev, fname) return (False, None, None) diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py index 28e26622..6f88a5b7 100644 --- a/cloudinit/tests/helpers.py +++ b/cloudinit/tests/helpers.py @@ -104,6 +104,16 @@ class TestCase(unittest2.TestCase): super(TestCase, self).setUp() self.reset_global_state() + def add_patch(self, target, attr, **kwargs): + """Patches specified target object and sets it as attr on test + instance also schedules cleanup""" + if 'autospec' not in kwargs: + kwargs['autospec'] = True + m = mock.patch(target, **kwargs) + p = m.start() + self.addCleanup(m.stop) + setattr(self, attr, p) + class CiTestCase(TestCase): """This is the preferred test case base class unless user diff --git a/tests/unittests/test_datasource/test_ovf.py b/tests/unittests/test_datasource/test_ovf.py index 9dbf4dd9..700da86c 100644 --- a/tests/unittests/test_datasource/test_ovf.py +++ b/tests/unittests/test_datasource/test_ovf.py @@ -5,6 +5,7 @@ # This file is part of cloud-init. See LICENSE file for license information. import base64 +from collections import OrderedDict from cloudinit.tests import helpers as test_helpers @@ -70,4 +71,167 @@ class TestReadOvfEnv(test_helpers.TestCase): self.assertEqual({'password': "passw0rd"}, cfg) self.assertIsNone(ud) + +class TestTransportIso9660(test_helpers.CiTestCase): + + def setUp(self): + super(TestTransportIso9660, self).setUp() + self.add_patch('cloudinit.util.find_devs_with', + 'm_find_devs_with') + self.add_patch('cloudinit.util.mounts', 'm_mounts') + self.add_patch('cloudinit.util.mount_cb', 'm_mount_cb') + self.add_patch('cloudinit.sources.DataSourceOVF.get_ovf_env', + 'm_get_ovf_env') + self.m_get_ovf_env.return_value = ('myfile', 'mycontent') + + def test_find_already_mounted(self): + """Check we call get_ovf_env from on matching mounted devices""" + mounts = { + '/dev/sr9': { + 'fstype': 'iso9660', + 'mountpoint': 'wark/media/sr9', + 'opts': 'ro', + } + } + self.m_mounts.return_value = mounts + + (contents, fullp, fname) = dsovf.transport_iso9660() + self.assertEqual("mycontent", contents) + self.assertEqual("/dev/sr9", fullp) + self.assertEqual("myfile", fname) + + def test_find_already_mounted_skips_non_iso9660(self): + """Check we call get_ovf_env ignoring non iso9660""" + mounts = { + '/dev/xvdb': { + 'fstype': 'vfat', + 'mountpoint': 'wark/foobar', + 'opts': 'defaults,noatime', + }, + '/dev/xvdc': { + 'fstype': 'iso9660', + 'mountpoint': 'wark/media/sr9', + 'opts': 'ro', + } + } + # We use an OrderedDict here to ensure we check xvdb before xvdc + # as we're not mocking the regex matching, however, if we place + # an entry in the results then we can be reasonably sure that + # we're skipping an entry which fails to match. + self.m_mounts.return_value = ( + OrderedDict(sorted(mounts.items(), key=lambda t: t[0]))) + + (contents, fullp, fname) = dsovf.transport_iso9660() + self.assertEqual("mycontent", contents) + self.assertEqual("/dev/xvdc", fullp) + self.assertEqual("myfile", fname) + + def test_find_already_mounted_matches_kname(self): + """Check we dont regex match on basename of the device""" + mounts = { + '/dev/foo/bar/xvdc': { + 'fstype': 'iso9660', + 'mountpoint': 'wark/media/sr9', + 'opts': 'ro', + } + } + # we're skipping an entry which fails to match. + self.m_mounts.return_value = mounts + + (contents, fullp, fname) = dsovf.transport_iso9660() + self.assertEqual(False, contents) + self.assertIsNone(fullp) + self.assertIsNone(fname) + + def test_mount_cb_called_on_blkdevs_with_iso9660(self): + """Check we call mount_cb on blockdevs with iso9660 only""" + self.m_mounts.return_value = {} + self.m_find_devs_with.return_value = ['/dev/sr0'] + self.m_mount_cb.return_value = ("myfile", "mycontent") + + (contents, fullp, fname) = dsovf.transport_iso9660() + + self.m_mount_cb.assert_called_with( + "/dev/sr0", dsovf.get_ovf_env, mtype="iso9660") + self.assertEqual("mycontent", contents) + self.assertEqual("/dev/sr0", fullp) + self.assertEqual("myfile", fname) + + def test_mount_cb_called_on_blkdevs_with_iso9660_check_regex(self): + """Check we call mount_cb on blockdevs with iso9660 and match regex""" + self.m_mounts.return_value = {} + self.m_find_devs_with.return_value = [ + '/dev/abc', '/dev/my-cdrom', '/dev/sr0'] + self.m_mount_cb.return_value = ("myfile", "mycontent") + + (contents, fullp, fname) = dsovf.transport_iso9660() + + self.m_mount_cb.assert_called_with( + "/dev/sr0", dsovf.get_ovf_env, mtype="iso9660") + self.assertEqual("mycontent", contents) + self.assertEqual("/dev/sr0", fullp) + self.assertEqual("myfile", fname) + + def test_mount_cb_not_called_no_matches(self): + """Check we don't call mount_cb if nothing matches""" + self.m_mounts.return_value = {} + self.m_find_devs_with.return_value = ['/dev/vg/myovf'] + + (contents, fullp, fname) = dsovf.transport_iso9660() + + self.assertEqual(0, self.m_mount_cb.call_count) + self.assertEqual(False, contents) + self.assertIsNone(fullp) + self.assertIsNone(fname) + + def test_mount_cb_called_require_iso_false(self): + """Check we call mount_cb on blockdevs with require_iso=False""" + self.m_mounts.return_value = {} + self.m_find_devs_with.return_value = ['/dev/xvdz'] + self.m_mount_cb.return_value = ("myfile", "mycontent") + + (contents, fullp, fname) = dsovf.transport_iso9660(require_iso=False) + + self.m_mount_cb.assert_called_with( + "/dev/xvdz", dsovf.get_ovf_env, mtype=None) + self.assertEqual("mycontent", contents) + self.assertEqual("/dev/xvdz", fullp) + self.assertEqual("myfile", fname) + + def test_maybe_cdrom_device_none(self): + """Test maybe_cdrom_device returns False for none/empty input""" + self.assertFalse(dsovf.maybe_cdrom_device(None)) + self.assertFalse(dsovf.maybe_cdrom_device('')) + + def test_maybe_cdrom_device_non_string_exception(self): + """Test maybe_cdrom_device raises ValueError on non-string types""" + with self.assertRaises(ValueError): + dsovf.maybe_cdrom_device({'a': 'eleven'}) + + def test_maybe_cdrom_device_false_on_multi_dir_paths(self): + """Test maybe_cdrom_device is false on /dev[/.*]/* paths""" + self.assertFalse(dsovf.maybe_cdrom_device('/dev/foo/sr0')) + self.assertFalse(dsovf.maybe_cdrom_device('foo/sr0')) + self.assertFalse(dsovf.maybe_cdrom_device('../foo/sr0')) + self.assertFalse(dsovf.maybe_cdrom_device('../foo/sr0')) + + def test_maybe_cdrom_device_true_on_hd_partitions(self): + """Test maybe_cdrom_device is false on /dev/hd[a-z][0-9]+ paths""" + self.assertTrue(dsovf.maybe_cdrom_device('/dev/hda1')) + self.assertTrue(dsovf.maybe_cdrom_device('hdz9')) + + def test_maybe_cdrom_device_true_on_valid_relative_paths(self): + """Test maybe_cdrom_device normalizes paths""" + self.assertTrue(dsovf.maybe_cdrom_device('/dev/wark/../sr9')) + self.assertTrue(dsovf.maybe_cdrom_device('///sr0')) + self.assertTrue(dsovf.maybe_cdrom_device('/sr0')) + self.assertTrue(dsovf.maybe_cdrom_device('//dev//hda')) + + def test_maybe_cdrom_device_true_on_xvd_partitions(self): + """Test maybe_cdrom_device returns true on xvd*""" + self.assertTrue(dsovf.maybe_cdrom_device('/dev/xvda')) + self.assertTrue(dsovf.maybe_cdrom_device('/dev/xvda1')) + self.assertTrue(dsovf.maybe_cdrom_device('xvdza1')) + +# # vi: ts=4 expandtab -- cgit v1.2.3