diff options
Diffstat (limited to 'tests/unittests/config/test_cc_mounts.py')
-rw-r--r-- | tests/unittests/config/test_cc_mounts.py | 449 |
1 files changed, 255 insertions, 194 deletions
diff --git a/tests/unittests/config/test_cc_mounts.py b/tests/unittests/config/test_cc_mounts.py index fc65f108..084faacd 100644 --- a/tests/unittests/config/test_cc_mounts.py +++ b/tests/unittests/config/test_cc_mounts.py @@ -1,302 +1,363 @@ # This file is part of cloud-init. See LICENSE file for license information. -import pytest import os.path from unittest import mock -from tests.unittests import helpers as test_helpers +import pytest + from cloudinit.config import cc_mounts from cloudinit.config.cc_mounts import create_swapfile from cloudinit.subp import ProcessExecutionError +from tests.unittests import helpers as test_helpers -M_PATH = 'cloudinit.config.cc_mounts.' +M_PATH = "cloudinit.config.cc_mounts." class TestSanitizeDevname(test_helpers.FilesystemMockingTestCase): - def setUp(self): super(TestSanitizeDevname, self).setUp() self.new_root = self.tmp_dir() self.patchOS(self.new_root) def _touch(self, path): - path = os.path.join(self.new_root, path.lstrip('/')) + path = os.path.join(self.new_root, path.lstrip("/")) basedir = os.path.dirname(path) if not os.path.exists(basedir): os.makedirs(basedir) - open(path, 'a').close() + open(path, "a").close() def _makedirs(self, directory): - directory = os.path.join(self.new_root, directory.lstrip('/')) + directory = os.path.join(self.new_root, directory.lstrip("/")) if not os.path.exists(directory): os.makedirs(directory) def mock_existence_of_disk(self, disk_path): self._touch(disk_path) - self._makedirs(os.path.join('/sys/block', disk_path.split('/')[-1])) + self._makedirs(os.path.join("/sys/block", disk_path.split("/")[-1])) def mock_existence_of_partition(self, disk_path, partition_number): self.mock_existence_of_disk(disk_path) self._touch(disk_path + str(partition_number)) - disk_name = disk_path.split('/')[-1] - self._makedirs(os.path.join('/sys/block', - disk_name, - disk_name + str(partition_number))) + disk_name = disk_path.split("/")[-1] + self._makedirs( + os.path.join( + "/sys/block", disk_name, disk_name + str(partition_number) + ) + ) def test_existent_full_disk_path_is_returned(self): - disk_path = '/dev/sda' + disk_path = "/dev/sda" self.mock_existence_of_disk(disk_path) - self.assertEqual(disk_path, - cc_mounts.sanitize_devname(disk_path, - lambda x: None, - mock.Mock())) + self.assertEqual( + disk_path, + cc_mounts.sanitize_devname(disk_path, lambda x: None, mock.Mock()), + ) def test_existent_disk_name_returns_full_path(self): - disk_name = 'sda' - disk_path = '/dev/' + disk_name + disk_name = "sda" + disk_path = "/dev/" + disk_name self.mock_existence_of_disk(disk_path) - self.assertEqual(disk_path, - cc_mounts.sanitize_devname(disk_name, - lambda x: None, - mock.Mock())) + self.assertEqual( + disk_path, + cc_mounts.sanitize_devname(disk_name, lambda x: None, mock.Mock()), + ) def test_existent_meta_disk_is_returned(self): - actual_disk_path = '/dev/sda' + actual_disk_path = "/dev/sda" self.mock_existence_of_disk(actual_disk_path) self.assertEqual( actual_disk_path, - cc_mounts.sanitize_devname('ephemeral0', - lambda x: actual_disk_path, - mock.Mock())) + cc_mounts.sanitize_devname( + "ephemeral0", lambda x: actual_disk_path, mock.Mock() + ), + ) def test_existent_meta_partition_is_returned(self): - disk_name, partition_part = '/dev/sda', '1' + disk_name, partition_part = "/dev/sda", "1" actual_partition_path = disk_name + partition_part self.mock_existence_of_partition(disk_name, partition_part) self.assertEqual( actual_partition_path, - cc_mounts.sanitize_devname('ephemeral0.1', - lambda x: disk_name, - mock.Mock())) + cc_mounts.sanitize_devname( + "ephemeral0.1", lambda x: disk_name, mock.Mock() + ), + ) def test_existent_meta_partition_with_p_is_returned(self): - disk_name, partition_part = '/dev/sda', 'p1' + disk_name, partition_part = "/dev/sda", "p1" actual_partition_path = disk_name + partition_part self.mock_existence_of_partition(disk_name, partition_part) self.assertEqual( actual_partition_path, - cc_mounts.sanitize_devname('ephemeral0.1', - lambda x: disk_name, - mock.Mock())) + cc_mounts.sanitize_devname( + "ephemeral0.1", lambda x: disk_name, mock.Mock() + ), + ) def test_first_partition_returned_if_existent_disk_is_partitioned(self): - disk_name, partition_part = '/dev/sda', '1' + disk_name, partition_part = "/dev/sda", "1" actual_partition_path = disk_name + partition_part self.mock_existence_of_partition(disk_name, partition_part) self.assertEqual( actual_partition_path, - cc_mounts.sanitize_devname('ephemeral0', - lambda x: disk_name, - mock.Mock())) + cc_mounts.sanitize_devname( + "ephemeral0", lambda x: disk_name, mock.Mock() + ), + ) def test_nth_partition_returned_if_requested(self): - disk_name, partition_part = '/dev/sda', '3' + disk_name, partition_part = "/dev/sda", "3" actual_partition_path = disk_name + partition_part self.mock_existence_of_partition(disk_name, partition_part) self.assertEqual( actual_partition_path, - cc_mounts.sanitize_devname('ephemeral0.3', - lambda x: disk_name, - mock.Mock())) + cc_mounts.sanitize_devname( + "ephemeral0.3", lambda x: disk_name, mock.Mock() + ), + ) def test_transformer_returning_none_returns_none(self): self.assertIsNone( cc_mounts.sanitize_devname( - 'ephemeral0', lambda x: None, mock.Mock())) + "ephemeral0", lambda x: None, mock.Mock() + ) + ) def test_missing_device_returns_none(self): self.assertIsNone( - cc_mounts.sanitize_devname('/dev/sda', None, mock.Mock())) + cc_mounts.sanitize_devname("/dev/sda", None, mock.Mock()) + ) def test_missing_sys_returns_none(self): - disk_path = '/dev/sda' + disk_path = "/dev/sda" self._makedirs(disk_path) self.assertIsNone( - cc_mounts.sanitize_devname(disk_path, None, mock.Mock())) + cc_mounts.sanitize_devname(disk_path, None, mock.Mock()) + ) def test_existent_disk_but_missing_partition_returns_none(self): - disk_path = '/dev/sda' + disk_path = "/dev/sda" self.mock_existence_of_disk(disk_path) self.assertIsNone( cc_mounts.sanitize_devname( - 'ephemeral0.1', lambda x: disk_path, mock.Mock())) + "ephemeral0.1", lambda x: disk_path, mock.Mock() + ) + ) def test_network_device_returns_network_device(self): - disk_path = 'netdevice:/path' + disk_path = "netdevice:/path" self.assertEqual( - disk_path, - cc_mounts.sanitize_devname(disk_path, None, mock.Mock())) + disk_path, cc_mounts.sanitize_devname(disk_path, None, mock.Mock()) + ) def test_device_aliases_remapping(self): - disk_path = '/dev/sda' + disk_path = "/dev/sda" self.mock_existence_of_disk(disk_path) - self.assertEqual(disk_path, - cc_mounts.sanitize_devname('mydata', - lambda x: None, - mock.Mock(), - {'mydata': disk_path})) + self.assertEqual( + disk_path, + cc_mounts.sanitize_devname( + "mydata", lambda x: None, mock.Mock(), {"mydata": disk_path} + ), + ) class TestSwapFileCreation(test_helpers.FilesystemMockingTestCase): - def setUp(self): super(TestSwapFileCreation, self).setUp() self.new_root = self.tmp_dir() self.patchOS(self.new_root) - self.fstab_path = os.path.join(self.new_root, 'etc/fstab') - self.swap_path = os.path.join(self.new_root, 'swap.img') - self._makedirs('/etc') + self.fstab_path = os.path.join(self.new_root, "etc/fstab") + self.swap_path = os.path.join(self.new_root, "swap.img") + self._makedirs("/etc") - self.add_patch('cloudinit.config.cc_mounts.FSTAB_PATH', - 'mock_fstab_path', - self.fstab_path, - autospec=False) - - self.add_patch('cloudinit.config.cc_mounts.subp.subp', - 'm_subp_subp') + self.add_patch( + "cloudinit.config.cc_mounts.FSTAB_PATH", + "mock_fstab_path", + self.fstab_path, + autospec=False, + ) - self.add_patch('cloudinit.config.cc_mounts.util.mounts', - 'mock_util_mounts', - return_value={ - '/dev/sda1': {'fstype': 'ext4', - 'mountpoint': '/', - 'opts': 'rw,relatime,discard' - }}) + self.add_patch("cloudinit.config.cc_mounts.subp.subp", "m_subp_subp") + + self.add_patch( + "cloudinit.config.cc_mounts.util.mounts", + "mock_util_mounts", + return_value={ + "/dev/sda1": { + "fstype": "ext4", + "mountpoint": "/", + "opts": "rw,relatime,discard", + } + }, + ) self.mock_cloud = mock.Mock() self.mock_log = mock.Mock() self.mock_cloud.device_name_to_device = self.device_name_to_device self.cc = { - 'swap': { - 'filename': self.swap_path, - 'size': '512', - 'maxsize': '512'}} + "swap": { + "filename": self.swap_path, + "size": "512", + "maxsize": "512", + } + } def _makedirs(self, directory): - directory = os.path.join(self.new_root, directory.lstrip('/')) + directory = os.path.join(self.new_root, directory.lstrip("/")) if not os.path.exists(directory): os.makedirs(directory) def device_name_to_device(self, path): - if path == 'swap': + if path == "swap": return self.swap_path else: dev = None return dev - @mock.patch('cloudinit.util.get_mount_info') - @mock.patch('cloudinit.util.kernel_version') - def test_swap_creation_method_fallocate_on_xfs(self, m_kernel_version, - m_get_mount_info): + @mock.patch("cloudinit.util.get_mount_info") + @mock.patch("cloudinit.util.kernel_version") + def test_swap_creation_method_fallocate_on_xfs( + self, m_kernel_version, m_get_mount_info + ): m_kernel_version.return_value = (4, 20) m_get_mount_info.return_value = ["", "xfs"] cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) - self.m_subp_subp.assert_has_calls([ - mock.call(['fallocate', '-l', '0M', self.swap_path], capture=True), - mock.call(['mkswap', self.swap_path]), - mock.call(['swapon', '-a'])]) - - @mock.patch('cloudinit.util.get_mount_info') - @mock.patch('cloudinit.util.kernel_version') - def test_swap_creation_method_xfs(self, m_kernel_version, - m_get_mount_info): + self.m_subp_subp.assert_has_calls( + [ + mock.call( + ["fallocate", "-l", "0M", self.swap_path], capture=True + ), + mock.call(["mkswap", self.swap_path]), + mock.call(["swapon", "-a"]), + ] + ) + + @mock.patch("cloudinit.util.get_mount_info") + @mock.patch("cloudinit.util.kernel_version") + def test_swap_creation_method_xfs( + self, m_kernel_version, m_get_mount_info + ): m_kernel_version.return_value = (3, 18) m_get_mount_info.return_value = ["", "xfs"] cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) - self.m_subp_subp.assert_has_calls([ - mock.call(['dd', 'if=/dev/zero', - 'of=' + self.swap_path, - 'bs=1M', 'count=0'], capture=True), - mock.call(['mkswap', self.swap_path]), - mock.call(['swapon', '-a'])]) - - @mock.patch('cloudinit.util.get_mount_info') - @mock.patch('cloudinit.util.kernel_version') - def test_swap_creation_method_btrfs(self, m_kernel_version, - m_get_mount_info): + self.m_subp_subp.assert_has_calls( + [ + mock.call( + [ + "dd", + "if=/dev/zero", + "of=" + self.swap_path, + "bs=1M", + "count=0", + ], + capture=True, + ), + mock.call(["mkswap", self.swap_path]), + mock.call(["swapon", "-a"]), + ] + ) + + @mock.patch("cloudinit.util.get_mount_info") + @mock.patch("cloudinit.util.kernel_version") + def test_swap_creation_method_btrfs( + self, m_kernel_version, m_get_mount_info + ): m_kernel_version.return_value = (4, 20) m_get_mount_info.return_value = ["", "btrfs"] cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) - self.m_subp_subp.assert_has_calls([ - mock.call(['dd', 'if=/dev/zero', - 'of=' + self.swap_path, - 'bs=1M', 'count=0'], capture=True), - mock.call(['mkswap', self.swap_path]), - mock.call(['swapon', '-a'])]) - - @mock.patch('cloudinit.util.get_mount_info') - @mock.patch('cloudinit.util.kernel_version') - def test_swap_creation_method_ext4(self, m_kernel_version, - m_get_mount_info): + self.m_subp_subp.assert_has_calls( + [ + mock.call( + [ + "dd", + "if=/dev/zero", + "of=" + self.swap_path, + "bs=1M", + "count=0", + ], + capture=True, + ), + mock.call(["mkswap", self.swap_path]), + mock.call(["swapon", "-a"]), + ] + ) + + @mock.patch("cloudinit.util.get_mount_info") + @mock.patch("cloudinit.util.kernel_version") + def test_swap_creation_method_ext4( + self, m_kernel_version, m_get_mount_info + ): m_kernel_version.return_value = (5, 14) m_get_mount_info.return_value = ["", "ext4"] cc_mounts.handle(None, self.cc, self.mock_cloud, self.mock_log, []) - self.m_subp_subp.assert_has_calls([ - mock.call(['fallocate', '-l', '0M', self.swap_path], capture=True), - mock.call(['mkswap', self.swap_path]), - mock.call(['swapon', '-a'])]) + self.m_subp_subp.assert_has_calls( + [ + mock.call( + ["fallocate", "-l", "0M", self.swap_path], capture=True + ), + mock.call(["mkswap", self.swap_path]), + mock.call(["swapon", "-a"]), + ] + ) class TestFstabHandling(test_helpers.FilesystemMockingTestCase): - swap_path = '/dev/sdb1' + swap_path = "/dev/sdb1" def setUp(self): super(TestFstabHandling, self).setUp() self.new_root = self.tmp_dir() self.patchOS(self.new_root) - self.fstab_path = os.path.join(self.new_root, 'etc/fstab') - self._makedirs('/etc') - - self.add_patch('cloudinit.config.cc_mounts.FSTAB_PATH', - 'mock_fstab_path', - self.fstab_path, - autospec=False) + self.fstab_path = os.path.join(self.new_root, "etc/fstab") + self._makedirs("/etc") - self.add_patch('cloudinit.config.cc_mounts._is_block_device', - 'mock_is_block_device', - return_value=True) + self.add_patch( + "cloudinit.config.cc_mounts.FSTAB_PATH", + "mock_fstab_path", + self.fstab_path, + autospec=False, + ) - self.add_patch('cloudinit.config.cc_mounts.subp.subp', - 'm_subp_subp') + self.add_patch( + "cloudinit.config.cc_mounts._is_block_device", + "mock_is_block_device", + return_value=True, + ) - self.add_patch('cloudinit.config.cc_mounts.util.mounts', - 'mock_util_mounts', - return_value={ - '/dev/sda1': {'fstype': 'ext4', - 'mountpoint': '/', - 'opts': 'rw,relatime,discard' - }}) + self.add_patch("cloudinit.config.cc_mounts.subp.subp", "m_subp_subp") + + self.add_patch( + "cloudinit.config.cc_mounts.util.mounts", + "mock_util_mounts", + return_value={ + "/dev/sda1": { + "fstype": "ext4", + "mountpoint": "/", + "opts": "rw,relatime,discard", + } + }, + ) self.mock_cloud = mock.Mock() self.mock_log = mock.Mock() self.mock_cloud.device_name_to_device = self.device_name_to_device def _makedirs(self, directory): - directory = os.path.join(self.new_root, directory.lstrip('/')) + directory = os.path.join(self.new_root, directory.lstrip("/")) if not os.path.exists(directory): os.makedirs(directory) def device_name_to_device(self, path): - if path == 'swap': + if path == "swap": return self.swap_path else: dev = None @@ -304,127 +365,126 @@ class TestFstabHandling(test_helpers.FilesystemMockingTestCase): return dev def test_no_fstab(self): - """ Handle images which do not include an fstab. """ + """Handle images which do not include an fstab.""" self.assertFalse(os.path.exists(cc_mounts.FSTAB_PATH)) fstab_expected_content = ( - '%s\tnone\tswap\tsw,comment=cloudconfig\t' - '0\t0\n' % (self.swap_path,) + "%s\tnone\tswap\tsw,comment=cloudconfig\t0\t0\n" + % (self.swap_path,) ) cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) - with open(cc_mounts.FSTAB_PATH, 'r') as fd: + with open(cc_mounts.FSTAB_PATH, "r") as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) def test_swap_integrity(self): - '''Ensure that the swap file is correctly created and can + """Ensure that the swap file is correctly created and can swapon successfully. Fixing the corner case of: - kernel: swapon: swapfile has holes''' + kernel: swapon: swapfile has holes""" - fstab = '/swap.img swap swap defaults 0 0\n' + fstab = "/swap.img swap swap defaults 0 0\n" - with open(cc_mounts.FSTAB_PATH, 'w') as fd: + with open(cc_mounts.FSTAB_PATH, "w") as fd: fd.write(fstab) - cc = {'swap': ['filename: /swap.img', 'size: 512', 'maxsize: 512']} + cc = {"swap": ["filename: /swap.img", "size: 512", "maxsize: 512"]} cc_mounts.handle(None, cc, self.mock_cloud, self.mock_log, []) def test_fstab_no_swap_device(self): - '''Ensure that cloud-init adds a discovered swap partition - to /etc/fstab.''' + """Ensure that cloud-init adds a discovered swap partition + to /etc/fstab.""" - fstab_original_content = '' + fstab_original_content = "" fstab_expected_content = ( - '%s\tnone\tswap\tsw,comment=cloudconfig\t' - '0\t0\n' % (self.swap_path,) + "%s\tnone\tswap\tsw,comment=cloudconfig\t0\t0\n" + % (self.swap_path,) ) - with open(cc_mounts.FSTAB_PATH, 'w') as fd: + with open(cc_mounts.FSTAB_PATH, "w") as fd: fd.write(fstab_original_content) cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) - with open(cc_mounts.FSTAB_PATH, 'r') as fd: + with open(cc_mounts.FSTAB_PATH, "r") as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) def test_fstab_same_swap_device_already_configured(self): - '''Ensure that cloud-init will not add a swap device if the same - device already exists in /etc/fstab.''' + """Ensure that cloud-init will not add a swap device if the same + device already exists in /etc/fstab.""" - fstab_original_content = '%s swap swap defaults 0 0\n' % ( - self.swap_path,) + fstab_original_content = "%s swap swap defaults 0 0\n" % ( + self.swap_path, + ) fstab_expected_content = fstab_original_content - with open(cc_mounts.FSTAB_PATH, 'w') as fd: + with open(cc_mounts.FSTAB_PATH, "w") as fd: fd.write(fstab_original_content) cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) - with open(cc_mounts.FSTAB_PATH, 'r') as fd: + with open(cc_mounts.FSTAB_PATH, "r") as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) def test_fstab_alternate_swap_device_already_configured(self): - '''Ensure that cloud-init will add a discovered swap device to + """Ensure that cloud-init will add a discovered swap device to /etc/fstab even when there exists a swap definition on another - device.''' + device.""" - fstab_original_content = '/dev/sdc1 swap swap defaults 0 0\n' + fstab_original_content = "/dev/sdc1 swap swap defaults 0 0\n" fstab_expected_content = ( - fstab_original_content + - '%s\tnone\tswap\tsw,comment=cloudconfig\t' - '0\t0\n' % (self.swap_path,) + fstab_original_content + + "%s\tnone\tswap\tsw,comment=cloudconfig\t0\t0\n" + % (self.swap_path,) ) - with open(cc_mounts.FSTAB_PATH, 'w') as fd: + with open(cc_mounts.FSTAB_PATH, "w") as fd: fd.write(fstab_original_content) cc_mounts.handle(None, {}, self.mock_cloud, self.mock_log, []) - with open(cc_mounts.FSTAB_PATH, 'r') as fd: + with open(cc_mounts.FSTAB_PATH, "r") as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) def test_no_change_fstab_sets_needs_mount_all(self): - '''verify unchanged fstab entries are mounted if not call mount -a''' + """verify unchanged fstab entries are mounted if not call mount -a""" fstab_original_content = ( - 'LABEL=cloudimg-rootfs / ext4 defaults 0 0\n' - 'LABEL=UEFI /boot/efi vfat defaults 0 0\n' - '/dev/vdb /mnt auto defaults,noexec,comment=cloudconfig 0 2\n' + "LABEL=cloudimg-rootfs / ext4 defaults 0 0\n" + "LABEL=UEFI /boot/efi vfat defaults 0 0\n" + "/dev/vdb /mnt auto defaults,noexec,comment=cloudconfig 0 2\n" ) fstab_expected_content = fstab_original_content - cc = { - 'mounts': [ - ['/dev/vdb', '/mnt', 'auto', 'defaults,noexec'] - ] - } - with open(cc_mounts.FSTAB_PATH, 'w') as fd: + cc = {"mounts": [["/dev/vdb", "/mnt", "auto", "defaults,noexec"]]} + with open(cc_mounts.FSTAB_PATH, "w") as fd: fd.write(fstab_original_content) - with open(cc_mounts.FSTAB_PATH, 'r') as fd: + with open(cc_mounts.FSTAB_PATH, "r") as fd: fstab_new_content = fd.read() self.assertEqual(fstab_expected_content, fstab_new_content) cc_mounts.handle(None, cc, self.mock_cloud, self.mock_log, []) - self.m_subp_subp.assert_has_calls([ - mock.call(['mount', '-a']), - mock.call(['systemctl', 'daemon-reload'])]) + self.m_subp_subp.assert_has_calls( + [ + mock.call(["mount", "-a"]), + mock.call(["systemctl", "daemon-reload"]), + ] + ) class TestCreateSwapfile: - - @pytest.mark.parametrize('fstype', ('xfs', 'btrfs', 'ext4', 'other')) - @mock.patch(M_PATH + 'util.get_mount_info') - @mock.patch(M_PATH + 'subp.subp') + @pytest.mark.parametrize("fstype", ("xfs", "btrfs", "ext4", "other")) + @mock.patch(M_PATH + "util.get_mount_info") + @mock.patch(M_PATH + "subp.subp") def test_happy_path(self, m_subp, m_get_mount_info, fstype, tmpdir): swap_file = tmpdir.join("swap-file") fname = str(swap_file) # Some of the calls to subp.subp should create the swap file; this # roughly approximates that - m_subp.side_effect = lambda *args, **kwargs: swap_file.write('') + m_subp.side_effect = lambda *args, **kwargs: swap_file.write("") m_get_mount_info.return_value = (mock.ANY, fstype) - create_swapfile(fname, '') - assert mock.call(['mkswap', fname]) in m_subp.call_args_list + create_swapfile(fname, "") + assert mock.call(["mkswap", fname]) in m_subp.call_args_list @mock.patch(M_PATH + "util.get_mount_info") @mock.patch(M_PATH + "subp.subp") @@ -458,4 +518,5 @@ class TestCreateSwapfile: msg = "fallocate swap creation failed, will attempt with dd" assert msg in caplog.text + # vi: ts=4 expandtab |