# This file is part of cloud-init. See LICENSE file for license information. """Tests of the built-in user data handlers.""" import copy import errno import os import shutil import tempfile from textwrap import dedent from cloudinit.tests.helpers import ( FilesystemMockingTestCase, CiTestCase, mock, skipUnlessJinja) from cloudinit import handlers from cloudinit import helpers from cloudinit import subp from cloudinit import util from cloudinit.handlers.cloud_config import CloudConfigPartHandler from cloudinit.handlers.jinja_template import ( JinjaTemplatePartHandler, convert_jinja_instance_data, render_jinja_payload) from cloudinit.handlers.shell_script import ShellScriptPartHandler from cloudinit.handlers.upstart_job import UpstartJobPartHandler from cloudinit.settings import (PER_ALWAYS, PER_INSTANCE) INSTANCE_DATA_FILE = 'instance-data-sensitive.json' class TestUpstartJobPartHandler(FilesystemMockingTestCase): mpath = 'cloudinit.handlers.upstart_job.' def test_upstart_frequency_no_out(self): c_root = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, c_root) up_root = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, up_root) paths = helpers.Paths({ 'cloud_dir': c_root, 'upstart_dir': up_root, }) h = UpstartJobPartHandler(paths) # No files should be written out when # the frequency is ! per-instance h.handle_part('', handlers.CONTENT_START, None, None, None) h.handle_part('blah', 'text/upstart-job', 'test.conf', 'blah', frequency=PER_ALWAYS) h.handle_part('', handlers.CONTENT_END, None, None, None) self.assertEqual(0, len(os.listdir(up_root))) def test_upstart_frequency_single(self): # files should be written out when frequency is ! per-instance new_root = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, new_root) self.patchOS(new_root) self.patchUtils(new_root) paths = helpers.Paths({ 'upstart_dir': "/etc/upstart", }) util.ensure_dir("/run") util.ensure_dir("/etc/upstart") with mock.patch(self.mpath + 'SUITABLE_UPSTART', return_value=True): with mock.patch.object(subp, 'subp') as m_subp: h = UpstartJobPartHandler(paths) h.handle_part('', handlers.CONTENT_START, None, None, None) h.handle_part('blah', 'text/upstart-job', 'test.conf', 'blah', frequency=PER_INSTANCE) h.handle_part('', handlers.CONTENT_END, None, None, None) self.assertEqual(len(os.listdir('/etc/upstart')), 1) m_subp.assert_called_once_with( ['initctl', 'reload-configuration'], capture=False) class TestJinjaTemplatePartHandler(CiTestCase): with_logs = True mpath = 'cloudinit.handlers.jinja_template.' def setUp(self): super(TestJinjaTemplatePartHandler, self).setUp() self.tmp = self.tmp_dir() self.run_dir = os.path.join(self.tmp, 'run_dir') util.ensure_dir(self.run_dir) self.paths = helpers.Paths({ 'cloud_dir': self.tmp, 'run_dir': self.run_dir}) def test_jinja_template_part_handler_defaults(self): """On init, paths are saved and subhandler types are empty.""" h = JinjaTemplatePartHandler(self.paths) self.assertEqual(['## template: jinja'], h.prefixes) self.assertEqual(3, h.handler_version) self.assertEqual(self.paths, h.paths) self.assertEqual({}, h.sub_handlers) def test_jinja_template_part_handler_looks_up_sub_handler_types(self): """When sub_handlers are passed, init lists types of subhandlers.""" script_handler = ShellScriptPartHandler(self.paths) cloudconfig_handler = CloudConfigPartHandler(self.paths) h = JinjaTemplatePartHandler( self.paths, sub_handlers=[script_handler, cloudconfig_handler]) self.assertCountEqual( ['text/cloud-config', 'text/cloud-config-jsonp', 'text/x-shellscript'], h.sub_handlers) def test_jinja_template_part_handler_looks_up_subhandler_types(self): """When sub_handlers are passed, init lists types of subhandlers.""" script_handler = ShellScriptPartHandler(self.paths) cloudconfig_handler = CloudConfigPartHandler(self.paths) h = JinjaTemplatePartHandler( self.paths, sub_handlers=[script_handler, cloudconfig_handler]) self.assertCountEqual( ['text/cloud-config', 'text/cloud-config-jsonp', 'text/x-shellscript'], h.sub_handlers) def test_jinja_template_handle_noop_on_content_signals(self): """Perform no part handling when content type is CONTENT_SIGNALS.""" script_handler = ShellScriptPartHandler(self.paths) h = JinjaTemplatePartHandler( self.paths, sub_handlers=[script_handler]) with mock.patch.object(script_handler, 'handle_part') as m_handle_part: h.handle_part( data='data', ctype=handlers.CONTENT_START, filename='part-1', payload='## template: jinja\n#!/bin/bash\necho himom', frequency='freq', headers='headers') m_handle_part.assert_not_called() @skipUnlessJinja() def test_jinja_template_handle_subhandler_v2_with_clean_payload(self): """Call version 2 subhandler.handle_part with stripped payload.""" script_handler = ShellScriptPartHandler(self.paths) self.assertEqual(2, script_handler.handler_version) # Create required instance data json file instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE) instance_data = {'topkey': 'echo himom'} util.write_file(instance_json, util.json_dumps(instance_data)) h = JinjaTemplatePartHandler( self.paths, sub_handlers=[script_handler]) with mock.patch.object(script_handler, 'handle_part') as m_part: # ctype with leading '!' not in handlers.CONTENT_SIGNALS h.handle_part( data='data', ctype="!" + handlers.CONTENT_START, filename='part01', payload='## template: jinja \t \n#!/bin/bash\n{{ topkey }}', frequency='freq', headers='headers') m_part.assert_called_once_with( 'data', '!__begin__', 'part01', '#!/bin/bash\necho himom', 'freq') @skipUnlessJinja() def test_jinja_template_handle_subhandler_v3_with_clean_payload(self): """Call version 3 subhandler.handle_part with stripped payload.""" cloudcfg_handler = CloudConfigPartHandler(self.paths) self.assertEqual(3, cloudcfg_handler.handler_version) # Create required instance-data.json file instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE) instance_data = {'topkey': {'sub': 'runcmd: [echo hi]'}} util.write_file(instance_json, util.json_dumps(instance_data)) h = JinjaTemplatePartHandler( self.paths, sub_handlers=[cloudcfg_handler]) with mock.patch.object(cloudcfg_handler, 'handle_part') as m_part: # ctype with leading '!' not in handlers.CONTENT_SIGNALS h.handle_part( data='data', ctype="!" + handlers.CONTENT_END, filename='part01', payload='## template: jinja\n#cloud-config\n{{ topkey.sub }}', frequency='freq', headers='headers') m_part.assert_called_once_with( 'data', '!__end__', 'part01', '#cloud-config\nruncmd: [echo hi]', 'freq', 'headers') def test_jinja_template_handle_errors_on_missing_instance_data_json(self): """If instance-data is absent, raise an error from handle_part.""" script_handler = ShellScriptPartHandler(self.paths) h = JinjaTemplatePartHandler( self.paths, sub_handlers=[script_handler]) with self.assertRaises(RuntimeError) as context_manager: h.handle_part( data='data', ctype="!" + handlers.CONTENT_START, filename='part01', payload='## template: jinja \n#!/bin/bash\necho himom', frequency='freq', headers='headers') script_file = os.path.join(script_handler.script_dir, 'part01') self.assertEqual( 'Cannot render jinja template vars. Instance data not yet present' ' at {}/{}'.format(self.run_dir, INSTANCE_DATA_FILE), str(context_manager.exception) ) self.assertFalse( os.path.exists(script_file), 'Unexpected file created %s' % script_file) def test_jinja_template_handle_errors_on_unreadable_instance_data(self): """If instance-data is unreadable, raise an error from handle_part.""" script_handler = ShellScriptPartHandler(self.paths) instance_json = os.path.join( self.run_dir, INSTANCE_DATA_FILE) util.write_file(instance_json, util.json_dumps({})) h = JinjaTemplatePartHandler( self.paths, sub_handlers=[script_handler]) with mock.patch(self.mpath + 'load_file') as m_load: with self.assertRaises(RuntimeError) as context_manager: m_load.side_effect = OSError(errno.EACCES, 'Not allowed') h.handle_part( data='data', ctype="!" + handlers.CONTENT_START, filename='part01', payload='## template: jinja \n#!/bin/bash\necho himom', frequency='freq', headers='headers') script_file = os.path.join(script_handler.script_dir, 'part01') self.assertEqual( "Cannot render jinja template vars. No read permission on " "'{}/{}'. Try sudo".format(self.run_dir, INSTANCE_DATA_FILE), str(context_manager.exception)) self.assertFalse( os.path.exists(script_file), 'Unexpected file created %s' % script_file) @skipUnlessJinja() def test_jinja_template_handle_renders_jinja_content(self): """When present, render jinja variables from instance data""" script_handler = ShellScriptPartHandler(self.paths) instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE) instance_data = {'topkey': {'subkey': 'echo himom'}} util.write_file(instance_json, util.json_dumps(instance_data)) h = JinjaTemplatePartHandler( self.paths, sub_handlers=[script_handler]) h.handle_part( data='data', ctype="!" + handlers.CONTENT_START, filename='part01', payload=( '## template: jinja \n' '#!/bin/bash\n' '{{ topkey.subkey|default("nosubkey") }}'), frequency='freq', headers='headers') script_file = os.path.join(script_handler.script_dir, 'part01') self.assertNotIn( 'Instance data not yet present at {}/{}'.format( self.run_dir, INSTANCE_DATA_FILE), self.logs.getvalue()) self.assertEqual( '#!/bin/bash\necho himom', util.load_file(script_file)) @skipUnlessJinja() def test_jinja_template_handle_renders_jinja_content_missing_keys(self): """When specified jinja variable is undefined, log a warning.""" script_handler = ShellScriptPartHandler(self.paths) instance_json = os.path.join(self.run_dir, INSTANCE_DATA_FILE) instance_data = {'topkey': {'subkey': 'echo himom'}} util.write_file(instance_json, util.json_dumps(instance_data)) h = JinjaTemplatePartHandler( self.paths, sub_handlers=[script_handler]) h.handle_part( data='data', ctype="!" + handlers.CONTENT_START, filename='part01', payload='## template: jinja \n#!/bin/bash\n{{ goodtry }}', frequency='freq', headers='headers') script_file = os.path.join(script_handler.script_dir, 'part01') self.assertTrue( os.path.exists(script_file), 'Missing expected file %s' % script_file) self.assertIn( "WARNING: Could not render jinja template variables in file" " 'part01': 'goodtry'\n", self.logs.getvalue()) class TestConvertJinjaInstanceData(CiTestCase): def test_convert_instance_data_hyphens_to_underscores(self): """Replace hyphenated keys with underscores in instance-data.""" data = {'hyphenated-key': 'hyphenated-val', 'underscore_delim_key': 'underscore_delimited_val'} expected_data = {'hyphenated_key': 'hyphenated-val', 'underscore_delim_key': 'underscore_delimited_val'} self.assertEqual( expected_data, convert_jinja_instance_data(data=data)) def test_convert_instance_data_promotes_versioned_keys_to_top_level(self): """Any versioned keys are promoted as top-level keys This provides any cloud-init standardized keys up at a top-level to allow ease of reference for users. Intsead of v1.availability_zone, the name availability_zone can be used in templates. """ data = {'ds': {'dskey1': 1, 'dskey2': 2}, 'v1': {'v1key1': 'v1.1'}, 'v2': {'v2key1': 'v2.1'}} expected_data = copy.deepcopy(data) expected_data.update({'v1key1': 'v1.1', 'v2key1': 'v2.1'}) converted_data = convert_jinja_instance_data(data=data) self.assertCountEqual( ['ds', 'v1', 'v2', 'v1key1', 'v2key1'], converted_data.keys()) self.assertEqual( expected_data, converted_data) def test_convert_instance_data_most_recent_version_of_promoted_keys(self): """The most-recent versioned key value is promoted to top-level.""" data = {'v1': {'key1': 'old v1 key1', 'key2': 'old v1 key2'}, 'v2': {'key1': 'newer v2 key1', 'key3': 'newer v2 key3'}, 'v3': {'key1': 'newest v3 key1'}} expected_data = copy.deepcopy(data) expected_data.update( {'key1': 'newest v3 key1', 'key2': 'old v1 key2', 'key3': 'newer v2 key3'}) converted_data = convert_jinja_instance_data(data=data) self.assertEqual( expected_data, converted_data) def test_convert_instance_data_decodes_decode_paths(self): """Any decode_paths provided are decoded by convert_instance_data.""" data = {'key1': {'subkey1': 'aGkgbW9t'}, 'key2': 'aGkgZGFk'} expected_data = copy.deepcopy(data) expected_data['key1']['subkey1'] = 'hi mom' converted_data = convert_jinja_instance_data( data=data, decode_paths=('key1/subkey1',)) self.assertEqual( expected_data, converted_data) class TestRenderJinjaPayload(CiTestCase): with_logs = True @skipUnlessJinja() def test_render_jinja_payload_logs_jinja_vars_on_debug(self): """When debug is True, log jinja varables available.""" payload = ( '## template: jinja\n#!/bin/sh\necho hi from {{ v1.hostname }}') instance_data = {'v1': {'hostname': 'foo'}, 'instance-id': 'iid'} expected_log = dedent("""\ DEBUG: Converted jinja variables { "hostname": "foo", "instance_id": "iid", "v1": { "hostname": "foo" } } """) self.assertEqual( render_jinja_payload( payload=payload, payload_fn='myfile', instance_data=instance_data, debug=True), '#!/bin/sh\necho hi from foo') self.assertEqual(expected_log, self.logs.getvalue()) @skipUnlessJinja() def test_render_jinja_payload_replaces_missing_variables_and_warns(self): """Warn on missing jinja variables and replace the absent variable.""" payload = ( '## template: jinja\n#!/bin/sh\necho hi from {{ NOTHERE }}') instance_data = {'v1': {'hostname': 'foo'}, 'instance-id': 'iid'} self.assertEqual( render_jinja_payload( payload=payload, payload_fn='myfile', instance_data=instance_data), '#!/bin/sh\necho hi from CI_MISSING_JINJA_VAR/NOTHERE') expected_log = ( 'WARNING: Could not render jinja template variables in file' " 'myfile': 'NOTHERE'") self.assertIn(expected_log, self.logs.getvalue()) # vi: ts=4 expandtab