summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
authorScott Moser <smoser@brickies.net>2016-08-23 16:48:42 -0400
committerScott Moser <smoser@brickies.net>2016-08-23 16:48:42 -0400
commit86e2614b6c3db342aa5a6590e91b9e459bbcb484 (patch)
tree6996805b91a0c1c31f3afea3a689348bf760de63 /tests/unittests/test_datasource
parentc937c66dd0d1ad7b73dcc2efb5eb4c16b05f4479 (diff)
parent9f7ce5f090689b664ffce7e0b4ac78bfeafd1a79 (diff)
downloadvyos-cloud-init-86e2614b6c3db342aa5a6590e91b9e459bbcb484.tar.gz
vyos-cloud-init-86e2614b6c3db342aa5a6590e91b9e459bbcb484.zip
merge trunk at 0.7.7~bzr1245
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py48
-rw-r--r--tests/unittests/test_datasource/test_azure.py30
-rw-r--r--tests/unittests/test_datasource/test_azure_helper.py14
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py2
-rw-r--r--tests/unittests/test_datasource/test_cloudstack.py10
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py238
-rw-r--r--tests/unittests/test_datasource/test_digitalocean.py2
-rw-r--r--tests/unittests/test_datasource/test_gce.py4
-rw-r--r--tests/unittests/test_datasource/test_maas.py4
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py15
-rw-r--r--tests/unittests/test_datasource/test_openstack.py102
-rw-r--r--tests/unittests/test_datasource/test_smartos.py360
12 files changed, 473 insertions, 356 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index 85759c68..12966563 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -134,7 +134,7 @@ class TestGetCloudType(TestCase):
'''
util.read_dmi_data = _dmi_data('RHEV')
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals('RHEV', dsrc.get_cloud_type())
+ self.assertEqual('RHEV', dsrc.get_cloud_type())
def test_vsphere(self):
'''
@@ -143,7 +143,7 @@ class TestGetCloudType(TestCase):
'''
util.read_dmi_data = _dmi_data('VMware Virtual Platform')
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals('VSPHERE', dsrc.get_cloud_type())
+ self.assertEqual('VSPHERE', dsrc.get_cloud_type())
def test_unknown(self):
'''
@@ -152,7 +152,7 @@ class TestGetCloudType(TestCase):
'''
util.read_dmi_data = _dmi_data('Unrecognized Platform')
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals('UNKNOWN', dsrc.get_cloud_type())
+ self.assertEqual('UNKNOWN', dsrc.get_cloud_type())
class TestGetDataCloudInfoFile(TestCase):
@@ -187,7 +187,7 @@ class TestGetDataCloudInfoFile(TestCase):
_write_cloud_info_file('RHEV')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda: True
- self.assertEquals(True, dsrc.get_data())
+ self.assertEqual(True, dsrc.get_data())
def test_vsphere(self):
'''Success Test module get_data() forcing VSPHERE.'''
@@ -195,7 +195,7 @@ class TestGetDataCloudInfoFile(TestCase):
_write_cloud_info_file('VSPHERE')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda: True
- self.assertEquals(True, dsrc.get_data())
+ self.assertEqual(True, dsrc.get_data())
def test_fail_rhev(self):
'''Failure Test module get_data() forcing RHEV.'''
@@ -203,7 +203,7 @@ class TestGetDataCloudInfoFile(TestCase):
_write_cloud_info_file('RHEV')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda: False
- self.assertEquals(False, dsrc.get_data())
+ self.assertEqual(False, dsrc.get_data())
def test_fail_vsphere(self):
'''Failure Test module get_data() forcing VSPHERE.'''
@@ -211,14 +211,14 @@ class TestGetDataCloudInfoFile(TestCase):
_write_cloud_info_file('VSPHERE')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda: False
- self.assertEquals(False, dsrc.get_data())
+ self.assertEqual(False, dsrc.get_data())
def test_unrecognized(self):
'''Failure Test module get_data() forcing unrecognized.'''
_write_cloud_info_file('unrecognized')
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals(False, dsrc.get_data())
+ self.assertEqual(False, dsrc.get_data())
class TestGetDataNoCloudInfoFile(TestCase):
@@ -250,7 +250,7 @@ class TestGetDataNoCloudInfoFile(TestCase):
util.read_dmi_data = _dmi_data('RHEV Hypervisor')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda: True
- self.assertEquals(True, dsrc.get_data())
+ self.assertEqual(True, dsrc.get_data())
def test_vsphere_no_cloud_file(self):
'''Test No cloud info file module get_data() forcing VSPHERE.'''
@@ -258,14 +258,14 @@ class TestGetDataNoCloudInfoFile(TestCase):
util.read_dmi_data = _dmi_data('VMware Virtual Platform')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda: True
- self.assertEquals(True, dsrc.get_data())
+ self.assertEqual(True, dsrc.get_data())
def test_failure_no_cloud_file(self):
'''Test No cloud info file module get_data() forcing unrecognized.'''
util.read_dmi_data = _dmi_data('Unrecognized Platform')
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals(False, dsrc.get_data())
+ self.assertEqual(False, dsrc.get_data())
class TestUserDataRhevm(TestCase):
@@ -305,7 +305,7 @@ class TestUserDataRhevm(TestCase):
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals(False, dsrc.user_data_rhevm())
+ self.assertEqual(False, dsrc.user_data_rhevm())
def test_modprobe_fails(self):
'''Test user_data_rhevm() where modprobe fails.'''
@@ -315,7 +315,7 @@ class TestUserDataRhevm(TestCase):
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals(False, dsrc.user_data_rhevm())
+ self.assertEqual(False, dsrc.user_data_rhevm())
def test_no_modprobe_cmd(self):
'''Test user_data_rhevm() with no modprobe command.'''
@@ -325,7 +325,7 @@ class TestUserDataRhevm(TestCase):
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals(False, dsrc.user_data_rhevm())
+ self.assertEqual(False, dsrc.user_data_rhevm())
def test_udevadm_fails(self):
'''Test user_data_rhevm() where udevadm fails.'''
@@ -335,7 +335,7 @@ class TestUserDataRhevm(TestCase):
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals(False, dsrc.user_data_rhevm())
+ self.assertEqual(False, dsrc.user_data_rhevm())
def test_no_udevadm_cmd(self):
'''Test user_data_rhevm() with no udevadm command.'''
@@ -345,7 +345,7 @@ class TestUserDataRhevm(TestCase):
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals(False, dsrc.user_data_rhevm())
+ self.assertEqual(False, dsrc.user_data_rhevm())
class TestUserDataVsphere(TestCase):
@@ -380,7 +380,7 @@ class TestUserDataVsphere(TestCase):
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals(False, dsrc.user_data_vsphere())
+ self.assertEqual(False, dsrc.user_data_vsphere())
class TestReadUserDataCallback(TestCase):
@@ -408,8 +408,8 @@ class TestReadUserDataCallback(TestCase):
def test_callback_both(self):
'''Test read_user_data_callback() with both files.'''
- self.assertEquals('test user data',
- read_user_data_callback(self.mount_dir))
+ self.assertEqual('test user data',
+ read_user_data_callback(self.mount_dir))
def test_callback_dc(self):
'''Test read_user_data_callback() with only DC file.'''
@@ -418,8 +418,8 @@ class TestReadUserDataCallback(TestCase):
dc_file=False,
non_dc_file=True)
- self.assertEquals('test user data',
- read_user_data_callback(self.mount_dir))
+ self.assertEqual('test user data',
+ read_user_data_callback(self.mount_dir))
def test_callback_non_dc(self):
'''Test read_user_data_callback() with only non-DC file.'''
@@ -428,14 +428,14 @@ class TestReadUserDataCallback(TestCase):
dc_file=True,
non_dc_file=False)
- self.assertEquals('test user data',
- read_user_data_callback(self.mount_dir))
+ self.assertEqual('test user data',
+ read_user_data_callback(self.mount_dir))
def test_callback_none(self):
'''Test read_user_data_callback() no files are found.'''
_remove_user_data_files(self.mount_dir)
- self.assertEquals(None, read_user_data_callback(self.mount_dir))
+ self.assertEqual(None, read_user_data_callback(self.mount_dir))
def force_arch(arch=None):
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 444e2799..e90e903c 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -1,24 +1,16 @@
from cloudinit import helpers
from cloudinit.util import b64e, decode_binary, load_file
from cloudinit.sources import DataSourceAzure
-from ..helpers import TestCase, populate_dir
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
+from ..helpers import TestCase, populate_dir, mock, ExitStack, PY26, SkipTest
import crypt
import os
-import stat
-import yaml
import shutil
+import stat
import tempfile
import xml.etree.ElementTree as ET
+import yaml
def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
@@ -83,6 +75,8 @@ class TestAzureDataSource(TestCase):
def setUp(self):
super(TestAzureDataSource, self).setUp()
+ if PY26:
+ raise SkipTest("Does not work on python 2.6")
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)
@@ -165,7 +159,7 @@ class TestAzureDataSource(TestCase):
def tags_equal(x, y):
for x_tag, x_val in x.items():
y_val = y.get(x_val.tag)
- self.assertEquals(x_val.text, y_val.text)
+ self.assertEqual(x_val.text, y_val.text)
old_cnt = create_tag_index(oxml)
new_cnt = create_tag_index(nxml)
@@ -354,8 +348,8 @@ class TestAzureDataSource(TestCase):
self.assertTrue(ret)
cfg = dsrc.get_config_obj()
- self.assertEquals(dsrc.device_name_to_device("ephemeral0"),
- "/dev/sdb")
+ self.assertEqual(dsrc.device_name_to_device("ephemeral0"),
+ "/dev/sdb")
assert 'disk_setup' in cfg
assert 'fs_setup' in cfg
self.assertIsInstance(cfg['disk_setup'], dict)
@@ -404,15 +398,15 @@ class TestAzureDataSource(TestCase):
self.xml_notequals(data['ovfcontent'], on_disk_ovf)
# Make sure that the redacted password on disk is not used by CI
- self.assertNotEquals(dsrc.cfg.get('password'),
- DataSourceAzure.DEF_PASSWD_REDACTION)
+ self.assertNotEqual(dsrc.cfg.get('password'),
+ DataSourceAzure.DEF_PASSWD_REDACTION)
# Make sure that the password was really encrypted
et = ET.fromstring(on_disk_ovf)
for elem in et.iter():
if 'UserPassword' in elem.tag:
- self.assertEquals(DataSourceAzure.DEF_PASSWD_REDACTION,
- elem.text)
+ self.assertEqual(DataSourceAzure.DEF_PASSWD_REDACTION,
+ elem.text)
def test_ovf_env_arrives_in_waagent_dir(self):
xml = construct_valid_ovf_env(data={}, userdata="FOODATA")
diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py
index 1134199b..65202ff0 100644
--- a/tests/unittests/test_datasource/test_azure_helper.py
+++ b/tests/unittests/test_datasource/test_azure_helper.py
@@ -1,17 +1,8 @@
import os
from cloudinit.sources.helpers import azure as azure_helper
-from ..helpers import TestCase
-try:
- from unittest import mock
-except ImportError:
- import mock
-
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
+from ..helpers import ExitStack, mock, TestCase
GOAL_STATE_TEMPLATE = """\
@@ -70,7 +61,7 @@ class TestFindEndpoint(TestCase):
def test_missing_special_azure_line(self):
self.load_file.return_value = ''
- self.assertRaises(Exception,
+ self.assertRaises(ValueError,
azure_helper.WALinuxAgentShim.find_endpoint)
@staticmethod
@@ -287,6 +278,7 @@ class TestOpenSSLManager(TestCase):
self.subp.side_effect = capture_directory
manager = azure_helper.OpenSSLManager()
self.assertEqual(manager.tmpdir, subp_directory['path'])
+ manager.clean_up()
@mock.patch.object(azure_helper, 'cd', mock.MagicMock())
@mock.patch.object(azure_helper.tempfile, 'mkdtemp', mock.MagicMock())
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index 772d189a..2a42ce0c 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -1,4 +1,5 @@
# coding: utf-8
+
import copy
from cloudinit.cs_utils import Cepko
@@ -6,7 +7,6 @@ from cloudinit.sources import DataSourceCloudSigma
from .. import helpers as test_helpers
-
SERVER_CONTEXT = {
"cpu": 1000,
"cpus_instead_of_cores": False,
diff --git a/tests/unittests/test_datasource/test_cloudstack.py b/tests/unittests/test_datasource/test_cloudstack.py
index 656d80d1..b1aab17b 100644
--- a/tests/unittests/test_datasource/test_cloudstack.py
+++ b/tests/unittests/test_datasource/test_cloudstack.py
@@ -1,15 +1,7 @@
from cloudinit import helpers
from cloudinit.sources.DataSourceCloudStack import DataSourceCloudStack
-from ..helpers import TestCase
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
+from ..helpers import TestCase, mock, ExitStack
class TestCloudStackPasswordFetching(TestCase):
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 89b15f54..18551b92 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -5,22 +5,15 @@ import shutil
import six
import tempfile
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
-
from cloudinit import helpers
+from cloudinit.net import eni
+from cloudinit.net import network_state
from cloudinit import settings
from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
-from ..helpers import TestCase
+from ..helpers import TestCase, ExitStack, mock
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
@@ -73,7 +66,7 @@ NETWORK_DATA = {
'type': 'ovs', 'mtu': None, 'id': 'tap2f88d109-5b'},
{'vif_id': '1a5382f8-04c5-4d75-ab98-d666c1ef52cc',
'ethernet_mac_address': 'fa:16:3e:05:30:fe',
- 'type': 'ovs', 'mtu': None, 'id': 'tap1a5382f8-04'}
+ 'type': 'ovs', 'mtu': None, 'id': 'tap1a5382f8-04', 'name': 'nic0'}
],
'networks': [
{'link': 'tap2ecc7709-b3', 'type': 'ipv4_dhcp',
@@ -88,6 +81,35 @@ NETWORK_DATA = {
]
}
+NETWORK_DATA_2 = {
+ "services": [
+ {"type": "dns", "address": "1.1.1.191"},
+ {"type": "dns", "address": "1.1.1.4"}],
+ "networks": [
+ {"network_id": "d94bbe94-7abc-48d4-9c82-4628ea26164a", "type": "ipv4",
+ "netmask": "255.255.255.248", "link": "eth0",
+ "routes": [{"netmask": "0.0.0.0", "network": "0.0.0.0",
+ "gateway": "2.2.2.9"}],
+ "ip_address": "2.2.2.10", "id": "network0-ipv4"},
+ {"network_id": "ca447c83-6409-499b-aaef-6ad1ae995348", "type": "ipv4",
+ "netmask": "255.255.255.224", "link": "eth1",
+ "routes": [], "ip_address": "3.3.3.24", "id": "network1-ipv4"}],
+ "links": [
+ {"ethernet_mac_address": "fa:16:3e:dd:50:9a", "mtu": 1500,
+ "type": "vif", "id": "eth0", "vif_id": "vif-foo1"},
+ {"ethernet_mac_address": "fa:16:3e:a8:14:69", "mtu": 1500,
+ "type": "vif", "id": "eth1", "vif_id": "vif-foo2"}]
+}
+
+
+KNOWN_MACS = {
+ 'fa:16:3e:69:b0:58': 'enp0s1',
+ 'fa:16:3e:d4:57:ad': 'enp0s2',
+ 'fa:16:3e:dd:50:9a': 'foo1',
+ 'fa:16:3e:a8:14:69': 'foo2',
+ 'fa:16:3e:ed:9a:59': 'foo3',
+}
+
CFG_DRIVE_FILES_V2 = {
'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
'ec2/2009-04-04/user-data': USER_DATA,
@@ -151,7 +173,7 @@ class TestConfigDriveDataSource(TestCase):
mock.patch.object(os.path, 'exists',
side_effect=exists_side_effect()))
device = cfg_ds.device_name_to_device(name)
- self.assertEquals(dev_name, device)
+ self.assertEqual(dev_name, device)
find_mock.assert_called_once_with(mock.ANY)
self.assertEqual(exists_mock.call_count, 2)
@@ -179,7 +201,7 @@ class TestConfigDriveDataSource(TestCase):
mock.patch.object(os.path, 'exists',
return_value=True))
device = cfg_ds.device_name_to_device(name)
- self.assertEquals(dev_name, device)
+ self.assertEqual(dev_name, device)
find_mock.assert_called_once_with(mock.ANY)
exists_mock.assert_called_once_with(mock.ANY)
@@ -214,7 +236,7 @@ class TestConfigDriveDataSource(TestCase):
with mock.patch.object(os.path, 'exists',
side_effect=exists_side_effect()):
device = cfg_ds.device_name_to_device(name)
- self.assertEquals(dev_name, device)
+ self.assertEqual(dev_name, device)
# We don't assert the call count for os.path.exists() because
# not all of the entries in name_tests results in two calls to
# that function. Specifically, 'root2k' doesn't seem to call
@@ -242,7 +264,7 @@ class TestConfigDriveDataSource(TestCase):
for name, dev_name in name_tests.items():
with mock.patch.object(os.path, 'exists', return_value=True):
device = cfg_ds.device_name_to_device(name)
- self.assertEquals(dev_name, device)
+ self.assertEqual(dev_name, device)
def test_dir_valid(self):
"""Verify a dir is read as such."""
@@ -348,32 +370,200 @@ class TestConfigDriveDataSource(TestCase):
util.find_devs_with = orig_find_devs_with
util.is_partition = orig_is_partition
- def test_pubkeys_v2(self):
+ @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
+ def test_pubkeys_v2(self, on_first_boot):
"""Verify that public-keys work in config-drive-v2."""
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
myds = cfg_ds_from_dir(self.tmp)
self.assertEqual(myds.get_public_ssh_keys(),
[OSTACK_META['public_keys']['mykey']])
- def test_network_data_is_found(self):
+
+class TestNetJson(TestCase):
+ def setUp(self):
+ super(TestNetJson, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+ self.maxDiff = None
+
+ @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
+ def test_network_data_is_found(self, on_first_boot):
"""Verify that network_data is present in ds in config-drive-v2."""
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
myds = cfg_ds_from_dir(self.tmp)
- self.assertEqual(myds.network_json, NETWORK_DATA)
+ self.assertIsNotNone(myds.network_json)
- def test_network_config_is_converted(self):
+ @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
+ def test_network_config_is_converted(self, on_first_boot):
"""Verify that network_data is converted and present on ds object."""
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
myds = cfg_ds_from_dir(self.tmp)
- network_config = ds.convert_network_data(NETWORK_DATA)
+ network_config = openstack.convert_net_json(NETWORK_DATA,
+ known_macs=KNOWN_MACS)
self.assertEqual(myds.network_config, network_config)
+ def test_network_config_conversions(self):
+ """Tests a bunch of input network json and checks the
+ expected conversions."""
+ in_datas = [
+ NETWORK_DATA,
+ {
+ 'services': [{'type': 'dns', 'address': '172.19.0.12'}],
+ 'networks': [{
+ 'network_id': 'dacd568d-5be6-4786-91fe-750c374b78b4',
+ 'type': 'ipv4',
+ 'netmask': '255.255.252.0',
+ 'link': 'tap1a81968a-79',
+ 'routes': [{
+ 'netmask': '0.0.0.0',
+ 'network': '0.0.0.0',
+ 'gateway': '172.19.3.254',
+ }],
+ 'ip_address': '172.19.1.34',
+ 'id': 'network0',
+ }],
+ 'links': [{
+ 'type': 'bridge',
+ 'vif_id': '1a81968a-797a-400f-8a80-567f997eb93f',
+ 'ethernet_mac_address': 'fa:16:3e:ed:9a:59',
+ 'id': 'tap1a81968a-79',
+ 'mtu': None,
+ }],
+ },
+ ]
+ out_datas = [
+ {
+ 'version': 1,
+ 'config': [
+ {
+ 'subnets': [{'type': 'dhcp4'}],
+ 'type': 'physical',
+ 'mac_address': 'fa:16:3e:69:b0:58',
+ 'name': 'enp0s1',
+ 'mtu': None,
+ },
+ {
+ 'subnets': [{'type': 'dhcp4'}],
+ 'type': 'physical',
+ 'mac_address': 'fa:16:3e:d4:57:ad',
+ 'name': 'enp0s2',
+ 'mtu': None,
+ },
+ {
+ 'subnets': [{'type': 'dhcp4'}],
+ 'type': 'physical',
+ 'mac_address': 'fa:16:3e:05:30:fe',
+ 'name': 'nic0',
+ 'mtu': None,
+ },
+ {
+ 'type': 'nameserver',
+ 'address': '199.204.44.24',
+ },
+ {
+ 'type': 'nameserver',
+ 'address': '199.204.47.54',
+ }
+ ],
+
+ },
+ {
+ 'version': 1,
+ 'config': [
+ {
+ 'name': 'foo3',
+ 'mac_address': 'fa:16:3e:ed:9a:59',
+ 'mtu': None,
+ 'type': 'physical',
+ 'subnets': [
+ {
+ 'address': '172.19.1.34',
+ 'netmask': '255.255.252.0',
+ 'type': 'static',
+ 'ipv4': True,
+ 'routes': [{
+ 'gateway': '172.19.3.254',
+ 'netmask': '0.0.0.0',
+ 'network': '0.0.0.0',
+ }],
+ }
+ ]
+ },
+ {
+ 'type': 'nameserver',
+ 'address': '172.19.0.12',
+ }
+ ],
+ },
+ ]
+ for in_data, out_data in zip(in_datas, out_datas):
+ conv_data = openstack.convert_net_json(in_data,
+ known_macs=KNOWN_MACS)
+ self.assertEqual(out_data, conv_data)
+
+
+class TestConvertNetworkData(TestCase):
+ def setUp(self):
+ super(TestConvertNetworkData, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+
+ def _getnames_in_config(self, ncfg):
+ return set([n['name'] for n in ncfg['config']
+ if n['type'] == 'physical'])
+
+ def test_conversion_fills_names(self):
+ ncfg = openstack.convert_net_json(NETWORK_DATA, known_macs=KNOWN_MACS)
+ expected = set(['nic0', 'enp0s1', 'enp0s2'])
+ found = self._getnames_in_config(ncfg)
+ self.assertEqual(found, expected)
+
+ @mock.patch('cloudinit.net.get_interfaces_by_mac')
+ def test_convert_reads_system_prefers_name(self, get_interfaces_by_mac):
+ macs = KNOWN_MACS.copy()
+ macs.update({'fa:16:3e:05:30:fe': 'foonic1',
+ 'fa:16:3e:69:b0:58': 'ens1'})
+ get_interfaces_by_mac.return_value = macs
+
+ ncfg = openstack.convert_net_json(NETWORK_DATA)
+ expected = set(['nic0', 'ens1', 'enp0s2'])
+ found = self._getnames_in_config(ncfg)
+ self.assertEqual(found, expected)
+
+ def test_convert_raises_value_error_on_missing_name(self):
+ macs = {'aa:aa:aa:aa:aa:00': 'ens1'}
+ self.assertRaises(ValueError, openstack.convert_net_json,
+ NETWORK_DATA, known_macs=macs)
+
+ def test_conversion_with_route(self):
+ ncfg = openstack.convert_net_json(NETWORK_DATA_2,
+ known_macs=KNOWN_MACS)
+ # not the best test, but see that we get a route in the
+ # network config and that it gets rendered to an ENI file
+ routes = []
+ for n in ncfg['config']:
+ for s in n.get('subnets', []):
+ routes.extend(s.get('routes', []))
+ self.assertIn(
+ {'network': '0.0.0.0', 'netmask': '0.0.0.0', 'gateway': '2.2.2.9'},
+ routes)
+ eni_renderer = eni.Renderer()
+ eni_renderer.render_network_state(
+ self.tmp, network_state.parse_net_config_data(ncfg))
+ with open(os.path.join(self.tmp, "etc",
+ "network", "interfaces"), 'r') as f:
+ eni_rendering = f.read()
+ self.assertIn("route add default gw 2.2.2.9", eni_rendering)
+
def cfg_ds_from_dir(seed_d):
- found = ds.read_config_drive(seed_d)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None,
helpers.Paths({}))
- populate_ds_from_read_config(cfg_ds, seed_d, found)
+ cfg_ds.seed_dir = seed_d
+ cfg_ds.known_macs = KNOWN_MACS.copy()
+ if not cfg_ds.get_data():
+ raise RuntimeError("Data source did not extract itself from"
+ " seed directory %s" % seed_d)
return cfg_ds
@@ -387,7 +577,8 @@ def populate_ds_from_read_config(cfg_ds, source, results):
cfg_ds.userdata_raw = results.get('userdata')
cfg_ds.version = results.get('version')
cfg_ds.network_json = results.get('networkdata')
- cfg_ds._network_config = ds.convert_network_data(cfg_ds.network_json)
+ cfg_ds._network_config = openstack.convert_net_json(
+ cfg_ds.network_json, known_macs=KNOWN_MACS)
def populate_dir(seed_dir, files):
@@ -400,7 +591,6 @@ def populate_dir(seed_dir, files):
mode = "w"
else:
mode = "wb"
-
with open(path, mode) as fp:
fp.write(content)
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
index 679d1b82..8936a1e3 100644
--- a/tests/unittests/test_datasource/test_digitalocean.py
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -19,8 +19,8 @@ import re
from six.moves.urllib_parse import urlparse
-from cloudinit import settings
from cloudinit import helpers
+from cloudinit import settings
from cloudinit.sources import DataSourceDigitalOcean
from .. import helpers as test_helpers
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index fa714070..6e62a4d2 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -20,8 +20,8 @@ import re
from base64 import b64encode, b64decode
from six.moves.urllib_parse import urlparse
-from cloudinit import settings
from cloudinit import helpers
+from cloudinit import settings
from cloudinit.sources import DataSourceGCE
from .. import helpers as test_helpers
@@ -52,7 +52,7 @@ GCE_META_ENCODING = {
HEADERS = {'X-Google-Metadata-Request': 'True'}
MD_URL_RE = re.compile(
- r'http://metadata.google.internal./computeMetadata/v1/.*')
+ r'http://metadata.google.internal/computeMetadata/v1/.*')
def _set_mock_metadata(gce_meta=None):
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index 77d15cac..f66f1c6d 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -104,13 +104,13 @@ class TestMAASDataSource(TestCase):
'meta-data/local-hostname': 'test-hostname',
'meta-data/public-keys': 'test-hostname',
'user-data': b'foodata',
- }
+ }
valid_order = [
'meta-data/local-hostname',
'meta-data/instance-id',
'meta-data/public-keys',
'user-data',
- ]
+ ]
my_seed = "http://example.com/xmeta"
my_ver = "1999-99-99"
my_headers = {'header1': 'value1', 'header2': 'value2'}
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index 2d5fc37c..b0fa1130 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -1,22 +1,13 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceNoCloud
from cloudinit import util
-from ..helpers import TestCase, populate_dir
+from ..helpers import TestCase, populate_dir, mock, ExitStack
import os
-import yaml
import shutil
import tempfile
-import unittest
-try:
- from unittest import mock
-except ImportError:
- import mock
-try:
- from contextlib import ExitStack
-except ImportError:
- from contextlib2 import ExitStack
+import yaml
class TestNoCloudDataSource(TestCase):
@@ -139,7 +130,7 @@ class TestNoCloudDataSource(TestCase):
self.assertTrue(ret)
-class TestParseCommandLineData(unittest.TestCase):
+class TestParseCommandLineData(TestCase):
def test_parse_cmdline_data_valid(self):
ds_id = "ds=nocloud"
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index 0aa1ba84..5c8592c5 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -22,8 +22,8 @@ import re
from .. import helpers as test_helpers
-from six import StringIO
from six.moves.urllib.parse import urlparse
+from six import StringIO
from cloudinit import helpers
from cloudinit import settings
@@ -135,41 +135,45 @@ def _register_uris(version, ec2_files, ec2_meta, os_files):
body=get_request_callback)
+def _read_metadata_service():
+ return ds.read_metadata_service(BASE_URL, retries=0, timeout=0.1)
+
+
class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
VERSION = 'latest'
@hp.activate
def test_successful(self):
_register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES)
- f = ds.read_metadata_service(BASE_URL)
- self.assertEquals(VENDOR_DATA, f.get('vendordata'))
- self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg'])
- self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
- self.assertEquals(2, len(f['files']))
- self.assertEquals(USER_DATA, f.get('userdata'))
- self.assertEquals(EC2_META, f.get('ec2-metadata'))
- self.assertEquals(2, f.get('version'))
+ f = _read_metadata_service()
+ self.assertEqual(VENDOR_DATA, f.get('vendordata'))
+ self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg'])
+ self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
+ self.assertEqual(2, len(f['files']))
+ self.assertEqual(USER_DATA, f.get('userdata'))
+ self.assertEqual(EC2_META, f.get('ec2-metadata'))
+ self.assertEqual(2, f.get('version'))
metadata = f['metadata']
- self.assertEquals('nova', metadata.get('availability_zone'))
- self.assertEquals('sm-foo-test.novalocal', metadata.get('hostname'))
- self.assertEquals('sm-foo-test.novalocal',
- metadata.get('local-hostname'))
- self.assertEquals('sm-foo-test', metadata.get('name'))
- self.assertEquals('b0fa911b-69d4-4476-bbe2-1c92bff6535c',
- metadata.get('uuid'))
- self.assertEquals('b0fa911b-69d4-4476-bbe2-1c92bff6535c',
- metadata.get('instance-id'))
+ self.assertEqual('nova', metadata.get('availability_zone'))
+ self.assertEqual('sm-foo-test.novalocal', metadata.get('hostname'))
+ self.assertEqual('sm-foo-test.novalocal',
+ metadata.get('local-hostname'))
+ self.assertEqual('sm-foo-test', metadata.get('name'))
+ self.assertEqual('b0fa911b-69d4-4476-bbe2-1c92bff6535c',
+ metadata.get('uuid'))
+ self.assertEqual('b0fa911b-69d4-4476-bbe2-1c92bff6535c',
+ metadata.get('instance-id'))
@hp.activate
def test_no_ec2(self):
_register_uris(self.VERSION, {}, {}, OS_FILES)
- f = ds.read_metadata_service(BASE_URL)
- self.assertEquals(VENDOR_DATA, f.get('vendordata'))
- self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg'])
- self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
- self.assertEquals(USER_DATA, f.get('userdata'))
- self.assertEquals({}, f.get('ec2-metadata'))
- self.assertEquals(2, f.get('version'))
+ f = _read_metadata_service()
+ self.assertEqual(VENDOR_DATA, f.get('vendordata'))
+ self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg'])
+ self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
+ self.assertEqual(USER_DATA, f.get('userdata'))
+ self.assertEqual({}, f.get('ec2-metadata'))
+ self.assertEqual(2, f.get('version'))
@hp.activate
def test_bad_metadata(self):
@@ -178,8 +182,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('meta_data.json'):
os_files.pop(k, None)
_register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.NonReadable, ds.read_metadata_service,
- BASE_URL)
+ self.assertRaises(openstack.NonReadable, _read_metadata_service)
@hp.activate
def test_bad_uuid(self):
@@ -190,8 +193,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('meta_data.json'):
os_files[k] = json.dumps(os_meta)
_register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service,
- BASE_URL)
+ self.assertRaises(openstack.BrokenMetadata, _read_metadata_service)
@hp.activate
def test_userdata_empty(self):
@@ -200,10 +202,10 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('user_data'):
os_files.pop(k, None)
_register_uris(self.VERSION, {}, {}, os_files)
- f = ds.read_metadata_service(BASE_URL)
- self.assertEquals(VENDOR_DATA, f.get('vendordata'))
- self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg'])
- self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
+ f = _read_metadata_service()
+ self.assertEqual(VENDOR_DATA, f.get('vendordata'))
+ self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg'])
+ self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
self.assertFalse(f.get('userdata'))
@hp.activate
@@ -213,9 +215,9 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('vendor_data.json'):
os_files.pop(k, None)
_register_uris(self.VERSION, {}, {}, os_files)
- f = ds.read_metadata_service(BASE_URL)
- self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg'])
- self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
+ f = _read_metadata_service()
+ self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg'])
+ self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
self.assertFalse(f.get('vendordata'))
@hp.activate
@@ -225,8 +227,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('vendor_data.json'):
os_files[k] = '{' # some invalid json
_register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service,
- BASE_URL)
+ self.assertRaises(openstack.BrokenMetadata, _read_metadata_service)
@hp.activate
def test_metadata_invalid(self):
@@ -235,8 +236,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('meta_data.json'):
os_files[k] = '{' # some invalid json
_register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service,
- BASE_URL)
+ self.assertRaises(openstack.BrokenMetadata, _read_metadata_service)
@hp.activate
def test_datasource(self):
@@ -245,18 +245,18 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
None,
helpers.Paths({}))
self.assertIsNone(ds_os.version)
- found = ds_os.get_data()
+ found = ds_os.get_data(timeout=0.1, retries=0)
self.assertTrue(found)
- self.assertEquals(2, ds_os.version)
+ self.assertEqual(2, ds_os.version)
md = dict(ds_os.metadata)
md.pop('instance-id', None)
md.pop('local-hostname', None)
- self.assertEquals(OSTACK_META, md)
- self.assertEquals(EC2_META, ds_os.ec2_metadata)
- self.assertEquals(USER_DATA, ds_os.userdata_raw)
- self.assertEquals(2, len(ds_os.files))
- self.assertEquals(VENDOR_DATA, ds_os.vendordata_pure)
- self.assertEquals(ds_os.vendordata_raw, None)
+ self.assertEqual(OSTACK_META, md)
+ self.assertEqual(EC2_META, ds_os.ec2_metadata)
+ self.assertEqual(USER_DATA, ds_os.userdata_raw)
+ self.assertEqual(2, len(ds_os.files))
+ self.assertEqual(VENDOR_DATA, ds_os.vendordata_pure)
+ self.assertEqual(ds_os.vendordata_raw, None)
@hp.activate
def test_bad_datasource_meta(self):
@@ -269,7 +269,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
None,
helpers.Paths({}))
self.assertIsNone(ds_os.version)
- found = ds_os.get_data()
+ found = ds_os.get_data(timeout=0.1, retries=0)
self.assertFalse(found)
self.assertIsNone(ds_os.version)
@@ -288,7 +288,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
'timeout': 0,
}
self.assertIsNone(ds_os.version)
- found = ds_os.get_data()
+ found = ds_os.get_data(timeout=0.1, retries=0)
self.assertFalse(found)
self.assertIsNone(ds_os.version)
@@ -311,7 +311,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
'timeout': 0,
}
self.assertIsNone(ds_os.version)
- found = ds_os.get_data()
+ found = ds_os.get_data(timeout=0.1, retries=0)
self.assertFalse(found)
self.assertIsNone(ds_os.version)
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 5c49966a..9c6c8768 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -24,6 +24,8 @@
from __future__ import print_function
+from binascii import crc32
+import json
import os
import os.path
import re
@@ -31,21 +33,58 @@ import shutil
import stat
import tempfile
import uuid
-from binascii import crc32
-import serial
+from cloudinit import serial
+from cloudinit.sources import DataSourceSmartOS
+
import six
from cloudinit import helpers as c_helpers
-from cloudinit.sources import DataSourceSmartOS
from cloudinit.util import b64e
-from .. import helpers
-
-try:
- from unittest import mock
-except ImportError:
- import mock
+from ..helpers import mock, FilesystemMockingTestCase, TestCase
+
+SDC_NICS = json.loads("""
+[
+ {
+ "nic_tag": "external",
+ "primary": true,
+ "mtu": 1500,
+ "model": "virtio",
+ "gateway": "8.12.42.1",
+ "netmask": "255.255.255.0",
+ "ip": "8.12.42.102",
+ "network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
+ "gateways": [
+ "8.12.42.1"
+ ],
+ "vlan_id": 324,
+ "mac": "90:b8:d0:f5:e4:f5",
+ "interface": "net0",
+ "ips": [
+ "8.12.42.102/24"
+ ]
+ },
+ {
+ "nic_tag": "sdc_overlay/16187209",
+ "gateway": "192.168.128.1",
+ "model": "virtio",
+ "mac": "90:b8:d0:a5:ff:cd",
+ "netmask": "255.255.252.0",
+ "ip": "192.168.128.93",
+ "network_uuid": "4cad71da-09bc-452b-986d-03562a03a0a9",
+ "gateways": [
+ "192.168.128.1"
+ ],
+ "vlan_id": 2,
+ "mtu": 8500,
+ "interface": "net1",
+ "ips": [
+ "192.168.128.93/22"
+ ]
+ }
+]
+""")
MOCK_RETURNS = {
'hostname': 'test-host',
@@ -60,79 +99,68 @@ MOCK_RETURNS = {
'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']),
'user-data': '\n'.join(['something', '']),
'user-script': '\n'.join(['/bin/true', '']),
+ 'sdc:nics': json.dumps(SDC_NICS),
}
DMI_DATA_RETURN = 'smartdc'
-def get_mock_client(mockdata):
- class MockMetadataClient(object):
+class PsuedoJoyentClient(object):
+ def __init__(self, data=None):
+ if data is None:
+ data = MOCK_RETURNS.copy()
+ self.data = data
+ return
+
+ def get(self, key, default=None, strip=False):
+ if key in self.data:
+ r = self.data[key]
+ if strip:
+ r = r.strip()
+ else:
+ r = default
+ return r
- def __init__(self, serial):
- pass
+ def get_json(self, key, default=None):
+ result = self.get(key, default=default)
+ if result is None:
+ return default
+ return json.loads(result)
- def get_metadata(self, metadata_key):
- return mockdata.get(metadata_key)
- return MockMetadataClient
+ def exists(self):
+ return True
-class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
+class TestSmartOSDataSource(FilesystemMockingTestCase):
def setUp(self):
super(TestSmartOSDataSource, self).setUp()
+ dsmos = 'cloudinit.sources.DataSourceSmartOS'
+ patcher = mock.patch(dsmos + ".jmc_client_factory")
+ self.jmc_cfact = patcher.start()
+ self.addCleanup(patcher.stop)
+ patcher = mock.patch(dsmos + ".get_smartos_environ")
+ self.get_smartos_environ = patcher.start()
+ self.addCleanup(patcher.stop)
+
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)
- self.legacy_user_d = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.legacy_user_d)
-
- # If you should want to watch the logs...
- self._log = None
- self._log_file = None
- self._log_handler = None
-
- # patch cloud_dir, so our 'seed_dir' is guaranteed empty
self.paths = c_helpers.Paths({'cloud_dir': self.tmp})
- self.unapply = []
- super(TestSmartOSDataSource, self).setUp()
+ self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp')
+ os.mkdir(self.legacy_user_d)
+
+ self.orig_lud = DataSourceSmartOS.LEGACY_USER_D
+ DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d
def tearDown(self):
- helpers.FilesystemMockingTestCase.tearDown(self)
- if self._log_handler and self._log:
- self._log.removeHandler(self._log_handler)
- apply_patches([i for i in reversed(self.unapply)])
+ DataSourceSmartOS.LEGACY_USER_D = self.orig_lud
super(TestSmartOSDataSource, self).tearDown()
- def _patchIn(self, root):
- self.restore()
- self.patchOS(root)
- self.patchUtils(root)
-
- def apply_patches(self, patches):
- ret = apply_patches(patches)
- self.unapply += ret
-
- def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None,
- is_lxbrand=False):
- mod = DataSourceSmartOS
-
- if mockdata is None:
- mockdata = MOCK_RETURNS
-
- if dmi_data is None:
- dmi_data = DMI_DATA_RETURN
-
- def _dmi_data():
- return dmi_data
-
- def _os_uname():
- if not is_lxbrand:
- # LP: #1243287. tests assume this runs, but running test on
- # arm would cause them all to fail.
- return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', 'x86_64')
- else:
- return ('LINUX', 'NODENAME', 'RELEASE', 'BRANDZ VIRTUAL LINUX',
- 'X86_64')
+ def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM,
+ sys_cfg=None, ds_cfg=None):
+ self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata)
+ self.get_smartos_environ.return_value = mode
if sys_cfg is None:
sys_cfg = {}
@@ -141,44 +169,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
sys_cfg['datasource'] = sys_cfg.get('datasource', {})
sys_cfg['datasource']['SmartOS'] = ds_cfg
- self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)])
- self.apply_patches([
- (mod, 'JoyentMetadataClient', get_mock_client(mockdata))])
- self.apply_patches([(mod, 'dmi_data', _dmi_data)])
- self.apply_patches([(os, 'uname', _os_uname)])
- self.apply_patches([(mod, 'device_exists', lambda d: True)])
- dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None,
- paths=self.paths)
- self.apply_patches([(dsrc, '_get_seed_file_object', mock.MagicMock())])
- return dsrc
-
- def test_seed(self):
- # default seed should be /dev/ttyS1
- dsrc = self._get_ds()
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEquals('kvm', dsrc.smartos_type)
- self.assertEquals('/dev/ttyS1', dsrc.seed)
-
- def test_seed_lxbrand(self):
- # default seed should be /dev/ttyS1
- dsrc = self._get_ds(is_lxbrand=True)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEquals('lx-brand', dsrc.smartos_type)
- self.assertEquals('/native/.zonecontrol/metadata.sock', dsrc.seed)
-
- def test_issmartdc(self):
- dsrc = self._get_ds()
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertTrue(dsrc.is_smartdc)
-
- def test_issmartdc_lxbrand(self):
- dsrc = self._get_ds(is_lxbrand=True)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertTrue(dsrc.is_smartdc)
+ return DataSourceSmartOS.DataSourceSmartOS(
+ sys_cfg, distro=None, paths=self.paths)
def test_no_base64(self):
ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
@@ -190,110 +182,65 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['sdc:uuid'],
- dsrc.metadata['instance-id'])
+ self.assertEqual(MOCK_RETURNS['sdc:uuid'],
+ dsrc.metadata['instance-id'])
def test_root_keys(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
- dsrc.metadata['public-keys'])
+ self.assertEqual(MOCK_RETURNS['root_authorized_keys'],
+ dsrc.metadata['public-keys'])
def test_hostname_b64(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
+ self.assertEqual(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
def test_hostname(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
-
- def test_base64_all(self):
- # metadata provided base64_all of true
- my_returns = MOCK_RETURNS.copy()
- my_returns['base64_all'] = "true"
- for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = b64e(my_returns[k])
-
- dsrc = self._get_ds(mockdata=my_returns)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
- self.assertEquals(MOCK_RETURNS['cloud-init:user-data'],
- dsrc.userdata_raw)
- self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
- dsrc.metadata['public-keys'])
- self.assertEquals(MOCK_RETURNS['disable_iptables_flag'],
- dsrc.metadata['iptables_disable'])
- self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'],
- dsrc.metadata['motd_sys_info'])
-
- def test_b64_userdata(self):
- my_returns = MOCK_RETURNS.copy()
- my_returns['b64-cloud-init:user-data'] = "true"
- my_returns['b64-hostname'] = "true"
- for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = b64e(my_returns[k])
+ self.assertEqual(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
- dsrc = self._get_ds(mockdata=my_returns)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
- self.assertEquals(MOCK_RETURNS['cloud-init:user-data'],
- dsrc.userdata_raw)
- self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
- dsrc.metadata['public-keys'])
-
- def test_b64_keys(self):
- my_returns = MOCK_RETURNS.copy()
- my_returns['base64_keys'] = 'hostname,ignored'
- for k in ('hostname',):
- my_returns[k] = b64e(my_returns[k])
-
- dsrc = self._get_ds(mockdata=my_returns)
+ def test_userdata(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
- self.assertEquals(MOCK_RETURNS['cloud-init:user-data'],
- dsrc.userdata_raw)
+ self.assertEqual(MOCK_RETURNS['user-data'],
+ dsrc.metadata['legacy-user-data'])
+ self.assertEqual(MOCK_RETURNS['cloud-init:user-data'],
+ dsrc.userdata_raw)
- def test_userdata(self):
+ def test_sdc_nics(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['user-data'],
- dsrc.metadata['legacy-user-data'])
- self.assertEquals(MOCK_RETURNS['cloud-init:user-data'],
- dsrc.userdata_raw)
+ self.assertEqual(json.loads(MOCK_RETURNS['sdc:nics']),
+ dsrc.metadata['network-data'])
def test_sdc_scripts(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['user-script'],
- dsrc.metadata['user-script'])
+ self.assertEqual(MOCK_RETURNS['user-script'],
+ dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
self.assertTrue(os.path.exists(legacy_script_f))
self.assertTrue(os.path.islink(legacy_script_f))
user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
- self.assertEquals(user_script_perm, '700')
+ self.assertEqual(user_script_perm, '700')
def test_scripts_shebanged(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['user-script'],
- dsrc.metadata['user-script'])
+ self.assertEqual(MOCK_RETURNS['user-script'],
+ dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
self.assertTrue(os.path.exists(legacy_script_f))
@@ -301,9 +248,9 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
shebang = None
with open(legacy_script_f, 'r') as f:
shebang = f.readlines()[0].strip()
- self.assertEquals(shebang, "#!/bin/bash")
+ self.assertEqual(shebang, "#!/bin/bash")
user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
- self.assertEquals(user_script_perm, '700')
+ self.assertEqual(user_script_perm, '700')
def test_scripts_shebang_not_added(self):
"""
@@ -319,8 +266,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(my_returns['user-script'],
- dsrc.metadata['user-script'])
+ self.assertEqual(my_returns['user-script'],
+ dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
self.assertTrue(os.path.exists(legacy_script_f))
@@ -328,7 +275,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
shebang = None
with open(legacy_script_f, 'r') as f:
shebang = f.readlines()[0].strip()
- self.assertEquals(shebang, "#!/usr/bin/perl")
+ self.assertEqual(shebang, "#!/usr/bin/perl")
def test_userdata_removed(self):
"""
@@ -358,7 +305,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
if re.match(r'.*\/mdata-user-data$', name_f):
found_new = True
print(name_f)
- self.assertEquals(permissions, '400')
+ self.assertEqual(permissions, '400')
self.assertFalse(found_new)
@@ -366,8 +313,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['sdc:vendor-data'],
- dsrc.metadata['vendor-data'])
+ self.assertEqual(MOCK_RETURNS['sdc:vendor-data'],
+ dsrc.metadata['vendor-data'])
def test_default_vendor_data(self):
my_returns = MOCK_RETURNS.copy()
@@ -376,7 +323,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertNotEquals(def_op_script, dsrc.metadata['vendor-data'])
+ self.assertNotEqual(def_op_script, dsrc.metadata['vendor-data'])
# we expect default vendor-data is a boothook
self.assertTrue(dsrc.vendordata_raw.startswith("#cloud-boothook"))
@@ -385,15 +332,15 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['disable_iptables_flag'],
- dsrc.metadata['iptables_disable'])
+ self.assertEqual(MOCK_RETURNS['disable_iptables_flag'],
+ dsrc.metadata['iptables_disable'])
def test_motd_sys_info(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'],
- dsrc.metadata['motd_sys_info'])
+ self.assertEqual(MOCK_RETURNS['enable_motd_sys_info'],
+ dsrc.metadata['motd_sys_info'])
def test_default_ephemeral(self):
# Test to make sure that the builtin config has the ephemeral
@@ -430,21 +377,11 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
mydscfg['disk_aliases']['FOO'])
-def apply_patches(patches):
- ret = []
- for (ref, name, replace) in patches:
- if replace is None:
- continue
- orig = getattr(ref, name)
- setattr(ref, name, replace)
- ret.append((ref, name, orig))
- return ret
-
-
-class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
+class TestJoyentMetadataClient(FilesystemMockingTestCase):
def setUp(self):
super(TestJoyentMetadataClient, self).setUp()
+
self.serial = mock.MagicMock(spec=serial.Serial)
self.request_id = 0xabcdef12
self.metadata_value = 'value'
@@ -481,7 +418,8 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
mock.Mock(return_value=self.request_id)))
def _get_client(self):
- return DataSourceSmartOS.JoyentMetadataClient(self.serial)
+ return DataSourceSmartOS.JoyentMetadataClient(
+ fp=self.serial, smartos_type=DataSourceSmartOS.SMARTOS_ENV_KVM)
def assertEndsWith(self, haystack, prefix):
self.assertTrue(haystack.endswith(prefix),
@@ -495,7 +433,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
def test_get_metadata_writes_a_single_line(self):
client = self._get_client()
- client.get_metadata('some_key')
+ client.get('some_key')
self.assertEqual(1, self.serial.write.call_count)
written_line = self.serial.write.call_args[0][0]
print(type(written_line))
@@ -505,7 +443,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
def _get_written_line(self, key='some_key'):
client = self._get_client()
- client.get_metadata(key)
+ client.get(key)
return self.serial.write.call_args[0][0]
def test_get_metadata_writes_bytes(self):
@@ -549,32 +487,32 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
def test_get_metadata_reads_a_line(self):
client = self._get_client()
- client.get_metadata('some_key')
+ client.get('some_key')
self.assertEqual(self.metasource_data_len, self.serial.read.call_count)
def test_get_metadata_returns_valid_value(self):
client = self._get_client()
- value = client.get_metadata('some_key')
+ value = client.get('some_key')
self.assertEqual(self.metadata_value, value)
def test_get_metadata_throws_exception_for_incorrect_length(self):
self.response_parts['length'] = 0
client = self._get_client()
self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get_metadata, 'some_key')
+ client.get, 'some_key')
def test_get_metadata_throws_exception_for_incorrect_crc(self):
self.response_parts['crc'] = 'deadbeef'
client = self._get_client()
self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get_metadata, 'some_key')
+ client.get, 'some_key')
def test_get_metadata_throws_exception_for_request_id_mismatch(self):
self.response_parts['request_id'] = 'deadbeef'
client = self._get_client()
client._checksum = lambda _: self.response_parts['crc']
self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get_metadata, 'some_key')
+ client.get, 'some_key')
def test_get_metadata_returns_None_if_value_not_found(self):
self.response_parts['payload'] = ''
@@ -582,4 +520,24 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
self.response_parts['length'] = 17
client = self._get_client()
client._checksum = lambda _: self.response_parts['crc']
- self.assertIsNone(client.get_metadata('some_key'))
+ self.assertIsNone(client.get('some_key'))
+
+
+class TestNetworkConversion(TestCase):
+
+ def test_convert_simple(self):
+ expected = {
+ 'version': 1,
+ 'config': [
+ {'name': 'net0', 'type': 'physical',
+ 'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
+ 'netmask': '255.255.255.0',
+ 'address': '8.12.42.102/24'}],
+ 'mtu': 1500, 'mac_address': '90:b8:d0:f5:e4:f5'},
+ {'name': 'net1', 'type': 'physical',
+ 'subnets': [{'type': 'static', 'gateway': '192.168.128.1',
+ 'netmask': '255.255.252.0',
+ 'address': '192.168.128.93/22'}],
+ 'mtu': 8500, 'mac_address': '90:b8:d0:a5:ff:cd'}]}
+ found = DataSourceSmartOS.convert_smartos_network_data(SDC_NICS)
+ self.assertEqual(expected, found)