diff options
Diffstat (limited to 'tests/unittests')
25 files changed, 1579 insertions, 90 deletions
diff --git a/tests/unittests/__init__.py b/tests/unittests/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/__init__.py diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py index d0f09e70..92540b0c 100644 --- a/tests/unittests/helpers.py +++ b/tests/unittests/helpers.py @@ -1,8 +1,63 @@ import os +import sys +import unittest from mocker import MockerTestCase from cloudinit import helpers as ch +from cloudinit import util + +import shutil + +# Handle how 2.6 doesn't have the assertIn or assertNotIn +_PY_VER = sys.version_info +_PY_MAJOR, _PY_MINOR = _PY_VER[0:2] +if (_PY_MAJOR, _PY_MINOR) <= (2, 6): + # For now add these on, taken from python 2.7 + slightly adjusted + class TestCase(unittest.TestCase): + def assertIn(self, member, container, msg=None): + if member not in container: + standardMsg = '%r not found in %r' % (member, container) + self.fail(self._formatMessage(msg, standardMsg)) + + def assertNotIn(self, member, container, msg=None): + if member in container: + standardMsg = '%r unexpectedly found in %r' + standardMsg = standardMsg % (member, container) + self.fail(self._formatMessage(msg, standardMsg)) + +else: + class TestCase(unittest.TestCase): + pass + + +# Makes the old path start +# with new base instead of whatever +# it previously had +def rebase_path(old_path, new_base): + if old_path.startswith(new_base): + # Already handled... + return old_path + # Retarget the base of that path + # to the new base instead of the + # old one... + path = os.path.join(new_base, old_path.lstrip("/")) + path = os.path.abspath(path) + return path + + +# Can work on anything that takes a path as arguments +def retarget_many_wrapper(new_base, am, old_func): + def wrapper(*args, **kwds): + n_args = list(args) + nam = am + if am == -1: + nam = len(n_args) + for i in range(0, nam): + path = args[i] + n_args[i] = rebase_path(path, new_base) + return old_func(*n_args, **kwds) + return wrapper class ResourceUsingTestCase(MockerTestCase): @@ -40,3 +95,76 @@ class ResourceUsingTestCase(MockerTestCase): 'templates_dir': self.resourceLocation(), }) return cp + + +class FilesystemMockingTestCase(ResourceUsingTestCase): + def __init__(self, methodName="runTest"): + ResourceUsingTestCase.__init__(self, methodName) + self.patched_funcs = [] + + def replicateTestRoot(self, example_root, target_root): + real_root = self.resourceLocation() + real_root = os.path.join(real_root, 'roots', example_root) + for (dir_path, _dirnames, filenames) in os.walk(real_root): + real_path = dir_path + make_path = rebase_path(real_path[len(real_root):], target_root) + util.ensure_dir(make_path) + for f in filenames: + real_path = util.abs_join(real_path, f) + make_path = util.abs_join(make_path, f) + shutil.copy(real_path, make_path) + + def tearDown(self): + self.restore() + ResourceUsingTestCase.tearDown(self) + + def restore(self): + for (mod, f, func) in self.patched_funcs: + setattr(mod, f, func) + self.patched_funcs = [] + + def patchUtils(self, new_root): + patch_funcs = { + util: [('write_file', 1), + ('append_file', 1), + ('load_file', 1), + ('ensure_dir', 1), + ('chmod', 1), + ('delete_dir_contents', 1), + ('del_file', 1), + ('sym_link', -1)], + } + for (mod, funcs) in patch_funcs.items(): + for (f, am) in funcs: + func = getattr(mod, f) + trap_func = retarget_many_wrapper(new_root, am, func) + setattr(mod, f, trap_func) + self.patched_funcs.append((mod, f, func)) + + # Handle subprocess calls + func = getattr(util, 'subp') + + def nsubp(*_args, **_kwargs): + return ('', '') + + setattr(util, 'subp', nsubp) + self.patched_funcs.append((util, 'subp', func)) + + def null_func(*_args, **_kwargs): + return None + + for f in ['chownbyid', 'chownbyname']: + func = getattr(util, f) + setattr(util, f, null_func) + self.patched_funcs.append((util, f, func)) + + def patchOS(self, new_root): + patch_funcs = { + os.path: ['isfile', 'exists', 'islink', 'isdir'], + } + for (mod, funcs) in patch_funcs.items(): + for f in funcs: + func = getattr(mod, f) + trap_func = retarget_many_wrapper(new_root, 1, func) + setattr(mod, f, trap_func) + self.patched_funcs.append((mod, f, func)) diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py index ebc0bd51..5f41cb3d 100644 --- a/tests/unittests/test_builtin_handlers.py +++ b/tests/unittests/test_builtin_handlers.py @@ -6,6 +6,7 @@ from mocker import MockerTestCase from cloudinit import handlers from cloudinit import helpers +from cloudinit import util from cloudinit.handlers import upstart_job @@ -34,6 +35,7 @@ class TestBuiltins(MockerTestCase): self.assertEquals(0, len(os.listdir(up_root))) def test_upstart_frequency_single(self): + # files should be written out when frequency is ! per-instance c_root = self.makeDir() up_root = self.makeDir() paths = helpers.Paths({ @@ -41,9 +43,12 @@ class TestBuiltins(MockerTestCase): 'upstart_dir': up_root, }) freq = PER_INSTANCE + + mock_subp = self.mocker.replace(util.subp, passthrough=False) + mock_subp(["initctl", "reload-configuration"], capture=False) + self.mocker.replay() + h = upstart_job.UpstartJobPartHandler(paths) - # No files should be written out when - # the frequency is ! per-instance h.handle_part('', handlers.CONTENT_START, None, None, None) h.handle_part('blah', 'text/upstart-job', diff --git a/tests/unittests/test_datasource/__init__.py b/tests/unittests/test_datasource/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/test_datasource/__init__.py diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py index 55573114..aa5b98ed 100644 --- a/tests/unittests/test_datasource/test_configdrive.py +++ b/tests/unittests/test_datasource/test_configdrive.py @@ -2,10 +2,12 @@ from copy import copy import json import os import os.path -import shutil -import tempfile -from unittest import TestCase +import mocker +from mocker import MockerTestCase + +from cloudinit import helpers +from cloudinit import settings from cloudinit.sources import DataSourceConfigDrive as ds from cloudinit import util @@ -60,17 +62,140 @@ CFG_DRIVE_FILES_V2 = { 'openstack/latest/user_data': USER_DATA} -class TestConfigDriveDataSource(TestCase): +class TestConfigDriveDataSource(MockerTestCase): def setUp(self): super(TestConfigDriveDataSource, self).setUp() - self.tmp = tempfile.mkdtemp() + self.tmp = self.makeDir() - def tearDown(self): - try: - shutil.rmtree(self.tmp) - except OSError: - pass + def test_ec2_metadata(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + found = ds.read_config_drive_dir(self.tmp) + self.assertTrue('ec2-metadata' in found) + ec2_md = found['ec2-metadata'] + self.assertEqual(EC2_META, ec2_md) + + def test_dev_os_remap(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, + None, + helpers.Paths({})) + found = ds.read_config_drive_dir(self.tmp) + cfg_ds.metadata = found['metadata'] + name_tests = { + 'ami': '/dev/vda1', + 'root': '/dev/vda1', + 'ephemeral0': '/dev/vda2', + 'swap': '/dev/vda3', + } + for name, dev_name in name_tests.items(): + my_mock = mocker.Mocker() + find_mock = my_mock.replace(util.find_devs_with, + spec=False, passthrough=False) + provided_name = dev_name[len('/dev/'):] + provided_name = "s" + provided_name[1:] + find_mock(mocker.ARGS) + my_mock.result([provided_name]) + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + my_mock.restore() + self.assertEquals(dev_name, device) + + def test_dev_os_map(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, + None, + helpers.Paths({})) + found = ds.read_config_drive_dir(self.tmp) + os_md = found['metadata'] + cfg_ds.metadata = os_md + name_tests = { + 'ami': '/dev/vda1', + 'root': '/dev/vda1', + 'ephemeral0': '/dev/vda2', + 'swap': '/dev/vda3', + } + for name, dev_name in name_tests.items(): + my_mock = mocker.Mocker() + find_mock = my_mock.replace(util.find_devs_with, + spec=False, passthrough=False) + find_mock(mocker.ARGS) + my_mock.result([dev_name]) + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + my_mock.restore() + self.assertEquals(dev_name, device) + + def test_dev_ec2_remap(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, + None, + helpers.Paths({})) + found = ds.read_config_drive_dir(self.tmp) + ec2_md = found['ec2-metadata'] + os_md = found['metadata'] + cfg_ds.ec2_metadata = ec2_md + cfg_ds.metadata = os_md + name_tests = { + 'ami': '/dev/vda1', + 'root': '/dev/vda1', + 'ephemeral0': '/dev/vda2', + 'swap': '/dev/vda3', + None: None, + 'bob': None, + 'root2k': None, + } + for name, dev_name in name_tests.items(): + my_mock = mocker.Mocker() + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + self.assertEquals(dev_name, device) + my_mock.restore() + + def test_dev_ec2_map(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, + None, + helpers.Paths({})) + found = ds.read_config_drive_dir(self.tmp) + exists_mock = self.mocker.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + self.mocker.count(0, None) + self.mocker.result(True) + self.mocker.replay() + ec2_md = found['ec2-metadata'] + os_md = found['metadata'] + cfg_ds.ec2_metadata = ec2_md + cfg_ds.metadata = os_md + name_tests = { + 'ami': '/dev/sda1', + 'root': '/dev/sda1', + 'ephemeral0': '/dev/sda2', + 'swap': '/dev/sda3', + None: None, + 'bob': None, + 'root2k': None, + } + for name, dev_name in name_tests.items(): + device = cfg_ds.device_name_to_device(name) + self.assertEquals(dev_name, device) def test_dir_valid(self): """Verify a dir is read as such.""" @@ -163,6 +288,32 @@ class TestConfigDriveDataSource(TestCase): finally: util.find_devs_with = orig_find_devs_with + def test_pubkeys_v2(self): + """Verify that public-keys work in config-drive-v2.""" + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + myds = cfg_ds_from_dir(self.tmp) + self.assertEqual(myds.get_public_ssh_keys(), + [OSTACK_META['public_keys']['mykey']]) + + +def cfg_ds_from_dir(seed_d): + found = ds.read_config_drive_dir(seed_d) + cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None, + helpers.Paths({})) + populate_ds_from_read_config(cfg_ds, seed_d, found) + return cfg_ds + + +def populate_ds_from_read_config(cfg_ds, source, results): + """Patch the DataSourceConfigDrive from the results of + read_config_drive_dir hopefully in line with what it would have + if cfg_ds.get_data had been successfully called""" + cfg_ds.source = source + cfg_ds.metadata = results.get('metadata') + cfg_ds.ec2_metadata = results.get('ec2-metadata') + cfg_ds.userdata_raw = results.get('userdata') + cfg_ds.version = results.get('cfgdrive_ver') + def populate_dir(seed_dir, files): for (name, content) in files.iteritems(): diff --git a/tests/unittests/test_distros/__init__.py b/tests/unittests/test_distros/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/test_distros/__init__.py diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py index 2df4c2f0..7befb8c8 100644 --- a/tests/unittests/test_distros/test_generic.py +++ b/tests/unittests/test_distros/test_generic.py @@ -1,6 +1,9 @@ -from mocker import MockerTestCase - from cloudinit import distros +from cloudinit import util + +from tests.unittests import helpers + +import os unknown_arch_info = { 'arches': ['default'], @@ -27,7 +30,7 @@ gpmi = distros._get_package_mirror_info # pylint: disable=W0212 gapmi = distros._get_arch_package_mirror_info # pylint: disable=W0212 -class TestGenericDistro(MockerTestCase): +class TestGenericDistro(helpers.FilesystemMockingTestCase): def return_first(self, mlist): if not mlist: @@ -52,6 +55,82 @@ class TestGenericDistro(MockerTestCase): # Make a temp directoy for tests to use. self.tmp = self.makeDir() + def _write_load_sudoers(self, _user, rules): + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + os.makedirs(os.path.join(self.tmp, "etc")) + os.makedirs(os.path.join(self.tmp, "etc", 'sudoers.d')) + self.patchOS(self.tmp) + self.patchUtils(self.tmp) + d.write_sudo_rules("harlowja", rules) + contents = util.load_file(d.ci_sudoers_fn) + self.restore() + return contents + + def _count_in(self, lines_look_for, text_content): + found_amount = 0 + for e in lines_look_for: + for line in text_content.splitlines(): + line = line.strip() + if line == e: + found_amount += 1 + return found_amount + + def test_sudoers_ensure_rules(self): + rules = 'ALL=(ALL:ALL) ALL' + contents = self._write_load_sudoers('harlowja', rules) + expected = ['harlowja ALL=(ALL:ALL) ALL'] + self.assertEquals(len(expected), self._count_in(expected, contents)) + not_expected = [ + 'harlowja A', + 'harlowja L', + 'harlowja L', + ] + self.assertEquals(0, self._count_in(not_expected, contents)) + + def test_sudoers_ensure_rules_list(self): + rules = [ + 'ALL=(ALL:ALL) ALL', + 'B-ALL=(ALL:ALL) ALL', + 'C-ALL=(ALL:ALL) ALL', + ] + contents = self._write_load_sudoers('harlowja', rules) + expected = [ + 'harlowja ALL=(ALL:ALL) ALL', + 'harlowja B-ALL=(ALL:ALL) ALL', + 'harlowja C-ALL=(ALL:ALL) ALL', + ] + self.assertEquals(len(expected), self._count_in(expected, contents)) + not_expected = [ + 'harlowja A', + 'harlowja L', + 'harlowja L', + ] + self.assertEquals(0, self._count_in(not_expected, contents)) + + def test_sudoers_ensure_new(self): + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + self.patchOS(self.tmp) + self.patchUtils(self.tmp) + d.ensure_sudo_dir("/b") + contents = util.load_file("/etc/sudoers") + self.assertIn("includedir /b", contents) + self.assertTrue(os.path.isdir("/b")) + + def test_sudoers_ensure_append(self): + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + self.patchOS(self.tmp) + self.patchUtils(self.tmp) + util.write_file("/etc/sudoers", "josh, josh\n") + d.ensure_sudo_dir("/b") + contents = util.load_file("/etc/sudoers") + self.assertIn("includedir /b", contents) + self.assertTrue(os.path.isdir("/b")) + self.assertIn("josh", contents) + self.assertEquals(2, contents.count("josh")) + def test_arch_package_mirror_info_unknown(self): """for an unknown arch, we should get back that with arch 'default'.""" arch_mirrors = gapmi(package_mirrors, arch="unknown") diff --git a/tests/unittests/test_distros/test_hostname.py b/tests/unittests/test_distros/test_hostname.py new file mode 100644 index 00000000..8e644f4d --- /dev/null +++ b/tests/unittests/test_distros/test_hostname.py @@ -0,0 +1,38 @@ +from mocker import MockerTestCase + +from cloudinit.distros.parsers import hostname + + +BASE_HOSTNAME = ''' +# My super-duper-hostname + +blahblah + +''' +BASE_HOSTNAME = BASE_HOSTNAME.strip() + + +class TestHostnameHelper(MockerTestCase): + def test_parse_same(self): + hn = hostname.HostnameConf(BASE_HOSTNAME) + self.assertEquals(str(hn).strip(), BASE_HOSTNAME) + self.assertEquals(hn.hostname, 'blahblah') + + def test_no_adjust_hostname(self): + hn = hostname.HostnameConf(BASE_HOSTNAME) + prev_name = hn.hostname + hn.set_hostname("") + self.assertEquals(hn.hostname, prev_name) + + def test_adjust_hostname(self): + hn = hostname.HostnameConf(BASE_HOSTNAME) + prev_name = hn.hostname + self.assertEquals(prev_name, 'blahblah') + hn.set_hostname("bbbbd") + self.assertEquals(hn.hostname, 'bbbbd') + expected_out = ''' +# My super-duper-hostname + +bbbbd +''' + self.assertEquals(str(hn).strip(), expected_out.strip()) diff --git a/tests/unittests/test_distros/test_hosts.py b/tests/unittests/test_distros/test_hosts.py new file mode 100644 index 00000000..687a0dab --- /dev/null +++ b/tests/unittests/test_distros/test_hosts.py @@ -0,0 +1,41 @@ +from mocker import MockerTestCase + +from cloudinit.distros.parsers import hosts + + +BASE_ETC = ''' +# Example +127.0.0.1 localhost +192.168.1.10 foo.mydomain.org foo +192.168.1.10 bar.mydomain.org bar +146.82.138.7 master.debian.org master +209.237.226.90 www.opensource.org +''' +BASE_ETC = BASE_ETC.strip() + + +class TestHostsHelper(MockerTestCase): + def test_parse(self): + eh = hosts.HostsConf(BASE_ETC) + self.assertEquals(eh.get_entry('127.0.0.1'), [['localhost']]) + self.assertEquals(eh.get_entry('192.168.1.10'), + [['foo.mydomain.org', 'foo'], + ['bar.mydomain.org', 'bar']]) + eh = str(eh) + self.assertTrue(eh.startswith('# Example')) + + def test_add(self): + eh = hosts.HostsConf(BASE_ETC) + eh.add_entry('127.0.0.0', 'blah') + self.assertEquals(eh.get_entry('127.0.0.0'), [['blah']]) + eh.add_entry('127.0.0.3', 'blah', 'blah2', 'blah3') + self.assertEquals(eh.get_entry('127.0.0.3'), + [['blah', 'blah2', 'blah3']]) + + def test_del(self): + eh = hosts.HostsConf(BASE_ETC) + eh.add_entry('127.0.0.0', 'blah') + self.assertEquals(eh.get_entry('127.0.0.0'), [['blah']]) + + eh.del_entries('127.0.0.0') + self.assertEquals(eh.get_entry('127.0.0.0'), []) diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py new file mode 100644 index 00000000..9763b14b --- /dev/null +++ b/tests/unittests/test_distros/test_netconfig.py @@ -0,0 +1,175 @@ +from mocker import MockerTestCase + +import mocker + +import os + +from cloudinit import distros +from cloudinit import helpers +from cloudinit import settings +from cloudinit import util + +from cloudinit.distros.parsers.sys_conf import SysConf + +from StringIO import StringIO + + +BASE_NET_CFG = ''' +auto lo +iface lo inet loopback + +auto eth0 +iface eth0 inet static + address 192.168.1.5 + netmask 255.255.255.0 + network 192.168.0.0 + broadcast 192.168.1.0 + gateway 192.168.1.254 + +auto eth1 +iface eth1 inet dhcp +''' + + +class WriteBuffer(object): + def __init__(self): + self.buffer = StringIO() + self.mode = None + self.omode = None + + def write(self, text): + self.buffer.write(text) + + def __str__(self): + return self.buffer.getvalue() + + +class TestNetCfgDistro(MockerTestCase): + + def _get_distro(self, dname): + cls = distros.fetch(dname) + cfg = settings.CFG_BUILTIN + cfg['system_info']['distro'] = dname + paths = helpers.Paths({}) + return cls(dname, cfg, paths) + + def test_simple_write_ub(self): + ub_distro = self._get_distro('ubuntu') + util_mock = self.mocker.replace(util.write_file, + spec=False, passthrough=False) + exists_mock = self.mocker.replace(os.path.isfile, + spec=False, passthrough=False) + + exists_mock(mocker.ARGS) + self.mocker.count(0, None) + self.mocker.result(False) + + write_bufs = {} + + def replace_write(filename, content, mode=0644, omode="wb"): + buf = WriteBuffer() + buf.mode = mode + buf.omode = omode + buf.write(content) + write_bufs[filename] = buf + + util_mock(mocker.ARGS) + self.mocker.call(replace_write) + self.mocker.replay() + ub_distro.apply_network(BASE_NET_CFG, False) + + self.assertEquals(len(write_bufs), 1) + self.assertIn('/etc/network/interfaces', write_bufs) + write_buf = write_bufs['/etc/network/interfaces'] + self.assertEquals(str(write_buf).strip(), BASE_NET_CFG.strip()) + self.assertEquals(write_buf.mode, 0644) + + def assertCfgEquals(self, blob1, blob2): + b1 = dict(SysConf(blob1.strip().splitlines())) + b2 = dict(SysConf(blob2.strip().splitlines())) + self.assertEquals(b1, b2) + for (k, v) in b1.items(): + self.assertIn(k, b2) + for (k, v) in b2.items(): + self.assertIn(k, b1) + for (k, v) in b1.items(): + self.assertEquals(v, b2[k]) + + def test_simple_write_rh(self): + rh_distro = self._get_distro('rhel') + write_mock = self.mocker.replace(util.write_file, + spec=False, passthrough=False) + load_mock = self.mocker.replace(util.load_file, + spec=False, passthrough=False) + exists_mock = self.mocker.replace(os.path.isfile, + spec=False, passthrough=False) + + write_bufs = {} + + def replace_write(filename, content, mode=0644, omode="wb"): + buf = WriteBuffer() + buf.mode = mode + buf.omode = omode + buf.write(content) + write_bufs[filename] = buf + + exists_mock(mocker.ARGS) + self.mocker.count(0, None) + self.mocker.result(False) + + load_mock(mocker.ARGS) + self.mocker.count(0, None) + self.mocker.result('') + + for _i in range(0, 3): + write_mock(mocker.ARGS) + self.mocker.call(replace_write) + + write_mock(mocker.ARGS) + self.mocker.call(replace_write) + + self.mocker.replay() + rh_distro.apply_network(BASE_NET_CFG, False) + + self.assertEquals(len(write_bufs), 4) + self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', write_bufs) + write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo'] + expected_buf = ''' +DEVICE="lo" +ONBOOT=yes +''' + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0644) + + self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', write_bufs) + write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0'] + expected_buf = ''' +DEVICE="eth0" +BOOTPROTO="static" +NETMASK="255.255.255.0" +IPADDR="192.168.1.5" +ONBOOT=yes +GATEWAY="192.168.1.254" +BROADCAST="192.168.1.0" +''' + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0644) + + self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', write_bufs) + write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1'] + expected_buf = ''' +DEVICE="eth1" +BOOTPROTO="dhcp" +ONBOOT=yes +''' + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0644) + + self.assertIn('/etc/sysconfig/network', write_bufs) + write_buf = write_bufs['/etc/sysconfig/network'] + expected_buf = ''' +# Created by cloud-init v. 0.7 +NETWORKING=yes +''' + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0644) diff --git a/tests/unittests/test_distros/test_resolv.py b/tests/unittests/test_distros/test_resolv.py new file mode 100644 index 00000000..6b6ff6aa --- /dev/null +++ b/tests/unittests/test_distros/test_resolv.py @@ -0,0 +1,61 @@ +from mocker import MockerTestCase + +from cloudinit.distros.parsers import resolv_conf + +import re + + +BASE_RESOLVE = ''' +; generated by /sbin/dhclient-script +search blah.yahoo.com yahoo.com +nameserver 10.15.44.14 +nameserver 10.15.30.92 +''' +BASE_RESOLVE = BASE_RESOLVE.strip() + + +class TestResolvHelper(MockerTestCase): + def test_parse_same(self): + rp = resolv_conf.ResolvConf(BASE_RESOLVE) + rp_r = str(rp).strip() + self.assertEquals(BASE_RESOLVE, rp_r) + + def test_local_domain(self): + rp = resolv_conf.ResolvConf(BASE_RESOLVE) + self.assertEquals(None, rp.local_domain) + + rp.local_domain = "bob" + self.assertEquals('bob', rp.local_domain) + self.assertIn('domain bob', str(rp)) + + def test_nameservers(self): + rp = resolv_conf.ResolvConf(BASE_RESOLVE) + self.assertIn('10.15.44.14', rp.nameservers) + self.assertIn('10.15.30.92', rp.nameservers) + rp.add_nameserver('10.2') + self.assertIn('10.2', rp.nameservers) + self.assertIn('nameserver 10.2', str(rp)) + self.assertNotIn('10.3', rp.nameservers) + self.assertEquals(len(rp.nameservers), 3) + rp.add_nameserver('10.2') + self.assertRaises(ValueError, rp.add_nameserver, '10.3') + self.assertNotIn('10.3', rp.nameservers) + + def test_search_domains(self): + rp = resolv_conf.ResolvConf(BASE_RESOLVE) + self.assertIn('yahoo.com', rp.search_domains) + self.assertIn('blah.yahoo.com', rp.search_domains) + rp.add_search_domain('bbb.y.com') + self.assertIn('bbb.y.com', rp.search_domains) + self.assertTrue(re.search(r'search(.*)bbb.y.com(.*)', str(rp))) + self.assertIn('bbb.y.com', rp.search_domains) + rp.add_search_domain('bbb.y.com') + self.assertEquals(len(rp.search_domains), 3) + rp.add_search_domain('bbb2.y.com') + self.assertEquals(len(rp.search_domains), 4) + rp.add_search_domain('bbb3.y.com') + self.assertEquals(len(rp.search_domains), 5) + rp.add_search_domain('bbb4.y.com') + self.assertEquals(len(rp.search_domains), 6) + self.assertRaises(ValueError, rp.add_search_domain, 'bbb5.y.com') + self.assertEquals(len(rp.search_domains), 6) diff --git a/tests/unittests/test_distros/test_sysconfig.py b/tests/unittests/test_distros/test_sysconfig.py new file mode 100644 index 00000000..0c651407 --- /dev/null +++ b/tests/unittests/test_distros/test_sysconfig.py @@ -0,0 +1,82 @@ +from mocker import MockerTestCase + +import re + +from cloudinit.distros.parsers.sys_conf import SysConf + + +# Lots of good examples @ +# http://content.hccfl.edu/pollock/AUnix1/SysconfigFilesDesc.txt + +class TestSysConfHelper(MockerTestCase): + # This function was added in 2.7, make it work for 2.6 + def assertRegMatches(self, text, regexp): + regexp = re.compile(regexp) + self.assertTrue(regexp.search(text), + msg="%s must match %s!" % (text, regexp.pattern)) + + def test_parse_no_change(self): + contents = '''# A comment +USESMBAUTH=no +KEYTABLE=/usr/lib/kbd/keytables/us.map +SHORTDATE=$(date +%y:%m:%d:%H:%M) +HOSTNAME=blahblah +NETMASK0=255.255.255.0 +# Inline comment +LIST=$LOGROOT/incremental-list +IPV6TO4_ROUTING='eth0-:0004::1/64 eth1-:0005::1/64' +ETHTOOL_OPTS="-K ${DEVICE} tso on; -G ${DEVICE} rx 256 tx 256" +USEMD5=no''' + conf = SysConf(contents.splitlines()) + self.assertEquals(conf['HOSTNAME'], 'blahblah') + self.assertEquals(conf['SHORTDATE'], '$(date +%y:%m:%d:%H:%M)') + # Should be unquoted + self.assertEquals(conf['ETHTOOL_OPTS'], ('-K ${DEVICE} tso on; ' + '-G ${DEVICE} rx 256 tx 256')) + self.assertEquals(contents, str(conf)) + + def test_parse_shell_vars(self): + contents = 'USESMBAUTH=$XYZ' + conf = SysConf(contents.splitlines()) + self.assertEquals(contents, str(conf)) + conf = SysConf('') + conf['B'] = '${ZZ}d apples' + # Should be quoted + self.assertEquals('B="${ZZ}d apples"', str(conf)) + conf = SysConf('') + conf['B'] = '$? d apples' + self.assertEquals('B="$? d apples"', str(conf)) + contents = 'IPMI_WATCHDOG_OPTIONS="timeout=60"' + conf = SysConf(contents.splitlines()) + self.assertEquals('IPMI_WATCHDOG_OPTIONS=timeout=60', str(conf)) + + def test_parse_adjust(self): + contents = 'IPV6TO4_ROUTING="eth0-:0004::1/64 eth1-:0005::1/64"' + conf = SysConf(contents.splitlines()) + # Should be unquoted + self.assertEquals('eth0-:0004::1/64 eth1-:0005::1/64', + conf['IPV6TO4_ROUTING']) + conf['IPV6TO4_ROUTING'] = "blah \tblah" + contents2 = str(conf).strip() + # Should be requoted due to whitespace + self.assertRegMatches(contents2, + r'IPV6TO4_ROUTING=[\']blah\s+blah[\']') + + def test_parse_no_adjust_shell(self): + conf = SysConf(''.splitlines()) + conf['B'] = ' $(time)' + contents = str(conf) + self.assertEquals('B= $(time)', contents) + + def test_parse_empty(self): + contents = '' + conf = SysConf(contents.splitlines()) + self.assertEquals('', str(conf).strip()) + + def test_parse_add_new(self): + contents = 'BLAH=b' + conf = SysConf(contents.splitlines()) + conf['Z'] = 'd' + contents = str(conf) + self.assertIn("Z=d", contents) + self.assertIn("BLAH=b", contents) diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py new file mode 100644 index 00000000..5d9d4311 --- /dev/null +++ b/tests/unittests/test_distros/test_user_data_normalize.py @@ -0,0 +1,301 @@ +from mocker import MockerTestCase + +from cloudinit import distros +from cloudinit import helpers +from cloudinit import settings + +bcfg = { + 'name': 'bob', + 'plain_text_passwd': 'ubuntu', + 'home': "/home/ubuntu", + 'shell': "/bin/bash", + 'lock_passwd': True, + 'gecos': "Ubuntu", + 'groups': ["foo"] +} + + +class TestUGNormalize(MockerTestCase): + + def _make_distro(self, dtype, def_user=None): + cfg = dict(settings.CFG_BUILTIN) + cfg['system_info']['distro'] = dtype + paths = helpers.Paths(cfg['system_info']['paths']) + distro_cls = distros.fetch(dtype) + if def_user: + cfg['system_info']['default_user'] = def_user.copy() + distro = distro_cls(dtype, cfg['system_info'], paths) + return distro + + def _norm(self, cfg, distro): + return distros.normalize_users_groups(cfg, distro) + + def test_group_dict(self): + distro = self._make_distro('ubuntu') + g = {'groups': [ + { + 'ubuntu': ['foo', 'bar'], + 'bob': 'users', + }, + 'cloud-users', + { + 'bob': 'users2', + }, + ] + } + (_users, groups) = self._norm(g, distro) + self.assertIn('ubuntu', groups) + ub_members = groups['ubuntu'] + self.assertEquals(sorted(['foo', 'bar']), sorted(ub_members)) + self.assertIn('bob', groups) + b_members = groups['bob'] + self.assertEquals(sorted(['users', 'users2']), + sorted(b_members)) + + def test_basic_groups(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'groups': ['bob'], + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', groups) + self.assertEquals({}, users) + + def test_csv_groups(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'groups': 'bob,joe,steve', + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', groups) + self.assertIn('joe', groups) + self.assertIn('steve', groups) + self.assertEquals({}, users) + + def test_more_groups(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'groups': ['bob', 'joe', 'steve'] + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', groups) + self.assertIn('joe', groups) + self.assertIn('steve', groups) + self.assertEquals({}, users) + + def test_member_groups(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'groups': { + 'bob': ['s'], + 'joe': [], + 'steve': [], + } + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', groups) + self.assertEquals(['s'], groups['bob']) + self.assertEquals([], groups['joe']) + self.assertIn('joe', groups) + self.assertIn('steve', groups) + self.assertEquals({}, users) + + def test_users_simple_dict(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': { + 'default': True, + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + ug_cfg = { + 'users': { + 'default': 'yes', + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + ug_cfg = { + 'users': { + 'default': '1', + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + + def test_users_simple_dict_no(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': { + 'default': False, + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertEquals({}, users) + ug_cfg = { + 'users': { + 'default': 'no', + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertEquals({}, users) + + def test_users_simple_csv(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'users': 'joe,bob', + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('joe', users) + self.assertIn('bob', users) + self.assertEquals({'default': False}, users['joe']) + self.assertEquals({'default': False}, users['bob']) + + def test_users_simple(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'users': [ + 'joe', + 'bob' + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('joe', users) + self.assertIn('bob', users) + self.assertEquals({'default': False}, users['joe']) + self.assertEquals({'default': False}, users['bob']) + + def test_users_old_user(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'user': 'zetta', + 'users': 'default' + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertIn('zetta', users) + self.assertNotIn('default', users) + ug_cfg = { + 'user': 'zetta', + 'users': 'default, joe' + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertIn('joe', users) + self.assertIn('zetta', users) + self.assertNotIn('default', users) + ug_cfg = { + 'user': 'zetta', + 'users': ['bob', 'joe'] + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertNotIn('bob', users) + self.assertIn('joe', users) + self.assertIn('zetta', users) + ug_cfg = { + 'user': 'zetta', + 'users': { + 'bob': True, + 'joe': True, + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertIn('joe', users) + self.assertIn('zetta', users) + ug_cfg = { + 'user': 'zetta', + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('zetta', users) + ug_cfg = {} + (users, groups) = self._norm(ug_cfg, distro) + self.assertEquals({}, users) + self.assertEquals({}, groups) + + def test_users_dict_default_additional(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': [ + {'name': 'default', 'blah': True} + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertEquals(",".join(distro.get_default_user()['groups']), + users['bob']['groups']) + self.assertEquals(True, + users['bob']['blah']) + self.assertEquals(True, + users['bob']['default']) + + def test_users_dict_extract(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': [ + 'default', + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + (name, config) = distros.extract_default(users) + self.assertEquals(name, 'bob') + expected_config = {} + def_config = None + try: + def_config = distro.get_default_user() + except NotImplementedError: + pass + if not def_config: + def_config = {} + expected_config.update(def_config) + + # Ignore these for now + expected_config.pop('name', None) + expected_config.pop('groups', None) + config.pop('groups', None) + self.assertEquals(config, expected_config) + + def test_users_dict_default(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': [ + 'default', + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertEquals(",".join(distro.get_default_user()['groups']), + users['bob']['groups']) + self.assertEquals(True, + users['bob']['default']) + + def test_users_dict_trans(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'users': [ + {'name': 'joe', + 'tr-me': True}, + {'name': 'bob'}, + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('joe', users) + self.assertIn('bob', users) + self.assertEquals({'tr_me': True, 'default': False}, users['joe']) + self.assertEquals({'default': False}, users['bob']) + + def test_users_dict(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'users': [ + {'name': 'joe'}, + {'name': 'bob'}, + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('joe', users) + self.assertIn('bob', users) + self.assertEquals({'default': False}, users['joe']) + self.assertEquals({'default': False}, users['bob']) diff --git a/tests/unittests/test_filters/__init__.py b/tests/unittests/test_filters/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/test_filters/__init__.py diff --git a/tests/unittests/test_filters/test_launch_index.py b/tests/unittests/test_filters/test_launch_index.py index 7ca7cbb6..773bb312 100644 --- a/tests/unittests/test_filters/test_launch_index.py +++ b/tests/unittests/test_filters/test_launch_index.py @@ -1,6 +1,6 @@ import copy -import helpers as th +from tests.unittests import helpers import itertools @@ -18,7 +18,7 @@ def count_messages(root): return am -class TestLaunchFilter(th.ResourceUsingTestCase): +class TestLaunchFilter(helpers.ResourceUsingTestCase): def assertCounts(self, message, expected_counts): orig_message = copy.deepcopy(message) diff --git a/tests/unittests/test_handler/__init__.py b/tests/unittests/test_handler/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/test_handler/__init__.py diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index d3df5c50..0558023a 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -77,7 +77,7 @@ class TestConfig(MockerTestCase): """Test that a single cert gets passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1"]}} - self.mock_add(self.paths, ["CERT1"]) + self.mock_add(["CERT1"]) self.mock_update() self.mocker.replay() @@ -87,7 +87,7 @@ class TestConfig(MockerTestCase): """Test that multiple certs get passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}} - self.mock_add(self.paths, ["CERT1", "CERT2"]) + self.mock_add(["CERT1", "CERT2"]) self.mock_update() self.mocker.replay() @@ -97,7 +97,7 @@ class TestConfig(MockerTestCase): """Test remove_defaults works as expected.""" config = {"ca-certs": {"remove-defaults": True}} - self.mock_remove(self.paths) + self.mock_remove() self.mock_update() self.mocker.replay() @@ -116,8 +116,8 @@ class TestConfig(MockerTestCase): """Test remove_defaults is not called when config value is False.""" config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}} - self.mock_remove(self.paths) - self.mock_add(self.paths, ["CERT1"]) + self.mock_remove() + self.mock_add(["CERT1"]) self.mock_update() self.mocker.replay() @@ -136,20 +136,52 @@ class TestAddCaCerts(MockerTestCase): """Test that no certificate are written if not provided.""" self.mocker.replace(util.write_file, passthrough=False) self.mocker.replay() - cc_ca_certs.add_ca_certs(self.paths, []) + cc_ca_certs.add_ca_certs([]) - def test_single_cert(self): - """Test adding a single certificate to the trusted CAs.""" + def test_single_cert_trailing_cr(self): + """Test adding a single certificate to the trusted CAs + when existing ca-certificates has trailing newline""" cert = "CERT1\nLINE2\nLINE3" + ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n" + expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n" + mock_write = self.mocker.replace(util.write_file, passthrough=False) + mock_load = self.mocker.replace(util.load_file, passthrough=False) + mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt", cert, mode=0644) + + mock_load("/etc/ca-certificates.conf") + self.mocker.result(ca_certs_content) + + mock_write("/etc/ca-certificates.conf", expected, omode="wb") + self.mocker.replay() + + cc_ca_certs.add_ca_certs([cert]) + + def test_single_cert_no_trailing_cr(self): + """Test adding a single certificate to the trusted CAs + when existing ca-certificates has no trailing newline""" + cert = "CERT1\nLINE2\nLINE3" + + ca_certs_content = "line1\nline2\nline3" + + mock_write = self.mocker.replace(util.write_file, passthrough=False) + mock_load = self.mocker.replace(util.load_file, passthrough=False) + + mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt", + cert, mode=0644) + + mock_load("/etc/ca-certificates.conf") + self.mocker.result(ca_certs_content) + mock_write("/etc/ca-certificates.conf", - "\ncloud-init-ca-certs.crt", omode="ab") + "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt"), + omode="wb") self.mocker.replay() - cc_ca_certs.add_ca_certs(self.paths, [cert]) + cc_ca_certs.add_ca_certs([cert]) def test_multiple_certs(self): """Test adding multiple certificates to the trusted CAs.""" @@ -157,13 +189,21 @@ class TestAddCaCerts(MockerTestCase): expected_cert_file = "\n".join(certs) mock_write = self.mocker.replace(util.write_file, passthrough=False) + mock_load = self.mocker.replace(util.load_file, passthrough=False) + mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt", expected_cert_file, mode=0644) - mock_write("/etc/ca-certificates.conf", - "\ncloud-init-ca-certs.crt", omode="ab") + + ca_certs_content = "line1\nline2\nline3" + mock_load("/etc/ca-certificates.conf") + self.mocker.result(ca_certs_content) + + out = "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt") + mock_write("/etc/ca-certificates.conf", out, omode="wb") + self.mocker.replay() - cc_ca_certs.add_ca_certs(self.paths, certs) + cc_ca_certs.add_ca_certs(certs) class TestUpdateCaCerts(MockerTestCase): @@ -198,4 +238,4 @@ class TestRemoveDefaultCaCerts(MockerTestCase): "ca-certificates ca-certificates/trust_new_crts select no") self.mocker.replay() - cc_ca_certs.remove_default_ca_certs(self.paths) + cc_ca_certs.remove_default_ca_certs() diff --git a/tests/unittests/test_handler/test_handler_power_state.py b/tests/unittests/test_handler/test_handler_power_state.py new file mode 100644 index 00000000..f6e37fa5 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_power_state.py @@ -0,0 +1,88 @@ +from cloudinit.config import cc_power_state_change as psc + +from tests.unittests import helpers as t_help + + +class TestLoadPowerState(t_help.TestCase): + def setUp(self): + super(self.__class__, self).setUp() + + def test_no_config(self): + # completely empty config should mean do nothing + (cmd, _timeout) = psc.load_power_state({}) + self.assertEqual(cmd, None) + + def test_irrelevant_config(self): + # no power_state field in config should return None for cmd + (cmd, _timeout) = psc.load_power_state({'foo': 'bar'}) + self.assertEqual(cmd, None) + + def test_invalid_mode(self): + cfg = {'power_state': {'mode': 'gibberish'}} + self.assertRaises(TypeError, psc.load_power_state, cfg) + + cfg = {'power_state': {'mode': ''}} + self.assertRaises(TypeError, psc.load_power_state, cfg) + + def test_empty_mode(self): + cfg = {'power_state': {'message': 'goodbye'}} + self.assertRaises(TypeError, psc.load_power_state, cfg) + + def test_valid_modes(self): + cfg = {'power_state': {}} + for mode in ('halt', 'poweroff', 'reboot'): + cfg['power_state']['mode'] = mode + check_lps_ret(psc.load_power_state(cfg), mode=mode) + + def test_invalid_delay(self): + cfg = {'power_state': {'mode': 'poweroff', 'delay': 'goodbye'}} + self.assertRaises(TypeError, psc.load_power_state, cfg) + + def test_valid_delay(self): + cfg = {'power_state': {'mode': 'poweroff', 'delay': ''}} + for delay in ("now", "+1", "+30"): + cfg['power_state']['delay'] = delay + check_lps_ret(psc.load_power_state(cfg)) + + def test_message_present(self): + cfg = {'power_state': {'mode': 'poweroff', 'message': 'GOODBYE'}} + ret = psc.load_power_state(cfg) + check_lps_ret(psc.load_power_state(cfg)) + self.assertIn(cfg['power_state']['message'], ret[0]) + + def test_no_message(self): + # if message is not present, then no argument should be passed for it + cfg = {'power_state': {'mode': 'poweroff'}} + (cmd, _timeout) = psc.load_power_state(cfg) + self.assertNotIn("", cmd) + check_lps_ret(psc.load_power_state(cfg)) + self.assertTrue(len(cmd) == 3) + + +def check_lps_ret(psc_return, mode=None): + if len(psc_return) != 2: + raise TypeError("length returned = %d" % len(psc_return)) + + errs = [] + cmd = psc_return[0] + timeout = psc_return[1] + + if not 'shutdown' in psc_return[0][0]: + errs.append("string 'shutdown' not in cmd") + + if mode is not None: + opt = {'halt': '-H', 'poweroff': '-P', 'reboot': '-r'}[mode] + if opt not in psc_return[0]: + errs.append("opt '%s' not in cmd: %s" % (opt, cmd)) + + if len(cmd) != 3 and len(cmd) != 4: + errs.append("Invalid command length: %s" % len(cmd)) + + try: + float(timeout) + except: + errs.append("timeout failed convert to float") + + if len(errs): + lines = ["Errors in result: %s" % str(psc_return)] + errs + raise Exception('\n'.join(lines)) diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py new file mode 100644 index 00000000..a1aba62f --- /dev/null +++ b/tests/unittests/test_handler/test_handler_set_hostname.py @@ -0,0 +1,58 @@ +from cloudinit.config import cc_set_hostname + +from cloudinit import cloud +from cloudinit import distros +from cloudinit import helpers +from cloudinit import util + +from tests.unittests import helpers as t_help + +import logging + +from StringIO import StringIO + +from configobj import ConfigObj + +LOG = logging.getLogger(__name__) + + +class TestHostname(t_help.FilesystemMockingTestCase): + def setUp(self): + super(TestHostname, self).setUp() + self.tmp = self.makeDir(prefix="unittest_") + + def _fetch_distro(self, kind): + cls = distros.fetch(kind) + paths = helpers.Paths({}) + return cls(kind, {}, paths) + + def test_write_hostname_rhel(self): + cfg = { + 'hostname': 'blah.blah.blah.yahoo.com', + } + distro = self._fetch_distro('rhel') + paths = helpers.Paths({}) + ds = None + cc = cloud.Cloud(ds, paths, {}, distro, None) + self.patchUtils(self.tmp) + self.patchOS(self.tmp) + cc_set_hostname.handle('cc_set_hostname', + cfg, cc, LOG, []) + contents = util.load_file("/etc/sysconfig/network") + n_cfg = ConfigObj(StringIO(contents)) + self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'}, + dict(n_cfg)) + + def test_write_hostname_debian(self): + cfg = { + 'hostname': 'blah.blah.blah.yahoo.com', + } + distro = self._fetch_distro('debian') + paths = helpers.Paths({}) + ds = None + cc = cloud.Cloud(ds, paths, {}, distro, None) + self.patchUtils(self.tmp) + cc_set_hostname.handle('cc_set_hostname', + cfg, cc, LOG, []) + contents = util.load_file("/etc/hostname") + self.assertEquals('blah', contents.strip()) diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py new file mode 100644 index 00000000..8df592f9 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py @@ -0,0 +1,68 @@ +from cloudinit import helpers +from cloudinit import util + +from cloudinit.config import cc_yum_add_repo + +from tests.unittests import helpers + +import logging + +from StringIO import StringIO + +import configobj + +LOG = logging.getLogger(__name__) + + +class TestConfig(helpers.FilesystemMockingTestCase): + def setUp(self): + super(TestConfig, self).setUp() + self.tmp = self.makeDir(prefix="unittest_") + + def test_bad_config(self): + cfg = { + 'yum_repos': { + 'epel-testing': { + 'name': 'Extra Packages for Enterprise Linux 5 - Testing', + # Missing this should cause the repo not to be written + #'baseurl': 'http://blah.org/pub/epel/testing/5/$basearch', + 'enabled': False, + 'gpgcheck': True, + 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL', + 'failovermethod': 'priority', + }, + }, + } + self.patchUtils(self.tmp) + cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, []) + self.assertRaises(IOError, util.load_file, + "/etc/yum.repos.d/epel_testing.repo") + + def test_write_config(self): + cfg = { + 'yum_repos': { + 'epel-testing': { + 'name': 'Extra Packages for Enterprise Linux 5 - Testing', + 'baseurl': 'http://blah.org/pub/epel/testing/5/$basearch', + 'enabled': False, + 'gpgcheck': True, + 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL', + 'failovermethod': 'priority', + }, + }, + } + self.patchUtils(self.tmp) + cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, []) + contents = util.load_file("/etc/yum.repos.d/epel_testing.repo") + contents = configobj.ConfigObj(StringIO(contents)) + expected = { + 'epel_testing': { + 'name': 'Extra Packages for Enterprise Linux 5 - Testing', + 'failovermethod': 'priority', + 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL', + 'enabled': '0', + 'baseurl': 'http://blah.org/pub/epel/testing/5/$basearch', + 'gpgcheck': '1', + } + } + self.assertEquals(expected, dict(contents)) diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py new file mode 100644 index 00000000..0037b966 --- /dev/null +++ b/tests/unittests/test_merging.py @@ -0,0 +1,62 @@ +from mocker import MockerTestCase + +from cloudinit import util + + +class TestMergeDict(MockerTestCase): + def test_simple_merge(self): + """Test simple non-conflict merge.""" + source = {"key1": "value1"} + candidate = {"key2": "value2"} + result = util.mergedict(source, candidate) + self.assertEqual({"key1": "value1", "key2": "value2"}, result) + + def test_nested_merge(self): + """Test nested merge.""" + source = {"key1": {"key1.1": "value1.1"}} + candidate = {"key1": {"key1.2": "value1.2"}} + result = util.mergedict(source, candidate) + self.assertEqual( + {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result) + + def test_merge_does_not_override(self): + """Test that candidate doesn't override source.""" + source = {"key1": "value1", "key2": "value2"} + candidate = {"key1": "value2", "key2": "NEW VALUE"} + result = util.mergedict(source, candidate) + self.assertEqual(source, result) + + def test_empty_candidate(self): + """Test empty candidate doesn't change source.""" + source = {"key": "value"} + candidate = {} + result = util.mergedict(source, candidate) + self.assertEqual(source, result) + + def test_empty_source(self): + """Test empty source is replaced by candidate.""" + source = {} + candidate = {"key": "value"} + result = util.mergedict(source, candidate) + self.assertEqual(candidate, result) + + def test_non_dict_candidate(self): + """Test non-dict candidate is discarded.""" + source = {"key": "value"} + candidate = "not a dict" + result = util.mergedict(source, candidate) + self.assertEqual(source, result) + + def test_non_dict_source(self): + """Test non-dict source is not modified with a dict candidate.""" + source = "not a dict" + candidate = {"key": "value"} + result = util.mergedict(source, candidate) + self.assertEqual(source, result) + + def test_neither_dict(self): + """Test if neither candidate or source is dict source wins.""" + source = "source" + candidate = "candidate" + result = util.mergedict(source, candidate) + self.assertEqual(source, result) diff --git a/tests/unittests/test_runs/__init__.py b/tests/unittests/test_runs/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/test_runs/__init__.py diff --git a/tests/unittests/test_runs/test_merge_run.py b/tests/unittests/test_runs/test_merge_run.py new file mode 100644 index 00000000..d9c3a455 --- /dev/null +++ b/tests/unittests/test_runs/test_merge_run.py @@ -0,0 +1,52 @@ +import os + +from tests.unittests import helpers + +from cloudinit.settings import (PER_INSTANCE) +from cloudinit import stages +from cloudinit import util + + +class TestMergeRun(helpers.FilesystemMockingTestCase): + def _patchIn(self, root): + self.restore() + self.patchOS(root) + self.patchUtils(root) + + def test_none_ds(self): + new_root = self.makeDir() + self.replicateTestRoot('simple_ubuntu', new_root) + cfg = { + 'datasource_list': ['None'], + 'cloud_init_modules': ['write-files'], + } + ud = self.readResource('user_data.1.txt') + cloud_cfg = util.yaml_dumps(cfg) + util.ensure_dir(os.path.join(new_root, 'etc', 'cloud')) + util.write_file(os.path.join(new_root, 'etc', + 'cloud', 'cloud.cfg'), cloud_cfg) + self._patchIn(new_root) + + # Now start verifying whats created + initer = stages.Init() + initer.read_cfg() + initer.initialize() + initer.fetch() + initer.datasource.userdata_raw = ud + _iid = initer.instancify() + initer.update() + initer.cloudify().run('consume_userdata', + initer.consume_userdata, + args=[PER_INSTANCE], + freq=PER_INSTANCE) + mirrors = initer.distro.get_option('package_mirrors') + self.assertEquals(1, len(mirrors)) + mirror = mirrors[0] + self.assertEquals(mirror['arches'], ['i386', 'amd64', 'blah']) + mods = stages.Modules(initer) + (which_ran, failures) = mods.run_section('cloud_init_modules') + self.assertTrue(len(failures) == 0) + self.assertTrue(os.path.exists('/etc/blah.ini')) + self.assertIn('write-files', which_ran) + contents = util.load_file('/etc/blah.ini') + self.assertEquals(contents, 'blah') diff --git a/tests/unittests/test_runs/test_simple_run.py b/tests/unittests/test_runs/test_simple_run.py new file mode 100644 index 00000000..60ef812a --- /dev/null +++ b/tests/unittests/test_runs/test_simple_run.py @@ -0,0 +1,80 @@ +import os + +from tests.unittests import helpers + +from cloudinit.settings import (PER_INSTANCE) +from cloudinit import stages +from cloudinit import util + + +class TestSimpleRun(helpers.FilesystemMockingTestCase): + def _patchIn(self, root): + self.restore() + self.patchOS(root) + self.patchUtils(root) + + def _pp_root(self, root, repatch=True): + self.restore() + for (dirpath, dirnames, filenames) in os.walk(root): + print(dirpath) + for f in filenames: + joined = os.path.join(dirpath, f) + if os.path.islink(joined): + print("f %s - (symlink)" % (f)) + else: + print("f %s" % (f)) + for d in dirnames: + joined = os.path.join(dirpath, d) + if os.path.islink(joined): + print("d %s - (symlink)" % (d)) + else: + print("d %s" % (d)) + if repatch: + self._patchIn(root) + + def test_none_ds(self): + new_root = self.makeDir() + self.replicateTestRoot('simple_ubuntu', new_root) + cfg = { + 'datasource_list': ['None'], + 'write_files': [ + { + 'path': '/etc/blah.ini', + 'content': 'blah', + 'permissions': 0755, + }, + ], + 'cloud_init_modules': ['write-files'], + } + cloud_cfg = util.yaml_dumps(cfg) + util.ensure_dir(os.path.join(new_root, 'etc', 'cloud')) + util.write_file(os.path.join(new_root, 'etc', + 'cloud', 'cloud.cfg'), cloud_cfg) + self._patchIn(new_root) + + # Now start verifying whats created + initer = stages.Init() + initer.read_cfg() + initer.initialize() + self.assertTrue(os.path.exists("/var/lib/cloud")) + for d in ['scripts', 'seed', 'instances', 'handlers', 'sem', 'data']: + self.assertTrue(os.path.isdir(os.path.join("/var/lib/cloud", d))) + + initer.fetch() + iid = initer.instancify() + self.assertEquals(iid, 'iid-datasource-none') + initer.update() + self.assertTrue(os.path.islink("var/lib/cloud/instance")) + + initer.cloudify().run('consume_userdata', + initer.consume_userdata, + args=[PER_INSTANCE], + freq=PER_INSTANCE) + + mods = stages.Modules(initer) + (which_ran, failures) = mods.run_section('cloud_init_modules') + self.assertTrue(len(failures) == 0) + self.assertTrue(os.path.exists('/etc/blah.ini')) + self.assertIn('write-files', which_ran) + contents = util.load_file('/etc/blah.ini') + self.assertEquals(contents, 'blah') diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 15fcbd26..02611581 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -1,5 +1,6 @@ import os import stat +import yaml from mocker import MockerTestCase from unittest import TestCase @@ -27,65 +28,6 @@ class FakeSelinux(object): self.restored.append(path) -class TestMergeDict(MockerTestCase): - def test_simple_merge(self): - """Test simple non-conflict merge.""" - source = {"key1": "value1"} - candidate = {"key2": "value2"} - result = util.mergedict(source, candidate) - self.assertEqual({"key1": "value1", "key2": "value2"}, result) - - def test_nested_merge(self): - """Test nested merge.""" - source = {"key1": {"key1.1": "value1.1"}} - candidate = {"key1": {"key1.2": "value1.2"}} - result = util.mergedict(source, candidate) - self.assertEqual( - {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result) - - def test_merge_does_not_override(self): - """Test that candidate doesn't override source.""" - source = {"key1": "value1", "key2": "value2"} - candidate = {"key1": "value2", "key2": "NEW VALUE"} - result = util.mergedict(source, candidate) - self.assertEqual(source, result) - - def test_empty_candidate(self): - """Test empty candidate doesn't change source.""" - source = {"key": "value"} - candidate = {} - result = util.mergedict(source, candidate) - self.assertEqual(source, result) - - def test_empty_source(self): - """Test empty source is replaced by candidate.""" - source = {} - candidate = {"key": "value"} - result = util.mergedict(source, candidate) - self.assertEqual(candidate, result) - - def test_non_dict_candidate(self): - """Test non-dict candidate is discarded.""" - source = {"key": "value"} - candidate = "not a dict" - result = util.mergedict(source, candidate) - self.assertEqual(source, result) - - def test_non_dict_source(self): - """Test non-dict source is not modified with a dict candidate.""" - source = "not a dict" - candidate = {"key": "value"} - result = util.mergedict(source, candidate) - self.assertEqual(source, result) - - def test_neither_dict(self): - """Test if neither candidate or source is dict source wins.""" - source = "source" - candidate = "candidate" - result = util.mergedict(source, candidate) - self.assertEqual(source, result) - - class TestGetCfgOptionListOrStr(TestCase): def test_not_found_no_default(self): """None is returned if key is not found and no default given.""" @@ -268,4 +210,42 @@ class TestGetCmdline(TestCase): os.environ['DEBUG_PROC_CMDLINE'] = 'abcd 123' self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], util.get_cmdline()) + +class TestLoadYaml(TestCase): + mydefault = "7b03a8ebace993d806255121073fed52" + + def test_simple(self): + mydata = {'1': "one", '2': "two"} + self.assertEqual(util.load_yaml(yaml.dump(mydata)), mydata) + + def test_nonallowed_returns_default(self): + # for now, anything not in the allowed list just returns the default. + myyaml = yaml.dump({'1': "one"}) + self.assertEqual(util.load_yaml(blob=myyaml, + default=self.mydefault, + allowed=(str,)), + self.mydefault) + + def test_bogus_returns_default(self): + badyaml = "1\n 2:" + self.assertEqual(util.load_yaml(blob=badyaml, + default=self.mydefault), + self.mydefault) + + def test_unsafe_types(self): + # should not load complex types + unsafe_yaml = yaml.dump((1, 2, 3,)) + self.assertEqual(util.load_yaml(blob=unsafe_yaml, + default=self.mydefault), + self.mydefault) + + def test_python_unicode(self): + # complex type of python/unicde is explicitly allowed + myobj = {'1': unicode("FOOBAR")} + safe_yaml = yaml.dump(myobj) + self.assertEqual(util.load_yaml(blob=safe_yaml, + default=self.mydefault), + myobj) + + # vi: ts=4 expandtab |