# This file is part of cloud-init. See LICENSE file for license information.

"""Tests of the built-in user data handlers."""

import copy
import errno
import os
import shutil
import tempfile
from textwrap import dedent

import pytest

from cloudinit import handlers, helpers, subp, util
from cloudinit.cmd.devel import read_cfg_paths
from cloudinit.handlers.cloud_config import CloudConfigPartHandler
from cloudinit.handlers.jinja_template import (
    JinjaTemplatePartHandler,
    convert_jinja_instance_data,
    render_jinja_payload,
)
from cloudinit.handlers.shell_script import ShellScriptPartHandler
from cloudinit.handlers.shell_script_by_frequency import (
    get_script_folder_by_frequency,
    path_map,
)
from cloudinit.handlers.upstart_job import UpstartJobPartHandler
from cloudinit.settings import PER_ALWAYS, PER_INSTANCE, PER_ONCE
from tests.unittests.helpers import (
    CiTestCase,
    FilesystemMockingTestCase,
    mock,
    skipUnlessJinja,
)

# Name of the instance-data file the jinja tests create under run_dir.
INSTANCE_DATA_FILE = "instance-data-sensitive.json"


class TestUpstartJobPartHandler(FilesystemMockingTestCase):
    """Exercise UpstartJobPartHandler with different part frequencies."""

    # Dotted prefix of the module whose attributes get patched.
    mpath = "cloudinit.handlers.upstart_job."

    def test_upstart_frequency_no_out(self):
        """A PER_ALWAYS part leaves the upstart directory empty."""
        c_root = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, c_root)
        up_root = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, up_root)
        paths = helpers.Paths(
            {
                "cloud_dir": c_root,
                "upstart_dir": up_root,
            }
        )
        h = UpstartJobPartHandler(paths)
        # No files should be written out when
        # the frequency is not per-instance
        h.handle_part("", handlers.CONTENT_START, None, None, None)
        h.handle_part(
            "blah",
            "text/upstart-job",
            "test.conf",
            "blah",
            frequency=PER_ALWAYS,
        )
        h.handle_part("", handlers.CONTENT_END, None, None, None)
        self.assertEqual(0, len(os.listdir(up_root)))

    def test_upstart_frequency_single(self):
        """A PER_INSTANCE part writes one job file and reloads initctl."""
        # Files should be written out when the frequency is per-instance.
        new_root = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, new_root)

        # Redirect os/util filesystem access below new_root so the absolute
        # paths used in this test never touch the real filesystem.
        self.patchOS(new_root)
        self.patchUtils(new_root)
        paths = helpers.Paths(
            {
                "upstart_dir": "/etc/upstart",
            }
        )

        util.ensure_dir("/run")
        util.ensure_dir("/etc/upstart")

        with mock.patch(
            self.mpath + "SUITABLE_UPSTART", return_value=True
        ), mock.patch.object(subp, "subp") as m_subp:
            h = UpstartJobPartHandler(paths)
            h.handle_part("", handlers.CONTENT_START, None, None, None)
            h.handle_part(
                "blah",
                "text/upstart-job",
                "test.conf",
                "blah",
                frequency=PER_INSTANCE,
            )
            h.handle_part("", handlers.CONTENT_END, None, None, None)

        self.assertEqual(len(os.listdir("/etc/upstart")), 1)

        m_subp.assert_called_once_with(
            ["initctl", "reload-configuration"], capture=False
        )


class TestJinjaTemplatePartHandler(CiTestCase):
    """Exercise JinjaTemplatePartHandler construction and handle_part."""

    with_logs = True

    # Dotted prefix of the module whose attributes get patched.
    mpath = "cloudinit.handlers.jinja_template."

    def setUp(self):
        """Create a temporary cloud_dir/run_dir pair and matching Paths."""
        super().setUp()
        self.tmp = self.tmp_dir()
        self.run_dir = os.path.join(self.tmp, "run_dir")
        util.ensure_dir(self.run_dir)
        self.paths = helpers.Paths(
            {"cloud_dir": self.tmp, "run_dir": self.run_dir}
        )

    def test_jinja_template_part_handler_defaults(self):
        """On init, paths are saved and subhandler types are empty."""
        h = JinjaTemplatePartHandler(self.paths)
        self.assertEqual(["## template: jinja"], h.prefixes)
        self.assertEqual(3, h.handler_version)
        self.assertEqual(self.paths, h.paths)
        self.assertEqual({}, h.sub_handlers)

    def test_jinja_template_part_handler_looks_up_sub_handler_types(self):
        """When sub_handlers are passed, init lists types of subhandlers."""
        script_handler = ShellScriptPartHandler(self.paths)
        cloudconfig_handler = CloudConfigPartHandler(self.paths)
        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[script_handler, cloudconfig_handler]
        )
        self.assertCountEqual(
            [
                "text/cloud-config",
                "text/cloud-config-jsonp",
                "text/x-shellscript",
            ],
            h.sub_handlers,
        )

    # NOTE(review): the body below is identical to
    # test_jinja_template_part_handler_looks_up_sub_handler_types above.
    # It is kept so existing test IDs stay valid; consider dropping one.
    def test_jinja_template_part_handler_looks_up_subhandler_types(self):
        """When sub_handlers are passed, init lists types of subhandlers."""
        script_handler = ShellScriptPartHandler(self.paths)
        cloudconfig_handler = CloudConfigPartHandler(self.paths)
        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[script_handler, cloudconfig_handler]
        )
        self.assertCountEqual(
            [
                "text/cloud-config",
                "text/cloud-config-jsonp",
                "text/x-shellscript",
            ],
            h.sub_handlers,
        )

    def test_jinja_template_handle_noop_on_content_signals(self):
        """Perform no part handling when content type is CONTENT_SIGNALS."""
        script_handler = ShellScriptPartHandler(self.paths)

        h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler])
        with mock.patch.object(script_handler, "handle_part") as m_handle_part:
            h.handle_part(
                data="data",
                ctype=handlers.CONTENT_START,
                filename="part-1",
                payload="## template: jinja\n#!/bin/bash\necho himom",
                frequency="freq",
                headers="headers",
            )
        m_handle_part.assert_not_called()

    @skipUnlessJinja()
    def test_jinja_template_handle_subhandler_v2_with_clean_payload(self):
        """Call version 2 subhandler.handle_part with stripped payload."""
        script_handler = ShellScriptPartHandler(self.paths)
        self.assertEqual(2, script_handler.handler_version)

        # Create required instance data json file
        instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE)
        instance_data = {"topkey": "echo himom"}
        util.write_file(instance_json, util.json_dumps(instance_data))
        h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler])
        with mock.patch.object(script_handler, "handle_part") as m_part:
            # ctype with leading '!' not in handlers.CONTENT_SIGNALS
            h.handle_part(
                data="data",
                ctype="!" + handlers.CONTENT_START,
                filename="part01",
                payload="## template: jinja \t \n#!/bin/bash\n{{ topkey }}",
                frequency="freq",
                headers="headers",
            )
        # The expected call carries no "headers" argument, unlike the
        # version 3 expectation in the next test.
        m_part.assert_called_once_with(
            "data", "!__begin__", "part01", "#!/bin/bash\necho himom", "freq"
        )

    @skipUnlessJinja()
    def test_jinja_template_handle_subhandler_v3_with_clean_payload(self):
        """Call version 3 subhandler.handle_part with stripped payload."""
        cloudcfg_handler = CloudConfigPartHandler(self.paths)
        self.assertEqual(3, cloudcfg_handler.handler_version)

        # Create required instance-data.json file
        instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE)
        instance_data = {"topkey": {"sub": "runcmd: [echo hi]"}}
        util.write_file(instance_json, util.json_dumps(instance_data))
        h = JinjaTemplatePartHandler(
            self.paths, sub_handlers=[cloudcfg_handler]
        )
        with mock.patch.object(cloudcfg_handler, "handle_part") as m_part:
            # ctype with leading '!' not in handlers.CONTENT_SIGNALS
            h.handle_part(
                data="data",
                ctype="!" + handlers.CONTENT_END,
                filename="part01",
                payload="## template: jinja\n#cloud-config\n{{ topkey.sub }}",
                frequency="freq",
                headers="headers",
            )
        m_part.assert_called_once_with(
            "data",
            "!__end__",
            "part01",
            "#cloud-config\nruncmd: [echo hi]",
            "freq",
            "headers",
        )

    def test_jinja_template_handle_errors_on_missing_instance_data_json(self):
        """If instance-data is absent, raise an error from handle_part."""
        script_handler = ShellScriptPartHandler(self.paths)
        h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler])
        with self.assertRaises(RuntimeError) as context_manager:
            h.handle_part(
                data="data",
                ctype="!" + handlers.CONTENT_START,
                filename="part01",
                payload="## template: jinja \n#!/bin/bash\necho himom",
                frequency="freq",
                headers="headers",
            )
        script_file = os.path.join(script_handler.script_dir, "part01")
        self.assertEqual(
            "Cannot render jinja template vars. Instance data not yet present"
            " at {}/{}".format(self.run_dir, INSTANCE_DATA_FILE),
            str(context_manager.exception),
        )
        self.assertFalse(
            os.path.exists(script_file),
            "Unexpected file created %s" % script_file,
        )

    def test_jinja_template_handle_errors_on_unreadable_instance_data(self):
        """If instance-data is unreadable, raise an error from handle_part."""
        script_handler = ShellScriptPartHandler(self.paths)
        instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE)
        util.write_file(instance_json, util.json_dumps({}))
        h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler])
        # Make the patched load_file raise a permission error when called.
        with mock.patch(
            self.mpath + "load_file",
            side_effect=OSError(errno.EACCES, "Not allowed"),
        ):
            with self.assertRaises(RuntimeError) as context_manager:
                h.handle_part(
                    data="data",
                    ctype="!" + handlers.CONTENT_START,
                    filename="part01",
                    payload="## template: jinja \n#!/bin/bash\necho himom",
                    frequency="freq",
                    headers="headers",
                )
        script_file = os.path.join(script_handler.script_dir, "part01")
        self.assertEqual(
            "Cannot render jinja template vars. No read permission on "
            "'{}/{}'. Try sudo".format(self.run_dir, INSTANCE_DATA_FILE),
            str(context_manager.exception),
        )
        self.assertFalse(
            os.path.exists(script_file),
            "Unexpected file created %s" % script_file,
        )

    @skipUnlessJinja()
    def test_jinja_template_handle_renders_jinja_content(self):
        """When present, render jinja variables from instance data"""
        script_handler = ShellScriptPartHandler(self.paths)
        instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE)
        instance_data = {"topkey": {"subkey": "echo himom"}}
        util.write_file(instance_json, util.json_dumps(instance_data))
        h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler])
        h.handle_part(
            data="data",
            ctype="!" + handlers.CONTENT_START,
            filename="part01",
            payload=(
                "## template: jinja \n"
                "#!/bin/bash\n"
                '{{ topkey.subkey|default("nosubkey") }}'
            ),
            frequency="freq",
            headers="headers",
        )
        script_file = os.path.join(script_handler.script_dir, "part01")
        self.assertNotIn(
            "Instance data not yet present at {}/{}".format(
                self.run_dir, INSTANCE_DATA_FILE
            ),
            self.logs.getvalue(),
        )
        self.assertEqual(
            "#!/bin/bash\necho himom", util.load_file(script_file)
        )

    @skipUnlessJinja()
    def test_jinja_template_handle_renders_jinja_content_missing_keys(self):
        """When specified jinja variable is undefined, log a warning."""
        script_handler = ShellScriptPartHandler(self.paths)
        instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE)
        instance_data = {"topkey": {"subkey": "echo himom"}}
        util.write_file(instance_json, util.json_dumps(instance_data))
        h = JinjaTemplatePartHandler(self.paths, sub_handlers=[script_handler])
        h.handle_part(
            data="data",
            ctype="!" + handlers.CONTENT_START,
            filename="part01",
            payload="## template: jinja \n#!/bin/bash\n{{ goodtry }}",
            frequency="freq",
            headers="headers",
        )
        script_file = os.path.join(script_handler.script_dir, "part01")
        self.assertTrue(
            os.path.exists(script_file),
            "Missing expected file %s" % script_file,
        )
        self.assertIn(
            "WARNING: Could not render jinja template variables in file"
            " 'part01': 'goodtry'\n",
            self.logs.getvalue(),
        )


class TestConvertJinjaInstanceData:
    """Exercise convert_jinja_instance_data."""

    @pytest.mark.parametrize(
        "include_key_aliases,data,expected",
        (
            (False, {"my-key": "my-val"}, {"my-key": "my-val"}),
            (
                True,
                {"my-key": "my-val"},
                {"my-key": "my-val", "my_key": "my-val"},
            ),
            (False, {"my.key": "my.val"}, {"my.key": "my.val"}),
            (
                True,
                {"my.key": "my.val"},
                {"my.key": "my.val", "my_key": "my.val"},
            ),
            (
                True,
                {"my/key": "my/val"},
                {"my/key": "my/val", "my_key": "my/val"},
            ),
        ),
    )
    def test_convert_instance_data_operators_to_underscores(
        self, include_key_aliases, data, expected
    ):
        """Replace Jinja operators keys with underscores in instance-data."""
        assert expected == convert_jinja_instance_data(
            data=data, include_key_aliases=include_key_aliases
        )

    def test_convert_instance_data_promotes_versioned_keys_to_top_level(self):
        """Any versioned keys are promoted as top-level keys

        This provides any cloud-init standardized keys up at a top-level to
        allow ease of reference for users. Instead of v1.availability_zone,
        the name availability_zone can be used in templates.
        """
        data = {
            "ds": {"dskey1": 1, "dskey2": 2},
            "v1": {"v1key1": "v1.1"},
            "v2": {"v2key1": "v2.1"},
        }
        expected_data = copy.deepcopy(data)
        expected_data.update({"v1key1": "v1.1", "v2key1": "v2.1"})

        converted_data = convert_jinja_instance_data(data=data)
        assert sorted(["ds", "v1", "v2", "v1key1", "v2key1"]) == sorted(
            converted_data.keys()
        )
        assert expected_data == converted_data

    def test_convert_instance_data_most_recent_version_of_promoted_keys(self):
        """The most-recent versioned key value is promoted to top-level."""
        data = {
            "v1": {"key1": "old v1 key1", "key2": "old v1 key2"},
            "v2": {"key1": "newer v2 key1", "key3": "newer v2 key3"},
            "v3": {"key1": "newest v3 key1"},
        }
        expected_data = copy.deepcopy(data)
        expected_data.update(
            {
                "key1": "newest v3 key1",
                "key2": "old v1 key2",
                "key3": "newer v2 key3",
            }
        )

        converted_data = convert_jinja_instance_data(data=data)
        assert expected_data == converted_data

    def test_convert_instance_data_decodes_decode_paths(self):
        """Any decode_paths provided are decoded by convert_instance_data."""
        data = {"key1": {"subkey1": "aGkgbW9t"}, "key2": "aGkgZGFk"}
        expected_data = copy.deepcopy(data)
        # Only key1/subkey1 is listed in decode_paths, so key2 is expected
        # to stay as-is.
        expected_data["key1"]["subkey1"] = "hi mom"

        converted_data = convert_jinja_instance_data(
            data=data, decode_paths=("key1/subkey1",)
        )
        assert expected_data == converted_data


class TestRenderJinjaPayload(CiTestCase):
    """Exercise render_jinja_payload."""

    with_logs = True

    @skipUnlessJinja()
    def test_render_jinja_payload_logs_jinja_vars_on_debug(self):
        """When debug is True, log jinja variables available."""
        payload = (
            "## template: jinja\n#!/bin/sh\necho hi from {{ v1.hostname }}"
        )
        instance_data = {"v1": {"hostname": "foo"}, "instance-id": "iid"}
        # Whitespace inside this literal is significant: it is compared
        # verbatim against the captured log output.
        # NOTE(review): assumes the logged JSON dump uses one space of indent
        # per nesting level — confirm against util.json_dumps.
        expected_log = dedent(
            """\
            DEBUG: Converted jinja variables
            {
             "hostname": "foo",
             "instance-id": "iid",
             "instance_id": "iid",
             "v1": {
              "hostname": "foo"
             }
            }
            """
        )
        self.assertEqual(
            render_jinja_payload(
                payload=payload,
                payload_fn="myfile",
                instance_data=instance_data,
                debug=True,
            ),
            "#!/bin/sh\necho hi from foo",
        )
        self.assertEqual(expected_log, self.logs.getvalue())

    @skipUnlessJinja()
    def test_render_jinja_payload_replaces_missing_variables_and_warns(self):
        """Warn on missing jinja variables and replace the absent variable."""
        payload = "## template: jinja\n#!/bin/sh\necho hi from {{ NOTHERE }}"
        instance_data = {"v1": {"hostname": "foo"}, "instance-id": "iid"}
        self.assertEqual(
            render_jinja_payload(
                payload=payload,
                payload_fn="myfile",
                instance_data=instance_data,
            ),
            "#!/bin/sh\necho hi from CI_MISSING_JINJA_VAR/NOTHERE",
        )
        expected_log = (
            "WARNING: Could not render jinja template variables in file"
            " 'myfile': 'NOTHERE'"
        )
        self.assertIn(expected_log, self.logs.getvalue())


class TestShellScriptByFrequencyHandlers:
    """Exercise get_script_folder_by_frequency for each frequency."""

    def do_test_frequency(self, frequency):
        """Assert the script folder for *frequency* matches path_map.

        The expected folder is the "scripts" cloud path joined with the
        path_map entry for *frequency*.
        """
        ci_paths = read_cfg_paths()
        scripts_dir = ci_paths.get_cpath("scripts")
        expected_folder = os.path.join(scripts_dir, path_map[frequency])
        folder = get_script_folder_by_frequency(frequency, scripts_dir)
        assert expected_folder == folder

    def test_get_script_folder_per_boot(self):
        """Check the folder resolved for PER_ALWAYS."""
        self.do_test_frequency(PER_ALWAYS)

    def test_get_script_folder_per_instance(self):
        """Check the folder resolved for PER_INSTANCE."""
        self.do_test_frequency(PER_INSTANCE)

    def test_get_script_folder_per_once(self):
        """Check the folder resolved for PER_ONCE."""
        self.do_test_frequency(PER_ONCE)


# vi: ts=4 expandtab