summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py145
-rw-r--r--tests/unittests/test_distros/test_user_data_normalize.py279
-rw-r--r--tests/unittests/test_util.py39
3 files changed, 453 insertions, 10 deletions
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 55573114..00379e03 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -2,10 +2,12 @@ from copy import copy
import json
import os
import os.path
-import shutil
-import tempfile
-from unittest import TestCase
+import mocker
+from mocker import MockerTestCase
+
+from cloudinit import helpers
+from cloudinit import settings
from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit import util
@@ -60,17 +62,140 @@ CFG_DRIVE_FILES_V2 = {
'openstack/latest/user_data': USER_DATA}
-class TestConfigDriveDataSource(TestCase):
+class TestConfigDriveDataSource(MockerTestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
- self.tmp = tempfile.mkdtemp()
+ self.tmp = self.makeDir()
- def tearDown(self):
- try:
- shutil.rmtree(self.tmp)
- except OSError:
- pass
+ def test_ec2_metadata(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ found = ds.read_config_drive_dir(self.tmp)
+ self.assertTrue('ec2-metadata' in found)
+ ec2_md = found['ec2-metadata']
+ self.assertEqual(EC2_META, ec2_md)
+
+ def test_dev_os_remap(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
+ None,
+ helpers.Paths({}))
+ found = ds.read_config_drive_dir(self.tmp)
+ cfg_ds.metadata = found['metadata']
+ name_tests = {
+ 'ami': '/dev/vda1',
+ 'root': '/dev/vda1',
+ 'ephemeral0': '/dev/vda2',
+ 'swap': '/dev/vda3',
+ }
+ for name, dev_name in name_tests.items():
+ my_mock = mocker.Mocker()
+ find_mock = my_mock.replace(util.find_devs_with,
+ spec=False, passthrough=False)
+ provided_name = dev_name[len('/dev/'):]
+ provided_name = "s" + provided_name[1:]
+ find_mock(mocker.ARGS)
+ my_mock.result([provided_name])
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ my_mock.restore()
+ self.assertEquals(dev_name, device)
+
+ def test_dev_os_map(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
+ None,
+ helpers.Paths({}))
+ found = ds.read_config_drive_dir(self.tmp)
+ os_md = found['metadata']
+ cfg_ds.metadata = os_md
+ name_tests = {
+ 'ami': '/dev/vda1',
+ 'root': '/dev/vda1',
+ 'ephemeral0': '/dev/vda2',
+ 'swap': '/dev/vda3',
+ }
+ for name, dev_name in name_tests.items():
+ my_mock = mocker.Mocker()
+ find_mock = my_mock.replace(util.find_devs_with,
+ spec=False, passthrough=False)
+ find_mock(mocker.ARGS)
+ my_mock.result([dev_name])
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ my_mock.restore()
+ self.assertEquals(dev_name, device)
+
+ def test_dev_ec2_remap(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
+ None,
+ helpers.Paths({}))
+ found = ds.read_config_drive_dir(self.tmp)
+ ec2_md = found['ec2-metadata']
+ os_md = found['metadata']
+ cfg_ds.ec2_metadata = ec2_md
+ cfg_ds.metadata = os_md
+ name_tests = {
+ 'ami': '/dev/vda1',
+ 'root': '/dev/vda1',
+ 'ephemeral0': '/dev/vda2',
+ 'swap': '/dev/vda3',
+ None: None,
+ 'bob': None,
+ 'root2k': None,
+ }
+ for name, dev_name in name_tests.items():
+ my_mock = mocker.Mocker()
+ exists_mock = my_mock.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(False)
+ exists_mock(mocker.ARGS)
+ my_mock.result(True)
+ my_mock.replay()
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
+ my_mock.restore()
+
+ def test_dev_ec2_map(self):
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
+ None,
+ helpers.Paths({}))
+ found = ds.read_config_drive_dir(self.tmp)
+ exists_mock = self.mocker.replace(os.path.exists,
+ spec=False, passthrough=False)
+ exists_mock(mocker.ARGS)
+ self.mocker.count(0, None)
+ self.mocker.result(True)
+ self.mocker.replay()
+ ec2_md = found['ec2-metadata']
+ os_md = found['metadata']
+ cfg_ds.ec2_metadata = ec2_md
+ cfg_ds.metadata = os_md
+ name_tests = {
+ 'ami': '/dev/sda1',
+ 'root': '/dev/sda1',
+ 'ephemeral0': '/dev/sda2',
+ 'swap': '/dev/sda3',
+ None: None,
+ 'bob': None,
+ 'root2k': None,
+ }
+ for name, dev_name in name_tests.items():
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dir_valid(self):
"""Verify a dir is read as such."""
diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py
new file mode 100644
index 00000000..8f0d8896
--- /dev/null
+++ b/tests/unittests/test_distros/test_user_data_normalize.py
@@ -0,0 +1,279 @@
+from mocker import MockerTestCase
+
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import settings
+
+bcfg = {
+ 'name': 'bob',
+ 'plain_text_passwd': 'ubuntu',
+ 'home': "/home/ubuntu",
+ 'shell': "/bin/bash",
+ 'lock_passwd': True,
+ 'gecos': "Ubuntu",
+ 'groups': ["foo"]
+}
+
+
+class TestUGNormalize(MockerTestCase):
+
+ def _make_distro(self, dtype, def_user=None):
+ cfg = dict(settings.CFG_BUILTIN)
+ cfg['system_info']['distro'] = dtype
+ paths = helpers.Paths(cfg['system_info']['paths'])
+ distro_cls = distros.fetch(dtype)
+ if def_user:
+ cfg['system_info']['default_user'] = def_user.copy()
+ distro = distro_cls(dtype, cfg['system_info'], paths)
+ return distro
+
+ def _norm(self, cfg, distro):
+ return distros.normalize_users_groups(cfg, distro)
+
+ def test_basic_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': ['bob'],
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertEquals({}, users)
+
+ def test_csv_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': 'bob,joe,steve',
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertIn('joe', groups)
+ self.assertIn('steve', groups)
+ self.assertEquals({}, users)
+
+ def test_more_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': ['bob', 'joe', 'steve']
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertIn('joe', groups)
+ self.assertIn('steve', groups)
+ self.assertEquals({}, users)
+
+ def test_member_groups(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'groups': {
+ 'bob': ['s'],
+ 'joe': [],
+ 'steve': [],
+ }
+ }
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', groups)
+ self.assertEquals(['s'], groups['bob'])
+ self.assertEquals([], groups['joe'])
+ self.assertIn('joe', groups)
+ self.assertIn('steve', groups)
+ self.assertEquals({}, users)
+
+ def test_users_simple_dict(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': {
+ 'default': True,
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ ug_cfg = {
+ 'users': {
+ 'default': 'yes',
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ ug_cfg = {
+ 'users': {
+ 'default': '1',
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+
+ def test_users_simple_dict_no(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': {
+ 'default': False,
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertEquals({}, users)
+ ug_cfg = {
+ 'users': {
+ 'default': 'no',
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertEquals({}, users)
+
+ def test_users_simple_csv(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': 'joe,bob',
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEquals({'default': False}, users['joe'])
+ self.assertEquals({'default': False}, users['bob'])
+
+ def test_users_simple(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ 'joe',
+ 'bob'
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEquals({'default': False}, users['joe'])
+ self.assertEquals({'default': False}, users['bob'])
+
+ def test_users_old_user(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': 'default'
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertIn('zetta', users)
+ self.assertNotIn('default', users)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': 'default, joe'
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertIn('joe', users)
+ self.assertIn('zetta', users)
+ self.assertNotIn('default', users)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': ['bob', 'joe']
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertNotIn('bob', users)
+ self.assertIn('joe', users)
+ self.assertIn('zetta', users)
+ ug_cfg = {
+ 'user': 'zetta',
+ 'users': {
+ 'bob': True,
+ 'joe': True,
+ }
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertIn('joe', users)
+ self.assertIn('zetta', users)
+ ug_cfg = {
+ 'user': 'zetta',
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('zetta', users)
+ ug_cfg = {}
+ (users, groups) = self._norm(ug_cfg, distro)
+ self.assertEquals({}, users)
+ self.assertEquals({}, groups)
+
+ def test_users_dict_default_additional(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': [
+ {'name': 'default', 'blah': True}
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertEquals(",".join(distro.get_default_user()['groups']),
+ users['bob']['groups'])
+ self.assertEquals(True,
+ users['bob']['blah'])
+ self.assertEquals(True,
+ users['bob']['default'])
+
+ def test_users_dict_extract(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': [
+ 'default',
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ (name, config) = distros.extract_default(users)
+ self.assertEquals(name, 'bob')
+ expected_config = {}
+ def_config = None
+ try:
+ def_config = distro.get_default_user()
+ except NotImplementedError:
+ pass
+ if not def_config:
+ def_config = {}
+ expected_config.update(def_config)
+
+ # Ignore these for now
+ expected_config.pop('name', None)
+ expected_config.pop('groups', None)
+ config.pop('groups', None)
+ self.assertEquals(config, expected_config)
+
+ def test_users_dict_default(self):
+ distro = self._make_distro('ubuntu', bcfg)
+ ug_cfg = {
+ 'users': [
+ 'default',
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('bob', users)
+ self.assertEquals(",".join(distro.get_default_user()['groups']),
+ users['bob']['groups'])
+ self.assertEquals(True,
+ users['bob']['default'])
+
+ def test_users_dict_trans(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ {'name': 'joe',
+ 'tr-me': True},
+ {'name': 'bob'},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEquals({'tr_me': True, 'default': False}, users['joe'])
+ self.assertEquals({'default': False}, users['bob'])
+
+ def test_users_dict(self):
+ distro = self._make_distro('ubuntu')
+ ug_cfg = {
+ 'users': [
+ {'name': 'joe'},
+ {'name': 'bob'},
+ ],
+ }
+ (users, _groups) = self._norm(ug_cfg, distro)
+ self.assertIn('joe', users)
+ self.assertIn('bob', users)
+ self.assertEquals({'default': False}, users['joe'])
+ self.assertEquals({'default': False}, users['bob'])
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 15fcbd26..96962b91 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -1,5 +1,6 @@
import os
import stat
+import yaml
from mocker import MockerTestCase
from unittest import TestCase
@@ -268,4 +269,42 @@ class TestGetCmdline(TestCase):
os.environ['DEBUG_PROC_CMDLINE'] = 'abcd 123'
self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], util.get_cmdline())
+
+class TestLoadYaml(TestCase):
+ mydefault = "7b03a8ebace993d806255121073fed52"
+
+ def test_simple(self):
+ mydata = {'1': "one", '2': "two"}
+ self.assertEqual(util.load_yaml(yaml.dump(mydata)), mydata)
+
+ def test_nonallowed_returns_default(self):
+ # for now, anything not in the allowed list just returns the default.
+ myyaml = yaml.dump({'1': "one"})
+ self.assertEqual(util.load_yaml(blob=myyaml,
+ default=self.mydefault,
+ allowed=(str,)),
+ self.mydefault)
+
+ def test_bogus_returns_default(self):
+ badyaml = "1\n 2:"
+ self.assertEqual(util.load_yaml(blob=badyaml,
+ default=self.mydefault),
+ self.mydefault)
+
+ def test_unsafe_types(self):
+ # should not load complex types
+ unsafe_yaml = yaml.dump((1, 2, 3,))
+ self.assertEqual(util.load_yaml(blob=unsafe_yaml,
+ default=self.mydefault),
+ self.mydefault)
+
+ def test_python_unicode(self):
+ # complex type of python/unicde is explicitly allowed
+ myobj = {'1': unicode("FOOBAR")}
+ safe_yaml = yaml.dump(myobj)
+ self.assertEqual(util.load_yaml(blob=safe_yaml,
+ default=self.mydefault),
+ myobj)
+
+
# vi: ts=4 expandtab