summaryrefslogtreecommitdiff
path: root/tests/unittests/test_merging.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_merging.py')
-rw-r--r--tests/unittests/test_merging.py194
1 files changed, 54 insertions, 140 deletions
diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py
index ad137e85..470b18c7 100644
--- a/tests/unittests/test_merging.py
+++ b/tests/unittests/test_merging.py
@@ -1,142 +1,56 @@
from tests.unittests import helpers
-from cloudinit import mergers
-
-
-class TestSimpleRun(helpers.MockerTestCase):
- def test_basic_merge(self):
- source = {
- 'Blah': ['blah2'],
- 'Blah3': 'c',
- }
- merge_with = {
- 'Blah2': ['blah3'],
- 'Blah3': 'b',
- 'Blah': ['123'],
- }
- # Basic merge should not do thing special
- merge_how = "list()+dict()+str()"
- merger_set = mergers.string_extract_mergers(merge_how)
- self.assertEquals(3, len(merger_set))
- merger = mergers.construct(merger_set)
- merged = merger.merge(source, merge_with)
- self.assertEquals(merged['Blah'], ['blah2'])
- self.assertEquals(merged['Blah2'], ['blah3'])
- self.assertEquals(merged['Blah3'], 'c')
-
- def test_dict_overwrite(self):
- source = {
- 'Blah': ['blah2'],
- }
- merge_with = {
- 'Blah': ['123'],
- }
- # Now lets try a dict overwrite
- merge_how = "list()+dict(overwrite)+str()"
- merger_set = mergers.string_extract_mergers(merge_how)
- self.assertEquals(3, len(merger_set))
- merger = mergers.construct(merger_set)
- merged = merger.merge(source, merge_with)
- self.assertEquals(merged['Blah'], ['123'])
-
- def test_string_append(self):
- source = {
- 'Blah': 'blah2',
- }
- merge_with = {
- 'Blah': '345',
- }
- merge_how = "list()+dict()+str(append)"
- merger_set = mergers.string_extract_mergers(merge_how)
- self.assertEquals(3, len(merger_set))
- merger = mergers.construct(merger_set)
- merged = merger.merge(source, merge_with)
- self.assertEquals(merged['Blah'], 'blah2345')
-
- def test_list_extend(self):
- source = ['abc']
- merge_with = ['123']
- merge_how = "list(extend)+dict()+str()"
- merger_set = mergers.string_extract_mergers(merge_how)
- self.assertEquals(3, len(merger_set))
- merger = mergers.construct(merger_set)
- merged = merger.merge(source, merge_with)
- self.assertEquals(merged, ['abc', '123'])
-
- def test_deep_merge(self):
- source = {
- 'a': [1, 'b', 2],
- 'b': 'blahblah',
- 'c': {
- 'e': [1, 2, 3],
- 'f': 'bigblobof',
- 'iamadict': {
- 'ok': 'ok',
- }
- },
- 'run': [
- 'runme',
- 'runme2',
- ],
- 'runmereally': [
- 'e', ['a'], 'd',
- ],
- }
- merge_with = {
- 'a': ['e', 'f', 'g'],
- 'b': 'more',
- 'c': {
- 'a': 'b',
- 'f': 'stuff',
- },
- 'run': [
- 'morecmd',
- 'moremoremore',
- ],
- 'runmereally': [
- 'blah', ['b'], 'e',
- ],
- }
- merge_how = "list(extend)+dict()+str(append)"
- merger_set = mergers.string_extract_mergers(merge_how)
- self.assertEquals(3, len(merger_set))
- merger = mergers.construct(merger_set)
- merged = merger.merge(source, merge_with)
- self.assertEquals(merged['a'], [1, 'b', 2, 'e', 'f', 'g'])
- self.assertEquals(merged['b'], 'blahblahmore')
- self.assertEquals(merged['c']['f'], 'bigblobofstuff')
- self.assertEquals(merged['run'], ['runme', 'runme2', 'morecmd',
- 'moremoremore'])
- self.assertEquals(merged['runmereally'], ['e', ['a'], 'd', 'blah',
- ['b'], 'e'])
-
- def test_dict_overwrite_layered(self):
- source = {
- 'Blah3': {
- 'f': '3',
- 'g': {
- 'a': 'b',
- }
- }
- }
- merge_with = {
- 'Blah3': {
- 'e': '2',
- 'g': {
- 'e': 'f',
- }
- }
- }
- merge_how = "list()+dict()+str()"
- merger_set = mergers.string_extract_mergers(merge_how)
- self.assertEquals(3, len(merger_set))
- merger = mergers.construct(merger_set)
- merged = merger.merge(source, merge_with)
- self.assertEquals(merged['Blah3'], {
- 'e': '2',
- 'f': '3',
- 'g': {
- 'a': 'b',
- 'e': 'f',
- }
- })
+from cloudinit.handlers import cloud_config
+from cloudinit.handlers import (CONTENT_START, CONTENT_END)
+
+from cloudinit import helpers as c_helpers
+from cloudinit import util
+
+import collections
+import glob
+import os
+import re
+
+
+class TestSimpleRun(helpers.ResourceUsingTestCase):
+ def _load_merge_files(self, data_dir):
+ merge_root = self.resourceLocation(data_dir)
+ tests = []
+ source_ids = collections.defaultdict(list)
+ expected_files = {}
+ for fn in glob.glob(os.path.join(merge_root, "source*.*yaml")):
+ base_fn = os.path.basename(fn)
+ file_id = re.match(r"source(\d+)\-(\d+)[.]yaml", base_fn)
+ if not file_id:
+ raise IOError("File %s does not have a numeric identifier"
+ % (fn))
+ file_id = int(file_id.group(1))
+ source_ids[file_id].append(fn)
+ expected_fn = os.path.join(merge_root,
+ "expected%s.yaml" % (file_id))
+ if not os.path.isfile(expected_fn):
+ raise IOError("No expected file found at %s" % (expected_fn))
+ expected_files[file_id] = expected_fn
+ for id in sorted(source_ids.keys()):
+ source_file_contents = []
+ for fn in sorted(source_ids[id]):
+ source_file_contents.append(util.load_file(fn))
+ expected = util.load_yaml(util.load_file(expected_files[id]))
+ tests.append((source_file_contents, expected))
+ return tests
+
+ def test_merge_samples(self):
+ tests = self._load_merge_files('merge_sources')
+ paths = c_helpers.Paths({})
+ cc_handler = cloud_config.CloudConfigPartHandler(paths)
+ cc_handler.cloud_fn = None
+ for (payloads, expected_merge) in tests:
+ cc_handler.handle_part(None, CONTENT_START, None,
+ None, None, None)
+ for (i, p) in enumerate(payloads):
+ cc_handler.handle_part(None, None, "t-%s.yaml" % (i + 1),
+ p, None, {})
+ merged_buf = cc_handler.cloud_buf
+ cc_handler.handle_part(None, CONTENT_END, None,
+ None, None, None)
+ self.assertEquals(expected_merge, merged_buf)