diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unittests/test_datasource/test_configdrive.py | 145 | ||||
-rw-r--r-- | tests/unittests/test_distros/test_user_data_normalize.py | 279 | ||||
-rw-r--r-- | tests/unittests/test_util.py | 39 |
3 files changed, 453 insertions, 10 deletions
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py index 55573114..00379e03 100644 --- a/tests/unittests/test_datasource/test_configdrive.py +++ b/tests/unittests/test_datasource/test_configdrive.py @@ -2,10 +2,12 @@ from copy import copy import json import os import os.path -import shutil -import tempfile -from unittest import TestCase +import mocker +from mocker import MockerTestCase + +from cloudinit import helpers +from cloudinit import settings from cloudinit.sources import DataSourceConfigDrive as ds from cloudinit import util @@ -60,17 +62,140 @@ CFG_DRIVE_FILES_V2 = { 'openstack/latest/user_data': USER_DATA} -class TestConfigDriveDataSource(TestCase): +class TestConfigDriveDataSource(MockerTestCase): def setUp(self): super(TestConfigDriveDataSource, self).setUp() - self.tmp = tempfile.mkdtemp() + self.tmp = self.makeDir() - def tearDown(self): - try: - shutil.rmtree(self.tmp) - except OSError: - pass + def test_ec2_metadata(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + found = ds.read_config_drive_dir(self.tmp) + self.assertTrue('ec2-metadata' in found) + ec2_md = found['ec2-metadata'] + self.assertEqual(EC2_META, ec2_md) + + def test_dev_os_remap(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, + None, + helpers.Paths({})) + found = ds.read_config_drive_dir(self.tmp) + cfg_ds.metadata = found['metadata'] + name_tests = { + 'ami': '/dev/vda1', + 'root': '/dev/vda1', + 'ephemeral0': '/dev/vda2', + 'swap': '/dev/vda3', + } + for name, dev_name in name_tests.items(): + my_mock = mocker.Mocker() + find_mock = my_mock.replace(util.find_devs_with, + spec=False, passthrough=False) + provided_name = dev_name[len('/dev/'):] + provided_name = "s" + provided_name[1:] + find_mock(mocker.ARGS) + my_mock.result([provided_name]) + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + my_mock.restore() + self.assertEquals(dev_name, device) + + def test_dev_os_map(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, + None, + helpers.Paths({})) + found = ds.read_config_drive_dir(self.tmp) + os_md = found['metadata'] + cfg_ds.metadata = os_md + name_tests = { + 'ami': '/dev/vda1', + 'root': '/dev/vda1', + 'ephemeral0': '/dev/vda2', + 'swap': '/dev/vda3', + } + for name, dev_name in name_tests.items(): + my_mock = mocker.Mocker() + find_mock = my_mock.replace(util.find_devs_with, + spec=False, passthrough=False) + find_mock(mocker.ARGS) + my_mock.result([dev_name]) + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + my_mock.restore() + self.assertEquals(dev_name, device) + + def test_dev_ec2_remap(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, + None, + helpers.Paths({})) + found = ds.read_config_drive_dir(self.tmp) + ec2_md = found['ec2-metadata'] + os_md = found['metadata'] + cfg_ds.ec2_metadata = ec2_md + cfg_ds.metadata = os_md + name_tests = { + 'ami': '/dev/vda1', + 'root': '/dev/vda1', + 'ephemeral0': '/dev/vda2', + 'swap': '/dev/vda3', + None: None, + 'bob': None, + 'root2k': None, + } + for name, dev_name in name_tests.items(): + my_mock = mocker.Mocker() + exists_mock = my_mock.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + my_mock.result(False) + exists_mock(mocker.ARGS) + my_mock.result(True) + my_mock.replay() + device = cfg_ds.device_name_to_device(name) + self.assertEquals(dev_name, device) + my_mock.restore() + + def test_dev_ec2_map(self): + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, + None, + helpers.Paths({})) + found = ds.read_config_drive_dir(self.tmp) + exists_mock = self.mocker.replace(os.path.exists, + spec=False, passthrough=False) + exists_mock(mocker.ARGS) + self.mocker.count(0, None) + self.mocker.result(True) + self.mocker.replay() + ec2_md = found['ec2-metadata'] + os_md = found['metadata'] + cfg_ds.ec2_metadata = ec2_md + cfg_ds.metadata = os_md + name_tests = { + 'ami': '/dev/sda1', + 'root': '/dev/sda1', + 'ephemeral0': '/dev/sda2', + 'swap': '/dev/sda3', + None: None, + 'bob': None, + 'root2k': None, + } + for name, dev_name in name_tests.items(): + device = cfg_ds.device_name_to_device(name) + self.assertEquals(dev_name, device) def test_dir_valid(self): """Verify a dir is read as such.""" diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py new file mode 100644 index 00000000..8f0d8896 --- /dev/null +++ b/tests/unittests/test_distros/test_user_data_normalize.py @@ -0,0 +1,279 @@ +from mocker import MockerTestCase + +from cloudinit import distros +from cloudinit import helpers +from cloudinit import settings + +bcfg = { + 'name': 'bob', + 'plain_text_passwd': 'ubuntu', + 'home': "/home/ubuntu", + 'shell': "/bin/bash", + 'lock_passwd': True, + 'gecos': "Ubuntu", + 'groups': ["foo"] +} + + +class TestUGNormalize(MockerTestCase): + + def _make_distro(self, dtype, def_user=None): + cfg = dict(settings.CFG_BUILTIN) + cfg['system_info']['distro'] = dtype + paths = helpers.Paths(cfg['system_info']['paths']) + distro_cls = distros.fetch(dtype) + if def_user: + cfg['system_info']['default_user'] = def_user.copy() + distro = distro_cls(dtype, cfg['system_info'], paths) + return distro + + def _norm(self, cfg, distro): + return distros.normalize_users_groups(cfg, distro) + + def test_basic_groups(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'groups': ['bob'], + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', groups) + self.assertEquals({}, users) + + def test_csv_groups(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'groups': 'bob,joe,steve', + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', groups) + self.assertIn('joe', groups) + self.assertIn('steve', groups) + self.assertEquals({}, users) + + def test_more_groups(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'groups': ['bob', 'joe', 'steve'] + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', groups) + self.assertIn('joe', groups) + self.assertIn('steve', groups) + self.assertEquals({}, users) + + def test_member_groups(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'groups': { + 'bob': ['s'], + 'joe': [], + 'steve': [], + } + } + (users, groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', groups) + self.assertEquals(['s'], groups['bob']) + self.assertEquals([], groups['joe']) + self.assertIn('joe', groups) + self.assertIn('steve', groups) + self.assertEquals({}, users) + + def test_users_simple_dict(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': { + 'default': True, + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + ug_cfg = { + 'users': { + 'default': 'yes', + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + ug_cfg = { + 'users': { + 'default': '1', + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + + def test_users_simple_dict_no(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': { + 'default': False, + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertEquals({}, users) + ug_cfg = { + 'users': { + 'default': 'no', + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertEquals({}, users) + + def test_users_simple_csv(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'users': 'joe,bob', + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('joe', users) + self.assertIn('bob', users) + self.assertEquals({'default': False}, users['joe']) + self.assertEquals({'default': False}, users['bob']) + + def test_users_simple(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'users': [ + 'joe', + 'bob' + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('joe', users) + self.assertIn('bob', users) + self.assertEquals({'default': False}, users['joe']) + self.assertEquals({'default': False}, users['bob']) + + def test_users_old_user(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'user': 'zetta', + 'users': 'default' + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertIn('zetta', users) + self.assertNotIn('default', users) + ug_cfg = { + 'user': 'zetta', + 'users': 'default, joe' + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertIn('joe', users) + self.assertIn('zetta', users) + self.assertNotIn('default', users) + ug_cfg = { + 'user': 'zetta', + 'users': ['bob', 'joe'] + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertNotIn('bob', users) + self.assertIn('joe', users) + self.assertIn('zetta', users) + ug_cfg = { + 'user': 'zetta', + 'users': { + 'bob': True, + 'joe': True, + } + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertIn('joe', users) + self.assertIn('zetta', users) + ug_cfg = { + 'user': 'zetta', + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('zetta', users) + ug_cfg = {} + (users, groups) = self._norm(ug_cfg, distro) + self.assertEquals({}, users) + self.assertEquals({}, groups) + + def test_users_dict_default_additional(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': [ + {'name': 'default', 'blah': True} + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertEquals(",".join(distro.get_default_user()['groups']), + users['bob']['groups']) + self.assertEquals(True, + users['bob']['blah']) + self.assertEquals(True, + users['bob']['default']) + + def test_users_dict_extract(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': [ + 'default', + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + (name, config) = distros.extract_default(users) + self.assertEquals(name, 'bob') + expected_config = {} + def_config = None + try: + def_config = distro.get_default_user() + except NotImplementedError: + pass + if not def_config: + def_config = {} + expected_config.update(def_config) + + # Ignore these for now + expected_config.pop('name', None) + expected_config.pop('groups', None) + config.pop('groups', None) + self.assertEquals(config, expected_config) + + def test_users_dict_default(self): + distro = self._make_distro('ubuntu', bcfg) + ug_cfg = { + 'users': [ + 'default', + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('bob', users) + self.assertEquals(",".join(distro.get_default_user()['groups']), + users['bob']['groups']) + self.assertEquals(True, + users['bob']['default']) + + def test_users_dict_trans(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'users': [ + {'name': 'joe', + 'tr-me': True}, + {'name': 'bob'}, + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('joe', users) + self.assertIn('bob', users) + self.assertEquals({'tr_me': True, 'default': False}, users['joe']) + self.assertEquals({'default': False}, users['bob']) + + def test_users_dict(self): + distro = self._make_distro('ubuntu') + ug_cfg = { + 'users': [ + {'name': 'joe'}, + {'name': 'bob'}, + ], + } + (users, _groups) = self._norm(ug_cfg, distro) + self.assertIn('joe', users) + self.assertIn('bob', users) + self.assertEquals({'default': False}, users['joe']) + self.assertEquals({'default': False}, users['bob']) diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 15fcbd26..96962b91 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -1,5 +1,6 @@ import os import stat +import yaml from mocker import MockerTestCase from unittest import TestCase @@ -268,4 +269,42 @@ class TestGetCmdline(TestCase): os.environ['DEBUG_PROC_CMDLINE'] = 'abcd 123' self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], util.get_cmdline()) + +class TestLoadYaml(TestCase): + mydefault = "7b03a8ebace993d806255121073fed52" + + def test_simple(self): + mydata = {'1': "one", '2': "two"} + self.assertEqual(util.load_yaml(yaml.dump(mydata)), mydata) + + def test_nonallowed_returns_default(self): + # for now, anything not in the allowed list just returns the default. + myyaml = yaml.dump({'1': "one"}) + self.assertEqual(util.load_yaml(blob=myyaml, + default=self.mydefault, + allowed=(str,)), + self.mydefault) + + def test_bogus_returns_default(self): + badyaml = "1\n 2:" + self.assertEqual(util.load_yaml(blob=badyaml, + default=self.mydefault), + self.mydefault) + + def test_unsafe_types(self): + # should not load complex types + unsafe_yaml = yaml.dump((1, 2, 3,)) + self.assertEqual(util.load_yaml(blob=unsafe_yaml, + default=self.mydefault), + self.mydefault) + + def test_python_unicode(self): + # complex type of python/unicde is explicitly allowed + myobj = {'1': unicode("FOOBAR")} + safe_yaml = yaml.dump(myobj) + self.assertEqual(util.load_yaml(blob=safe_yaml, + default=self.mydefault), + myobj) + + # vi: ts=4 expandtab |