summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/__init__.py0
-rw-r--r--tests/configs/sample1.yaml1
-rw-r--r--tests/data/roots/simple_ubuntu/etc/networks/interfaces3
-rw-r--r--tests/data/user_data.1.txt15
-rw-r--r--tests/unittests/__init__.py0
-rw-r--r--tests/unittests/helpers.py149
-rw-r--r--tests/unittests/test_builtin_handlers.py9
-rw-r--r--tests/unittests/test_datasource/__init__.py0
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py128
-rw-r--r--tests/unittests/test_datasource/test_maas.py8
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py157
-rw-r--r--tests/unittests/test_distros/__init__.py0
-rw-r--r--tests/unittests/test_distros/test_generic.py85
-rw-r--r--tests/unittests/test_distros/test_hostname.py38
-rw-r--r--tests/unittests/test_distros/test_hosts.py41
-rw-r--r--tests/unittests/test_distros/test_netconfig.py7
-rw-r--r--tests/unittests/test_distros/test_resolv.py61
-rw-r--r--tests/unittests/test_distros/test_sysconfig.py82
-rw-r--r--tests/unittests/test_distros/test_user_data_normalize.py62
-rw-r--r--tests/unittests/test_filters/__init__.py0
-rw-r--r--tests/unittests/test_filters/test_launch_index.py4
-rw-r--r--tests/unittests/test_handler/__init__.py0
-rw-r--r--tests/unittests/test_handler/test_handler_ca_certs.py68
-rw-r--r--tests/unittests/test_handler/test_handler_power_state.py88
-rw-r--r--tests/unittests/test_handler/test_handler_set_hostname.py58
-rw-r--r--tests/unittests/test_handler/test_handler_yum_add_repo.py68
-rw-r--r--tests/unittests/test_merging.py62
-rw-r--r--tests/unittests/test_runs/__init__.py0
-rw-r--r--tests/unittests/test_runs/test_merge_run.py52
-rw-r--r--tests/unittests/test_runs/test_simple_run.py80
-rw-r--r--tests/unittests/test_util.py59
31 files changed, 1231 insertions, 154 deletions
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/__init__.py
diff --git a/tests/configs/sample1.yaml b/tests/configs/sample1.yaml
index 24e874ee..6231f293 100644
--- a/tests/configs/sample1.yaml
+++ b/tests/configs/sample1.yaml
@@ -50,4 +50,3 @@ runcmd:
byobu_by_default: user
-output: {all: '| tee -a /var/log/cloud-init-output.log'}
diff --git a/tests/data/roots/simple_ubuntu/etc/networks/interfaces b/tests/data/roots/simple_ubuntu/etc/networks/interfaces
new file mode 100644
index 00000000..77efa67d
--- /dev/null
+++ b/tests/data/roots/simple_ubuntu/etc/networks/interfaces
@@ -0,0 +1,3 @@
+auto lo
+iface lo inet loopback
+
diff --git a/tests/data/user_data.1.txt b/tests/data/user_data.1.txt
new file mode 100644
index 00000000..4c4543de
--- /dev/null
+++ b/tests/data/user_data.1.txt
@@ -0,0 +1,15 @@
+#cloud-config
+write_files:
+- content: blah
+ path: /etc/blah.ini
+ permissions: 493
+
+system_info:
+ package_mirrors:
+ - arches: [i386, amd64, blah]
+ failsafe:
+ primary: http://my.archive.mydomain.com/ubuntu
+ security: http://my.security.mydomain.com/ubuntu
+ search:
+ primary: []
+ security: []
diff --git a/tests/unittests/__init__.py b/tests/unittests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/__init__.py
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index d0f09e70..91a50e18 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -1,8 +1,77 @@
import os
+import sys
+import unittest
+from contextlib import contextmanager
+
+from mocker import Mocker
from mocker import MockerTestCase
from cloudinit import helpers as ch
+from cloudinit import util
+
+import shutil
+
+# Handle how 2.6 doesn't have the assertIn or assertNotIn
+_PY_VER = sys.version_info
+_PY_MAJOR, _PY_MINOR = _PY_VER[0:2]
+if (_PY_MAJOR, _PY_MINOR) <= (2, 6):
+ # For now add these on, taken from python 2.7 + slightly adjusted
+ class TestCase(unittest.TestCase):
+ def assertIn(self, member, container, msg=None):
+ if member not in container:
+ standardMsg = '%r not found in %r' % (member, container)
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertNotIn(self, member, container, msg=None):
+ if member in container:
+ standardMsg = '%r unexpectedly found in %r'
+ standardMsg = standardMsg % (member, container)
+ self.fail(self._formatMessage(msg, standardMsg))
+
+else:
+ class TestCase(unittest.TestCase):
+ pass
+
+
+@contextmanager
+def mocker(verify_calls=True):
+ m = Mocker()
+ try:
+ yield m
+ finally:
+ m.restore()
+ if verify_calls:
+ m.verify()
+
+
+# Makes the old path start
+# with new base instead of whatever
+# it previously had
+def rebase_path(old_path, new_base):
+ if old_path.startswith(new_base):
+ # Already handled...
+ return old_path
+ # Retarget the base of that path
+ # to the new base instead of the
+ # old one...
+ path = os.path.join(new_base, old_path.lstrip("/"))
+ path = os.path.abspath(path)
+ return path
+
+
+# Can work on anything that takes a path as arguments
+def retarget_many_wrapper(new_base, am, old_func):
+ def wrapper(*args, **kwds):
+ n_args = list(args)
+ nam = am
+ if am == -1:
+ nam = len(n_args)
+ for i in range(0, nam):
+ path = args[i]
+ n_args[i] = rebase_path(path, new_base)
+ return old_func(*n_args, **kwds)
+ return wrapper
class ResourceUsingTestCase(MockerTestCase):
@@ -40,3 +109,83 @@ class ResourceUsingTestCase(MockerTestCase):
'templates_dir': self.resourceLocation(),
})
return cp
+
+
+class FilesystemMockingTestCase(ResourceUsingTestCase):
+ def __init__(self, methodName="runTest"):
+ ResourceUsingTestCase.__init__(self, methodName)
+ self.patched_funcs = []
+
+ def replicateTestRoot(self, example_root, target_root):
+ real_root = self.resourceLocation()
+ real_root = os.path.join(real_root, 'roots', example_root)
+ for (dir_path, _dirnames, filenames) in os.walk(real_root):
+ real_path = dir_path
+ make_path = rebase_path(real_path[len(real_root):], target_root)
+ util.ensure_dir(make_path)
+ for f in filenames:
+ real_path = util.abs_join(real_path, f)
+ make_path = util.abs_join(make_path, f)
+ shutil.copy(real_path, make_path)
+
+ def tearDown(self):
+ self.restore()
+ ResourceUsingTestCase.tearDown(self)
+
+ def restore(self):
+ for (mod, f, func) in self.patched_funcs:
+ setattr(mod, f, func)
+ self.patched_funcs = []
+
+ def patchUtils(self, new_root):
+ patch_funcs = {
+ util: [('write_file', 1),
+ ('append_file', 1),
+ ('load_file', 1),
+ ('ensure_dir', 1),
+ ('chmod', 1),
+ ('delete_dir_contents', 1),
+ ('del_file', 1),
+ ('sym_link', -1)],
+ }
+ for (mod, funcs) in patch_funcs.items():
+ for (f, am) in funcs:
+ func = getattr(mod, f)
+ trap_func = retarget_many_wrapper(new_root, am, func)
+ setattr(mod, f, trap_func)
+ self.patched_funcs.append((mod, f, func))
+
+ # Handle subprocess calls
+ func = getattr(util, 'subp')
+
+ def nsubp(*_args, **_kwargs):
+ return ('', '')
+
+ setattr(util, 'subp', nsubp)
+ self.patched_funcs.append((util, 'subp', func))
+
+ def null_func(*_args, **_kwargs):
+ return None
+
+ for f in ['chownbyid', 'chownbyname']:
+ func = getattr(util, f)
+ setattr(util, f, null_func)
+ self.patched_funcs.append((util, f, func))
+
+ def patchOS(self, new_root):
+ patch_funcs = {
+ os.path: ['isfile', 'exists', 'islink', 'isdir'],
+ }
+ for (mod, funcs) in patch_funcs.items():
+ for f in funcs:
+ func = getattr(mod, f)
+ trap_func = retarget_many_wrapper(new_root, 1, func)
+ setattr(mod, f, trap_func)
+ self.patched_funcs.append((mod, f, func))
+
+def populate_dir(path, files):
+ os.makedirs(path)
+ for (name, content) in files.iteritems():
+ with open(os.path.join(path, name), "w") as fp:
+ fp.write(content)
+ fp.close()
diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py
index ebc0bd51..5f41cb3d 100644
--- a/tests/unittests/test_builtin_handlers.py
+++ b/tests/unittests/test_builtin_handlers.py
@@ -6,6 +6,7 @@ from mocker import MockerTestCase
from cloudinit import handlers
from cloudinit import helpers
+from cloudinit import util
from cloudinit.handlers import upstart_job
@@ -34,6 +35,7 @@ class TestBuiltins(MockerTestCase):
self.assertEquals(0, len(os.listdir(up_root)))
def test_upstart_frequency_single(self):
+ # files should be written out when frequency is ! per-instance
c_root = self.makeDir()
up_root = self.makeDir()
paths = helpers.Paths({
@@ -41,9 +43,12 @@ class TestBuiltins(MockerTestCase):
'upstart_dir': up_root,
})
freq = PER_INSTANCE
+
+ mock_subp = self.mocker.replace(util.subp, passthrough=False)
+ mock_subp(["initctl", "reload-configuration"], capture=False)
+ self.mocker.replay()
+
h = upstart_job.UpstartJobPartHandler(paths)
- # No files should be written out when
- # the frequency is ! per-instance
h.handle_part('', handlers.CONTENT_START,
None, None, None)
h.handle_part('blah', 'text/upstart-job',
diff --git a/tests/unittests/test_datasource/__init__.py b/tests/unittests/test_datasource/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/test_datasource/__init__.py
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 4fa13db8..930086db 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -6,12 +6,12 @@ import os.path
import mocker
from mocker import MockerTestCase
-from cloudinit.sources import DataSourceConfigDrive as ds
+from cloudinit import helpers
from cloudinit import settings
+from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit import util
-from cloudinit import helpers
-
+from tests.unittests import helpers as unit_helpers
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
EC2_META = {
@@ -90,23 +90,22 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- my_mock = mocker.Mocker()
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
- provided_name = dev_name[len('/dev/'):]
- provided_name = "s" + provided_name[1:]
- find_mock(mocker.ARGS)
- my_mock.result([provided_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
- device = cfg_ds.device_name_to_device(name)
- my_mock.restore()
- self.assertEquals(dev_name, device)
+ with unit_helpers.mocker() as my_mock:
+ find_mock = my_mock.replace(util.find_devs_with,
+ spec=False, passthrough=False)
+ provided_name = dev_name[len('/dev/'):]
+ provided_name = "s" + provided_name[1:]
+ find_mock(mocker.ARGS)
+ my_mock.result([provided_name])
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dev_os_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -123,19 +122,18 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- my_mock = mocker.Mocker()
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
- find_mock(mocker.ARGS)
- my_mock.result([dev_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
- device = cfg_ds.device_name_to_device(name)
- my_mock.restore()
- self.assertEquals(dev_name, device)
+ with unit_helpers.mocker() as my_mock:
+ find_mock = my_mock.replace(util.find_devs_with,
+ spec=False, passthrough=False)
+ find_mock(mocker.ARGS)
+ my_mock.result([dev_name])
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dev_ec2_remap(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -157,17 +155,16 @@ class TestConfigDriveDataSource(MockerTestCase):
'root2k': None,
}
for name, dev_name in name_tests.items():
- my_mock = mocker.Mocker()
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
- device = cfg_ds.device_name_to_device(name)
- self.assertEquals(dev_name, device)
- my_mock.restore()
+ with unit_helpers.mocker(verify_calls=False) as my_mock:
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dev_ec2_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -260,19 +257,25 @@ class TestConfigDriveDataSource(MockerTestCase):
ds.read_config_drive_dir, my_d)
def test_find_candidates(self):
- devs_with_answers = {
- "TYPE=vfat": [],
- "TYPE=iso9660": ["/dev/vdb"],
- "LABEL=config-2": ["/dev/vdb"],
- }
+ devs_with_answers = {}
def my_devs_with(criteria):
return devs_with_answers[criteria]
+ def my_is_partition(dev):
+ return dev[-1] in "0123456789" and not dev.startswith("sr")
+
try:
orig_find_devs_with = util.find_devs_with
util.find_devs_with = my_devs_with
+ orig_is_partition = util.is_partition
+ util.is_partition = my_is_partition
+
+ devs_with_answers = {"TYPE=vfat": [],
+ "TYPE=iso9660": ["/dev/vdb"],
+ "LABEL=config-2": ["/dev/vdb"],
+ }
self.assertEqual(["/dev/vdb"], ds.find_candidate_devs())
# add a vfat item
@@ -288,6 +291,33 @@ class TestConfigDriveDataSource(MockerTestCase):
finally:
util.find_devs_with = orig_find_devs_with
+ util.is_partition = orig_is_partition
+
+ def test_pubkeys_v2(self):
+ """Verify that public-keys work in config-drive-v2."""
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ myds = cfg_ds_from_dir(self.tmp)
+ self.assertEqual(myds.get_public_ssh_keys(),
+ [OSTACK_META['public_keys']['mykey']])
+
+
+def cfg_ds_from_dir(seed_d):
+ found = ds.read_config_drive_dir(seed_d)
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None,
+ helpers.Paths({}))
+ populate_ds_from_read_config(cfg_ds, seed_d, found)
+ return cfg_ds
+
+
+def populate_ds_from_read_config(cfg_ds, source, results):
+ """Patch the DataSourceConfigDrive from the results of
+ read_config_drive_dir hopefully in line with what it would have
+ if cfg_ds.get_data had been successfully called"""
+ cfg_ds.source = source
+ cfg_ds.metadata = results.get('metadata')
+ cfg_ds.ec2_metadata = results.get('ec2-metadata')
+ cfg_ds.userdata_raw = results.get('userdata')
+ cfg_ds.version = results.get('cfgdrive_ver')
def populate_dir(seed_dir, files):
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index 85e6add0..b56fea82 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -3,6 +3,7 @@ import os
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
+from tests.unittests.helpers import populate_dir
from mocker import MockerTestCase
@@ -137,11 +138,4 @@ class TestMAASDataSource(MockerTestCase):
pass
-def populate_dir(seed_dir, files):
- os.mkdir(seed_dir)
- for (name, content) in files.iteritems():
- with open(os.path.join(seed_dir, name), "w") as fp:
- fp.write(content)
- fp.close()
-
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
new file mode 100644
index 00000000..28e0a472
--- /dev/null
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -0,0 +1,157 @@
+from cloudinit import helpers
+from tests.unittests.helpers import populate_dir
+from cloudinit.sources import DataSourceNoCloud
+from cloudinit import util
+
+from mocker import MockerTestCase
+import os
+import yaml
+
+
+class TestNoCloudDataSource(MockerTestCase):
+
+ def setUp(self):
+ self.tmp = self.makeDir()
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
+
+ self.cmdline = "root=TESTCMDLINE"
+
+ self.unapply = []
+ self.apply_patches([(util, 'get_cmdline', self._getcmdline)])
+ super(TestNoCloudDataSource, self).setUp()
+
+ def tearDown(self):
+ apply_patches([i for i in reversed(self.unapply)])
+ super(TestNoCloudDataSource, self).setUp()
+
+ def apply_patches(self, patches):
+ ret = apply_patches(patches)
+ self.unapply += ret
+
+ def _getcmdline(self):
+ return self.cmdline
+
+ def test_nocloud_seed_dir(self):
+ md = {'instance-id': 'IID', 'dsmode': 'local'}
+ ud = "USER_DATA_HERE"
+ populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
+ {'user-data': ud, 'meta-data': yaml.safe_dump(md)})
+
+ sys_cfg = {
+ 'datasource': {'NoCloud': {'fs_label': None}}
+ }
+
+ ds = DataSourceNoCloud.DataSourceNoCloud
+
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertEqual(dsrc.userdata_raw, ud)
+ self.assertEqual(dsrc.metadata, md)
+ self.assertTrue(ret)
+
+ def test_fs_label(self):
+ #find_devs_with should not be called ff fs_label is None
+ ds = DataSourceNoCloud.DataSourceNoCloud
+
+ class PsuedoException(Exception):
+ pass
+
+ def my_find_devs_with(*args, **kwargs):
+ _f = (args, kwargs)
+ raise PsuedoException
+
+ self.apply_patches([(util, 'find_devs_with', my_find_devs_with)])
+
+ # by default, NoCloud should search for filesystems by label
+ sys_cfg = {'datasource': {'NoCloud': {}}}
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ self.assertRaises(PsuedoException, dsrc.get_data)
+
+ # but disabling searching should just end up with None found
+ sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertFalse(ret)
+
+ def test_no_datasource_expected(self):
+ #no source should be found if no cmdline, config, and fs_label=None
+ sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
+
+ ds = DataSourceNoCloud.DataSourceNoCloud
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ self.assertFalse(dsrc.get_data())
+
+ def test_seed_in_config(self):
+ ds = DataSourceNoCloud.DataSourceNoCloud
+
+ data = {
+ 'fs_label': None,
+ 'meta-data': {'instance-id': 'IID'},
+ 'user-data': "USER_DATA_RAW",
+ }
+
+ sys_cfg = {'datasource': {'NoCloud': data}}
+ dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertEqual(dsrc.userdata_raw, "USER_DATA_RAW")
+ self.assertEqual(dsrc.metadata.get('instance-id'), 'IID')
+ self.assertTrue(ret)
+
+
+class TestParseCommandLineData(MockerTestCase):
+
+ def test_parse_cmdline_data_valid(self):
+ ds_id = "ds=nocloud"
+ pairs = (
+ ("root=/dev/sda1 %(ds_id)s", {}),
+ ("%(ds_id)s; root=/dev/foo", {}),
+ ("%(ds_id)s", {}),
+ ("%(ds_id)s;", {}),
+ ("%(ds_id)s;s=SEED", {'seedfrom': 'SEED'}),
+ ("%(ds_id)s;seedfrom=SEED;local-hostname=xhost",
+ {'seedfrom': 'SEED', 'local-hostname': 'xhost'}),
+ ("%(ds_id)s;h=xhost",
+ {'local-hostname': 'xhost'}),
+ ("%(ds_id)s;h=xhost;i=IID",
+ {'local-hostname': 'xhost', 'instance-id': 'IID'}),
+ )
+
+ for (fmt, expected) in pairs:
+ fill = {}
+ cmdline = fmt % {'ds_id': ds_id}
+ ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill,
+ cmdline=cmdline)
+ self.assertEqual(expected, fill)
+ self.assertTrue(ret)
+
+ def test_parse_cmdline_data_none(self):
+ ds_id = "ds=foo"
+ cmdlines = (
+ "root=/dev/sda1 ro",
+ "console=/dev/ttyS0 root=/dev/foo",
+ "",
+ "ds=foocloud",
+ "ds=foo-net",
+ "ds=nocloud;s=SEED",
+ )
+
+ for cmdline in cmdlines:
+ fill = {}
+ ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill,
+ cmdline=cmdline)
+ self.assertEqual(fill, {})
+ self.assertFalse(ret)
+
+
+def apply_patches(patches):
+ ret = []
+ for (ref, name, replace) in patches:
+ if replace is None:
+ continue
+ orig = getattr(ref, name)
+ setattr(ref, name, replace)
+ ret.append((ref, name, orig))
+ return ret
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_distros/__init__.py b/tests/unittests/test_distros/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/test_distros/__init__.py
diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py
index 2df4c2f0..7befb8c8 100644
--- a/tests/unittests/test_distros/test_generic.py
+++ b/tests/unittests/test_distros/test_generic.py
@@ -1,6 +1,9 @@
-from mocker import MockerTestCase
-
from cloudinit import distros
+from cloudinit import util
+
+from tests.unittests import helpers
+
+import os
unknown_arch_info = {
'arches': ['default'],
@@ -27,7 +30,7 @@ gpmi = distros._get_package_mirror_info # pylint: disable=W0212
gapmi = distros._get_arch_package_mirror_info # pylint: disable=W0212
-class TestGenericDistro(MockerTestCase):
+class TestGenericDistro(helpers.FilesystemMockingTestCase):
def return_first(self, mlist):
if not mlist:
@@ -52,6 +55,82 @@ class TestGenericDistro(MockerTestCase):
# Make a temp directoy for tests to use.
self.tmp = self.makeDir()
+ def _write_load_sudoers(self, _user, rules):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ os.makedirs(os.path.join(self.tmp, "etc"))
+ os.makedirs(os.path.join(self.tmp, "etc", 'sudoers.d'))
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ d.write_sudo_rules("harlowja", rules)
+ contents = util.load_file(d.ci_sudoers_fn)
+ self.restore()
+ return contents
+
+ def _count_in(self, lines_look_for, text_content):
+ found_amount = 0
+ for e in lines_look_for:
+ for line in text_content.splitlines():
+ line = line.strip()
+ if line == e:
+ found_amount += 1
+ return found_amount
+
+ def test_sudoers_ensure_rules(self):
+ rules = 'ALL=(ALL:ALL) ALL'
+ contents = self._write_load_sudoers('harlowja', rules)
+ expected = ['harlowja ALL=(ALL:ALL) ALL']
+ self.assertEquals(len(expected), self._count_in(expected, contents))
+ not_expected = [
+ 'harlowja A',
+ 'harlowja L',
+ 'harlowja L',
+ ]
+ self.assertEquals(0, self._count_in(not_expected, contents))
+
+ def test_sudoers_ensure_rules_list(self):
+ rules = [
+ 'ALL=(ALL:ALL) ALL',
+ 'B-ALL=(ALL:ALL) ALL',
+ 'C-ALL=(ALL:ALL) ALL',
+ ]
+ contents = self._write_load_sudoers('harlowja', rules)
+ expected = [
+ 'harlowja ALL=(ALL:ALL) ALL',
+ 'harlowja B-ALL=(ALL:ALL) ALL',
+ 'harlowja C-ALL=(ALL:ALL) ALL',
+ ]
+ self.assertEquals(len(expected), self._count_in(expected, contents))
+ not_expected = [
+ 'harlowja A',
+ 'harlowja L',
+ 'harlowja L',
+ ]
+ self.assertEquals(0, self._count_in(not_expected, contents))
+
+ def test_sudoers_ensure_new(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+
+ def test_sudoers_ensure_append(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ util.write_file("/etc/sudoers", "josh, josh\n")
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+ self.assertIn("josh", contents)
+ self.assertEquals(2, contents.count("josh"))
+
def test_arch_package_mirror_info_unknown(self):
"""for an unknown arch, we should get back that with arch 'default'."""
arch_mirrors = gapmi(package_mirrors, arch="unknown")
diff --git a/tests/unittests/test_distros/test_hostname.py b/tests/unittests/test_distros/test_hostname.py
new file mode 100644
index 00000000..8e644f4d
--- /dev/null
+++ b/tests/unittests/test_distros/test_hostname.py
@@ -0,0 +1,38 @@
+from mocker import MockerTestCase
+
+from cloudinit.distros.parsers import hostname
+
+
+BASE_HOSTNAME = '''
+# My super-duper-hostname
+
+blahblah
+
+'''
+BASE_HOSTNAME = BASE_HOSTNAME.strip()
+
+
+class TestHostnameHelper(MockerTestCase):
+ def test_parse_same(self):
+ hn = hostname.HostnameConf(BASE_HOSTNAME)
+ self.assertEquals(str(hn).strip(), BASE_HOSTNAME)
+ self.assertEquals(hn.hostname, 'blahblah')
+
+ def test_no_adjust_hostname(self):
+ hn = hostname.HostnameConf(BASE_HOSTNAME)
+ prev_name = hn.hostname
+ hn.set_hostname("")
+ self.assertEquals(hn.hostname, prev_name)
+
+ def test_adjust_hostname(self):
+ hn = hostname.HostnameConf(BASE_HOSTNAME)
+ prev_name = hn.hostname
+ self.assertEquals(prev_name, 'blahblah')
+ hn.set_hostname("bbbbd")
+ self.assertEquals(hn.hostname, 'bbbbd')
+ expected_out = '''
+# My super-duper-hostname
+
+bbbbd
+'''
+ self.assertEquals(str(hn).strip(), expected_out.strip())
diff --git a/tests/unittests/test_distros/test_hosts.py b/tests/unittests/test_distros/test_hosts.py
new file mode 100644
index 00000000..687a0dab
--- /dev/null
+++ b/tests/unittests/test_distros/test_hosts.py
@@ -0,0 +1,41 @@
+from mocker import MockerTestCase
+
+from cloudinit.distros.parsers import hosts
+
+
+BASE_ETC = '''
+# Example
+127.0.0.1 localhost
+192.168.1.10 foo.mydomain.org foo
+192.168.1.10 bar.mydomain.org bar
+146.82.138.7 master.debian.org master
+209.237.226.90 www.opensource.org
+'''
+BASE_ETC = BASE_ETC.strip()
+
+
+class TestHostsHelper(MockerTestCase):
+ def test_parse(self):
+ eh = hosts.HostsConf(BASE_ETC)
+ self.assertEquals(eh.get_entry('127.0.0.1'), [['localhost']])
+ self.assertEquals(eh.get_entry('192.168.1.10'),
+ [['foo.mydomain.org', 'foo'],
+ ['bar.mydomain.org', 'bar']])
+ eh = str(eh)
+ self.assertTrue(eh.startswith('# Example'))
+
+ def test_add(self):
+ eh = hosts.HostsConf(BASE_ETC)
+ eh.add_entry('127.0.0.0', 'blah')
+ self.assertEquals(eh.get_entry('127.0.0.0'), [['blah']])
+ eh.add_entry('127.0.0.3', 'blah', 'blah2', 'blah3')
+ self.assertEquals(eh.get_entry('127.0.0.3'),
+ [['blah', 'blah2', 'blah3']])
+
+ def test_del(self):
+ eh = hosts.HostsConf(BASE_ETC)
+ eh.add_entry('127.0.0.0', 'blah')
+ self.assertEquals(eh.get_entry('127.0.0.0'), [['blah']])
+
+ eh.del_entries('127.0.0.0')
+ self.assertEquals(eh.get_entry('127.0.0.0'), [])
diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py
index 55765f0c..9763b14b 100644
--- a/tests/unittests/test_distros/test_netconfig.py
+++ b/tests/unittests/test_distros/test_netconfig.py
@@ -9,6 +9,8 @@ from cloudinit import helpers
from cloudinit import settings
from cloudinit import util
+from cloudinit.distros.parsers.sys_conf import SysConf
+
from StringIO import StringIO
@@ -83,9 +85,8 @@ class TestNetCfgDistro(MockerTestCase):
self.assertEquals(write_buf.mode, 0644)
def assertCfgEquals(self, blob1, blob2):
- cfg_tester = distros.rhel.QuotingConfigObj
- b1 = dict(cfg_tester(blob1.strip().splitlines()))
- b2 = dict(cfg_tester(blob2.strip().splitlines()))
+ b1 = dict(SysConf(blob1.strip().splitlines()))
+ b2 = dict(SysConf(blob2.strip().splitlines()))
self.assertEquals(b1, b2)
for (k, v) in b1.items():
self.assertIn(k, b2)
diff --git a/tests/unittests/test_distros/test_resolv.py b/tests/unittests/test_distros/test_resolv.py
new file mode 100644
index 00000000..6b6ff6aa
--- /dev/null
+++ b/tests/unittests/test_distros/test_resolv.py
@@ -0,0 +1,61 @@
+from mocker import MockerTestCase
+
+from cloudinit.distros.parsers import resolv_conf
+
+import re
+
+
+BASE_RESOLVE = '''
+; generated by /sbin/dhclient-script
+search blah.yahoo.com yahoo.com
+nameserver 10.15.44.14
+nameserver 10.15.30.92
+'''
+BASE_RESOLVE = BASE_RESOLVE.strip()
+
+
+class TestResolvHelper(MockerTestCase):
+ def test_parse_same(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ rp_r = str(rp).strip()
+ self.assertEquals(BASE_RESOLVE, rp_r)
+
+ def test_local_domain(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ self.assertEquals(None, rp.local_domain)
+
+ rp.local_domain = "bob"
+ self.assertEquals('bob', rp.local_domain)
+ self.assertIn('domain bob', str(rp))
+
+ def test_nameservers(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ self.assertIn('10.15.44.14', rp.nameservers)
+ self.assertIn('10.15.30.92', rp.nameservers)
+ rp.add_nameserver('10.2')
+ self.assertIn('10.2', rp.nameservers)
+ self.assertIn('nameserver 10.2', str(rp))
+ self.assertNotIn('10.3', rp.nameservers)
+ self.assertEquals(len(rp.nameservers), 3)
+ rp.add_nameserver('10.2')
+ self.assertRaises(ValueError, rp.add_nameserver, '10.3')
+ self.assertNotIn('10.3', rp.nameservers)
+
+ def test_search_domains(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ self.assertIn('yahoo.com', rp.search_domains)
+ self.assertIn('blah.yahoo.com', rp.search_domains)
+ rp.add_search_domain('bbb.y.com')
+ self.assertIn('bbb.y.com', rp.search_domains)
+ self.assertTrue(re.search(r'search(.*)bbb.y.com(.*)', str(rp)))
+ self.assertIn('bbb.y.com', rp.search_domains)
+ rp.add_search_domain('bbb.y.com')
+ self.assertEquals(len(rp.search_domains), 3)
+ rp.add_search_domain('bbb2.y.com')
+ self.assertEquals(len(rp.search_domains), 4)
+ rp.add_search_domain('bbb3.y.com')
+ self.assertEquals(len(rp.search_domains), 5)
+ rp.add_search_domain('bbb4.y.com')
+ self.assertEquals(len(rp.search_domains), 6)
+ self.assertRaises(ValueError, rp.add_search_domain, 'bbb5.y.com')
+ self.assertEquals(len(rp.search_domains), 6)
diff --git a/tests/unittests/test_distros/test_sysconfig.py b/tests/unittests/test_distros/test_sysconfig.py
new file mode 100644
index 00000000..0c651407
--- /dev/null
+++ b/tests/unittests/test_distros/test_sysconfig.py
@@ -0,0 +1,82 @@
+from mocker import MockerTestCase
+
+import re
+
+from cloudinit.distros.parsers.sys_conf import SysConf
+
+
+# Lots of good examples @
+# http://content.hccfl.edu/pollock/AUnix1/SysconfigFilesDesc.txt
+
+class TestSysConfHelper(MockerTestCase):
+ # This function was added in 2.7, make it work for 2.6
+ def assertRegMatches(self, text, regexp):
+ regexp = re.compile(regexp)
+ self.assertTrue(regexp.search(text),
+ msg="%s must match %s!" % (text, regexp.pattern))
+
+ def test_parse_no_change(self):
+ contents = '''# A comment
+USESMBAUTH=no
+KEYTABLE=/usr/lib/kbd/keytables/us.map
+SHORTDATE=$(date +%y:%m:%d:%H:%M)
+HOSTNAME=blahblah
+NETMASK0=255.255.255.0
+# Inline comment
+LIST=$LOGROOT/incremental-list
+IPV6TO4_ROUTING='eth0-:0004::1/64 eth1-:0005::1/64'
+ETHTOOL_OPTS="-K ${DEVICE} tso on; -G ${DEVICE} rx 256 tx 256"
+USEMD5=no'''
+ conf = SysConf(contents.splitlines())
+ self.assertEquals(conf['HOSTNAME'], 'blahblah')
+ self.assertEquals(conf['SHORTDATE'], '$(date +%y:%m:%d:%H:%M)')
+ # Should be unquoted
+ self.assertEquals(conf['ETHTOOL_OPTS'], ('-K ${DEVICE} tso on; '
+ '-G ${DEVICE} rx 256 tx 256'))
+ self.assertEquals(contents, str(conf))
+
+ def test_parse_shell_vars(self):
+ contents = 'USESMBAUTH=$XYZ'
+ conf = SysConf(contents.splitlines())
+ self.assertEquals(contents, str(conf))
+ conf = SysConf('')
+ conf['B'] = '${ZZ}d apples'
+ # Should be quoted
+ self.assertEquals('B="${ZZ}d apples"', str(conf))
+ conf = SysConf('')
+ conf['B'] = '$? d apples'
+ self.assertEquals('B="$? d apples"', str(conf))
+ contents = 'IPMI_WATCHDOG_OPTIONS="timeout=60"'
+ conf = SysConf(contents.splitlines())
+ self.assertEquals('IPMI_WATCHDOG_OPTIONS=timeout=60', str(conf))
+
+ def test_parse_adjust(self):
+ contents = 'IPV6TO4_ROUTING="eth0-:0004::1/64 eth1-:0005::1/64"'
+ conf = SysConf(contents.splitlines())
+ # Should be unquoted
+ self.assertEquals('eth0-:0004::1/64 eth1-:0005::1/64',
+ conf['IPV6TO4_ROUTING'])
+ conf['IPV6TO4_ROUTING'] = "blah \tblah"
+ contents2 = str(conf).strip()
+ # Should be requoted due to whitespace
+ self.assertRegMatches(contents2,
+ r'IPV6TO4_ROUTING=[\']blah\s+blah[\']')
+
+ def test_parse_no_adjust_shell(self):
+ conf = SysConf(''.splitlines())
+ conf['B'] = ' $(time)'
+ contents = str(conf)
+ self.assertEquals('B= $(time)', contents)
+
+ def test_parse_empty(self):
+ contents = ''
+ conf = SysConf(contents.splitlines())
+ self.assertEquals('', str(conf).strip())
+
+ def test_parse_add_new(self):
+ contents = 'BLAH=b'
+ conf = SysConf(contents.splitlines())
+ conf['Z'] = 'd'
+ contents = str(conf)
+ self.assertIn("Z=d", contents)
+ self.assertIn("BLAH=b", contents)
diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py
index 9d6fb996..50398c74 100644
--- a/tests/unittests/test_distros/test_user_data_normalize.py
+++ b/tests/unittests/test_distros/test_user_data_normalize.py
@@ -4,24 +4,54 @@ from cloudinit import distros
from cloudinit import helpers
from cloudinit import settings
+bcfg = {
+ 'name': 'bob',
+ 'plain_text_passwd': 'ubuntu',
+ 'home': "/home/ubuntu",
+ 'shell': "/bin/bash",
+ 'lock_passwd': True,
+ 'gecos': "Ubuntu",
+ 'groups': ["foo"]
+}
+
class TestUGNormalize(MockerTestCase):
- def _make_distro(self, dtype, def_user=None, def_groups=None):
+ def _make_distro(self, dtype, def_user=None):
cfg = dict(settings.CFG_BUILTIN)
cfg['system_info']['distro'] = dtype
paths = helpers.Paths(cfg['system_info']['paths'])
distro_cls = distros.fetch(dtype)
- distro = distro_cls(dtype, cfg['system_info'], paths)
if def_user:
- distro.default_user = def_user
- if def_groups:
- distro.default_user_groups = def_groups
+ cfg['system_info']['default_user'] = def_user.copy()
+ distro = distro_cls(dtype, cfg['system_info'], paths)
return distro
def _norm(self, cfg, distro):
return distros.normalize_users_groups(cfg, distro)
+ def test_group_dict(self):
+ distro = self._make_distro('ubuntu')
+ g = {'groups': [
+ {
+ 'ubuntu': ['foo', 'bar'],
+ 'bob': 'users',
+ },
+ 'cloud-users',
+ {
+ 'bob': 'users2',
+ },
+ ]
+ }
+ (_users, groups) = self._norm(g, distro)
+ self.assertIn('ubuntu', groups)
+ ub_members = groups['ubuntu']
+ self.assertEquals(sorted(['foo', 'bar']), sorted(ub_members))
+ self.assertIn('bob', groups)
+ b_members = groups['bob']
+ self.assertEquals(sorted(['users', 'users2']),
+ sorted(b_members))
+
def test_basic_groups(self):
distro = self._make_distro('ubuntu')
ug_cfg = {
@@ -71,7 +101,7 @@ class TestUGNormalize(MockerTestCase):
self.assertEquals({}, users)
def test_users_simple_dict(self):
- distro = self._make_distro('ubuntu', 'bob')
+ distro = self._make_distro('ubuntu', bcfg)
ug_cfg = {
'users': {
'default': True,
@@ -95,7 +125,7 @@ class TestUGNormalize(MockerTestCase):
self.assertIn('bob', users)
def test_users_simple_dict_no(self):
- distro = self._make_distro('ubuntu', 'bob')
+ distro = self._make_distro('ubuntu', bcfg)
ug_cfg = {
'users': {
'default': False,
@@ -137,32 +167,35 @@ class TestUGNormalize(MockerTestCase):
self.assertEquals({'default': False}, users['bob'])
def test_users_old_user(self):
- distro = self._make_distro('ubuntu', 'bob')
+ distro = self._make_distro('ubuntu', bcfg)
ug_cfg = {
'user': 'zetta',
'users': 'default'
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', users)
+ self.assertNotIn('bob', users) # Bob is not the default now, zetta is
self.assertIn('zetta', users)
+ self.assertTrue(users['zetta']['default'])
self.assertNotIn('default', users)
ug_cfg = {
'user': 'zetta',
'users': 'default, joe'
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertIn('bob', users)
+ self.assertNotIn('bob', users) # Bob is not the default now, zetta is
self.assertIn('joe', users)
self.assertIn('zetta', users)
+ self.assertTrue(users['zetta']['default'])
self.assertNotIn('default', users)
ug_cfg = {
'user': 'zetta',
'users': ['bob', 'joe']
}
(users, _groups) = self._norm(ug_cfg, distro)
- self.assertNotIn('bob', users)
+ self.assertIn('bob', users)
self.assertIn('joe', users)
self.assertIn('zetta', users)
+ self.assertTrue(users['zetta']['default'])
ug_cfg = {
'user': 'zetta',
'users': {
@@ -174,6 +207,7 @@ class TestUGNormalize(MockerTestCase):
self.assertIn('bob', users)
self.assertIn('joe', users)
self.assertIn('zetta', users)
+ self.assertTrue(users['zetta']['default'])
ug_cfg = {
'user': 'zetta',
}
@@ -185,7 +219,7 @@ class TestUGNormalize(MockerTestCase):
self.assertEquals({}, groups)
def test_users_dict_default_additional(self):
- distro = self._make_distro('ubuntu', 'bob')
+ distro = self._make_distro('ubuntu', bcfg)
ug_cfg = {
'users': [
{'name': 'default', 'blah': True}
@@ -201,7 +235,7 @@ class TestUGNormalize(MockerTestCase):
users['bob']['default'])
def test_users_dict_extract(self):
- distro = self._make_distro('ubuntu', 'bob')
+ distro = self._make_distro('ubuntu', bcfg)
ug_cfg = {
'users': [
'default',
@@ -228,7 +262,7 @@ class TestUGNormalize(MockerTestCase):
self.assertEquals(config, expected_config)
def test_users_dict_default(self):
- distro = self._make_distro('ubuntu', 'bob')
+ distro = self._make_distro('ubuntu', bcfg)
ug_cfg = {
'users': [
'default',
diff --git a/tests/unittests/test_filters/__init__.py b/tests/unittests/test_filters/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/test_filters/__init__.py
diff --git a/tests/unittests/test_filters/test_launch_index.py b/tests/unittests/test_filters/test_launch_index.py
index 7ca7cbb6..773bb312 100644
--- a/tests/unittests/test_filters/test_launch_index.py
+++ b/tests/unittests/test_filters/test_launch_index.py
@@ -1,6 +1,6 @@
import copy
-import helpers as th
+from tests.unittests import helpers
import itertools
@@ -18,7 +18,7 @@ def count_messages(root):
return am
-class TestLaunchFilter(th.ResourceUsingTestCase):
+class TestLaunchFilter(helpers.ResourceUsingTestCase):
def assertCounts(self, message, expected_counts):
orig_message = copy.deepcopy(message)
diff --git a/tests/unittests/test_handler/__init__.py b/tests/unittests/test_handler/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/test_handler/__init__.py
diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py
index d3df5c50..0558023a 100644
--- a/tests/unittests/test_handler/test_handler_ca_certs.py
+++ b/tests/unittests/test_handler/test_handler_ca_certs.py
@@ -77,7 +77,7 @@ class TestConfig(MockerTestCase):
"""Test that a single cert gets passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1"]}}
- self.mock_add(self.paths, ["CERT1"])
+ self.mock_add(["CERT1"])
self.mock_update()
self.mocker.replay()
@@ -87,7 +87,7 @@ class TestConfig(MockerTestCase):
"""Test that multiple certs get passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
- self.mock_add(self.paths, ["CERT1", "CERT2"])
+ self.mock_add(["CERT1", "CERT2"])
self.mock_update()
self.mocker.replay()
@@ -97,7 +97,7 @@ class TestConfig(MockerTestCase):
"""Test remove_defaults works as expected."""
config = {"ca-certs": {"remove-defaults": True}}
- self.mock_remove(self.paths)
+ self.mock_remove()
self.mock_update()
self.mocker.replay()
@@ -116,8 +116,8 @@ class TestConfig(MockerTestCase):
"""Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
- self.mock_remove(self.paths)
- self.mock_add(self.paths, ["CERT1"])
+ self.mock_remove()
+ self.mock_add(["CERT1"])
self.mock_update()
self.mocker.replay()
@@ -136,20 +136,52 @@ class TestAddCaCerts(MockerTestCase):
"""Test that no certificate are written if not provided."""
self.mocker.replace(util.write_file, passthrough=False)
self.mocker.replay()
- cc_ca_certs.add_ca_certs(self.paths, [])
+ cc_ca_certs.add_ca_certs([])
- def test_single_cert(self):
- """Test adding a single certificate to the trusted CAs."""
+ def test_single_cert_trailing_cr(self):
+ """Test adding a single certificate to the trusted CAs
+ when existing ca-certificates has trailing newline"""
cert = "CERT1\nLINE2\nLINE3"
+ ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n"
+ expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n"
+
mock_write = self.mocker.replace(util.write_file, passthrough=False)
+ mock_load = self.mocker.replace(util.load_file, passthrough=False)
+
mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
cert, mode=0644)
+
+ mock_load("/etc/ca-certificates.conf")
+ self.mocker.result(ca_certs_content)
+
+ mock_write("/etc/ca-certificates.conf", expected, omode="wb")
+ self.mocker.replay()
+
+ cc_ca_certs.add_ca_certs([cert])
+
+ def test_single_cert_no_trailing_cr(self):
+ """Test adding a single certificate to the trusted CAs
+ when existing ca-certificates has no trailing newline"""
+ cert = "CERT1\nLINE2\nLINE3"
+
+ ca_certs_content = "line1\nline2\nline3"
+
+ mock_write = self.mocker.replace(util.write_file, passthrough=False)
+ mock_load = self.mocker.replace(util.load_file, passthrough=False)
+
+ mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
+ cert, mode=0644)
+
+ mock_load("/etc/ca-certificates.conf")
+ self.mocker.result(ca_certs_content)
+
mock_write("/etc/ca-certificates.conf",
- "\ncloud-init-ca-certs.crt", omode="ab")
+ "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt"),
+ omode="wb")
self.mocker.replay()
- cc_ca_certs.add_ca_certs(self.paths, [cert])
+ cc_ca_certs.add_ca_certs([cert])
def test_multiple_certs(self):
"""Test adding multiple certificates to the trusted CAs."""
@@ -157,13 +189,21 @@ class TestAddCaCerts(MockerTestCase):
expected_cert_file = "\n".join(certs)
mock_write = self.mocker.replace(util.write_file, passthrough=False)
+ mock_load = self.mocker.replace(util.load_file, passthrough=False)
+
mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
expected_cert_file, mode=0644)
- mock_write("/etc/ca-certificates.conf",
- "\ncloud-init-ca-certs.crt", omode="ab")
+
+ ca_certs_content = "line1\nline2\nline3"
+ mock_load("/etc/ca-certificates.conf")
+ self.mocker.result(ca_certs_content)
+
+ out = "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt")
+ mock_write("/etc/ca-certificates.conf", out, omode="wb")
+
self.mocker.replay()
- cc_ca_certs.add_ca_certs(self.paths, certs)
+ cc_ca_certs.add_ca_certs(certs)
class TestUpdateCaCerts(MockerTestCase):
@@ -198,4 +238,4 @@ class TestRemoveDefaultCaCerts(MockerTestCase):
"ca-certificates ca-certificates/trust_new_crts select no")
self.mocker.replay()
- cc_ca_certs.remove_default_ca_certs(self.paths)
+ cc_ca_certs.remove_default_ca_certs()
diff --git a/tests/unittests/test_handler/test_handler_power_state.py b/tests/unittests/test_handler/test_handler_power_state.py
new file mode 100644
index 00000000..f6e37fa5
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_power_state.py
@@ -0,0 +1,88 @@
+from cloudinit.config import cc_power_state_change as psc
+
+from tests.unittests import helpers as t_help
+
+
+class TestLoadPowerState(t_help.TestCase):
+ def setUp(self):
+ super(self.__class__, self).setUp()
+
+ def test_no_config(self):
+ # completely empty config should mean do nothing
+ (cmd, _timeout) = psc.load_power_state({})
+ self.assertEqual(cmd, None)
+
+ def test_irrelevant_config(self):
+ # no power_state field in config should return None for cmd
+ (cmd, _timeout) = psc.load_power_state({'foo': 'bar'})
+ self.assertEqual(cmd, None)
+
+ def test_invalid_mode(self):
+ cfg = {'power_state': {'mode': 'gibberish'}}
+ self.assertRaises(TypeError, psc.load_power_state, cfg)
+
+ cfg = {'power_state': {'mode': ''}}
+ self.assertRaises(TypeError, psc.load_power_state, cfg)
+
+ def test_empty_mode(self):
+ cfg = {'power_state': {'message': 'goodbye'}}
+ self.assertRaises(TypeError, psc.load_power_state, cfg)
+
+ def test_valid_modes(self):
+ cfg = {'power_state': {}}
+ for mode in ('halt', 'poweroff', 'reboot'):
+ cfg['power_state']['mode'] = mode
+ check_lps_ret(psc.load_power_state(cfg), mode=mode)
+
+ def test_invalid_delay(self):
+ cfg = {'power_state': {'mode': 'poweroff', 'delay': 'goodbye'}}
+ self.assertRaises(TypeError, psc.load_power_state, cfg)
+
+ def test_valid_delay(self):
+ cfg = {'power_state': {'mode': 'poweroff', 'delay': ''}}
+ for delay in ("now", "+1", "+30"):
+ cfg['power_state']['delay'] = delay
+ check_lps_ret(psc.load_power_state(cfg))
+
+ def test_message_present(self):
+ cfg = {'power_state': {'mode': 'poweroff', 'message': 'GOODBYE'}}
+ ret = psc.load_power_state(cfg)
+ check_lps_ret(psc.load_power_state(cfg))
+ self.assertIn(cfg['power_state']['message'], ret[0])
+
+ def test_no_message(self):
+ # if message is not present, then no argument should be passed for it
+ cfg = {'power_state': {'mode': 'poweroff'}}
+ (cmd, _timeout) = psc.load_power_state(cfg)
+ self.assertNotIn("", cmd)
+ check_lps_ret(psc.load_power_state(cfg))
+ self.assertTrue(len(cmd) == 3)
+
+
+def check_lps_ret(psc_return, mode=None):
+ if len(psc_return) != 2:
+ raise TypeError("length returned = %d" % len(psc_return))
+
+ errs = []
+ cmd = psc_return[0]
+ timeout = psc_return[1]
+
+ if not 'shutdown' in psc_return[0][0]:
+ errs.append("string 'shutdown' not in cmd")
+
+ if mode is not None:
+ opt = {'halt': '-H', 'poweroff': '-P', 'reboot': '-r'}[mode]
+ if opt not in psc_return[0]:
+ errs.append("opt '%s' not in cmd: %s" % (opt, cmd))
+
+ if len(cmd) != 3 and len(cmd) != 4:
+ errs.append("Invalid command length: %s" % len(cmd))
+
+ try:
+ float(timeout)
+ except:
+ errs.append("timeout failed convert to float")
+
+ if len(errs):
+ lines = ["Errors in result: %s" % str(psc_return)] + errs
+ raise Exception('\n'.join(lines))
diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py
new file mode 100644
index 00000000..a1aba62f
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_set_hostname.py
@@ -0,0 +1,58 @@
+from cloudinit.config import cc_set_hostname
+
+from cloudinit import cloud
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import util
+
+from tests.unittests import helpers as t_help
+
+import logging
+
+from StringIO import StringIO
+
+from configobj import ConfigObj
+
+LOG = logging.getLogger(__name__)
+
+
+class TestHostname(t_help.FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestHostname, self).setUp()
+ self.tmp = self.makeDir(prefix="unittest_")
+
+ def _fetch_distro(self, kind):
+ cls = distros.fetch(kind)
+ paths = helpers.Paths({})
+ return cls(kind, {}, paths)
+
+ def test_write_hostname_rhel(self):
+ cfg = {
+ 'hostname': 'blah.blah.blah.yahoo.com',
+ }
+ distro = self._fetch_distro('rhel')
+ paths = helpers.Paths({})
+ ds = None
+ cc = cloud.Cloud(ds, paths, {}, distro, None)
+ self.patchUtils(self.tmp)
+ self.patchOS(self.tmp)
+ cc_set_hostname.handle('cc_set_hostname',
+ cfg, cc, LOG, [])
+ contents = util.load_file("/etc/sysconfig/network")
+ n_cfg = ConfigObj(StringIO(contents))
+ self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'},
+ dict(n_cfg))
+
+ def test_write_hostname_debian(self):
+ cfg = {
+ 'hostname': 'blah.blah.blah.yahoo.com',
+ }
+ distro = self._fetch_distro('debian')
+ paths = helpers.Paths({})
+ ds = None
+ cc = cloud.Cloud(ds, paths, {}, distro, None)
+ self.patchUtils(self.tmp)
+ cc_set_hostname.handle('cc_set_hostname',
+ cfg, cc, LOG, [])
+ contents = util.load_file("/etc/hostname")
+ self.assertEquals('blah', contents.strip())
diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py
new file mode 100644
index 00000000..8df592f9
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py
@@ -0,0 +1,68 @@
+from cloudinit import helpers
+from cloudinit import util
+
+from cloudinit.config import cc_yum_add_repo
+
+from tests.unittests import helpers
+
+import logging
+
+from StringIO import StringIO
+
+import configobj
+
+LOG = logging.getLogger(__name__)
+
+
+class TestConfig(helpers.FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestConfig, self).setUp()
+ self.tmp = self.makeDir(prefix="unittest_")
+
+ def test_bad_config(self):
+ cfg = {
+ 'yum_repos': {
+ 'epel-testing': {
+ 'name': 'Extra Packages for Enterprise Linux 5 - Testing',
+ # Missing this should cause the repo not to be written
+ #'baseurl': 'http://blah.org/pub/epel/testing/5/$basearch',
+ 'enabled': False,
+ 'gpgcheck': True,
+ 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL',
+ 'failovermethod': 'priority',
+ },
+ },
+ }
+ self.patchUtils(self.tmp)
+ cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, [])
+ self.assertRaises(IOError, util.load_file,
+ "/etc/yum.repos.d/epel_testing.repo")
+
+ def test_write_config(self):
+ cfg = {
+ 'yum_repos': {
+ 'epel-testing': {
+ 'name': 'Extra Packages for Enterprise Linux 5 - Testing',
+ 'baseurl': 'http://blah.org/pub/epel/testing/5/$basearch',
+ 'enabled': False,
+ 'gpgcheck': True,
+ 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL',
+ 'failovermethod': 'priority',
+ },
+ },
+ }
+ self.patchUtils(self.tmp)
+ cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, [])
+ contents = util.load_file("/etc/yum.repos.d/epel_testing.repo")
+ contents = configobj.ConfigObj(StringIO(contents))
+ expected = {
+ 'epel_testing': {
+ 'name': 'Extra Packages for Enterprise Linux 5 - Testing',
+ 'failovermethod': 'priority',
+ 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL',
+ 'enabled': '0',
+ 'baseurl': 'http://blah.org/pub/epel/testing/5/$basearch',
+ 'gpgcheck': '1',
+ }
+ }
+ self.assertEquals(expected, dict(contents))
diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py
new file mode 100644
index 00000000..0037b966
--- /dev/null
+++ b/tests/unittests/test_merging.py
@@ -0,0 +1,62 @@
+from mocker import MockerTestCase
+
+from cloudinit import util
+
+
+class TestMergeDict(MockerTestCase):
+ def test_simple_merge(self):
+ """Test simple non-conflict merge."""
+ source = {"key1": "value1"}
+ candidate = {"key2": "value2"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual({"key1": "value1", "key2": "value2"}, result)
+
+ def test_nested_merge(self):
+ """Test nested merge."""
+ source = {"key1": {"key1.1": "value1.1"}}
+ candidate = {"key1": {"key1.2": "value1.2"}}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(
+ {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)
+
+ def test_merge_does_not_override(self):
+ """Test that candidate doesn't override source."""
+ source = {"key1": "value1", "key2": "value2"}
+ candidate = {"key1": "value2", "key2": "NEW VALUE"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_empty_candidate(self):
+ """Test empty candidate doesn't change source."""
+ source = {"key": "value"}
+ candidate = {}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_empty_source(self):
+ """Test empty source is replaced by candidate."""
+ source = {}
+ candidate = {"key": "value"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(candidate, result)
+
+ def test_non_dict_candidate(self):
+ """Test non-dict candidate is discarded."""
+ source = {"key": "value"}
+ candidate = "not a dict"
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_non_dict_source(self):
+ """Test non-dict source is not modified with a dict candidate."""
+ source = "not a dict"
+ candidate = {"key": "value"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_neither_dict(self):
+ """Test if neither candidate or source is dict source wins."""
+ source = "source"
+ candidate = "candidate"
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
diff --git a/tests/unittests/test_runs/__init__.py b/tests/unittests/test_runs/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/test_runs/__init__.py
diff --git a/tests/unittests/test_runs/test_merge_run.py b/tests/unittests/test_runs/test_merge_run.py
new file mode 100644
index 00000000..d9c3a455
--- /dev/null
+++ b/tests/unittests/test_runs/test_merge_run.py
@@ -0,0 +1,52 @@
+import os
+
+from tests.unittests import helpers
+
+from cloudinit.settings import (PER_INSTANCE)
+from cloudinit import stages
+from cloudinit import util
+
+
+class TestMergeRun(helpers.FilesystemMockingTestCase):
+ def _patchIn(self, root):
+ self.restore()
+ self.patchOS(root)
+ self.patchUtils(root)
+
+ def test_none_ds(self):
+ new_root = self.makeDir()
+ self.replicateTestRoot('simple_ubuntu', new_root)
+ cfg = {
+ 'datasource_list': ['None'],
+ 'cloud_init_modules': ['write-files'],
+ }
+ ud = self.readResource('user_data.1.txt')
+ cloud_cfg = util.yaml_dumps(cfg)
+ util.ensure_dir(os.path.join(new_root, 'etc', 'cloud'))
+ util.write_file(os.path.join(new_root, 'etc',
+ 'cloud', 'cloud.cfg'), cloud_cfg)
+ self._patchIn(new_root)
+
+ # Now start verifying whats created
+ initer = stages.Init()
+ initer.read_cfg()
+ initer.initialize()
+ initer.fetch()
+ initer.datasource.userdata_raw = ud
+ _iid = initer.instancify()
+ initer.update()
+ initer.cloudify().run('consume_userdata',
+ initer.consume_userdata,
+ args=[PER_INSTANCE],
+ freq=PER_INSTANCE)
+ mirrors = initer.distro.get_option('package_mirrors')
+ self.assertEquals(1, len(mirrors))
+ mirror = mirrors[0]
+ self.assertEquals(mirror['arches'], ['i386', 'amd64', 'blah'])
+ mods = stages.Modules(initer)
+ (which_ran, failures) = mods.run_section('cloud_init_modules')
+ self.assertTrue(len(failures) == 0)
+ self.assertTrue(os.path.exists('/etc/blah.ini'))
+ self.assertIn('write-files', which_ran)
+ contents = util.load_file('/etc/blah.ini')
+ self.assertEquals(contents, 'blah')
diff --git a/tests/unittests/test_runs/test_simple_run.py b/tests/unittests/test_runs/test_simple_run.py
new file mode 100644
index 00000000..60ef812a
--- /dev/null
+++ b/tests/unittests/test_runs/test_simple_run.py
@@ -0,0 +1,80 @@
+import os
+
+from tests.unittests import helpers
+
+from cloudinit.settings import (PER_INSTANCE)
+from cloudinit import stages
+from cloudinit import util
+
+
+class TestSimpleRun(helpers.FilesystemMockingTestCase):
+ def _patchIn(self, root):
+ self.restore()
+ self.patchOS(root)
+ self.patchUtils(root)
+
+ def _pp_root(self, root, repatch=True):
+ self.restore()
+ for (dirpath, dirnames, filenames) in os.walk(root):
+ print(dirpath)
+ for f in filenames:
+ joined = os.path.join(dirpath, f)
+ if os.path.islink(joined):
+ print("f %s - (symlink)" % (f))
+ else:
+ print("f %s" % (f))
+ for d in dirnames:
+ joined = os.path.join(dirpath, d)
+ if os.path.islink(joined):
+ print("d %s - (symlink)" % (d))
+ else:
+ print("d %s" % (d))
+ if repatch:
+ self._patchIn(root)
+
+ def test_none_ds(self):
+ new_root = self.makeDir()
+ self.replicateTestRoot('simple_ubuntu', new_root)
+ cfg = {
+ 'datasource_list': ['None'],
+ 'write_files': [
+ {
+ 'path': '/etc/blah.ini',
+ 'content': 'blah',
+ 'permissions': 0755,
+ },
+ ],
+ 'cloud_init_modules': ['write-files'],
+ }
+ cloud_cfg = util.yaml_dumps(cfg)
+ util.ensure_dir(os.path.join(new_root, 'etc', 'cloud'))
+ util.write_file(os.path.join(new_root, 'etc',
+ 'cloud', 'cloud.cfg'), cloud_cfg)
+ self._patchIn(new_root)
+
+ # Now start verifying whats created
+ initer = stages.Init()
+ initer.read_cfg()
+ initer.initialize()
+ self.assertTrue(os.path.exists("/var/lib/cloud"))
+ for d in ['scripts', 'seed', 'instances', 'handlers', 'sem', 'data']:
+ self.assertTrue(os.path.isdir(os.path.join("/var/lib/cloud", d)))
+
+ initer.fetch()
+ iid = initer.instancify()
+ self.assertEquals(iid, 'iid-datasource-none')
+ initer.update()
+ self.assertTrue(os.path.islink("var/lib/cloud/instance"))
+
+ initer.cloudify().run('consume_userdata',
+ initer.consume_userdata,
+ args=[PER_INSTANCE],
+ freq=PER_INSTANCE)
+
+ mods = stages.Modules(initer)
+ (which_ran, failures) = mods.run_section('cloud_init_modules')
+ self.assertTrue(len(failures) == 0)
+ self.assertTrue(os.path.exists('/etc/blah.ini'))
+ self.assertIn('write-files', which_ran)
+ contents = util.load_file('/etc/blah.ini')
+ self.assertEquals(contents, 'blah')
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 96962b91..02611581 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -28,65 +28,6 @@ class FakeSelinux(object):
self.restored.append(path)
-class TestMergeDict(MockerTestCase):
- def test_simple_merge(self):
- """Test simple non-conflict merge."""
- source = {"key1": "value1"}
- candidate = {"key2": "value2"}
- result = util.mergedict(source, candidate)
- self.assertEqual({"key1": "value1", "key2": "value2"}, result)
-
- def test_nested_merge(self):
- """Test nested merge."""
- source = {"key1": {"key1.1": "value1.1"}}
- candidate = {"key1": {"key1.2": "value1.2"}}
- result = util.mergedict(source, candidate)
- self.assertEqual(
- {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)
-
- def test_merge_does_not_override(self):
- """Test that candidate doesn't override source."""
- source = {"key1": "value1", "key2": "value2"}
- candidate = {"key1": "value2", "key2": "NEW VALUE"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_candidate(self):
- """Test empty candidate doesn't change source."""
- source = {"key": "value"}
- candidate = {}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_source(self):
- """Test empty source is replaced by candidate."""
- source = {}
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(candidate, result)
-
- def test_non_dict_candidate(self):
- """Test non-dict candidate is discarded."""
- source = {"key": "value"}
- candidate = "not a dict"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_non_dict_source(self):
- """Test non-dict source is not modified with a dict candidate."""
- source = "not a dict"
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_neither_dict(self):
- """Test if neither candidate or source is dict source wins."""
- source = "source"
- candidate = "candidate"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
-
class TestGetCfgOptionListOrStr(TestCase):
def test_not_found_no_default(self):
"""None is returned if key is not found and no default given."""