diff options
Diffstat (limited to 'tests/unittests/test_handler')
-rw-r--r-- | tests/unittests/test_handler/test_handler_bootcmd.py | 145 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_resizefs.py | 257 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_schema.py | 44 |
3 files changed, 415 insertions, 31 deletions
diff --git a/tests/unittests/test_handler/test_handler_bootcmd.py b/tests/unittests/test_handler/test_handler_bootcmd.py new file mode 100644 index 00000000..580017ed --- /dev/null +++ b/tests/unittests/test_handler/test_handler_bootcmd.py @@ -0,0 +1,145 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from cloudinit.config import cc_bootcmd +from cloudinit.sources import DataSourceNone +from cloudinit import (distros, helpers, cloud, util) +from cloudinit.tests.helpers import CiTestCase, mock, skipIf + +import logging +import tempfile + +try: + import jsonschema + assert jsonschema # avoid pyflakes error F401: import unused + _missing_jsonschema_dep = False +except ImportError: + _missing_jsonschema_dep = True + +LOG = logging.getLogger(__name__) + + +class FakeExtendedTempFile(object): + def __init__(self, suffix): + self.suffix = suffix + self.handle = tempfile.NamedTemporaryFile( + prefix="ci-%s." % self.__class__.__name__, delete=False) + + def __enter__(self): + return self.handle + + def __exit__(self, exc_type, exc_value, traceback): + self.handle.close() + + +class TestBootcmd(CiTestCase): + + with_logs = True + + _etmpfile_path = ('cloudinit.config.cc_bootcmd.temp_utils.' + 'ExtendedTemporaryFile') + + def setUp(self): + super(TestBootcmd, self).setUp() + self.subp = util.subp + self.new_root = self.tmp_dir() + + def _get_cloud(self, distro): + paths = helpers.Paths({}) + cls = distros.fetch(distro) + mydist = cls(distro, {}, paths) + myds = DataSourceNone.DataSourceNone({}, mydist, paths) + paths.datasource = myds + return cloud.Cloud(myds, paths, {}, mydist, None) + + def test_handler_skip_if_no_bootcmd(self): + """When the provided config doesn't contain bootcmd, skip it.""" + cfg = {} + mycloud = self._get_cloud('ubuntu') + cc_bootcmd.handle('notimportant', cfg, mycloud, LOG, None) + self.assertIn( + "Skipping module named notimportant, no 'bootcmd' key", + self.logs.getvalue()) + + def test_handler_invalid_command_set(self): + """Commands which can't be converted to shell will raise errors.""" + invalid_config = {'bootcmd': 1} + cc = self._get_cloud('ubuntu') + with self.assertRaises(TypeError) as context_manager: + cc_bootcmd.handle('cc_bootcmd', invalid_config, cc, LOG, []) + self.assertIn('Failed to shellify bootcmd', self.logs.getvalue()) + self.assertEqual( + "'int' object is not iterable", + str(context_manager.exception)) + + @skipIf(_missing_jsonschema_dep, "No python-jsonschema dependency") + def test_handler_schema_validation_warns_non_array_type(self): + """Schema validation warns of non-array type for bootcmd key. + + Schema validation is not strict, so bootcmd attempts to shellify the + invalid content. + """ + invalid_config = {'bootcmd': 1} + cc = self._get_cloud('ubuntu') + with self.assertRaises(TypeError): + cc_bootcmd.handle('cc_bootcmd', invalid_config, cc, LOG, []) + self.assertIn( + 'Invalid config:\nbootcmd: 1 is not of type \'array\'', + self.logs.getvalue()) + self.assertIn('Failed to shellify', self.logs.getvalue()) + + @skipIf(_missing_jsonschema_dep, 'No python-jsonschema dependency') + def test_handler_schema_validation_warns_non_array_item_type(self): + """Schema validation warns of non-array or string bootcmd items. + + Schema validation is not strict, so bootcmd attempts to shellify the + invalid content. + """ + invalid_config = { + 'bootcmd': ['ls /', 20, ['wget', 'http://stuff/blah'], {'a': 'n'}]} + cc = self._get_cloud('ubuntu') + with self.assertRaises(RuntimeError) as context_manager: + cc_bootcmd.handle('cc_bootcmd', invalid_config, cc, LOG, []) + expected_warnings = [ + 'bootcmd.1: 20 is not valid under any of the given schemas', + 'bootcmd.3: {\'a\': \'n\'} is not valid under any of the given' + ' schema' + ] + logs = self.logs.getvalue() + for warning in expected_warnings: + self.assertIn(warning, logs) + self.assertIn('Failed to shellify', logs) + self.assertEqual( + 'Unable to shellify type int which is not a list or string', + str(context_manager.exception)) + + def test_handler_creates_and_runs_bootcmd_script_with_instance_id(self): + """Valid schema runs a bootcmd script with INSTANCE_ID in the env.""" + cc = self._get_cloud('ubuntu') + out_file = self.tmp_path('bootcmd.out', self.new_root) + my_id = "b6ea0f59-e27d-49c6-9f87-79f19765a425" + valid_config = {'bootcmd': [ + 'echo {0} $INSTANCE_ID > {1}'.format(my_id, out_file)]} + + with mock.patch(self._etmpfile_path, FakeExtendedTempFile): + cc_bootcmd.handle('cc_bootcmd', valid_config, cc, LOG, []) + self.assertEqual(my_id + ' iid-datasource-none\n', + util.load_file(out_file)) + + def test_handler_runs_bootcmd_script_with_error(self): + """When a valid script generates an error, that error is raised.""" + cc = self._get_cloud('ubuntu') + valid_config = {'bootcmd': ['exit 1']} # Script with error + + with mock.patch(self._etmpfile_path, FakeExtendedTempFile): + with self.assertRaises(util.ProcessExecutionError) as ctxt_manager: + cc_bootcmd.handle('does-not-matter', valid_config, cc, LOG, []) + self.assertIn( + 'Unexpected error while running command.\n' + "Command: ['/bin/sh',", + str(ctxt_manager.exception)) + self.assertIn( + 'Failed to run bootcmd module does-not-matter', + self.logs.getvalue()) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_resizefs.py b/tests/unittests/test_handler/test_handler_resizefs.py index 52591b8b..76dddbf8 100644 --- a/tests/unittests/test_handler/test_handler_resizefs.py +++ b/tests/unittests/test_handler/test_handler_resizefs.py @@ -1,17 +1,30 @@ # This file is part of cloud-init. See LICENSE file for license information. -from cloudinit.config import cc_resizefs +from cloudinit.config.cc_resizefs import ( + can_skip_resize, handle, is_device_path_writable_block, + rootdev_from_cmdline) +import logging import textwrap -import unittest + +from cloudinit.tests.helpers import (CiTestCase, mock, skipIf, util, + wrap_and_call) + + +LOG = logging.getLogger(__name__) + try: - from unittest import mock + import jsonschema + assert jsonschema # avoid pyflakes error F401: import unused + _missing_jsonschema_dep = False except ImportError: - import mock + _missing_jsonschema_dep = True + +class TestResizefs(CiTestCase): + with_logs = True -class TestResizefs(unittest.TestCase): def setUp(self): super(TestResizefs, self).setUp() self.name = "resizefs" @@ -34,7 +47,7 @@ class TestResizefs(unittest.TestCase): 58720296 3145728 3 freebsd-swap (1.5G) 61866024 1048496 - free - (512M) """) - res = cc_resizefs.can_skip_resize(fs_type, resize_what, devpth) + res = can_skip_resize(fs_type, resize_what, devpth) self.assertTrue(res) @mock.patch('cloudinit.config.cc_resizefs._get_dumpfs_output') @@ -52,8 +65,238 @@ class TestResizefs(unittest.TestCase): => 34 297086 da0 GPT (145M) 34 297086 1 freebsd-ufs (145M) """) - res = cc_resizefs.can_skip_resize(fs_type, resize_what, devpth) + res = can_skip_resize(fs_type, resize_what, devpth) self.assertTrue(res) + def test_handle_noops_on_disabled(self): + """The handle function logs when the configuration disables resize.""" + cfg = {'resize_rootfs': False} + handle('cc_resizefs', cfg, _cloud=None, log=LOG, args=[]) + self.assertIn( + 'DEBUG: Skipping module named cc_resizefs, resizing disabled\n', + self.logs.getvalue()) + + @skipIf(_missing_jsonschema_dep, "No python-jsonschema dependency") + def test_handle_schema_validation_logs_invalid_resize_rootfs_value(self): + """The handle reports json schema violations as a warning. + + Invalid values for resize_rootfs result in disabling the module. + """ + cfg = {'resize_rootfs': 'junk'} + handle('cc_resizefs', cfg, _cloud=None, log=LOG, args=[]) + logs = self.logs.getvalue() + self.assertIn( + "WARNING: Invalid config:\nresize_rootfs: 'junk' is not one of" + " [True, False, 'noblock']", + logs) + self.assertIn( + 'DEBUG: Skipping module named cc_resizefs, resizing disabled\n', + logs) + + @mock.patch('cloudinit.config.cc_resizefs.util.get_mount_info') + def test_handle_warns_on_unknown_mount_info(self, m_get_mount_info): + """handle warns when get_mount_info sees unknown filesystem for /.""" + m_get_mount_info.return_value = None + cfg = {'resize_rootfs': True} + handle('cc_resizefs', cfg, _cloud=None, log=LOG, args=[]) + logs = self.logs.getvalue() + self.assertNotIn("WARNING: Invalid config:\nresize_rootfs:", logs) + self.assertIn( + 'WARNING: Could not determine filesystem type of /\n', + logs) + self.assertEqual( + [mock.call('/', LOG)], + m_get_mount_info.call_args_list) + + def test_handle_warns_on_undiscoverable_root_path_in_commandline(self): + """handle noops when the root path is not found on the commandline.""" + cfg = {'resize_rootfs': True} + exists_mock_path = 'cloudinit.config.cc_resizefs.os.path.exists' + + def fake_mount_info(path, log): + self.assertEqual('/', path) + self.assertEqual(LOG, log) + return ('/dev/root', 'ext4', '/') + + with mock.patch(exists_mock_path) as m_exists: + m_exists.return_value = False + wrap_and_call( + 'cloudinit.config.cc_resizefs.util', + {'is_container': {'return_value': False}, + 'get_mount_info': {'side_effect': fake_mount_info}, + 'get_cmdline': {'return_value': 'BOOT_IMAGE=/vmlinuz.efi'}}, + handle, 'cc_resizefs', cfg, _cloud=None, log=LOG, + args=[]) + logs = self.logs.getvalue() + self.assertIn("WARNING: Unable to find device '/dev/root'", logs) + + +class TestRootDevFromCmdline(CiTestCase): + + def test_rootdev_from_cmdline_with_no_root(self): + """Return None from rootdev_from_cmdline when root is not present.""" + invalid_cases = [ + 'BOOT_IMAGE=/adsf asdfa werasef root adf', 'BOOT_IMAGE=/adsf', ''] + for case in invalid_cases: + self.assertIsNone(rootdev_from_cmdline(case)) + + def test_rootdev_from_cmdline_with_root_startswith_dev(self): + """Return the cmdline root when the path starts with /dev.""" + self.assertEqual( + '/dev/this', rootdev_from_cmdline('asdf root=/dev/this')) + + def test_rootdev_from_cmdline_with_root_without_dev_prefix(self): + """Add /dev prefix to cmdline root when the path lacks the prefix.""" + self.assertEqual('/dev/this', rootdev_from_cmdline('asdf root=this')) + + def test_rootdev_from_cmdline_with_root_with_label(self): + """When cmdline root contains a LABEL, our root is disk/by-label.""" + self.assertEqual( + '/dev/disk/by-label/unique', + rootdev_from_cmdline('asdf root=LABEL=unique')) + + def test_rootdev_from_cmdline_with_root_with_uuid(self): + """When cmdline root contains a UUID, our root is disk/by-uuid.""" + self.assertEqual( + '/dev/disk/by-uuid/adsfdsaf-adsf', + rootdev_from_cmdline('asdf root=UUID=adsfdsaf-adsf')) + + +class TestIsDevicePathWritableBlock(CiTestCase): + + with_logs = True + + def test_is_device_path_writable_block_warns_missing_cmdline_root(self): + """When root does not exist isn't in the cmdline, log warning.""" + info = 'does not matter' + + def fake_mount_info(path, log): + self.assertEqual('/', path) + self.assertEqual(LOG, log) + return ('/dev/root', 'ext4', '/') + + exists_mock_path = 'cloudinit.config.cc_resizefs.os.path.exists' + with mock.patch(exists_mock_path) as m_exists: + m_exists.return_value = False + is_valid = wrap_and_call( + 'cloudinit.config.cc_resizefs.util', + {'is_container': {'return_value': False}, + 'get_mount_info': {'side_effect': fake_mount_info}, + 'get_cmdline': {'return_value': 'BOOT_IMAGE=/vmlinuz.efi'}}, + is_device_path_writable_block, '/dev/root', info, LOG) + self.assertFalse(is_valid) + logs = self.logs.getvalue() + self.assertIn("WARNING: Unable to find device '/dev/root'", logs) + + def test_is_device_path_writable_block_does_not_exist(self): + """When devpath does not exist, a warning is logged.""" + info = 'dev=/I/dont/exist mnt_point=/ path=/dev/none' + is_valid = wrap_and_call( + 'cloudinit.config.cc_resizefs.util', + {'is_container': {'return_value': False}}, + is_device_path_writable_block, '/I/dont/exist', info, LOG) + self.assertFalse(is_valid) + self.assertIn( + "WARNING: Device '/I/dont/exist' did not exist." + ' cannot resize: %s' % info, + self.logs.getvalue()) + + def test_is_device_path_writable_block_does_not_exist_in_container(self): + """When devpath does not exist in a container, log a debug message.""" + info = 'dev=/I/dont/exist mnt_point=/ path=/dev/none' + is_valid = wrap_and_call( + 'cloudinit.config.cc_resizefs.util', + {'is_container': {'return_value': True}}, + is_device_path_writable_block, '/I/dont/exist', info, LOG) + self.assertFalse(is_valid) + self.assertIn( + "DEBUG: Device '/I/dont/exist' did not exist in container." + ' cannot resize: %s' % info, + self.logs.getvalue()) + + def test_is_device_path_writable_block_raises_oserror(self): + """When unexpected OSError is raises by os.stat it is reraised.""" + info = 'dev=/I/dont/exist mnt_point=/ path=/dev/none' + with self.assertRaises(OSError) as context_manager: + wrap_and_call( + 'cloudinit.config.cc_resizefs', + {'util.is_container': {'return_value': True}, + 'os.stat': {'side_effect': OSError('Something unexpected')}}, + is_device_path_writable_block, '/I/dont/exist', info, LOG) + self.assertEqual( + 'Something unexpected', str(context_manager.exception)) + + def test_is_device_path_writable_block_readonly(self): + """When root device is readonly, emit a warning and return False.""" + fake_devpath = self.tmp_path('dev/readonly') + util.write_file(fake_devpath, '', mode=0o400) # read-only + info = 'dev=/dev/root mnt_point=/ path={0}'.format(fake_devpath) + + exists_mock_path = 'cloudinit.config.cc_resizefs.os.path.exists' + with mock.patch(exists_mock_path) as m_exists: + m_exists.return_value = False + is_valid = wrap_and_call( + 'cloudinit.config.cc_resizefs.util', + {'is_container': {'return_value': False}, + 'rootdev_from_cmdline': {'return_value': fake_devpath}}, + is_device_path_writable_block, '/dev/root', info, LOG) + self.assertFalse(is_valid) + logs = self.logs.getvalue() + self.assertIn( + "Converted /dev/root to '{0}' per kernel cmdline".format( + fake_devpath), + logs) + self.assertIn( + "WARNING: '{0}' not writable. cannot resize".format(fake_devpath), + logs) + + def test_is_device_path_writable_block_readonly_in_container(self): + """When root device is readonly, emit debug log and return False.""" + fake_devpath = self.tmp_path('dev/readonly') + util.write_file(fake_devpath, '', mode=0o400) # read-only + info = 'dev=/dev/root mnt_point=/ path={0}'.format(fake_devpath) + + is_valid = wrap_and_call( + 'cloudinit.config.cc_resizefs.util', + {'is_container': {'return_value': True}}, + is_device_path_writable_block, fake_devpath, info, LOG) + self.assertFalse(is_valid) + self.assertIn( + "DEBUG: '{0}' not writable in container. cannot resize".format( + fake_devpath), + self.logs.getvalue()) + + def test_is_device_path_writable_block_non_block(self): + """When device is not a block device, emit warning return False.""" + fake_devpath = self.tmp_path('dev/readwrite') + util.write_file(fake_devpath, '', mode=0o600) # read-write + info = 'dev=/dev/root mnt_point=/ path={0}'.format(fake_devpath) + + is_valid = wrap_and_call( + 'cloudinit.config.cc_resizefs.util', + {'is_container': {'return_value': False}}, + is_device_path_writable_block, fake_devpath, info, LOG) + self.assertFalse(is_valid) + self.assertIn( + "WARNING: device '{0}' not a block device. cannot resize".format( + fake_devpath), + self.logs.getvalue()) + + def test_is_device_path_writable_block_non_block_on_container(self): + """When device is non-block device in container, emit debug log.""" + fake_devpath = self.tmp_path('dev/readwrite') + util.write_file(fake_devpath, '', mode=0o600) # read-write + info = 'dev=/dev/root mnt_point=/ path={0}'.format(fake_devpath) + + is_valid = wrap_and_call( + 'cloudinit.config.cc_resizefs.util', + {'is_container': {'return_value': True}}, + is_device_path_writable_block, fake_devpath, info, LOG) + self.assertFalse(is_valid) + self.assertIn( + "DEBUG: device '{0}' not a block device in container." + ' cannot resize'.format(fake_devpath), + self.logs.getvalue()) + # vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_schema.py b/tests/unittests/test_handler/test_schema.py index 6137e3cf..745bb0ff 100644 --- a/tests/unittests/test_handler/test_schema.py +++ b/tests/unittests/test_handler/test_schema.py @@ -27,7 +27,7 @@ class GetSchemaTest(CiTestCase): """Every cloudconfig module with schema is listed in allOf keyword.""" schema = get_schema() self.assertItemsEqual( - ['cc_ntp', 'cc_runcmd'], + ['cc_bootcmd', 'cc_ntp', 'cc_resizefs', 'cc_runcmd'], [subschema['id'] for subschema in schema['allOf']]) self.assertEqual('cloud-config-schema', schema['id']) self.assertEqual( @@ -205,6 +205,17 @@ class GetSchemaDocTest(CiTestCase): '**prop1:** (string/integer) prop-description', get_schema_doc(full_schema)) + def test_get_schema_doc_handles_enum_types(self): + """get_schema_doc converts enum types to yaml and delimits with '/'.""" + full_schema = copy(self.required_schema) + full_schema.update( + {'properties': { + 'prop1': {'enum': [True, False, 'stuff'], + 'description': 'prop-description'}}}) + self.assertIn( + '**prop1:** (true/false/stuff) prop-description', + get_schema_doc(full_schema)) + def test_get_schema_doc_handles_nested_oneof_property_types(self): """get_schema_doc describes array items oneOf declarations in type.""" full_schema = copy(self.required_schema) @@ -219,29 +230,11 @@ class GetSchemaDocTest(CiTestCase): '**prop1:** (array of (string)/(integer)) prop-description', get_schema_doc(full_schema)) - def test_get_schema_doc_returns_restructured_text_with_examples(self): - """get_schema_doc returns indented examples when present in schema.""" - full_schema = copy(self.required_schema) - full_schema.update( - {'examples': [{'ex1': [1, 2, 3]}], - 'properties': { - 'prop1': {'type': 'array', 'description': 'prop-description', - 'items': {'type': 'integer'}}}}) - self.assertIn( - dedent(""" - **Config schema**: - **prop1:** (array of integer) prop-description - - **Examples**:: - - ex1"""), - get_schema_doc(full_schema)) - - def test_get_schema_doc_handles_unstructured_examples(self): - """get_schema_doc properly indented examples which as just strings.""" + def test_get_schema_doc_handles_string_examples(self): + """get_schema_doc properly indented examples as a list of strings.""" full_schema = copy(self.required_schema) full_schema.update( - {'examples': ['My example:\n [don\'t, expand, "this"]'], + {'examples': ['ex1:\n [don\'t, expand, "this"]', 'ex2: true'], 'properties': { 'prop1': {'type': 'array', 'description': 'prop-description', 'items': {'type': 'integer'}}}}) @@ -252,8 +245,11 @@ class GetSchemaDocTest(CiTestCase): **Examples**:: - My example: - [don't, expand, "this"]"""), + ex1: + [don't, expand, "this"] + # --- Example2 --- + ex2: true + """), get_schema_doc(full_schema)) def test_get_schema_doc_raises_key_errors(self): |