summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/unittests/test__init__.py27
-rw-r--r--tests/unittests/test_merging.py205
-rw-r--r--tests/unittests/test_userdata.py82
3 files changed, 232 insertions, 82 deletions
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py
index ac082076..7924755a 100644
--- a/tests/unittests/test__init__.py
+++ b/tests/unittests/test__init__.py
@@ -22,8 +22,10 @@ class FakeModule(handlers.Handler):
def list_types(self):
return self.types
- def _handle_part(self, data, ctype, filename, payload, frequency):
+ def handle_part(self, data, ctype, filename, payload, frequency):
pass
+
+
class TestWalkerHandleHandler(MockerTestCase):
@@ -103,6 +105,9 @@ class TestHandlerHandlePart(MockerTestCase):
self.filename = "fake filename"
self.payload = "fake payload"
self.frequency = settings.PER_INSTANCE
+ self.headers = {
+ 'Content-Type': self.ctype,
+ }
def test_normal_version_1(self):
"""
@@ -118,8 +123,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.payload)
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
def test_normal_version_2(self):
"""
@@ -135,8 +140,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.payload, self.frequency)
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
def test_modfreq_per_always(self):
"""
@@ -152,8 +157,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.payload)
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
def test_no_handle_when_modfreq_once(self):
"""C{handle_part} is not called if frequency is once."""
@@ -163,8 +168,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.mocker.result(settings.PER_ONCE)
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
def test_exception_is_caught(self):
"""Exceptions within C{handle_part} are caught and logged."""
@@ -178,8 +183,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.mocker.throw(Exception())
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
class TestCmdlineUrl(MockerTestCase):
diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py
index 0037b966..fa7ee8e4 100644
--- a/tests/unittests/test_merging.py
+++ b/tests/unittests/test_merging.py
@@ -1,62 +1,143 @@
-from mocker import MockerTestCase
-
-from cloudinit import util
-
-
-class TestMergeDict(MockerTestCase):
- def test_simple_merge(self):
- """Test simple non-conflict merge."""
- source = {"key1": "value1"}
- candidate = {"key2": "value2"}
- result = util.mergedict(source, candidate)
- self.assertEqual({"key1": "value1", "key2": "value2"}, result)
-
- def test_nested_merge(self):
- """Test nested merge."""
- source = {"key1": {"key1.1": "value1.1"}}
- candidate = {"key1": {"key1.2": "value1.2"}}
- result = util.mergedict(source, candidate)
- self.assertEqual(
- {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)
-
- def test_merge_does_not_override(self):
- """Test that candidate doesn't override source."""
- source = {"key1": "value1", "key2": "value2"}
- candidate = {"key1": "value2", "key2": "NEW VALUE"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_candidate(self):
- """Test empty candidate doesn't change source."""
- source = {"key": "value"}
- candidate = {}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_source(self):
- """Test empty source is replaced by candidate."""
- source = {}
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(candidate, result)
-
- def test_non_dict_candidate(self):
- """Test non-dict candidate is discarded."""
- source = {"key": "value"}
- candidate = "not a dict"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_non_dict_source(self):
- """Test non-dict source is not modified with a dict candidate."""
- source = "not a dict"
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_neither_dict(self):
- """Test if neither candidate or source is dict source wins."""
- source = "source"
- candidate = "candidate"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
+import os
+
+from tests.unittests import helpers
+
+from cloudinit import mergers
+
+
+class TestSimpleRun(helpers.MockerTestCase):
+ def test_basic_merge(self):
+ source = {
+ 'Blah': ['blah2'],
+ 'Blah3': 'c',
+ }
+ merge_with = {
+ 'Blah2': ['blah3'],
+ 'Blah3': 'b',
+ 'Blah': ['123'],
+ }
+ # Basic merge should not do thing special
+ merge_how = "list()+dict()+str()"
+ merger_set = mergers.string_extract_mergers(merge_how)
+ self.assertEquals(3, len(merger_set))
+ merger = mergers.construct(merger_set)
+ merged = merger.merge(source, merge_with)
+ self.assertEquals(merged['Blah'], ['blah2'])
+ self.assertEquals(merged['Blah2'], ['blah3'])
+ self.assertEquals(merged['Blah3'], 'c')
+
+ def test_dict_overwrite(self):
+ source = {
+ 'Blah': ['blah2'],
+ }
+ merge_with = {
+ 'Blah': ['123'],
+ }
+ # Now lets try a dict overwrite
+ merge_how = "list()+dict(overwrite)+str()"
+ merger_set = mergers.string_extract_mergers(merge_how)
+ self.assertEquals(3, len(merger_set))
+ merger = mergers.construct(merger_set)
+ merged = merger.merge(source, merge_with)
+ self.assertEquals(merged['Blah'], ['123'])
+
+ def test_string_append(self):
+ source = {
+ 'Blah': 'blah2',
+ }
+ merge_with = {
+ 'Blah': '345',
+ }
+ merge_how = "list()+dict()+str(append)"
+ merger_set = mergers.string_extract_mergers(merge_how)
+ self.assertEquals(3, len(merger_set))
+ merger = mergers.construct(merger_set)
+ merged = merger.merge(source, merge_with)
+ self.assertEquals(merged['Blah'], 'blah2345')
+
+ def test_list_extend(self):
+ source = ['abc']
+ merge_with = ['123']
+ merge_how = "list(extend)+dict()+str()"
+ merger_set = mergers.string_extract_mergers(merge_how)
+ self.assertEquals(3, len(merger_set))
+ merger = mergers.construct(merger_set)
+ merged = merger.merge(source, merge_with)
+ self.assertEquals(merged, ['abc', '123'])
+
+ def test_deep_merge(self):
+ source = {
+ 'a': [1, 'b', 2],
+ 'b': 'blahblah',
+ 'c': {
+ 'e': [1, 2, 3],
+ 'f': 'bigblobof',
+ 'iamadict': {
+ 'ok': 'ok',
+ }
+ },
+ 'run': [
+ 'runme',
+ 'runme2',
+ ],
+ 'runmereally': [
+ 'e', ['a'], 'd',
+ ],
+ }
+ merge_with = {
+ 'a': ['e', 'f', 'g'],
+ 'b': 'more',
+ 'c': {
+ 'a': 'b',
+ 'f': 'stuff',
+ },
+ 'run': [
+ 'morecmd',
+ 'moremoremore',
+ ],
+ 'runmereally': [
+ 'blah', ['b'], 'e',
+ ],
+ }
+ merge_how = "list(extend)+dict()+str(append)"
+ merger_set = mergers.string_extract_mergers(merge_how)
+ self.assertEquals(3, len(merger_set))
+ merger = mergers.construct(merger_set)
+ merged = merger.merge(source, merge_with)
+ self.assertEquals(merged['a'], [1, 'b', 2, 'e', 'f', 'g'])
+ self.assertEquals(merged['b'], 'blahblahmore')
+ self.assertEquals(merged['c']['f'], 'bigblobofstuff')
+ self.assertEquals(merged['run'], ['runme', 'runme2', 'morecmd', 'moremoremore'])
+ self.assertEquals(merged['runmereally'], ['e', ['a'], 'd', 'blah', ['b'], 'e'])
+
+ def test_dict_overwrite_layered(self):
+ source = {
+ 'Blah3': {
+ 'f': '3',
+ 'g': {
+ 'a': 'b',
+ }
+ }
+ }
+ merge_with = {
+ 'Blah3': {
+ 'e': '2',
+ 'g': {
+ 'e': 'f',
+ }
+ }
+ }
+ merge_how = "list()+dict()+str()"
+ merger_set = mergers.string_extract_mergers(merge_how)
+ self.assertEquals(3, len(merger_set))
+ merger = mergers.construct(merger_set)
+ merged = merger.merge(source, merge_with)
+ self.assertEquals(merged['Blah3'], {
+ 'e': '2',
+ 'f': '3',
+ 'g': {
+ 'a': 'b',
+ 'e': 'f',
+ }
+ })
+
diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py
index 82a4c555..ef0dd7b8 100644
--- a/tests/unittests/test_userdata.py
+++ b/tests/unittests/test_userdata.py
@@ -9,12 +9,17 @@ from email.mime.base import MIMEBase
from mocker import MockerTestCase
+from cloudinit import handlers
+from cloudinit import helpers as c_helpers
from cloudinit import log
from cloudinit import sources
from cloudinit import stages
+from cloudinit import util
INSTANCE_ID = "i-testing"
+from tests.unittests import helpers
+
class FakeDataSource(sources.DataSource):
@@ -26,22 +31,16 @@ class FakeDataSource(sources.DataSource):
# FIXME: these tests shouldn't be checking log output??
# Weirddddd...
-
-
-class TestConsumeUserData(MockerTestCase):
+class TestConsumeUserData(helpers.FilesystemMockingTestCase):
def setUp(self):
- MockerTestCase.setUp(self)
- # Replace the write so no actual files
- # get written out...
- self.mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
+ helpers.FilesystemMockingTestCase.setUp(self)
self._log = None
self._log_file = None
self._log_handler = None
def tearDown(self):
- MockerTestCase.tearDown(self)
+ helpers.FilesystemMockingTestCase.tearDown(self)
if self._log_handler and self._log:
self._log.removeHandler(self._log_handler)
@@ -53,12 +52,73 @@ class TestConsumeUserData(MockerTestCase):
self._log.addHandler(self._log_handler)
return log_file
+ def test_merging_cloud_config(self):
+ blob = '''
+#cloud-config
+a: b
+e: f
+run:
+ - b
+ - c
+'''
+ message1 = MIMEBase("text", "cloud-config")
+ message1['Merge-Type'] = 'dict()+list(extend)+str(append)'
+ message1.set_payload(blob)
+
+ blob2 = '''
+#cloud-config
+a: e
+e: g
+run:
+ - stuff
+ - morestuff
+'''
+ message2 = MIMEBase("text", "cloud-config")
+ message2['X-Merge-Type'] = 'dict()+list(extend)+str()'
+ message2.set_payload(blob2)
+
+ blob3 = '''
+#cloud-config
+e:
+ - 1
+ - 2
+ - 3
+p: 1
+'''
+ message3 = MIMEBase("text", "cloud-config")
+ message3['Merge-Type'] = 'dict()+list()+str()'
+ message3.set_payload(blob3)
+
+ messages = [message1, message2, message3]
+
+ paths = c_helpers.Paths({}, ds=FakeDataSource(''))
+ cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
+
+ new_root = self.makeDir()
+ self.patchUtils(new_root)
+ self.patchOS(new_root)
+ cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None, None)
+ for i, m in enumerate(messages):
+ headers = dict(m)
+ fn = "part-%s" % (i + 1)
+ payload = m.get_payload(decode=True)
+ cloud_cfg.handle_part(None, headers['Content-Type'],
+ fn, payload, None, headers)
+ cloud_cfg.handle_part(None, handlers.CONTENT_END, None, None, None, None)
+ contents = util.load_file(paths.get_ipath('cloud_config'))
+ contents = util.load_yaml(contents)
+ self.assertEquals(contents['run'], ['b', 'c', 'stuff', 'morestuff'])
+ self.assertEquals(contents['a'], 'be')
+ self.assertEquals(contents['e'], 'fg')
+ self.assertEquals(contents['p'], 1)
+
def test_unhandled_type_warning(self):
"""Raw text without magic is ignored but shows warning."""
ci = stages.Init()
data = "arbitrary text\n"
ci.datasource = FakeDataSource(data)
+ self.mock_write = self.mocker.replace("cloudinit.util.write_file", passthrough=False)
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
@@ -76,6 +136,7 @@ class TestConsumeUserData(MockerTestCase):
message.set_payload("Just text")
ci.datasource = FakeDataSource(message.as_string())
+ self.mock_write = self.mocker.replace("cloudinit.util.write_file", passthrough=False)
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
@@ -93,6 +154,7 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(script)
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
+ self.mock_write = self.mocker.replace("cloudinit.util.write_file", passthrough=False)
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mock_write(outpath, script, 0700)
self.mocker.replay()
@@ -111,6 +173,7 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
+ self.mock_write = self.mocker.replace("cloudinit.util.write_file", passthrough=False)
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mock_write(outpath, script, 0700)
self.mocker.replay()
@@ -129,6 +192,7 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
+ self.mock_write = self.mocker.replace("cloudinit.util.write_file", passthrough=False)
self.mock_write(outpath, script, 0700)
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()