summaryrefslogtreecommitdiff
path: root/tests/unittests/test_builtin_handlers.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_builtin_handlers.py')
-rw-r--r--tests/unittests/test_builtin_handlers.py324
1 files changed, 302 insertions, 22 deletions
diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py
index 9751ed95..abe820e1 100644
--- a/tests/unittests/test_builtin_handlers.py
+++ b/tests/unittests/test_builtin_handlers.py
@@ -2,27 +2,34 @@
"""Tests of the built-in user data handlers."""
+import copy
import os
import shutil
import tempfile
+from textwrap import dedent
-try:
- from unittest import mock
-except ImportError:
- import mock
-from cloudinit.tests import helpers as test_helpers
+from cloudinit.tests.helpers import (
+ FilesystemMockingTestCase, CiTestCase, mock, skipUnlessJinja)
from cloudinit import handlers
from cloudinit import helpers
from cloudinit import util
-from cloudinit.handlers import upstart_job
+from cloudinit.handlers.cloud_config import CloudConfigPartHandler
+from cloudinit.handlers.jinja_template import (
+ JinjaTemplatePartHandler, convert_jinja_instance_data,
+ render_jinja_payload)
+from cloudinit.handlers.shell_script import ShellScriptPartHandler
+from cloudinit.handlers.upstart_job import UpstartJobPartHandler
from cloudinit.settings import (PER_ALWAYS, PER_INSTANCE)
-class TestBuiltins(test_helpers.FilesystemMockingTestCase):
+class TestUpstartJobPartHandler(FilesystemMockingTestCase):
+
+ mpath = 'cloudinit.handlers.upstart_job.'
+
def test_upstart_frequency_no_out(self):
c_root = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, c_root)
@@ -32,14 +39,13 @@ class TestBuiltins(test_helpers.FilesystemMockingTestCase):
'cloud_dir': c_root,
'upstart_dir': up_root,
})
- freq = PER_ALWAYS
- h = upstart_job.UpstartJobPartHandler(paths)
+ h = UpstartJobPartHandler(paths)
# No files should be written out when
# the frequency is ! per-instance
h.handle_part('', handlers.CONTENT_START,
None, None, None)
h.handle_part('blah', 'text/upstart-job',
- 'test.conf', 'blah', freq)
+ 'test.conf', 'blah', frequency=PER_ALWAYS)
h.handle_part('', handlers.CONTENT_END,
None, None, None)
self.assertEqual(0, len(os.listdir(up_root)))
@@ -48,7 +54,6 @@ class TestBuiltins(test_helpers.FilesystemMockingTestCase):
# files should be written out when frequency is ! per-instance
new_root = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, new_root)
- freq = PER_INSTANCE
self.patchOS(new_root)
self.patchUtils(new_root)
@@ -56,22 +61,297 @@ class TestBuiltins(test_helpers.FilesystemMockingTestCase):
'upstart_dir': "/etc/upstart",
})
- upstart_job.SUITABLE_UPSTART = True
util.ensure_dir("/run")
util.ensure_dir("/etc/upstart")
- with mock.patch.object(util, 'subp') as mockobj:
- h = upstart_job.UpstartJobPartHandler(paths)
- h.handle_part('', handlers.CONTENT_START,
- None, None, None)
- h.handle_part('blah', 'text/upstart-job',
- 'test.conf', 'blah', freq)
- h.handle_part('', handlers.CONTENT_END,
- None, None, None)
+ with mock.patch(self.mpath + 'SUITABLE_UPSTART', return_value=True):
+ with mock.patch.object(util, 'subp') as m_subp:
+ h = UpstartJobPartHandler(paths)
+ h.handle_part('', handlers.CONTENT_START,
+ None, None, None)
+ h.handle_part('blah', 'text/upstart-job',
+ 'test.conf', 'blah', frequency=PER_INSTANCE)
+ h.handle_part('', handlers.CONTENT_END,
+ None, None, None)
- self.assertEqual(len(os.listdir('/etc/upstart')), 1)
+ self.assertEqual(len(os.listdir('/etc/upstart')), 1)
- mockobj.assert_called_once_with(
+ m_subp.assert_called_once_with(
['initctl', 'reload-configuration'], capture=False)
+
+class TestJinjaTemplatePartHandler(CiTestCase):
+
+ with_logs = True
+
+ mpath = 'cloudinit.handlers.jinja_template.'
+
+ def setUp(self):
+ super(TestJinjaTemplatePartHandler, self).setUp()
+ self.tmp = self.tmp_dir()
+ self.run_dir = os.path.join(self.tmp, 'run_dir')
+ util.ensure_dir(self.run_dir)
+ self.paths = helpers.Paths({
+ 'cloud_dir': self.tmp, 'run_dir': self.run_dir})
+
+ def test_jinja_template_part_handler_defaults(self):
+ """On init, paths are saved and subhandler types are empty."""
+ h = JinjaTemplatePartHandler(self.paths)
+ self.assertEqual(['## template: jinja'], h.prefixes)
+ self.assertEqual(3, h.handler_version)
+ self.assertEqual(self.paths, h.paths)
+ self.assertEqual({}, h.sub_handlers)
+
+ def test_jinja_template_part_handler_looks_up_sub_handler_types(self):
+ """When sub_handlers are passed, init lists types of subhandlers."""
+ script_handler = ShellScriptPartHandler(self.paths)
+ cloudconfig_handler = CloudConfigPartHandler(self.paths)
+ h = JinjaTemplatePartHandler(
+ self.paths, sub_handlers=[script_handler, cloudconfig_handler])
+ self.assertItemsEqual(
+ ['text/cloud-config', 'text/cloud-config-jsonp',
+ 'text/x-shellscript'],
+ h.sub_handlers)
+
+ def test_jinja_template_part_handler_looks_up_subhandler_types(self):
+ """When sub_handlers are passed, init lists types of subhandlers."""
+ script_handler = ShellScriptPartHandler(self.paths)
+ cloudconfig_handler = CloudConfigPartHandler(self.paths)
+ h = JinjaTemplatePartHandler(
+ self.paths, sub_handlers=[script_handler, cloudconfig_handler])
+ self.assertItemsEqual(
+ ['text/cloud-config', 'text/cloud-config-jsonp',
+ 'text/x-shellscript'],
+ h.sub_handlers)
+
+ def test_jinja_template_handle_noop_on_content_signals(self):
+ """Perform no part handling when content type is CONTENT_SIGNALS."""
+ script_handler = ShellScriptPartHandler(self.paths)
+
+ h = JinjaTemplatePartHandler(
+ self.paths, sub_handlers=[script_handler])
+ with mock.patch.object(script_handler, 'handle_part') as m_handle_part:
+ h.handle_part(
+ data='data', ctype=handlers.CONTENT_START, filename='part-1',
+ payload='## template: jinja\n#!/bin/bash\necho himom',
+ frequency='freq', headers='headers')
+ m_handle_part.assert_not_called()
+
+ @skipUnlessJinja()
+ def test_jinja_template_handle_subhandler_v2_with_clean_payload(self):
+ """Call version 2 subhandler.handle_part with stripped payload."""
+ script_handler = ShellScriptPartHandler(self.paths)
+ self.assertEqual(2, script_handler.handler_version)
+
+ # Create required instance-data.json file
+ instance_json = os.path.join(self.run_dir, 'instance-data.json')
+ instance_data = {'topkey': 'echo himom'}
+ util.write_file(instance_json, util.json_dumps(instance_data))
+ h = JinjaTemplatePartHandler(
+ self.paths, sub_handlers=[script_handler])
+ with mock.patch.object(script_handler, 'handle_part') as m_part:
+ # ctype with leading '!' not in handlers.CONTENT_SIGNALS
+ h.handle_part(
+ data='data', ctype="!" + handlers.CONTENT_START,
+ filename='part01',
+ payload='## template: jinja \t \n#!/bin/bash\n{{ topkey }}',
+ frequency='freq', headers='headers')
+ m_part.assert_called_once_with(
+ 'data', '!__begin__', 'part01', '#!/bin/bash\necho himom', 'freq')
+
+ @skipUnlessJinja()
+ def test_jinja_template_handle_subhandler_v3_with_clean_payload(self):
+ """Call version 3 subhandler.handle_part with stripped payload."""
+ cloudcfg_handler = CloudConfigPartHandler(self.paths)
+ self.assertEqual(3, cloudcfg_handler.handler_version)
+
+ # Create required instance-data.json file
+ instance_json = os.path.join(self.run_dir, 'instance-data.json')
+ instance_data = {'topkey': {'sub': 'runcmd: [echo hi]'}}
+ util.write_file(instance_json, util.json_dumps(instance_data))
+ h = JinjaTemplatePartHandler(
+ self.paths, sub_handlers=[cloudcfg_handler])
+ with mock.patch.object(cloudcfg_handler, 'handle_part') as m_part:
+ # ctype with leading '!' not in handlers.CONTENT_SIGNALS
+ h.handle_part(
+ data='data', ctype="!" + handlers.CONTENT_END,
+ filename='part01',
+ payload='## template: jinja\n#cloud-config\n{{ topkey.sub }}',
+ frequency='freq', headers='headers')
+ m_part.assert_called_once_with(
+ 'data', '!__end__', 'part01', '#cloud-config\nruncmd: [echo hi]',
+ 'freq', 'headers')
+
+ def test_jinja_template_handle_errors_on_missing_instance_data_json(self):
+ """If instance-data is absent, raise an error from handle_part."""
+ script_handler = ShellScriptPartHandler(self.paths)
+ h = JinjaTemplatePartHandler(
+ self.paths, sub_handlers=[script_handler])
+ with self.assertRaises(RuntimeError) as context_manager:
+ h.handle_part(
+ data='data', ctype="!" + handlers.CONTENT_START,
+ filename='part01',
+ payload='## template: jinja \n#!/bin/bash\necho himom',
+ frequency='freq', headers='headers')
+ script_file = os.path.join(script_handler.script_dir, 'part01')
+ self.assertEqual(
+ 'Cannot render jinja template vars. Instance data not yet present'
+ ' at {}/instance-data.json'.format(
+ self.run_dir), str(context_manager.exception))
+ self.assertFalse(
+ os.path.exists(script_file),
+ 'Unexpected file created %s' % script_file)
+
+ @skipUnlessJinja()
+ def test_jinja_template_handle_renders_jinja_content(self):
+ """When present, render jinja variables from instance-data.json."""
+ script_handler = ShellScriptPartHandler(self.paths)
+ instance_json = os.path.join(self.run_dir, 'instance-data.json')
+ instance_data = {'topkey': {'subkey': 'echo himom'}}
+ util.write_file(instance_json, util.json_dumps(instance_data))
+ h = JinjaTemplatePartHandler(
+ self.paths, sub_handlers=[script_handler])
+ h.handle_part(
+ data='data', ctype="!" + handlers.CONTENT_START,
+ filename='part01',
+ payload=(
+ '## template: jinja \n'
+ '#!/bin/bash\n'
+ '{{ topkey.subkey|default("nosubkey") }}'),
+ frequency='freq', headers='headers')
+ script_file = os.path.join(script_handler.script_dir, 'part01')
+ self.assertNotIn(
+ 'Instance data not yet present at {}/instance-data.json'.format(
+ self.run_dir),
+ self.logs.getvalue())
+ self.assertEqual(
+ '#!/bin/bash\necho himom', util.load_file(script_file))
+
+ @skipUnlessJinja()
+ def test_jinja_template_handle_renders_jinja_content_missing_keys(self):
+ """When specified jinja variable is undefined, log a warning."""
+ script_handler = ShellScriptPartHandler(self.paths)
+ instance_json = os.path.join(self.run_dir, 'instance-data.json')
+ instance_data = {'topkey': {'subkey': 'echo himom'}}
+ util.write_file(instance_json, util.json_dumps(instance_data))
+ h = JinjaTemplatePartHandler(
+ self.paths, sub_handlers=[script_handler])
+ h.handle_part(
+ data='data', ctype="!" + handlers.CONTENT_START,
+ filename='part01',
+ payload='## template: jinja \n#!/bin/bash\n{{ goodtry }}',
+ frequency='freq', headers='headers')
+ script_file = os.path.join(script_handler.script_dir, 'part01')
+ self.assertTrue(
+ os.path.exists(script_file),
+ 'Missing expected file %s' % script_file)
+ self.assertIn(
+ "WARNING: Could not render jinja template variables in file"
+ " 'part01': 'goodtry'\n",
+ self.logs.getvalue())
+
+
+class TestConvertJinjaInstanceData(CiTestCase):
+
+ def test_convert_instance_data_hyphens_to_underscores(self):
+ """Replace hyphenated keys with underscores in instance-data."""
+ data = {'hyphenated-key': 'hyphenated-val',
+ 'underscore_delim_key': 'underscore_delimited_val'}
+ expected_data = {'hyphenated_key': 'hyphenated-val',
+ 'underscore_delim_key': 'underscore_delimited_val'}
+ self.assertEqual(
+ expected_data,
+ convert_jinja_instance_data(data=data))
+
+ def test_convert_instance_data_promotes_versioned_keys_to_top_level(self):
+ """Any versioned keys are promoted as top-level keys
+
+ This provides any cloud-init standardized keys up at a top-level to
+ allow ease of reference for users. Intsead of v1.availability_zone,
+ the name availability_zone can be used in templates.
+ """
+ data = {'ds': {'dskey1': 1, 'dskey2': 2},
+ 'v1': {'v1key1': 'v1.1'},
+ 'v2': {'v2key1': 'v2.1'}}
+ expected_data = copy.deepcopy(data)
+ expected_data.update({'v1key1': 'v1.1', 'v2key1': 'v2.1'})
+
+ converted_data = convert_jinja_instance_data(data=data)
+ self.assertItemsEqual(
+ ['ds', 'v1', 'v2', 'v1key1', 'v2key1'], converted_data.keys())
+ self.assertEqual(
+ expected_data,
+ converted_data)
+
+ def test_convert_instance_data_most_recent_version_of_promoted_keys(self):
+ """The most-recent versioned key value is promoted to top-level."""
+ data = {'v1': {'key1': 'old v1 key1', 'key2': 'old v1 key2'},
+ 'v2': {'key1': 'newer v2 key1', 'key3': 'newer v2 key3'},
+ 'v3': {'key1': 'newest v3 key1'}}
+ expected_data = copy.deepcopy(data)
+ expected_data.update(
+ {'key1': 'newest v3 key1', 'key2': 'old v1 key2',
+ 'key3': 'newer v2 key3'})
+
+ converted_data = convert_jinja_instance_data(data=data)
+ self.assertEqual(
+ expected_data,
+ converted_data)
+
+ def test_convert_instance_data_decodes_decode_paths(self):
+ """Any decode_paths provided are decoded by convert_instance_data."""
+ data = {'key1': {'subkey1': 'aGkgbW9t'}, 'key2': 'aGkgZGFk'}
+ expected_data = copy.deepcopy(data)
+ expected_data['key1']['subkey1'] = 'hi mom'
+
+ converted_data = convert_jinja_instance_data(
+ data=data, decode_paths=('key1/subkey1',))
+ self.assertEqual(
+ expected_data,
+ converted_data)
+
+
+class TestRenderJinjaPayload(CiTestCase):
+
+ with_logs = True
+
+ @skipUnlessJinja()
+ def test_render_jinja_payload_logs_jinja_vars_on_debug(self):
+ """When debug is True, log jinja varables available."""
+ payload = (
+ '## template: jinja\n#!/bin/sh\necho hi from {{ v1.hostname }}')
+ instance_data = {'v1': {'hostname': 'foo'}, 'instance-id': 'iid'}
+ expected_log = dedent("""\
+ DEBUG: Converted jinja variables
+ {
+ "hostname": "foo",
+ "instance_id": "iid",
+ "v1": {
+ "hostname": "foo"
+ }
+ }
+ """)
+ self.assertEqual(
+ render_jinja_payload(
+ payload=payload, payload_fn='myfile',
+ instance_data=instance_data, debug=True),
+ '#!/bin/sh\necho hi from foo')
+ self.assertEqual(expected_log, self.logs.getvalue())
+
+ @skipUnlessJinja()
+ def test_render_jinja_payload_replaces_missing_variables_and_warns(self):
+ """Warn on missing jinja variables and replace the absent variable."""
+ payload = (
+ '## template: jinja\n#!/bin/sh\necho hi from {{ NOTHERE }}')
+ instance_data = {'v1': {'hostname': 'foo'}, 'instance-id': 'iid'}
+ self.assertEqual(
+ render_jinja_payload(
+ payload=payload, payload_fn='myfile',
+ instance_data=instance_data),
+ '#!/bin/sh\necho hi from CI_MISSING_JINJA_VAR/NOTHERE')
+ expected_log = (
+ 'WARNING: Could not render jinja template variables in file'
+ " 'myfile': 'NOTHERE'")
+ self.assertIn(expected_log, self.logs.getvalue())
+
# vi: ts=4 expandtab