summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cloudinit/config/cc_mounts.py104
-rw-r--r--tests/unittests/test_handler/test_handler_mounts.py133
2 files changed, 177 insertions, 60 deletions
diff --git a/cloudinit/config/cc_mounts.py b/cloudinit/config/cc_mounts.py
index 1cb1e839..f970c2ca 100644
--- a/cloudinit/config/cc_mounts.py
+++ b/cloudinit/config/cc_mounts.py
@@ -28,15 +28,15 @@ from cloudinit import type_utils
from cloudinit import util
# Shortname matches 'sda', 'sda1', 'xvda', 'hda', 'sdb', xvdb, vda, vdd1, sr0
-SHORTNAME_FILTER = r"^([x]{0,1}[shv]d[a-z][0-9]*|sr[0-9]+)$"
-SHORTNAME = re.compile(SHORTNAME_FILTER)
+DEVICE_NAME_FILTER = r"^([x]{0,1}[shv]d[a-z][0-9]*|sr[0-9]+)$"
+DEVICE_NAME_RE = re.compile(DEVICE_NAME_FILTER)
WS = re.compile("[%s]+" % (whitespace))
FSTAB_PATH = "/etc/fstab"
LOG = logging.getLogger(__name__)
-def is_mdname(name):
+def is_meta_device_name(name):
# return true if this is a metadata service name
if name in ["ami", "root", "swap"]:
return True
@@ -48,6 +48,23 @@ def is_mdname(name):
return False
+def _get_nth_partition_for_device(device_path, partition_number):
+ potential_suffixes = [str(partition_number), 'p%s' % (partition_number,)]
+ for suffix in potential_suffixes:
+ potential_partition_device = '%s%s' % (device_path, suffix)
+ if os.path.exists(potential_partition_device):
+ return potential_partition_device
+ return None
+
+
+def _is_block_device(device_path, partition_path=None):
+ device_name = device_path.split('/')[-1]
+ sys_path = os.path.join('/sys/block/', device_name)
+ if partition_path is not None:
+ sys_path = os.path.join(sys_path, partition_path.split('/')[-1])
+ return os.path.exists(sys_path)
+
+
def sanitize_devname(startname, transformer, log):
log.debug("Attempting to determine the real name of %s", startname)
@@ -58,21 +75,34 @@ def sanitize_devname(startname, transformer, log):
devname = "ephemeral0"
log.debug("Adjusted mount option from ephemeral to ephemeral0")
- (blockdev, part) = util.expand_dotted_devname(devname)
+ device_path, partition_number = util.expand_dotted_devname(devname)
- if is_mdname(blockdev):
- orig = blockdev
- blockdev = transformer(blockdev)
- if not blockdev:
+ if is_meta_device_name(device_path):
+ orig = device_path
+ device_path = transformer(device_path)
+ if not device_path:
return None
- if not blockdev.startswith("/"):
- blockdev = "/dev/%s" % blockdev
- log.debug("Mapped metadata name %s to %s", orig, blockdev)
+ if not device_path.startswith("/"):
+ device_path = "/dev/%s" % (device_path,)
+ log.debug("Mapped metadata name %s to %s", orig, device_path)
+ else:
+ if DEVICE_NAME_RE.match(startname):
+ device_path = "/dev/%s" % (device_path,)
+
+ partition_path = None
+ if partition_number is None:
+ partition_path = _get_nth_partition_for_device(device_path, 1)
else:
- if SHORTNAME.match(startname):
- blockdev = "/dev/%s" % blockdev
+ partition_path = _get_nth_partition_for_device(device_path,
+ partition_number)
+ if partition_path is None:
+ return None
- return devnode_for_dev_part(blockdev, part)
+ if _is_block_device(device_path, partition_path):
+ if partition_path is not None:
+ return partition_path
+ return device_path
+ return None
def suggested_swapsize(memsize=None, maxsize=None, fsys=None):
@@ -366,49 +396,3 @@ def handle(_name, cfg, cloud, log, _args):
util.subp(("mount", "-a"))
except:
util.logexc(log, "Activating mounts via 'mount -a' failed")
-
-
-def devnode_for_dev_part(device, partition):
- """
- Find the name of the partition. While this might seem rather
- straight forward, its not since some devices are '<device><partition>'
- while others are '<device>p<partition>'. For example, /dev/xvda3 on EC2
- will present as /dev/xvda3p1 for the first partition since /dev/xvda3 is
- a block device.
- """
- if not os.path.exists(device):
- return None
-
- short_name = os.path.basename(device)
- sys_path = "/sys/block/%s" % short_name
-
- if not os.path.exists(sys_path):
- LOG.debug("did not find entry for %s in /sys/block", short_name)
- return None
-
- sys_long_path = sys_path + "/" + short_name
-
- if partition is not None:
- partition = str(partition)
-
- if partition is None:
- valid_mappings = [sys_long_path + "1", sys_long_path + "p1"]
- elif partition != "0":
- valid_mappings = [sys_long_path + "%s" % partition,
- sys_long_path + "p%s" % partition]
- else:
- valid_mappings = []
-
- for cdisk in valid_mappings:
- if not os.path.exists(cdisk):
- continue
-
- dev_path = "/dev/%s" % os.path.basename(cdisk)
- if os.path.exists(dev_path):
- return dev_path
-
- if partition is None or partition == "0":
- return device
-
- LOG.debug("Did not fine partition %s for device %s", partition, device)
- return None
diff --git a/tests/unittests/test_handler/test_handler_mounts.py b/tests/unittests/test_handler/test_handler_mounts.py
new file mode 100644
index 00000000..355674b2
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_mounts.py
@@ -0,0 +1,133 @@
+import os.path
+import shutil
+import tempfile
+
+from cloudinit.config import cc_mounts
+
+from .. import helpers as test_helpers
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+
+class TestSanitizeDevname(test_helpers.FilesystemMockingTestCase):
+
+ def setUp(self):
+ super(TestSanitizeDevname, self).setUp()
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
+ self.patchOS(self.new_root)
+
+ def _touch(self, path):
+ path = os.path.join(self.new_root, path.lstrip('/'))
+ basedir = os.path.dirname(path)
+ if not os.path.exists(basedir):
+ os.makedirs(basedir)
+ open(path, 'a').close()
+
+ def _makedirs(self, directory):
+ directory = os.path.join(self.new_root, directory.lstrip('/'))
+ if not os.path.exists(directory):
+ os.makedirs(directory)
+
+ def mock_existence_of_disk(self, disk_path):
+ self._touch(disk_path)
+ self._makedirs(os.path.join('/sys/block', disk_path.split('/')[-1]))
+
+ def mock_existence_of_partition(self, disk_path, partition_number):
+ self.mock_existence_of_disk(disk_path)
+ self._touch(disk_path + str(partition_number))
+ disk_name = disk_path.split('/')[-1]
+ self._makedirs(os.path.join('/sys/block',
+ disk_name,
+ disk_name + str(partition_number)))
+
+ def test_existent_full_disk_path_is_returned(self):
+ disk_path = '/dev/sda'
+ self.mock_existence_of_disk(disk_path)
+ self.assertEqual(disk_path,
+ cc_mounts.sanitize_devname(disk_path,
+ lambda x: None,
+ mock.Mock()))
+
+ def test_existent_disk_name_returns_full_path(self):
+ disk_name = 'sda'
+ disk_path = '/dev/' + disk_name
+ self.mock_existence_of_disk(disk_path)
+ self.assertEqual(disk_path,
+ cc_mounts.sanitize_devname(disk_name,
+ lambda x: None,
+ mock.Mock()))
+
+ def test_existent_meta_disk_is_returned(self):
+ actual_disk_path = '/dev/sda'
+ self.mock_existence_of_disk(actual_disk_path)
+ self.assertEqual(
+ actual_disk_path,
+ cc_mounts.sanitize_devname('ephemeral0',
+ lambda x: actual_disk_path,
+ mock.Mock()))
+
+ def test_existent_meta_partition_is_returned(self):
+ disk_name, partition_part = '/dev/sda', '1'
+ actual_partition_path = disk_name + partition_part
+ self.mock_existence_of_partition(disk_name, partition_part)
+ self.assertEqual(
+ actual_partition_path,
+ cc_mounts.sanitize_devname('ephemeral0.1',
+ lambda x: disk_name,
+ mock.Mock()))
+
+ def test_existent_meta_partition_with_p_is_returned(self):
+ disk_name, partition_part = '/dev/sda', 'p1'
+ actual_partition_path = disk_name + partition_part
+ self.mock_existence_of_partition(disk_name, partition_part)
+ self.assertEqual(
+ actual_partition_path,
+ cc_mounts.sanitize_devname('ephemeral0.1',
+ lambda x: disk_name,
+ mock.Mock()))
+
+ def test_first_partition_returned_if_existent_disk_is_partitioned(self):
+ disk_name, partition_part = '/dev/sda', '1'
+ actual_partition_path = disk_name + partition_part
+ self.mock_existence_of_partition(disk_name, partition_part)
+ self.assertEqual(
+ actual_partition_path,
+ cc_mounts.sanitize_devname('ephemeral0',
+ lambda x: disk_name,
+ mock.Mock()))
+
+ def test_nth_partition_returned_if_requested(self):
+ disk_name, partition_part = '/dev/sda', '3'
+ actual_partition_path = disk_name + partition_part
+ self.mock_existence_of_partition(disk_name, partition_part)
+ self.assertEqual(
+ actual_partition_path,
+ cc_mounts.sanitize_devname('ephemeral0.3',
+ lambda x: disk_name,
+ mock.Mock()))
+
+ def test_transformer_returning_none_returns_none(self):
+ self.assertIsNone(
+ cc_mounts.sanitize_devname(
+ 'ephemeral0', lambda x: None, mock.Mock()))
+
+ def test_missing_device_returns_none(self):
+ self.assertIsNone(
+ cc_mounts.sanitize_devname('/dev/sda', None, mock.Mock()))
+
+ def test_missing_sys_returns_none(self):
+ disk_path = '/dev/sda'
+ self._makedirs(disk_path)
+ self.assertIsNone(
+ cc_mounts.sanitize_devname(disk_path, None, mock.Mock()))
+
+ def test_existent_disk_but_missing_partition_returns_none(self):
+ disk_path = '/dev/sda'
+ self.mock_existence_of_disk(disk_path)
+ self.assertIsNone(
+ cc_mounts.sanitize_devname(
+ 'ephemeral0.1', lambda x: disk_path, mock.Mock()))