summaryrefslogtreecommitdiff
path: root/tests/unittests/test_userdata.py
diff options
context:
space:
mode:
authorBen Howard <ben.howard@canonical.com>2014-01-08 17:16:24 -0700
committerBen Howard <ben.howard@canonical.com>2014-01-08 17:16:24 -0700
commita5727fe1477c9cc4288d1ac41f70bd1ab7d7928a (patch)
tree1340e51d3f4cc61ae07d322dab8e19a8e0cef7b8 /tests/unittests/test_userdata.py
parentee9fbafae1abfd7ba3f4bece11f722519116ca81 (diff)
downloadvyos-cloud-init-a5727fe1477c9cc4288d1ac41f70bd1ab7d7928a.tar.gz
vyos-cloud-init-a5727fe1477c9cc4288d1ac41f70bd1ab7d7928a.zip
Significant re-working of the userdata handling and introduction of
vendordata. Vendordata is a datasource provided userdata-like blob that is parsed similiarly to userdata, execept at the user's pleasure. cloudinit/config/cc_scripts_vendor.py: added vendor script cloud config cloudinit/config/cc_vendor_scripts_per_boot.py: added vendor per boot cloud config cloudinit/config/cc_vendor_scripts_per_instance.py: added vendor per instance vendor cloud config cloudinit/config/cc_vendor_scripts_per_once.py: added per once vendor cloud config script doc/examples/cloud-config-vendor-data.txt: documentation of vendor-data examples doc/vendordata.txt: documentation of vendordata for vendors (RENAMED) tests/unittests/test_userdata.py => tests/unittests/test_userdata.py TO: tests/unittests/test_userdata.py => tests/unittests/test_data.py: userdata test cases are not expanded to confirm superiority over vendor data. bin/cloud-init: change instances of 'consume_userdata' to 'consume_data' cloudinit/handlers/cloud_config.py: Added vendor script handling to default cloud-config modules cloudinit/handlers/shell_script.py: Added ability to change the path key to support vendor provided 'vendor-scripts'. Defaults to 'script'. cloudinit/helpers.py: - Changed ConfigMerger to include handling of vendordata. - Changed helpers to include paths for vendordata. cloudinit/sources/__init__.py: Added functions for helping vendordata - get_vendordata_raw(): returns vendordata unprocessed - get_vendordata(): returns vendordata through userdata processor - has_vendordata(): indicator if vendordata is present - consume_vendordata(): datasource directive for indicating explict user approval of vendordata consumption. Defaults to 'false' cloudinit/stages.py: Re-jiggered for handling of vendordata - _initial_subdirs(): added vendor script definition - update(): added self._store_vendordata() - [ADDED] _store_vendordata(): store vendordata - _get_default_handlers(): modified to allow for filtering which handlers will run against vendordata - [ADDED] _do_handlers(): moved logic from consume_userdata to _do_handlers(). This allows _consume_vendordata() and _consume_userdata() to use the same code path. - [RENAMED] consume_userdata() to _consume_userdata() - [ADDED] _consume_vendordata() for handling vendordata - run after userdata to get user cloud-config - uses ConfigMerger to get the configuration from the instance perspective about whether or not to use vendordata - [ADDED] consume_data() to call _consume_{user,vendor}data cloudinit/util.py: - [ADDED] get_nested_option_as_list() used by cc_vendor* for getting a nested value from a dict and returned as a list - runparts(): added 'exe_prefix' for running exe with a prefix, used by cc_vendor* config/cloud.cfg: Added vendor script execution as default tests/unittests/test_runs/test_merge_run.py: changed consume_userdata() to consume_data() tests/unittests/test_runs/test_simple_run.py: changed consume_userdata() to consume_data()
Diffstat (limited to 'tests/unittests/test_userdata.py')
-rw-r--r--tests/unittests/test_userdata.py308
1 files changed, 0 insertions, 308 deletions
diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py
deleted file mode 100644
index 5ffe8f0a..00000000
--- a/tests/unittests/test_userdata.py
+++ /dev/null
@@ -1,308 +0,0 @@
-"""Tests for handling of userdata within cloud init."""
-
-import StringIO
-
-import gzip
-import logging
-import os
-
-from email.mime.application import MIMEApplication
-from email.mime.base import MIMEBase
-from email.mime.multipart import MIMEMultipart
-
-from cloudinit import handlers
-from cloudinit import helpers as c_helpers
-from cloudinit import log
-from cloudinit import sources
-from cloudinit import stages
-from cloudinit import util
-
-INSTANCE_ID = "i-testing"
-
-from tests.unittests import helpers
-
-
-class FakeDataSource(sources.DataSource):
-
- def __init__(self, userdata):
- sources.DataSource.__init__(self, {}, None, None)
- self.metadata = {'instance-id': INSTANCE_ID}
- self.userdata_raw = userdata
-
-
-# FIXME: these tests shouldn't be checking log output??
-# Weirddddd...
-class TestConsumeUserData(helpers.FilesystemMockingTestCase):
-
- def setUp(self):
- helpers.FilesystemMockingTestCase.setUp(self)
- self._log = None
- self._log_file = None
- self._log_handler = None
-
- def tearDown(self):
- helpers.FilesystemMockingTestCase.tearDown(self)
- if self._log_handler and self._log:
- self._log.removeHandler(self._log_handler)
-
- def capture_log(self, lvl=logging.DEBUG):
- log_file = StringIO.StringIO()
- self._log_handler = logging.StreamHandler(log_file)
- self._log_handler.setLevel(lvl)
- self._log = log.getLogger()
- self._log.addHandler(self._log_handler)
- return log_file
-
- def test_simple_jsonp(self):
- blob = '''
-#cloud-config-jsonp
-[
- { "op": "add", "path": "/baz", "value": "qux" },
- { "op": "add", "path": "/bar", "value": "qux2" }
-]
-'''
-
- ci = stages.Init()
- ci.datasource = FakeDataSource(blob)
- new_root = self.makeDir()
- self.patchUtils(new_root)
- self.patchOS(new_root)
- ci.fetch()
- ci.consume_userdata()
- cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
- cc = util.load_yaml(cc_contents)
- self.assertEquals(2, len(cc))
- self.assertEquals('qux', cc['baz'])
- self.assertEquals('qux2', cc['bar'])
-
- def test_mixed_cloud_config(self):
- blob_cc = '''
-#cloud-config
-a: b
-c: d
-'''
- message_cc = MIMEBase("text", "cloud-config")
- message_cc.set_payload(blob_cc)
-
- blob_jp = '''
-#cloud-config-jsonp
-[
- { "op": "replace", "path": "/a", "value": "c" },
- { "op": "remove", "path": "/c" }
-]
-'''
-
- message_jp = MIMEBase('text', "cloud-config-jsonp")
- message_jp.set_payload(blob_jp)
-
- message = MIMEMultipart()
- message.attach(message_cc)
- message.attach(message_jp)
-
- ci = stages.Init()
- ci.datasource = FakeDataSource(str(message))
- new_root = self.makeDir()
- self.patchUtils(new_root)
- self.patchOS(new_root)
- ci.fetch()
- ci.consume_userdata()
- cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
- cc = util.load_yaml(cc_contents)
- self.assertEquals(1, len(cc))
- self.assertEquals('c', cc['a'])
-
- def test_merging_cloud_config(self):
- blob = '''
-#cloud-config
-a: b
-e: f
-run:
- - b
- - c
-'''
- message1 = MIMEBase("text", "cloud-config")
- message1.set_payload(blob)
-
- blob2 = '''
-#cloud-config
-a: e
-e: g
-run:
- - stuff
- - morestuff
-'''
- message2 = MIMEBase("text", "cloud-config")
- message2['X-Merge-Type'] = ('dict(recurse_array,'
- 'recurse_str)+list(append)+str(append)')
- message2.set_payload(blob2)
-
- blob3 = '''
-#cloud-config
-e:
- - 1
- - 2
- - 3
-p: 1
-'''
- message3 = MIMEBase("text", "cloud-config")
- message3.set_payload(blob3)
-
- messages = [message1, message2, message3]
-
- paths = c_helpers.Paths({}, ds=FakeDataSource(''))
- cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
-
- new_root = self.makeDir()
- self.patchUtils(new_root)
- self.patchOS(new_root)
- cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None,
- None)
- for i, m in enumerate(messages):
- headers = dict(m)
- fn = "part-%s" % (i + 1)
- payload = m.get_payload(decode=True)
- cloud_cfg.handle_part(None, headers['Content-Type'],
- fn, payload, None, headers)
- cloud_cfg.handle_part(None, handlers.CONTENT_END, None, None, None,
- None)
- contents = util.load_file(paths.get_ipath('cloud_config'))
- contents = util.load_yaml(contents)
- self.assertEquals(contents['run'], ['b', 'c', 'stuff', 'morestuff'])
- self.assertEquals(contents['a'], 'be')
- self.assertEquals(contents['e'], [1, 2, 3])
- self.assertEquals(contents['p'], 1)
-
- def test_unhandled_type_warning(self):
- """Raw text without magic is ignored but shows warning."""
- ci = stages.Init()
- data = "arbitrary text\n"
- ci.datasource = FakeDataSource(data)
-
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mocker.replay()
-
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_userdata()
- self.assertIn(
- "Unhandled non-multipart (text/x-not-multipart) userdata:",
- log_file.getvalue())
-
- def test_mime_gzip_compressed(self):
- """Tests that individual message gzip encoding works."""
-
- def gzip_part(text):
- contents = StringIO.StringIO()
- f = gzip.GzipFile(fileobj=contents, mode='w')
- f.write(str(text))
- f.flush()
- f.close()
- return MIMEApplication(contents.getvalue(), 'gzip')
-
- base_content1 = '''
-#cloud-config
-a: 2
-'''
-
- base_content2 = '''
-#cloud-config
-b: 3
-c: 4
-'''
-
- message = MIMEMultipart('test')
- message.attach(gzip_part(base_content1))
- message.attach(gzip_part(base_content2))
- ci = stages.Init()
- ci.datasource = FakeDataSource(str(message))
- new_root = self.makeDir()
- self.patchUtils(new_root)
- self.patchOS(new_root)
- ci.fetch()
- ci.consume_userdata()
- contents = util.load_file(ci.paths.get_ipath("cloud_config"))
- contents = util.load_yaml(contents)
- self.assertTrue(isinstance(contents, dict))
- self.assertEquals(3, len(contents))
- self.assertEquals(2, contents['a'])
- self.assertEquals(3, contents['b'])
- self.assertEquals(4, contents['c'])
-
- def test_mime_text_plain(self):
- """Mime message of type text/plain is ignored but shows warning."""
- ci = stages.Init()
- message = MIMEBase("text", "plain")
- message.set_payload("Just text")
- ci.datasource = FakeDataSource(message.as_string())
-
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mocker.replay()
-
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_userdata()
- self.assertIn(
- "Unhandled unknown content-type (text/plain)",
- log_file.getvalue())
-
- def test_shellscript(self):
- """Raw text starting #!/bin/sh is treated as script."""
- ci = stages.Init()
- script = "#!/bin/sh\necho hello\n"
- ci.datasource = FakeDataSource(script)
-
- outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- mock_write(outpath, script, 0700)
- self.mocker.replay()
-
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_userdata()
- self.assertEqual("", log_file.getvalue())
-
- def test_mime_text_x_shellscript(self):
- """Mime message of type text/x-shellscript is treated as script."""
- ci = stages.Init()
- script = "#!/bin/sh\necho hello\n"
- message = MIMEBase("text", "x-shellscript")
- message.set_payload(script)
- ci.datasource = FakeDataSource(message.as_string())
-
- outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- mock_write(outpath, script, 0700)
- self.mocker.replay()
-
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_userdata()
- self.assertEqual("", log_file.getvalue())
-
- def test_mime_text_plain_shell(self):
- """Mime type text/plain starting #!/bin/sh is treated as script."""
- ci = stages.Init()
- script = "#!/bin/sh\necho hello\n"
- message = MIMEBase("text", "plain")
- message.set_payload(script)
- ci.datasource = FakeDataSource(message.as_string())
-
- outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(outpath, script, 0700)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mocker.replay()
-
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_userdata()
- self.assertEqual("", log_file.getvalue())