summaryrefslogtreecommitdiff
path: root/tests/unittests/config/test_cc_disk_setup.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/config/test_cc_disk_setup.py')
-rw-r--r--tests/unittests/config/test_cc_disk_setup.py333
1 files changed, 333 insertions, 0 deletions
diff --git a/tests/unittests/config/test_cc_disk_setup.py b/tests/unittests/config/test_cc_disk_setup.py
new file mode 100644
index 00000000..f2796e83
--- /dev/null
+++ b/tests/unittests/config/test_cc_disk_setup.py
@@ -0,0 +1,333 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import random
+import re
+
+import pytest
+
+from cloudinit.config import cc_disk_setup
+from cloudinit.config.schema import (
+ SchemaValidationError,
+ get_schema,
+ validate_cloudconfig_schema,
+)
+from tests.unittests.helpers import (
+ CiTestCase,
+ ExitStack,
+ TestCase,
+ mock,
+ skipUnlessJsonSchema,
+)
+
+
+class TestIsDiskUsed(TestCase):
+ def setUp(self):
+ super(TestIsDiskUsed, self).setUp()
+ self.patches = ExitStack()
+ mod_name = "cloudinit.config.cc_disk_setup"
+ self.enumerate_disk = self.patches.enter_context(
+ mock.patch("{0}.enumerate_disk".format(mod_name))
+ )
+ self.check_fs = self.patches.enter_context(
+ mock.patch("{0}.check_fs".format(mod_name))
+ )
+
+ def tearDown(self):
+ super(TestIsDiskUsed, self).tearDown()
+ self.patches.close()
+
+ def test_multiple_child_nodes_returns_true(self):
+ self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(2))
+ self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock())
+ self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock()))
+
+ def test_valid_filesystem_returns_true(self):
+ self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1))
+ self.check_fs.return_value = (
+ mock.MagicMock(),
+ "ext4",
+ mock.MagicMock(),
+ )
+ self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock()))
+
+ def test_one_child_nodes_and_no_fs_returns_false(self):
+ self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1))
+ self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock())
+ self.assertFalse(cc_disk_setup.is_disk_used(mock.MagicMock()))
+
+
+class TestGetMbrHddSize(TestCase):
+ def setUp(self):
+ super(TestGetMbrHddSize, self).setUp()
+ self.patches = ExitStack()
+ self.subp = self.patches.enter_context(
+ mock.patch.object(cc_disk_setup.subp, "subp")
+ )
+
+ def tearDown(self):
+ super(TestGetMbrHddSize, self).tearDown()
+ self.patches.close()
+
+ def _configure_subp_mock(self, hdd_size_in_bytes, sector_size_in_bytes):
+ def _subp(cmd, *args, **kwargs):
+ self.assertEqual(3, len(cmd))
+ if "--getsize64" in cmd:
+ return hdd_size_in_bytes, None
+ elif "--getss" in cmd:
+ return sector_size_in_bytes, None
+ raise Exception("Unexpected blockdev command called")
+
+ self.subp.side_effect = _subp
+
+ def _test_for_sector_size(self, sector_size):
+ size_in_bytes = random.randint(10000, 10000000) * 512
+ size_in_sectors = size_in_bytes / sector_size
+ self._configure_subp_mock(size_in_bytes, sector_size)
+ self.assertEqual(
+ size_in_sectors, cc_disk_setup.get_hdd_size("/dev/sda1")
+ )
+
+ def test_size_for_512_byte_sectors(self):
+ self._test_for_sector_size(512)
+
+ def test_size_for_1024_byte_sectors(self):
+ self._test_for_sector_size(1024)
+
+ def test_size_for_2048_byte_sectors(self):
+ self._test_for_sector_size(2048)
+
+ def test_size_for_4096_byte_sectors(self):
+ self._test_for_sector_size(4096)
+
+
+class TestGetPartitionMbrLayout(TestCase):
+ def test_single_partition_using_boolean(self):
+ self.assertEqual(
+ "0,", cc_disk_setup.get_partition_mbr_layout(1000, True)
+ )
+
+ def test_single_partition_using_list(self):
+ disk_size = random.randint(1000000, 1000000000000)
+ self.assertEqual(
+ ",,83", cc_disk_setup.get_partition_mbr_layout(disk_size, [100])
+ )
+
+ def test_half_and_half(self):
+ disk_size = random.randint(1000000, 1000000000000)
+ expected_partition_size = int(float(disk_size) / 2)
+ self.assertEqual(
+ ",{0},83\n,,83".format(expected_partition_size),
+ cc_disk_setup.get_partition_mbr_layout(disk_size, [50, 50]),
+ )
+
+ def test_thirds_with_different_partition_type(self):
+ disk_size = random.randint(1000000, 1000000000000)
+ expected_partition_size = int(float(disk_size) * 0.33)
+ self.assertEqual(
+ ",{0},83\n,,82".format(expected_partition_size),
+ cc_disk_setup.get_partition_mbr_layout(disk_size, [33, [66, 82]]),
+ )
+
+
+class TestUpdateFsSetupDevices(TestCase):
+ def test_regression_1634678(self):
+ # Cf. https://bugs.launchpad.net/cloud-init/+bug/1634678
+ fs_setup = {
+ "partition": "auto",
+ "device": "/dev/xvdb1",
+ "overwrite": False,
+ "label": "test",
+ "filesystem": "ext4",
+ }
+
+ cc_disk_setup.update_fs_setup_devices(
+ [fs_setup], lambda device: device
+ )
+
+ self.assertEqual(
+ {
+ "_origname": "/dev/xvdb1",
+ "partition": "auto",
+ "device": "/dev/xvdb1",
+ "overwrite": False,
+ "label": "test",
+ "filesystem": "ext4",
+ },
+ fs_setup,
+ )
+
+ def test_dotted_devname(self):
+ fs_setup = {
+ "partition": "auto",
+ "device": "ephemeral0.0",
+ "label": "test2",
+ "filesystem": "xfs",
+ }
+
+ cc_disk_setup.update_fs_setup_devices(
+ [fs_setup], lambda device: device
+ )
+
+ self.assertEqual(
+ {
+ "_origname": "ephemeral0.0",
+ "_partition": "auto",
+ "partition": "0",
+ "device": "ephemeral0",
+ "label": "test2",
+ "filesystem": "xfs",
+ },
+ fs_setup,
+ )
+
+ def test_dotted_devname_populates_partition(self):
+ fs_setup = {
+ "device": "ephemeral0.1",
+ "label": "test2",
+ "filesystem": "xfs",
+ }
+ cc_disk_setup.update_fs_setup_devices(
+ [fs_setup], lambda device: device
+ )
+ self.assertEqual(
+ {
+ "_origname": "ephemeral0.1",
+ "device": "ephemeral0",
+ "partition": "1",
+ "label": "test2",
+ "filesystem": "xfs",
+ },
+ fs_setup,
+ )
+
+
+@mock.patch(
+ "cloudinit.config.cc_disk_setup.assert_and_settle_device",
+ return_value=None,
+)
+@mock.patch(
+ "cloudinit.config.cc_disk_setup.find_device_node",
+ return_value=("/dev/xdb1", False),
+)
+@mock.patch("cloudinit.config.cc_disk_setup.device_type", return_value=None)
+@mock.patch("cloudinit.config.cc_disk_setup.subp.subp", return_value=("", ""))
+class TestMkfsCommandHandling(CiTestCase):
+
+ with_logs = True
+
+ def test_with_cmd(self, subp, *args):
+ """mkfs honors cmd and logs warnings when extra_opts or overwrite are
+ provided."""
+ cc_disk_setup.mkfs(
+ {
+ "cmd": "mkfs -t %(filesystem)s -L %(label)s %(device)s",
+ "filesystem": "ext4",
+ "device": "/dev/xdb1",
+ "label": "with_cmd",
+ "extra_opts": ["should", "generate", "warning"],
+ "overwrite": "should generate warning too",
+ }
+ )
+
+ self.assertIn(
+ "extra_opts "
+ + "ignored because cmd was specified: mkfs -t ext4 -L with_cmd "
+ + "/dev/xdb1",
+ self.logs.getvalue(),
+ )
+ self.assertIn(
+ "overwrite "
+ + "ignored because cmd was specified: mkfs -t ext4 -L with_cmd "
+ + "/dev/xdb1",
+ self.logs.getvalue(),
+ )
+
+ subp.assert_called_once_with(
+ "mkfs -t ext4 -L with_cmd /dev/xdb1", shell=True
+ )
+
+ @mock.patch("cloudinit.config.cc_disk_setup.subp.which")
+ def test_overwrite_and_extra_opts_without_cmd(self, m_which, subp, *args):
+ """mkfs observes extra_opts and overwrite settings when cmd is not
+ present."""
+ m_which.side_effect = lambda p: {"mkfs.ext4": "/sbin/mkfs.ext4"}[p]
+ cc_disk_setup.mkfs(
+ {
+ "filesystem": "ext4",
+ "device": "/dev/xdb1",
+ "label": "without_cmd",
+ "extra_opts": ["are", "added"],
+ "overwrite": True,
+ }
+ )
+
+ subp.assert_called_once_with(
+ [
+ "/sbin/mkfs.ext4",
+ "/dev/xdb1",
+ "-L",
+ "without_cmd",
+ "-F",
+ "are",
+ "added",
+ ],
+ shell=False,
+ )
+
+ @mock.patch("cloudinit.config.cc_disk_setup.subp.which")
+ def test_mkswap(self, m_which, subp, *args):
+ """mkfs observes extra_opts and overwrite settings when cmd is not
+ present."""
+ m_which.side_effect = iter([None, "/sbin/mkswap"])
+ cc_disk_setup.mkfs(
+ {
+ "filesystem": "swap",
+ "device": "/dev/xdb1",
+ "label": "swap",
+ "overwrite": True,
+ }
+ )
+
+ self.assertEqual(
+ [mock.call("mkfs.swap"), mock.call("mkswap")],
+ m_which.call_args_list,
+ )
+ subp.assert_called_once_with(
+ ["/sbin/mkswap", "/dev/xdb1", "-L", "swap", "-f"], shell=False
+ )
+
+
+@skipUnlessJsonSchema()
+class TestDebugSchema:
+ """Directly test schema rather than through handle."""
+
+ @pytest.mark.parametrize(
+ "config, error_msg",
+ (
+ # Valid schemas tested by meta.examples in test_schema
+ # Invalid schemas
+ ({"disk_setup": 1}, "disk_setup: 1 is not of type 'object'"),
+ ({"fs_setup": 1}, "fs_setup: 1 is not of type 'array'"),
+ (
+ {"device_aliases": 1},
+ "device_aliases: 1 is not of type 'object'",
+ ),
+ (
+ {"debug": {"boguskey": True}},
+ re.escape(
+ "Additional properties are not allowed ('boguskey' was"
+ " unexpected)"
+ ),
+ ),
+ ),
+ )
+ @skipUnlessJsonSchema()
+ def test_schema_validation(self, config, error_msg):
+ """Assert expected schema validation and error messages."""
+ # New-style schema $defs exist in config/cloud-init-schema*.json
+ schema = get_schema()
+ with pytest.raises(SchemaValidationError, match=error_msg):
+ validate_cloudconfig_schema(config, schema, strict=True)
+
+
+# vi: ts=4 expandtab