summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/helpers.py104
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py145
-rw-r--r--tests/unittests/test_distros/test_netconfig.py174
-rw-r--r--tests/unittests/test_distros/test_user_data_normalize.py279
-rw-r--r--tests/unittests/test_filters/test_launch_index.py12
-rw-r--r--tests/unittests/test_runs/test_simple_run.py86
-rw-r--r--tests/unittests/test_util.py39
7 files changed, 827 insertions, 12 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index d0f09e70..2c5dcad2 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -3,6 +3,38 @@ import os
from mocker import MockerTestCase
from cloudinit import helpers as ch
+from cloudinit import util
+
+import shutil
+
+
+# Makes the old path start
+# with new base instead of whatever
+# it previously had
+def rebase_path(old_path, new_base):
+ if old_path.startswith(new_base):
+ # Already handled...
+ return old_path
+ # Retarget the base of that path
+ # to the new base instead of the
+ # old one...
+ path = os.path.join(new_base, old_path.lstrip("/"))
+ path = os.path.abspath(path)
+ return path
+
+
+# Can work on anything that takes a path as arguments
+def retarget_many_wrapper(new_base, am, old_func):
+ def wrapper(*args, **kwds):
+ n_args = list(args)
+ nam = am
+ if am == -1:
+ nam = len(n_args)
+ for i in range(0, nam):
+ path = args[i]
+ n_args[i] = rebase_path(path, new_base)
+ return old_func(*n_args, **kwds)
+ return wrapper
class ResourceUsingTestCase(MockerTestCase):
@@ -40,3 +72,75 @@ class ResourceUsingTestCase(MockerTestCase):
'templates_dir': self.resourceLocation(),
})
return cp
+
+
+class FilesystemMockingTestCase(ResourceUsingTestCase):
+ def __init__(self, methodName="runTest"):
+ ResourceUsingTestCase.__init__(self, methodName)
+ self.patched_funcs = []
+
+ def replicateTestRoot(self, example_root, target_root):
+ real_root = self.resourceLocation()
+ real_root = os.path.join(real_root, 'roots', example_root)
+ for (dir_path, _dirnames, filenames) in os.walk(real_root):
+ real_path = dir_path
+ make_path = rebase_path(real_path[len(real_root):], target_root)
+ util.ensure_dir(make_path)
+ for f in filenames:
+ real_path = util.abs_join(real_path, f)
+ make_path = util.abs_join(make_path, f)
+ shutil.copy(real_path, make_path)
+
+ def tearDown(self):
+ self.restore()
+ ResourceUsingTestCase.tearDown(self)
+
+ def restore(self):
+ for (mod, f, func) in self.patched_funcs:
+ setattr(mod, f, func)
+ self.patched_funcs = []
+
+ def patchUtils(self, new_root):
+ patch_funcs = {
+ util: [('write_file', 1),
+ ('load_file', 1),
+ ('ensure_dir', 1),
+ ('chmod', 1),
+ ('delete_dir_contents', 1),
+ ('del_file', 1),
+ ('sym_link', -1)],
+ }
+ for (mod, funcs) in patch_funcs.items():
+ for (f, am) in funcs:
+ func = getattr(mod, f)
+ trap_func = retarget_many_wrapper(new_root, am, func)
+ setattr(mod, f, trap_func)
+ self.patched_funcs.append((mod, f, func))
+
+ # Handle subprocess calls
+ func = getattr(util, 'subp')
+
+ def nsubp(*_args, **_kwargs):
+ return ('', '')
+
+ setattr(util, 'subp', nsubp)
+ self.patched_funcs.append((util, 'subp', func))
+
+ def null_func(*_args, **_kwargs):
+ return None
+
+ for f in ['chownbyid', 'chownbyname']:
+ func = getattr(util, f)
+ setattr(util, f, null_func)
+ self.patched_funcs.append((util, f, func))
+
+ def patchOS(self, new_root):
+ patch_funcs = {
+ os.path: ['isfile', 'exists', 'islink', 'isdir'],
+ }
+ for (mod, funcs) in patch_funcs.items():
+ for f in funcs:
+ func = getattr(mod, f)
+ trap_func = retarget_many_wrapper(new_root, 1, func)
+ setattr(mod, f, trap_func)
+ self.patched_funcs.append((mod, f, func))
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 55573114..00379e03 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -2,10 +2,12 @@ from copy import copy
import json
import os
import os.path
-import shutil
-import tempfile
-from unittest import TestCase
+import mocker
+from mocker import MockerTestCase
+
+from cloudinit import helpers
+from cloudinit import settings
from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit import util
@@ -60,17 +62,140 @@ CFG_DRIVE_FILES_V2 = {
'openstack/latest/user_data': USER_DATA}
-class TestConfigDriveDataSource(TestCase):
+class TestConfigDriveDataSource(MockerTestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
- self.tmp = tempfile.mkdtemp()
+ self.tmp = self.makeDir()
- def tearDown(self):
- try:
- shutil.rmtree(self.tmp)
- except OSError:
- pass
+ def test_ec2_metadata(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ found = ds.read_config_drive_dir(self.tmp)
+ self.assertTrue('ec2-metadata' in found)
+ ec2_md = found['ec2-metadata']
+ self.assertEqual(EC2_META, ec2_md)
+
+ def test_dev_os_remap(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
+ None,
+ helpers.Paths({}))
+ found = ds.read_config_drive_dir(self.tmp)
+ cfg_ds.metadata = found['metadata']
+ name_tests = {
+ 'ami': '/dev/vda1',
+ 'root': '/dev/vda1',
+ 'ephemeral0': '/dev/vda2',
+ 'swap': '/dev/vda3',
+ }
+ for name, dev_name in name_tests.items():
+ my_mock = mocker.Mocker()
+ find_mock = my_mock.replace(util.find_devs_with,
+ spec=False, passthrough=False)
+ provided_name = dev_name[len('/dev/'):]
+ provided_name = "s" + provided_name[1:]
+ find_mock(mocker.ARGS)
+ my_mock.result([provided_name])
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ my_mock.restore()
+ self.assertEquals(dev_name, device)
+
+ def test_dev_os_map(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
+ None,
+ helpers.Paths({}))
+ found = ds.read_config_drive_dir(self.tmp)
+ os_md = found['metadata']
+ cfg_ds.metadata = os_md
+ name_tests = {
+ 'ami': '/dev/vda1',
+ 'root': '/dev/vda1',
+ 'ephemeral0': '/dev/vda2',
+ 'swap': '/dev/vda3',
+ }
+ for name, dev_name in name_tests.items():
+ my_mock = mocker.Mocker()
+ find_mock = my_mock.replace(util.find_devs_with,
+ spec=False, passthrough=False)
+ find_mock(mocker.ARGS)
+ my_mock.result([dev_name])
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ my_mock.restore()
+ self.assertEquals(dev_name, device)
+
+ def test_dev_ec2_remap(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
+ None,
+ helpers.Paths({}))
+ found = ds.read_config_drive_dir(self.tmp)
+ ec2_md = found['ec2-metadata']
+ os_md = found['metadata']
+ cfg_ds.ec2_metadata = ec2_md
+ cfg_ds.metadata = os_md
+ name_tests = {
+ 'ami': '/dev/vda1',
+ 'root': '/dev/vda1',
+ 'ephemeral0': '/dev/vda2',
+ 'swap': '/dev/vda3',
+ None: None,
+ 'bob': None,
+ 'root2k': None,
+ }
+ for name, dev_name in name_tests.items():
+ my_mock = mocker.Mocker()
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
+ my_mock.restore()
+
+ def test_dev_ec2_map(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
+ None,
+ helpers.Paths({}))
+ found = ds.read_config_drive_dir(self.tmp)
+ exists_mock = self.mocker.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ self.mocker.count(0, None)
+ self.mocker.result(True)
+ self.mocker.replay()
+ ec2_md = found['ec2-metadata']
+ os_md = found['metadata']
+ cfg_ds.ec2_metadata = ec2_md
+ cfg_ds.metadata = os_md
+ name_tests = {
+ 'ami': '/dev/sda1',
+ 'root': '/dev/sda1',
+ 'ephemeral0': '/dev/sda2',
+ 'swap': '/dev/sda3',
+ None: None,
+ 'bob': None,
+ 'root2k': None,
+ }
+ for name, dev_name in name_tests.items():
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dir_valid(self):
"""Verify a dir is read as such."""
diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py
new file mode 100644
index 00000000..55765f0c
--- /dev/null
+++ b/tests/unittests/test_distros/test_netconfig.py
@@ -0,0 +1,174 @@
+from mocker import MockerTestCase
+
+import mocker
+
+import os
+
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import settings
+from cloudinit import util
+
+from StringIO import StringIO
+
+
+BASE_NET_CFG = '''
+auto lo
+iface lo inet loopback
+
+auto eth0
+iface eth0 inet static
+ address 192.168.1.5
+ netmask 255.255.255.0
+ network 192.168.0.0
+ broadcast 192.168.1.0
+ gateway 192.168.1.254
+
+auto eth1
+iface eth1 inet dhcp
+'''
+
+
+class WriteBuffer(object):
+ def __init__(self):
+ self.buffer = StringIO()
+ self.mode = None
+ self.omode = None
+
+ def write(self, text):
+ self.buffer.write(text)
+
+ def __str__(self):
+ return self.buffer.getvalue()
+
+
+class TestNetCfgDistro(MockerTestCase):
+
+ def _get_distro(self, dname):
+ cls = distros.fetch(dname)
+ cfg = settings.CFG_BUILTIN
+ cfg['system_info']['distro'] = dname
+ paths = helpers.Paths({})
+ return cls(dname, cfg, paths)
+
+ def test_simple_write_ub(self):
+ ub_distro = self._get_distro('ubuntu')
+ util_mock = self.mocker.replace(util.write_file,
+ spec=False, passthrough=False)
+ exists_mock = self.mocker.replace(os.path.isfile,
+ spec=False, passthrough=False)
+
+ exists_mock(mocker.ARGS)
+ self.mocker.count(0, None)
+ self.mocker.result(False)
+
+ write_bufs = {}
+
+ def replace_write(filename, content, mode=0644, omode="wb"):
+ buf = WriteBuffer()
+ buf.mode = mode
+ buf.omode = omode
+ buf.write(content)
+ write_bufs[filename] = buf
+
+ util_mock(mocker.ARGS)
+ self.mocker.call(replace_write)
+ self.mocker.replay()
+ ub_distro.apply_network(BASE_NET_CFG, False)
+
+ self.assertEquals(len(write_bufs), 1)
+ self.assertIn('/etc/network/interfaces', write_bufs)
+ write_buf = write_bufs['/etc/network/interfaces']
+ self.assertEquals(str(write_buf).strip(), BASE_NET_CFG.strip())
+ self.assertEquals(write_buf.mode, 0644)
+
+ def assertCfgEquals(self, blob1, blob2):
+ cfg_tester = distros.rhel.QuotingConfigObj
+ b1 = dict(cfg_tester(blob1.strip().splitlines()))
+ b2 = dict(cfg_tester(blob2.strip().splitlines()))
+ self.assertEquals(b1, b2)
+ for (k, v) in b1.items():
+ self.assertIn(k, b2)
+ for (k, v) in b2.items():
+ self.assertIn(k, b1)
+ for (k, v) in b1.items():
+ self.assertEquals(v, b2[k])
+
+ def test_simple_write_rh(self):
+ rh_distro = self._get_distro('rhel')
+ write_mock = self.mocker.replace(util.write_file,
+ spec=False, passthrough=False)
+ load_mock = self.mocker.replace(util.load_file,
+ spec=False, passthrough=False)
+ exists_mock = self.mocker.replace(os.path.isfile,
+ spec=False, passthrough=False)
+
+ write_bufs = {}
+
+ def replace_write(filename, content, mode=0644, omode="wb"):
+ buf = WriteBuffer()
+ buf.mode = mode
+ buf.omode = omode
+ buf.write(content)
+ write_bufs[filename] = buf
+
+ exists_mock(mocker.ARGS)
+ self.mocker.count(0, None)
+ self.mocker.result(False)
+
+ load_mock(mocker.ARGS)
+ self.mocker.count(0, None)
+ self.mocker.result('')
+
+ for _i in range(0, 3):
+ write_mock(mocker.ARGS)
+ self.mocker.call(replace_write)
+
+ write_mock(mocker.ARGS)
+ self.mocker.call(replace_write)
+
+ self.mocker.replay()
+ rh_distro.apply_network(BASE_NET_CFG, False)
+
+ self.assertEquals(len(write_bufs), 4)
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo']
+ expected_buf = '''
+DEVICE="lo"
+ONBOOT=yes
+'''
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0644)
+
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0']
+ expected_buf = '''
+DEVICE="eth0"
+BOOTPROTO="static"
+NETMASK="255.255.255.0"
+IPADDR="192.168.1.5"
+ONBOOT=yes
+GATEWAY="192.168.1.254"
+BROADCAST="192.168.1.0"
+'''
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0644)
+
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1']
+ expected_buf = '''
+DEVICE="eth1"
+BOOTPROTO="dhcp"
+ONBOOT=yes
+'''
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0644)
+
+ self.assertIn('/etc/sysconfig/network', write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network']
+ expected_buf = '''
+# Created by cloud-init v. 0.7
+NETWORKING=yes
+'''
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0644)
diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py
new file mode 100644
index 00000000..8f0d8896
--- /dev/null
+++ b/tests/unittests/test_distros/test_user_data_normalize.py
@@ -0,0 +1,279 @@
+from mocker import MockerTestCase
+
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import settings
+
+bcfg = {
+ 'name': 'bob',
+ 'plain_text_passwd': 'ubuntu',
+ 'home': "/home/ubuntu",
+ 'shell': "/bin/bash",
+ 'lock_passwd': True,
+ 'gecos': "Ubuntu",
+ 'groups': ["foo"]
+}
+
+
+class TestUGNormalize(MockerTestCase):
+
+ def _make_distro(self, dtype, def_user=None):
+ cfg = dict(settings.CFG_BUILTIN)
+ cfg['system_info']['distro'] = dtype
+ paths = helpers.Paths(cfg['system_info']['paths'])
+ distro_cls = distros.fetch(dtype)
+ if def_user:
+ cfg['system_info']['default_user'] = def_user.copy()
+ distro = distro_cls(dtype, cfg['system_info'], paths)
+ return distro
+
+ def _norm(self, cfg, distro):
+ return distros.normalize_users_groups(cfg, distro)
+
+ def test_basic_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': ['bob'],
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertEquals({}, users)
+
+ def test_csv_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': 'bob,joe,steve',
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertIn('joe', groups)
+ self.assertIn('steve', groups)
+ self.assertEquals({}, users)
+
+ def test_more_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': ['bob', 'joe', 'steve']
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertIn('joe', groups)
+ self.assertIn('steve', groups)
+ self.assertEquals({}, users)
+
+ def test_member_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': {
+ 'bob': ['s'],
+ 'joe': [],
+ 'steve': [],
+ }
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertEquals(['s'], groups['bob'])
+ self.assertEquals([], groups['joe'])
+ self.assertIn('joe', groups)
+ self.assertIn('steve', groups)
+ self.assertEquals({}, users)
+
+ def test_users_simple_dict(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': {
+ 'default': True,
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ ug_cfg = {
+ 'users': {
+ 'default': 'yes',
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ ug_cfg = {
+ 'users': {
+ 'default': '1',
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+
+ def test_users_simple_dict_no(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': {
+ 'default': False,
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertEquals({}, users)
+ ug_cfg = {
+ 'users': {
+ 'default': 'no',
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertEquals({}, users)
+
+ def test_users_simple_csv(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': 'joe,bob',
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEquals({'default': False}, users['joe'])
+ self.assertEquals({'default': False}, users['bob'])
+
+ def test_users_simple(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ 'joe',
+ 'bob'
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEquals({'default': False}, users['joe'])
+ self.assertEquals({'default': False}, users['bob'])
+
+ def test_users_old_user(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': 'default'
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertIn('zetta', users)
+ self.assertNotIn('default', users)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': 'default, joe'
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertIn('joe', users)
+ self.assertIn('zetta', users)
+ self.assertNotIn('default', users)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': ['bob', 'joe']
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertNotIn('bob', users)
+ self.assertIn('joe', users)
+ self.assertIn('zetta', users)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': {
+ 'bob': True,
+ 'joe': True,
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertIn('joe', users)
+ self.assertIn('zetta', users)
+ ug_cfg = {
+ 'user': 'zetta',
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('zetta', users)
+ ug_cfg = {}
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertEquals({}, users)
+ self.assertEquals({}, groups)
+
+ def test_users_dict_default_additional(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': [
+ {'name': 'default', 'blah': True}
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertEquals(",".join(distro.get_default_user()['groups']),
+ users['bob']['groups'])
+ self.assertEquals(True,
+ users['bob']['blah'])
+ self.assertEquals(True,
+ users['bob']['default'])
+
+ def test_users_dict_extract(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': [
+ 'default',
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ (name, config) = distros.extract_default(users)
+ self.assertEquals(name, 'bob')
+ expected_config = {}
+ def_config = None
+ try:
+ def_config = distro.get_default_user()
+ except NotImplementedError:
+ pass
+ if not def_config:
+ def_config = {}
+ expected_config.update(def_config)
+
+ # Ignore these for now
+ expected_config.pop('name', None)
+ expected_config.pop('groups', None)
+ config.pop('groups', None)
+ self.assertEquals(config, expected_config)
+
+ def test_users_dict_default(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': [
+ 'default',
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertEquals(",".join(distro.get_default_user()['groups']),
+ users['bob']['groups'])
+ self.assertEquals(True,
+ users['bob']['default'])
+
+ def test_users_dict_trans(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ {'name': 'joe',
+ 'tr-me': True},
+ {'name': 'bob'},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEquals({'tr_me': True, 'default': False}, users['joe'])
+ self.assertEquals({'default': False}, users['bob'])
+
+ def test_users_dict(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ {'name': 'joe'},
+ {'name': 'bob'},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEquals({'default': False}, users['joe'])
+ self.assertEquals({'default': False}, users['bob'])
diff --git a/tests/unittests/test_filters/test_launch_index.py b/tests/unittests/test_filters/test_launch_index.py
index 7ca7cbb6..1e9b9053 100644
--- a/tests/unittests/test_filters/test_launch_index.py
+++ b/tests/unittests/test_filters/test_launch_index.py
@@ -1,6 +1,14 @@
import copy
+import os
+import sys
-import helpers as th
+top_dir = os.path.join(os.path.dirname(__file__), os.pardir, "helpers.py")
+top_dir = os.path.abspath(top_dir)
+if os.path.exists(top_dir):
+ sys.path.insert(0, os.path.dirname(top_dir))
+
+
+import helpers
import itertools
@@ -18,7 +26,7 @@ def count_messages(root):
return am
-class TestLaunchFilter(th.ResourceUsingTestCase):
+class TestLaunchFilter(helpers.ResourceUsingTestCase):
def assertCounts(self, message, expected_counts):
orig_message = copy.deepcopy(message)
diff --git a/tests/unittests/test_runs/test_simple_run.py b/tests/unittests/test_runs/test_simple_run.py
new file mode 100644
index 00000000..1e852e1e
--- /dev/null
+++ b/tests/unittests/test_runs/test_simple_run.py
@@ -0,0 +1,86 @@
+import os
+import sys
+
+# Allow running this test individually
+top_dir = os.path.join(os.path.dirname(__file__), os.pardir, "helpers.py")
+top_dir = os.path.abspath(top_dir)
+if os.path.exists(top_dir):
+ sys.path.insert(0, os.path.dirname(top_dir))
+
+
+import helpers
+
+from cloudinit.settings import (PER_INSTANCE)
+from cloudinit import stages
+from cloudinit import util
+
+
+class TestSimpleRun(helpers.FilesystemMockingTestCase):
+ def _patchIn(self, root):
+ self.restore()
+ self.patchOS(root)
+ self.patchUtils(root)
+
+ def _pp_root(self, root, repatch=True):
+ self.restore()
+ for (dirpath, dirnames, filenames) in os.walk(root):
+ print(dirpath)
+ for f in filenames:
+ joined = os.path.join(dirpath, f)
+ if os.path.islink(joined):
+ print("f %s - (symlink)" % (f))
+ else:
+ print("f %s" % (f))
+ for d in dirnames:
+ joined = os.path.join(dirpath, d)
+ if os.path.islink(joined):
+ print("d %s - (symlink)" % (d))
+ else:
+ print("d %s" % (d))
+ if repatch:
+ self._patchIn(root)
+
+ def test_none_ds(self):
+ new_root = self.makeDir()
+ self.replicateTestRoot('simple_ubuntu', new_root)
+ cfg = {
+ 'datasource_list': ['None'],
+ 'write_files': [{
+ 'path': '/etc/blah.ini',
+ 'content': 'blah',
+ 'permissions': 0755,
+ }],
+ 'cloud_init_modules': ['write-files'],
+ }
+ cloud_cfg = util.yaml_dumps(cfg)
+ util.ensure_dir(os.path.join(new_root, 'etc', 'cloud'))
+ util.write_file(os.path.join(new_root, 'etc',
+ 'cloud', 'cloud.cfg'), cloud_cfg)
+ self._patchIn(new_root)
+
+ # Now start verifying whats created
+ initer = stages.Init()
+ initer.read_cfg()
+ initer.initialize()
+ self.assertTrue(os.path.exists("/var/lib/cloud"))
+ for d in ['scripts', 'seed', 'instances', 'handlers', 'sem', 'data']:
+ self.assertTrue(os.path.isdir(os.path.join("/var/lib/cloud", d)))
+
+ initer.fetch()
+ iid = initer.instancify()
+ self.assertEquals(iid, 'iid-datasource-none')
+ initer.update()
+ self.assertTrue(os.path.islink("var/lib/cloud/instance"))
+
+ initer.cloudify().run('consume_userdata',
+ initer.consume_userdata,
+ args=[PER_INSTANCE],
+ freq=PER_INSTANCE)
+
+ mods = stages.Modules(initer)
+ (which_ran, failures) = mods.run_section('cloud_init_modules')
+ self.assertTrue(len(failures) == 0)
+ self.assertTrue(os.path.exists('/etc/blah.ini'))
+ self.assertIn('write-files', which_ran)
+ contents = util.load_file('/etc/blah.ini')
+ self.assertEquals(contents, 'blah')
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 15fcbd26..96962b91 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -1,5 +1,6 @@
import os
import stat
+import yaml
from mocker import MockerTestCase
from unittest import TestCase
@@ -268,4 +269,42 @@ class TestGetCmdline(TestCase):
os.environ['DEBUG_PROC_CMDLINE'] = 'abcd 123'
self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], util.get_cmdline())
+
+class TestLoadYaml(TestCase):
+ mydefault = "7b03a8ebace993d806255121073fed52"
+
+ def test_simple(self):
+ mydata = {'1': "one", '2': "two"}
+ self.assertEqual(util.load_yaml(yaml.dump(mydata)), mydata)
+
+ def test_nonallowed_returns_default(self):
+ # for now, anything not in the allowed list just returns the default.
+ myyaml = yaml.dump({'1': "one"})
+ self.assertEqual(util.load_yaml(blob=myyaml,
+ default=self.mydefault,
+ allowed=(str,)),
+ self.mydefault)
+
+ def test_bogus_returns_default(self):
+ badyaml = "1\n 2:"
+ self.assertEqual(util.load_yaml(blob=badyaml,
+ default=self.mydefault),
+ self.mydefault)
+
+ def test_unsafe_types(self):
+ # should not load complex types
+ unsafe_yaml = yaml.dump((1, 2, 3,))
+ self.assertEqual(util.load_yaml(blob=unsafe_yaml,
+ default=self.mydefault),
+ self.mydefault)
+
+ def test_python_unicode(self):
+ # complex type of python/unicde is explicitly allowed
+ myobj = {'1': unicode("FOOBAR")}
+ safe_yaml = yaml.dump(myobj)
+ self.assertEqual(util.load_yaml(blob=safe_yaml,
+ default=self.mydefault),
+ myobj)
+
+
# vi: ts=4 expandtab