summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/__init__.py0
-rw-r--r--tests/unittests/helpers.py128
-rw-r--r--tests/unittests/test_builtin_handlers.py9
-rw-r--r--tests/unittests/test_datasource/__init__.py0
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py171
-rw-r--r--tests/unittests/test_distros/__init__.py0
-rw-r--r--tests/unittests/test_distros/test_generic.py85
-rw-r--r--tests/unittests/test_distros/test_hostname.py38
-rw-r--r--tests/unittests/test_distros/test_hosts.py41
-rw-r--r--tests/unittests/test_distros/test_netconfig.py175
-rw-r--r--tests/unittests/test_distros/test_resolv.py61
-rw-r--r--tests/unittests/test_distros/test_sysconfig.py82
-rw-r--r--tests/unittests/test_distros/test_user_data_normalize.py301
-rw-r--r--tests/unittests/test_filters/__init__.py0
-rw-r--r--tests/unittests/test_filters/test_launch_index.py4
-rw-r--r--tests/unittests/test_handler/__init__.py0
-rw-r--r--tests/unittests/test_handler/test_handler_ca_certs.py68
-rw-r--r--tests/unittests/test_handler/test_handler_power_state.py88
-rw-r--r--tests/unittests/test_handler/test_handler_set_hostname.py58
-rw-r--r--tests/unittests/test_handler/test_handler_yum_add_repo.py68
-rw-r--r--tests/unittests/test_merging.py62
-rw-r--r--tests/unittests/test_runs/__init__.py0
-rw-r--r--tests/unittests/test_runs/test_merge_run.py52
-rw-r--r--tests/unittests/test_runs/test_simple_run.py80
-rw-r--r--tests/unittests/test_util.py98
25 files changed, 1579 insertions, 90 deletions
diff --git a/tests/unittests/__init__.py b/tests/unittests/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/__init__.py
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index d0f09e70..92540b0c 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -1,8 +1,63 @@
import os
+import sys
+import unittest
from mocker import MockerTestCase
from cloudinit import helpers as ch
+from cloudinit import util
+
+import shutil
+
+# Handle how 2.6 doesn't have the assertIn or assertNotIn
+_PY_VER = sys.version_info
+_PY_MAJOR, _PY_MINOR = _PY_VER[0:2]
+if (_PY_MAJOR, _PY_MINOR) <= (2, 6):
+ # For now add these on, taken from python 2.7 + slightly adjusted
+ class TestCase(unittest.TestCase):
+ def assertIn(self, member, container, msg=None):
+ if member not in container:
+ standardMsg = '%r not found in %r' % (member, container)
+ self.fail(self._formatMessage(msg, standardMsg))
+
+ def assertNotIn(self, member, container, msg=None):
+ if member in container:
+ standardMsg = '%r unexpectedly found in %r'
+ standardMsg = standardMsg % (member, container)
+ self.fail(self._formatMessage(msg, standardMsg))
+
+else:
+ class TestCase(unittest.TestCase):
+ pass
+
+
+# Makes the old path start
+# with new base instead of whatever
+# it previously had
+def rebase_path(old_path, new_base):
+ if old_path.startswith(new_base):
+ # Already handled...
+ return old_path
+ # Retarget the base of that path
+ # to the new base instead of the
+ # old one...
+ path = os.path.join(new_base, old_path.lstrip("/"))
+ path = os.path.abspath(path)
+ return path
+
+
+# Can work on anything that takes a path as arguments
+def retarget_many_wrapper(new_base, am, old_func):
+ def wrapper(*args, **kwds):
+ n_args = list(args)
+ nam = am
+ if am == -1:
+ nam = len(n_args)
+ for i in range(0, nam):
+ path = args[i]
+ n_args[i] = rebase_path(path, new_base)
+ return old_func(*n_args, **kwds)
+ return wrapper
class ResourceUsingTestCase(MockerTestCase):
@@ -40,3 +95,76 @@ class ResourceUsingTestCase(MockerTestCase):
'templates_dir': self.resourceLocation(),
})
return cp
+
+
+class FilesystemMockingTestCase(ResourceUsingTestCase):
+ def __init__(self, methodName="runTest"):
+ ResourceUsingTestCase.__init__(self, methodName)
+ self.patched_funcs = []
+
+ def replicateTestRoot(self, example_root, target_root):
+ real_root = self.resourceLocation()
+ real_root = os.path.join(real_root, 'roots', example_root)
+ for (dir_path, _dirnames, filenames) in os.walk(real_root):
+ real_path = dir_path
+ make_path = rebase_path(real_path[len(real_root):], target_root)
+ util.ensure_dir(make_path)
+ for f in filenames:
+ real_path = util.abs_join(real_path, f)
+ make_path = util.abs_join(make_path, f)
+ shutil.copy(real_path, make_path)
+
+ def tearDown(self):
+ self.restore()
+ ResourceUsingTestCase.tearDown(self)
+
+ def restore(self):
+ for (mod, f, func) in self.patched_funcs:
+ setattr(mod, f, func)
+ self.patched_funcs = []
+
+ def patchUtils(self, new_root):
+ patch_funcs = {
+ util: [('write_file', 1),
+ ('append_file', 1),
+ ('load_file', 1),
+ ('ensure_dir', 1),
+ ('chmod', 1),
+ ('delete_dir_contents', 1),
+ ('del_file', 1),
+ ('sym_link', -1)],
+ }
+ for (mod, funcs) in patch_funcs.items():
+ for (f, am) in funcs:
+ func = getattr(mod, f)
+ trap_func = retarget_many_wrapper(new_root, am, func)
+ setattr(mod, f, trap_func)
+ self.patched_funcs.append((mod, f, func))
+
+ # Handle subprocess calls
+ func = getattr(util, 'subp')
+
+ def nsubp(*_args, **_kwargs):
+ return ('', '')
+
+ setattr(util, 'subp', nsubp)
+ self.patched_funcs.append((util, 'subp', func))
+
+ def null_func(*_args, **_kwargs):
+ return None
+
+ for f in ['chownbyid', 'chownbyname']:
+ func = getattr(util, f)
+ setattr(util, f, null_func)
+ self.patched_funcs.append((util, f, func))
+
+ def patchOS(self, new_root):
+ patch_funcs = {
+ os.path: ['isfile', 'exists', 'islink', 'isdir'],
+ }
+ for (mod, funcs) in patch_funcs.items():
+ for f in funcs:
+ func = getattr(mod, f)
+ trap_func = retarget_many_wrapper(new_root, 1, func)
+ setattr(mod, f, trap_func)
+ self.patched_funcs.append((mod, f, func))
diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py
index ebc0bd51..5f41cb3d 100644
--- a/tests/unittests/test_builtin_handlers.py
+++ b/tests/unittests/test_builtin_handlers.py
@@ -6,6 +6,7 @@ from mocker import MockerTestCase
from cloudinit import handlers
from cloudinit import helpers
+from cloudinit import util
from cloudinit.handlers import upstart_job
@@ -34,6 +35,7 @@ class TestBuiltins(MockerTestCase):
self.assertEquals(0, len(os.listdir(up_root)))
def test_upstart_frequency_single(self):
+ # files should be written out when frequency is ! per-instance
c_root = self.makeDir()
up_root = self.makeDir()
paths = helpers.Paths({
@@ -41,9 +43,12 @@ class TestBuiltins(MockerTestCase):
'upstart_dir': up_root,
})
freq = PER_INSTANCE
+
+ mock_subp = self.mocker.replace(util.subp, passthrough=False)
+ mock_subp(["initctl", "reload-configuration"], capture=False)
+ self.mocker.replay()
+
h = upstart_job.UpstartJobPartHandler(paths)
- # No files should be written out when
- # the frequency is ! per-instance
h.handle_part('', handlers.CONTENT_START,
None, None, None)
h.handle_part('blah', 'text/upstart-job',
diff --git a/tests/unittests/test_datasource/__init__.py b/tests/unittests/test_datasource/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/test_datasource/__init__.py
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 55573114..aa5b98ed 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -2,10 +2,12 @@ from copy import copy
import json
import os
import os.path
-import shutil
-import tempfile
-from unittest import TestCase
+import mocker
+from mocker import MockerTestCase
+
+from cloudinit import helpers
+from cloudinit import settings
from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit import util
@@ -60,17 +62,140 @@ CFG_DRIVE_FILES_V2 = {
'openstack/latest/user_data': USER_DATA}
-class TestConfigDriveDataSource(TestCase):
+class TestConfigDriveDataSource(MockerTestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
- self.tmp = tempfile.mkdtemp()
+ self.tmp = self.makeDir()
- def tearDown(self):
- try:
- shutil.rmtree(self.tmp)
- except OSError:
- pass
+ def test_ec2_metadata(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ found = ds.read_config_drive_dir(self.tmp)
+ self.assertTrue('ec2-metadata' in found)
+ ec2_md = found['ec2-metadata']
+ self.assertEqual(EC2_META, ec2_md)
+
+ def test_dev_os_remap(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
+ None,
+ helpers.Paths({}))
+ found = ds.read_config_drive_dir(self.tmp)
+ cfg_ds.metadata = found['metadata']
+ name_tests = {
+ 'ami': '/dev/vda1',
+ 'root': '/dev/vda1',
+ 'ephemeral0': '/dev/vda2',
+ 'swap': '/dev/vda3',
+ }
+ for name, dev_name in name_tests.items():
+ my_mock = mocker.Mocker()
+ find_mock = my_mock.replace(util.find_devs_with,
+ spec=False, passthrough=False)
+ provided_name = dev_name[len('/dev/'):]
+ provided_name = "s" + provided_name[1:]
+ find_mock(mocker.ARGS)
+ my_mock.result([provided_name])
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ my_mock.restore()
+ self.assertEquals(dev_name, device)
+
+ def test_dev_os_map(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
+ None,
+ helpers.Paths({}))
+ found = ds.read_config_drive_dir(self.tmp)
+ os_md = found['metadata']
+ cfg_ds.metadata = os_md
+ name_tests = {
+ 'ami': '/dev/vda1',
+ 'root': '/dev/vda1',
+ 'ephemeral0': '/dev/vda2',
+ 'swap': '/dev/vda3',
+ }
+ for name, dev_name in name_tests.items():
+ my_mock = mocker.Mocker()
+ find_mock = my_mock.replace(util.find_devs_with,
+ spec=False, passthrough=False)
+ find_mock(mocker.ARGS)
+ my_mock.result([dev_name])
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ my_mock.restore()
+ self.assertEquals(dev_name, device)
+
+ def test_dev_ec2_remap(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
+ None,
+ helpers.Paths({}))
+ found = ds.read_config_drive_dir(self.tmp)
+ ec2_md = found['ec2-metadata']
+ os_md = found['metadata']
+ cfg_ds.ec2_metadata = ec2_md
+ cfg_ds.metadata = os_md
+ name_tests = {
+ 'ami': '/dev/vda1',
+ 'root': '/dev/vda1',
+ 'ephemeral0': '/dev/vda2',
+ 'swap': '/dev/vda3',
+ None: None,
+ 'bob': None,
+ 'root2k': None,
+ }
+ for name, dev_name in name_tests.items():
+ my_mock = mocker.Mocker()
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
+ my_mock.restore()
+
+ def test_dev_ec2_map(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
+ None,
+ helpers.Paths({}))
+ found = ds.read_config_drive_dir(self.tmp)
+ exists_mock = self.mocker.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ self.mocker.count(0, None)
+ self.mocker.result(True)
+ self.mocker.replay()
+ ec2_md = found['ec2-metadata']
+ os_md = found['metadata']
+ cfg_ds.ec2_metadata = ec2_md
+ cfg_ds.metadata = os_md
+ name_tests = {
+ 'ami': '/dev/sda1',
+ 'root': '/dev/sda1',
+ 'ephemeral0': '/dev/sda2',
+ 'swap': '/dev/sda3',
+ None: None,
+ 'bob': None,
+ 'root2k': None,
+ }
+ for name, dev_name in name_tests.items():
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dir_valid(self):
"""Verify a dir is read as such."""
@@ -163,6 +288,32 @@ class TestConfigDriveDataSource(TestCase):
finally:
util.find_devs_with = orig_find_devs_with
+ def test_pubkeys_v2(self):
+ """Verify that public-keys work in config-drive-v2."""
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ myds = cfg_ds_from_dir(self.tmp)
+ self.assertEqual(myds.get_public_ssh_keys(),
+ [OSTACK_META['public_keys']['mykey']])
+
+
+def cfg_ds_from_dir(seed_d):
+ found = ds.read_config_drive_dir(seed_d)
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None,
+ helpers.Paths({}))
+ populate_ds_from_read_config(cfg_ds, seed_d, found)
+ return cfg_ds
+
+
+def populate_ds_from_read_config(cfg_ds, source, results):
+ """Patch the DataSourceConfigDrive from the results of
+ read_config_drive_dir hopefully in line with what it would have
+ if cfg_ds.get_data had been successfully called"""
+ cfg_ds.source = source
+ cfg_ds.metadata = results.get('metadata')
+ cfg_ds.ec2_metadata = results.get('ec2-metadata')
+ cfg_ds.userdata_raw = results.get('userdata')
+ cfg_ds.version = results.get('cfgdrive_ver')
+
def populate_dir(seed_dir, files):
for (name, content) in files.iteritems():
diff --git a/tests/unittests/test_distros/__init__.py b/tests/unittests/test_distros/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/test_distros/__init__.py
diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py
index 2df4c2f0..7befb8c8 100644
--- a/tests/unittests/test_distros/test_generic.py
+++ b/tests/unittests/test_distros/test_generic.py
@@ -1,6 +1,9 @@
-from mocker import MockerTestCase
-
from cloudinit import distros
+from cloudinit import util
+
+from tests.unittests import helpers
+
+import os
unknown_arch_info = {
'arches': ['default'],
@@ -27,7 +30,7 @@ gpmi = distros._get_package_mirror_info # pylint: disable=W0212
gapmi = distros._get_arch_package_mirror_info # pylint: disable=W0212
-class TestGenericDistro(MockerTestCase):
+class TestGenericDistro(helpers.FilesystemMockingTestCase):
def return_first(self, mlist):
if not mlist:
@@ -52,6 +55,82 @@ class TestGenericDistro(MockerTestCase):
# Make a temp directoy for tests to use.
self.tmp = self.makeDir()
+ def _write_load_sudoers(self, _user, rules):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ os.makedirs(os.path.join(self.tmp, "etc"))
+ os.makedirs(os.path.join(self.tmp, "etc", 'sudoers.d'))
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ d.write_sudo_rules("harlowja", rules)
+ contents = util.load_file(d.ci_sudoers_fn)
+ self.restore()
+ return contents
+
+ def _count_in(self, lines_look_for, text_content):
+ found_amount = 0
+ for e in lines_look_for:
+ for line in text_content.splitlines():
+ line = line.strip()
+ if line == e:
+ found_amount += 1
+ return found_amount
+
+ def test_sudoers_ensure_rules(self):
+ rules = 'ALL=(ALL:ALL) ALL'
+ contents = self._write_load_sudoers('harlowja', rules)
+ expected = ['harlowja ALL=(ALL:ALL) ALL']
+ self.assertEquals(len(expected), self._count_in(expected, contents))
+ not_expected = [
+ 'harlowja A',
+ 'harlowja L',
+ 'harlowja L',
+ ]
+ self.assertEquals(0, self._count_in(not_expected, contents))
+
+ def test_sudoers_ensure_rules_list(self):
+ rules = [
+ 'ALL=(ALL:ALL) ALL',
+ 'B-ALL=(ALL:ALL) ALL',
+ 'C-ALL=(ALL:ALL) ALL',
+ ]
+ contents = self._write_load_sudoers('harlowja', rules)
+ expected = [
+ 'harlowja ALL=(ALL:ALL) ALL',
+ 'harlowja B-ALL=(ALL:ALL) ALL',
+ 'harlowja C-ALL=(ALL:ALL) ALL',
+ ]
+ self.assertEquals(len(expected), self._count_in(expected, contents))
+ not_expected = [
+ 'harlowja A',
+ 'harlowja L',
+ 'harlowja L',
+ ]
+ self.assertEquals(0, self._count_in(not_expected, contents))
+
+ def test_sudoers_ensure_new(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+
+ def test_sudoers_ensure_append(self):
+ cls = distros.fetch("ubuntu")
+ d = cls("ubuntu", {}, None)
+ self.patchOS(self.tmp)
+ self.patchUtils(self.tmp)
+ util.write_file("/etc/sudoers", "josh, josh\n")
+ d.ensure_sudo_dir("/b")
+ contents = util.load_file("/etc/sudoers")
+ self.assertIn("includedir /b", contents)
+ self.assertTrue(os.path.isdir("/b"))
+ self.assertIn("josh", contents)
+ self.assertEquals(2, contents.count("josh"))
+
def test_arch_package_mirror_info_unknown(self):
"""for an unknown arch, we should get back that with arch 'default'."""
arch_mirrors = gapmi(package_mirrors, arch="unknown")
diff --git a/tests/unittests/test_distros/test_hostname.py b/tests/unittests/test_distros/test_hostname.py
new file mode 100644
index 00000000..8e644f4d
--- /dev/null
+++ b/tests/unittests/test_distros/test_hostname.py
@@ -0,0 +1,38 @@
+from mocker import MockerTestCase
+
+from cloudinit.distros.parsers import hostname
+
+
+BASE_HOSTNAME = '''
+# My super-duper-hostname
+
+blahblah
+
+'''
+BASE_HOSTNAME = BASE_HOSTNAME.strip()
+
+
+class TestHostnameHelper(MockerTestCase):
+ def test_parse_same(self):
+ hn = hostname.HostnameConf(BASE_HOSTNAME)
+ self.assertEquals(str(hn).strip(), BASE_HOSTNAME)
+ self.assertEquals(hn.hostname, 'blahblah')
+
+ def test_no_adjust_hostname(self):
+ hn = hostname.HostnameConf(BASE_HOSTNAME)
+ prev_name = hn.hostname
+ hn.set_hostname("")
+ self.assertEquals(hn.hostname, prev_name)
+
+ def test_adjust_hostname(self):
+ hn = hostname.HostnameConf(BASE_HOSTNAME)
+ prev_name = hn.hostname
+ self.assertEquals(prev_name, 'blahblah')
+ hn.set_hostname("bbbbd")
+ self.assertEquals(hn.hostname, 'bbbbd')
+ expected_out = '''
+# My super-duper-hostname
+
+bbbbd
+'''
+ self.assertEquals(str(hn).strip(), expected_out.strip())
diff --git a/tests/unittests/test_distros/test_hosts.py b/tests/unittests/test_distros/test_hosts.py
new file mode 100644
index 00000000..687a0dab
--- /dev/null
+++ b/tests/unittests/test_distros/test_hosts.py
@@ -0,0 +1,41 @@
+from mocker import MockerTestCase
+
+from cloudinit.distros.parsers import hosts
+
+
+BASE_ETC = '''
+# Example
+127.0.0.1 localhost
+192.168.1.10 foo.mydomain.org foo
+192.168.1.10 bar.mydomain.org bar
+146.82.138.7 master.debian.org master
+209.237.226.90 www.opensource.org
+'''
+BASE_ETC = BASE_ETC.strip()
+
+
+class TestHostsHelper(MockerTestCase):
+ def test_parse(self):
+ eh = hosts.HostsConf(BASE_ETC)
+ self.assertEquals(eh.get_entry('127.0.0.1'), [['localhost']])
+ self.assertEquals(eh.get_entry('192.168.1.10'),
+ [['foo.mydomain.org', 'foo'],
+ ['bar.mydomain.org', 'bar']])
+ eh = str(eh)
+ self.assertTrue(eh.startswith('# Example'))
+
+ def test_add(self):
+ eh = hosts.HostsConf(BASE_ETC)
+ eh.add_entry('127.0.0.0', 'blah')
+ self.assertEquals(eh.get_entry('127.0.0.0'), [['blah']])
+ eh.add_entry('127.0.0.3', 'blah', 'blah2', 'blah3')
+ self.assertEquals(eh.get_entry('127.0.0.3'),
+ [['blah', 'blah2', 'blah3']])
+
+ def test_del(self):
+ eh = hosts.HostsConf(BASE_ETC)
+ eh.add_entry('127.0.0.0', 'blah')
+ self.assertEquals(eh.get_entry('127.0.0.0'), [['blah']])
+
+ eh.del_entries('127.0.0.0')
+ self.assertEquals(eh.get_entry('127.0.0.0'), [])
diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py
new file mode 100644
index 00000000..9763b14b
--- /dev/null
+++ b/tests/unittests/test_distros/test_netconfig.py
@@ -0,0 +1,175 @@
+from mocker import MockerTestCase
+
+import mocker
+
+import os
+
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import settings
+from cloudinit import util
+
+from cloudinit.distros.parsers.sys_conf import SysConf
+
+from StringIO import StringIO
+
+
+BASE_NET_CFG = '''
+auto lo
+iface lo inet loopback
+
+auto eth0
+iface eth0 inet static
+ address 192.168.1.5
+ netmask 255.255.255.0
+ network 192.168.0.0
+ broadcast 192.168.1.0
+ gateway 192.168.1.254
+
+auto eth1
+iface eth1 inet dhcp
+'''
+
+
+class WriteBuffer(object):
+ def __init__(self):
+ self.buffer = StringIO()
+ self.mode = None
+ self.omode = None
+
+ def write(self, text):
+ self.buffer.write(text)
+
+ def __str__(self):
+ return self.buffer.getvalue()
+
+
+class TestNetCfgDistro(MockerTestCase):
+
+ def _get_distro(self, dname):
+ cls = distros.fetch(dname)
+ cfg = settings.CFG_BUILTIN
+ cfg['system_info']['distro'] = dname
+ paths = helpers.Paths({})
+ return cls(dname, cfg, paths)
+
+ def test_simple_write_ub(self):
+ ub_distro = self._get_distro('ubuntu')
+ util_mock = self.mocker.replace(util.write_file,
+ spec=False, passthrough=False)
+ exists_mock = self.mocker.replace(os.path.isfile,
+ spec=False, passthrough=False)
+
+ exists_mock(mocker.ARGS)
+ self.mocker.count(0, None)
+ self.mocker.result(False)
+
+ write_bufs = {}
+
+ def replace_write(filename, content, mode=0644, omode="wb"):
+ buf = WriteBuffer()
+ buf.mode = mode
+ buf.omode = omode
+ buf.write(content)
+ write_bufs[filename] = buf
+
+ util_mock(mocker.ARGS)
+ self.mocker.call(replace_write)
+ self.mocker.replay()
+ ub_distro.apply_network(BASE_NET_CFG, False)
+
+ self.assertEquals(len(write_bufs), 1)
+ self.assertIn('/etc/network/interfaces', write_bufs)
+ write_buf = write_bufs['/etc/network/interfaces']
+ self.assertEquals(str(write_buf).strip(), BASE_NET_CFG.strip())
+ self.assertEquals(write_buf.mode, 0644)
+
+ def assertCfgEquals(self, blob1, blob2):
+ b1 = dict(SysConf(blob1.strip().splitlines()))
+ b2 = dict(SysConf(blob2.strip().splitlines()))
+ self.assertEquals(b1, b2)
+ for (k, v) in b1.items():
+ self.assertIn(k, b2)
+ for (k, v) in b2.items():
+ self.assertIn(k, b1)
+ for (k, v) in b1.items():
+ self.assertEquals(v, b2[k])
+
+ def test_simple_write_rh(self):
+ rh_distro = self._get_distro('rhel')
+ write_mock = self.mocker.replace(util.write_file,
+ spec=False, passthrough=False)
+ load_mock = self.mocker.replace(util.load_file,
+ spec=False, passthrough=False)
+ exists_mock = self.mocker.replace(os.path.isfile,
+ spec=False, passthrough=False)
+
+ write_bufs = {}
+
+ def replace_write(filename, content, mode=0644, omode="wb"):
+ buf = WriteBuffer()
+ buf.mode = mode
+ buf.omode = omode
+ buf.write(content)
+ write_bufs[filename] = buf
+
+ exists_mock(mocker.ARGS)
+ self.mocker.count(0, None)
+ self.mocker.result(False)
+
+ load_mock(mocker.ARGS)
+ self.mocker.count(0, None)
+ self.mocker.result('')
+
+ for _i in range(0, 3):
+ write_mock(mocker.ARGS)
+ self.mocker.call(replace_write)
+
+ write_mock(mocker.ARGS)
+ self.mocker.call(replace_write)
+
+ self.mocker.replay()
+ rh_distro.apply_network(BASE_NET_CFG, False)
+
+ self.assertEquals(len(write_bufs), 4)
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo']
+ expected_buf = '''
+DEVICE="lo"
+ONBOOT=yes
+'''
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0644)
+
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0']
+ expected_buf = '''
+DEVICE="eth0"
+BOOTPROTO="static"
+NETMASK="255.255.255.0"
+IPADDR="192.168.1.5"
+ONBOOT=yes
+GATEWAY="192.168.1.254"
+BROADCAST="192.168.1.0"
+'''
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0644)
+
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1']
+ expected_buf = '''
+DEVICE="eth1"
+BOOTPROTO="dhcp"
+ONBOOT=yes
+'''
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0644)
+
+ self.assertIn('/etc/sysconfig/network', write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network']
+ expected_buf = '''
+# Created by cloud-init v. 0.7
+NETWORKING=yes
+'''
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0644)
diff --git a/tests/unittests/test_distros/test_resolv.py b/tests/unittests/test_distros/test_resolv.py
new file mode 100644
index 00000000..6b6ff6aa
--- /dev/null
+++ b/tests/unittests/test_distros/test_resolv.py
@@ -0,0 +1,61 @@
+from mocker import MockerTestCase
+
+from cloudinit.distros.parsers import resolv_conf
+
+import re
+
+
+BASE_RESOLVE = '''
+; generated by /sbin/dhclient-script
+search blah.yahoo.com yahoo.com
+nameserver 10.15.44.14
+nameserver 10.15.30.92
+'''
+BASE_RESOLVE = BASE_RESOLVE.strip()
+
+
+class TestResolvHelper(MockerTestCase):
+ def test_parse_same(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ rp_r = str(rp).strip()
+ self.assertEquals(BASE_RESOLVE, rp_r)
+
+ def test_local_domain(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ self.assertEquals(None, rp.local_domain)
+
+ rp.local_domain = "bob"
+ self.assertEquals('bob', rp.local_domain)
+ self.assertIn('domain bob', str(rp))
+
+ def test_nameservers(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ self.assertIn('10.15.44.14', rp.nameservers)
+ self.assertIn('10.15.30.92', rp.nameservers)
+ rp.add_nameserver('10.2')
+ self.assertIn('10.2', rp.nameservers)
+ self.assertIn('nameserver 10.2', str(rp))
+ self.assertNotIn('10.3', rp.nameservers)
+ self.assertEquals(len(rp.nameservers), 3)
+ rp.add_nameserver('10.2')
+ self.assertRaises(ValueError, rp.add_nameserver, '10.3')
+ self.assertNotIn('10.3', rp.nameservers)
+
+ def test_search_domains(self):
+ rp = resolv_conf.ResolvConf(BASE_RESOLVE)
+ self.assertIn('yahoo.com', rp.search_domains)
+ self.assertIn('blah.yahoo.com', rp.search_domains)
+ rp.add_search_domain('bbb.y.com')
+ self.assertIn('bbb.y.com', rp.search_domains)
+ self.assertTrue(re.search(r'search(.*)bbb.y.com(.*)', str(rp)))
+ self.assertIn('bbb.y.com', rp.search_domains)
+ rp.add_search_domain('bbb.y.com')
+ self.assertEquals(len(rp.search_domains), 3)
+ rp.add_search_domain('bbb2.y.com')
+ self.assertEquals(len(rp.search_domains), 4)
+ rp.add_search_domain('bbb3.y.com')
+ self.assertEquals(len(rp.search_domains), 5)
+ rp.add_search_domain('bbb4.y.com')
+ self.assertEquals(len(rp.search_domains), 6)
+ self.assertRaises(ValueError, rp.add_search_domain, 'bbb5.y.com')
+ self.assertEquals(len(rp.search_domains), 6)
diff --git a/tests/unittests/test_distros/test_sysconfig.py b/tests/unittests/test_distros/test_sysconfig.py
new file mode 100644
index 00000000..0c651407
--- /dev/null
+++ b/tests/unittests/test_distros/test_sysconfig.py
@@ -0,0 +1,82 @@
+from mocker import MockerTestCase
+
+import re
+
+from cloudinit.distros.parsers.sys_conf import SysConf
+
+
+# Lots of good examples @
+# http://content.hccfl.edu/pollock/AUnix1/SysconfigFilesDesc.txt
+
+class TestSysConfHelper(MockerTestCase):
+ # This function was added in 2.7, make it work for 2.6
+ def assertRegMatches(self, text, regexp):
+ regexp = re.compile(regexp)
+ self.assertTrue(regexp.search(text),
+ msg="%s must match %s!" % (text, regexp.pattern))
+
+ def test_parse_no_change(self):
+ contents = '''# A comment
+USESMBAUTH=no
+KEYTABLE=/usr/lib/kbd/keytables/us.map
+SHORTDATE=$(date +%y:%m:%d:%H:%M)
+HOSTNAME=blahblah
+NETMASK0=255.255.255.0
+# Inline comment
+LIST=$LOGROOT/incremental-list
+IPV6TO4_ROUTING='eth0-:0004::1/64 eth1-:0005::1/64'
+ETHTOOL_OPTS="-K ${DEVICE} tso on; -G ${DEVICE} rx 256 tx 256"
+USEMD5=no'''
+ conf = SysConf(contents.splitlines())
+ self.assertEquals(conf['HOSTNAME'], 'blahblah')
+ self.assertEquals(conf['SHORTDATE'], '$(date +%y:%m:%d:%H:%M)')
+ # Should be unquoted
+ self.assertEquals(conf['ETHTOOL_OPTS'], ('-K ${DEVICE} tso on; '
+ '-G ${DEVICE} rx 256 tx 256'))
+ self.assertEquals(contents, str(conf))
+
+ def test_parse_shell_vars(self):
+ contents = 'USESMBAUTH=$XYZ'
+ conf = SysConf(contents.splitlines())
+ self.assertEquals(contents, str(conf))
+ conf = SysConf('')
+ conf['B'] = '${ZZ}d apples'
+ # Should be quoted
+ self.assertEquals('B="${ZZ}d apples"', str(conf))
+ conf = SysConf('')
+ conf['B'] = '$? d apples'
+ self.assertEquals('B="$? d apples"', str(conf))
+ contents = 'IPMI_WATCHDOG_OPTIONS="timeout=60"'
+ conf = SysConf(contents.splitlines())
+ self.assertEquals('IPMI_WATCHDOG_OPTIONS=timeout=60', str(conf))
+
+ def test_parse_adjust(self):
+ contents = 'IPV6TO4_ROUTING="eth0-:0004::1/64 eth1-:0005::1/64"'
+ conf = SysConf(contents.splitlines())
+ # Should be unquoted
+ self.assertEquals('eth0-:0004::1/64 eth1-:0005::1/64',
+ conf['IPV6TO4_ROUTING'])
+ conf['IPV6TO4_ROUTING'] = "blah \tblah"
+ contents2 = str(conf).strip()
+ # Should be requoted due to whitespace
+ self.assertRegMatches(contents2,
+ r'IPV6TO4_ROUTING=[\']blah\s+blah[\']')
+
+ def test_parse_no_adjust_shell(self):
+ conf = SysConf(''.splitlines())
+ conf['B'] = ' $(time)'
+ contents = str(conf)
+ self.assertEquals('B= $(time)', contents)
+
+ def test_parse_empty(self):
+ contents = ''
+ conf = SysConf(contents.splitlines())
+ self.assertEquals('', str(conf).strip())
+
+ def test_parse_add_new(self):
+ contents = 'BLAH=b'
+ conf = SysConf(contents.splitlines())
+ conf['Z'] = 'd'
+ contents = str(conf)
+ self.assertIn("Z=d", contents)
+ self.assertIn("BLAH=b", contents)
diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py
new file mode 100644
index 00000000..5d9d4311
--- /dev/null
+++ b/tests/unittests/test_distros/test_user_data_normalize.py
@@ -0,0 +1,301 @@
+from mocker import MockerTestCase
+
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import settings
+
+bcfg = {
+ 'name': 'bob',
+ 'plain_text_passwd': 'ubuntu',
+ 'home': "/home/ubuntu",
+ 'shell': "/bin/bash",
+ 'lock_passwd': True,
+ 'gecos': "Ubuntu",
+ 'groups': ["foo"]
+}
+
+
+class TestUGNormalize(MockerTestCase):
+
+ def _make_distro(self, dtype, def_user=None):
+ cfg = dict(settings.CFG_BUILTIN)
+ cfg['system_info']['distro'] = dtype
+ paths = helpers.Paths(cfg['system_info']['paths'])
+ distro_cls = distros.fetch(dtype)
+ if def_user:
+ cfg['system_info']['default_user'] = def_user.copy()
+ distro = distro_cls(dtype, cfg['system_info'], paths)
+ return distro
+
+ def _norm(self, cfg, distro):
+ return distros.normalize_users_groups(cfg, distro)
+
+ def test_group_dict(self):
+ distro = self._make_distro('ubuntu')
+ g = {'groups': [
+ {
+ 'ubuntu': ['foo', 'bar'],
+ 'bob': 'users',
+ },
+ 'cloud-users',
+ {
+ 'bob': 'users2',
+ },
+ ]
+ }
+ (_users, groups) = self._norm(g, distro)
+ self.assertIn('ubuntu', groups)
+ ub_members = groups['ubuntu']
+ self.assertEquals(sorted(['foo', 'bar']), sorted(ub_members))
+ self.assertIn('bob', groups)
+ b_members = groups['bob']
+ self.assertEquals(sorted(['users', 'users2']),
+ sorted(b_members))
+
+ def test_basic_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': ['bob'],
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertEquals({}, users)
+
+ def test_csv_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': 'bob,joe,steve',
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertIn('joe', groups)
+ self.assertIn('steve', groups)
+ self.assertEquals({}, users)
+
+ def test_more_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': ['bob', 'joe', 'steve']
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertIn('joe', groups)
+ self.assertIn('steve', groups)
+ self.assertEquals({}, users)
+
+ def test_member_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': {
+ 'bob': ['s'],
+ 'joe': [],
+ 'steve': [],
+ }
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertEquals(['s'], groups['bob'])
+ self.assertEquals([], groups['joe'])
+ self.assertIn('joe', groups)
+ self.assertIn('steve', groups)
+ self.assertEquals({}, users)
+
+ def test_users_simple_dict(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': {
+ 'default': True,
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ ug_cfg = {
+ 'users': {
+ 'default': 'yes',
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ ug_cfg = {
+ 'users': {
+ 'default': '1',
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+
+ def test_users_simple_dict_no(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': {
+ 'default': False,
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertEquals({}, users)
+ ug_cfg = {
+ 'users': {
+ 'default': 'no',
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertEquals({}, users)
+
+ def test_users_simple_csv(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': 'joe,bob',
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEquals({'default': False}, users['joe'])
+ self.assertEquals({'default': False}, users['bob'])
+
+ def test_users_simple(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ 'joe',
+ 'bob'
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEquals({'default': False}, users['joe'])
+ self.assertEquals({'default': False}, users['bob'])
+
+ def test_users_old_user(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': 'default'
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertIn('zetta', users)
+ self.assertNotIn('default', users)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': 'default, joe'
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertIn('joe', users)
+ self.assertIn('zetta', users)
+ self.assertNotIn('default', users)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': ['bob', 'joe']
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertNotIn('bob', users)
+ self.assertIn('joe', users)
+ self.assertIn('zetta', users)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': {
+ 'bob': True,
+ 'joe': True,
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertIn('joe', users)
+ self.assertIn('zetta', users)
+ ug_cfg = {
+ 'user': 'zetta',
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('zetta', users)
+ ug_cfg = {}
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertEquals({}, users)
+ self.assertEquals({}, groups)
+
+ def test_users_dict_default_additional(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': [
+ {'name': 'default', 'blah': True}
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertEquals(",".join(distro.get_default_user()['groups']),
+ users['bob']['groups'])
+ self.assertEquals(True,
+ users['bob']['blah'])
+ self.assertEquals(True,
+ users['bob']['default'])
+
+ def test_users_dict_extract(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': [
+ 'default',
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ (name, config) = distros.extract_default(users)
+ self.assertEquals(name, 'bob')
+ expected_config = {}
+ def_config = None
+ try:
+ def_config = distro.get_default_user()
+ except NotImplementedError:
+ pass
+ if not def_config:
+ def_config = {}
+ expected_config.update(def_config)
+
+ # Ignore these for now
+ expected_config.pop('name', None)
+ expected_config.pop('groups', None)
+ config.pop('groups', None)
+ self.assertEquals(config, expected_config)
+
+ def test_users_dict_default(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': [
+ 'default',
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertEquals(",".join(distro.get_default_user()['groups']),
+ users['bob']['groups'])
+ self.assertEquals(True,
+ users['bob']['default'])
+
+ def test_users_dict_trans(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ {'name': 'joe',
+ 'tr-me': True},
+ {'name': 'bob'},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEquals({'tr_me': True, 'default': False}, users['joe'])
+ self.assertEquals({'default': False}, users['bob'])
+
+ def test_users_dict(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ {'name': 'joe'},
+ {'name': 'bob'},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEquals({'default': False}, users['joe'])
+ self.assertEquals({'default': False}, users['bob'])
diff --git a/tests/unittests/test_filters/__init__.py b/tests/unittests/test_filters/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/test_filters/__init__.py
diff --git a/tests/unittests/test_filters/test_launch_index.py b/tests/unittests/test_filters/test_launch_index.py
index 7ca7cbb6..773bb312 100644
--- a/tests/unittests/test_filters/test_launch_index.py
+++ b/tests/unittests/test_filters/test_launch_index.py
@@ -1,6 +1,6 @@
import copy
-import helpers as th
+from tests.unittests import helpers
import itertools
@@ -18,7 +18,7 @@ def count_messages(root):
return am
-class TestLaunchFilter(th.ResourceUsingTestCase):
+class TestLaunchFilter(helpers.ResourceUsingTestCase):
def assertCounts(self, message, expected_counts):
orig_message = copy.deepcopy(message)
diff --git a/tests/unittests/test_handler/__init__.py b/tests/unittests/test_handler/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/test_handler/__init__.py
diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py
index d3df5c50..0558023a 100644
--- a/tests/unittests/test_handler/test_handler_ca_certs.py
+++ b/tests/unittests/test_handler/test_handler_ca_certs.py
@@ -77,7 +77,7 @@ class TestConfig(MockerTestCase):
"""Test that a single cert gets passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1"]}}
- self.mock_add(self.paths, ["CERT1"])
+ self.mock_add(["CERT1"])
self.mock_update()
self.mocker.replay()
@@ -87,7 +87,7 @@ class TestConfig(MockerTestCase):
"""Test that multiple certs get passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
- self.mock_add(self.paths, ["CERT1", "CERT2"])
+ self.mock_add(["CERT1", "CERT2"])
self.mock_update()
self.mocker.replay()
@@ -97,7 +97,7 @@ class TestConfig(MockerTestCase):
"""Test remove_defaults works as expected."""
config = {"ca-certs": {"remove-defaults": True}}
- self.mock_remove(self.paths)
+ self.mock_remove()
self.mock_update()
self.mocker.replay()
@@ -116,8 +116,8 @@ class TestConfig(MockerTestCase):
"""Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
- self.mock_remove(self.paths)
- self.mock_add(self.paths, ["CERT1"])
+ self.mock_remove()
+ self.mock_add(["CERT1"])
self.mock_update()
self.mocker.replay()
@@ -136,20 +136,52 @@ class TestAddCaCerts(MockerTestCase):
"""Test that no certificate are written if not provided."""
self.mocker.replace(util.write_file, passthrough=False)
self.mocker.replay()
- cc_ca_certs.add_ca_certs(self.paths, [])
+ cc_ca_certs.add_ca_certs([])
- def test_single_cert(self):
- """Test adding a single certificate to the trusted CAs."""
+ def test_single_cert_trailing_cr(self):
+ """Test adding a single certificate to the trusted CAs
+ when existing ca-certificates has trailing newline"""
cert = "CERT1\nLINE2\nLINE3"
+ ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n"
+ expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n"
+
mock_write = self.mocker.replace(util.write_file, passthrough=False)
+ mock_load = self.mocker.replace(util.load_file, passthrough=False)
+
mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
cert, mode=0644)
+
+ mock_load("/etc/ca-certificates.conf")
+ self.mocker.result(ca_certs_content)
+
+ mock_write("/etc/ca-certificates.conf", expected, omode="wb")
+ self.mocker.replay()
+
+ cc_ca_certs.add_ca_certs([cert])
+
+ def test_single_cert_no_trailing_cr(self):
+ """Test adding a single certificate to the trusted CAs
+ when existing ca-certificates has no trailing newline"""
+ cert = "CERT1\nLINE2\nLINE3"
+
+ ca_certs_content = "line1\nline2\nline3"
+
+ mock_write = self.mocker.replace(util.write_file, passthrough=False)
+ mock_load = self.mocker.replace(util.load_file, passthrough=False)
+
+ mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
+ cert, mode=0644)
+
+ mock_load("/etc/ca-certificates.conf")
+ self.mocker.result(ca_certs_content)
+
mock_write("/etc/ca-certificates.conf",
- "\ncloud-init-ca-certs.crt", omode="ab")
+ "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt"),
+ omode="wb")
self.mocker.replay()
- cc_ca_certs.add_ca_certs(self.paths, [cert])
+ cc_ca_certs.add_ca_certs([cert])
def test_multiple_certs(self):
"""Test adding multiple certificates to the trusted CAs."""
@@ -157,13 +189,21 @@ class TestAddCaCerts(MockerTestCase):
expected_cert_file = "\n".join(certs)
mock_write = self.mocker.replace(util.write_file, passthrough=False)
+ mock_load = self.mocker.replace(util.load_file, passthrough=False)
+
mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
expected_cert_file, mode=0644)
- mock_write("/etc/ca-certificates.conf",
- "\ncloud-init-ca-certs.crt", omode="ab")
+
+ ca_certs_content = "line1\nline2\nline3"
+ mock_load("/etc/ca-certificates.conf")
+ self.mocker.result(ca_certs_content)
+
+ out = "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt")
+ mock_write("/etc/ca-certificates.conf", out, omode="wb")
+
self.mocker.replay()
- cc_ca_certs.add_ca_certs(self.paths, certs)
+ cc_ca_certs.add_ca_certs(certs)
class TestUpdateCaCerts(MockerTestCase):
@@ -198,4 +238,4 @@ class TestRemoveDefaultCaCerts(MockerTestCase):
"ca-certificates ca-certificates/trust_new_crts select no")
self.mocker.replay()
- cc_ca_certs.remove_default_ca_certs(self.paths)
+ cc_ca_certs.remove_default_ca_certs()
diff --git a/tests/unittests/test_handler/test_handler_power_state.py b/tests/unittests/test_handler/test_handler_power_state.py
new file mode 100644
index 00000000..f6e37fa5
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_power_state.py
@@ -0,0 +1,88 @@
+from cloudinit.config import cc_power_state_change as psc
+
+from tests.unittests import helpers as t_help
+
+
+class TestLoadPowerState(t_help.TestCase):
+ def setUp(self):
+ super(self.__class__, self).setUp()
+
+ def test_no_config(self):
+ # completely empty config should mean do nothing
+ (cmd, _timeout) = psc.load_power_state({})
+ self.assertEqual(cmd, None)
+
+ def test_irrelevant_config(self):
+ # no power_state field in config should return None for cmd
+ (cmd, _timeout) = psc.load_power_state({'foo': 'bar'})
+ self.assertEqual(cmd, None)
+
+ def test_invalid_mode(self):
+ cfg = {'power_state': {'mode': 'gibberish'}}
+ self.assertRaises(TypeError, psc.load_power_state, cfg)
+
+ cfg = {'power_state': {'mode': ''}}
+ self.assertRaises(TypeError, psc.load_power_state, cfg)
+
+ def test_empty_mode(self):
+ cfg = {'power_state': {'message': 'goodbye'}}
+ self.assertRaises(TypeError, psc.load_power_state, cfg)
+
+ def test_valid_modes(self):
+ cfg = {'power_state': {}}
+ for mode in ('halt', 'poweroff', 'reboot'):
+ cfg['power_state']['mode'] = mode
+ check_lps_ret(psc.load_power_state(cfg), mode=mode)
+
+ def test_invalid_delay(self):
+ cfg = {'power_state': {'mode': 'poweroff', 'delay': 'goodbye'}}
+ self.assertRaises(TypeError, psc.load_power_state, cfg)
+
+ def test_valid_delay(self):
+ cfg = {'power_state': {'mode': 'poweroff', 'delay': ''}}
+ for delay in ("now", "+1", "+30"):
+ cfg['power_state']['delay'] = delay
+ check_lps_ret(psc.load_power_state(cfg))
+
+ def test_message_present(self):
+ cfg = {'power_state': {'mode': 'poweroff', 'message': 'GOODBYE'}}
+ ret = psc.load_power_state(cfg)
+ check_lps_ret(psc.load_power_state(cfg))
+ self.assertIn(cfg['power_state']['message'], ret[0])
+
+ def test_no_message(self):
+ # if message is not present, then no argument should be passed for it
+ cfg = {'power_state': {'mode': 'poweroff'}}
+ (cmd, _timeout) = psc.load_power_state(cfg)
+ self.assertNotIn("", cmd)
+ check_lps_ret(psc.load_power_state(cfg))
+ self.assertTrue(len(cmd) == 3)
+
+
+def check_lps_ret(psc_return, mode=None):
+ if len(psc_return) != 2:
+ raise TypeError("length returned = %d" % len(psc_return))
+
+ errs = []
+ cmd = psc_return[0]
+ timeout = psc_return[1]
+
+ if not 'shutdown' in psc_return[0][0]:
+ errs.append("string 'shutdown' not in cmd")
+
+ if mode is not None:
+ opt = {'halt': '-H', 'poweroff': '-P', 'reboot': '-r'}[mode]
+ if opt not in psc_return[0]:
+ errs.append("opt '%s' not in cmd: %s" % (opt, cmd))
+
+ if len(cmd) != 3 and len(cmd) != 4:
+ errs.append("Invalid command length: %s" % len(cmd))
+
+ try:
+ float(timeout)
+ except:
+ errs.append("timeout failed convert to float")
+
+ if len(errs):
+ lines = ["Errors in result: %s" % str(psc_return)] + errs
+ raise Exception('\n'.join(lines))
diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py
new file mode 100644
index 00000000..a1aba62f
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_set_hostname.py
@@ -0,0 +1,58 @@
+from cloudinit.config import cc_set_hostname
+
+from cloudinit import cloud
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import util
+
+from tests.unittests import helpers as t_help
+
+import logging
+
+from StringIO import StringIO
+
+from configobj import ConfigObj
+
+LOG = logging.getLogger(__name__)
+
+
+class TestHostname(t_help.FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestHostname, self).setUp()
+ self.tmp = self.makeDir(prefix="unittest_")
+
+ def _fetch_distro(self, kind):
+ cls = distros.fetch(kind)
+ paths = helpers.Paths({})
+ return cls(kind, {}, paths)
+
+ def test_write_hostname_rhel(self):
+ cfg = {
+ 'hostname': 'blah.blah.blah.yahoo.com',
+ }
+ distro = self._fetch_distro('rhel')
+ paths = helpers.Paths({})
+ ds = None
+ cc = cloud.Cloud(ds, paths, {}, distro, None)
+ self.patchUtils(self.tmp)
+ self.patchOS(self.tmp)
+ cc_set_hostname.handle('cc_set_hostname',
+ cfg, cc, LOG, [])
+ contents = util.load_file("/etc/sysconfig/network")
+ n_cfg = ConfigObj(StringIO(contents))
+ self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'},
+ dict(n_cfg))
+
+ def test_write_hostname_debian(self):
+ cfg = {
+ 'hostname': 'blah.blah.blah.yahoo.com',
+ }
+ distro = self._fetch_distro('debian')
+ paths = helpers.Paths({})
+ ds = None
+ cc = cloud.Cloud(ds, paths, {}, distro, None)
+ self.patchUtils(self.tmp)
+ cc_set_hostname.handle('cc_set_hostname',
+ cfg, cc, LOG, [])
+ contents = util.load_file("/etc/hostname")
+ self.assertEquals('blah', contents.strip())
diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py
new file mode 100644
index 00000000..8df592f9
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py
@@ -0,0 +1,68 @@
+from cloudinit import helpers
+from cloudinit import util
+
+from cloudinit.config import cc_yum_add_repo
+
+from tests.unittests import helpers
+
+import logging
+
+from StringIO import StringIO
+
+import configobj
+
+LOG = logging.getLogger(__name__)
+
+
+class TestConfig(helpers.FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestConfig, self).setUp()
+ self.tmp = self.makeDir(prefix="unittest_")
+
+ def test_bad_config(self):
+ cfg = {
+ 'yum_repos': {
+ 'epel-testing': {
+ 'name': 'Extra Packages for Enterprise Linux 5 - Testing',
+ # Missing this should cause the repo not to be written
+ #'baseurl': 'http://blah.org/pub/epel/testing/5/$basearch',
+ 'enabled': False,
+ 'gpgcheck': True,
+ 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL',
+ 'failovermethod': 'priority',
+ },
+ },
+ }
+ self.patchUtils(self.tmp)
+ cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, [])
+ self.assertRaises(IOError, util.load_file,
+ "/etc/yum.repos.d/epel_testing.repo")
+
+ def test_write_config(self):
+ cfg = {
+ 'yum_repos': {
+ 'epel-testing': {
+ 'name': 'Extra Packages for Enterprise Linux 5 - Testing',
+ 'baseurl': 'http://blah.org/pub/epel/testing/5/$basearch',
+ 'enabled': False,
+ 'gpgcheck': True,
+ 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL',
+ 'failovermethod': 'priority',
+ },
+ },
+ }
+ self.patchUtils(self.tmp)
+ cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, [])
+ contents = util.load_file("/etc/yum.repos.d/epel_testing.repo")
+ contents = configobj.ConfigObj(StringIO(contents))
+ expected = {
+ 'epel_testing': {
+ 'name': 'Extra Packages for Enterprise Linux 5 - Testing',
+ 'failovermethod': 'priority',
+ 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL',
+ 'enabled': '0',
+ 'baseurl': 'http://blah.org/pub/epel/testing/5/$basearch',
+ 'gpgcheck': '1',
+ }
+ }
+ self.assertEquals(expected, dict(contents))
diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py
new file mode 100644
index 00000000..0037b966
--- /dev/null
+++ b/tests/unittests/test_merging.py
@@ -0,0 +1,62 @@
+from mocker import MockerTestCase
+
+from cloudinit import util
+
+
+class TestMergeDict(MockerTestCase):
+ def test_simple_merge(self):
+ """Test simple non-conflict merge."""
+ source = {"key1": "value1"}
+ candidate = {"key2": "value2"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual({"key1": "value1", "key2": "value2"}, result)
+
+ def test_nested_merge(self):
+ """Test nested merge."""
+ source = {"key1": {"key1.1": "value1.1"}}
+ candidate = {"key1": {"key1.2": "value1.2"}}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(
+ {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)
+
+ def test_merge_does_not_override(self):
+ """Test that candidate doesn't override source."""
+ source = {"key1": "value1", "key2": "value2"}
+ candidate = {"key1": "value2", "key2": "NEW VALUE"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_empty_candidate(self):
+ """Test empty candidate doesn't change source."""
+ source = {"key": "value"}
+ candidate = {}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_empty_source(self):
+ """Test empty source is replaced by candidate."""
+ source = {}
+ candidate = {"key": "value"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(candidate, result)
+
+ def test_non_dict_candidate(self):
+ """Test non-dict candidate is discarded."""
+ source = {"key": "value"}
+ candidate = "not a dict"
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_non_dict_source(self):
+ """Test non-dict source is not modified with a dict candidate."""
+ source = "not a dict"
+ candidate = {"key": "value"}
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
+
+ def test_neither_dict(self):
+ """Test if neither candidate or source is dict source wins."""
+ source = "source"
+ candidate = "candidate"
+ result = util.mergedict(source, candidate)
+ self.assertEqual(source, result)
diff --git a/tests/unittests/test_runs/__init__.py b/tests/unittests/test_runs/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/test_runs/__init__.py
diff --git a/tests/unittests/test_runs/test_merge_run.py b/tests/unittests/test_runs/test_merge_run.py
new file mode 100644
index 00000000..d9c3a455
--- /dev/null
+++ b/tests/unittests/test_runs/test_merge_run.py
@@ -0,0 +1,52 @@
+import os
+
+from tests.unittests import helpers
+
+from cloudinit.settings import (PER_INSTANCE)
+from cloudinit import stages
+from cloudinit import util
+
+
+class TestMergeRun(helpers.FilesystemMockingTestCase):
+ def _patchIn(self, root):
+ self.restore()
+ self.patchOS(root)
+ self.patchUtils(root)
+
+ def test_none_ds(self):
+ new_root = self.makeDir()
+ self.replicateTestRoot('simple_ubuntu', new_root)
+ cfg = {
+ 'datasource_list': ['None'],
+ 'cloud_init_modules': ['write-files'],
+ }
+ ud = self.readResource('user_data.1.txt')
+ cloud_cfg = util.yaml_dumps(cfg)
+ util.ensure_dir(os.path.join(new_root, 'etc', 'cloud'))
+ util.write_file(os.path.join(new_root, 'etc',
+ 'cloud', 'cloud.cfg'), cloud_cfg)
+ self._patchIn(new_root)
+
+ # Now start verifying whats created
+ initer = stages.Init()
+ initer.read_cfg()
+ initer.initialize()
+ initer.fetch()
+ initer.datasource.userdata_raw = ud
+ _iid = initer.instancify()
+ initer.update()
+ initer.cloudify().run('consume_userdata',
+ initer.consume_userdata,
+ args=[PER_INSTANCE],
+ freq=PER_INSTANCE)
+ mirrors = initer.distro.get_option('package_mirrors')
+ self.assertEquals(1, len(mirrors))
+ mirror = mirrors[0]
+ self.assertEquals(mirror['arches'], ['i386', 'amd64', 'blah'])
+ mods = stages.Modules(initer)
+ (which_ran, failures) = mods.run_section('cloud_init_modules')
+ self.assertTrue(len(failures) == 0)
+ self.assertTrue(os.path.exists('/etc/blah.ini'))
+ self.assertIn('write-files', which_ran)
+ contents = util.load_file('/etc/blah.ini')
+ self.assertEquals(contents, 'blah')
diff --git a/tests/unittests/test_runs/test_simple_run.py b/tests/unittests/test_runs/test_simple_run.py
new file mode 100644
index 00000000..60ef812a
--- /dev/null
+++ b/tests/unittests/test_runs/test_simple_run.py
@@ -0,0 +1,80 @@
+import os
+
+from tests.unittests import helpers
+
+from cloudinit.settings import (PER_INSTANCE)
+from cloudinit import stages
+from cloudinit import util
+
+
+class TestSimpleRun(helpers.FilesystemMockingTestCase):
+ def _patchIn(self, root):
+ self.restore()
+ self.patchOS(root)
+ self.patchUtils(root)
+
+ def _pp_root(self, root, repatch=True):
+ self.restore()
+ for (dirpath, dirnames, filenames) in os.walk(root):
+ print(dirpath)
+ for f in filenames:
+ joined = os.path.join(dirpath, f)
+ if os.path.islink(joined):
+ print("f %s - (symlink)" % (f))
+ else:
+ print("f %s" % (f))
+ for d in dirnames:
+ joined = os.path.join(dirpath, d)
+ if os.path.islink(joined):
+ print("d %s - (symlink)" % (d))
+ else:
+ print("d %s" % (d))
+ if repatch:
+ self._patchIn(root)
+
+ def test_none_ds(self):
+ new_root = self.makeDir()
+ self.replicateTestRoot('simple_ubuntu', new_root)
+ cfg = {
+ 'datasource_list': ['None'],
+ 'write_files': [
+ {
+ 'path': '/etc/blah.ini',
+ 'content': 'blah',
+ 'permissions': 0755,
+ },
+ ],
+ 'cloud_init_modules': ['write-files'],
+ }
+ cloud_cfg = util.yaml_dumps(cfg)
+ util.ensure_dir(os.path.join(new_root, 'etc', 'cloud'))
+ util.write_file(os.path.join(new_root, 'etc',
+ 'cloud', 'cloud.cfg'), cloud_cfg)
+ self._patchIn(new_root)
+
+ # Now start verifying whats created
+ initer = stages.Init()
+ initer.read_cfg()
+ initer.initialize()
+ self.assertTrue(os.path.exists("/var/lib/cloud"))
+ for d in ['scripts', 'seed', 'instances', 'handlers', 'sem', 'data']:
+ self.assertTrue(os.path.isdir(os.path.join("/var/lib/cloud", d)))
+
+ initer.fetch()
+ iid = initer.instancify()
+ self.assertEquals(iid, 'iid-datasource-none')
+ initer.update()
+ self.assertTrue(os.path.islink("var/lib/cloud/instance"))
+
+ initer.cloudify().run('consume_userdata',
+ initer.consume_userdata,
+ args=[PER_INSTANCE],
+ freq=PER_INSTANCE)
+
+ mods = stages.Modules(initer)
+ (which_ran, failures) = mods.run_section('cloud_init_modules')
+ self.assertTrue(len(failures) == 0)
+ self.assertTrue(os.path.exists('/etc/blah.ini'))
+ self.assertIn('write-files', which_ran)
+ contents = util.load_file('/etc/blah.ini')
+ self.assertEquals(contents, 'blah')
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 15fcbd26..02611581 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -1,5 +1,6 @@
import os
import stat
+import yaml
from mocker import MockerTestCase
from unittest import TestCase
@@ -27,65 +28,6 @@ class FakeSelinux(object):
self.restored.append(path)
-class TestMergeDict(MockerTestCase):
- def test_simple_merge(self):
- """Test simple non-conflict merge."""
- source = {"key1": "value1"}
- candidate = {"key2": "value2"}
- result = util.mergedict(source, candidate)
- self.assertEqual({"key1": "value1", "key2": "value2"}, result)
-
- def test_nested_merge(self):
- """Test nested merge."""
- source = {"key1": {"key1.1": "value1.1"}}
- candidate = {"key1": {"key1.2": "value1.2"}}
- result = util.mergedict(source, candidate)
- self.assertEqual(
- {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)
-
- def test_merge_does_not_override(self):
- """Test that candidate doesn't override source."""
- source = {"key1": "value1", "key2": "value2"}
- candidate = {"key1": "value2", "key2": "NEW VALUE"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_candidate(self):
- """Test empty candidate doesn't change source."""
- source = {"key": "value"}
- candidate = {}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_source(self):
- """Test empty source is replaced by candidate."""
- source = {}
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(candidate, result)
-
- def test_non_dict_candidate(self):
- """Test non-dict candidate is discarded."""
- source = {"key": "value"}
- candidate = "not a dict"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_non_dict_source(self):
- """Test non-dict source is not modified with a dict candidate."""
- source = "not a dict"
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_neither_dict(self):
- """Test if neither candidate or source is dict source wins."""
- source = "source"
- candidate = "candidate"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
-
class TestGetCfgOptionListOrStr(TestCase):
def test_not_found_no_default(self):
"""None is returned if key is not found and no default given."""
@@ -268,4 +210,42 @@ class TestGetCmdline(TestCase):
os.environ['DEBUG_PROC_CMDLINE'] = 'abcd 123'
self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], util.get_cmdline())
+
+class TestLoadYaml(TestCase):
+ mydefault = "7b03a8ebace993d806255121073fed52"
+
+ def test_simple(self):
+ mydata = {'1': "one", '2': "two"}
+ self.assertEqual(util.load_yaml(yaml.dump(mydata)), mydata)
+
+ def test_nonallowed_returns_default(self):
+ # for now, anything not in the allowed list just returns the default.
+ myyaml = yaml.dump({'1': "one"})
+ self.assertEqual(util.load_yaml(blob=myyaml,
+ default=self.mydefault,
+ allowed=(str,)),
+ self.mydefault)
+
+ def test_bogus_returns_default(self):
+ badyaml = "1\n 2:"
+ self.assertEqual(util.load_yaml(blob=badyaml,
+ default=self.mydefault),
+ self.mydefault)
+
+ def test_unsafe_types(self):
+ # should not load complex types
+ unsafe_yaml = yaml.dump((1, 2, 3,))
+ self.assertEqual(util.load_yaml(blob=unsafe_yaml,
+ default=self.mydefault),
+ self.mydefault)
+
+ def test_python_unicode(self):
+ # complex type of python/unicde is explicitly allowed
+ myobj = {'1': unicode("FOOBAR")}
+ safe_yaml = yaml.dump(myobj)
+ self.assertEqual(util.load_yaml(blob=safe_yaml,
+ default=self.mydefault),
+ myobj)
+
+
# vi: ts=4 expandtab