summaryrefslogtreecommitdiff
path: root/tests/unittests/test_merging.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_merging.py')
-rw-r--r--tests/unittests/test_merging.py309
1 files changed, 251 insertions, 58 deletions
diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py
index 0037b966..486b9158 100644
--- a/tests/unittests/test_merging.py
+++ b/tests/unittests/test_merging.py
@@ -1,62 +1,255 @@
-from mocker import MockerTestCase
+from tests.unittests import helpers
+from cloudinit.handlers import cloud_config
+from cloudinit.handlers import (CONTENT_START, CONTENT_END)
+
+from cloudinit import helpers as c_helpers
from cloudinit import util
+import collections
+import glob
+import os
+import random
+import re
+import string # pylint: disable=W0402
+
+SOURCE_PAT = "source*.*yaml"
+EXPECTED_PAT = "expected%s.yaml"
+TYPES = [long, int, dict, str, list, tuple, None]
+
+
+def _old_mergedict(src, cand):
+ """
+ Merge values from C{cand} into C{src}.
+ If C{src} has a key C{cand} will not override.
+ Nested dictionaries are merged recursively.
+ """
+ if isinstance(src, dict) and isinstance(cand, dict):
+ for (k, v) in cand.iteritems():
+ if k not in src:
+ src[k] = v
+ else:
+ src[k] = _old_mergedict(src[k], v)
+ return src
+
+
+def _old_mergemanydict(*args):
+ out = {}
+ for a in args:
+ out = _old_mergedict(out, a)
+ return out
+
+
+def _random_str(rand):
+ base = ''
+ for _i in xrange(rand.randint(1, 2 ** 8)):
+ base += rand.choice(string.letters + string.digits)
+ return base
+
+
+class _NoMoreException(Exception):
+ pass
+
+
+def _make_dict(current_depth, max_depth, rand):
+ if current_depth >= max_depth:
+ raise _NoMoreException()
+ if current_depth == 0:
+ t = dict
+ else:
+ t = rand.choice(TYPES)
+ base = None
+ if t in [None]:
+ return base
+ if t in [dict, list, tuple]:
+ if t in [dict]:
+ amount = rand.randint(0, 5)
+ keys = [_random_str(rand) for _i in xrange(0, amount)]
+ base = {}
+ for k in keys:
+ try:
+ base[k] = _make_dict(current_depth + 1, max_depth, rand)
+ except _NoMoreException:
+ pass
+ elif t in [list, tuple]:
+ base = []
+ amount = rand.randint(0, 5)
+ for _i in xrange(0, amount):
+ try:
+ base.append(_make_dict(current_depth + 1, max_depth, rand))
+ except _NoMoreException:
+ pass
+ if t in [tuple]:
+ base = tuple(base)
+ elif t in [long, int]:
+ base = rand.randint(0, 2 ** 8)
+ elif t in [str]:
+ base = _random_str(rand)
+ return base
+
+
+def make_dict(max_depth, seed=None):
+ max_depth = max(1, max_depth)
+ rand = random.Random(seed)
+ return _make_dict(0, max_depth, rand)
+
+
+class TestSimpleRun(helpers.ResourceUsingTestCase):
+ def _load_merge_files(self):
+ merge_root = self.resourceLocation('merge_sources')
+ tests = []
+ source_ids = collections.defaultdict(list)
+ expected_files = {}
+ for fn in glob.glob(os.path.join(merge_root, SOURCE_PAT)):
+ base_fn = os.path.basename(fn)
+ file_id = re.match(r"source(\d+)\-(\d+)[.]yaml", base_fn)
+ if not file_id:
+ raise IOError("File %s does not have a numeric identifier"
+ % (fn))
+ file_id = int(file_id.group(1))
+ source_ids[file_id].append(fn)
+ expected_fn = os.path.join(merge_root, EXPECTED_PAT % (file_id))
+ if not os.path.isfile(expected_fn):
+ raise IOError("No expected file found at %s" % (expected_fn))
+ expected_files[file_id] = expected_fn
+ for i in sorted(source_ids.keys()):
+ source_file_contents = []
+ for fn in sorted(source_ids[i]):
+ source_file_contents.append([fn, util.load_file(fn)])
+ expected = util.load_yaml(util.load_file(expected_files[i]))
+ entry = [source_file_contents, [expected, expected_files[i]]]
+ tests.append(entry)
+ return tests
+
+ def test_seed_runs(self):
+ test_dicts = []
+ for i in range(1, 50):
+ base_dicts = []
+ for j in range(1, 50):
+ base_dicts.append(make_dict(5, i * j))
+ test_dicts.append(base_dicts)
+ for test in test_dicts:
+ c = _old_mergemanydict(*test)
+ d = util.mergemanydict(test)
+ self.assertEquals(c, d)
+
+ def test_merge_cc_samples(self):
+ tests = self._load_merge_files()
+ paths = c_helpers.Paths({})
+ cc_handler = cloud_config.CloudConfigPartHandler(paths)
+ cc_handler.cloud_fn = None
+ for (payloads, (expected_merge, expected_fn)) in tests:
+ cc_handler.handle_part(None, CONTENT_START, None,
+ None, None, None)
+ merging_fns = []
+ for (fn, contents) in payloads:
+ cc_handler.handle_part(None, None, "%s.yaml" % (fn),
+ contents, None, {})
+ merging_fns.append(fn)
+ merged_buf = cc_handler.cloud_buf
+ cc_handler.handle_part(None, CONTENT_END, None,
+ None, None, None)
+ fail_msg = "Equality failure on checking %s with %s: %s != %s"
+ fail_msg = fail_msg % (expected_fn,
+ ",".join(merging_fns), merged_buf,
+ expected_merge)
+ self.assertEquals(expected_merge, merged_buf, msg=fail_msg)
+
+ def test_compat_merges_dict(self):
+ a = {
+ '1': '2',
+ 'b': 'c',
+ }
+ b = {
+ 'b': 'e',
+ }
+ c = _old_mergedict(a, b)
+ d = util.mergemanydict([a, b])
+ self.assertEquals(c, d)
+
+ def test_compat_merges_dict2(self):
+ a = {
+ 'Blah': 1,
+ 'Blah2': 2,
+ 'Blah3': 3,
+ }
+ b = {
+ 'Blah': 1,
+ 'Blah2': 2,
+ 'Blah3': [1],
+ }
+ c = _old_mergedict(a, b)
+ d = util.mergemanydict([a, b])
+ self.assertEquals(c, d)
+
+ def test_compat_merges_list(self):
+ a = {'b': [1, 2, 3]}
+ b = {'b': [4, 5]}
+ c = {'b': [6, 7]}
+ e = _old_mergemanydict(a, b, c)
+ f = util.mergemanydict([a, b, c])
+ self.assertEquals(e, f)
+
+ def test_compat_merges_str(self):
+ a = {'b': "hi"}
+ b = {'b': "howdy"}
+ c = {'b': "hallo"}
+ e = _old_mergemanydict(a, b, c)
+ f = util.mergemanydict([a, b, c])
+ self.assertEquals(e, f)
+
+ def test_compat_merge_sub_dict(self):
+ a = {
+ '1': '2',
+ 'b': {
+ 'f': 'g',
+ 'e': 'c',
+ 'h': 'd',
+ 'hh': {
+ '1': 2,
+ },
+ }
+ }
+ b = {
+ 'b': {
+ 'e': 'c',
+ 'hh': {
+ '3': 4,
+ }
+ }
+ }
+ c = _old_mergedict(a, b)
+ d = util.mergemanydict([a, b])
+ self.assertEquals(c, d)
+
+ def test_compat_merge_sub_dict2(self):
+ a = {
+ '1': '2',
+ 'b': {
+ 'f': 'g',
+ }
+ }
+ b = {
+ 'b': {
+ 'e': 'c',
+ }
+ }
+ c = _old_mergedict(a, b)
+ d = util.mergemanydict([a, b])
+ self.assertEquals(c, d)
-class TestMergeDict(MockerTestCase):
- def test_simple_merge(self):
- """Test simple non-conflict merge."""
- source = {"key1": "value1"}
- candidate = {"key2": "value2"}
- result = util.mergedict(source, candidate)
- self.assertEqual({"key1": "value1", "key2": "value2"}, result)
-
- def test_nested_merge(self):
- """Test nested merge."""
- source = {"key1": {"key1.1": "value1.1"}}
- candidate = {"key1": {"key1.2": "value1.2"}}
- result = util.mergedict(source, candidate)
- self.assertEqual(
- {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)
-
- def test_merge_does_not_override(self):
- """Test that candidate doesn't override source."""
- source = {"key1": "value1", "key2": "value2"}
- candidate = {"key1": "value2", "key2": "NEW VALUE"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_candidate(self):
- """Test empty candidate doesn't change source."""
- source = {"key": "value"}
- candidate = {}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_source(self):
- """Test empty source is replaced by candidate."""
- source = {}
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(candidate, result)
-
- def test_non_dict_candidate(self):
- """Test non-dict candidate is discarded."""
- source = {"key": "value"}
- candidate = "not a dict"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_non_dict_source(self):
- """Test non-dict source is not modified with a dict candidate."""
- source = "not a dict"
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_neither_dict(self):
- """Test if neither candidate or source is dict source wins."""
- source = "source"
- candidate = "candidate"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
+ def test_compat_merge_sub_list(self):
+ a = {
+ '1': '2',
+ 'b': {
+ 'f': ['1'],
+ }
+ }
+ b = {
+ 'b': {
+ 'f': [],
+ }
+ }
+ c = _old_mergedict(a, b)
+ d = util.mergemanydict([a, b])
+ self.assertEquals(c, d)