diff options
Diffstat (limited to 'tests/unittests/sources/test_smartos.py')
-rw-r--r-- | tests/unittests/sources/test_smartos.py | 956 |
1 files changed, 607 insertions, 349 deletions
diff --git a/tests/unittests/sources/test_smartos.py b/tests/unittests/sources/test_smartos.py index e306eded..55239c4e 100644 --- a/tests/unittests/sources/test_smartos.py +++ b/tests/unittests/sources/test_smartos.py @@ -5,14 +5,13 @@ # # This file is part of cloud-init. See LICENSE file for license information. -'''This is a testcase for the SmartOS datasource. +"""This is a testcase for the SmartOS datasource. It replicates a serial console and acts like the SmartOS console does in order to validate return responses. -''' +""" -from binascii import crc32 import json import multiprocessing import os @@ -22,32 +21,40 @@ import signal import stat import unittest import uuid +from binascii import crc32 +from cloudinit import helpers as c_helpers from cloudinit import serial +from cloudinit.event import EventScope, EventType from cloudinit.sources import DataSourceSmartOS +from cloudinit.sources.DataSourceSmartOS import SERIAL_DEVICE, SMARTOS_ENV_KVM from cloudinit.sources.DataSourceSmartOS import ( convert_smartos_network_data as convert_net, - SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ, - identify_file) -from cloudinit.event import EventScope, EventType - -from cloudinit import helpers as c_helpers -from cloudinit.util import (b64e, write_file) -from cloudinit.subp import (subp, ProcessExecutionError, which) - +) +from cloudinit.sources.DataSourceSmartOS import ( + get_smartos_environ, + identify_file, +) +from cloudinit.subp import ProcessExecutionError, subp, which +from cloudinit.util import b64e, write_file from tests.unittests.helpers import ( - CiTestCase, mock, FilesystemMockingTestCase, skipIf) - + CiTestCase, + FilesystemMockingTestCase, + mock, + skipIf, +) try: import serial as _pyserial + assert _pyserial # avoid pyflakes error F401: import unused HAS_PYSERIAL = True except ImportError: HAS_PYSERIAL = False -DSMOS = 'cloudinit.sources.DataSourceSmartOS' -SDC_NICS = json.loads(""" +DSMOS = "cloudinit.sources.DataSourceSmartOS" +SDC_NICS = json.loads( + """ [ { "nic_tag": "external", @@ -87,10 +94,12 @@ SDC_NICS = json.loads(""" ] } ] -""") +""" +) -SDC_NICS_ALT = json.loads(""" +SDC_NICS_ALT = json.loads( + """ [ { "interface": "net0", @@ -126,9 +135,11 @@ SDC_NICS_ALT = json.loads(""" "mtu": 1500 } ] -""") +""" +) -SDC_NICS_DHCP = json.loads(""" +SDC_NICS_DHCP = json.loads( + """ [ { "interface": "net0", @@ -164,9 +175,11 @@ SDC_NICS_DHCP = json.loads(""" "mtu": 1500 } ] -""") +""" +) -SDC_NICS_MIP = json.loads(""" +SDC_NICS_MIP = json.loads( + """ [ { "interface": "net0", @@ -204,9 +217,11 @@ SDC_NICS_MIP = json.loads(""" "mtu": 1500 } ] -""") +""" +) -SDC_NICS_MIP_IPV6 = json.loads(""" +SDC_NICS_MIP_IPV6 = json.loads( + """ [ { "interface": "net0", @@ -243,9 +258,11 @@ SDC_NICS_MIP_IPV6 = json.loads(""" "mtu": 1500 } ] -""") +""" +) -SDC_NICS_IPV4_IPV6 = json.loads(""" +SDC_NICS_IPV4_IPV6 = json.loads( + """ [ { "interface": "net0", @@ -277,9 +294,11 @@ SDC_NICS_IPV4_IPV6 = json.loads(""" "mtu": 1500 } ] -""") +""" +) -SDC_NICS_SINGLE_GATEWAY = json.loads(""" +SDC_NICS_SINGLE_GATEWAY = json.loads( + """ [ { "interface":"net0", @@ -309,32 +328,33 @@ SDC_NICS_SINGLE_GATEWAY = json.loads(""" "mtu":1500 } ] -""") +""" +) MOCK_RETURNS = { - 'hostname': 'test-host', - 'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname', - 'disable_iptables_flag': None, - 'enable_motd_sys_info': None, - 'test-var1': 'some data', - 'cloud-init:user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']), - 'sdc:datacenter_name': 'somewhere2', - 'sdc:operator-script': '\n'.join(['bin/true', '']), - 'sdc:uuid': str(uuid.uuid4()), - 'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']), - 'user-data': '\n'.join(['something', '']), - 'user-script': '\n'.join(['/bin/true', '']), - 'sdc:nics': json.dumps(SDC_NICS), + "hostname": "test-host", + "root_authorized_keys": "ssh-rsa AAAAB3Nz...aC1yc2E= keyname", + "disable_iptables_flag": None, + "enable_motd_sys_info": None, + "test-var1": "some data", + "cloud-init:user-data": "\n".join(["#!/bin/sh", "/bin/true", ""]), + "sdc:datacenter_name": "somewhere2", + "sdc:operator-script": "\n".join(["bin/true", ""]), + "sdc:uuid": str(uuid.uuid4()), + "sdc:vendor-data": "\n".join(["VENDOR_DATA", ""]), + "user-data": "\n".join(["something", ""]), + "user-script": "\n".join(["/bin/true", ""]), + "sdc:nics": json.dumps(SDC_NICS), } -DMI_DATA_RETURN = 'smartdc' +DMI_DATA_RETURN = "smartdc" # Useful for calculating the length of a frame body. A SUCCESS body will be # followed by more characters or be one character less if SUCCESS with no # payload. See Section 4.3 of https://eng.joyent.com/mdata/protocol.html. -SUCCESS_LEN = len('0123abcd SUCCESS ') -NOTFOUND_LEN = len('0123abcd NOTFOUND') +SUCCESS_LEN = len("0123abcd SUCCESS ") +NOTFOUND_LEN = len("0123abcd NOTFOUND") class PsuedoJoyentClient(object): @@ -364,11 +384,11 @@ class PsuedoJoyentClient(object): return True def open_transport(self): - assert(not self._is_open) + assert not self._is_open self._is_open = True def close_transport(self): - assert(self._is_open) + assert self._is_open self._is_open = False @@ -381,21 +401,35 @@ class TestSmartOSDataSource(FilesystemMockingTestCase): self.add_patch(DSMOS + ".get_smartos_environ", "get_smartos_environ") self.add_patch(DSMOS + ".jmc_client_factory", "jmc_cfact") - self.legacy_user_d = self.tmp_path('legacy_user_tmp') + self.legacy_user_d = self.tmp_path("legacy_user_tmp") os.mkdir(self.legacy_user_d) - self.add_patch(DSMOS + ".LEGACY_USER_D", "m_legacy_user_d", - autospec=False, new=self.legacy_user_d) - self.add_patch(DSMOS + ".identify_file", "m_identify_file", - return_value="text/plain") + self.add_patch( + DSMOS + ".LEGACY_USER_D", + "m_legacy_user_d", + autospec=False, + new=self.legacy_user_d, + ) + self.add_patch( + DSMOS + ".identify_file", + "m_identify_file", + return_value="text/plain", + ) - def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM, - sys_cfg=None, ds_cfg=None): + def _get_ds( + self, + mockdata=None, + mode=DataSourceSmartOS.SMARTOS_ENV_KVM, + sys_cfg=None, + ds_cfg=None, + ): self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata) self.get_smartos_environ.return_value = mode tmpd = self.tmp_dir() - dirs = {'cloud_dir': self.tmp_path('cloud_dir', tmpd), - 'run_dir': self.tmp_path('run_dir')} + dirs = { + "cloud_dir": self.tmp_path("cloud_dir", tmpd), + "run_dir": self.tmp_path("run_dir"), + } for d in dirs.values(): os.mkdir(d) paths = c_helpers.Paths(dirs) @@ -404,14 +438,15 @@ class TestSmartOSDataSource(FilesystemMockingTestCase): sys_cfg = {} if ds_cfg is not None: - sys_cfg['datasource'] = sys_cfg.get('datasource', {}) - sys_cfg['datasource']['SmartOS'] = ds_cfg + sys_cfg["datasource"] = sys_cfg.get("datasource", {}) + sys_cfg["datasource"]["SmartOS"] = ds_cfg return DataSourceSmartOS.DataSourceSmartOS( - sys_cfg, distro=None, paths=paths) + sys_cfg, distro=None, paths=paths + ) def test_no_base64(self): - ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True} + ds_cfg = {"no_base64_decode": ["test_var1"], "all_base": True} dsrc = self._get_ds(ds_cfg=ds_cfg) ret = dsrc.get_data() self.assertTrue(ret) @@ -420,166 +455,180 @@ class TestSmartOSDataSource(FilesystemMockingTestCase): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['sdc:uuid'], - dsrc.metadata['instance-id']) + self.assertEqual( + MOCK_RETURNS["sdc:uuid"], dsrc.metadata["instance-id"] + ) def test_platform_info(self): """All platform-related attributes are properly set.""" dsrc = self._get_ds(mockdata=MOCK_RETURNS) - self.assertEqual('joyent', dsrc.cloud_name) - self.assertEqual('joyent', dsrc.platform_type) - self.assertEqual('serial (/dev/ttyS1)', dsrc.subplatform) + self.assertEqual("joyent", dsrc.cloud_name) + self.assertEqual("joyent", dsrc.platform_type) + self.assertEqual("serial (/dev/ttyS1)", dsrc.subplatform) def test_root_keys(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['root_authorized_keys'], - dsrc.metadata['public-keys']) + self.assertEqual( + MOCK_RETURNS["root_authorized_keys"], dsrc.metadata["public-keys"] + ) def test_hostname_b64(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) + self.assertEqual( + MOCK_RETURNS["hostname"], dsrc.metadata["local-hostname"] + ) def test_hostname(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) + self.assertEqual( + MOCK_RETURNS["hostname"], dsrc.metadata["local-hostname"] + ) def test_hostname_if_no_sdc_hostname(self): my_returns = MOCK_RETURNS.copy() - my_returns['sdc:hostname'] = 'sdc-' + my_returns['hostname'] + my_returns["sdc:hostname"] = "sdc-" + my_returns["hostname"] dsrc = self._get_ds(mockdata=my_returns) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(my_returns['hostname'], - dsrc.metadata['local-hostname']) + self.assertEqual( + my_returns["hostname"], dsrc.metadata["local-hostname"] + ) def test_sdc_hostname_if_no_hostname(self): my_returns = MOCK_RETURNS.copy() - my_returns['sdc:hostname'] = 'sdc-' + my_returns['hostname'] - del my_returns['hostname'] + my_returns["sdc:hostname"] = "sdc-" + my_returns["hostname"] + del my_returns["hostname"] dsrc = self._get_ds(mockdata=my_returns) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(my_returns['sdc:hostname'], - dsrc.metadata['local-hostname']) + self.assertEqual( + my_returns["sdc:hostname"], dsrc.metadata["local-hostname"] + ) def test_sdc_uuid_if_no_hostname_or_sdc_hostname(self): my_returns = MOCK_RETURNS.copy() - del my_returns['hostname'] + del my_returns["hostname"] dsrc = self._get_ds(mockdata=my_returns) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(my_returns['sdc:uuid'], - dsrc.metadata['local-hostname']) + self.assertEqual( + my_returns["sdc:uuid"], dsrc.metadata["local-hostname"] + ) def test_userdata(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['user-data'], - dsrc.metadata['legacy-user-data']) - self.assertEqual(MOCK_RETURNS['cloud-init:user-data'], - dsrc.userdata_raw) + self.assertEqual( + MOCK_RETURNS["user-data"], dsrc.metadata["legacy-user-data"] + ) + self.assertEqual( + MOCK_RETURNS["cloud-init:user-data"], dsrc.userdata_raw + ) def test_sdc_nics(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(json.loads(MOCK_RETURNS['sdc:nics']), - dsrc.metadata['network-data']) + self.assertEqual( + json.loads(MOCK_RETURNS["sdc:nics"]), dsrc.metadata["network-data"] + ) def test_sdc_scripts(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['user-script'], - dsrc.metadata['user-script']) + self.assertEqual( + MOCK_RETURNS["user-script"], dsrc.metadata["user-script"] + ) legacy_script_f = "%s/user-script" % self.legacy_user_d print("legacy_script_f=%s" % legacy_script_f) self.assertTrue(os.path.exists(legacy_script_f)) self.assertTrue(os.path.islink(legacy_script_f)) user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] - self.assertEqual(user_script_perm, '700') + self.assertEqual(user_script_perm, "700") def test_scripts_shebanged(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['user-script'], - dsrc.metadata['user-script']) + self.assertEqual( + MOCK_RETURNS["user-script"], dsrc.metadata["user-script"] + ) legacy_script_f = "%s/user-script" % self.legacy_user_d self.assertTrue(os.path.exists(legacy_script_f)) self.assertTrue(os.path.islink(legacy_script_f)) shebang = None - with open(legacy_script_f, 'r') as f: + with open(legacy_script_f, "r") as f: shebang = f.readlines()[0].strip() self.assertEqual(shebang, "#!/bin/bash") user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] - self.assertEqual(user_script_perm, '700') + self.assertEqual(user_script_perm, "700") def test_scripts_shebang_not_added(self): """ - Test that the SmartOS requirement that plain text scripts - are executable. This test makes sure that plain texts scripts - with out file magic have it added appropriately by cloud-init. + Test that the SmartOS requirement that plain text scripts + are executable. This test makes sure that plain texts scripts + with out file magic have it added appropriately by cloud-init. """ my_returns = MOCK_RETURNS.copy() - my_returns['user-script'] = '\n'.join(['#!/usr/bin/perl', - 'print("hi")', '']) + my_returns["user-script"] = "\n".join( + ["#!/usr/bin/perl", 'print("hi")', ""] + ) dsrc = self._get_ds(mockdata=my_returns) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(my_returns['user-script'], - dsrc.metadata['user-script']) + self.assertEqual( + my_returns["user-script"], dsrc.metadata["user-script"] + ) legacy_script_f = "%s/user-script" % self.legacy_user_d self.assertTrue(os.path.exists(legacy_script_f)) self.assertTrue(os.path.islink(legacy_script_f)) shebang = None - with open(legacy_script_f, 'r') as f: + with open(legacy_script_f, "r") as f: shebang = f.readlines()[0].strip() self.assertEqual(shebang, "#!/usr/bin/perl") def test_userdata_removed(self): """ - User-data in the SmartOS world is supposed to be written to a file - each and every boot. This tests to make sure that in the event the - legacy user-data is removed, the existing user-data is backed-up - and there is no /var/db/user-data left. + User-data in the SmartOS world is supposed to be written to a file + each and every boot. This tests to make sure that in the event the + legacy user-data is removed, the existing user-data is backed-up + and there is no /var/db/user-data left. """ user_data_f = "%s/mdata-user-data" % self.legacy_user_d - with open(user_data_f, 'w') as f: + with open(user_data_f, "w") as f: f.write("PREVIOUS") my_returns = MOCK_RETURNS.copy() - del my_returns['user-data'] + del my_returns["user-data"] dsrc = self._get_ds(mockdata=my_returns) ret = dsrc.get_data() self.assertTrue(ret) - self.assertFalse(dsrc.metadata.get('legacy-user-data')) + self.assertFalse(dsrc.metadata.get("legacy-user-data")) found_new = False for root, _dirs, files in os.walk(self.legacy_user_d): for name in files: name_f = os.path.join(root, name) permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:] - if re.match(r'.*\/mdata-user-data$', name_f): + if re.match(r".*\/mdata-user-data$", name_f): found_new = True print(name_f) - self.assertEqual(permissions, '400') + self.assertEqual(permissions, "400") self.assertFalse(found_new) @@ -587,17 +636,18 @@ class TestSmartOSDataSource(FilesystemMockingTestCase): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['sdc:vendor-data'], - dsrc.metadata['vendor-data']) + self.assertEqual( + MOCK_RETURNS["sdc:vendor-data"], dsrc.metadata["vendor-data"] + ) def test_default_vendor_data(self): my_returns = MOCK_RETURNS.copy() - def_op_script = my_returns['sdc:vendor-data'] - del my_returns['sdc:vendor-data'] + def_op_script = my_returns["sdc:vendor-data"] + del my_returns["sdc:vendor-data"] dsrc = self._get_ds(mockdata=my_returns) ret = dsrc.get_data() self.assertTrue(ret) - self.assertNotEqual(def_op_script, dsrc.metadata['vendor-data']) + self.assertNotEqual(def_op_script, dsrc.metadata["vendor-data"]) # we expect default vendor-data is a boothook self.assertTrue(dsrc.vendordata_raw.startswith("#cloud-boothook")) @@ -606,15 +656,19 @@ class TestSmartOSDataSource(FilesystemMockingTestCase): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['disable_iptables_flag'], - dsrc.metadata['iptables_disable']) + self.assertEqual( + MOCK_RETURNS["disable_iptables_flag"], + dsrc.metadata["iptables_disable"], + ) def test_motd_sys_info(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(MOCK_RETURNS['enable_motd_sys_info'], - dsrc.metadata['motd_sys_info']) + self.assertEqual( + MOCK_RETURNS["enable_motd_sys_info"], + dsrc.metadata["motd_sys_info"], + ) def test_default_ephemeral(self): # Test to make sure that the builtin config has the ephemeral @@ -625,16 +679,16 @@ class TestSmartOSDataSource(FilesystemMockingTestCase): ret = dsrc.get_data() self.assertTrue(ret) - assert 'disk_setup' in cfg - assert 'fs_setup' in cfg - self.assertIsInstance(cfg['disk_setup'], dict) - self.assertIsInstance(cfg['fs_setup'], list) + assert "disk_setup" in cfg + assert "fs_setup" in cfg + self.assertIsInstance(cfg["disk_setup"], dict) + self.assertIsInstance(cfg["fs_setup"], list) def test_override_disk_aliases(self): # Test to make sure that the built-in DS is overriden builtin = DataSourceSmartOS.BUILTIN_DS_CONFIG - mydscfg = {'disk_aliases': {'FOO': '/dev/bar'}} + mydscfg = {"disk_aliases": {"FOO": "/dev/bar"}} # expect that these values are in builtin, or this is pointless for k in mydscfg: @@ -644,25 +698,30 @@ class TestSmartOSDataSource(FilesystemMockingTestCase): ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(mydscfg['disk_aliases']['FOO'], - dsrc.ds_cfg['disk_aliases']['FOO']) + self.assertEqual( + mydscfg["disk_aliases"]["FOO"], dsrc.ds_cfg["disk_aliases"]["FOO"] + ) - self.assertEqual(dsrc.device_name_to_device('FOO'), - mydscfg['disk_aliases']['FOO']) + self.assertEqual( + dsrc.device_name_to_device("FOO"), mydscfg["disk_aliases"]["FOO"] + ) def test_reconfig_network_on_boot(self): # Test to ensure that network is configured from metadata on each boot dsrc = self._get_ds(mockdata=MOCK_RETURNS) self.assertSetEqual( - {EventType.BOOT_NEW_INSTANCE, - EventType.BOOT, - EventType.BOOT_LEGACY}, - dsrc.default_update_events[EventScope.NETWORK] + { + EventType.BOOT_NEW_INSTANCE, + EventType.BOOT, + EventType.BOOT_LEGACY, + }, + dsrc.default_update_events[EventScope.NETWORK], ) class TestIdentifyFile(CiTestCase): """Test the 'identify_file' utility.""" + @skipIf(not which("file"), "command 'file' not available.") def test_file_happy_path(self): """Test file is available and functional on plain text.""" @@ -680,14 +739,16 @@ class TestIdentifyFile(CiTestCase): self.assertEqual(None, identify_file(fname)) self.assertEqual( [mock.call(["file", "--brief", "--mime-type", fname])], - m_subp.call_args_list) + m_subp.call_args_list, + ) class ShortReader(object): """Implements a 'read' interface for bytes provided. much like io.BytesIO but the 'endbyte' acts as if EOF. When it is reached a short will be returned.""" - def __init__(self, initial_bytes, endbyte=b'\0'): + + def __init__(self, initial_bytes, endbyte=b"\0"): self.data = initial_bytes self.index = 0 self.len = len(self.data) @@ -700,7 +761,7 @@ class ShortReader(object): def read(self, size=-1): """Read size bytes but not past a null.""" if size == 0 or self.index >= self.len: - return b'' + return b"" rsize = size if size < 0 or size + self.index > self.len: @@ -711,7 +772,7 @@ class ShortReader(object): rsize = next_null - self.index + 1 i = self.index self.index += rsize - ret = self.data[i:i + rsize] + ret = self.data[i : i + rsize] if len(ret) and ret[-1:] == self.endbyte: ret = ret[:-1] return ret @@ -719,32 +780,34 @@ class ShortReader(object): class TestJoyentMetadataClient(FilesystemMockingTestCase): - invalid = b'invalid command\n' - failure = b'FAILURE\n' - v2_ok = b'V2_OK\n' + invalid = b"invalid command\n" + failure = b"FAILURE\n" + v2_ok = b"V2_OK\n" def setUp(self): super(TestJoyentMetadataClient, self).setUp() self.serial = mock.MagicMock(spec=serial.Serial) - self.request_id = 0xabcdef12 - self.metadata_value = 'value' + self.request_id = 0xABCDEF12 + self.metadata_value = "value" self.response_parts = { - 'command': 'SUCCESS', - 'crc': 'b5a9ff00', - 'length': SUCCESS_LEN + len(b64e(self.metadata_value)), - 'payload': b64e(self.metadata_value), - 'request_id': '{0:08x}'.format(self.request_id), + "command": "SUCCESS", + "crc": "b5a9ff00", + "length": SUCCESS_LEN + len(b64e(self.metadata_value)), + "payload": b64e(self.metadata_value), + "request_id": "{0:08x}".format(self.request_id), } def make_response(): - payloadstr = '' - if 'payload' in self.response_parts: - payloadstr = ' {0}'.format(self.response_parts['payload']) - return ('V2 {length} {crc} {request_id} ' - '{command}{payloadstr}\n'.format( - payloadstr=payloadstr, - **self.response_parts).encode('ascii')) + payloadstr = "" + if "payload" in self.response_parts: + payloadstr = " {0}".format(self.response_parts["payload"]) + return ( + "V2 {length} {crc} {request_id} " + "{command}{payloadstr}\n".format( + payloadstr=payloadstr, **self.response_parts + ).encode("ascii") + ) self.metasource_data = None @@ -758,41 +821,49 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase): self.serial.read.side_effect = read_response self.patched_funcs.enter_context( - mock.patch('cloudinit.sources.DataSourceSmartOS.random.randint', - mock.Mock(return_value=self.request_id))) + mock.patch( + "cloudinit.sources.DataSourceSmartOS.random.randint", + mock.Mock(return_value=self.request_id), + ) + ) def _get_client(self): return DataSourceSmartOS.JoyentMetadataClient( - fp=self.serial, smartos_type=DataSourceSmartOS.SMARTOS_ENV_KVM) + fp=self.serial, smartos_type=DataSourceSmartOS.SMARTOS_ENV_KVM + ) def _get_serial_client(self): self.serial.timeout = 1 - return DataSourceSmartOS.JoyentMetadataSerialClient(None, - fp=self.serial) + return DataSourceSmartOS.JoyentMetadataSerialClient( + None, fp=self.serial + ) def assertEndsWith(self, haystack, prefix): - self.assertTrue(haystack.endswith(prefix), - "{0} does not end with '{1}'".format( - repr(haystack), prefix)) + self.assertTrue( + haystack.endswith(prefix), + "{0} does not end with '{1}'".format(repr(haystack), prefix), + ) def assertStartsWith(self, haystack, prefix): - self.assertTrue(haystack.startswith(prefix), - "{0} does not start with '{1}'".format( - repr(haystack), prefix)) + self.assertTrue( + haystack.startswith(prefix), + "{0} does not start with '{1}'".format(repr(haystack), prefix), + ) def assertNoMoreSideEffects(self, obj): self.assertRaises(StopIteration, obj) def test_get_metadata_writes_a_single_line(self): client = self._get_client() - client.get('some_key') + client.get("some_key") self.assertEqual(1, self.serial.write.call_count) written_line = self.serial.write.call_args[0][0] - self.assertEndsWith(written_line.decode('ascii'), - b'\n'.decode('ascii')) - self.assertEqual(1, written_line.count(b'\n')) + self.assertEndsWith( + written_line.decode("ascii"), b"\n".decode("ascii") + ) + self.assertEqual(1, written_line.count(b"\n")) - def _get_written_line(self, key='some_key'): + def _get_written_line(self, key="some_key"): client = self._get_client() client.get(key) return self.serial.write.call_args[0][0] @@ -802,76 +873,86 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase): def test_get_metadata_line_starts_with_v2(self): foo = self._get_written_line() - self.assertStartsWith(foo.decode('ascii'), b'V2'.decode('ascii')) + self.assertStartsWith(foo.decode("ascii"), b"V2".decode("ascii")) def test_get_metadata_uses_get_command(self): - parts = self._get_written_line().decode('ascii').strip().split(' ') - self.assertEqual('GET', parts[4]) + parts = self._get_written_line().decode("ascii").strip().split(" ") + self.assertEqual("GET", parts[4]) def test_get_metadata_base64_encodes_argument(self): - key = 'my_key' - parts = self._get_written_line(key).decode('ascii').strip().split(' ') + key = "my_key" + parts = self._get_written_line(key).decode("ascii").strip().split(" ") self.assertEqual(b64e(key), parts[5]) def test_get_metadata_calculates_length_correctly(self): - parts = self._get_written_line().decode('ascii').strip().split(' ') - expected_length = len(' '.join(parts[3:])) + parts = self._get_written_line().decode("ascii").strip().split(" ") + expected_length = len(" ".join(parts[3:])) self.assertEqual(expected_length, int(parts[1])) def test_get_metadata_uses_appropriate_request_id(self): - parts = self._get_written_line().decode('ascii').strip().split(' ') + parts = self._get_written_line().decode("ascii").strip().split(" ") request_id = parts[3] self.assertEqual(8, len(request_id)) self.assertEqual(request_id, request_id.lower()) def test_get_metadata_uses_random_number_for_request_id(self): line = self._get_written_line() - request_id = line.decode('ascii').strip().split(' ')[3] - self.assertEqual('{0:08x}'.format(self.request_id), request_id) + request_id = line.decode("ascii").strip().split(" ")[3] + self.assertEqual("{0:08x}".format(self.request_id), request_id) def test_get_metadata_checksums_correctly(self): - parts = self._get_written_line().decode('ascii').strip().split(' ') - expected_checksum = '{0:08x}'.format( - crc32(' '.join(parts[3:]).encode('utf-8')) & 0xffffffff) + parts = self._get_written_line().decode("ascii").strip().split(" ") + expected_checksum = "{0:08x}".format( + crc32(" ".join(parts[3:]).encode("utf-8")) & 0xFFFFFFFF + ) checksum = parts[2] self.assertEqual(expected_checksum, checksum) def test_get_metadata_reads_a_line(self): client = self._get_client() - client.get('some_key') + client.get("some_key") self.assertEqual(self.metasource_data_len, self.serial.read.call_count) def test_get_metadata_returns_valid_value(self): client = self._get_client() - value = client.get('some_key') + value = client.get("some_key") self.assertEqual(self.metadata_value, value) def test_get_metadata_throws_exception_for_incorrect_length(self): - self.response_parts['length'] = 0 + self.response_parts["length"] = 0 client = self._get_client() - self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get, 'some_key') + self.assertRaises( + DataSourceSmartOS.JoyentMetadataFetchException, + client.get, + "some_key", + ) def test_get_metadata_throws_exception_for_incorrect_crc(self): - self.response_parts['crc'] = 'deadbeef' + self.response_parts["crc"] = "deadbeef" client = self._get_client() - self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get, 'some_key') + self.assertRaises( + DataSourceSmartOS.JoyentMetadataFetchException, + client.get, + "some_key", + ) def test_get_metadata_throws_exception_for_request_id_mismatch(self): - self.response_parts['request_id'] = 'deadbeef' + self.response_parts["request_id"] = "deadbeef" client = self._get_client() - client._checksum = lambda _: self.response_parts['crc'] - self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get, 'some_key') + client._checksum = lambda _: self.response_parts["crc"] + self.assertRaises( + DataSourceSmartOS.JoyentMetadataFetchException, + client.get, + "some_key", + ) def test_get_metadata_returns_None_if_value_not_found(self): - self.response_parts['payload'] = '' - self.response_parts['command'] = 'NOTFOUND' - self.response_parts['length'] = NOTFOUND_LEN + self.response_parts["payload"] = "" + self.response_parts["command"] = "NOTFOUND" + self.response_parts["length"] = NOTFOUND_LEN client = self._get_client() - client._checksum = lambda _: self.response_parts['crc'] - self.assertIsNone(client.get('some_key')) + client._checksum = lambda _: self.response_parts["crc"] + self.assertIsNone(client.get("some_key")) def test_negotiate(self): client = self._get_client() @@ -883,55 +964,58 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase): def test_negotiate_short_response(self): client = self._get_client() # chopped '\n' from v2_ok. - reader = ShortReader(self.v2_ok[:-1] + b'\0') + reader = ShortReader(self.v2_ok[:-1] + b"\0") client.fp.read.side_effect = reader.read - self.assertRaises(DataSourceSmartOS.JoyentMetadataTimeoutException, - client._negotiate) + self.assertRaises( + DataSourceSmartOS.JoyentMetadataTimeoutException, client._negotiate + ) self.assertTrue(reader.emptied) def test_negotiate_bad_response(self): client = self._get_client() - reader = ShortReader(b'garbage\n' + self.v2_ok) + reader = ShortReader(b"garbage\n" + self.v2_ok) client.fp.read.side_effect = reader.read - self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client._negotiate) + self.assertRaises( + DataSourceSmartOS.JoyentMetadataFetchException, client._negotiate + ) self.assertEqual(self.v2_ok, client.fp.read()) def test_serial_open_transport(self): client = self._get_serial_client() - reader = ShortReader(b'garbage\0' + self.invalid + self.v2_ok) + reader = ShortReader(b"garbage\0" + self.invalid + self.v2_ok) client.fp.read.side_effect = reader.read client.open_transport() self.assertTrue(reader.emptied) def test_flush_failure(self): client = self._get_serial_client() - reader = ShortReader(b'garbage' + b'\0' + self.failure + - self.invalid + self.v2_ok) + reader = ShortReader( + b"garbage" + b"\0" + self.failure + self.invalid + self.v2_ok + ) client.fp.read.side_effect = reader.read client.open_transport() self.assertTrue(reader.emptied) def test_flush_many_timeouts(self): client = self._get_serial_client() - reader = ShortReader(b'\0' * 100 + self.invalid + self.v2_ok) + reader = ShortReader(b"\0" * 100 + self.invalid + self.v2_ok) client.fp.read.side_effect = reader.read client.open_transport() self.assertTrue(reader.emptied) def test_list_metadata_returns_list(self): - parts = ['foo', 'bar'] - value = b64e('\n'.join(parts)) - self.response_parts['payload'] = value - self.response_parts['crc'] = '40873553' - self.response_parts['length'] = SUCCESS_LEN + len(value) + parts = ["foo", "bar"] + value = b64e("\n".join(parts)) + self.response_parts["payload"] = value + self.response_parts["crc"] = "40873553" + self.response_parts["length"] = SUCCESS_LEN + len(value) client = self._get_client() self.assertEqual(client.list(), parts) def test_list_metadata_returns_empty_list_if_no_customer_metadata(self): - del self.response_parts['payload'] - self.response_parts['length'] = SUCCESS_LEN - 1 - self.response_parts['crc'] = '14e563ba' + del self.response_parts["payload"] + self.response_parts["length"] = SUCCESS_LEN - 1 + self.response_parts["crc"] = "14e563ba" client = self._get_client() self.assertEqual(client.list(), []) @@ -939,181 +1023,354 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase): class TestNetworkConversion(CiTestCase): def test_convert_simple(self): expected = { - 'version': 1, - 'config': [ - {'name': 'net0', 'type': 'physical', - 'subnets': [{'type': 'static', 'gateway': '8.12.42.1', - 'address': '8.12.42.102/24'}], - 'mtu': 1500, 'mac_address': '90:b8:d0:f5:e4:f5'}, - {'name': 'net1', 'type': 'physical', - 'subnets': [{'type': 'static', - 'address': '192.168.128.93/22'}], - 'mtu': 8500, 'mac_address': '90:b8:d0:a5:ff:cd'}]} + "version": 1, + "config": [ + { + "name": "net0", + "type": "physical", + "subnets": [ + { + "type": "static", + "gateway": "8.12.42.1", + "address": "8.12.42.102/24", + } + ], + "mtu": 1500, + "mac_address": "90:b8:d0:f5:e4:f5", + }, + { + "name": "net1", + "type": "physical", + "subnets": [ + {"type": "static", "address": "192.168.128.93/22"} + ], + "mtu": 8500, + "mac_address": "90:b8:d0:a5:ff:cd", + }, + ], + } found = convert_net(SDC_NICS) self.assertEqual(expected, found) def test_convert_simple_alt(self): expected = { - 'version': 1, - 'config': [ - {'name': 'net0', 'type': 'physical', - 'subnets': [{'type': 'static', 'gateway': '8.12.42.1', - 'address': '8.12.42.51/24'}], - 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'}, - {'name': 'net1', 'type': 'physical', - 'subnets': [{'type': 'static', - 'address': '10.210.1.217/24'}], - 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]} + "version": 1, + "config": [ + { + "name": "net0", + "type": "physical", + "subnets": [ + { + "type": "static", + "gateway": "8.12.42.1", + "address": "8.12.42.51/24", + } + ], + "mtu": 1500, + "mac_address": "90:b8:d0:ae:64:51", + }, + { + "name": "net1", + "type": "physical", + "subnets": [ + {"type": "static", "address": "10.210.1.217/24"} + ], + "mtu": 1500, + "mac_address": "90:b8:d0:bd:4f:9c", + }, + ], + } found = convert_net(SDC_NICS_ALT) self.assertEqual(expected, found) def test_convert_simple_dhcp(self): expected = { - 'version': 1, - 'config': [ - {'name': 'net0', 'type': 'physical', - 'subnets': [{'type': 'static', 'gateway': '8.12.42.1', - 'address': '8.12.42.51/24'}], - 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'}, - {'name': 'net1', 'type': 'physical', - 'subnets': [{'type': 'dhcp4'}], - 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]} + "version": 1, + "config": [ + { + "name": "net0", + "type": "physical", + "subnets": [ + { + "type": "static", + "gateway": "8.12.42.1", + "address": "8.12.42.51/24", + } + ], + "mtu": 1500, + "mac_address": "90:b8:d0:ae:64:51", + }, + { + "name": "net1", + "type": "physical", + "subnets": [{"type": "dhcp4"}], + "mtu": 1500, + "mac_address": "90:b8:d0:bd:4f:9c", + }, + ], + } found = convert_net(SDC_NICS_DHCP) self.assertEqual(expected, found) def test_convert_simple_multi_ip(self): expected = { - 'version': 1, - 'config': [ - {'name': 'net0', 'type': 'physical', - 'subnets': [{'type': 'static', 'gateway': '8.12.42.1', - 'address': '8.12.42.51/24'}, - {'type': 'static', - 'address': '8.12.42.52/24'}], - 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'}, - {'name': 'net1', 'type': 'physical', - 'subnets': [{'type': 'static', - 'address': '10.210.1.217/24'}, - {'type': 'static', - 'address': '10.210.1.151/24'}], - 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]} + "version": 1, + "config": [ + { + "name": "net0", + "type": "physical", + "subnets": [ + { + "type": "static", + "gateway": "8.12.42.1", + "address": "8.12.42.51/24", + }, + {"type": "static", "address": "8.12.42.52/24"}, + ], + "mtu": 1500, + "mac_address": "90:b8:d0:ae:64:51", + }, + { + "name": "net1", + "type": "physical", + "subnets": [ + {"type": "static", "address": "10.210.1.217/24"}, + {"type": "static", "address": "10.210.1.151/24"}, + ], + "mtu": 1500, + "mac_address": "90:b8:d0:bd:4f:9c", + }, + ], + } found = convert_net(SDC_NICS_MIP) self.assertEqual(expected, found) def test_convert_with_dns(self): expected = { - 'version': 1, - 'config': [ - {'name': 'net0', 'type': 'physical', - 'subnets': [{'type': 'static', 'gateway': '8.12.42.1', - 'address': '8.12.42.51/24'}], - 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'}, - {'name': 'net1', 'type': 'physical', - 'subnets': [{'type': 'dhcp4'}], - 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}, - {'type': 'nameserver', - 'address': ['8.8.8.8', '8.8.8.1'], 'search': ["local"]}]} + "version": 1, + "config": [ + { + "name": "net0", + "type": "physical", + "subnets": [ + { + "type": "static", + "gateway": "8.12.42.1", + "address": "8.12.42.51/24", + } + ], + "mtu": 1500, + "mac_address": "90:b8:d0:ae:64:51", + }, + { + "name": "net1", + "type": "physical", + "subnets": [{"type": "dhcp4"}], + "mtu": 1500, + "mac_address": "90:b8:d0:bd:4f:9c", + }, + { + "type": "nameserver", + "address": ["8.8.8.8", "8.8.8.1"], + "search": ["local"], + }, + ], + } found = convert_net( - network_data=SDC_NICS_DHCP, dns_servers=['8.8.8.8', '8.8.8.1'], - dns_domain="local") + network_data=SDC_NICS_DHCP, + dns_servers=["8.8.8.8", "8.8.8.1"], + dns_domain="local", + ) self.assertEqual(expected, found) def test_convert_simple_multi_ipv6(self): expected = { - 'version': 1, - 'config': [ - {'name': 'net0', 'type': 'physical', - 'subnets': [{'type': 'static', 'address': - '2001:4800:78ff:1b:be76:4eff:fe06:96b3/64'}, - {'type': 'static', 'gateway': '8.12.42.1', - 'address': '8.12.42.51/24'}], - 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'}, - {'name': 'net1', 'type': 'physical', - 'subnets': [{'type': 'static', - 'address': '10.210.1.217/24'}], - 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]} + "version": 1, + "config": [ + { + "name": "net0", + "type": "physical", + "subnets": [ + { + "type": "static", + "address": ( + "2001:4800:78ff:1b:be76:4eff:fe06:96b3/64" + ), + }, + { + "type": "static", + "gateway": "8.12.42.1", + "address": "8.12.42.51/24", + }, + ], + "mtu": 1500, + "mac_address": "90:b8:d0:ae:64:51", + }, + { + "name": "net1", + "type": "physical", + "subnets": [ + {"type": "static", "address": "10.210.1.217/24"} + ], + "mtu": 1500, + "mac_address": "90:b8:d0:bd:4f:9c", + }, + ], + } found = convert_net(SDC_NICS_MIP_IPV6) self.assertEqual(expected, found) def test_convert_simple_both_ipv4_ipv6(self): expected = { - 'version': 1, - 'config': [ - {'mac_address': '90:b8:d0:ae:64:51', 'mtu': 1500, - 'name': 'net0', 'type': 'physical', - 'subnets': [{'address': '2001::10/64', 'gateway': '2001::1', - 'type': 'static'}, - {'address': '8.12.42.51/24', - 'gateway': '8.12.42.1', - 'type': 'static'}, - {'address': '2001::11/64', 'type': 'static'}, - {'address': '8.12.42.52/32', 'type': 'static'}]}, - {'mac_address': '90:b8:d0:bd:4f:9c', 'mtu': 1500, - 'name': 'net1', 'type': 'physical', - 'subnets': [{'address': '10.210.1.217/24', - 'type': 'static'}]}]} + "version": 1, + "config": [ + { + "mac_address": "90:b8:d0:ae:64:51", + "mtu": 1500, + "name": "net0", + "type": "physical", + "subnets": [ + { + "address": "2001::10/64", + "gateway": "2001::1", + "type": "static", + }, + { + "address": "8.12.42.51/24", + "gateway": "8.12.42.1", + "type": "static", + }, + {"address": "2001::11/64", "type": "static"}, + {"address": "8.12.42.52/32", "type": "static"}, + ], + }, + { + "mac_address": "90:b8:d0:bd:4f:9c", + "mtu": 1500, + "name": "net1", + "type": "physical", + "subnets": [ + {"address": "10.210.1.217/24", "type": "static"} + ], + }, + ], + } found = convert_net(SDC_NICS_IPV4_IPV6) self.assertEqual(expected, found) def test_gateways_not_on_all_nics(self): expected = { - 'version': 1, - 'config': [ - {'mac_address': '90:b8:d0:d8:82:b4', 'mtu': 1500, - 'name': 'net0', 'type': 'physical', - 'subnets': [{'address': '8.12.42.26/24', - 'gateway': '8.12.42.1', 'type': 'static'}]}, - {'mac_address': '90:b8:d0:0a:51:31', 'mtu': 1500, - 'name': 'net1', 'type': 'physical', - 'subnets': [{'address': '10.210.1.27/24', - 'type': 'static'}]}]} + "version": 1, + "config": [ + { + "mac_address": "90:b8:d0:d8:82:b4", + "mtu": 1500, + "name": "net0", + "type": "physical", + "subnets": [ + { + "address": "8.12.42.26/24", + "gateway": "8.12.42.1", + "type": "static", + } + ], + }, + { + "mac_address": "90:b8:d0:0a:51:31", + "mtu": 1500, + "name": "net1", + "type": "physical", + "subnets": [ + {"address": "10.210.1.27/24", "type": "static"} + ], + }, + ], + } found = convert_net(SDC_NICS_SINGLE_GATEWAY) self.assertEqual(expected, found) def test_routes_on_all_nics(self): routes = [ - {'linklocal': False, 'dst': '3.0.0.0/8', 'gateway': '8.12.42.3'}, - {'linklocal': False, 'dst': '4.0.0.0/8', 'gateway': '10.210.1.4'}] + {"linklocal": False, "dst": "3.0.0.0/8", "gateway": "8.12.42.3"}, + {"linklocal": False, "dst": "4.0.0.0/8", "gateway": "10.210.1.4"}, + ] expected = { - 'version': 1, - 'config': [ - {'mac_address': '90:b8:d0:d8:82:b4', 'mtu': 1500, - 'name': 'net0', 'type': 'physical', - 'subnets': [{'address': '8.12.42.26/24', - 'gateway': '8.12.42.1', 'type': 'static', - 'routes': [{'network': '3.0.0.0/8', - 'gateway': '8.12.42.3'}, - {'network': '4.0.0.0/8', - 'gateway': '10.210.1.4'}]}]}, - {'mac_address': '90:b8:d0:0a:51:31', 'mtu': 1500, - 'name': 'net1', 'type': 'physical', - 'subnets': [{'address': '10.210.1.27/24', 'type': 'static', - 'routes': [{'network': '3.0.0.0/8', - 'gateway': '8.12.42.3'}, - {'network': '4.0.0.0/8', - 'gateway': '10.210.1.4'}]}]}]} + "version": 1, + "config": [ + { + "mac_address": "90:b8:d0:d8:82:b4", + "mtu": 1500, + "name": "net0", + "type": "physical", + "subnets": [ + { + "address": "8.12.42.26/24", + "gateway": "8.12.42.1", + "type": "static", + "routes": [ + { + "network": "3.0.0.0/8", + "gateway": "8.12.42.3", + }, + { + "network": "4.0.0.0/8", + "gateway": "10.210.1.4", + }, + ], + } + ], + }, + { + "mac_address": "90:b8:d0:0a:51:31", + "mtu": 1500, + "name": "net1", + "type": "physical", + "subnets": [ + { + "address": "10.210.1.27/24", + "type": "static", + "routes": [ + { + "network": "3.0.0.0/8", + "gateway": "8.12.42.3", + }, + { + "network": "4.0.0.0/8", + "gateway": "10.210.1.4", + }, + ], + } + ], + }, + ], + } found = convert_net(SDC_NICS_SINGLE_GATEWAY, routes=routes) self.maxDiff = None self.assertEqual(expected, found) -@unittest.skipUnless(get_smartos_environ() == SMARTOS_ENV_KVM, - "Only supported on KVM and bhyve guests under SmartOS") -@unittest.skipUnless(os.access(SERIAL_DEVICE, os.W_OK), - "Requires write access to " + SERIAL_DEVICE) +@unittest.skipUnless( + get_smartos_environ() == SMARTOS_ENV_KVM, + "Only supported on KVM and bhyve guests under SmartOS", +) +@unittest.skipUnless( + os.access(SERIAL_DEVICE, os.W_OK), + "Requires write access to " + SERIAL_DEVICE, +) @unittest.skipUnless(HAS_PYSERIAL is True, "pyserial not available") class TestSerialConcurrency(CiTestCase): """ - This class tests locking on an actual serial port, and as such can only - be run in a kvm or bhyve guest running on a SmartOS host. A test run on - a metadata socket will not be valid because a metadata socket ensures - there is only one session over a connection. In contrast, in the - absence of proper locking multiple processes opening the same serial - port can corrupt each others' exchanges with the metadata server. - - This takes on the order of 2 to 3 minutes to run. + This class tests locking on an actual serial port, and as such can only + be run in a kvm or bhyve guest running on a SmartOS host. A test run on + a metadata socket will not be valid because a metadata socket ensures + there is only one session over a connection. In contrast, in the + absence of proper locking multiple processes opening the same serial + port can corrupt each others' exchanges with the metadata server. + + This takes on the order of 2 to 3 minutes to run. """ - allowed_subp = ['mdata-get'] + + allowed_subp = ["mdata-get"] def setUp(self): self.mdata_proc = multiprocessing.Process(target=self.start_mdata_loop) @@ -1128,16 +1385,16 @@ class TestSerialConcurrency(CiTestCase): def start_mdata_loop(self): """ - The mdata-get command is repeatedly run in a separate process so - that it may try to race with metadata operations performed in the - main test process. Use of mdata-get is better than two processes - using the protocol implementation in DataSourceSmartOS because we - are testing to be sure that cloud-init and mdata-get respect each - others locks. + The mdata-get command is repeatedly run in a separate process so + that it may try to race with metadata operations performed in the + main test process. Use of mdata-get is better than two processes + using the protocol implementation in DataSourceSmartOS because we + are testing to be sure that cloud-init and mdata-get respect each + others locks. """ rcs = list(range(0, 256)) while True: - subp(['mdata-get', 'sdc:routes'], rcs=rcs) + subp(["mdata-get", "sdc:routes"], rcs=rcs) def test_all_keys(self): self.assertIsNotNone(self.mdata_proc.pid) @@ -1160,4 +1417,5 @@ class TestSerialConcurrency(CiTestCase): self.assertIsNone(self.mdata_proc.exitcode) + # vi: ts=4 expandtab |