diff options
Diffstat (limited to 'tests/unittests/test_builtin_handlers.py')
-rw-r--r-- | tests/unittests/test_builtin_handlers.py | 405 |
1 files changed, 235 insertions, 170 deletions
diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py index cf2c0a4d..a057be2a 100644 --- a/tests/unittests/test_builtin_handlers.py +++ b/tests/unittests/test_builtin_handlers.py @@ -5,54 +5,59 @@ import copy import errno import os -import pytest import shutil import tempfile from textwrap import dedent +import pytest -from tests.unittests.helpers import ( - FilesystemMockingTestCase, CiTestCase, mock, skipUnlessJinja) - -from cloudinit import handlers -from cloudinit import helpers -from cloudinit import subp -from cloudinit import util - +from cloudinit import handlers, helpers, subp, util from cloudinit.handlers.cloud_config import CloudConfigPartHandler from cloudinit.handlers.jinja_template import ( - JinjaTemplatePartHandler, convert_jinja_instance_data, - render_jinja_payload) + JinjaTemplatePartHandler, + convert_jinja_instance_data, + render_jinja_payload, +) from cloudinit.handlers.shell_script import ShellScriptPartHandler from cloudinit.handlers.upstart_job import UpstartJobPartHandler +from cloudinit.settings import PER_ALWAYS, PER_INSTANCE +from tests.unittests.helpers import ( + CiTestCase, + FilesystemMockingTestCase, + mock, + skipUnlessJinja, +) -from cloudinit.settings import (PER_ALWAYS, PER_INSTANCE) - -INSTANCE_DATA_FILE = 'instance-data-sensitive.json' +INSTANCE_DATA_FILE = "instance-data-sensitive.json" class TestUpstartJobPartHandler(FilesystemMockingTestCase): - mpath = 'cloudinit.handlers.upstart_job.' + mpath = "cloudinit.handlers.upstart_job." def test_upstart_frequency_no_out(self): c_root = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, c_root) up_root = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, up_root) - paths = helpers.Paths({ - 'cloud_dir': c_root, - 'upstart_dir': up_root, - }) + paths = helpers.Paths( + { + "cloud_dir": c_root, + "upstart_dir": up_root, + } + ) h = UpstartJobPartHandler(paths) # No files should be written out when # the frequency is ! per-instance - h.handle_part('', handlers.CONTENT_START, - None, None, None) - h.handle_part('blah', 'text/upstart-job', - 'test.conf', 'blah', frequency=PER_ALWAYS) - h.handle_part('', handlers.CONTENT_END, - None, None, None) + h.handle_part("", handlers.CONTENT_START, None, None, None) + h.handle_part( + "blah", + "text/upstart-job", + "test.conf", + "blah", + frequency=PER_ALWAYS, + ) + h.handle_part("", handlers.CONTENT_END, None, None, None) self.assertEqual(0, len(os.listdir(up_root))) def test_upstart_frequency_single(self): @@ -62,47 +67,54 @@ class TestUpstartJobPartHandler(FilesystemMockingTestCase): self.patchOS(new_root) self.patchUtils(new_root) - paths = helpers.Paths({ - 'upstart_dir': "/etc/upstart", - }) + paths = helpers.Paths( + { + "upstart_dir": "/etc/upstart", + } + ) util.ensure_dir("/run") util.ensure_dir("/etc/upstart") - with mock.patch(self.mpath + 'SUITABLE_UPSTART', return_value=True): - with mock.patch.object(subp, 'subp') as m_subp: + with mock.patch(self.mpath + "SUITABLE_UPSTART", return_value=True): + with mock.patch.object(subp, "subp") as m_subp: h = UpstartJobPartHandler(paths) - h.handle_part('', handlers.CONTENT_START, - None, None, None) - h.handle_part('blah', 'text/upstart-job', - 'test.conf', 'blah', frequency=PER_INSTANCE) - h.handle_part('', handlers.CONTENT_END, - None, None, None) + h.handle_part("", handlers.CONTENT_START, None, None, None) + h.handle_part( + "blah", + "text/upstart-job", + "test.conf", + "blah", + frequency=PER_INSTANCE, + ) + h.handle_part("", handlers.CONTENT_END, None, None, None) - self.assertEqual(len(os.listdir('/etc/upstart')), 1) + self.assertEqual(len(os.listdir("/etc/upstart")), 1) m_subp.assert_called_once_with( - ['initctl', 'reload-configuration'], capture=False) + ["initctl", "reload-configuration"], capture=False + ) class TestJinjaTemplatePartHandler(CiTestCase): with_logs = True - mpath = 'cloudinit.handlers.jinja_template.' + mpath = "cloudinit.handlers.jinja_template." def setUp(self): super(TestJinjaTemplatePartHandler, self).setUp() self.tmp = self.tmp_dir() - self.run_dir = os.path.join(self.tmp, 'run_dir') + self.run_dir = os.path.join(self.tmp, "run_dir") util.ensure_dir(self.run_dir) - self.paths = helpers.Paths({ - 'cloud_dir': self.tmp, 'run_dir': self.run_dir}) + self.paths = helpers.Paths( + {"cloud_dir": self.tmp, "run_dir": self.run_dir} + ) def test_jinja_template_part_handler_defaults(self): """On init, paths are saved and subhandler types are empty.""" h = JinjaTemplatePartHandler(self.paths) - self.assertEqual(['## template: jinja'], h.prefixes) + self.assertEqual(["## template: jinja"], h.prefixes) self.assertEqual(3, h.handler_version) self.assertEqual(self.paths, h.paths) self.assertEqual({}, h.sub_handlers) @@ -112,34 +124,47 @@ class TestJinjaTemplatePartHandler(CiTestCase): script_handler = ShellScriptPartHandler(self.paths) cloudconfig_handler = CloudConfigPartHandler(self.paths) h = JinjaTemplatePartHandler( - self.paths, sub_handlers=[script_handler, cloudconfig_handler]) + self.paths, sub_handlers=[script_handler, cloudconfig_handler] + ) self.assertCountEqual( - ['text/cloud-config', 'text/cloud-config-jsonp', - 'text/x-shellscript'], - h.sub_handlers) + [ + "text/cloud-config", + "text/cloud-config-jsonp", + "text/x-shellscript", + ], + h.sub_handlers, + ) def test_jinja_template_part_handler_looks_up_subhandler_types(self): """When sub_handlers are passed, init lists types of subhandlers.""" script_handler = ShellScriptPartHandler(self.paths) cloudconfig_handler = CloudConfigPartHandler(self.paths) h = JinjaTemplatePartHandler( - self.paths, sub_handlers=[script_handler, cloudconfig_handler]) + self.paths, sub_handlers=[script_handler, cloudconfig_handler] + ) self.assertCountEqual( - ['text/cloud-config', 'text/cloud-config-jsonp', - 'text/x-shellscript'], - h.sub_handlers) + [ + "text/cloud-config", + "text/cloud-config-jsonp", + "text/x-shellscript", + ], + h.sub_handlers, + ) def test_jinja_template_handle_noop_on_content_signals(self): """Perform no part handling when content type is CONTENT_SIGNALS.""" script_handler = ShellScriptPartHandler(self.paths) - h = JinjaTemplatePartHandler( - self.paths, sub_handlers=[script_handler]) - with mock.patch.object(script_handler, 'handle_part') as m_handle_part: + h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler]) + with mock.patch.object(script_handler, "handle_part") as m_handle_part: h.handle_part( - data='data', ctype=handlers.CONTENT_START, filename='part-1', - payload='## template: jinja\n#!/bin/bash\necho himom', - frequency='freq', headers='headers') + data="data", + ctype=handlers.CONTENT_START, + filename="part-1", + payload="## template: jinja\n#!/bin/bash\necho himom", + frequency="freq", + headers="headers", + ) m_handle_part.assert_not_called() @skipUnlessJinja() @@ -150,19 +175,22 @@ class TestJinjaTemplatePartHandler(CiTestCase): # Create required instance data json file instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE) - instance_data = {'topkey': 'echo himom'} + instance_data = {"topkey": "echo himom"} util.write_file(instance_json, util.json_dumps(instance_data)) - h = JinjaTemplatePartHandler( - self.paths, sub_handlers=[script_handler]) - with mock.patch.object(script_handler, 'handle_part') as m_part: + h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler]) + with mock.patch.object(script_handler, "handle_part") as m_part: # ctype with leading '!' not in handlers.CONTENT_SIGNALS h.handle_part( - data='data', ctype="!" + handlers.CONTENT_START, - filename='part01', - payload='## template: jinja \t \n#!/bin/bash\n{{ topkey }}', - frequency='freq', headers='headers') + data="data", + ctype="!" + handlers.CONTENT_START, + filename="part01", + payload="## template: jinja \t \n#!/bin/bash\n{{ topkey }}", + frequency="freq", + headers="headers", + ) m_part.assert_called_once_with( - 'data', '!__begin__', 'part01', '#!/bin/bash\necho himom', 'freq') + "data", "!__begin__", "part01", "#!/bin/bash\necho himom", "freq" + ) @skipUnlessJinja() def test_jinja_template_handle_subhandler_v3_with_clean_payload(self): @@ -172,146 +200,163 @@ class TestJinjaTemplatePartHandler(CiTestCase): # Create required instance-data.json file instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE) - instance_data = {'topkey': {'sub': 'runcmd: [echo hi]'}} + instance_data = {"topkey": {"sub": "runcmd: [echo hi]"}} util.write_file(instance_json, util.json_dumps(instance_data)) h = JinjaTemplatePartHandler( - self.paths, sub_handlers=[cloudcfg_handler]) - with mock.patch.object(cloudcfg_handler, 'handle_part') as m_part: + self.paths, sub_handlers=[cloudcfg_handler] + ) + with mock.patch.object(cloudcfg_handler, "handle_part") as m_part: # ctype with leading '!' not in handlers.CONTENT_SIGNALS h.handle_part( - data='data', ctype="!" + handlers.CONTENT_END, - filename='part01', - payload='## template: jinja\n#cloud-config\n{{ topkey.sub }}', - frequency='freq', headers='headers') + data="data", + ctype="!" + handlers.CONTENT_END, + filename="part01", + payload="## template: jinja\n#cloud-config\n{{ topkey.sub }}", + frequency="freq", + headers="headers", + ) m_part.assert_called_once_with( - 'data', '!__end__', 'part01', '#cloud-config\nruncmd: [echo hi]', - 'freq', 'headers') + "data", + "!__end__", + "part01", + "#cloud-config\nruncmd: [echo hi]", + "freq", + "headers", + ) def test_jinja_template_handle_errors_on_missing_instance_data_json(self): """If instance-data is absent, raise an error from handle_part.""" script_handler = ShellScriptPartHandler(self.paths) - h = JinjaTemplatePartHandler( - self.paths, sub_handlers=[script_handler]) + h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler]) with self.assertRaises(RuntimeError) as context_manager: h.handle_part( - data='data', ctype="!" + handlers.CONTENT_START, - filename='part01', - payload='## template: jinja \n#!/bin/bash\necho himom', - frequency='freq', headers='headers') - script_file = os.path.join(script_handler.script_dir, 'part01') + data="data", + ctype="!" + handlers.CONTENT_START, + filename="part01", + payload="## template: jinja \n#!/bin/bash\necho himom", + frequency="freq", + headers="headers", + ) + script_file = os.path.join(script_handler.script_dir, "part01") self.assertEqual( - 'Cannot render jinja template vars. Instance data not yet present' - ' at {}/{}'.format(self.run_dir, INSTANCE_DATA_FILE), - str(context_manager.exception) + "Cannot render jinja template vars. Instance data not yet present" + " at {}/{}".format(self.run_dir, INSTANCE_DATA_FILE), + str(context_manager.exception), ) self.assertFalse( os.path.exists(script_file), - 'Unexpected file created %s' % script_file) + "Unexpected file created %s" % script_file, + ) def test_jinja_template_handle_errors_on_unreadable_instance_data(self): """If instance-data is unreadable, raise an error from handle_part.""" script_handler = ShellScriptPartHandler(self.paths) - instance_json = os.path.join( - self.run_dir, INSTANCE_DATA_FILE) + instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE) util.write_file(instance_json, util.json_dumps({})) - h = JinjaTemplatePartHandler( - self.paths, sub_handlers=[script_handler]) - with mock.patch(self.mpath + 'load_file') as m_load: + h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler]) + with mock.patch(self.mpath + "load_file") as m_load: with self.assertRaises(RuntimeError) as context_manager: - m_load.side_effect = OSError(errno.EACCES, 'Not allowed') + m_load.side_effect = OSError(errno.EACCES, "Not allowed") h.handle_part( - data='data', ctype="!" + handlers.CONTENT_START, - filename='part01', - payload='## template: jinja \n#!/bin/bash\necho himom', - frequency='freq', headers='headers') - script_file = os.path.join(script_handler.script_dir, 'part01') + data="data", + ctype="!" + handlers.CONTENT_START, + filename="part01", + payload="## template: jinja \n#!/bin/bash\necho himom", + frequency="freq", + headers="headers", + ) + script_file = os.path.join(script_handler.script_dir, "part01") self.assertEqual( "Cannot render jinja template vars. No read permission on " "'{}/{}'. Try sudo".format(self.run_dir, INSTANCE_DATA_FILE), - str(context_manager.exception)) + str(context_manager.exception), + ) self.assertFalse( os.path.exists(script_file), - 'Unexpected file created %s' % script_file) + "Unexpected file created %s" % script_file, + ) @skipUnlessJinja() def test_jinja_template_handle_renders_jinja_content(self): """When present, render jinja variables from instance data""" script_handler = ShellScriptPartHandler(self.paths) instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE) - instance_data = {'topkey': {'subkey': 'echo himom'}} + instance_data = {"topkey": {"subkey": "echo himom"}} util.write_file(instance_json, util.json_dumps(instance_data)) - h = JinjaTemplatePartHandler( - self.paths, sub_handlers=[script_handler]) + h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler]) h.handle_part( - data='data', ctype="!" + handlers.CONTENT_START, - filename='part01', + data="data", + ctype="!" + handlers.CONTENT_START, + filename="part01", payload=( - '## template: jinja \n' - '#!/bin/bash\n' - '{{ topkey.subkey|default("nosubkey") }}'), - frequency='freq', headers='headers') - script_file = os.path.join(script_handler.script_dir, 'part01') + "## template: jinja \n" + "#!/bin/bash\n" + '{{ topkey.subkey|default("nosubkey") }}' + ), + frequency="freq", + headers="headers", + ) + script_file = os.path.join(script_handler.script_dir, "part01") self.assertNotIn( - 'Instance data not yet present at {}/{}'.format( - self.run_dir, INSTANCE_DATA_FILE), - self.logs.getvalue()) + "Instance data not yet present at {}/{}".format( + self.run_dir, INSTANCE_DATA_FILE + ), + self.logs.getvalue(), + ) self.assertEqual( - '#!/bin/bash\necho himom', util.load_file(script_file)) + "#!/bin/bash\necho himom", util.load_file(script_file) + ) @skipUnlessJinja() def test_jinja_template_handle_renders_jinja_content_missing_keys(self): """When specified jinja variable is undefined, log a warning.""" script_handler = ShellScriptPartHandler(self.paths) instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE) - instance_data = {'topkey': {'subkey': 'echo himom'}} + instance_data = {"topkey": {"subkey": "echo himom"}} util.write_file(instance_json, util.json_dumps(instance_data)) - h = JinjaTemplatePartHandler( - self.paths, sub_handlers=[script_handler]) + h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler]) h.handle_part( - data='data', ctype="!" + handlers.CONTENT_START, - filename='part01', - payload='## template: jinja \n#!/bin/bash\n{{ goodtry }}', - frequency='freq', headers='headers') - script_file = os.path.join(script_handler.script_dir, 'part01') + data="data", + ctype="!" + handlers.CONTENT_START, + filename="part01", + payload="## template: jinja \n#!/bin/bash\n{{ goodtry }}", + frequency="freq", + headers="headers", + ) + script_file = os.path.join(script_handler.script_dir, "part01") self.assertTrue( os.path.exists(script_file), - 'Missing expected file %s' % script_file) + "Missing expected file %s" % script_file, + ) self.assertIn( "WARNING: Could not render jinja template variables in file" " 'part01': 'goodtry'\n", - self.logs.getvalue()) + self.logs.getvalue(), + ) class TestConvertJinjaInstanceData: - @pytest.mark.parametrize( - "include_key_aliases,data,expected", ( - ( - False, - {'my-key': 'my-val'}, - {'my-key': 'my-val'} - ), + "include_key_aliases,data,expected", + ( + (False, {"my-key": "my-val"}, {"my-key": "my-val"}), ( True, - {'my-key': 'my-val'}, - {'my-key': 'my-val', 'my_key': 'my-val'} - ), - ( - False, - {'my.key': 'my.val'}, - {'my.key': 'my.val'} + {"my-key": "my-val"}, + {"my-key": "my-val", "my_key": "my-val"}, ), + (False, {"my.key": "my.val"}, {"my.key": "my.val"}), ( True, - {'my.key': 'my.val'}, - {'my.key': 'my.val', 'my_key': 'my.val'} + {"my.key": "my.val"}, + {"my.key": "my.val", "my_key": "my.val"}, ), ( True, - {'my/key': 'my/val'}, - {'my/key': 'my/val', 'my_key': 'my/val'} + {"my/key": "my/val"}, + {"my/key": "my/val", "my_key": "my/val"}, ), - ) + ), ) def test_convert_instance_data_operators_to_underscores( self, include_key_aliases, data, expected @@ -328,39 +373,48 @@ class TestConvertJinjaInstanceData: allow ease of reference for users. Intsead of v1.availability_zone, the name availability_zone can be used in templates. """ - data = {'ds': {'dskey1': 1, 'dskey2': 2}, - 'v1': {'v1key1': 'v1.1'}, - 'v2': {'v2key1': 'v2.1'}} + data = { + "ds": {"dskey1": 1, "dskey2": 2}, + "v1": {"v1key1": "v1.1"}, + "v2": {"v2key1": "v2.1"}, + } expected_data = copy.deepcopy(data) - expected_data.update({'v1key1': 'v1.1', 'v2key1': 'v2.1'}) + expected_data.update({"v1key1": "v1.1", "v2key1": "v2.1"}) converted_data = convert_jinja_instance_data(data=data) - assert sorted(['ds', 'v1', 'v2', 'v1key1', 'v2key1']) == sorted( + assert sorted(["ds", "v1", "v2", "v1key1", "v2key1"]) == sorted( converted_data.keys() ) assert expected_data == converted_data def test_convert_instance_data_most_recent_version_of_promoted_keys(self): """The most-recent versioned key value is promoted to top-level.""" - data = {'v1': {'key1': 'old v1 key1', 'key2': 'old v1 key2'}, - 'v2': {'key1': 'newer v2 key1', 'key3': 'newer v2 key3'}, - 'v3': {'key1': 'newest v3 key1'}} + data = { + "v1": {"key1": "old v1 key1", "key2": "old v1 key2"}, + "v2": {"key1": "newer v2 key1", "key3": "newer v2 key3"}, + "v3": {"key1": "newest v3 key1"}, + } expected_data = copy.deepcopy(data) expected_data.update( - {'key1': 'newest v3 key1', 'key2': 'old v1 key2', - 'key3': 'newer v2 key3'}) + { + "key1": "newest v3 key1", + "key2": "old v1 key2", + "key3": "newer v2 key3", + } + ) converted_data = convert_jinja_instance_data(data=data) assert expected_data == converted_data def test_convert_instance_data_decodes_decode_paths(self): """Any decode_paths provided are decoded by convert_instance_data.""" - data = {'key1': {'subkey1': 'aGkgbW9t'}, 'key2': 'aGkgZGFk'} + data = {"key1": {"subkey1": "aGkgbW9t"}, "key2": "aGkgZGFk"} expected_data = copy.deepcopy(data) - expected_data['key1']['subkey1'] = 'hi mom' + expected_data["key1"]["subkey1"] = "hi mom" converted_data = convert_jinja_instance_data( - data=data, decode_paths=('key1/subkey1',)) + data=data, decode_paths=("key1/subkey1",) + ) assert expected_data == converted_data @@ -372,9 +426,11 @@ class TestRenderJinjaPayload(CiTestCase): def test_render_jinja_payload_logs_jinja_vars_on_debug(self): """When debug is True, log jinja varables available.""" payload = ( - '## template: jinja\n#!/bin/sh\necho hi from {{ v1.hostname }}') - instance_data = {'v1': {'hostname': 'foo'}, 'instance-id': 'iid'} - expected_log = dedent("""\ + "## template: jinja\n#!/bin/sh\necho hi from {{ v1.hostname }}" + ) + instance_data = {"v1": {"hostname": "foo"}, "instance-id": "iid"} + expected_log = dedent( + """\ DEBUG: Converted jinja variables { "hostname": "foo", @@ -384,28 +440,37 @@ class TestRenderJinjaPayload(CiTestCase): "hostname": "foo" } } - """) + """ + ) self.assertEqual( render_jinja_payload( - payload=payload, payload_fn='myfile', - instance_data=instance_data, debug=True), - '#!/bin/sh\necho hi from foo') + payload=payload, + payload_fn="myfile", + instance_data=instance_data, + debug=True, + ), + "#!/bin/sh\necho hi from foo", + ) self.assertEqual(expected_log, self.logs.getvalue()) @skipUnlessJinja() def test_render_jinja_payload_replaces_missing_variables_and_warns(self): """Warn on missing jinja variables and replace the absent variable.""" - payload = ( - '## template: jinja\n#!/bin/sh\necho hi from {{ NOTHERE }}') - instance_data = {'v1': {'hostname': 'foo'}, 'instance-id': 'iid'} + payload = "## template: jinja\n#!/bin/sh\necho hi from {{ NOTHERE }}" + instance_data = {"v1": {"hostname": "foo"}, "instance-id": "iid"} self.assertEqual( render_jinja_payload( - payload=payload, payload_fn='myfile', - instance_data=instance_data), - '#!/bin/sh\necho hi from CI_MISSING_JINJA_VAR/NOTHERE') + payload=payload, + payload_fn="myfile", + instance_data=instance_data, + ), + "#!/bin/sh\necho hi from CI_MISSING_JINJA_VAR/NOTHERE", + ) expected_log = ( - 'WARNING: Could not render jinja template variables in file' - " 'myfile': 'NOTHERE'") + "WARNING: Could not render jinja template variables in file" + " 'myfile': 'NOTHERE'" + ) self.assertIn(expected_log, self.logs.getvalue()) + # vi: ts=4 expandtab |