diff options
Diffstat (limited to 'tests/unittests/test_merging.py')
-rw-r--r-- | tests/unittests/test_merging.py | 194 |
1 files changed, 54 insertions, 140 deletions
diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py index ad137e85..470b18c7 100644 --- a/tests/unittests/test_merging.py +++ b/tests/unittests/test_merging.py @@ -1,142 +1,56 @@ from tests.unittests import helpers -from cloudinit import mergers - - -class TestSimpleRun(helpers.MockerTestCase): - def test_basic_merge(self): - source = { - 'Blah': ['blah2'], - 'Blah3': 'c', - } - merge_with = { - 'Blah2': ['blah3'], - 'Blah3': 'b', - 'Blah': ['123'], - } - # Basic merge should not do thing special - merge_how = "list()+dict()+str()" - merger_set = mergers.string_extract_mergers(merge_how) - self.assertEquals(3, len(merger_set)) - merger = mergers.construct(merger_set) - merged = merger.merge(source, merge_with) - self.assertEquals(merged['Blah'], ['blah2']) - self.assertEquals(merged['Blah2'], ['blah3']) - self.assertEquals(merged['Blah3'], 'c') - - def test_dict_overwrite(self): - source = { - 'Blah': ['blah2'], - } - merge_with = { - 'Blah': ['123'], - } - # Now lets try a dict overwrite - merge_how = "list()+dict(overwrite)+str()" - merger_set = mergers.string_extract_mergers(merge_how) - self.assertEquals(3, len(merger_set)) - merger = mergers.construct(merger_set) - merged = merger.merge(source, merge_with) - self.assertEquals(merged['Blah'], ['123']) - - def test_string_append(self): - source = { - 'Blah': 'blah2', - } - merge_with = { - 'Blah': '345', - } - merge_how = "list()+dict()+str(append)" - merger_set = mergers.string_extract_mergers(merge_how) - self.assertEquals(3, len(merger_set)) - merger = mergers.construct(merger_set) - merged = merger.merge(source, merge_with) - self.assertEquals(merged['Blah'], 'blah2345') - - def test_list_extend(self): - source = ['abc'] - merge_with = ['123'] - merge_how = "list(extend)+dict()+str()" - merger_set = mergers.string_extract_mergers(merge_how) - self.assertEquals(3, len(merger_set)) - merger = mergers.construct(merger_set) - merged = merger.merge(source, merge_with) - self.assertEquals(merged, ['abc', '123']) - - def test_deep_merge(self): - source = { - 'a': [1, 'b', 2], - 'b': 'blahblah', - 'c': { - 'e': [1, 2, 3], - 'f': 'bigblobof', - 'iamadict': { - 'ok': 'ok', - } - }, - 'run': [ - 'runme', - 'runme2', - ], - 'runmereally': [ - 'e', ['a'], 'd', - ], - } - merge_with = { - 'a': ['e', 'f', 'g'], - 'b': 'more', - 'c': { - 'a': 'b', - 'f': 'stuff', - }, - 'run': [ - 'morecmd', - 'moremoremore', - ], - 'runmereally': [ - 'blah', ['b'], 'e', - ], - } - merge_how = "list(extend)+dict()+str(append)" - merger_set = mergers.string_extract_mergers(merge_how) - self.assertEquals(3, len(merger_set)) - merger = mergers.construct(merger_set) - merged = merger.merge(source, merge_with) - self.assertEquals(merged['a'], [1, 'b', 2, 'e', 'f', 'g']) - self.assertEquals(merged['b'], 'blahblahmore') - self.assertEquals(merged['c']['f'], 'bigblobofstuff') - self.assertEquals(merged['run'], ['runme', 'runme2', 'morecmd', - 'moremoremore']) - self.assertEquals(merged['runmereally'], ['e', ['a'], 'd', 'blah', - ['b'], 'e']) - - def test_dict_overwrite_layered(self): - source = { - 'Blah3': { - 'f': '3', - 'g': { - 'a': 'b', - } - } - } - merge_with = { - 'Blah3': { - 'e': '2', - 'g': { - 'e': 'f', - } - } - } - merge_how = "list()+dict()+str()" - merger_set = mergers.string_extract_mergers(merge_how) - self.assertEquals(3, len(merger_set)) - merger = mergers.construct(merger_set) - merged = merger.merge(source, merge_with) - self.assertEquals(merged['Blah3'], { - 'e': '2', - 'f': '3', - 'g': { - 'a': 'b', - 'e': 'f', - } - }) +from cloudinit.handlers import cloud_config +from cloudinit.handlers import (CONTENT_START, CONTENT_END) + +from cloudinit import helpers as c_helpers +from cloudinit import util + +import collections +import glob +import os +import re + + +class TestSimpleRun(helpers.ResourceUsingTestCase): + def _load_merge_files(self, data_dir): + merge_root = self.resourceLocation(data_dir) + tests = [] + source_ids = collections.defaultdict(list) + expected_files = {} + for fn in glob.glob(os.path.join(merge_root, "source*.*yaml")): + base_fn = os.path.basename(fn) + file_id = re.match(r"source(\d+)\-(\d+)[.]yaml", base_fn) + if not file_id: + raise IOError("File %s does not have a numeric identifier" + % (fn)) + file_id = int(file_id.group(1)) + source_ids[file_id].append(fn) + expected_fn = os.path.join(merge_root, + "expected%s.yaml" % (file_id)) + if not os.path.isfile(expected_fn): + raise IOError("No expected file found at %s" % (expected_fn)) + expected_files[file_id] = expected_fn + for id in sorted(source_ids.keys()): + source_file_contents = [] + for fn in sorted(source_ids[id]): + source_file_contents.append(util.load_file(fn)) + expected = util.load_yaml(util.load_file(expected_files[id])) + tests.append((source_file_contents, expected)) + return tests + + def test_merge_samples(self): + tests = self._load_merge_files('merge_sources') + paths = c_helpers.Paths({}) + cc_handler = cloud_config.CloudConfigPartHandler(paths) + cc_handler.cloud_fn = None + for (payloads, expected_merge) in tests: + cc_handler.handle_part(None, CONTENT_START, None, + None, None, None) + for (i, p) in enumerate(payloads): + cc_handler.handle_part(None, None, "t-%s.yaml" % (i + 1), + p, None, {}) + merged_buf = cc_handler.cloud_buf + cc_handler.handle_part(None, CONTENT_END, None, + None, None, None) + self.assertEquals(expected_merge, merged_buf) |