diff options
author | Scott Moser <smoser@brickies.net> | 2016-08-23 16:48:42 -0400 |
---|---|---|
committer | Scott Moser <smoser@brickies.net> | 2016-08-23 16:48:42 -0400 |
commit | 86e2614b6c3db342aa5a6590e91b9e459bbcb484 (patch) | |
tree | 6996805b91a0c1c31f3afea3a689348bf760de63 /tests/unittests/test_datasource | |
parent | c937c66dd0d1ad7b73dcc2efb5eb4c16b05f4479 (diff) | |
parent | 9f7ce5f090689b664ffce7e0b4ac78bfeafd1a79 (diff) | |
download | vyos-cloud-init-86e2614b6c3db342aa5a6590e91b9e459bbcb484.tar.gz vyos-cloud-init-86e2614b6c3db342aa5a6590e91b9e459bbcb484.zip |
merge trunk at 0.7.7~bzr1245
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r-- | tests/unittests/test_datasource/test_altcloud.py | 48 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_azure.py | 30 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_azure_helper.py | 14 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_cloudsigma.py | 2 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_cloudstack.py | 10 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_configdrive.py | 238 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_digitalocean.py | 2 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_gce.py | 4 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_maas.py | 4 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_nocloud.py | 15 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_openstack.py | 102 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 360 |
12 files changed, 473 insertions, 356 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py index 85759c68..12966563 100644 --- a/tests/unittests/test_datasource/test_altcloud.py +++ b/tests/unittests/test_datasource/test_altcloud.py @@ -134,7 +134,7 @@ class TestGetCloudType(TestCase): ''' util.read_dmi_data = _dmi_data('RHEV') dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals('RHEV', dsrc.get_cloud_type()) + self.assertEqual('RHEV', dsrc.get_cloud_type()) def test_vsphere(self): ''' @@ -143,7 +143,7 @@ class TestGetCloudType(TestCase): ''' util.read_dmi_data = _dmi_data('VMware Virtual Platform') dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals('VSPHERE', dsrc.get_cloud_type()) + self.assertEqual('VSPHERE', dsrc.get_cloud_type()) def test_unknown(self): ''' @@ -152,7 +152,7 @@ class TestGetCloudType(TestCase): ''' util.read_dmi_data = _dmi_data('Unrecognized Platform') dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals('UNKNOWN', dsrc.get_cloud_type()) + self.assertEqual('UNKNOWN', dsrc.get_cloud_type()) class TestGetDataCloudInfoFile(TestCase): @@ -187,7 +187,7 @@ class TestGetDataCloudInfoFile(TestCase): _write_cloud_info_file('RHEV') dsrc = DataSourceAltCloud({}, None, self.paths) dsrc.user_data_rhevm = lambda: True - self.assertEquals(True, dsrc.get_data()) + self.assertEqual(True, dsrc.get_data()) def test_vsphere(self): '''Success Test module get_data() forcing VSPHERE.''' @@ -195,7 +195,7 @@ class TestGetDataCloudInfoFile(TestCase): _write_cloud_info_file('VSPHERE') dsrc = DataSourceAltCloud({}, None, self.paths) dsrc.user_data_vsphere = lambda: True - self.assertEquals(True, dsrc.get_data()) + self.assertEqual(True, dsrc.get_data()) def test_fail_rhev(self): '''Failure Test module get_data() forcing RHEV.''' @@ -203,7 +203,7 @@ class TestGetDataCloudInfoFile(TestCase): _write_cloud_info_file('RHEV') dsrc = DataSourceAltCloud({}, None, self.paths) dsrc.user_data_rhevm = lambda: False - self.assertEquals(False, dsrc.get_data()) + self.assertEqual(False, dsrc.get_data()) def test_fail_vsphere(self): '''Failure Test module get_data() forcing VSPHERE.''' @@ -211,14 +211,14 @@ class TestGetDataCloudInfoFile(TestCase): _write_cloud_info_file('VSPHERE') dsrc = DataSourceAltCloud({}, None, self.paths) dsrc.user_data_vsphere = lambda: False - self.assertEquals(False, dsrc.get_data()) + self.assertEqual(False, dsrc.get_data()) def test_unrecognized(self): '''Failure Test module get_data() forcing unrecognized.''' _write_cloud_info_file('unrecognized') dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.get_data()) + self.assertEqual(False, dsrc.get_data()) class TestGetDataNoCloudInfoFile(TestCase): @@ -250,7 +250,7 @@ class TestGetDataNoCloudInfoFile(TestCase): util.read_dmi_data = _dmi_data('RHEV Hypervisor') dsrc = DataSourceAltCloud({}, None, self.paths) dsrc.user_data_rhevm = lambda: True - self.assertEquals(True, dsrc.get_data()) + self.assertEqual(True, dsrc.get_data()) def test_vsphere_no_cloud_file(self): '''Test No cloud info file module get_data() forcing VSPHERE.''' @@ -258,14 +258,14 @@ class TestGetDataNoCloudInfoFile(TestCase): util.read_dmi_data = _dmi_data('VMware Virtual Platform') dsrc = DataSourceAltCloud({}, None, self.paths) dsrc.user_data_vsphere = lambda: True - self.assertEquals(True, dsrc.get_data()) + self.assertEqual(True, dsrc.get_data()) def test_failure_no_cloud_file(self): '''Test No cloud info file module get_data() forcing unrecognized.''' util.read_dmi_data = _dmi_data('Unrecognized Platform') dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.get_data()) + self.assertEqual(False, dsrc.get_data()) class TestUserDataRhevm(TestCase): @@ -305,7 +305,7 @@ class TestUserDataRhevm(TestCase): dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.user_data_rhevm()) + self.assertEqual(False, dsrc.user_data_rhevm()) def test_modprobe_fails(self): '''Test user_data_rhevm() where modprobe fails.''' @@ -315,7 +315,7 @@ class TestUserDataRhevm(TestCase): dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.user_data_rhevm()) + self.assertEqual(False, dsrc.user_data_rhevm()) def test_no_modprobe_cmd(self): '''Test user_data_rhevm() with no modprobe command.''' @@ -325,7 +325,7 @@ class TestUserDataRhevm(TestCase): dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.user_data_rhevm()) + self.assertEqual(False, dsrc.user_data_rhevm()) def test_udevadm_fails(self): '''Test user_data_rhevm() where udevadm fails.''' @@ -335,7 +335,7 @@ class TestUserDataRhevm(TestCase): dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.user_data_rhevm()) + self.assertEqual(False, dsrc.user_data_rhevm()) def test_no_udevadm_cmd(self): '''Test user_data_rhevm() with no udevadm command.''' @@ -345,7 +345,7 @@ class TestUserDataRhevm(TestCase): dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.user_data_rhevm()) + self.assertEqual(False, dsrc.user_data_rhevm()) class TestUserDataVsphere(TestCase): @@ -380,7 +380,7 @@ class TestUserDataVsphere(TestCase): dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.user_data_vsphere()) + self.assertEqual(False, dsrc.user_data_vsphere()) class TestReadUserDataCallback(TestCase): @@ -408,8 +408,8 @@ class TestReadUserDataCallback(TestCase): def test_callback_both(self): '''Test read_user_data_callback() with both files.''' - self.assertEquals('test user data', - read_user_data_callback(self.mount_dir)) + self.assertEqual('test user data', + read_user_data_callback(self.mount_dir)) def test_callback_dc(self): '''Test read_user_data_callback() with only DC file.''' @@ -418,8 +418,8 @@ class TestReadUserDataCallback(TestCase): dc_file=False, non_dc_file=True) - self.assertEquals('test user data', - read_user_data_callback(self.mount_dir)) + self.assertEqual('test user data', + read_user_data_callback(self.mount_dir)) def test_callback_non_dc(self): '''Test read_user_data_callback() with only non-DC file.''' @@ -428,14 +428,14 @@ class TestReadUserDataCallback(TestCase): dc_file=True, non_dc_file=False) - self.assertEquals('test user data', - read_user_data_callback(self.mount_dir)) + self.assertEqual('test user data', + read_user_data_callback(self.mount_dir)) def test_callback_none(self): '''Test read_user_data_callback() no files are found.''' _remove_user_data_files(self.mount_dir) - self.assertEquals(None, read_user_data_callback(self.mount_dir)) + self.assertEqual(None, read_user_data_callback(self.mount_dir)) def force_arch(arch=None): diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py index 444e2799..e90e903c 100644 --- a/tests/unittests/test_datasource/test_azure.py +++ b/tests/unittests/test_datasource/test_azure.py @@ -1,24 +1,16 @@ from cloudinit import helpers from cloudinit.util import b64e, decode_binary, load_file from cloudinit.sources import DataSourceAzure -from ..helpers import TestCase, populate_dir -try: - from unittest import mock -except ImportError: - import mock -try: - from contextlib import ExitStack -except ImportError: - from contextlib2 import ExitStack +from ..helpers import TestCase, populate_dir, mock, ExitStack, PY26, SkipTest import crypt import os -import stat -import yaml import shutil +import stat import tempfile import xml.etree.ElementTree as ET +import yaml def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None): @@ -83,6 +75,8 @@ class TestAzureDataSource(TestCase): def setUp(self): super(TestAzureDataSource, self).setUp() + if PY26: + raise SkipTest("Does not work on python 2.6") self.tmp = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.tmp) @@ -165,7 +159,7 @@ class TestAzureDataSource(TestCase): def tags_equal(x, y): for x_tag, x_val in x.items(): y_val = y.get(x_val.tag) - self.assertEquals(x_val.text, y_val.text) + self.assertEqual(x_val.text, y_val.text) old_cnt = create_tag_index(oxml) new_cnt = create_tag_index(nxml) @@ -354,8 +348,8 @@ class TestAzureDataSource(TestCase): self.assertTrue(ret) cfg = dsrc.get_config_obj() - self.assertEquals(dsrc.device_name_to_device("ephemeral0"), - "/dev/sdb") + self.assertEqual(dsrc.device_name_to_device("ephemeral0"), + "/dev/sdb") assert 'disk_setup' in cfg assert 'fs_setup' in cfg self.assertIsInstance(cfg['disk_setup'], dict) @@ -404,15 +398,15 @@ class TestAzureDataSource(TestCase): self.xml_notequals(data['ovfcontent'], on_disk_ovf) # Make sure that the redacted password on disk is not used by CI - self.assertNotEquals(dsrc.cfg.get('password'), - DataSourceAzure.DEF_PASSWD_REDACTION) + self.assertNotEqual(dsrc.cfg.get('password'), + DataSourceAzure.DEF_PASSWD_REDACTION) # Make sure that the password was really encrypted et = ET.fromstring(on_disk_ovf) for elem in et.iter(): if 'UserPassword' in elem.tag: - self.assertEquals(DataSourceAzure.DEF_PASSWD_REDACTION, - elem.text) + self.assertEqual(DataSourceAzure.DEF_PASSWD_REDACTION, + elem.text) def test_ovf_env_arrives_in_waagent_dir(self): xml = construct_valid_ovf_env(data={}, userdata="FOODATA") diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py index 1134199b..65202ff0 100644 --- a/tests/unittests/test_datasource/test_azure_helper.py +++ b/tests/unittests/test_datasource/test_azure_helper.py @@ -1,17 +1,8 @@ import os from cloudinit.sources.helpers import azure as azure_helper -from ..helpers import TestCase -try: - from unittest import mock -except ImportError: - import mock - -try: - from contextlib import ExitStack -except ImportError: - from contextlib2 import ExitStack +from ..helpers import ExitStack, mock, TestCase GOAL_STATE_TEMPLATE = """\ @@ -70,7 +61,7 @@ class TestFindEndpoint(TestCase): def test_missing_special_azure_line(self): self.load_file.return_value = '' - self.assertRaises(Exception, + self.assertRaises(ValueError, azure_helper.WALinuxAgentShim.find_endpoint) @staticmethod @@ -287,6 +278,7 @@ class TestOpenSSLManager(TestCase): self.subp.side_effect = capture_directory manager = azure_helper.OpenSSLManager() self.assertEqual(manager.tmpdir, subp_directory['path']) + manager.clean_up() @mock.patch.object(azure_helper, 'cd', mock.MagicMock()) @mock.patch.object(azure_helper.tempfile, 'mkdtemp', mock.MagicMock()) diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py index 772d189a..2a42ce0c 100644 --- a/tests/unittests/test_datasource/test_cloudsigma.py +++ b/tests/unittests/test_datasource/test_cloudsigma.py @@ -1,4 +1,5 @@ # coding: utf-8 + import copy from cloudinit.cs_utils import Cepko @@ -6,7 +7,6 @@ from cloudinit.sources import DataSourceCloudSigma from .. import helpers as test_helpers - SERVER_CONTEXT = { "cpu": 1000, "cpus_instead_of_cores": False, diff --git a/tests/unittests/test_datasource/test_cloudstack.py b/tests/unittests/test_datasource/test_cloudstack.py index 656d80d1..b1aab17b 100644 --- a/tests/unittests/test_datasource/test_cloudstack.py +++ b/tests/unittests/test_datasource/test_cloudstack.py @@ -1,15 +1,7 @@ from cloudinit import helpers from cloudinit.sources.DataSourceCloudStack import DataSourceCloudStack -from ..helpers import TestCase -try: - from unittest import mock -except ImportError: - import mock -try: - from contextlib import ExitStack -except ImportError: - from contextlib2 import ExitStack +from ..helpers import TestCase, mock, ExitStack class TestCloudStackPasswordFetching(TestCase): diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py index 89b15f54..18551b92 100644 --- a/tests/unittests/test_datasource/test_configdrive.py +++ b/tests/unittests/test_datasource/test_configdrive.py @@ -5,22 +5,15 @@ import shutil import six import tempfile -try: - from unittest import mock -except ImportError: - import mock -try: - from contextlib import ExitStack -except ImportError: - from contextlib2 import ExitStack - from cloudinit import helpers +from cloudinit.net import eni +from cloudinit.net import network_state from cloudinit import settings from cloudinit.sources import DataSourceConfigDrive as ds from cloudinit.sources.helpers import openstack from cloudinit import util -from ..helpers import TestCase +from ..helpers import TestCase, ExitStack, mock PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n' @@ -73,7 +66,7 @@ NETWORK_DATA = { 'type': 'ovs', 'mtu': None, 'id': 'tap2f88d109-5b'}, {'vif_id': '1a5382f8-04c5-4d75-ab98-d666c1ef52cc', 'ethernet_mac_address': 'fa:16:3e:05:30:fe', - 'type': 'ovs', 'mtu': None, 'id': 'tap1a5382f8-04'} + 'type': 'ovs', 'mtu': None, 'id': 'tap1a5382f8-04', 'name': 'nic0'} ], 'networks': [ {'link': 'tap2ecc7709-b3', 'type': 'ipv4_dhcp', @@ -88,6 +81,35 @@ NETWORK_DATA = { ] } +NETWORK_DATA_2 = { + "services": [ + {"type": "dns", "address": "1.1.1.191"}, + {"type": "dns", "address": "1.1.1.4"}], + "networks": [ + {"network_id": "d94bbe94-7abc-48d4-9c82-4628ea26164a", "type": "ipv4", + "netmask": "255.255.255.248", "link": "eth0", + "routes": [{"netmask": "0.0.0.0", "network": "0.0.0.0", + "gateway": "2.2.2.9"}], + "ip_address": "2.2.2.10", "id": "network0-ipv4"}, + {"network_id": "ca447c83-6409-499b-aaef-6ad1ae995348", "type": "ipv4", + "netmask": "255.255.255.224", "link": "eth1", + "routes": [], "ip_address": "3.3.3.24", "id": "network1-ipv4"}], + "links": [ + {"ethernet_mac_address": "fa:16:3e:dd:50:9a", "mtu": 1500, + "type": "vif", "id": "eth0", "vif_id": "vif-foo1"}, + {"ethernet_mac_address": "fa:16:3e:a8:14:69", "mtu": 1500, + "type": "vif", "id": "eth1", "vif_id": "vif-foo2"}] +} + + +KNOWN_MACS = { + 'fa:16:3e:69:b0:58': 'enp0s1', + 'fa:16:3e:d4:57:ad': 'enp0s2', + 'fa:16:3e:dd:50:9a': 'foo1', + 'fa:16:3e:a8:14:69': 'foo2', + 'fa:16:3e:ed:9a:59': 'foo3', +} + CFG_DRIVE_FILES_V2 = { 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META), 'ec2/2009-04-04/user-data': USER_DATA, @@ -151,7 +173,7 @@ class TestConfigDriveDataSource(TestCase): mock.patch.object(os.path, 'exists', side_effect=exists_side_effect())) device = cfg_ds.device_name_to_device(name) - self.assertEquals(dev_name, device) + self.assertEqual(dev_name, device) find_mock.assert_called_once_with(mock.ANY) self.assertEqual(exists_mock.call_count, 2) @@ -179,7 +201,7 @@ class TestConfigDriveDataSource(TestCase): mock.patch.object(os.path, 'exists', return_value=True)) device = cfg_ds.device_name_to_device(name) - self.assertEquals(dev_name, device) + self.assertEqual(dev_name, device) find_mock.assert_called_once_with(mock.ANY) exists_mock.assert_called_once_with(mock.ANY) @@ -214,7 +236,7 @@ class TestConfigDriveDataSource(TestCase): with mock.patch.object(os.path, 'exists', side_effect=exists_side_effect()): device = cfg_ds.device_name_to_device(name) - self.assertEquals(dev_name, device) + self.assertEqual(dev_name, device) # We don't assert the call count for os.path.exists() because # not all of the entries in name_tests results in two calls to # that function. Specifically, 'root2k' doesn't seem to call @@ -242,7 +264,7 @@ class TestConfigDriveDataSource(TestCase): for name, dev_name in name_tests.items(): with mock.patch.object(os.path, 'exists', return_value=True): device = cfg_ds.device_name_to_device(name) - self.assertEquals(dev_name, device) + self.assertEqual(dev_name, device) def test_dir_valid(self): """Verify a dir is read as such.""" @@ -348,32 +370,200 @@ class TestConfigDriveDataSource(TestCase): util.find_devs_with = orig_find_devs_with util.is_partition = orig_is_partition - def test_pubkeys_v2(self): + @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot') + def test_pubkeys_v2(self, on_first_boot): """Verify that public-keys work in config-drive-v2.""" populate_dir(self.tmp, CFG_DRIVE_FILES_V2) myds = cfg_ds_from_dir(self.tmp) self.assertEqual(myds.get_public_ssh_keys(), [OSTACK_META['public_keys']['mykey']]) - def test_network_data_is_found(self): + +class TestNetJson(TestCase): + def setUp(self): + super(TestNetJson, self).setUp() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) + self.maxDiff = None + + @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot') + def test_network_data_is_found(self, on_first_boot): """Verify that network_data is present in ds in config-drive-v2.""" populate_dir(self.tmp, CFG_DRIVE_FILES_V2) myds = cfg_ds_from_dir(self.tmp) - self.assertEqual(myds.network_json, NETWORK_DATA) + self.assertIsNotNone(myds.network_json) - def test_network_config_is_converted(self): + @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot') + def test_network_config_is_converted(self, on_first_boot): """Verify that network_data is converted and present on ds object.""" populate_dir(self.tmp, CFG_DRIVE_FILES_V2) myds = cfg_ds_from_dir(self.tmp) - network_config = ds.convert_network_data(NETWORK_DATA) + network_config = openstack.convert_net_json(NETWORK_DATA, + known_macs=KNOWN_MACS) self.assertEqual(myds.network_config, network_config) + def test_network_config_conversions(self): + """Tests a bunch of input network json and checks the + expected conversions.""" + in_datas = [ + NETWORK_DATA, + { + 'services': [{'type': 'dns', 'address': '172.19.0.12'}], + 'networks': [{ + 'network_id': 'dacd568d-5be6-4786-91fe-750c374b78b4', + 'type': 'ipv4', + 'netmask': '255.255.252.0', + 'link': 'tap1a81968a-79', + 'routes': [{ + 'netmask': '0.0.0.0', + 'network': '0.0.0.0', + 'gateway': '172.19.3.254', + }], + 'ip_address': '172.19.1.34', + 'id': 'network0', + }], + 'links': [{ + 'type': 'bridge', + 'vif_id': '1a81968a-797a-400f-8a80-567f997eb93f', + 'ethernet_mac_address': 'fa:16:3e:ed:9a:59', + 'id': 'tap1a81968a-79', + 'mtu': None, + }], + }, + ] + out_datas = [ + { + 'version': 1, + 'config': [ + { + 'subnets': [{'type': 'dhcp4'}], + 'type': 'physical', + 'mac_address': 'fa:16:3e:69:b0:58', + 'name': 'enp0s1', + 'mtu': None, + }, + { + 'subnets': [{'type': 'dhcp4'}], + 'type': 'physical', + 'mac_address': 'fa:16:3e:d4:57:ad', + 'name': 'enp0s2', + 'mtu': None, + }, + { + 'subnets': [{'type': 'dhcp4'}], + 'type': 'physical', + 'mac_address': 'fa:16:3e:05:30:fe', + 'name': 'nic0', + 'mtu': None, + }, + { + 'type': 'nameserver', + 'address': '199.204.44.24', + }, + { + 'type': 'nameserver', + 'address': '199.204.47.54', + } + ], + + }, + { + 'version': 1, + 'config': [ + { + 'name': 'foo3', + 'mac_address': 'fa:16:3e:ed:9a:59', + 'mtu': None, + 'type': 'physical', + 'subnets': [ + { + 'address': '172.19.1.34', + 'netmask': '255.255.252.0', + 'type': 'static', + 'ipv4': True, + 'routes': [{ + 'gateway': '172.19.3.254', + 'netmask': '0.0.0.0', + 'network': '0.0.0.0', + }], + } + ] + }, + { + 'type': 'nameserver', + 'address': '172.19.0.12', + } + ], + }, + ] + for in_data, out_data in zip(in_datas, out_datas): + conv_data = openstack.convert_net_json(in_data, + known_macs=KNOWN_MACS) + self.assertEqual(out_data, conv_data) + + +class TestConvertNetworkData(TestCase): + def setUp(self): + super(TestConvertNetworkData, self).setUp() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) + + def _getnames_in_config(self, ncfg): + return set([n['name'] for n in ncfg['config'] + if n['type'] == 'physical']) + + def test_conversion_fills_names(self): + ncfg = openstack.convert_net_json(NETWORK_DATA, known_macs=KNOWN_MACS) + expected = set(['nic0', 'enp0s1', 'enp0s2']) + found = self._getnames_in_config(ncfg) + self.assertEqual(found, expected) + + @mock.patch('cloudinit.net.get_interfaces_by_mac') + def test_convert_reads_system_prefers_name(self, get_interfaces_by_mac): + macs = KNOWN_MACS.copy() + macs.update({'fa:16:3e:05:30:fe': 'foonic1', + 'fa:16:3e:69:b0:58': 'ens1'}) + get_interfaces_by_mac.return_value = macs + + ncfg = openstack.convert_net_json(NETWORK_DATA) + expected = set(['nic0', 'ens1', 'enp0s2']) + found = self._getnames_in_config(ncfg) + self.assertEqual(found, expected) + + def test_convert_raises_value_error_on_missing_name(self): + macs = {'aa:aa:aa:aa:aa:00': 'ens1'} + self.assertRaises(ValueError, openstack.convert_net_json, + NETWORK_DATA, known_macs=macs) + + def test_conversion_with_route(self): + ncfg = openstack.convert_net_json(NETWORK_DATA_2, + known_macs=KNOWN_MACS) + # not the best test, but see that we get a route in the + # network config and that it gets rendered to an ENI file + routes = [] + for n in ncfg['config']: + for s in n.get('subnets', []): + routes.extend(s.get('routes', [])) + self.assertIn( + {'network': '0.0.0.0', 'netmask': '0.0.0.0', 'gateway': '2.2.2.9'}, + routes) + eni_renderer = eni.Renderer() + eni_renderer.render_network_state( + self.tmp, network_state.parse_net_config_data(ncfg)) + with open(os.path.join(self.tmp, "etc", + "network", "interfaces"), 'r') as f: + eni_rendering = f.read() + self.assertIn("route add default gw 2.2.2.9", eni_rendering) + def cfg_ds_from_dir(seed_d): - found = ds.read_config_drive(seed_d) cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None, helpers.Paths({})) - populate_ds_from_read_config(cfg_ds, seed_d, found) + cfg_ds.seed_dir = seed_d + cfg_ds.known_macs = KNOWN_MACS.copy() + if not cfg_ds.get_data(): + raise RuntimeError("Data source did not extract itself from" + " seed directory %s" % seed_d) return cfg_ds @@ -387,7 +577,8 @@ def populate_ds_from_read_config(cfg_ds, source, results): cfg_ds.userdata_raw = results.get('userdata') cfg_ds.version = results.get('version') cfg_ds.network_json = results.get('networkdata') - cfg_ds._network_config = ds.convert_network_data(cfg_ds.network_json) + cfg_ds._network_config = openstack.convert_net_json( + cfg_ds.network_json, known_macs=KNOWN_MACS) def populate_dir(seed_dir, files): @@ -400,7 +591,6 @@ def populate_dir(seed_dir, files): mode = "w" else: mode = "wb" - with open(path, mode) as fp: fp.write(content) diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py index 679d1b82..8936a1e3 100644 --- a/tests/unittests/test_datasource/test_digitalocean.py +++ b/tests/unittests/test_datasource/test_digitalocean.py @@ -19,8 +19,8 @@ import re from six.moves.urllib_parse import urlparse -from cloudinit import settings from cloudinit import helpers +from cloudinit import settings from cloudinit.sources import DataSourceDigitalOcean from .. import helpers as test_helpers diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py index fa714070..6e62a4d2 100644 --- a/tests/unittests/test_datasource/test_gce.py +++ b/tests/unittests/test_datasource/test_gce.py @@ -20,8 +20,8 @@ import re from base64 import b64encode, b64decode from six.moves.urllib_parse import urlparse -from cloudinit import settings from cloudinit import helpers +from cloudinit import settings from cloudinit.sources import DataSourceGCE from .. import helpers as test_helpers @@ -52,7 +52,7 @@ GCE_META_ENCODING = { HEADERS = {'X-Google-Metadata-Request': 'True'} MD_URL_RE = re.compile( - r'http://metadata.google.internal./computeMetadata/v1/.*') + r'http://metadata.google.internal/computeMetadata/v1/.*') def _set_mock_metadata(gce_meta=None): diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py index 77d15cac..f66f1c6d 100644 --- a/tests/unittests/test_datasource/test_maas.py +++ b/tests/unittests/test_datasource/test_maas.py @@ -104,13 +104,13 @@ class TestMAASDataSource(TestCase): 'meta-data/local-hostname': 'test-hostname', 'meta-data/public-keys': 'test-hostname', 'user-data': b'foodata', - } + } valid_order = [ 'meta-data/local-hostname', 'meta-data/instance-id', 'meta-data/public-keys', 'user-data', - ] + ] my_seed = "http://example.com/xmeta" my_ver = "1999-99-99" my_headers = {'header1': 'value1', 'header2': 'value2'} diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py index 2d5fc37c..b0fa1130 100644 --- a/tests/unittests/test_datasource/test_nocloud.py +++ b/tests/unittests/test_datasource/test_nocloud.py @@ -1,22 +1,13 @@ from cloudinit import helpers from cloudinit.sources import DataSourceNoCloud from cloudinit import util -from ..helpers import TestCase, populate_dir +from ..helpers import TestCase, populate_dir, mock, ExitStack import os -import yaml import shutil import tempfile -import unittest -try: - from unittest import mock -except ImportError: - import mock -try: - from contextlib import ExitStack -except ImportError: - from contextlib2 import ExitStack +import yaml class TestNoCloudDataSource(TestCase): @@ -139,7 +130,7 @@ class TestNoCloudDataSource(TestCase): self.assertTrue(ret) -class TestParseCommandLineData(unittest.TestCase): +class TestParseCommandLineData(TestCase): def test_parse_cmdline_data_valid(self): ds_id = "ds=nocloud" diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py index 0aa1ba84..5c8592c5 100644 --- a/tests/unittests/test_datasource/test_openstack.py +++ b/tests/unittests/test_datasource/test_openstack.py @@ -22,8 +22,8 @@ import re from .. import helpers as test_helpers -from six import StringIO from six.moves.urllib.parse import urlparse +from six import StringIO from cloudinit import helpers from cloudinit import settings @@ -135,41 +135,45 @@ def _register_uris(version, ec2_files, ec2_meta, os_files): body=get_request_callback) +def _read_metadata_service(): + return ds.read_metadata_service(BASE_URL, retries=0, timeout=0.1) + + class TestOpenStackDataSource(test_helpers.HttprettyTestCase): VERSION = 'latest' @hp.activate def test_successful(self): _register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES) - f = ds.read_metadata_service(BASE_URL) - self.assertEquals(VENDOR_DATA, f.get('vendordata')) - self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg']) - self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg']) - self.assertEquals(2, len(f['files'])) - self.assertEquals(USER_DATA, f.get('userdata')) - self.assertEquals(EC2_META, f.get('ec2-metadata')) - self.assertEquals(2, f.get('version')) + f = _read_metadata_service() + self.assertEqual(VENDOR_DATA, f.get('vendordata')) + self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg']) + self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg']) + self.assertEqual(2, len(f['files'])) + self.assertEqual(USER_DATA, f.get('userdata')) + self.assertEqual(EC2_META, f.get('ec2-metadata')) + self.assertEqual(2, f.get('version')) metadata = f['metadata'] - self.assertEquals('nova', metadata.get('availability_zone')) - self.assertEquals('sm-foo-test.novalocal', metadata.get('hostname')) - self.assertEquals('sm-foo-test.novalocal', - metadata.get('local-hostname')) - self.assertEquals('sm-foo-test', metadata.get('name')) - self.assertEquals('b0fa911b-69d4-4476-bbe2-1c92bff6535c', - metadata.get('uuid')) - self.assertEquals('b0fa911b-69d4-4476-bbe2-1c92bff6535c', - metadata.get('instance-id')) + self.assertEqual('nova', metadata.get('availability_zone')) + self.assertEqual('sm-foo-test.novalocal', metadata.get('hostname')) + self.assertEqual('sm-foo-test.novalocal', + metadata.get('local-hostname')) + self.assertEqual('sm-foo-test', metadata.get('name')) + self.assertEqual('b0fa911b-69d4-4476-bbe2-1c92bff6535c', + metadata.get('uuid')) + self.assertEqual('b0fa911b-69d4-4476-bbe2-1c92bff6535c', + metadata.get('instance-id')) @hp.activate def test_no_ec2(self): _register_uris(self.VERSION, {}, {}, OS_FILES) - f = ds.read_metadata_service(BASE_URL) - self.assertEquals(VENDOR_DATA, f.get('vendordata')) - self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg']) - self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg']) - self.assertEquals(USER_DATA, f.get('userdata')) - self.assertEquals({}, f.get('ec2-metadata')) - self.assertEquals(2, f.get('version')) + f = _read_metadata_service() + self.assertEqual(VENDOR_DATA, f.get('vendordata')) + self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg']) + self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg']) + self.assertEqual(USER_DATA, f.get('userdata')) + self.assertEqual({}, f.get('ec2-metadata')) + self.assertEqual(2, f.get('version')) @hp.activate def test_bad_metadata(self): @@ -178,8 +182,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('meta_data.json'): os_files.pop(k, None) _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.NonReadable, ds.read_metadata_service, - BASE_URL) + self.assertRaises(openstack.NonReadable, _read_metadata_service) @hp.activate def test_bad_uuid(self): @@ -190,8 +193,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('meta_data.json'): os_files[k] = json.dumps(os_meta) _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service, - BASE_URL) + self.assertRaises(openstack.BrokenMetadata, _read_metadata_service) @hp.activate def test_userdata_empty(self): @@ -200,10 +202,10 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('user_data'): os_files.pop(k, None) _register_uris(self.VERSION, {}, {}, os_files) - f = ds.read_metadata_service(BASE_URL) - self.assertEquals(VENDOR_DATA, f.get('vendordata')) - self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg']) - self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg']) + f = _read_metadata_service() + self.assertEqual(VENDOR_DATA, f.get('vendordata')) + self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg']) + self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg']) self.assertFalse(f.get('userdata')) @hp.activate @@ -213,9 +215,9 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('vendor_data.json'): os_files.pop(k, None) _register_uris(self.VERSION, {}, {}, os_files) - f = ds.read_metadata_service(BASE_URL) - self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg']) - self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg']) + f = _read_metadata_service() + self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg']) + self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg']) self.assertFalse(f.get('vendordata')) @hp.activate @@ -225,8 +227,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('vendor_data.json'): os_files[k] = '{' # some invalid json _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service, - BASE_URL) + self.assertRaises(openstack.BrokenMetadata, _read_metadata_service) @hp.activate def test_metadata_invalid(self): @@ -235,8 +236,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('meta_data.json'): os_files[k] = '{' # some invalid json _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service, - BASE_URL) + self.assertRaises(openstack.BrokenMetadata, _read_metadata_service) @hp.activate def test_datasource(self): @@ -245,18 +245,18 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): None, helpers.Paths({})) self.assertIsNone(ds_os.version) - found = ds_os.get_data() + found = ds_os.get_data(timeout=0.1, retries=0) self.assertTrue(found) - self.assertEquals(2, ds_os.version) + self.assertEqual(2, ds_os.version) md = dict(ds_os.metadata) md.pop('instance-id', None) md.pop('local-hostname', None) - self.assertEquals(OSTACK_META, md) - self.assertEquals(EC2_META, ds_os.ec2_metadata) - self.assertEquals(USER_DATA, ds_os.userdata_raw) - self.assertEquals(2, len(ds_os.files)) - self.assertEquals(VENDOR_DATA, ds_os.vendordata_pure) - self.assertEquals(ds_os.vendordata_raw, None) + self.assertEqual(OSTACK_META, md) + self.assertEqual(EC2_META, ds_os.ec2_metadata) + self.assertEqual(USER_DATA, ds_os.userdata_raw) + self.assertEqual(2, len(ds_os.files)) + self.assertEqual(VENDOR_DATA, ds_os.vendordata_pure) + self.assertEqual(ds_os.vendordata_raw, None) @hp.activate def test_bad_datasource_meta(self): @@ -269,7 +269,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): None, helpers.Paths({})) self.assertIsNone(ds_os.version) - found = ds_os.get_data() + found = ds_os.get_data(timeout=0.1, retries=0) self.assertFalse(found) self.assertIsNone(ds_os.version) @@ -288,7 +288,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): 'timeout': 0, } self.assertIsNone(ds_os.version) - found = ds_os.get_data() + found = ds_os.get_data(timeout=0.1, retries=0) self.assertFalse(found) self.assertIsNone(ds_os.version) @@ -311,7 +311,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): 'timeout': 0, } self.assertIsNone(ds_os.version) - found = ds_os.get_data() + found = ds_os.get_data(timeout=0.1, retries=0) self.assertFalse(found) self.assertIsNone(ds_os.version) diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index 5c49966a..9c6c8768 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -24,6 +24,8 @@ from __future__ import print_function +from binascii import crc32 +import json import os import os.path import re @@ -31,21 +33,58 @@ import shutil import stat import tempfile import uuid -from binascii import crc32 -import serial +from cloudinit import serial +from cloudinit.sources import DataSourceSmartOS + import six from cloudinit import helpers as c_helpers -from cloudinit.sources import DataSourceSmartOS from cloudinit.util import b64e -from .. import helpers - -try: - from unittest import mock -except ImportError: - import mock +from ..helpers import mock, FilesystemMockingTestCase, TestCase + +SDC_NICS = json.loads(""" +[ + { + "nic_tag": "external", + "primary": true, + "mtu": 1500, + "model": "virtio", + "gateway": "8.12.42.1", + "netmask": "255.255.255.0", + "ip": "8.12.42.102", + "network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe", + "gateways": [ + "8.12.42.1" + ], + "vlan_id": 324, + "mac": "90:b8:d0:f5:e4:f5", + "interface": "net0", + "ips": [ + "8.12.42.102/24" + ] + }, + { + "nic_tag": "sdc_overlay/16187209", + "gateway": "192.168.128.1", + "model": "virtio", + "mac": "90:b8:d0:a5:ff:cd", + "netmask": "255.255.252.0", + "ip": "192.168.128.93", + "network_uuid": "4cad71da-09bc-452b-986d-03562a03a0a9", + "gateways": [ + "192.168.128.1" + ], + "vlan_id": 2, + "mtu": 8500, + "interface": "net1", + "ips": [ + "192.168.128.93/22" + ] + } +] +""") MOCK_RETURNS = { 'hostname': 'test-host', @@ -60,79 +99,68 @@ MOCK_RETURNS = { 'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']), 'user-data': '\n'.join(['something', '']), 'user-script': '\n'.join(['/bin/true', '']), + 'sdc:nics': json.dumps(SDC_NICS), } DMI_DATA_RETURN = 'smartdc' -def get_mock_client(mockdata): - class MockMetadataClient(object): +class PsuedoJoyentClient(object): + def __init__(self, data=None): + if data is None: + data = MOCK_RETURNS.copy() + self.data = data + return + + def get(self, key, default=None, strip=False): + if key in self.data: + r = self.data[key] + if strip: + r = r.strip() + else: + r = default + return r - def __init__(self, serial): - pass + def get_json(self, key, default=None): + result = self.get(key, default=default) + if result is None: + return default + return json.loads(result) - def get_metadata(self, metadata_key): - return mockdata.get(metadata_key) - return MockMetadataClient + def exists(self): + return True -class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): +class TestSmartOSDataSource(FilesystemMockingTestCase): def setUp(self): super(TestSmartOSDataSource, self).setUp() + dsmos = 'cloudinit.sources.DataSourceSmartOS' + patcher = mock.patch(dsmos + ".jmc_client_factory") + self.jmc_cfact = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch(dsmos + ".get_smartos_environ") + self.get_smartos_environ = patcher.start() + self.addCleanup(patcher.stop) + self.tmp = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.tmp) - self.legacy_user_d = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.legacy_user_d) - - # If you should want to watch the logs... - self._log = None - self._log_file = None - self._log_handler = None - - # patch cloud_dir, so our 'seed_dir' is guaranteed empty self.paths = c_helpers.Paths({'cloud_dir': self.tmp}) - self.unapply = [] - super(TestSmartOSDataSource, self).setUp() + self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp') + os.mkdir(self.legacy_user_d) + + self.orig_lud = DataSourceSmartOS.LEGACY_USER_D + DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d def tearDown(self): - helpers.FilesystemMockingTestCase.tearDown(self) - if self._log_handler and self._log: - self._log.removeHandler(self._log_handler) - apply_patches([i for i in reversed(self.unapply)]) + DataSourceSmartOS.LEGACY_USER_D = self.orig_lud super(TestSmartOSDataSource, self).tearDown() - def _patchIn(self, root): - self.restore() - self.patchOS(root) - self.patchUtils(root) - - def apply_patches(self, patches): - ret = apply_patches(patches) - self.unapply += ret - - def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None, - is_lxbrand=False): - mod = DataSourceSmartOS - - if mockdata is None: - mockdata = MOCK_RETURNS - - if dmi_data is None: - dmi_data = DMI_DATA_RETURN - - def _dmi_data(): - return dmi_data - - def _os_uname(): - if not is_lxbrand: - # LP: #1243287. tests assume this runs, but running test on - # arm would cause them all to fail. - return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', 'x86_64') - else: - return ('LINUX', 'NODENAME', 'RELEASE', 'BRANDZ VIRTUAL LINUX', - 'X86_64') + def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM, + sys_cfg=None, ds_cfg=None): + self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata) + self.get_smartos_environ.return_value = mode if sys_cfg is None: sys_cfg = {} @@ -141,44 +169,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): sys_cfg['datasource'] = sys_cfg.get('datasource', {}) sys_cfg['datasource']['SmartOS'] = ds_cfg - self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)]) - self.apply_patches([ - (mod, 'JoyentMetadataClient', get_mock_client(mockdata))]) - self.apply_patches([(mod, 'dmi_data', _dmi_data)]) - self.apply_patches([(os, 'uname', _os_uname)]) - self.apply_patches([(mod, 'device_exists', lambda d: True)]) - dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None, - paths=self.paths) - self.apply_patches([(dsrc, '_get_seed_file_object', mock.MagicMock())]) - return dsrc - - def test_seed(self): - # default seed should be /dev/ttyS1 - dsrc = self._get_ds() - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEquals('kvm', dsrc.smartos_type) - self.assertEquals('/dev/ttyS1', dsrc.seed) - - def test_seed_lxbrand(self): - # default seed should be /dev/ttyS1 - dsrc = self._get_ds(is_lxbrand=True) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEquals('lx-brand', dsrc.smartos_type) - self.assertEquals('/native/.zonecontrol/metadata.sock', dsrc.seed) - - def test_issmartdc(self): - dsrc = self._get_ds() - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertTrue(dsrc.is_smartdc) - - def test_issmartdc_lxbrand(self): - dsrc = self._get_ds(is_lxbrand=True) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertTrue(dsrc.is_smartdc) + return DataSourceSmartOS.DataSourceSmartOS( + sys_cfg, distro=None, paths=self.paths) def test_no_base64(self): ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True} @@ -190,110 +182,65 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['sdc:uuid'], - dsrc.metadata['instance-id']) + self.assertEqual(MOCK_RETURNS['sdc:uuid'], + dsrc.metadata['instance-id']) def test_root_keys(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['root_authorized_keys'], - dsrc.metadata['public-keys']) + self.assertEqual(MOCK_RETURNS['root_authorized_keys'], + dsrc.metadata['public-keys']) def test_hostname_b64(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) + self.assertEqual(MOCK_RETURNS['hostname'], + dsrc.metadata['local-hostname']) def test_hostname(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) - - def test_base64_all(self): - # metadata provided base64_all of true - my_returns = MOCK_RETURNS.copy() - my_returns['base64_all'] = "true" - for k in ('hostname', 'cloud-init:user-data'): - my_returns[k] = b64e(my_returns[k]) - - dsrc = self._get_ds(mockdata=my_returns) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) - self.assertEquals(MOCK_RETURNS['cloud-init:user-data'], - dsrc.userdata_raw) - self.assertEquals(MOCK_RETURNS['root_authorized_keys'], - dsrc.metadata['public-keys']) - self.assertEquals(MOCK_RETURNS['disable_iptables_flag'], - dsrc.metadata['iptables_disable']) - self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'], - dsrc.metadata['motd_sys_info']) - - def test_b64_userdata(self): - my_returns = MOCK_RETURNS.copy() - my_returns['b64-cloud-init:user-data'] = "true" - my_returns['b64-hostname'] = "true" - for k in ('hostname', 'cloud-init:user-data'): - my_returns[k] = b64e(my_returns[k]) + self.assertEqual(MOCK_RETURNS['hostname'], + dsrc.metadata['local-hostname']) - dsrc = self._get_ds(mockdata=my_returns) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) - self.assertEquals(MOCK_RETURNS['cloud-init:user-data'], - dsrc.userdata_raw) - self.assertEquals(MOCK_RETURNS['root_authorized_keys'], - dsrc.metadata['public-keys']) - - def test_b64_keys(self): - my_returns = MOCK_RETURNS.copy() - my_returns['base64_keys'] = 'hostname,ignored' - for k in ('hostname',): - my_returns[k] = b64e(my_returns[k]) - - dsrc = self._get_ds(mockdata=my_returns) + def test_userdata(self): + dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) - self.assertEquals(MOCK_RETURNS['cloud-init:user-data'], - dsrc.userdata_raw) + self.assertEqual(MOCK_RETURNS['user-data'], + dsrc.metadata['legacy-user-data']) + self.assertEqual(MOCK_RETURNS['cloud-init:user-data'], + dsrc.userdata_raw) - def test_userdata(self): + def test_sdc_nics(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['user-data'], - dsrc.metadata['legacy-user-data']) - self.assertEquals(MOCK_RETURNS['cloud-init:user-data'], - dsrc.userdata_raw) + self.assertEqual(json.loads(MOCK_RETURNS['sdc:nics']), + dsrc.metadata['network-data']) def test_sdc_scripts(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['user-script'], - dsrc.metadata['user-script']) + self.assertEqual(MOCK_RETURNS['user-script'], + dsrc.metadata['user-script']) legacy_script_f = "%s/user-script" % self.legacy_user_d self.assertTrue(os.path.exists(legacy_script_f)) self.assertTrue(os.path.islink(legacy_script_f)) user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] - self.assertEquals(user_script_perm, '700') + self.assertEqual(user_script_perm, '700') def test_scripts_shebanged(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['user-script'], - dsrc.metadata['user-script']) + self.assertEqual(MOCK_RETURNS['user-script'], + dsrc.metadata['user-script']) legacy_script_f = "%s/user-script" % self.legacy_user_d self.assertTrue(os.path.exists(legacy_script_f)) @@ -301,9 +248,9 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): shebang = None with open(legacy_script_f, 'r') as f: shebang = f.readlines()[0].strip() - self.assertEquals(shebang, "#!/bin/bash") + self.assertEqual(shebang, "#!/bin/bash") user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] - self.assertEquals(user_script_perm, '700') + self.assertEqual(user_script_perm, '700') def test_scripts_shebang_not_added(self): """ @@ -319,8 +266,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): dsrc = self._get_ds(mockdata=my_returns) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(my_returns['user-script'], - dsrc.metadata['user-script']) + self.assertEqual(my_returns['user-script'], + dsrc.metadata['user-script']) legacy_script_f = "%s/user-script" % self.legacy_user_d self.assertTrue(os.path.exists(legacy_script_f)) @@ -328,7 +275,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): shebang = None with open(legacy_script_f, 'r') as f: shebang = f.readlines()[0].strip() - self.assertEquals(shebang, "#!/usr/bin/perl") + self.assertEqual(shebang, "#!/usr/bin/perl") def test_userdata_removed(self): """ @@ -358,7 +305,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): if re.match(r'.*\/mdata-user-data$', name_f): found_new = True print(name_f) - self.assertEquals(permissions, '400') + self.assertEqual(permissions, '400') self.assertFalse(found_new) @@ -366,8 +313,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['sdc:vendor-data'], - dsrc.metadata['vendor-data']) + self.assertEqual(MOCK_RETURNS['sdc:vendor-data'], + dsrc.metadata['vendor-data']) def test_default_vendor_data(self): my_returns = MOCK_RETURNS.copy() @@ -376,7 +323,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): dsrc = self._get_ds(mockdata=my_returns) ret = dsrc.get_data() self.assertTrue(ret) - self.assertNotEquals(def_op_script, dsrc.metadata['vendor-data']) + self.assertNotEqual(def_op_script, dsrc.metadata['vendor-data']) # we expect default vendor-data is a boothook self.assertTrue(dsrc.vendordata_raw.startswith("#cloud-boothook")) @@ -385,15 +332,15 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['disable_iptables_flag'], - dsrc.metadata['iptables_disable']) + self.assertEqual(MOCK_RETURNS['disable_iptables_flag'], + dsrc.metadata['iptables_disable']) def test_motd_sys_info(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'], - dsrc.metadata['motd_sys_info']) + self.assertEqual(MOCK_RETURNS['enable_motd_sys_info'], + dsrc.metadata['motd_sys_info']) def test_default_ephemeral(self): # Test to make sure that the builtin config has the ephemeral @@ -430,21 +377,11 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): mydscfg['disk_aliases']['FOO']) -def apply_patches(patches): - ret = [] - for (ref, name, replace) in patches: - if replace is None: - continue - orig = getattr(ref, name) - setattr(ref, name, replace) - ret.append((ref, name, orig)) - return ret - - -class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): +class TestJoyentMetadataClient(FilesystemMockingTestCase): def setUp(self): super(TestJoyentMetadataClient, self).setUp() + self.serial = mock.MagicMock(spec=serial.Serial) self.request_id = 0xabcdef12 self.metadata_value = 'value' @@ -481,7 +418,8 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): mock.Mock(return_value=self.request_id))) def _get_client(self): - return DataSourceSmartOS.JoyentMetadataClient(self.serial) + return DataSourceSmartOS.JoyentMetadataClient( + fp=self.serial, smartos_type=DataSourceSmartOS.SMARTOS_ENV_KVM) def assertEndsWith(self, haystack, prefix): self.assertTrue(haystack.endswith(prefix), @@ -495,7 +433,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): def test_get_metadata_writes_a_single_line(self): client = self._get_client() - client.get_metadata('some_key') + client.get('some_key') self.assertEqual(1, self.serial.write.call_count) written_line = self.serial.write.call_args[0][0] print(type(written_line)) @@ -505,7 +443,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): def _get_written_line(self, key='some_key'): client = self._get_client() - client.get_metadata(key) + client.get(key) return self.serial.write.call_args[0][0] def test_get_metadata_writes_bytes(self): @@ -549,32 +487,32 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): def test_get_metadata_reads_a_line(self): client = self._get_client() - client.get_metadata('some_key') + client.get('some_key') self.assertEqual(self.metasource_data_len, self.serial.read.call_count) def test_get_metadata_returns_valid_value(self): client = self._get_client() - value = client.get_metadata('some_key') + value = client.get('some_key') self.assertEqual(self.metadata_value, value) def test_get_metadata_throws_exception_for_incorrect_length(self): self.response_parts['length'] = 0 client = self._get_client() self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get_metadata, 'some_key') + client.get, 'some_key') def test_get_metadata_throws_exception_for_incorrect_crc(self): self.response_parts['crc'] = 'deadbeef' client = self._get_client() self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get_metadata, 'some_key') + client.get, 'some_key') def test_get_metadata_throws_exception_for_request_id_mismatch(self): self.response_parts['request_id'] = 'deadbeef' client = self._get_client() client._checksum = lambda _: self.response_parts['crc'] self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get_metadata, 'some_key') + client.get, 'some_key') def test_get_metadata_returns_None_if_value_not_found(self): self.response_parts['payload'] = '' @@ -582,4 +520,24 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): self.response_parts['length'] = 17 client = self._get_client() client._checksum = lambda _: self.response_parts['crc'] - self.assertIsNone(client.get_metadata('some_key')) + self.assertIsNone(client.get('some_key')) + + +class TestNetworkConversion(TestCase): + + def test_convert_simple(self): + expected = { + 'version': 1, + 'config': [ + {'name': 'net0', 'type': 'physical', + 'subnets': [{'type': 'static', 'gateway': '8.12.42.1', + 'netmask': '255.255.255.0', + 'address': '8.12.42.102/24'}], + 'mtu': 1500, 'mac_address': '90:b8:d0:f5:e4:f5'}, + {'name': 'net1', 'type': 'physical', + 'subnets': [{'type': 'static', 'gateway': '192.168.128.1', + 'netmask': '255.255.252.0', + 'address': '192.168.128.93/22'}], + 'mtu': 8500, 'mac_address': '90:b8:d0:a5:ff:cd'}]} + found = DataSourceSmartOS.convert_smartos_network_data(SDC_NICS) + self.assertEqual(expected, found) |