diff options
author | Joshua Harlow <harlowja@gmail.com> | 2016-06-06 18:42:29 -0700 |
---|---|---|
committer | Joshua Harlow <harlowja@gmail.com> | 2016-06-06 18:42:29 -0700 |
commit | f640797e342b6efbfb838a6350b312935222e992 (patch) | |
tree | 4e893298101cf3141d80b4bf2a2d6e009462502d /tests/unittests/test_datasource/test_smartos.py | |
parent | 85a53d66ad0241b2d6453d902487bb2edc1512b8 (diff) | |
parent | bc9bd58d1533d996029770da758f73217c15af33 (diff) | |
download | vyos-cloud-init-f640797e342b6efbfb838a6350b312935222e992.tar.gz vyos-cloud-init-f640797e342b6efbfb838a6350b312935222e992.zip |
Rebase against master
Diffstat (limited to 'tests/unittests/test_datasource/test_smartos.py')
-rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 290 |
1 files changed, 124 insertions, 166 deletions
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index 28f56039..ddbb8fbd 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -25,6 +25,7 @@ from __future__ import print_function from binascii import crc32 +import json import os import os.path import re @@ -41,8 +42,49 @@ import six from cloudinit import helpers as c_helpers from cloudinit.util import b64e -from .. import helpers -from ..helpers import mock +from ..helpers import mock, FilesystemMockingTestCase, TestCase + +SDC_NICS = json.loads(""" +[ + { + "nic_tag": "external", + "primary": true, + "mtu": 1500, + "model": "virtio", + "gateway": "8.12.42.1", + "netmask": "255.255.255.0", + "ip": "8.12.42.102", + "network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe", + "gateways": [ + "8.12.42.1" + ], + "vlan_id": 324, + "mac": "90:b8:d0:f5:e4:f5", + "interface": "net0", + "ips": [ + "8.12.42.102/24" + ] + }, + { + "nic_tag": "sdc_overlay/16187209", + "gateway": "192.168.128.1", + "model": "virtio", + "mac": "90:b8:d0:a5:ff:cd", + "netmask": "255.255.252.0", + "ip": "192.168.128.93", + "network_uuid": "4cad71da-09bc-452b-986d-03562a03a0a9", + "gateways": [ + "192.168.128.1" + ], + "vlan_id": 2, + "mtu": 8500, + "interface": "net1", + "ips": [ + "192.168.128.93/22" + ] + } +] +""") MOCK_RETURNS = { 'hostname': 'test-host', @@ -57,79 +99,66 @@ MOCK_RETURNS = { 'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']), 'user-data': '\n'.join(['something', '']), 'user-script': '\n'.join(['/bin/true', '']), + 'sdc:nics': json.dumps(SDC_NICS), } DMI_DATA_RETURN = 'smartdc' -def get_mock_client(mockdata): - class MockMetadataClient(object): +class PsuedoJoyentClient(object): + def __init__(self, data=None): + if data is None: + data = MOCK_RETURNS.copy() + self.data = data + return - def __init__(self, serial): - pass + def get(self, key, default=None, strip=False): + if key in self.data: + r = self.data[key] + if strip: + r = r.strip() + else: + r = default + return r - def get_metadata(self, metadata_key): - return mockdata.get(metadata_key) - return MockMetadataClient + def get_json(self, key, default=None): + result = self.get(key, default=default) + if result is None: + return default + return json.loads(result) + def exists(self): + return True -class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): + +class TestSmartOSDataSource(FilesystemMockingTestCase): def setUp(self): super(TestSmartOSDataSource, self).setUp() + dsmos = 'cloudinit.sources.DataSourceSmartOS' + patcher = mock.patch(dsmos + ".jmc_client_factory") + self.jmc_cfact = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch(dsmos + ".get_smartos_environ") + self.get_smartos_environ = patcher.start() + self.addCleanup(patcher.stop) + self.tmp = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.tmp) - self.legacy_user_d = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.legacy_user_d) - - # If you should want to watch the logs... - self._log = None - self._log_file = None - self._log_handler = None - - # patch cloud_dir, so our 'seed_dir' is guaranteed empty self.paths = c_helpers.Paths({'cloud_dir': self.tmp}) - self.unapply = [] - super(TestSmartOSDataSource, self).setUp() + self.legacy_user_d = tempfile.mkdtemp() + self.orig_lud = DataSourceSmartOS.LEGACY_USER_D + DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d def tearDown(self): - helpers.FilesystemMockingTestCase.tearDown(self) - if self._log_handler and self._log: - self._log.removeHandler(self._log_handler) - apply_patches([i for i in reversed(self.unapply)]) + DataSourceSmartOS.LEGACY_USER_D = self.orig_lud super(TestSmartOSDataSource, self).tearDown() - def _patchIn(self, root): - self.restore() - self.patchOS(root) - self.patchUtils(root) - - def apply_patches(self, patches): - ret = apply_patches(patches) - self.unapply += ret - - def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None, - is_lxbrand=False): - mod = DataSourceSmartOS - - if mockdata is None: - mockdata = MOCK_RETURNS - - if dmi_data is None: - dmi_data = DMI_DATA_RETURN - - def _dmi_data(): - return dmi_data - - def _os_uname(): - if not is_lxbrand: - # LP: #1243287. tests assume this runs, but running test on - # arm would cause them all to fail. - return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', 'x86_64') - else: - return ('LINUX', 'NODENAME', 'RELEASE', 'BRANDZ VIRTUAL LINUX', - 'X86_64') + def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM, + sys_cfg=None, ds_cfg=None): + self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata) + self.get_smartos_environ.return_value = mode if sys_cfg is None: sys_cfg = {} @@ -138,44 +167,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): sys_cfg['datasource'] = sys_cfg.get('datasource', {}) sys_cfg['datasource']['SmartOS'] = ds_cfg - self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)]) - self.apply_patches([ - (mod, 'JoyentMetadataClient', get_mock_client(mockdata))]) - self.apply_patches([(mod, 'dmi_data', _dmi_data)]) - self.apply_patches([(os, 'uname', _os_uname)]) - self.apply_patches([(mod, 'device_exists', lambda d: True)]) - dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None, - paths=self.paths) - self.apply_patches([(dsrc, '_get_seed_file_object', mock.MagicMock())]) - return dsrc - - def test_seed(self): - # default seed should be /dev/ttyS1 - dsrc = self._get_ds() - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual('kvm', dsrc.smartos_type) - self.assertEqual('/dev/ttyS1', dsrc.seed) - - def test_seed_lxbrand(self): - # default seed should be /dev/ttyS1 - dsrc = self._get_ds(is_lxbrand=True) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual('lx-brand', dsrc.smartos_type) - self.assertEqual('/native/.zonecontrol/metadata.sock', dsrc.seed) - - def test_issmartdc(self): - dsrc = self._get_ds() - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertTrue(dsrc.is_smartdc) - - def test_issmartdc_lxbrand(self): - dsrc = self._get_ds(is_lxbrand=True) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertTrue(dsrc.is_smartdc) + return DataSourceSmartOS.DataSourceSmartOS( + sys_cfg, distro=None, paths=self.paths) def test_no_base64(self): ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True} @@ -211,58 +204,6 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): self.assertEqual(MOCK_RETURNS['hostname'], dsrc.metadata['local-hostname']) - def test_base64_all(self): - # metadata provided base64_all of true - my_returns = MOCK_RETURNS.copy() - my_returns['base64_all'] = "true" - for k in ('hostname', 'cloud-init:user-data'): - my_returns[k] = b64e(my_returns[k]) - - dsrc = self._get_ds(mockdata=my_returns) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) - self.assertEqual(MOCK_RETURNS['cloud-init:user-data'], - dsrc.userdata_raw) - self.assertEqual(MOCK_RETURNS['root_authorized_keys'], - dsrc.metadata['public-keys']) - self.assertEqual(MOCK_RETURNS['disable_iptables_flag'], - dsrc.metadata['iptables_disable']) - self.assertEqual(MOCK_RETURNS['enable_motd_sys_info'], - dsrc.metadata['motd_sys_info']) - - def test_b64_userdata(self): - my_returns = MOCK_RETURNS.copy() - my_returns['b64-cloud-init:user-data'] = "true" - my_returns['b64-hostname'] = "true" - for k in ('hostname', 'cloud-init:user-data'): - my_returns[k] = b64e(my_returns[k]) - - dsrc = self._get_ds(mockdata=my_returns) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) - self.assertEqual(MOCK_RETURNS['cloud-init:user-data'], - dsrc.userdata_raw) - self.assertEqual(MOCK_RETURNS['root_authorized_keys'], - dsrc.metadata['public-keys']) - - def test_b64_keys(self): - my_returns = MOCK_RETURNS.copy() - my_returns['base64_keys'] = 'hostname,ignored' - for k in ('hostname',): - my_returns[k] = b64e(my_returns[k]) - - dsrc = self._get_ds(mockdata=my_returns) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) - self.assertEqual(MOCK_RETURNS['cloud-init:user-data'], - dsrc.userdata_raw) - def test_userdata(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() @@ -272,6 +213,13 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): self.assertEqual(MOCK_RETURNS['cloud-init:user-data'], dsrc.userdata_raw) + def test_sdc_nics(self): + dsrc = self._get_ds(mockdata=MOCK_RETURNS) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(json.loads(MOCK_RETURNS['sdc:nics']), + dsrc.metadata['network-data']) + def test_sdc_scripts(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() @@ -427,18 +375,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): mydscfg['disk_aliases']['FOO']) -def apply_patches(patches): - ret = [] - for (ref, name, replace) in patches: - if replace is None: - continue - orig = getattr(ref, name) - setattr(ref, name, replace) - ret.append((ref, name, orig)) - return ret - - -class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): +class TestJoyentMetadataClient(FilesystemMockingTestCase): def setUp(self): super(TestJoyentMetadataClient, self).setUp() @@ -479,7 +416,8 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): mock.Mock(return_value=self.request_id))) def _get_client(self): - return DataSourceSmartOS.JoyentMetadataClient(self.serial) + return DataSourceSmartOS.JoyentMetadataClient( + fp=self.serial, smartos_type=DataSourceSmartOS.SMARTOS_ENV_KVM) def assertEndsWith(self, haystack, prefix): self.assertTrue(haystack.endswith(prefix), @@ -493,7 +431,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): def test_get_metadata_writes_a_single_line(self): client = self._get_client() - client.get_metadata('some_key') + client.get('some_key') self.assertEqual(1, self.serial.write.call_count) written_line = self.serial.write.call_args[0][0] print(type(written_line)) @@ -503,7 +441,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): def _get_written_line(self, key='some_key'): client = self._get_client() - client.get_metadata(key) + client.get(key) return self.serial.write.call_args[0][0] def test_get_metadata_writes_bytes(self): @@ -547,32 +485,32 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): def test_get_metadata_reads_a_line(self): client = self._get_client() - client.get_metadata('some_key') + client.get('some_key') self.assertEqual(self.metasource_data_len, self.serial.read.call_count) def test_get_metadata_returns_valid_value(self): client = self._get_client() - value = client.get_metadata('some_key') + value = client.get('some_key') self.assertEqual(self.metadata_value, value) def test_get_metadata_throws_exception_for_incorrect_length(self): self.response_parts['length'] = 0 client = self._get_client() self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get_metadata, 'some_key') + client.get, 'some_key') def test_get_metadata_throws_exception_for_incorrect_crc(self): self.response_parts['crc'] = 'deadbeef' client = self._get_client() self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get_metadata, 'some_key') + client.get, 'some_key') def test_get_metadata_throws_exception_for_request_id_mismatch(self): self.response_parts['request_id'] = 'deadbeef' client = self._get_client() client._checksum = lambda _: self.response_parts['crc'] self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get_metadata, 'some_key') + client.get, 'some_key') def test_get_metadata_returns_None_if_value_not_found(self): self.response_parts['payload'] = '' @@ -580,4 +518,24 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): self.response_parts['length'] = 17 client = self._get_client() client._checksum = lambda _: self.response_parts['crc'] - self.assertIsNone(client.get_metadata('some_key')) + self.assertIsNone(client.get('some_key')) + + +class TestNetworkConversion(TestCase): + + def test_convert_simple(self): + expected = { + 'version': 1, + 'config': [ + {'name': 'net0', 'type': 'physical', + 'subnets': [{'type': 'static', 'gateway': '8.12.42.1', + 'netmask': '255.255.255.0', + 'address': '8.12.42.102/24'}], + 'mtu': 1500, 'mac_address': '90:b8:d0:f5:e4:f5'}, + {'name': 'net1', 'type': 'physical', + 'subnets': [{'type': 'static', 'gateway': '192.168.128.1', + 'netmask': '255.255.252.0', + 'address': '192.168.128.93/22'}], + 'mtu': 8500, 'mac_address': '90:b8:d0:a5:ff:cd'}]} + found = DataSourceSmartOS.convert_smartos_network_data(SDC_NICS) + self.assertEqual(expected, found) |