summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource/test_smartos.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource/test_smartos.py')
-rw-r--r--tests/unittests/test_datasource/test_smartos.py360
1 files changed, 159 insertions, 201 deletions
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 5c49966a..9c6c8768 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -24,6 +24,8 @@
from __future__ import print_function
+from binascii import crc32
+import json
import os
import os.path
import re
@@ -31,21 +33,58 @@ import shutil
import stat
import tempfile
import uuid
-from binascii import crc32
-import serial
+from cloudinit import serial
+from cloudinit.sources import DataSourceSmartOS
+
import six
from cloudinit import helpers as c_helpers
-from cloudinit.sources import DataSourceSmartOS
from cloudinit.util import b64e
-from .. import helpers
-
-try:
- from unittest import mock
-except ImportError:
- import mock
+from ..helpers import mock, FilesystemMockingTestCase, TestCase
+
+SDC_NICS = json.loads("""
+[
+ {
+ "nic_tag": "external",
+ "primary": true,
+ "mtu": 1500,
+ "model": "virtio",
+ "gateway": "8.12.42.1",
+ "netmask": "255.255.255.0",
+ "ip": "8.12.42.102",
+ "network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe",
+ "gateways": [
+ "8.12.42.1"
+ ],
+ "vlan_id": 324,
+ "mac": "90:b8:d0:f5:e4:f5",
+ "interface": "net0",
+ "ips": [
+ "8.12.42.102/24"
+ ]
+ },
+ {
+ "nic_tag": "sdc_overlay/16187209",
+ "gateway": "192.168.128.1",
+ "model": "virtio",
+ "mac": "90:b8:d0:a5:ff:cd",
+ "netmask": "255.255.252.0",
+ "ip": "192.168.128.93",
+ "network_uuid": "4cad71da-09bc-452b-986d-03562a03a0a9",
+ "gateways": [
+ "192.168.128.1"
+ ],
+ "vlan_id": 2,
+ "mtu": 8500,
+ "interface": "net1",
+ "ips": [
+ "192.168.128.93/22"
+ ]
+ }
+]
+""")
MOCK_RETURNS = {
'hostname': 'test-host',
@@ -60,79 +99,68 @@ MOCK_RETURNS = {
'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']),
'user-data': '\n'.join(['something', '']),
'user-script': '\n'.join(['/bin/true', '']),
+ 'sdc:nics': json.dumps(SDC_NICS),
}
DMI_DATA_RETURN = 'smartdc'
-def get_mock_client(mockdata):
- class MockMetadataClient(object):
+class PsuedoJoyentClient(object):
+ def __init__(self, data=None):
+ if data is None:
+ data = MOCK_RETURNS.copy()
+ self.data = data
+ return
+
+ def get(self, key, default=None, strip=False):
+ if key in self.data:
+ r = self.data[key]
+ if strip:
+ r = r.strip()
+ else:
+ r = default
+ return r
- def __init__(self, serial):
- pass
+ def get_json(self, key, default=None):
+ result = self.get(key, default=default)
+ if result is None:
+ return default
+ return json.loads(result)
- def get_metadata(self, metadata_key):
- return mockdata.get(metadata_key)
- return MockMetadataClient
+ def exists(self):
+ return True
-class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
+class TestSmartOSDataSource(FilesystemMockingTestCase):
def setUp(self):
super(TestSmartOSDataSource, self).setUp()
+ dsmos = 'cloudinit.sources.DataSourceSmartOS'
+ patcher = mock.patch(dsmos + ".jmc_client_factory")
+ self.jmc_cfact = patcher.start()
+ self.addCleanup(patcher.stop)
+ patcher = mock.patch(dsmos + ".get_smartos_environ")
+ self.get_smartos_environ = patcher.start()
+ self.addCleanup(patcher.stop)
+
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)
- self.legacy_user_d = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.legacy_user_d)
-
- # If you should want to watch the logs...
- self._log = None
- self._log_file = None
- self._log_handler = None
-
- # patch cloud_dir, so our 'seed_dir' is guaranteed empty
self.paths = c_helpers.Paths({'cloud_dir': self.tmp})
- self.unapply = []
- super(TestSmartOSDataSource, self).setUp()
+ self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp')
+ os.mkdir(self.legacy_user_d)
+
+ self.orig_lud = DataSourceSmartOS.LEGACY_USER_D
+ DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d
def tearDown(self):
- helpers.FilesystemMockingTestCase.tearDown(self)
- if self._log_handler and self._log:
- self._log.removeHandler(self._log_handler)
- apply_patches([i for i in reversed(self.unapply)])
+ DataSourceSmartOS.LEGACY_USER_D = self.orig_lud
super(TestSmartOSDataSource, self).tearDown()
- def _patchIn(self, root):
- self.restore()
- self.patchOS(root)
- self.patchUtils(root)
-
- def apply_patches(self, patches):
- ret = apply_patches(patches)
- self.unapply += ret
-
- def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None,
- is_lxbrand=False):
- mod = DataSourceSmartOS
-
- if mockdata is None:
- mockdata = MOCK_RETURNS
-
- if dmi_data is None:
- dmi_data = DMI_DATA_RETURN
-
- def _dmi_data():
- return dmi_data
-
- def _os_uname():
- if not is_lxbrand:
- # LP: #1243287. tests assume this runs, but running test on
- # arm would cause them all to fail.
- return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', 'x86_64')
- else:
- return ('LINUX', 'NODENAME', 'RELEASE', 'BRANDZ VIRTUAL LINUX',
- 'X86_64')
+ def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM,
+ sys_cfg=None, ds_cfg=None):
+ self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata)
+ self.get_smartos_environ.return_value = mode
if sys_cfg is None:
sys_cfg = {}
@@ -141,44 +169,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
sys_cfg['datasource'] = sys_cfg.get('datasource', {})
sys_cfg['datasource']['SmartOS'] = ds_cfg
- self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)])
- self.apply_patches([
- (mod, 'JoyentMetadataClient', get_mock_client(mockdata))])
- self.apply_patches([(mod, 'dmi_data', _dmi_data)])
- self.apply_patches([(os, 'uname', _os_uname)])
- self.apply_patches([(mod, 'device_exists', lambda d: True)])
- dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None,
- paths=self.paths)
- self.apply_patches([(dsrc, '_get_seed_file_object', mock.MagicMock())])
- return dsrc
-
- def test_seed(self):
- # default seed should be /dev/ttyS1
- dsrc = self._get_ds()
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEquals('kvm', dsrc.smartos_type)
- self.assertEquals('/dev/ttyS1', dsrc.seed)
-
- def test_seed_lxbrand(self):
- # default seed should be /dev/ttyS1
- dsrc = self._get_ds(is_lxbrand=True)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEquals('lx-brand', dsrc.smartos_type)
- self.assertEquals('/native/.zonecontrol/metadata.sock', dsrc.seed)
-
- def test_issmartdc(self):
- dsrc = self._get_ds()
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertTrue(dsrc.is_smartdc)
-
- def test_issmartdc_lxbrand(self):
- dsrc = self._get_ds(is_lxbrand=True)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertTrue(dsrc.is_smartdc)
+ return DataSourceSmartOS.DataSourceSmartOS(
+ sys_cfg, distro=None, paths=self.paths)
def test_no_base64(self):
ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
@@ -190,110 +182,65 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['sdc:uuid'],
- dsrc.metadata['instance-id'])
+ self.assertEqual(MOCK_RETURNS['sdc:uuid'],
+ dsrc.metadata['instance-id'])
def test_root_keys(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
- dsrc.metadata['public-keys'])
+ self.assertEqual(MOCK_RETURNS['root_authorized_keys'],
+ dsrc.metadata['public-keys'])
def test_hostname_b64(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
+ self.assertEqual(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
def test_hostname(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
-
- def test_base64_all(self):
- # metadata provided base64_all of true
- my_returns = MOCK_RETURNS.copy()
- my_returns['base64_all'] = "true"
- for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = b64e(my_returns[k])
-
- dsrc = self._get_ds(mockdata=my_returns)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
- self.assertEquals(MOCK_RETURNS['cloud-init:user-data'],
- dsrc.userdata_raw)
- self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
- dsrc.metadata['public-keys'])
- self.assertEquals(MOCK_RETURNS['disable_iptables_flag'],
- dsrc.metadata['iptables_disable'])
- self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'],
- dsrc.metadata['motd_sys_info'])
-
- def test_b64_userdata(self):
- my_returns = MOCK_RETURNS.copy()
- my_returns['b64-cloud-init:user-data'] = "true"
- my_returns['b64-hostname'] = "true"
- for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = b64e(my_returns[k])
+ self.assertEqual(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
- dsrc = self._get_ds(mockdata=my_returns)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
- self.assertEquals(MOCK_RETURNS['cloud-init:user-data'],
- dsrc.userdata_raw)
- self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
- dsrc.metadata['public-keys'])
-
- def test_b64_keys(self):
- my_returns = MOCK_RETURNS.copy()
- my_returns['base64_keys'] = 'hostname,ignored'
- for k in ('hostname',):
- my_returns[k] = b64e(my_returns[k])
-
- dsrc = self._get_ds(mockdata=my_returns)
+ def test_userdata(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
- self.assertEquals(MOCK_RETURNS['cloud-init:user-data'],
- dsrc.userdata_raw)
+ self.assertEqual(MOCK_RETURNS['user-data'],
+ dsrc.metadata['legacy-user-data'])
+ self.assertEqual(MOCK_RETURNS['cloud-init:user-data'],
+ dsrc.userdata_raw)
- def test_userdata(self):
+ def test_sdc_nics(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['user-data'],
- dsrc.metadata['legacy-user-data'])
- self.assertEquals(MOCK_RETURNS['cloud-init:user-data'],
- dsrc.userdata_raw)
+ self.assertEqual(json.loads(MOCK_RETURNS['sdc:nics']),
+ dsrc.metadata['network-data'])
def test_sdc_scripts(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['user-script'],
- dsrc.metadata['user-script'])
+ self.assertEqual(MOCK_RETURNS['user-script'],
+ dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
self.assertTrue(os.path.exists(legacy_script_f))
self.assertTrue(os.path.islink(legacy_script_f))
user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
- self.assertEquals(user_script_perm, '700')
+ self.assertEqual(user_script_perm, '700')
def test_scripts_shebanged(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['user-script'],
- dsrc.metadata['user-script'])
+ self.assertEqual(MOCK_RETURNS['user-script'],
+ dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
self.assertTrue(os.path.exists(legacy_script_f))
@@ -301,9 +248,9 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
shebang = None
with open(legacy_script_f, 'r') as f:
shebang = f.readlines()[0].strip()
- self.assertEquals(shebang, "#!/bin/bash")
+ self.assertEqual(shebang, "#!/bin/bash")
user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
- self.assertEquals(user_script_perm, '700')
+ self.assertEqual(user_script_perm, '700')
def test_scripts_shebang_not_added(self):
"""
@@ -319,8 +266,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(my_returns['user-script'],
- dsrc.metadata['user-script'])
+ self.assertEqual(my_returns['user-script'],
+ dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
self.assertTrue(os.path.exists(legacy_script_f))
@@ -328,7 +275,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
shebang = None
with open(legacy_script_f, 'r') as f:
shebang = f.readlines()[0].strip()
- self.assertEquals(shebang, "#!/usr/bin/perl")
+ self.assertEqual(shebang, "#!/usr/bin/perl")
def test_userdata_removed(self):
"""
@@ -358,7 +305,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
if re.match(r'.*\/mdata-user-data$', name_f):
found_new = True
print(name_f)
- self.assertEquals(permissions, '400')
+ self.assertEqual(permissions, '400')
self.assertFalse(found_new)
@@ -366,8 +313,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['sdc:vendor-data'],
- dsrc.metadata['vendor-data'])
+ self.assertEqual(MOCK_RETURNS['sdc:vendor-data'],
+ dsrc.metadata['vendor-data'])
def test_default_vendor_data(self):
my_returns = MOCK_RETURNS.copy()
@@ -376,7 +323,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertNotEquals(def_op_script, dsrc.metadata['vendor-data'])
+ self.assertNotEqual(def_op_script, dsrc.metadata['vendor-data'])
# we expect default vendor-data is a boothook
self.assertTrue(dsrc.vendordata_raw.startswith("#cloud-boothook"))
@@ -385,15 +332,15 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['disable_iptables_flag'],
- dsrc.metadata['iptables_disable'])
+ self.assertEqual(MOCK_RETURNS['disable_iptables_flag'],
+ dsrc.metadata['iptables_disable'])
def test_motd_sys_info(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'],
- dsrc.metadata['motd_sys_info'])
+ self.assertEqual(MOCK_RETURNS['enable_motd_sys_info'],
+ dsrc.metadata['motd_sys_info'])
def test_default_ephemeral(self):
# Test to make sure that the builtin config has the ephemeral
@@ -430,21 +377,11 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
mydscfg['disk_aliases']['FOO'])
-def apply_patches(patches):
- ret = []
- for (ref, name, replace) in patches:
- if replace is None:
- continue
- orig = getattr(ref, name)
- setattr(ref, name, replace)
- ret.append((ref, name, orig))
- return ret
-
-
-class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
+class TestJoyentMetadataClient(FilesystemMockingTestCase):
def setUp(self):
super(TestJoyentMetadataClient, self).setUp()
+
self.serial = mock.MagicMock(spec=serial.Serial)
self.request_id = 0xabcdef12
self.metadata_value = 'value'
@@ -481,7 +418,8 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
mock.Mock(return_value=self.request_id)))
def _get_client(self):
- return DataSourceSmartOS.JoyentMetadataClient(self.serial)
+ return DataSourceSmartOS.JoyentMetadataClient(
+ fp=self.serial, smartos_type=DataSourceSmartOS.SMARTOS_ENV_KVM)
def assertEndsWith(self, haystack, prefix):
self.assertTrue(haystack.endswith(prefix),
@@ -495,7 +433,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
def test_get_metadata_writes_a_single_line(self):
client = self._get_client()
- client.get_metadata('some_key')
+ client.get('some_key')
self.assertEqual(1, self.serial.write.call_count)
written_line = self.serial.write.call_args[0][0]
print(type(written_line))
@@ -505,7 +443,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
def _get_written_line(self, key='some_key'):
client = self._get_client()
- client.get_metadata(key)
+ client.get(key)
return self.serial.write.call_args[0][0]
def test_get_metadata_writes_bytes(self):
@@ -549,32 +487,32 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
def test_get_metadata_reads_a_line(self):
client = self._get_client()
- client.get_metadata('some_key')
+ client.get('some_key')
self.assertEqual(self.metasource_data_len, self.serial.read.call_count)
def test_get_metadata_returns_valid_value(self):
client = self._get_client()
- value = client.get_metadata('some_key')
+ value = client.get('some_key')
self.assertEqual(self.metadata_value, value)
def test_get_metadata_throws_exception_for_incorrect_length(self):
self.response_parts['length'] = 0
client = self._get_client()
self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get_metadata, 'some_key')
+ client.get, 'some_key')
def test_get_metadata_throws_exception_for_incorrect_crc(self):
self.response_parts['crc'] = 'deadbeef'
client = self._get_client()
self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get_metadata, 'some_key')
+ client.get, 'some_key')
def test_get_metadata_throws_exception_for_request_id_mismatch(self):
self.response_parts['request_id'] = 'deadbeef'
client = self._get_client()
client._checksum = lambda _: self.response_parts['crc']
self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get_metadata, 'some_key')
+ client.get, 'some_key')
def test_get_metadata_returns_None_if_value_not_found(self):
self.response_parts['payload'] = ''
@@ -582,4 +520,24 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
self.response_parts['length'] = 17
client = self._get_client()
client._checksum = lambda _: self.response_parts['crc']
- self.assertIsNone(client.get_metadata('some_key'))
+ self.assertIsNone(client.get('some_key'))
+
+
+class TestNetworkConversion(TestCase):
+
+ def test_convert_simple(self):
+ expected = {
+ 'version': 1,
+ 'config': [
+ {'name': 'net0', 'type': 'physical',
+ 'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
+ 'netmask': '255.255.255.0',
+ 'address': '8.12.42.102/24'}],
+ 'mtu': 1500, 'mac_address': '90:b8:d0:f5:e4:f5'},
+ {'name': 'net1', 'type': 'physical',
+ 'subnets': [{'type': 'static', 'gateway': '192.168.128.1',
+ 'netmask': '255.255.252.0',
+ 'address': '192.168.128.93/22'}],
+ 'mtu': 8500, 'mac_address': '90:b8:d0:a5:ff:cd'}]}
+ found = DataSourceSmartOS.convert_smartos_network_data(SDC_NICS)
+ self.assertEqual(expected, found)