diff options
Diffstat (limited to 'tests/unittests')
-rw-r--r-- | tests/unittests/helpers.py | 104 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_configdrive.py | 145 | ||||
-rw-r--r-- | tests/unittests/test_distros/test_netconfig.py | 174 | ||||
-rw-r--r-- | tests/unittests/test_distros/test_user_data_normalize.py | 279 | ||||
-rw-r--r-- | tests/unittests/test_filters/test_launch_index.py | 12 | ||||
-rw-r--r-- | tests/unittests/test_runs/test_simple_run.py | 86 | ||||
-rw-r--r-- | tests/unittests/test_util.py | 39 |
7 files changed, 827 insertions, 12 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py index d0f09e70..2c5dcad2 100644 --- a/tests/unittests/helpers.py +++ b/tests/unittests/helpers.py @@ -3,6 +3,38 @@ import os from mocker import MockerTestCase from cloudinit import helpers as ch +from cloudinit import util + +import shutil + + +# Makes the old path start +# with new base instead of whatever +# it previously had +def rebase_path(old_path, new_base): + if old_path.startswith(new_base): + # Already handled... + return old_path + # Retarget the base of that path + # to the new base instead of the + # old one... + path = os.path.join(new_base, old_path.lstrip("/")) + path = os.path.abspath(path) + return path + + +# Can work on anything that takes a path as arguments +def retarget_many_wrapper(new_base, am, old_func): + def wrapper(*args, **kwds): + n_args = list(args) + nam = am + if am == -1: + nam = len(n_args) + for i in range(0, nam): + path = args[i] + n_args[i] = rebase_path(path, new_base) + return old_func(*n_args, **kwds) + return wrapper class ResourceUsingTestCase(MockerTestCase): @@ -40,3 +72,75 @@ class ResourceUsingTestCase(MockerTestCase): 'templates_dir': self.resourceLocation(), }) return cp + + +class FilesystemMockingTestCase(ResourceUsingTestCase): + def __init__(self, methodName="runTest"): + ResourceUsingTestCase.__init__(self, methodName) + self.patched_funcs = [] + + def replicateTestRoot(self, example_root, target_root): + real_root = self.resourceLocation() + real_root = os.path.join(real_root, 'roots', example_root) + for (dir_path, _dirnames, filenames) in os.walk(real_root): + real_path = dir_path + make_path = rebase_path(real_path[len(real_root):], target_root) + util.ensure_dir(make_path) + for f in filenames: + real_path = util.abs_join(real_path, f) + make_path = util.abs_join(make_path, f) + shutil.copy(real_path, make_path) + + def tearDown(self): + self.restore() + ResourceUsingTestCase.tearDown(self) + + def restore(self): + for (mod, f, func) in self.patched_funcs: + setattr(mod, f, func) + self.patched_funcs = [] + + def patchUtils(self, new_root): + patch_funcs = { + util: [('write_file', 1), + ('load_file', 1), + ('ensure_dir', 1), + ('chmod', 1), + ('delete_dir_contents', 1), + ('del_file', 1), + ('sym_link', -1)], + } + for (mod, funcs) in patch_funcs.items(): + for (f, am) in funcs: + func = getattr(mod, f) + trap_func = retarget_many_wrapper(new_root, am, func) + setattr(mod, f, trap_func) + self.patched_funcs.append((mod, f, func)) + + # Handle subprocess calls + func = getattr(util, 'subp') + + def nsubp(*_args, **_kwargs): + return ('', '') + + setattr(util, 'subp', nsubp) + self.patched_funcs.append((util, 'subp', func)) + + def null_func(*_args, **_kwargs): + return None + + for f in ['chownbyid', 'chownbyname']: + func = getattr(util, f) + setattr(util, f, null_func) + self.patched_funcs.append((util, f, func)) + + def patchOS(self, new_root): + patch_funcs = { + os.path: ['isfile', 'exists', 'islink', 'isdir'], + } + for (mod, funcs) in patch_funcs.items(): + for f in funcs: + func = getattr(mod, f) + trap_func = retarget_many_wrapper(new_root, 1, func) + setattr(mod, f, trap_func) + self.patched_funcs.append((mod, f, func)) diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py index 55573114..00379e03 100644 --- a/tests/unittests/test_datasource/test_configdrive.py +++ b/tests/unittests/test_datasource/test_configdrive.py @@ -2,10 +2,12 @@ from copy import copy import json import os import os.path -import shutil -import tempfile -from unittest import TestCase +import mocker +from mocker import MockerTestCase + +from cloudinit import helpers +from cloudinit import settings from cloudinit.sources import DataSourceConfigDrive as ds from cloudinit import util @@ -60,17 +62,140 @@ CFG_DRIVE_FILES_V2 = { 'openstack/latest/user_data': USER_DATA} -class TestConfigDriveDataSource(TestCase): +class TestConfigDriveDataSource(MockerTestCase): def setUp(self): super(TestConfigDriveDataSource, self).setUp() - self.tmp = tempfile.mkdtemp() + self.tmp = self.makeDir() - def tearDown(self): - try: - shutil.rmtree(self.tmp) - except OSError: - pass + def test_ec2_metadata(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + found = ds.read_config_drive_dir(self.tmp) + self.assertTrue('ec2-metadata' in found) + ec2_md = found['ec2-metadata'] + self.assertEqual(EC2_META, ec2_md) + + def test_dev_os_remap(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, + None, + helpers.Paths({})) + found = ds.read_config_drive_dir(self.tmp) + cfg_ds.metadata = found['metadata'] + name_tests = { + 'ami': '/dev/vda1', + 'root': '/dev/vda1', + 'ephemeral0': '/dev/vda2', + 'swap': '/dev/vda3', + } + for name, dev_name in name_tests.items(): + my_mock = mocker.Mocker() + find_mock = my_mock.replace(util.find_devs_with, + spec=False, passthrough=False) + provided_name = dev_name[len('/dev/'):] + provided_name = "s" + provided_name[1:] + find_mock(mocker.ARGS) + my_mock.result([provided_name]) + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + my_mock.restore() + self.assertEquals(dev_name, device) + + def test_dev_os_map(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, + None, + helpers.Paths({})) + found = ds.read_config_drive_dir(self.tmp) + os_md = found['metadata'] + cfg_ds.metadata = os_md + name_tests = { + 'ami': '/dev/vda1', + 'root': '/dev/vda1', + 'ephemeral0': '/dev/vda2', + 'swap': '/dev/vda3', + } + for name, dev_name in name_tests.items(): + my_mock = mocker.Mocker() + find_mock = my_mock.replace(util.find_devs_with, + spec=False, passthrough=False) + find_mock(mocker.ARGS) + my_mock.result([dev_name]) + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + my_mock.restore() + self.assertEquals(dev_name, device) + + def test_dev_ec2_remap(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, + None, + helpers.Paths({})) + found = ds.read_config_drive_dir(self.tmp) + ec2_md = found['ec2-metadata'] + os_md = found['metadata'] + cfg_ds.ec2_metadata = ec2_md + cfg_ds.metadata = os_md + name_tests = { + 'ami': '/dev/vda1', + 'root': '/dev/vda1', + 'ephemeral0': '/dev/vda2', + 'swap': '/dev/vda3', + None: None, + 'bob': None, + 'root2k': None, + } + for name, dev_name in name_tests.items(): + my_mock = mocker.Mocker() + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + self.assertEquals(dev_name, device) + my_mock.restore() + + def test_dev_ec2_map(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, + None, + helpers.Paths({})) + found = ds.read_config_drive_dir(self.tmp) + exists_mock = self.mocker.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + self.mocker.count(0, None) + self.mocker.result(True) + self.mocker.replay() + ec2_md = found['ec2-metadata'] + os_md = found['metadata'] + cfg_ds.ec2_metadata = ec2_md + cfg_ds.metadata = os_md + name_tests = { + 'ami': '/dev/sda1', + 'root': '/dev/sda1', + 'ephemeral0': '/dev/sda2', + 'swap': '/dev/sda3', + None: None, + 'bob': None, + 'root2k': None, + } + for name, dev_name in name_tests.items(): + device = cfg_ds.device_name_to_device(name) + self.assertEquals(dev_name, device) def test_dir_valid(self): """Verify a dir is read as such.""" diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py new file mode 100644 index 00000000..55765f0c --- /dev/null +++ b/tests/unittests/test_distros/test_netconfig.py @@ -0,0 +1,174 @@ +from mocker import MockerTestCase + +import mocker + +import os + +from cloudinit import distros +from cloudinit import helpers +from cloudinit import settings +from cloudinit import util + +from StringIO import StringIO + + +BASE_NET_CFG = ''' +auto lo +iface lo inet loopback + +auto eth0 +iface eth0 inet static + address 192.168.1.5 + netmask 255.255.255.0 + network 192.168.0.0 + broadcast 192.168.1.0 + gateway 192.168.1.254 + +auto eth1 +iface eth1 inet dhcp +''' + + +class WriteBuffer(object): + def __init__(self): + self.buffer = StringIO() + self.mode = None + self.omode = None + + def write(self, text): + self.buffer.write(text) + + def __str__(self): + return self.buffer.getvalue() + + +class TestNetCfgDistro(MockerTestCase): + + def _get_distro(self, dname): + cls = distros.fetch(dname) + cfg = settings.CFG_BUILTIN + cfg['system_info']['distro'] = dname + paths = helpers.Paths({}) + return cls(dname, cfg, paths) + + def test_simple_write_ub(self): + ub_distro = self._get_distro('ubuntu') + util_mock = self.mocker.replace(util.write_file, + spec=False, passthrough=False) + exists_mock = self.mocker.replace(os.path.isfile, + spec=False, passthrough=False) + + exists_mock(mocker.ARGS) + self.mocker.count(0, None) + self.mocker.result(False) + + write_bufs = {} + + def replace_write(filename, content, mode=0644, omode="wb"): + buf = WriteBuffer() + buf.mode = mode + buf.omode = omode + buf.write(content) + write_bufs[filename] = buf + + util_mock(mocker.ARGS) + self.mocker.call(replace_write) + self.mocker.replay() + ub_distro.apply_network(BASE_NET_CFG, False) + + self.assertEquals(len(write_bufs), 1) + self.assertIn('/etc/network/interfaces', write_bufs) + write_buf = write_bufs['/etc/network/interfaces'] + self.assertEquals(str(write_buf).strip(), BASE_NET_CFG.strip()) + self.assertEquals(write_buf.mode, 0644) + + def assertCfgEquals(self, blob1, blob2): + cfg_tester = distros.rhel.QuotingConfigObj + b1 = dict(cfg_tester(blob1.strip().splitlines())) + b2 = dict(cfg_tester(blob2.strip().splitlines())) + self.assertEquals(b1, b2) + for (k, v) in b1.items(): + self.assertIn(k, b2) + for (k, v) in b2.items(): + self.assertIn(k, b1) + for (k, v) in b1.items(): + self.assertEquals(v, b2[k]) + + def test_simple_write_rh(self): + rh_distro = self._get_distro('rhel') + write_mock = self.mocker.replace(util.write_file, + spec=False, passthrough=False) + load_mock = self.mocker.replace(util.load_file, + spec=False, passthrough=False) + exists_mock = self.mocker.replace(os.path.isfile, + spec=False, passthrough=False) + + write_bufs = {} + + def replace_write(filename, content, mode=0644, omode="wb"): + buf = WriteBuffer() + buf.mode = mode + buf.omode = omode + buf.write(content) + write_bufs[filename] = buf + + exists_mock(mocker.ARGS) + self.mocker.count(0, None) + self.mocker.result(False) + + load_mock(mocker.ARGS) + self.mocker.count(0, None) + self.mocker.result('') + + for _i in range(0, 3): + write_mock(mocker.ARGS) + self.mocker.call(replace_write) + + write_mock(mocker.ARGS) + self.mocker.call(replace_write) + + self.mocker.replay() + rh_distro.apply_network(BASE_NET_CFG, False) + + self.assertEquals(len(write_bufs), 4) + self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', write_bufs) + write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo'] + expected_buf = ''' +DEVICE="lo" +ONBOOT=yes +''' + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0644) + + self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', write_bufs) + write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0'] + expected_buf = ''' +DEVICE="eth0" +BOOTPROTO="static" +NETMASK="255.255.255.0" +IPADDR="192.168.1.5" +ONBOOT=yes +GATEWAY="192.168.1.254" +BROADCAST="192.168.1.0" +''' + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0644) + + self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', write_bufs) + write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1'] + expected_buf = ''' +DEVICE="eth1" +BOOTPROTO="dhcp" +ONBOOT=yes +''' + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0644) + + self.assertIn('/etc/sysconfig/network', write_bufs) + write_buf = write_bufs['/etc/sysconfig/network'] + expected_buf = ''' +# Created by cloud-init v. 0.7 +NETWORKING=yes +''' + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0644) diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py new file mode 100644 index 00000000..8f0d8896 --- /dev/null +++ b/tests/unittests/test_distros/test_user_data_normalize.py @@ -0,0 +1,279 @@ +from mocker import MockerTestCase + +from cloudinit import distros +from cloudinit import helpers +from cloudinit import settings + +bcfg = { + 'name': 'bob', + 'plain_text_passwd': 'ubuntu', + 'home': "/home/ubuntu", + 'shell': "/bin/bash", + 'lock_passwd': True, + 'gecos': "Ubuntu", + 'groups': ["foo"] +} + + +class TestUGNormalize(MockerTestCase): + + def _make_distro(self, dtype, def_user=None): + cfg = dict(settings.CFG_BUILTIN) + cfg['system_info']['distro'] = dtype + paths = helpers.Paths(cfg['system_info']['paths']) + distro_cls = distros.fetch(dtype) + if def_user: + cfg['system_info']['default_user'] = def_user.copy() + distro = distro_cls(dtype, cfg['system_info'], paths) + return distro + + def _norm(self, cfg, distro): + return distros.normalize_users_groups(cfg, distro) + + def test_basic_groups(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'groups': ['bob'], + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', groups) + self.assertEquals({}, users) + + def test_csv_groups(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'groups': 'bob,joe,steve', + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', groups) + self.assertIn('joe', groups) + self.assertIn('steve', groups) + self.assertEquals({}, users) + + def test_more_groups(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'groups': ['bob', 'joe', 'steve'] + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', groups) + self.assertIn('joe', groups) + self.assertIn('steve', groups) + self.assertEquals({}, users) + + def test_member_groups(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'groups': { + 'bob': ['s'], + 'joe': [], + 'steve': [], + } + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', groups) + self.assertEquals(['s'], groups['bob']) + self.assertEquals([], groups['joe']) + self.assertIn('joe', groups) + self.assertIn('steve', groups) + self.assertEquals({}, users) + + def test_users_simple_dict(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': { + 'default': True, + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + ug_cfg = { + 'users': { + 'default': 'yes', + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + ug_cfg = { + 'users': { + 'default': '1', + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + + def test_users_simple_dict_no(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': { + 'default': False, + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertEquals({}, users) + ug_cfg = { + 'users': { + 'default': 'no', + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertEquals({}, users) + + def test_users_simple_csv(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'users': 'joe,bob', + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('joe', users) + self.assertIn('bob', users) + self.assertEquals({'default': False}, users['joe']) + self.assertEquals({'default': False}, users['bob']) + + def test_users_simple(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'users': [ + 'joe', + 'bob' + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('joe', users) + self.assertIn('bob', users) + self.assertEquals({'default': False}, users['joe']) + self.assertEquals({'default': False}, users['bob']) + + def test_users_old_user(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'user': 'zetta', + 'users': 'default' + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertIn('zetta', users) + self.assertNotIn('default', users) + ug_cfg = { + 'user': 'zetta', + 'users': 'default, joe' + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertIn('joe', users) + self.assertIn('zetta', users) + self.assertNotIn('default', users) + ug_cfg = { + 'user': 'zetta', + 'users': ['bob', 'joe'] + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertNotIn('bob', users) + self.assertIn('joe', users) + self.assertIn('zetta', users) + ug_cfg = { + 'user': 'zetta', + 'users': { + 'bob': True, + 'joe': True, + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertIn('joe', users) + self.assertIn('zetta', users) + ug_cfg = { + 'user': 'zetta', + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('zetta', users) + ug_cfg = {} + (users, groups) = self._norm(ug_cfg, distro) + self.assertEquals({}, users) + self.assertEquals({}, groups) + + def test_users_dict_default_additional(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': [ + {'name': 'default', 'blah': True} + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertEquals(",".join(distro.get_default_user()['groups']), + users['bob']['groups']) + self.assertEquals(True, + users['bob']['blah']) + self.assertEquals(True, + users['bob']['default']) + + def test_users_dict_extract(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': [ + 'default', + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + (name, config) = distros.extract_default(users) + self.assertEquals(name, 'bob') + expected_config = {} + def_config = None + try: + def_config = distro.get_default_user() + except NotImplementedError: + pass + if not def_config: + def_config = {} + expected_config.update(def_config) + + # Ignore these for now + expected_config.pop('name', None) + expected_config.pop('groups', None) + config.pop('groups', None) + self.assertEquals(config, expected_config) + + def test_users_dict_default(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': [ + 'default', + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertEquals(",".join(distro.get_default_user()['groups']), + users['bob']['groups']) + self.assertEquals(True, + users['bob']['default']) + + def test_users_dict_trans(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'users': [ + {'name': 'joe', + 'tr-me': True}, + {'name': 'bob'}, + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('joe', users) + self.assertIn('bob', users) + self.assertEquals({'tr_me': True, 'default': False}, users['joe']) + self.assertEquals({'default': False}, users['bob']) + + def test_users_dict(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'users': [ + {'name': 'joe'}, + {'name': 'bob'}, + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('joe', users) + self.assertIn('bob', users) + self.assertEquals({'default': False}, users['joe']) + self.assertEquals({'default': False}, users['bob']) diff --git a/tests/unittests/test_filters/test_launch_index.py b/tests/unittests/test_filters/test_launch_index.py index 7ca7cbb6..1e9b9053 100644 --- a/tests/unittests/test_filters/test_launch_index.py +++ b/tests/unittests/test_filters/test_launch_index.py @@ -1,6 +1,14 @@ import copy +import os +import sys -import helpers as th +top_dir = os.path.join(os.path.dirname(__file__), os.pardir, "helpers.py") +top_dir = os.path.abspath(top_dir) +if os.path.exists(top_dir): + sys.path.insert(0, os.path.dirname(top_dir)) + + +import helpers import itertools @@ -18,7 +26,7 @@ def count_messages(root): return am -class TestLaunchFilter(th.ResourceUsingTestCase): +class TestLaunchFilter(helpers.ResourceUsingTestCase): def assertCounts(self, message, expected_counts): orig_message = copy.deepcopy(message) diff --git a/tests/unittests/test_runs/test_simple_run.py b/tests/unittests/test_runs/test_simple_run.py new file mode 100644 index 00000000..1e852e1e --- /dev/null +++ b/tests/unittests/test_runs/test_simple_run.py @@ -0,0 +1,86 @@ +import os +import sys + +# Allow running this test individually +top_dir = os.path.join(os.path.dirname(__file__), os.pardir, "helpers.py") +top_dir = os.path.abspath(top_dir) +if os.path.exists(top_dir): + sys.path.insert(0, os.path.dirname(top_dir)) + + +import helpers + +from cloudinit.settings import (PER_INSTANCE) +from cloudinit import stages +from cloudinit import util + + +class TestSimpleRun(helpers.FilesystemMockingTestCase): + def _patchIn(self, root): + self.restore() + self.patchOS(root) + self.patchUtils(root) + + def _pp_root(self, root, repatch=True): + self.restore() + for (dirpath, dirnames, filenames) in os.walk(root): + print(dirpath) + for f in filenames: + joined = os.path.join(dirpath, f) + if os.path.islink(joined): + print("f %s - (symlink)" % (f)) + else: + print("f %s" % (f)) + for d in dirnames: + joined = os.path.join(dirpath, d) + if os.path.islink(joined): + print("d %s - (symlink)" % (d)) + else: + print("d %s" % (d)) + if repatch: + self._patchIn(root) + + def test_none_ds(self): + new_root = self.makeDir() + self.replicateTestRoot('simple_ubuntu', new_root) + cfg = { + 'datasource_list': ['None'], + 'write_files': [{ + 'path': '/etc/blah.ini', + 'content': 'blah', + 'permissions': 0755, + }], + 'cloud_init_modules': ['write-files'], + } + cloud_cfg = util.yaml_dumps(cfg) + util.ensure_dir(os.path.join(new_root, 'etc', 'cloud')) + util.write_file(os.path.join(new_root, 'etc', + 'cloud', 'cloud.cfg'), cloud_cfg) + self._patchIn(new_root) + + # Now start verifying whats created + initer = stages.Init() + initer.read_cfg() + initer.initialize() + self.assertTrue(os.path.exists("/var/lib/cloud")) + for d in ['scripts', 'seed', 'instances', 'handlers', 'sem', 'data']: + self.assertTrue(os.path.isdir(os.path.join("/var/lib/cloud", d))) + + initer.fetch() + iid = initer.instancify() + self.assertEquals(iid, 'iid-datasource-none') + initer.update() + self.assertTrue(os.path.islink("var/lib/cloud/instance")) + + initer.cloudify().run('consume_userdata', + initer.consume_userdata, + args=[PER_INSTANCE], + freq=PER_INSTANCE) + + mods = stages.Modules(initer) + (which_ran, failures) = mods.run_section('cloud_init_modules') + self.assertTrue(len(failures) == 0) + self.assertTrue(os.path.exists('/etc/blah.ini')) + self.assertIn('write-files', which_ran) + contents = util.load_file('/etc/blah.ini') + self.assertEquals(contents, 'blah') diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 15fcbd26..96962b91 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -1,5 +1,6 @@ import os import stat +import yaml from mocker import MockerTestCase from unittest import TestCase @@ -268,4 +269,42 @@ class TestGetCmdline(TestCase): os.environ['DEBUG_PROC_CMDLINE'] = 'abcd 123' self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], util.get_cmdline()) + +class TestLoadYaml(TestCase): + mydefault = "7b03a8ebace993d806255121073fed52" + + def test_simple(self): + mydata = {'1': "one", '2': "two"} + self.assertEqual(util.load_yaml(yaml.dump(mydata)), mydata) + + def test_nonallowed_returns_default(self): + # for now, anything not in the allowed list just returns the default. + myyaml = yaml.dump({'1': "one"}) + self.assertEqual(util.load_yaml(blob=myyaml, + default=self.mydefault, + allowed=(str,)), + self.mydefault) + + def test_bogus_returns_default(self): + badyaml = "1\n 2:" + self.assertEqual(util.load_yaml(blob=badyaml, + default=self.mydefault), + self.mydefault) + + def test_unsafe_types(self): + # should not load complex types + unsafe_yaml = yaml.dump((1, 2, 3,)) + self.assertEqual(util.load_yaml(blob=unsafe_yaml, + default=self.mydefault), + self.mydefault) + + def test_python_unicode(self): + # complex type of python/unicde is explicitly allowed + myobj = {'1': unicode("FOOBAR")} + safe_yaml = yaml.dump(myobj) + self.assertEqual(util.load_yaml(blob=safe_yaml, + default=self.mydefault), + myobj) + + # vi: ts=4 expandtab |