summaryrefslogtreecommitdiff
path: root/cloudinit
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit')
-rw-r--r--cloudinit/sources/DataSourceOVF.py74
-rw-r--r--cloudinit/tests/helpers.py10
2 files changed, 57 insertions, 27 deletions
diff --git a/cloudinit/sources/DataSourceOVF.py b/cloudinit/sources/DataSourceOVF.py
index 24b45d55..ccebf11a 100644
--- a/cloudinit/sources/DataSourceOVF.py
+++ b/cloudinit/sources/DataSourceOVF.py
@@ -375,26 +375,56 @@ def get_ovf_env(dirname):
return (None, False)
-# Transport functions take no input and return
-# a 3 tuple of content, path, filename
-def transport_iso9660(require_iso=True):
+def maybe_cdrom_device(devname):
+ """Test if devname matches known list of devices which may contain iso9660
+ filesystems.
- # default_regex matches values in
- # /lib/udev/rules.d/60-cdrom_id.rules
- # KERNEL!="sr[0-9]*|hd[a-z]|xvd*", GOTO="cdrom_end"
- envname = "CLOUD_INIT_CDROM_DEV_REGEX"
- default_regex = "^(sr[0-9]+|hd[a-z]|xvd.*)"
+ Be helpful in accepting either knames (with no leading /dev/) or full path
+ names, but do not allow paths outside of /dev/, like /dev/foo/bar/xxx.
+ """
+ if not devname:
+ return False
+ elif not isinstance(devname, util.string_types):
+ raise ValueError("Unexpected input for devname: %s" % devname)
+
+ # resolve '..' and multi '/' elements
+ devname = os.path.normpath(devname)
- devname_regex = os.environ.get(envname, default_regex)
+ # drop leading '/dev/'
+ if devname.startswith("/dev/"):
+ # partition returns tuple (before, partition, after)
+ devname = devname.partition("/dev/")[-1]
+
+ # ignore leading slash (/sr0), else fail on / in name (foo/bar/xvdc)
+ if devname.startswith("/"):
+ devname = devname.split("/")[-1]
+ elif devname.count("/") > 0:
+ return False
+
+ # if empty string
+ if not devname:
+ return False
+
+ # default_regex matches values in /lib/udev/rules.d/60-cdrom_id.rules
+ # KERNEL!="sr[0-9]*|hd[a-z]|xvd*", GOTO="cdrom_end"
+ default_regex = r"^(sr[0-9]+|hd[a-z]|xvd.*)"
+ devname_regex = os.environ.get("CLOUD_INIT_CDROM_DEV_REGEX", default_regex)
cdmatch = re.compile(devname_regex)
+ return cdmatch.match(devname) is not None
+
+
+# Transport functions take no input and return
+# a 3 tuple of content, path, filename
+def transport_iso9660(require_iso=True):
+
# Go through mounts to see if it was already mounted
mounts = util.mounts()
for (dev, info) in mounts.items():
fstype = info['fstype']
if fstype != "iso9660" and require_iso:
continue
- if cdmatch.match(dev[5:]) is None: # take off '/dev/'
+ if not maybe_cdrom_device(dev):
continue
mp = info['mountpoint']
(fname, contents) = get_ovf_env(mp)
@@ -406,29 +436,19 @@ def transport_iso9660(require_iso=True):
else:
mtype = None
- devs = os.listdir("/dev/")
- devs.sort()
+ # generate a list of devices with mtype filesystem, filter by regex
+ devs = [dev for dev in
+ util.find_devs_with("TYPE=%s" % mtype if mtype else None)
+ if maybe_cdrom_device(dev)]
for dev in devs:
- fullp = os.path.join("/dev/", dev)
-
- if (fullp in mounts or
- not cdmatch.match(dev) or os.path.isdir(fullp)):
- continue
-
- try:
- # See if we can read anything at all...??
- util.peek_file(fullp, 512)
- except IOError:
- continue
-
try:
- (fname, contents) = util.mount_cb(fullp, get_ovf_env, mtype=mtype)
+ (fname, contents) = util.mount_cb(dev, get_ovf_env, mtype=mtype)
except util.MountFailedError:
- LOG.debug("%s not mountable as iso9660", fullp)
+ LOG.debug("%s not mountable as iso9660", dev)
continue
if contents is not False:
- return (contents, fullp, fname)
+ return (contents, dev, fname)
return (False, None, None)
diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py
index 28e26622..6f88a5b7 100644
--- a/cloudinit/tests/helpers.py
+++ b/cloudinit/tests/helpers.py
@@ -104,6 +104,16 @@ class TestCase(unittest2.TestCase):
super(TestCase, self).setUp()
self.reset_global_state()
+ def add_patch(self, target, attr, **kwargs):
+ """Patches specified target object and sets it as attr on test
+ instance also schedules cleanup"""
+ if 'autospec' not in kwargs:
+ kwargs['autospec'] = True
+ m = mock.patch(target, **kwargs)
+ p = m.start()
+ self.addCleanup(m.stop)
+ setattr(self, attr, p)
+
class CiTestCase(TestCase):
"""This is the preferred test case base class unless user