summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/data/user_data.1.txt15
-rw-r--r--tests/unittests/test_merging.py62
-rw-r--r--tests/unittests/test_runs/test_merge_run.py52
-rw-r--r--tests/unittests/test_util.py59
4 files changed, 129 insertions, 59 deletions
diff --git a/tests/data/user_data.1.txt b/tests/data/user_data.1.txt
new file mode 100644
index 00000000..4c4543de
--- /dev/null
+++ b/tests/data/user_data.1.txt
@@ -0,0 +1,15 @@
+#cloud-config
+write_files:
+- content: blah
+ path: /etc/blah.ini
+ permissions: 493
+
+system_info:
+ package_mirrors:
+ - arches: [i386, amd64, blah]
+ failsafe:
+ primary: http://my.archive.mydomain.com/ubuntu
+ security: http://my.security.mydomain.com/ubuntu
+ search:
+ primary: []
+ security: []
diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py
new file mode 100644
index 00000000..0037b966
--- /dev/null
+++ b/tests/unittests/test_merging.py
@@ -0,0 +1,62 @@
+from mocker import MockerTestCase
+
+from cloudinit import util
+
+
+class TestMergeDict(MockerTestCase):
+ def test_simple_merge(self):
+ """Test simple non-conflict merge."""
+ source = {"key1": "value1"}
+ candidate = {"key2": "value2"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual({"key1": "value1", "key2": "value2"}, result)
+
+ def test_nested_merge(self):
+ """Test nested merge."""
+ source = {"key1": {"key1.1": "value1.1"}}
+ candidate = {"key1": {"key1.2": "value1.2"}}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(
+ {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)
+
+ def test_merge_does_not_override(self):
+ """Test that candidate doesn't override source."""
+ source = {"key1": "value1", "key2": "value2"}
+ candidate = {"key1": "value2", "key2": "NEW VALUE"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_empty_candidate(self):
+ """Test empty candidate doesn't change source."""
+ source = {"key": "value"}
+ candidate = {}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_empty_source(self):
+ """Test empty source is replaced by candidate."""
+ source = {}
+ candidate = {"key": "value"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(candidate, result)
+
+ def test_non_dict_candidate(self):
+ """Test non-dict candidate is discarded."""
+ source = {"key": "value"}
+ candidate = "not a dict"
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_non_dict_source(self):
+ """Test non-dict source is not modified with a dict candidate."""
+ source = "not a dict"
+ candidate = {"key": "value"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_neither_dict(self):
+ """Test if neither candidate or source is dict source wins."""
+ source = "source"
+ candidate = "candidate"
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
diff --git a/tests/unittests/test_runs/test_merge_run.py b/tests/unittests/test_runs/test_merge_run.py
new file mode 100644
index 00000000..04c03730
--- /dev/null
+++ b/tests/unittests/test_runs/test_merge_run.py
@@ -0,0 +1,52 @@
+import os
+
+from tests.unittests import helpers
+
+from cloudinit.settings import (PER_INSTANCE)
+from cloudinit import stages
+from cloudinit import util
+
+
+class TestSimpleRun(helpers.FilesystemMockingTestCase):
+ def _patchIn(self, root):
+ self.restore()
+ self.patchOS(root)
+ self.patchUtils(root)
+
+ def test_none_ds(self):
+ new_root = self.makeDir()
+ self.replicateTestRoot('simple_ubuntu', new_root)
+ cfg = {
+ 'datasource_list': ['None'],
+ 'cloud_init_modules': ['write-files'],
+ }
+ ud = self.readResource('user_data.1.txt')
+ cloud_cfg = util.yaml_dumps(cfg)
+ util.ensure_dir(os.path.join(new_root, 'etc', 'cloud'))
+ util.write_file(os.path.join(new_root, 'etc',
+ 'cloud', 'cloud.cfg'), cloud_cfg)
+ self._patchIn(new_root)
+
+ # Now start verifying whats created
+ initer = stages.Init()
+ initer.read_cfg()
+ initer.initialize()
+ initer.fetch()
+ initer.datasource.userdata_raw = ud
+ iid = initer.instancify()
+ initer.update()
+ initer.cloudify().run('consume_userdata',
+ initer.consume_userdata,
+ args=[PER_INSTANCE],
+ freq=PER_INSTANCE)
+ mirrors = initer.distro.get_option('package_mirrors')
+ self.assertEquals(1, len(mirrors))
+ mirror = mirrors[0]
+ self.assertEquals(mirror['arches'], ['i386', 'amd64', 'blah'])
+ mods = stages.Modules(initer)
+ (which_ran, failures) = mods.run_section('cloud_init_modules')
+ self.assertTrue(len(failures) == 0)
+ self.assertTrue(os.path.exists('/etc/blah.ini'))
+ self.assertIn('write-files', which_ran)
+ contents = util.load_file('/etc/blah.ini')
+ self.assertEquals(contents, 'blah')
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 96962b91..02611581 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -28,65 +28,6 @@ class FakeSelinux(object):
self.restored.append(path)
-class TestMergeDict(MockerTestCase):
- def test_simple_merge(self):
- """Test simple non-conflict merge."""
- source = {"key1": "value1"}
- candidate = {"key2": "value2"}
- result = util.mergedict(source, candidate)
- self.assertEqual({"key1": "value1", "key2": "value2"}, result)
-
- def test_nested_merge(self):
- """Test nested merge."""
- source = {"key1": {"key1.1": "value1.1"}}
- candidate = {"key1": {"key1.2": "value1.2"}}
- result = util.mergedict(source, candidate)
- self.assertEqual(
- {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)
-
- def test_merge_does_not_override(self):
- """Test that candidate doesn't override source."""
- source = {"key1": "value1", "key2": "value2"}
- candidate = {"key1": "value2", "key2": "NEW VALUE"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_candidate(self):
- """Test empty candidate doesn't change source."""
- source = {"key": "value"}
- candidate = {}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_source(self):
- """Test empty source is replaced by candidate."""
- source = {}
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(candidate, result)
-
- def test_non_dict_candidate(self):
- """Test non-dict candidate is discarded."""
- source = {"key": "value"}
- candidate = "not a dict"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_non_dict_source(self):
- """Test non-dict source is not modified with a dict candidate."""
- source = "not a dict"
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_neither_dict(self):
- """Test if neither candidate or source is dict source wins."""
- source = "source"
- candidate = "candidate"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
-
class TestGetCfgOptionListOrStr(TestCase):
def test_not_found_no_default(self):
"""None is returned if key is not found and no default given."""