diff options
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r-- | tests/unittests/test_datasource/test_azure.py | 60 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_opennebula.py | 267 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 183 |
3 files changed, 470 insertions, 40 deletions
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py index 1ca6a79d..aad84206 100644 --- a/tests/unittests/test_datasource/test_azure.py +++ b/tests/unittests/test_datasource/test_azure.py @@ -2,8 +2,8 @@ from cloudinit import helpers from cloudinit.sources import DataSourceAzure from tests.unittests.helpers import populate_dir -import crypt import base64 +import crypt from mocker import MockerTestCase import os import yaml @@ -120,8 +120,7 @@ class TestAzureDataSource(MockerTestCase): mod = DataSourceAzure - if data.get('dsdevs'): - self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)]) + self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)]) self.apply_patches([(mod, 'invoke_agent', _invoke_agent), (mod, 'write_files', _write_files), @@ -154,9 +153,12 @@ class TestAzureDataSource(MockerTestCase): def test_user_cfg_set_agent_command_plain(self): # set dscfg in via plaintext - cfg = {'agent_command': "my_command"} + # we must have friendly-to-xml formatted plaintext in yaml_cfg + # not all plaintext is expected to work. + yaml_cfg = "{agent_command: my_command}\n" + cfg = yaml.safe_load(yaml_cfg) odata = {'HostName': "myhost", 'UserName': "myuser", - 'dscfg': {'text': yaml.dump(cfg), 'encoding': 'plain'}} + 'dscfg': {'text': yaml_cfg, 'encoding': 'plain'}} data = {'ovfcontent': construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) @@ -290,11 +292,57 @@ class TestAzureDataSource(MockerTestCase): self.assertEqual(data.get('apply_hostname_bounce', "N/A"), "N/A") + def test_default_ephemeral(self): + # make sure the ephemeral device works + odata = {} + data = {'ovfcontent': construct_valid_ovf_env(data=odata), + 'sys_cfg': {}} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + cfg = dsrc.get_config_obj() + + self.assertEquals(dsrc.device_name_to_device("ephemeral0"), + "/dev/sdb") + assert 'disk_setup' in cfg + assert 'fs_setup' in cfg + self.assertIsInstance(cfg['disk_setup'], dict) + self.assertIsInstance(cfg['fs_setup'], list) + + def test_provide_disk_aliases(self): + # Make sure that user can affect disk aliases + dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}} + odata = {'HostName': "myhost", 'UserName': "myuser", + 'dscfg': {'text': base64.b64encode(yaml.dump(dscfg)), + 'encoding': 'base64'}} + usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'}, + 'ephemeral0': False}} + userdata = '#cloud-config' + yaml.dump(usercfg) + "\n" + + ovfcontent = construct_valid_ovf_env(data=odata, userdata=userdata) + data = {'ovfcontent': ovfcontent, 'sys_cfg': {}} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + cfg = dsrc.get_config_obj() + self.assertTrue(cfg) + + def test_userdata_arrives(self): + userdata = "This is my user-data" + xml = construct_valid_ovf_env(data={}, userdata=userdata) + data = {'ovfcontent': xml} + dsrc = self._get_ds(data) + dsrc.get_data() + + self.assertEqual(userdata, dsrc.userdata_raw) + class TestReadAzureOvf(MockerTestCase): def test_invalid_xml_raises_non_azure_ds(self): invalid_xml = "<foo>" + construct_valid_ovf_env(data={}) - self.assertRaises(DataSourceAzure.NonAzureDataSource, + self.assertRaises(DataSourceAzure.BrokenAzureDataSource, DataSourceAzure.read_azure_ovf, invalid_xml) def test_load_with_pubkeys(self): diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py new file mode 100644 index 00000000..e1812a88 --- /dev/null +++ b/tests/unittests/test_datasource/test_opennebula.py @@ -0,0 +1,267 @@ +from cloudinit import helpers +from cloudinit.sources import DataSourceOpenNebula as ds +from cloudinit import util +from mocker import MockerTestCase +from tests.unittests.helpers import populate_dir + +import os +import pwd + +TEST_VARS = { + 'VAR1': 'single', + 'VAR2': 'double word', + 'VAR3': 'multi\nline\n', + 'VAR4': "'single'", + 'VAR5': "'double word'", + 'VAR6': "'multi\nline\n'", + 'VAR7': 'single\\t', + 'VAR8': 'double\\tword', + 'VAR9': 'multi\\t\nline\n', + 'VAR10': '\\', # expect \ + 'VAR11': '\'', # expect ' + 'VAR12': '$', # expect $ +} + +INVALID_CONTEXT = ';' +USER_DATA = '#cloud-config\napt_upgrade: true' +SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i' +HOSTNAME = 'foo.example.com' +PUBLIC_IP = '10.0.0.3' + +CMD_IP_OUT = '''\ +1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN + link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 +2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000 + link/ether 02:00:0a:12:01:01 brd ff:ff:ff:ff:ff:ff +''' + + +class TestOpenNebulaDataSource(MockerTestCase): + parsed_user = None + + def setUp(self): + super(TestOpenNebulaDataSource, self).setUp() + self.tmp = self.makeDir() + self.paths = helpers.Paths({'cloud_dir': self.tmp}) + + # defaults for few tests + self.ds = ds.DataSourceOpenNebula + self.seed_dir = os.path.join(self.paths.seed_dir, "opennebula") + self.sys_cfg = {'datasource': {'OpenNebula': {'dsmode': 'local'}}} + + # we don't want 'sudo' called in tests. so we patch switch_user_cmd + def my_switch_user_cmd(user): + self.parsed_user = user + return [] + + self.switch_user_cmd_real = ds.switch_user_cmd + ds.switch_user_cmd = my_switch_user_cmd + + def tearDown(self): + ds.switch_user_cmd = self.switch_user_cmd_real + super(TestOpenNebulaDataSource, self).tearDown() + + def test_get_data_non_contextdisk(self): + orig_find_devs_with = util.find_devs_with + try: + # dont' try to lookup for CDs + util.find_devs_with = lambda n: [] + dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertFalse(ret) + finally: + util.find_devs_with = orig_find_devs_with + + def test_get_data_broken_contextdisk(self): + orig_find_devs_with = util.find_devs_with + try: + # dont' try to lookup for CDs + util.find_devs_with = lambda n: [] + populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT}) + dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths) + self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data) + finally: + util.find_devs_with = orig_find_devs_with + + def test_get_data_invalid_identity(self): + orig_find_devs_with = util.find_devs_with + try: + # generate non-existing system user name + sys_cfg = self.sys_cfg + invalid_user = 'invalid' + while not sys_cfg['datasource']['OpenNebula'].get('parseuser'): + try: + pwd.getpwnam(invalid_user) + invalid_user += 'X' + except KeyError: + sys_cfg['datasource']['OpenNebula']['parseuser'] = \ + invalid_user + + # dont' try to lookup for CDs + util.find_devs_with = lambda n: [] + populate_context_dir(self.seed_dir, {'KEY1': 'val1'}) + dsrc = self.ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) + self.assertRaises(ds.BrokenContextDiskDir, dsrc.get_data) + finally: + util.find_devs_with = orig_find_devs_with + + def test_get_data(self): + orig_find_devs_with = util.find_devs_with + try: + # dont' try to lookup for CDs + util.find_devs_with = lambda n: [] + populate_context_dir(self.seed_dir, {'KEY1': 'val1'}) + dsrc = self.ds(sys_cfg=self.sys_cfg, distro=None, paths=self.paths) + ret = dsrc.get_data() + self.assertTrue(ret) + finally: + util.find_devs_with = orig_find_devs_with + + def test_seed_dir_non_contextdisk(self): + self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir, + self.seed_dir) + + def test_seed_dir_empty1_context(self): + populate_dir(self.seed_dir, {'context.sh': ''}) + results = ds.read_context_disk_dir(self.seed_dir) + + self.assertEqual(results['userdata'], None) + self.assertEqual(results['metadata'], {}) + + def test_seed_dir_empty2_context(self): + populate_context_dir(self.seed_dir, {}) + results = ds.read_context_disk_dir(self.seed_dir) + + self.assertEqual(results['userdata'], None) + self.assertEqual(results['metadata'], {}) + + def test_seed_dir_broken_context(self): + populate_dir(self.seed_dir, {'context.sh': INVALID_CONTEXT}) + + self.assertRaises(ds.BrokenContextDiskDir, + ds.read_context_disk_dir, + self.seed_dir) + + def test_context_parser(self): + populate_context_dir(self.seed_dir, TEST_VARS) + results = ds.read_context_disk_dir(self.seed_dir) + + self.assertTrue('metadata' in results) + self.assertEqual(TEST_VARS, results['metadata']) + + def test_ssh_key(self): + public_keys = ['first key', 'second key'] + for c in range(4): + for k in ('SSH_KEY', 'SSH_PUBLIC_KEY'): + my_d = os.path.join(self.tmp, "%s-%i" % (k, c)) + populate_context_dir(my_d, {k: '\n'.join(public_keys)}) + results = ds.read_context_disk_dir(my_d) + + self.assertTrue('metadata' in results) + self.assertTrue('public-keys' in results['metadata']) + self.assertEqual(public_keys, + results['metadata']['public-keys']) + + public_keys.append(SSH_KEY % (c + 1,)) + + def test_user_data(self): + for k in ('USER_DATA', 'USERDATA'): + my_d = os.path.join(self.tmp, k) + populate_context_dir(my_d, {k: USER_DATA}) + results = ds.read_context_disk_dir(my_d) + + self.assertTrue('userdata' in results) + self.assertEqual(USER_DATA, results['userdata']) + + def test_hostname(self): + for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'): + my_d = os.path.join(self.tmp, k) + populate_context_dir(my_d, {k: PUBLIC_IP}) + results = ds.read_context_disk_dir(my_d) + + self.assertTrue('metadata' in results) + self.assertTrue('local-hostname' in results['metadata']) + self.assertEqual(PUBLIC_IP, results['metadata']['local-hostname']) + + def test_network_interfaces(self): + populate_context_dir(self.seed_dir, {'ETH0_IP': '1.2.3.4'}) + results = ds.read_context_disk_dir(self.seed_dir) + + self.assertTrue('network-interfaces' in results) + + def test_find_candidates(self): + def my_devs_with(criteria): + return { + "LABEL=CONTEXT": ["/dev/sdb"], + "LABEL=CDROM": ["/dev/sr0"], + "TYPE=iso9660": ["/dev/vdb"], + }.get(criteria, []) + + orig_find_devs_with = util.find_devs_with + try: + util.find_devs_with = my_devs_with + self.assertEqual(["/dev/sdb", "/dev/sr0", "/dev/vdb"], + ds.find_candidate_devs()) + finally: + util.find_devs_with = orig_find_devs_with + + +class TestOpenNebulaNetwork(MockerTestCase): + + def setUp(self): + super(TestOpenNebulaNetwork, self).setUp() + + def test_lo(self): + net = ds.OpenNebulaNetwork('', {}) + self.assertEqual(net.gen_conf(), u'''\ +auto lo +iface lo inet loopback +''') + + def test_eth0(self): + net = ds.OpenNebulaNetwork(CMD_IP_OUT, {}) + self.assertEqual(net.gen_conf(), u'''\ +auto lo +iface lo inet loopback + +auto eth0 +iface eth0 inet static + address 10.18.1.1 + network 10.18.1.0 + netmask 255.255.255.0 +''') + + def test_eth0_override(self): + context = { + 'DNS': '1.2.3.8', + 'ETH0_IP': '1.2.3.4', + 'ETH0_NETWORK': '1.2.3.0', + 'ETH0_MASK': '255.255.0.0', + 'ETH0_GATEWAY': '1.2.3.5', + 'ETH0_DOMAIN': 'example.com', + 'ETH0_DNS': '1.2.3.6 1.2.3.7' + } + + net = ds.OpenNebulaNetwork(CMD_IP_OUT, context) + self.assertEqual(net.gen_conf(), u'''\ +auto lo +iface lo inet loopback + +auto eth0 +iface eth0 inet static + address 1.2.3.4 + network 1.2.3.0 + netmask 255.255.0.0 + gateway 1.2.3.5 + dns-search example.com + dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7 +''') + + +def populate_context_dir(path, variables): + data = "# Context variables generated by OpenNebula\n" + for (k, v) in variables.iteritems(): + data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''"))) + populate_dir(path, {'context.sh': data}) + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index 6c12f1e2..956767d8 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -22,25 +22,24 @@ # return responses. # +import base64 from cloudinit import helpers from cloudinit.sources import DataSourceSmartOS from mocker import MockerTestCase import uuid -mock_returns = { +MOCK_RETURNS = { 'hostname': 'test-host', 'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname', 'disable_iptables_flag': None, 'enable_motd_sys_info': None, - 'system_uuid': str(uuid.uuid4()), - 'smartdc': 'smartdc', - 'userdata': """ -#!/bin/sh -/bin/true -""", + 'test-var1': 'some data', + 'user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']), } +DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc') + class MockSerial(object): """Fake a serial terminal for testing the code that @@ -48,12 +47,13 @@ class MockSerial(object): port = None - def __init__(self): + def __init__(self, mockdata): self.last = None self.last = None self.new = True self.count = 0 self.mocked_out = [] + self.mockdata = mockdata def open(self): return True @@ -71,30 +71,30 @@ class MockSerial(object): def readline(self): if self.new: self.new = False - if self.last in mock_returns: + if self.last in self.mockdata: return 'SUCCESS\n' else: return 'NOTFOUND %s\n' % self.last - if self.last in mock_returns: + if self.last in self.mockdata: if not self.mocked_out: self.mocked_out = [x for x in self._format_out()] - print self.mocked_out if len(self.mocked_out) > self.count: self.count += 1 return self.mocked_out[self.count - 1] def _format_out(self): - if self.last in mock_returns: + if self.last in self.mockdata: + _mret = self.mockdata[self.last] try: - for l in mock_returns[self.last].splitlines(): - yield "%s\n" % l + for l in _mret.splitlines(): + yield "%s\n" % l.rstrip() except: - yield "%s\n" % mock_returns[self.last] + yield "%s\n" % _mret.rstrip() - yield '\n' yield '.' + yield '\n' class TestSmartOSDataSource(MockerTestCase): @@ -116,23 +116,36 @@ class TestSmartOSDataSource(MockerTestCase): ret = apply_patches(patches) self.unapply += ret - def _get_ds(self): + def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None): + mod = DataSourceSmartOS + + if mockdata is None: + mockdata = MOCK_RETURNS + + if dmi_data is None: + dmi_data = DMI_DATA_RETURN def _get_serial(*_): - return MockSerial() + return MockSerial(mockdata) def _dmi_data(): - return mock_returns['system_uuid'], 'smartdc' + return dmi_data + + if sys_cfg is None: + sys_cfg = {} + + if ds_cfg is not None: + sys_cfg['datasource'] = sys_cfg.get('datasource', {}) + sys_cfg['datasource']['SmartOS'] = ds_cfg - data = {'sys_cfg': {}} - mod = DataSourceSmartOS self.apply_patches([(mod, 'get_serial', _get_serial)]) self.apply_patches([(mod, 'dmi_data', _dmi_data)]) - dsrc = mod.DataSourceSmartOS( - data.get('sys_cfg', {}), distro=None, paths=self.paths) + dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None, + paths=self.paths) return dsrc def test_seed(self): + # default seed should be /dev/ttyS1 dsrc = self._get_ds() ret = dsrc.get_data() self.assertTrue(ret) @@ -144,41 +157,143 @@ class TestSmartOSDataSource(MockerTestCase): self.assertTrue(ret) self.assertTrue(dsrc.is_smartdc) + def test_no_base64(self): + ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True} + dsrc = self._get_ds(ds_cfg=ds_cfg) + ret = dsrc.get_data() + self.assertTrue(ret) + def test_uuid(self): - dsrc = self._get_ds() + dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(mock_returns['system_uuid'], - dsrc.metadata['instance-id']) + self.assertEquals(DMI_DATA_RETURN[0], dsrc.metadata['instance-id']) def test_root_keys(self): - dsrc = self._get_ds() + dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(mock_returns['root_authorized_keys'], + self.assertEquals(MOCK_RETURNS['root_authorized_keys'], dsrc.metadata['public-keys']) + def test_hostname_b64(self): + dsrc = self._get_ds(mockdata=MOCK_RETURNS) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['hostname'], + dsrc.metadata['local-hostname']) + def test_hostname(self): - dsrc = self._get_ds() + dsrc = self._get_ds(mockdata=MOCK_RETURNS) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['hostname'], + dsrc.metadata['local-hostname']) + + def test_base64_all(self): + # metadata provided base64_all of true + my_returns = MOCK_RETURNS.copy() + my_returns['base64_all'] = "true" + for k in ('hostname', 'user-data'): + my_returns[k] = base64.b64encode(my_returns[k]) + + dsrc = self._get_ds(mockdata=my_returns) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(mock_returns['hostname'], + self.assertEquals(MOCK_RETURNS['hostname'], dsrc.metadata['local-hostname']) + self.assertEquals(MOCK_RETURNS['user-data'], + dsrc.userdata_raw) + self.assertEquals(MOCK_RETURNS['root_authorized_keys'], + dsrc.metadata['public-keys']) + self.assertEquals(MOCK_RETURNS['disable_iptables_flag'], + dsrc.metadata['iptables_disable']) + self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'], + dsrc.metadata['motd_sys_info']) + + def test_b64_userdata(self): + my_returns = MOCK_RETURNS.copy() + my_returns['b64-user-data'] = "true" + my_returns['b64-hostname'] = "true" + for k in ('hostname', 'user-data'): + my_returns[k] = base64.b64encode(my_returns[k]) + + dsrc = self._get_ds(mockdata=my_returns) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['hostname'], + dsrc.metadata['local-hostname']) + self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw) + self.assertEquals(MOCK_RETURNS['root_authorized_keys'], + dsrc.metadata['public-keys']) + + def test_b64_keys(self): + my_returns = MOCK_RETURNS.copy() + my_returns['base64_keys'] = 'hostname,ignored' + for k in ('hostname',): + my_returns[k] = base64.b64encode(my_returns[k]) + + dsrc = self._get_ds(mockdata=my_returns) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['hostname'], + dsrc.metadata['local-hostname']) + self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw) + + def test_userdata(self): + dsrc = self._get_ds(mockdata=MOCK_RETURNS) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw) def test_disable_iptables_flag(self): - dsrc = self._get_ds() + dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(mock_returns['disable_iptables_flag'], + self.assertEquals(MOCK_RETURNS['disable_iptables_flag'], dsrc.metadata['iptables_disable']) def test_motd_sys_info(self): - dsrc = self._get_ds() + dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(mock_returns['enable_motd_sys_info'], + self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'], dsrc.metadata['motd_sys_info']) + def test_default_ephemeral(self): + # Test to make sure that the builtin config has the ephemeral + # configuration. + dsrc = self._get_ds() + cfg = dsrc.get_config_obj() + + ret = dsrc.get_data() + self.assertTrue(ret) + + assert 'disk_setup' in cfg + assert 'fs_setup' in cfg + self.assertIsInstance(cfg['disk_setup'], dict) + self.assertIsInstance(cfg['fs_setup'], list) + + def test_override_disk_aliases(self): + # Test to make sure that the built-in DS is overriden + builtin = DataSourceSmartOS.BUILTIN_DS_CONFIG + + mydscfg = {'disk_aliases': {'FOO': '/dev/bar'}} + + # expect that these values are in builtin, or this is pointless + for k in mydscfg: + self.assertIn(k, builtin) + + dsrc = self._get_ds(ds_cfg=mydscfg) + ret = dsrc.get_data() + self.assertTrue(ret) + + self.assertEqual(mydscfg['disk_aliases']['FOO'], + dsrc.ds_cfg['disk_aliases']['FOO']) + + self.assertEqual(dsrc.device_name_to_device('FOO'), + mydscfg['disk_aliases']['FOO']) + def apply_patches(patches): ret = [] |