diff options
Diffstat (limited to 'tests/unittests/config/test_cc_mounts.py')
-rw-r--r-- | tests/unittests/config/test_cc_mounts.py | 522 |
1 files changed, 522 insertions, 0 deletions
diff --git a/tests/unittests/config/test_cc_mounts.py b/tests/unittests/config/test_cc_mounts.py new file mode 100644 index 00000000..084faacd --- /dev/null +++ b/tests/unittests/config/test_cc_mounts.py @@ -0,0 +1,522 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import os.path +from unittest import mock + +import pytest + +from cloudinit.config import cc_mounts +from cloudinit.config.cc_mounts import create_swapfile +from cloudinit.subp import ProcessExecutionError +from tests.unittests import helpers as test_helpers + +M_PATH = "cloudinit.config.cc_mounts." + + +class TestSanitizeDevname(test_helpers.FilesystemMockingTestCase): + def setUp(self): + super(TestSanitizeDevname, self).setUp() + self.new_root = self.tmp_dir() + self.patchOS(self.new_root) + + def _touch(self, path): + path = os.path.join(self.new_root, path.lstrip("/")) + basedir = os.path.dirname(path) + if not os.path.exists(basedir): + os.makedirs(basedir) + open(path, "a").close() + + def _makedirs(self, directory): + directory = os.path.join(self.new_root, directory.lstrip("/")) + if not os.path.exists(directory): + os.makedirs(directory) + + def mock_existence_of_disk(self, disk_path): + self._touch(disk_path) + self._makedirs(os.path.join("/sys/block", disk_path.split("/")[-1])) + + def mock_existence_of_partition(self, disk_path, partition_number): + self.mock_existence_of_disk(disk_path) + self._touch(disk_path + str(partition_number)) + disk_name = disk_path.split("/")[-1] + self._makedirs( + os.path.join( + "/sys/block", disk_name, disk_name + str(partition_number) + ) + ) + + def test_existent_full_disk_path_is_returned(self): + disk_path = "/dev/sda" + self.mock_existence_of_disk(disk_path) + self.assertEqual( + disk_path, + cc_mounts.sanitize_devname(disk_path, lambda x: None, mock.Mock()), + ) + + def test_existent_disk_name_returns_full_path(self): + disk_name = "sda" + disk_path = "/dev/" + disk_name + self.mock_existence_of_disk(disk_path) + self.assertEqual( + disk_path, + cc_mounts.sanitize_devname(disk_name, lambda x: None, mock.Mock()), + ) + + def test_existent_meta_disk_is_returned(self): + actual_disk_path = "/dev/sda" + self.mock_existence_of_disk(actual_disk_path) + self.assertEqual( + actual_disk_path, + cc_mounts.sanitize_devname( + "ephemeral0", lambda x: actual_disk_path, mock.Mock() + ), + ) + + def test_existent_meta_partition_is_returned(self): + disk_name, partition_part = "/dev/sda", "1" + actual_partition_path = disk_name + partition_part + self.mock_existence_of_partition(disk_name, partition_part) + self.assertEqual( + actual_partition_path, + cc_mounts.sanitize_devname( + "ephemeral0.1", lambda x: disk_name, mock.Mock() + ), + ) + + def test_existent_meta_partition_with_p_is_returned(self): + disk_name, partition_part = "/dev/sda", "p1" + actual_partition_path = disk_name + partition_part + self.mock_existence_of_partition(disk_name, partition_part) + self.assertEqual( + actual_partition_path, + cc_mounts.sanitize_devname( + "ephemeral0.1", lambda x: disk_name, mock.Mock() + ), + ) + + def test_first_partition_returned_if_existent_disk_is_partitioned(self): + disk_name, partition_part = "/dev/sda", "1" + actual_partition_path = disk_name + partition_part + self.mock_existence_of_partition(disk_name, partition_part) + self.assertEqual( + actual_partition_path, + cc_mounts.sanitize_devname( + "ephemeral0", lambda x: disk_name, mock.Mock() + ), + ) + + def test_nth_partition_returned_if_requested(self): + disk_name, partition_part = "/dev/sda", "3" + actual_partition_path = disk_name + partition_part + self.mock_existence_of_partition(disk_name, partition_part) + self.assertEqual( + actual_partition_path, + cc_mounts.sanitize_devname( + "ephemeral0.3", lambda x: disk_name, mock.Mock() + ), + ) + + def test_transformer_returning_none_returns_none(self): + self.assertIsNone( + cc_mounts.sanitize_devname( + "ephemeral0", lambda x: None, mock.Mock() + ) + ) + + def test_missing_device_returns_none(self): + self.assertIsNone( + cc_mounts.sanitize_devname("/dev/sda", None, mock.Mock()) + ) + + def test_missing_sys_returns_none(self): + disk_path = "/dev/sda" + self._makedirs(disk_path) + self.assertIsNone( + cc_mounts.sanitize_devname(disk_path, None, mock.Mock()) + ) + + def test_existent_disk_but_missing_partition_returns_none(self): + disk_path = "/dev/sda" + self.mock_existence_of_disk(disk_path) + self.assertIsNone( + cc_mounts.sanitize_devname( + "ephemeral0.1", lambda x: disk_path, mock.Mock() + ) + ) + + def test_network_device_returns_network_device(self): + disk_path = "netdevice:/path" + self.assertEqual( + disk_path, cc_mounts.sanitize_devname(disk_path, None, mock.Mock()) + ) + + def test_device_aliases_remapping(self): + disk_path = "/dev/sda" + self.mock_existence_of_disk(disk_path) + self.assertEqual( + disk_path, + cc_mounts.sanitize_devname( + "mydata", lambda x: None, mock.Mock(), {"mydata": disk_path} + ), + ) + + +class TestSwapFileCreation(test_helpers.FilesystemMockingTestCase): + def setUp(self): + super(TestSwapFileCreation, self).setUp() + self.new_root = self.tmp_dir() + self.patchOS(self.new_root) + + self.fstab_path = os.path.join(self.new_root, "etc/fstab") + self.swap_path = os.path.join(self.new_root, "swap.img") + self._makedirs("/etc") + + self.add_patch( + "cloudinit.config.cc_mounts.FSTAB_PATH", + "mock_fstab_path", + self.fstab_path, + autospec=False, + ) + + self.add_patch("cloudinit.config.cc_mounts.subp.subp", "m_subp_subp") + + self.add_patch( + "cloudinit.config.cc_mounts.util.mounts", + "mock_util_mounts", + return_value={ + "/dev/sda1": { + "fstype": "ext4", + "mountpoint": "/", + "opts": "rw,relatime,discard", + } + }, + ) + + self.mock_cloud = mock.Mock() + self.mock_log = mock.Mock() + self.mock_cloud.device_name_to_device = self.device_name_to_device + + self.cc = { + "swap": { + "filename": self.swap_path, + "size": "512", + "maxsize": "512", + } + } + + def _makedirs(self, directory): + directory = os.path.join(self.new_root, directory.lstrip("/")) + if not os.path.exists(directory): + os.makedirs(directory) + + def device_name_to_device(self, path): + if path == "swap": + return self.swap_path + else: + dev = None + + return dev + + @mock.patch("cloudinit.util.get_mount_info") + @mock.patch("cloudinit.util.kernel_version") + def test_swap_creation_method_fallocate_on_xfs( + self, m_kernel_version, m_get_mount_info + ): + m_kernel_version.return_value = (4, 20) + m_get_mount_info.return_value = ["", "xfs"] + + cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) + self.m_subp_subp.assert_has_calls( + [ + mock.call( + ["fallocate", "-l", "0M", self.swap_path], capture=True + ), + mock.call(["mkswap", self.swap_path]), + mock.call(["swapon", "-a"]), + ] + ) + + @mock.patch("cloudinit.util.get_mount_info") + @mock.patch("cloudinit.util.kernel_version") + def test_swap_creation_method_xfs( + self, m_kernel_version, m_get_mount_info + ): + m_kernel_version.return_value = (3, 18) + m_get_mount_info.return_value = ["", "xfs"] + + cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) + self.m_subp_subp.assert_has_calls( + [ + mock.call( + [ + "dd", + "if=/dev/zero", + "of=" + self.swap_path, + "bs=1M", + "count=0", + ], + capture=True, + ), + mock.call(["mkswap", self.swap_path]), + mock.call(["swapon", "-a"]), + ] + ) + + @mock.patch("cloudinit.util.get_mount_info") + @mock.patch("cloudinit.util.kernel_version") + def test_swap_creation_method_btrfs( + self, m_kernel_version, m_get_mount_info + ): + m_kernel_version.return_value = (4, 20) + m_get_mount_info.return_value = ["", "btrfs"] + + cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) + self.m_subp_subp.assert_has_calls( + [ + mock.call( + [ + "dd", + "if=/dev/zero", + "of=" + self.swap_path, + "bs=1M", + "count=0", + ], + capture=True, + ), + mock.call(["mkswap", self.swap_path]), + mock.call(["swapon", "-a"]), + ] + ) + + @mock.patch("cloudinit.util.get_mount_info") + @mock.patch("cloudinit.util.kernel_version") + def test_swap_creation_method_ext4( + self, m_kernel_version, m_get_mount_info + ): + m_kernel_version.return_value = (5, 14) + m_get_mount_info.return_value = ["", "ext4"] + + cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) + self.m_subp_subp.assert_has_calls( + [ + mock.call( + ["fallocate", "-l", "0M", self.swap_path], capture=True + ), + mock.call(["mkswap", self.swap_path]), + mock.call(["swapon", "-a"]), + ] + ) + + +class TestFstabHandling(test_helpers.FilesystemMockingTestCase): + + swap_path = "/dev/sdb1" + + def setUp(self): + super(TestFstabHandling, self).setUp() + self.new_root = self.tmp_dir() + self.patchOS(self.new_root) + + self.fstab_path = os.path.join(self.new_root, "etc/fstab") + self._makedirs("/etc") + + self.add_patch( + "cloudinit.config.cc_mounts.FSTAB_PATH", + "mock_fstab_path", + self.fstab_path, + autospec=False, + ) + + self.add_patch( + "cloudinit.config.cc_mounts._is_block_device", + "mock_is_block_device", + return_value=True, + ) + + self.add_patch("cloudinit.config.cc_mounts.subp.subp", "m_subp_subp") + + self.add_patch( + "cloudinit.config.cc_mounts.util.mounts", + "mock_util_mounts", + return_value={ + "/dev/sda1": { + "fstype": "ext4", + "mountpoint": "/", + "opts": "rw,relatime,discard", + } + }, + ) + + self.mock_cloud = mock.Mock() + self.mock_log = mock.Mock() + self.mock_cloud.device_name_to_device = self.device_name_to_device + + def _makedirs(self, directory): + directory = os.path.join(self.new_root, directory.lstrip("/")) + if not os.path.exists(directory): + os.makedirs(directory) + + def device_name_to_device(self, path): + if path == "swap": + return self.swap_path + else: + dev = None + + return dev + + def test_no_fstab(self): + """Handle images which do not include an fstab.""" + self.assertFalse(os.path.exists(cc_mounts.FSTAB_PATH)) + fstab_expected_content = ( + "%s\tnone\tswap\tsw,comment=cloudconfig\t0\t0\n" + % (self.swap_path,) + ) + cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) + with open(cc_mounts.FSTAB_PATH, "r") as fd: + fstab_new_content = fd.read() + self.assertEqual(fstab_expected_content, fstab_new_content) + + def test_swap_integrity(self): + """Ensure that the swap file is correctly created and can + swapon successfully. Fixing the corner case of: + kernel: swapon: swapfile has holes""" + + fstab = "/swap.img swap swap defaults 0 0\n" + + with open(cc_mounts.FSTAB_PATH, "w") as fd: + fd.write(fstab) + cc = {"swap": ["filename: /swap.img", "size: 512", "maxsize: 512"]} + cc_mounts.handle(None, cc, self.mock_cloud, self.mock_log, []) + + def test_fstab_no_swap_device(self): + """Ensure that cloud-init adds a discovered swap partition + to /etc/fstab.""" + + fstab_original_content = "" + fstab_expected_content = ( + "%s\tnone\tswap\tsw,comment=cloudconfig\t0\t0\n" + % (self.swap_path,) + ) + + with open(cc_mounts.FSTAB_PATH, "w") as fd: + fd.write(fstab_original_content) + + cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) + + with open(cc_mounts.FSTAB_PATH, "r") as fd: + fstab_new_content = fd.read() + self.assertEqual(fstab_expected_content, fstab_new_content) + + def test_fstab_same_swap_device_already_configured(self): + """Ensure that cloud-init will not add a swap device if the same + device already exists in /etc/fstab.""" + + fstab_original_content = "%s swap swap defaults 0 0\n" % ( + self.swap_path, + ) + fstab_expected_content = fstab_original_content + + with open(cc_mounts.FSTAB_PATH, "w") as fd: + fd.write(fstab_original_content) + + cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) + + with open(cc_mounts.FSTAB_PATH, "r") as fd: + fstab_new_content = fd.read() + self.assertEqual(fstab_expected_content, fstab_new_content) + + def test_fstab_alternate_swap_device_already_configured(self): + """Ensure that cloud-init will add a discovered swap device to + /etc/fstab even when there exists a swap definition on another + device.""" + + fstab_original_content = "/dev/sdc1 swap swap defaults 0 0\n" + fstab_expected_content = ( + fstab_original_content + + "%s\tnone\tswap\tsw,comment=cloudconfig\t0\t0\n" + % (self.swap_path,) + ) + + with open(cc_mounts.FSTAB_PATH, "w") as fd: + fd.write(fstab_original_content) + + cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) + + with open(cc_mounts.FSTAB_PATH, "r") as fd: + fstab_new_content = fd.read() + self.assertEqual(fstab_expected_content, fstab_new_content) + + def test_no_change_fstab_sets_needs_mount_all(self): + """verify unchanged fstab entries are mounted if not call mount -a""" + fstab_original_content = ( + "LABEL=cloudimg-rootfs / ext4 defaults 0 0\n" + "LABEL=UEFI /boot/efi vfat defaults 0 0\n" + "/dev/vdb /mnt auto defaults,noexec,comment=cloudconfig 0 2\n" + ) + fstab_expected_content = fstab_original_content + cc = {"mounts": [["/dev/vdb", "/mnt", "auto", "defaults,noexec"]]} + with open(cc_mounts.FSTAB_PATH, "w") as fd: + fd.write(fstab_original_content) + with open(cc_mounts.FSTAB_PATH, "r") as fd: + fstab_new_content = fd.read() + self.assertEqual(fstab_expected_content, fstab_new_content) + cc_mounts.handle(None, cc, self.mock_cloud, self.mock_log, []) + self.m_subp_subp.assert_has_calls( + [ + mock.call(["mount", "-a"]), + mock.call(["systemctl", "daemon-reload"]), + ] + ) + + +class TestCreateSwapfile: + @pytest.mark.parametrize("fstype", ("xfs", "btrfs", "ext4", "other")) + @mock.patch(M_PATH + "util.get_mount_info") + @mock.patch(M_PATH + "subp.subp") + def test_happy_path(self, m_subp, m_get_mount_info, fstype, tmpdir): + swap_file = tmpdir.join("swap-file") + fname = str(swap_file) + + # Some of the calls to subp.subp should create the swap file; this + # roughly approximates that + m_subp.side_effect = lambda *args, **kwargs: swap_file.write("") + + m_get_mount_info.return_value = (mock.ANY, fstype) + + create_swapfile(fname, "") + assert mock.call(["mkswap", fname]) in m_subp.call_args_list + + @mock.patch(M_PATH + "util.get_mount_info") + @mock.patch(M_PATH + "subp.subp") + def test_fallback_from_fallocate_to_dd( + self, m_subp, m_get_mount_info, caplog, tmpdir + ): + swap_file = tmpdir.join("swap-file") + fname = str(swap_file) + + def subp_side_effect(cmd, *args, **kwargs): + # Mock fallocate failing, to initiate fallback + if cmd[0] == "fallocate": + raise ProcessExecutionError() + + m_subp.side_effect = subp_side_effect + # Use ext4 so both fallocate and dd are valid swap creation methods + m_get_mount_info.return_value = (mock.ANY, "ext4") + + create_swapfile(fname, "") + + cmds = [args[0][0] for args, _kwargs in m_subp.call_args_list] + assert "fallocate" in cmds, "fallocate was not called" + assert "dd" in cmds, "fallocate failure did not fallback to dd" + + assert cmds.index("dd") > cmds.index( + "fallocate" + ), "dd ran before fallocate" + + assert mock.call(["mkswap", fname]) in m_subp.call_args_list + + msg = "fallocate swap creation failed, will attempt with dd" + assert msg in caplog.text + + +# vi: ts=4 expandtab |