# This file is part of cloud-init. See LICENSE file for license information.

import os.path
from unittest import mock

import pytest

from cloudinit.config import cc_mounts
from cloudinit.config.cc_mounts import create_swapfile
from cloudinit.subp import ProcessExecutionError
from tests.unittests import helpers as test_helpers

# Dotted prefix used to build mock.patch targets inside cc_mounts.
M_PATH = "cloudinit.config.cc_mounts."


class TestSanitizeDevname(test_helpers.FilesystemMockingTestCase):
    """Tests for cc_mounts.sanitize_devname against a fake /dev and /sys."""

    def setUp(self):
        super().setUp()
        self.new_root = self.tmp_dir()
        # patchOS comes from the shared test helper; presumably it redirects
        # filesystem lookups under new_root -- confirm in
        # tests.unittests.helpers.
        self.patchOS(self.new_root)

    def _touch(self, path):
        """Ensure a file exists at *path*, re-rooted under self.new_root."""
        path = os.path.join(self.new_root, path.lstrip("/"))
        basedir = os.path.dirname(path)
        if not os.path.exists(basedir):
            os.makedirs(basedir)
        # Append mode creates the file without truncating an existing one.
        with open(path, "a"):
            pass

    def _makedirs(self, directory):
        """Ensure *directory* exists, re-rooted under self.new_root."""
        directory = os.path.join(self.new_root, directory.lstrip("/"))
        if not os.path.exists(directory):
            os.makedirs(directory)

    def mock_existence_of_disk(self, disk_path):
        """Create the device node file and its /sys/block/<name> directory."""
        self._touch(disk_path)
        self._makedirs(os.path.join("/sys/block", disk_path.split("/")[-1]))

    def mock_existence_of_partition(self, disk_path, partition_number):
        """Create a disk plus one partition node and its /sys/block entry.

        *partition_number* is appended verbatim to the disk path/name, so it
        may carry a prefix such as "p1".
        """
        self.mock_existence_of_disk(disk_path)
        self._touch(disk_path + str(partition_number))
        disk_name = disk_path.split("/")[-1]
        self._makedirs(
            os.path.join(
                "/sys/block", disk_name, disk_name + str(partition_number)
            )
        )

    def test_existent_full_disk_path_is_returned(self):
        disk_path = "/dev/sda"
        self.mock_existence_of_disk(disk_path)
        self.assertEqual(
            disk_path,
            cc_mounts.sanitize_devname(disk_path, lambda x: None, mock.Mock()),
        )

    def test_existent_disk_name_returns_full_path(self):
        disk_name = "sda"
        disk_path = "/dev/" + disk_name
        self.mock_existence_of_disk(disk_path)
        self.assertEqual(
            disk_path,
            cc_mounts.sanitize_devname(disk_name, lambda x: None, mock.Mock()),
        )

    def test_existent_meta_disk_is_returned(self):
        actual_disk_path = "/dev/sda"
        self.mock_existence_of_disk(actual_disk_path)
        self.assertEqual(
            actual_disk_path,
            cc_mounts.sanitize_devname(
                "ephemeral0", lambda x: actual_disk_path, mock.Mock()
            ),
        )

    def test_existent_meta_partition_is_returned(self):
        disk_name, partition_part = "/dev/sda", "1"
        actual_partition_path = disk_name + partition_part
        self.mock_existence_of_partition(disk_name, partition_part)
        self.assertEqual(
            actual_partition_path,
            cc_mounts.sanitize_devname(
                "ephemeral0.1", lambda x: disk_name, mock.Mock()
            ),
        )

    def test_existent_meta_partition_with_p_is_returned(self):
        disk_name, partition_part = "/dev/sda", "p1"
        actual_partition_path = disk_name + partition_part
        self.mock_existence_of_partition(disk_name, partition_part)
        self.assertEqual(
            actual_partition_path,
            cc_mounts.sanitize_devname(
                "ephemeral0.1", lambda x: disk_name, mock.Mock()
            ),
        )

    def test_first_partition_returned_if_existent_disk_is_partitioned(self):
        disk_name, partition_part = "/dev/sda", "1"
        actual_partition_path = disk_name + partition_part
        self.mock_existence_of_partition(disk_name, partition_part)
        self.assertEqual(
            actual_partition_path,
            cc_mounts.sanitize_devname(
                "ephemeral0", lambda x: disk_name, mock.Mock()
            ),
        )

    def test_nth_partition_returned_if_requested(self):
        disk_name, partition_part = "/dev/sda", "3"
        actual_partition_path = disk_name + partition_part
        self.mock_existence_of_partition(disk_name, partition_part)
        self.assertEqual(
            actual_partition_path,
            cc_mounts.sanitize_devname(
                "ephemeral0.3", lambda x: disk_name, mock.Mock()
            ),
        )

    def test_transformer_returning_none_returns_none(self):
        self.assertIsNone(
            cc_mounts.sanitize_devname(
                "ephemeral0", lambda x: None, mock.Mock()
            )
        )

    def test_missing_device_returns_none(self):
        self.assertIsNone(
            cc_mounts.sanitize_devname("/dev/sda", None, mock.Mock())
        )

    def test_missing_sys_returns_none(self):
        disk_path = "/dev/sda"
        # Only the /dev entry is created here; no /sys/block counterpart.
        self._makedirs(disk_path)
        self.assertIsNone(
            cc_mounts.sanitize_devname(disk_path, None, mock.Mock())
        )

    def test_existent_disk_but_missing_partition_returns_none(self):
        disk_path = "/dev/sda"
        self.mock_existence_of_disk(disk_path)
        self.assertIsNone(
            cc_mounts.sanitize_devname(
                "ephemeral0.1", lambda x: disk_path, mock.Mock()
            )
        )

    def test_network_device_returns_network_device(self):
        disk_path = "netdevice:/path"
        self.assertEqual(
            disk_path, cc_mounts.sanitize_devname(disk_path, None, mock.Mock())
        )

    def test_device_aliases_remapping(self):
        disk_path = "/dev/sda"
        self.mock_existence_of_disk(disk_path)
        self.assertEqual(
            disk_path,
            cc_mounts.sanitize_devname(
                "mydata", lambda x: None, mock.Mock(), {"mydata": disk_path}
            ),
        )


class TestSwapFileCreation(test_helpers.FilesystemMockingTestCase):
    """Check which commands cc_mounts.handle issues to create a swap file."""

    def setUp(self):
        super().setUp()
        self.new_root = self.tmp_dir()
        self.patchOS(self.new_root)

        self.fstab_path = os.path.join(self.new_root, "etc/fstab")
        self.swap_path = os.path.join(self.new_root, "swap.img")
        self._makedirs("/etc")

        self.add_patch(
            "cloudinit.config.cc_mounts.FSTAB_PATH",
            "mock_fstab_path",
            self.fstab_path,
            autospec=False,
        )

        self.add_patch("cloudinit.config.cc_mounts.subp.subp", "m_subp_subp")

        self.add_patch(
            "cloudinit.config.cc_mounts.util.mounts",
            "mock_util_mounts",
            return_value={
                "/dev/sda1": {
                    "fstype": "ext4",
                    "mountpoint": "/",
                    "opts": "rw,relatime,discard",
                }
            },
        )

        self.mock_cloud = mock.Mock()
        self.mock_log = mock.Mock()
        self.mock_cloud.device_name_to_device = self.device_name_to_device

        # Cloud-config passed to handle() by every test in this class.
        self.cc = {
            "swap": {
                "filename": self.swap_path,
                "size": "512",
                "maxsize": "512",
            }
        }

    def _makedirs(self, directory):
        """Ensure *directory* exists, re-rooted under self.new_root."""
        directory = os.path.join(self.new_root, directory.lstrip("/"))
        if not os.path.exists(directory):
            os.makedirs(directory)

    def device_name_to_device(self, path):
        """Stand-in for cloud.device_name_to_device: only "swap" resolves."""
        return self.swap_path if path == "swap" else None

    def _fallocate_cmd(self):
        """Return the fallocate command line the tests expect."""
        return ["fallocate", "-l", "0M", self.swap_path]

    def _dd_cmd(self):
        """Return the dd command line the tests expect."""
        return [
            "dd",
            "if=/dev/zero",
            "of=" + self.swap_path,
            "bs=1M",
            "count=0",
        ]

    def _assert_swap_commands(self, create_cmd):
        """Assert subp ran *create_cmd*, then mkswap, then swapon -a."""
        self.m_subp_subp.assert_has_calls(
            [
                mock.call(create_cmd, capture=True),
                mock.call(["mkswap", self.swap_path]),
                mock.call(["swapon", "-a"]),
            ]
        )

    @mock.patch("cloudinit.util.get_mount_info")
    @mock.patch("cloudinit.util.kernel_version")
    def test_swap_creation_method_fallocate_on_xfs(
        self, m_kernel_version, m_get_mount_info
    ):
        m_kernel_version.return_value = (4, 20)
        m_get_mount_info.return_value = ["", "xfs"]

        cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, [])
        self._assert_swap_commands(self._fallocate_cmd())

    @mock.patch("cloudinit.util.get_mount_info")
    @mock.patch("cloudinit.util.kernel_version")
    def test_swap_creation_method_xfs(
        self, m_kernel_version, m_get_mount_info
    ):
        m_kernel_version.return_value = (3, 18)
        m_get_mount_info.return_value = ["", "xfs"]

        cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, [])
        self._assert_swap_commands(self._dd_cmd())

    @mock.patch("cloudinit.util.get_mount_info")
    @mock.patch("cloudinit.util.kernel_version")
    def test_swap_creation_method_btrfs(
        self, m_kernel_version, m_get_mount_info
    ):
        m_kernel_version.return_value = (4, 20)
        m_get_mount_info.return_value = ["", "btrfs"]

        cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, [])
        self._assert_swap_commands(self._dd_cmd())

    @mock.patch("cloudinit.util.get_mount_info")
    @mock.patch("cloudinit.util.kernel_version")
    def test_swap_creation_method_ext4(
        self, m_kernel_version, m_get_mount_info
    ):
        m_kernel_version.return_value = (5, 14)
        m_get_mount_info.return_value = ["", "ext4"]

        cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, [])
        self._assert_swap_commands(self._fallocate_cmd())


class TestFstabHandling(test_helpers.FilesystemMockingTestCase):
    """Check how cc_mounts.handle rewrites the (patched) fstab file."""

    swap_path = "/dev/sdb1"

    def setUp(self):
        super().setUp()
        self.new_root = self.tmp_dir()
        self.patchOS(self.new_root)

        self.fstab_path = os.path.join(self.new_root, "etc/fstab")
        self._makedirs("/etc")

        self.add_patch(
            "cloudinit.config.cc_mounts.FSTAB_PATH",
            "mock_fstab_path",
            self.fstab_path,
            autospec=False,
        )

        self.add_patch(
            "cloudinit.config.cc_mounts._is_block_device",
            "mock_is_block_device",
            return_value=True,
        )

        self.add_patch("cloudinit.config.cc_mounts.subp.subp", "m_subp_subp")

        self.add_patch(
            "cloudinit.config.cc_mounts.util.mounts",
            "mock_util_mounts",
            return_value={
                "/dev/sda1": {
                    "fstype": "ext4",
                    "mountpoint": "/",
                    "opts": "rw,relatime,discard",
                }
            },
        )

        self.mock_cloud = mock.Mock()
        self.mock_log = mock.Mock()
        self.mock_cloud.device_name_to_device = self.device_name_to_device

    def _makedirs(self, directory):
        """Ensure *directory* exists, re-rooted under self.new_root."""
        directory = os.path.join(self.new_root, directory.lstrip("/"))
        if not os.path.exists(directory):
            os.makedirs(directory)

    def device_name_to_device(self, path):
        """Stand-in for cloud.device_name_to_device: only "swap" resolves."""
        return self.swap_path if path == "swap" else None

    def _write_fstab(self, content):
        """Write *content* to the patched cc_mounts.FSTAB_PATH."""
        with open(cc_mounts.FSTAB_PATH, "w") as fd:
            fd.write(content)

    def _read_fstab(self):
        """Return the current text of the patched cc_mounts.FSTAB_PATH."""
        with open(cc_mounts.FSTAB_PATH, "r") as fd:
            return fd.read()

    def _swap_fstab_line(self):
        """Return the fstab line the tests expect for self.swap_path."""
        return "%s\tnone\tswap\tsw,comment=cloudconfig\t0\t0\n" % (
            self.swap_path,
        )

    def test_no_fstab(self):
        """Handle images which do not include an fstab."""
        self.assertFalse(os.path.exists(cc_mounts.FSTAB_PATH))
        fstab_expected_content = self._swap_fstab_line()
        cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, [])
        self.assertEqual(fstab_expected_content, self._read_fstab())

    def test_swap_integrity(self):
        """Ensure that the swap file is correctly created and can
        swapon successfully. Fixing the corner case of:
        kernel: swapon: swapfile has holes"""

        fstab = "/swap.img swap swap defaults 0 0\n"
        self._write_fstab(fstab)

        # NOTE(review): "swap" is a list of strings here, whereas
        # TestSwapFileCreation passes a mapping, and nothing is asserted
        # afterwards -- as written this only checks handle() does not raise.
        # Confirm that is the intent.
        cc = {"swap": ["filename: /swap.img", "size: 512", "maxsize: 512"]}
        cc_mounts.handle(None, cc, self.mock_cloud, self.mock_log, [])

    def test_fstab_no_swap_device(self):
        """Ensure that cloud-init adds a discovered swap partition
        to /etc/fstab."""

        fstab_original_content = ""
        fstab_expected_content = self._swap_fstab_line()
        self._write_fstab(fstab_original_content)

        cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, [])

        self.assertEqual(fstab_expected_content, self._read_fstab())

    def test_fstab_same_swap_device_already_configured(self):
        """Ensure that cloud-init will not add a swap device if the same
        device already exists in /etc/fstab."""

        fstab_original_content = "%s swap swap defaults 0 0\n" % (
            self.swap_path,
        )
        fstab_expected_content = fstab_original_content
        self._write_fstab(fstab_original_content)

        cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, [])

        self.assertEqual(fstab_expected_content, self._read_fstab())

    def test_fstab_alternate_swap_device_already_configured(self):
        """Ensure that cloud-init will add a discovered swap device to
        /etc/fstab even when there exists a swap definition on another
        device."""

        fstab_original_content = "/dev/sdc1 swap swap defaults 0 0\n"
        fstab_expected_content = (
            fstab_original_content + self._swap_fstab_line()
        )
        self._write_fstab(fstab_original_content)

        cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, [])

        self.assertEqual(fstab_expected_content, self._read_fstab())

    def test_no_change_fstab_sets_needs_mount_all(self):
        """verify unchanged fstab entries are mounted if not call mount -a"""
        fstab_original_content = (
            "LABEL=cloudimg-rootfs / ext4 defaults 0 0\n"
            "LABEL=UEFI /boot/efi vfat defaults 0 0\n"
            "/dev/vdb /mnt auto defaults,noexec,comment=cloudconfig 0 2\n"
        )
        fstab_expected_content = fstab_original_content
        cc = {"mounts": [["/dev/vdb", "/mnt", "auto", "defaults,noexec"]]}
        self._write_fstab(fstab_original_content)
        # This read-back happens BEFORE handle() runs, so it only validates
        # the fixture that was just written, not the handler's output.
        # NOTE(review): confirm whether a post-handle check was intended.
        self.assertEqual(fstab_expected_content, self._read_fstab())
        cc_mounts.handle(None, cc, self.mock_cloud, self.mock_log, [])
        self.m_subp_subp.assert_has_calls(
            [
                mock.call(["mount", "-a"]),
                mock.call(["systemctl", "daemon-reload"]),
            ]
        )


class TestCreateSwapfile:
    """pytest-style tests calling cc_mounts.create_swapfile directly."""

    @pytest.mark.parametrize("fstype", ("xfs", "btrfs", "ext4", "other"))
    @mock.patch(M_PATH + "util.get_mount_info")
    @mock.patch(M_PATH + "subp.subp")
    def test_happy_path(self, m_subp, m_get_mount_info, fstype, tmpdir):
        swap_file = tmpdir.join("swap-file")
        fname = str(swap_file)

        # Some of the calls to subp.subp should create the swap file; this
        # roughly approximates that
        m_subp.side_effect = lambda *args, **kwargs: swap_file.write("")

        m_get_mount_info.return_value = (mock.ANY, fstype)

        create_swapfile(fname, "")
        assert mock.call(["mkswap", fname]) in m_subp.call_args_list

    @mock.patch(M_PATH + "util.get_mount_info")
    @mock.patch(M_PATH + "subp.subp")
    def test_fallback_from_fallocate_to_dd(
        self, m_subp, m_get_mount_info, caplog, tmpdir
    ):
        swap_file = tmpdir.join("swap-file")
        fname = str(swap_file)

        def subp_side_effect(cmd, *args, **kwargs):
            # Mock fallocate failing, to initiate fallback
            if cmd[0] == "fallocate":
                raise ProcessExecutionError()

        m_subp.side_effect = subp_side_effect
        # Use ext4 so both fallocate and dd are valid swap creation methods
        m_get_mount_info.return_value = (mock.ANY, "ext4")

        create_swapfile(fname, "")

        # First element of each call's first positional argument, i.e. the
        # executable name of every command handed to subp.
        cmds = [args[0][0] for args, _kwargs in m_subp.call_args_list]
        assert "fallocate" in cmds, "fallocate was not called"
        assert "dd" in cmds, "fallocate failure did not fallback to dd"

        assert cmds.index("dd") > cmds.index(
            "fallocate"
        ), "dd ran before fallocate"

        assert mock.call(["mkswap", fname]) in m_subp.call_args_list

        msg = "fallocate swap creation failed, will attempt with dd"
        assert msg in caplog.text


# vi: ts=4 expandtab