diff options
Diffstat (limited to 'tests/unittests/config/test_cc_disk_setup.py')
-rw-r--r-- | tests/unittests/config/test_cc_disk_setup.py | 333 |
1 files changed, 333 insertions, 0 deletions
diff --git a/tests/unittests/config/test_cc_disk_setup.py b/tests/unittests/config/test_cc_disk_setup.py new file mode 100644 index 00000000..f2796e83 --- /dev/null +++ b/tests/unittests/config/test_cc_disk_setup.py @@ -0,0 +1,333 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import random +import re + +import pytest + +from cloudinit.config import cc_disk_setup +from cloudinit.config.schema import ( + SchemaValidationError, + get_schema, + validate_cloudconfig_schema, +) +from tests.unittests.helpers import ( + CiTestCase, + ExitStack, + TestCase, + mock, + skipUnlessJsonSchema, +) + + +class TestIsDiskUsed(TestCase): + def setUp(self): + super(TestIsDiskUsed, self).setUp() + self.patches = ExitStack() + mod_name = "cloudinit.config.cc_disk_setup" + self.enumerate_disk = self.patches.enter_context( + mock.patch("{0}.enumerate_disk".format(mod_name)) + ) + self.check_fs = self.patches.enter_context( + mock.patch("{0}.check_fs".format(mod_name)) + ) + + def tearDown(self): + super(TestIsDiskUsed, self).tearDown() + self.patches.close() + + def test_multiple_child_nodes_returns_true(self): + self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(2)) + self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock()) + self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock())) + + def test_valid_filesystem_returns_true(self): + self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1)) + self.check_fs.return_value = ( + mock.MagicMock(), + "ext4", + mock.MagicMock(), + ) + self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock())) + + def test_one_child_nodes_and_no_fs_returns_false(self): + self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1)) + self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock()) + self.assertFalse(cc_disk_setup.is_disk_used(mock.MagicMock())) + + +class TestGetMbrHddSize(TestCase): + def setUp(self): + super(TestGetMbrHddSize, self).setUp() + self.patches = ExitStack() + self.subp = self.patches.enter_context( + mock.patch.object(cc_disk_setup.subp, "subp") + ) + + def tearDown(self): + super(TestGetMbrHddSize, self).tearDown() + self.patches.close() + + def _configure_subp_mock(self, hdd_size_in_bytes, sector_size_in_bytes): + def _subp(cmd, *args, **kwargs): + self.assertEqual(3, len(cmd)) + if "--getsize64" in cmd: + return hdd_size_in_bytes, None + elif "--getss" in cmd: + return sector_size_in_bytes, None + raise Exception("Unexpected blockdev command called") + + self.subp.side_effect = _subp + + def _test_for_sector_size(self, sector_size): + size_in_bytes = random.randint(10000, 10000000) * 512 + size_in_sectors = size_in_bytes / sector_size + self._configure_subp_mock(size_in_bytes, sector_size) + self.assertEqual( + size_in_sectors, cc_disk_setup.get_hdd_size("/dev/sda1") + ) + + def test_size_for_512_byte_sectors(self): + self._test_for_sector_size(512) + + def test_size_for_1024_byte_sectors(self): + self._test_for_sector_size(1024) + + def test_size_for_2048_byte_sectors(self): + self._test_for_sector_size(2048) + + def test_size_for_4096_byte_sectors(self): + self._test_for_sector_size(4096) + + +class TestGetPartitionMbrLayout(TestCase): + def test_single_partition_using_boolean(self): + self.assertEqual( + "0,", cc_disk_setup.get_partition_mbr_layout(1000, True) + ) + + def test_single_partition_using_list(self): + disk_size = random.randint(1000000, 1000000000000) + self.assertEqual( + ",,83", cc_disk_setup.get_partition_mbr_layout(disk_size, [100]) + ) + + def test_half_and_half(self): + disk_size = random.randint(1000000, 1000000000000) + expected_partition_size = int(float(disk_size) / 2) + self.assertEqual( + ",{0},83\n,,83".format(expected_partition_size), + cc_disk_setup.get_partition_mbr_layout(disk_size, [50, 50]), + ) + + def test_thirds_with_different_partition_type(self): + disk_size = random.randint(1000000, 1000000000000) + expected_partition_size = int(float(disk_size) * 0.33) + self.assertEqual( + ",{0},83\n,,82".format(expected_partition_size), + cc_disk_setup.get_partition_mbr_layout(disk_size, [33, [66, 82]]), + ) + + +class TestUpdateFsSetupDevices(TestCase): + def test_regression_1634678(self): + # Cf. https://bugs.launchpad.net/cloud-init/+bug/1634678 + fs_setup = { + "partition": "auto", + "device": "/dev/xvdb1", + "overwrite": False, + "label": "test", + "filesystem": "ext4", + } + + cc_disk_setup.update_fs_setup_devices( + [fs_setup], lambda device: device + ) + + self.assertEqual( + { + "_origname": "/dev/xvdb1", + "partition": "auto", + "device": "/dev/xvdb1", + "overwrite": False, + "label": "test", + "filesystem": "ext4", + }, + fs_setup, + ) + + def test_dotted_devname(self): + fs_setup = { + "partition": "auto", + "device": "ephemeral0.0", + "label": "test2", + "filesystem": "xfs", + } + + cc_disk_setup.update_fs_setup_devices( + [fs_setup], lambda device: device + ) + + self.assertEqual( + { + "_origname": "ephemeral0.0", + "_partition": "auto", + "partition": "0", + "device": "ephemeral0", + "label": "test2", + "filesystem": "xfs", + }, + fs_setup, + ) + + def test_dotted_devname_populates_partition(self): + fs_setup = { + "device": "ephemeral0.1", + "label": "test2", + "filesystem": "xfs", + } + cc_disk_setup.update_fs_setup_devices( + [fs_setup], lambda device: device + ) + self.assertEqual( + { + "_origname": "ephemeral0.1", + "device": "ephemeral0", + "partition": "1", + "label": "test2", + "filesystem": "xfs", + }, + fs_setup, + ) + + +@mock.patch( + "cloudinit.config.cc_disk_setup.assert_and_settle_device", + return_value=None, +) +@mock.patch( + "cloudinit.config.cc_disk_setup.find_device_node", + return_value=("/dev/xdb1", False), +) +@mock.patch("cloudinit.config.cc_disk_setup.device_type", return_value=None) +@mock.patch("cloudinit.config.cc_disk_setup.subp.subp", return_value=("", "")) +class TestMkfsCommandHandling(CiTestCase): + + with_logs = True + + def test_with_cmd(self, subp, *args): + """mkfs honors cmd and logs warnings when extra_opts or overwrite are + provided.""" + cc_disk_setup.mkfs( + { + "cmd": "mkfs -t %(filesystem)s -L %(label)s %(device)s", + "filesystem": "ext4", + "device": "/dev/xdb1", + "label": "with_cmd", + "extra_opts": ["should", "generate", "warning"], + "overwrite": "should generate warning too", + } + ) + + self.assertIn( + "extra_opts " + + "ignored because cmd was specified: mkfs -t ext4 -L with_cmd " + + "/dev/xdb1", + self.logs.getvalue(), + ) + self.assertIn( + "overwrite " + + "ignored because cmd was specified: mkfs -t ext4 -L with_cmd " + + "/dev/xdb1", + self.logs.getvalue(), + ) + + subp.assert_called_once_with( + "mkfs -t ext4 -L with_cmd /dev/xdb1", shell=True + ) + + @mock.patch("cloudinit.config.cc_disk_setup.subp.which") + def test_overwrite_and_extra_opts_without_cmd(self, m_which, subp, *args): + """mkfs observes extra_opts and overwrite settings when cmd is not + present.""" + m_which.side_effect = lambda p: {"mkfs.ext4": "/sbin/mkfs.ext4"}[p] + cc_disk_setup.mkfs( + { + "filesystem": "ext4", + "device": "/dev/xdb1", + "label": "without_cmd", + "extra_opts": ["are", "added"], + "overwrite": True, + } + ) + + subp.assert_called_once_with( + [ + "/sbin/mkfs.ext4", + "/dev/xdb1", + "-L", + "without_cmd", + "-F", + "are", + "added", + ], + shell=False, + ) + + @mock.patch("cloudinit.config.cc_disk_setup.subp.which") + def test_mkswap(self, m_which, subp, *args): + """mkfs observes extra_opts and overwrite settings when cmd is not + present.""" + m_which.side_effect = iter([None, "/sbin/mkswap"]) + cc_disk_setup.mkfs( + { + "filesystem": "swap", + "device": "/dev/xdb1", + "label": "swap", + "overwrite": True, + } + ) + + self.assertEqual( + [mock.call("mkfs.swap"), mock.call("mkswap")], + m_which.call_args_list, + ) + subp.assert_called_once_with( + ["/sbin/mkswap", "/dev/xdb1", "-L", "swap", "-f"], shell=False + ) + + +@skipUnlessJsonSchema() +class TestDebugSchema: + """Directly test schema rather than through handle.""" + + @pytest.mark.parametrize( + "config, error_msg", + ( + # Valid schemas tested by meta.examples in test_schema + # Invalid schemas + ({"disk_setup": 1}, "disk_setup: 1 is not of type 'object'"), + ({"fs_setup": 1}, "fs_setup: 1 is not of type 'array'"), + ( + {"device_aliases": 1}, + "device_aliases: 1 is not of type 'object'", + ), + ( + {"debug": {"boguskey": True}}, + re.escape( + "Additional properties are not allowed ('boguskey' was" + " unexpected)" + ), + ), + ), + ) + @skipUnlessJsonSchema() + def test_schema_validation(self, config, error_msg): + """Assert expected schema validation and error messages.""" + # New-style schema $defs exist in config/cloud-init-schema*.json + schema = get_schema() + with pytest.raises(SchemaValidationError, match=error_msg): + validate_cloudconfig_schema(config, schema, strict=True) + + +# vi: ts=4 expandtab |