summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/helpers.py1
-rw-r--r--tests/unittests/test_distros/test_generic.py32
-rw-r--r--tests/unittests/test_distros/test_user_data_normalize.py22
-rw-r--r--tests/unittests/test_handler/test_handler_set_hostname.py58
-rw-r--r--tests/unittests/test_merging.py62
-rw-r--r--tests/unittests/test_runs/test_merge_run.py52
-rw-r--r--tests/unittests/test_util.py59
7 files changed, 224 insertions, 62 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 2c5dcad2..e8080668 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -103,6 +103,7 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
def patchUtils(self, new_root):
patch_funcs = {
util: [('write_file', 1),
+ ('append_file', 1),
('load_file', 1),
('ensure_dir', 1),
('chmod', 1),
diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py
index 2df4c2f0..704699b5 100644
--- a/tests/unittests/test_distros/test_generic.py
+++ b/tests/unittests/test_distros/test_generic.py
@@ -1,6 +1,9 @@
-from mocker import MockerTestCase
-
from cloudinit import distros
+from cloudinit import util
+
+from tests.unittests import helpers
+
+import os
unknown_arch_info = {
'arches': ['default'],
@@ -27,7 +30,7 @@ gpmi = distros._get_package_mirror_info # pylint: disable=W0212
gapmi = distros._get_arch_package_mirror_info # pylint: disable=W0212
-class TestGenericDistro(MockerTestCase):
+class TestGenericDistro(helpers.FilesystemMockingTestCase):
def return_first(self, mlist):
if not mlist:
@@ -52,6 +55,29 @@ class TestGenericDistro(MockerTestCase):
# Make a temp directoy for tests to use.
self.tmp = self.makeDir()
+ def test_sudoers_ensure_new(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+
+ def test_sudoers_ensure_append(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ util.write_file("/etc/sudoers", "josh, josh\n")
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+ self.assertIn("josh", contents)
+ self.assertEquals(2, contents.count("josh"))
+
def test_arch_package_mirror_info_unknown(self):
"""for an unknown arch, we should get back that with arch 'default'."""
arch_mirrors = gapmi(package_mirrors, arch="unknown")
diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py
index 8f0d8896..5d9d4311 100644
--- a/tests/unittests/test_distros/test_user_data_normalize.py
+++ b/tests/unittests/test_distros/test_user_data_normalize.py
@@ -30,6 +30,28 @@ class TestUGNormalize(MockerTestCase):
def _norm(self, cfg, distro):
return distros.normalize_users_groups(cfg, distro)
+ def test_group_dict(self):
+ distro = self._make_distro('ubuntu')
+ g = {'groups': [
+ {
+ 'ubuntu': ['foo', 'bar'],
+ 'bob': 'users',
+ },
+ 'cloud-users',
+ {
+ 'bob': 'users2',
+ },
+ ]
+ }
+ (_users, groups) = self._norm(g, distro)
+ self.assertIn('ubuntu', groups)
+ ub_members = groups['ubuntu']
+ self.assertEquals(sorted(['foo', 'bar']), sorted(ub_members))
+ self.assertIn('bob', groups)
+ b_members = groups['bob']
+ self.assertEquals(sorted(['users', 'users2']),
+ sorted(b_members))
+
def test_basic_groups(self):
distro = self._make_distro('ubuntu')
ug_cfg = {
diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py
new file mode 100644
index 00000000..a1aba62f
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_set_hostname.py
@@ -0,0 +1,58 @@
+from cloudinit.config import cc_set_hostname
+
+from cloudinit import cloud
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import util
+
+from tests.unittests import helpers as t_help
+
+import logging
+
+from StringIO import StringIO
+
+from configobj import ConfigObj
+
+LOG = logging.getLogger(__name__)
+
+
+class TestHostname(t_help.FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestHostname, self).setUp()
+ self.tmp = self.makeDir(prefix="unittest_")
+
+ def _fetch_distro(self, kind):
+ cls = distros.fetch(kind)
+ paths = helpers.Paths({})
+ return cls(kind, {}, paths)
+
+ def test_write_hostname_rhel(self):
+ cfg = {
+ 'hostname': 'blah.blah.blah.yahoo.com',
+ }
+ distro = self._fetch_distro('rhel')
+ paths = helpers.Paths({})
+ ds = None
+ cc = cloud.Cloud(ds, paths, {}, distro, None)
+ self.patchUtils(self.tmp)
+ self.patchOS(self.tmp)
+ cc_set_hostname.handle('cc_set_hostname',
+ cfg, cc, LOG, [])
+ contents = util.load_file("/etc/sysconfig/network")
+ n_cfg = ConfigObj(StringIO(contents))
+ self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'},
+ dict(n_cfg))
+
+ def test_write_hostname_debian(self):
+ cfg = {
+ 'hostname': 'blah.blah.blah.yahoo.com',
+ }
+ distro = self._fetch_distro('debian')
+ paths = helpers.Paths({})
+ ds = None
+ cc = cloud.Cloud(ds, paths, {}, distro, None)
+ self.patchUtils(self.tmp)
+ cc_set_hostname.handle('cc_set_hostname',
+ cfg, cc, LOG, [])
+ contents = util.load_file("/etc/hostname")
+ self.assertEquals('blah', contents.strip())
diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py
new file mode 100644
index 00000000..0037b966
--- /dev/null
+++ b/tests/unittests/test_merging.py
@@ -0,0 +1,62 @@
+from mocker import MockerTestCase
+
+from cloudinit import util
+
+
+class TestMergeDict(MockerTestCase):
+ def test_simple_merge(self):
+ """Test simple non-conflict merge."""
+ source = {"key1": "value1"}
+ candidate = {"key2": "value2"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual({"key1": "value1", "key2": "value2"}, result)
+
+ def test_nested_merge(self):
+ """Test nested merge."""
+ source = {"key1": {"key1.1": "value1.1"}}
+ candidate = {"key1": {"key1.2": "value1.2"}}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(
+ {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)
+
+ def test_merge_does_not_override(self):
+ """Test that candidate doesn't override source."""
+ source = {"key1": "value1", "key2": "value2"}
+ candidate = {"key1": "value2", "key2": "NEW VALUE"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_empty_candidate(self):
+ """Test empty candidate doesn't change source."""
+ source = {"key": "value"}
+ candidate = {}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_empty_source(self):
+ """Test empty source is replaced by candidate."""
+ source = {}
+ candidate = {"key": "value"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(candidate, result)
+
+ def test_non_dict_candidate(self):
+ """Test non-dict candidate is discarded."""
+ source = {"key": "value"}
+ candidate = "not a dict"
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_non_dict_source(self):
+ """Test non-dict source is not modified with a dict candidate."""
+ source = "not a dict"
+ candidate = {"key": "value"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_neither_dict(self):
+ """Test if neither candidate or source is dict source wins."""
+ source = "source"
+ candidate = "candidate"
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
diff --git a/tests/unittests/test_runs/test_merge_run.py b/tests/unittests/test_runs/test_merge_run.py
new file mode 100644
index 00000000..36de97ae
--- /dev/null
+++ b/tests/unittests/test_runs/test_merge_run.py
@@ -0,0 +1,52 @@
+import os
+
+from tests.unittests import helpers
+
+from cloudinit.settings import (PER_INSTANCE)
+from cloudinit import stages
+from cloudinit import util
+
+
+class TestMergeRun(helpers.FilesystemMockingTestCase):
+ def _patchIn(self, root):
+ self.restore()
+ self.patchOS(root)
+ self.patchUtils(root)
+
+ def test_none_ds(self):
+ new_root = self.makeDir()
+ self.replicateTestRoot('simple_ubuntu', new_root)
+ cfg = {
+ 'datasource_list': ['None'],
+ 'cloud_init_modules': ['write-files'],
+ }
+ ud = self.readResource('user_data.1.txt')
+ cloud_cfg = util.yaml_dumps(cfg)
+ util.ensure_dir(os.path.join(new_root, 'etc', 'cloud'))
+ util.write_file(os.path.join(new_root, 'etc',
+ 'cloud', 'cloud.cfg'), cloud_cfg)
+ self._patchIn(new_root)
+
+ # Now start verifying whats created
+ initer = stages.Init()
+ initer.read_cfg()
+ initer.initialize()
+ initer.fetch()
+ initer.datasource.userdata_raw = ud
+ iid = initer.instancify()
+ initer.update()
+ initer.cloudify().run('consume_userdata',
+ initer.consume_userdata,
+ args=[PER_INSTANCE],
+ freq=PER_INSTANCE)
+ mirrors = initer.distro.get_option('package_mirrors')
+ self.assertEquals(1, len(mirrors))
+ mirror = mirrors[0]
+ self.assertEquals(mirror['arches'], ['i386', 'amd64', 'blah'])
+ mods = stages.Modules(initer)
+ (which_ran, failures) = mods.run_section('cloud_init_modules')
+ self.assertTrue(len(failures) == 0)
+ self.assertTrue(os.path.exists('/etc/blah.ini'))
+ self.assertIn('write-files', which_ran)
+ contents = util.load_file('/etc/blah.ini')
+ self.assertEquals(contents, 'blah')
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 96962b91..02611581 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -28,65 +28,6 @@ class FakeSelinux(object):
self.restored.append(path)
-class TestMergeDict(MockerTestCase):
- def test_simple_merge(self):
- """Test simple non-conflict merge."""
- source = {"key1": "value1"}
- candidate = {"key2": "value2"}
- result = util.mergedict(source, candidate)
- self.assertEqual({"key1": "value1", "key2": "value2"}, result)
-
- def test_nested_merge(self):
- """Test nested merge."""
- source = {"key1": {"key1.1": "value1.1"}}
- candidate = {"key1": {"key1.2": "value1.2"}}
- result = util.mergedict(source, candidate)
- self.assertEqual(
- {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)
-
- def test_merge_does_not_override(self):
- """Test that candidate doesn't override source."""
- source = {"key1": "value1", "key2": "value2"}
- candidate = {"key1": "value2", "key2": "NEW VALUE"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_candidate(self):
- """Test empty candidate doesn't change source."""
- source = {"key": "value"}
- candidate = {}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_source(self):
- """Test empty source is replaced by candidate."""
- source = {}
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(candidate, result)
-
- def test_non_dict_candidate(self):
- """Test non-dict candidate is discarded."""
- source = {"key": "value"}
- candidate = "not a dict"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_non_dict_source(self):
- """Test non-dict source is not modified with a dict candidate."""
- source = "not a dict"
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_neither_dict(self):
- """Test if neither candidate or source is dict source wins."""
- source = "source"
- candidate = "candidate"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
-
class TestGetCfgOptionListOrStr(TestCase):
def test_not_found_no_default(self):
"""None is returned if key is not found and no default given."""