summaryrefslogtreecommitdiff
path: root/tests/unittests/sources/test_smartos.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/sources/test_smartos.py')
-rw-r--r--tests/unittests/sources/test_smartos.py956
1 files changed, 607 insertions, 349 deletions
diff --git a/tests/unittests/sources/test_smartos.py b/tests/unittests/sources/test_smartos.py
index e306eded..55239c4e 100644
--- a/tests/unittests/sources/test_smartos.py
+++ b/tests/unittests/sources/test_smartos.py
@@ -5,14 +5,13 @@
#
# This file is part of cloud-init. See LICENSE file for license information.
-'''This is a testcase for the SmartOS datasource.
+"""This is a testcase for the SmartOS datasource.
It replicates a serial console and acts like the SmartOS console does in
order to validate return responses.
-'''
+"""
-from binascii import crc32
import json
import multiprocessing
import os
@@ -22,32 +21,40 @@ import signal
import stat
import unittest
import uuid
+from binascii import crc32
+from cloudinit import helpers as c_helpers
from cloudinit import serial
+from cloudinit.event import EventScope, EventType
from cloudinit.sources import DataSourceSmartOS
+from cloudinit.sources.DataSourceSmartOS import SERIAL_DEVICE, SMARTOS_ENV_KVM
from cloudinit.sources.DataSourceSmartOS import (
convert_smartos_network_data as convert_net,
- SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ,
- identify_file)
-from cloudinit.event import EventScope, EventType
-
-from cloudinit import helpers as c_helpers
-from cloudinit.util import (b64e, write_file)
-from cloudinit.subp import (subp, ProcessExecutionError, which)
-
+)
+from cloudinit.sources.DataSourceSmartOS import (
+ get_smartos_environ,
+ identify_file,
+)
+from cloudinit.subp import ProcessExecutionError, subp, which
+from cloudinit.util import b64e, write_file
from tests.unittests.helpers import (
- CiTestCase, mock, FilesystemMockingTestCase, skipIf)
-
+ CiTestCase,
+ FilesystemMockingTestCase,
+ mock,
+ skipIf,
+)
try:
import serial as _pyserial
+
assert _pyserial # avoid pyflakes error F401: import unused
HAS_PYSERIAL = True
except ImportError:
HAS_PYSERIAL = False
-DSMOS = 'cloudinit.sources.DataSourceSmartOS'
-SDC_NICS = json.loads("""
+DSMOS = "cloudinit.sources.DataSourceSmartOS"
+SDC_NICS = json.loads(
+ """
[
{
"nic_tag": "external",
@@ -87,10 +94,12 @@ SDC_NICS = json.loads("""
]
}
]
-""")
+"""
+)
-SDC_NICS_ALT = json.loads("""
+SDC_NICS_ALT = json.loads(
+ """
[
{
"interface": "net0",
@@ -126,9 +135,11 @@ SDC_NICS_ALT = json.loads("""
"mtu": 1500
}
]
-""")
+"""
+)
-SDC_NICS_DHCP = json.loads("""
+SDC_NICS_DHCP = json.loads(
+ """
[
{
"interface": "net0",
@@ -164,9 +175,11 @@ SDC_NICS_DHCP = json.loads("""
"mtu": 1500
}
]
-""")
+"""
+)
-SDC_NICS_MIP = json.loads("""
+SDC_NICS_MIP = json.loads(
+ """
[
{
"interface": "net0",
@@ -204,9 +217,11 @@ SDC_NICS_MIP = json.loads("""
"mtu": 1500
}
]
-""")
+"""
+)
-SDC_NICS_MIP_IPV6 = json.loads("""
+SDC_NICS_MIP_IPV6 = json.loads(
+ """
[
{
"interface": "net0",
@@ -243,9 +258,11 @@ SDC_NICS_MIP_IPV6 = json.loads("""
"mtu": 1500
}
]
-""")
+"""
+)
-SDC_NICS_IPV4_IPV6 = json.loads("""
+SDC_NICS_IPV4_IPV6 = json.loads(
+ """
[
{
"interface": "net0",
@@ -277,9 +294,11 @@ SDC_NICS_IPV4_IPV6 = json.loads("""
"mtu": 1500
}
]
-""")
+"""
+)
-SDC_NICS_SINGLE_GATEWAY = json.loads("""
+SDC_NICS_SINGLE_GATEWAY = json.loads(
+ """
[
{
"interface":"net0",
@@ -309,32 +328,33 @@ SDC_NICS_SINGLE_GATEWAY = json.loads("""
"mtu":1500
}
]
-""")
+"""
+)
MOCK_RETURNS = {
- 'hostname': 'test-host',
- 'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname',
- 'disable_iptables_flag': None,
- 'enable_motd_sys_info': None,
- 'test-var1': 'some data',
- 'cloud-init:user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']),
- 'sdc:datacenter_name': 'somewhere2',
- 'sdc:operator-script': '\n'.join(['bin/true', '']),
- 'sdc:uuid': str(uuid.uuid4()),
- 'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']),
- 'user-data': '\n'.join(['something', '']),
- 'user-script': '\n'.join(['/bin/true', '']),
- 'sdc:nics': json.dumps(SDC_NICS),
+ "hostname": "test-host",
+ "root_authorized_keys": "ssh-rsa AAAAB3Nz...aC1yc2E= keyname",
+ "disable_iptables_flag": None,
+ "enable_motd_sys_info": None,
+ "test-var1": "some data",
+ "cloud-init:user-data": "\n".join(["#!/bin/sh", "/bin/true", ""]),
+ "sdc:datacenter_name": "somewhere2",
+ "sdc:operator-script": "\n".join(["bin/true", ""]),
+ "sdc:uuid": str(uuid.uuid4()),
+ "sdc:vendor-data": "\n".join(["VENDOR_DATA", ""]),
+ "user-data": "\n".join(["something", ""]),
+ "user-script": "\n".join(["/bin/true", ""]),
+ "sdc:nics": json.dumps(SDC_NICS),
}
-DMI_DATA_RETURN = 'smartdc'
+DMI_DATA_RETURN = "smartdc"
# Useful for calculating the length of a frame body. A SUCCESS body will be
# followed by more characters or be one character less if SUCCESS with no
# payload. See Section 4.3 of https://eng.joyent.com/mdata/protocol.html.
-SUCCESS_LEN = len('0123abcd SUCCESS ')
-NOTFOUND_LEN = len('0123abcd NOTFOUND')
+SUCCESS_LEN = len("0123abcd SUCCESS ")
+NOTFOUND_LEN = len("0123abcd NOTFOUND")
class PsuedoJoyentClient(object):
@@ -364,11 +384,11 @@ class PsuedoJoyentClient(object):
return True
def open_transport(self):
- assert(not self._is_open)
+ assert not self._is_open
self._is_open = True
def close_transport(self):
- assert(self._is_open)
+ assert self._is_open
self._is_open = False
@@ -381,21 +401,35 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
self.add_patch(DSMOS + ".get_smartos_environ", "get_smartos_environ")
self.add_patch(DSMOS + ".jmc_client_factory", "jmc_cfact")
- self.legacy_user_d = self.tmp_path('legacy_user_tmp')
+ self.legacy_user_d = self.tmp_path("legacy_user_tmp")
os.mkdir(self.legacy_user_d)
- self.add_patch(DSMOS + ".LEGACY_USER_D", "m_legacy_user_d",
- autospec=False, new=self.legacy_user_d)
- self.add_patch(DSMOS + ".identify_file", "m_identify_file",
- return_value="text/plain")
+ self.add_patch(
+ DSMOS + ".LEGACY_USER_D",
+ "m_legacy_user_d",
+ autospec=False,
+ new=self.legacy_user_d,
+ )
+ self.add_patch(
+ DSMOS + ".identify_file",
+ "m_identify_file",
+ return_value="text/plain",
+ )
- def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM,
- sys_cfg=None, ds_cfg=None):
+ def _get_ds(
+ self,
+ mockdata=None,
+ mode=DataSourceSmartOS.SMARTOS_ENV_KVM,
+ sys_cfg=None,
+ ds_cfg=None,
+ ):
self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata)
self.get_smartos_environ.return_value = mode
tmpd = self.tmp_dir()
- dirs = {'cloud_dir': self.tmp_path('cloud_dir', tmpd),
- 'run_dir': self.tmp_path('run_dir')}
+ dirs = {
+ "cloud_dir": self.tmp_path("cloud_dir", tmpd),
+ "run_dir": self.tmp_path("run_dir"),
+ }
for d in dirs.values():
os.mkdir(d)
paths = c_helpers.Paths(dirs)
@@ -404,14 +438,15 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
sys_cfg = {}
if ds_cfg is not None:
- sys_cfg['datasource'] = sys_cfg.get('datasource', {})
- sys_cfg['datasource']['SmartOS'] = ds_cfg
+ sys_cfg["datasource"] = sys_cfg.get("datasource", {})
+ sys_cfg["datasource"]["SmartOS"] = ds_cfg
return DataSourceSmartOS.DataSourceSmartOS(
- sys_cfg, distro=None, paths=paths)
+ sys_cfg, distro=None, paths=paths
+ )
def test_no_base64(self):
- ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
+ ds_cfg = {"no_base64_decode": ["test_var1"], "all_base": True}
dsrc = self._get_ds(ds_cfg=ds_cfg)
ret = dsrc.get_data()
self.assertTrue(ret)
@@ -420,166 +455,180 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['sdc:uuid'],
- dsrc.metadata['instance-id'])
+ self.assertEqual(
+ MOCK_RETURNS["sdc:uuid"], dsrc.metadata["instance-id"]
+ )
def test_platform_info(self):
"""All platform-related attributes are properly set."""
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
- self.assertEqual('joyent', dsrc.cloud_name)
- self.assertEqual('joyent', dsrc.platform_type)
- self.assertEqual('serial (/dev/ttyS1)', dsrc.subplatform)
+ self.assertEqual("joyent", dsrc.cloud_name)
+ self.assertEqual("joyent", dsrc.platform_type)
+ self.assertEqual("serial (/dev/ttyS1)", dsrc.subplatform)
def test_root_keys(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['root_authorized_keys'],
- dsrc.metadata['public-keys'])
+ self.assertEqual(
+ MOCK_RETURNS["root_authorized_keys"], dsrc.metadata["public-keys"]
+ )
def test_hostname_b64(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
+ self.assertEqual(
+ MOCK_RETURNS["hostname"], dsrc.metadata["local-hostname"]
+ )
def test_hostname(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['hostname'],
- dsrc.metadata['local-hostname'])
+ self.assertEqual(
+ MOCK_RETURNS["hostname"], dsrc.metadata["local-hostname"]
+ )
def test_hostname_if_no_sdc_hostname(self):
my_returns = MOCK_RETURNS.copy()
- my_returns['sdc:hostname'] = 'sdc-' + my_returns['hostname']
+ my_returns["sdc:hostname"] = "sdc-" + my_returns["hostname"]
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(my_returns['hostname'],
- dsrc.metadata['local-hostname'])
+ self.assertEqual(
+ my_returns["hostname"], dsrc.metadata["local-hostname"]
+ )
def test_sdc_hostname_if_no_hostname(self):
my_returns = MOCK_RETURNS.copy()
- my_returns['sdc:hostname'] = 'sdc-' + my_returns['hostname']
- del my_returns['hostname']
+ my_returns["sdc:hostname"] = "sdc-" + my_returns["hostname"]
+ del my_returns["hostname"]
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(my_returns['sdc:hostname'],
- dsrc.metadata['local-hostname'])
+ self.assertEqual(
+ my_returns["sdc:hostname"], dsrc.metadata["local-hostname"]
+ )
def test_sdc_uuid_if_no_hostname_or_sdc_hostname(self):
my_returns = MOCK_RETURNS.copy()
- del my_returns['hostname']
+ del my_returns["hostname"]
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(my_returns['sdc:uuid'],
- dsrc.metadata['local-hostname'])
+ self.assertEqual(
+ my_returns["sdc:uuid"], dsrc.metadata["local-hostname"]
+ )
def test_userdata(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['user-data'],
- dsrc.metadata['legacy-user-data'])
- self.assertEqual(MOCK_RETURNS['cloud-init:user-data'],
- dsrc.userdata_raw)
+ self.assertEqual(
+ MOCK_RETURNS["user-data"], dsrc.metadata["legacy-user-data"]
+ )
+ self.assertEqual(
+ MOCK_RETURNS["cloud-init:user-data"], dsrc.userdata_raw
+ )
def test_sdc_nics(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(json.loads(MOCK_RETURNS['sdc:nics']),
- dsrc.metadata['network-data'])
+ self.assertEqual(
+ json.loads(MOCK_RETURNS["sdc:nics"]), dsrc.metadata["network-data"]
+ )
def test_sdc_scripts(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['user-script'],
- dsrc.metadata['user-script'])
+ self.assertEqual(
+ MOCK_RETURNS["user-script"], dsrc.metadata["user-script"]
+ )
legacy_script_f = "%s/user-script" % self.legacy_user_d
print("legacy_script_f=%s" % legacy_script_f)
self.assertTrue(os.path.exists(legacy_script_f))
self.assertTrue(os.path.islink(legacy_script_f))
user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
- self.assertEqual(user_script_perm, '700')
+ self.assertEqual(user_script_perm, "700")
def test_scripts_shebanged(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['user-script'],
- dsrc.metadata['user-script'])
+ self.assertEqual(
+ MOCK_RETURNS["user-script"], dsrc.metadata["user-script"]
+ )
legacy_script_f = "%s/user-script" % self.legacy_user_d
self.assertTrue(os.path.exists(legacy_script_f))
self.assertTrue(os.path.islink(legacy_script_f))
shebang = None
- with open(legacy_script_f, 'r') as f:
+ with open(legacy_script_f, "r") as f:
shebang = f.readlines()[0].strip()
self.assertEqual(shebang, "#!/bin/bash")
user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
- self.assertEqual(user_script_perm, '700')
+ self.assertEqual(user_script_perm, "700")
def test_scripts_shebang_not_added(self):
"""
- Test that the SmartOS requirement that plain text scripts
- are executable. This test makes sure that plain texts scripts
- with out file magic have it added appropriately by cloud-init.
+ Test that the SmartOS requirement that plain text scripts
+ are executable. This test makes sure that plain texts scripts
+ with out file magic have it added appropriately by cloud-init.
"""
my_returns = MOCK_RETURNS.copy()
- my_returns['user-script'] = '\n'.join(['#!/usr/bin/perl',
- 'print("hi")', ''])
+ my_returns["user-script"] = "\n".join(
+ ["#!/usr/bin/perl", 'print("hi")', ""]
+ )
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(my_returns['user-script'],
- dsrc.metadata['user-script'])
+ self.assertEqual(
+ my_returns["user-script"], dsrc.metadata["user-script"]
+ )
legacy_script_f = "%s/user-script" % self.legacy_user_d
self.assertTrue(os.path.exists(legacy_script_f))
self.assertTrue(os.path.islink(legacy_script_f))
shebang = None
- with open(legacy_script_f, 'r') as f:
+ with open(legacy_script_f, "r") as f:
shebang = f.readlines()[0].strip()
self.assertEqual(shebang, "#!/usr/bin/perl")
def test_userdata_removed(self):
"""
- User-data in the SmartOS world is supposed to be written to a file
- each and every boot. This tests to make sure that in the event the
- legacy user-data is removed, the existing user-data is backed-up
- and there is no /var/db/user-data left.
+ User-data in the SmartOS world is supposed to be written to a file
+ each and every boot. This tests to make sure that in the event the
+ legacy user-data is removed, the existing user-data is backed-up
+ and there is no /var/db/user-data left.
"""
user_data_f = "%s/mdata-user-data" % self.legacy_user_d
- with open(user_data_f, 'w') as f:
+ with open(user_data_f, "w") as f:
f.write("PREVIOUS")
my_returns = MOCK_RETURNS.copy()
- del my_returns['user-data']
+ del my_returns["user-data"]
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertFalse(dsrc.metadata.get('legacy-user-data'))
+ self.assertFalse(dsrc.metadata.get("legacy-user-data"))
found_new = False
for root, _dirs, files in os.walk(self.legacy_user_d):
for name in files:
name_f = os.path.join(root, name)
permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:]
- if re.match(r'.*\/mdata-user-data$', name_f):
+ if re.match(r".*\/mdata-user-data$", name_f):
found_new = True
print(name_f)
- self.assertEqual(permissions, '400')
+ self.assertEqual(permissions, "400")
self.assertFalse(found_new)
@@ -587,17 +636,18 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['sdc:vendor-data'],
- dsrc.metadata['vendor-data'])
+ self.assertEqual(
+ MOCK_RETURNS["sdc:vendor-data"], dsrc.metadata["vendor-data"]
+ )
def test_default_vendor_data(self):
my_returns = MOCK_RETURNS.copy()
- def_op_script = my_returns['sdc:vendor-data']
- del my_returns['sdc:vendor-data']
+ def_op_script = my_returns["sdc:vendor-data"]
+ del my_returns["sdc:vendor-data"]
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertNotEqual(def_op_script, dsrc.metadata['vendor-data'])
+ self.assertNotEqual(def_op_script, dsrc.metadata["vendor-data"])
# we expect default vendor-data is a boothook
self.assertTrue(dsrc.vendordata_raw.startswith("#cloud-boothook"))
@@ -606,15 +656,19 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['disable_iptables_flag'],
- dsrc.metadata['iptables_disable'])
+ self.assertEqual(
+ MOCK_RETURNS["disable_iptables_flag"],
+ dsrc.metadata["iptables_disable"],
+ )
def test_motd_sys_info(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(MOCK_RETURNS['enable_motd_sys_info'],
- dsrc.metadata['motd_sys_info'])
+ self.assertEqual(
+ MOCK_RETURNS["enable_motd_sys_info"],
+ dsrc.metadata["motd_sys_info"],
+ )
def test_default_ephemeral(self):
# Test to make sure that the builtin config has the ephemeral
@@ -625,16 +679,16 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
ret = dsrc.get_data()
self.assertTrue(ret)
- assert 'disk_setup' in cfg
- assert 'fs_setup' in cfg
- self.assertIsInstance(cfg['disk_setup'], dict)
- self.assertIsInstance(cfg['fs_setup'], list)
+ assert "disk_setup" in cfg
+ assert "fs_setup" in cfg
+ self.assertIsInstance(cfg["disk_setup"], dict)
+ self.assertIsInstance(cfg["fs_setup"], list)
def test_override_disk_aliases(self):
# Test to make sure that the built-in DS is overriden
builtin = DataSourceSmartOS.BUILTIN_DS_CONFIG
- mydscfg = {'disk_aliases': {'FOO': '/dev/bar'}}
+ mydscfg = {"disk_aliases": {"FOO": "/dev/bar"}}
# expect that these values are in builtin, or this is pointless
for k in mydscfg:
@@ -644,25 +698,30 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(mydscfg['disk_aliases']['FOO'],
- dsrc.ds_cfg['disk_aliases']['FOO'])
+ self.assertEqual(
+ mydscfg["disk_aliases"]["FOO"], dsrc.ds_cfg["disk_aliases"]["FOO"]
+ )
- self.assertEqual(dsrc.device_name_to_device('FOO'),
- mydscfg['disk_aliases']['FOO'])
+ self.assertEqual(
+ dsrc.device_name_to_device("FOO"), mydscfg["disk_aliases"]["FOO"]
+ )
def test_reconfig_network_on_boot(self):
# Test to ensure that network is configured from metadata on each boot
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
self.assertSetEqual(
- {EventType.BOOT_NEW_INSTANCE,
- EventType.BOOT,
- EventType.BOOT_LEGACY},
- dsrc.default_update_events[EventScope.NETWORK]
+ {
+ EventType.BOOT_NEW_INSTANCE,
+ EventType.BOOT,
+ EventType.BOOT_LEGACY,
+ },
+ dsrc.default_update_events[EventScope.NETWORK],
)
class TestIdentifyFile(CiTestCase):
"""Test the 'identify_file' utility."""
+
@skipIf(not which("file"), "command 'file' not available.")
def test_file_happy_path(self):
"""Test file is available and functional on plain text."""
@@ -680,14 +739,16 @@ class TestIdentifyFile(CiTestCase):
self.assertEqual(None, identify_file(fname))
self.assertEqual(
[mock.call(["file", "--brief", "--mime-type", fname])],
- m_subp.call_args_list)
+ m_subp.call_args_list,
+ )
class ShortReader(object):
"""Implements a 'read' interface for bytes provided.
much like io.BytesIO but the 'endbyte' acts as if EOF.
When it is reached a short will be returned."""
- def __init__(self, initial_bytes, endbyte=b'\0'):
+
+ def __init__(self, initial_bytes, endbyte=b"\0"):
self.data = initial_bytes
self.index = 0
self.len = len(self.data)
@@ -700,7 +761,7 @@ class ShortReader(object):
def read(self, size=-1):
"""Read size bytes but not past a null."""
if size == 0 or self.index >= self.len:
- return b''
+ return b""
rsize = size
if size < 0 or size + self.index > self.len:
@@ -711,7 +772,7 @@ class ShortReader(object):
rsize = next_null - self.index + 1
i = self.index
self.index += rsize
- ret = self.data[i:i + rsize]
+ ret = self.data[i : i + rsize]
if len(ret) and ret[-1:] == self.endbyte:
ret = ret[:-1]
return ret
@@ -719,32 +780,34 @@ class ShortReader(object):
class TestJoyentMetadataClient(FilesystemMockingTestCase):
- invalid = b'invalid command\n'
- failure = b'FAILURE\n'
- v2_ok = b'V2_OK\n'
+ invalid = b"invalid command\n"
+ failure = b"FAILURE\n"
+ v2_ok = b"V2_OK\n"
def setUp(self):
super(TestJoyentMetadataClient, self).setUp()
self.serial = mock.MagicMock(spec=serial.Serial)
- self.request_id = 0xabcdef12
- self.metadata_value = 'value'
+ self.request_id = 0xABCDEF12
+ self.metadata_value = "value"
self.response_parts = {
- 'command': 'SUCCESS',
- 'crc': 'b5a9ff00',
- 'length': SUCCESS_LEN + len(b64e(self.metadata_value)),
- 'payload': b64e(self.metadata_value),
- 'request_id': '{0:08x}'.format(self.request_id),
+ "command": "SUCCESS",
+ "crc": "b5a9ff00",
+ "length": SUCCESS_LEN + len(b64e(self.metadata_value)),
+ "payload": b64e(self.metadata_value),
+ "request_id": "{0:08x}".format(self.request_id),
}
def make_response():
- payloadstr = ''
- if 'payload' in self.response_parts:
- payloadstr = ' {0}'.format(self.response_parts['payload'])
- return ('V2 {length} {crc} {request_id} '
- '{command}{payloadstr}\n'.format(
- payloadstr=payloadstr,
- **self.response_parts).encode('ascii'))
+ payloadstr = ""
+ if "payload" in self.response_parts:
+ payloadstr = " {0}".format(self.response_parts["payload"])
+ return (
+ "V2 {length} {crc} {request_id} "
+ "{command}{payloadstr}\n".format(
+ payloadstr=payloadstr, **self.response_parts
+ ).encode("ascii")
+ )
self.metasource_data = None
@@ -758,41 +821,49 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):
self.serial.read.side_effect = read_response
self.patched_funcs.enter_context(
- mock.patch('cloudinit.sources.DataSourceSmartOS.random.randint',
- mock.Mock(return_value=self.request_id)))
+ mock.patch(
+ "cloudinit.sources.DataSourceSmartOS.random.randint",
+ mock.Mock(return_value=self.request_id),
+ )
+ )
def _get_client(self):
return DataSourceSmartOS.JoyentMetadataClient(
- fp=self.serial, smartos_type=DataSourceSmartOS.SMARTOS_ENV_KVM)
+ fp=self.serial, smartos_type=DataSourceSmartOS.SMARTOS_ENV_KVM
+ )
def _get_serial_client(self):
self.serial.timeout = 1
- return DataSourceSmartOS.JoyentMetadataSerialClient(None,
- fp=self.serial)
+ return DataSourceSmartOS.JoyentMetadataSerialClient(
+ None, fp=self.serial
+ )
def assertEndsWith(self, haystack, prefix):
- self.assertTrue(haystack.endswith(prefix),
- "{0} does not end with '{1}'".format(
- repr(haystack), prefix))
+ self.assertTrue(
+ haystack.endswith(prefix),
+ "{0} does not end with '{1}'".format(repr(haystack), prefix),
+ )
def assertStartsWith(self, haystack, prefix):
- self.assertTrue(haystack.startswith(prefix),
- "{0} does not start with '{1}'".format(
- repr(haystack), prefix))
+ self.assertTrue(
+ haystack.startswith(prefix),
+ "{0} does not start with '{1}'".format(repr(haystack), prefix),
+ )
def assertNoMoreSideEffects(self, obj):
self.assertRaises(StopIteration, obj)
def test_get_metadata_writes_a_single_line(self):
client = self._get_client()
- client.get('some_key')
+ client.get("some_key")
self.assertEqual(1, self.serial.write.call_count)
written_line = self.serial.write.call_args[0][0]
- self.assertEndsWith(written_line.decode('ascii'),
- b'\n'.decode('ascii'))
- self.assertEqual(1, written_line.count(b'\n'))
+ self.assertEndsWith(
+ written_line.decode("ascii"), b"\n".decode("ascii")
+ )
+ self.assertEqual(1, written_line.count(b"\n"))
- def _get_written_line(self, key='some_key'):
+ def _get_written_line(self, key="some_key"):
client = self._get_client()
client.get(key)
return self.serial.write.call_args[0][0]
@@ -802,76 +873,86 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):
def test_get_metadata_line_starts_with_v2(self):
foo = self._get_written_line()
- self.assertStartsWith(foo.decode('ascii'), b'V2'.decode('ascii'))
+ self.assertStartsWith(foo.decode("ascii"), b"V2".decode("ascii"))
def test_get_metadata_uses_get_command(self):
- parts = self._get_written_line().decode('ascii').strip().split(' ')
- self.assertEqual('GET', parts[4])
+ parts = self._get_written_line().decode("ascii").strip().split(" ")
+ self.assertEqual("GET", parts[4])
def test_get_metadata_base64_encodes_argument(self):
- key = 'my_key'
- parts = self._get_written_line(key).decode('ascii').strip().split(' ')
+ key = "my_key"
+ parts = self._get_written_line(key).decode("ascii").strip().split(" ")
self.assertEqual(b64e(key), parts[5])
def test_get_metadata_calculates_length_correctly(self):
- parts = self._get_written_line().decode('ascii').strip().split(' ')
- expected_length = len(' '.join(parts[3:]))
+ parts = self._get_written_line().decode("ascii").strip().split(" ")
+ expected_length = len(" ".join(parts[3:]))
self.assertEqual(expected_length, int(parts[1]))
def test_get_metadata_uses_appropriate_request_id(self):
- parts = self._get_written_line().decode('ascii').strip().split(' ')
+ parts = self._get_written_line().decode("ascii").strip().split(" ")
request_id = parts[3]
self.assertEqual(8, len(request_id))
self.assertEqual(request_id, request_id.lower())
def test_get_metadata_uses_random_number_for_request_id(self):
line = self._get_written_line()
- request_id = line.decode('ascii').strip().split(' ')[3]
- self.assertEqual('{0:08x}'.format(self.request_id), request_id)
+ request_id = line.decode("ascii").strip().split(" ")[3]
+ self.assertEqual("{0:08x}".format(self.request_id), request_id)
def test_get_metadata_checksums_correctly(self):
- parts = self._get_written_line().decode('ascii').strip().split(' ')
- expected_checksum = '{0:08x}'.format(
- crc32(' '.join(parts[3:]).encode('utf-8')) & 0xffffffff)
+ parts = self._get_written_line().decode("ascii").strip().split(" ")
+ expected_checksum = "{0:08x}".format(
+ crc32(" ".join(parts[3:]).encode("utf-8")) & 0xFFFFFFFF
+ )
checksum = parts[2]
self.assertEqual(expected_checksum, checksum)
def test_get_metadata_reads_a_line(self):
client = self._get_client()
- client.get('some_key')
+ client.get("some_key")
self.assertEqual(self.metasource_data_len, self.serial.read.call_count)
def test_get_metadata_returns_valid_value(self):
client = self._get_client()
- value = client.get('some_key')
+ value = client.get("some_key")
self.assertEqual(self.metadata_value, value)
def test_get_metadata_throws_exception_for_incorrect_length(self):
- self.response_parts['length'] = 0
+ self.response_parts["length"] = 0
client = self._get_client()
- self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get, 'some_key')
+ self.assertRaises(
+ DataSourceSmartOS.JoyentMetadataFetchException,
+ client.get,
+ "some_key",
+ )
def test_get_metadata_throws_exception_for_incorrect_crc(self):
- self.response_parts['crc'] = 'deadbeef'
+ self.response_parts["crc"] = "deadbeef"
client = self._get_client()
- self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get, 'some_key')
+ self.assertRaises(
+ DataSourceSmartOS.JoyentMetadataFetchException,
+ client.get,
+ "some_key",
+ )
def test_get_metadata_throws_exception_for_request_id_mismatch(self):
- self.response_parts['request_id'] = 'deadbeef'
+ self.response_parts["request_id"] = "deadbeef"
client = self._get_client()
- client._checksum = lambda _: self.response_parts['crc']
- self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client.get, 'some_key')
+ client._checksum = lambda _: self.response_parts["crc"]
+ self.assertRaises(
+ DataSourceSmartOS.JoyentMetadataFetchException,
+ client.get,
+ "some_key",
+ )
def test_get_metadata_returns_None_if_value_not_found(self):
- self.response_parts['payload'] = ''
- self.response_parts['command'] = 'NOTFOUND'
- self.response_parts['length'] = NOTFOUND_LEN
+ self.response_parts["payload"] = ""
+ self.response_parts["command"] = "NOTFOUND"
+ self.response_parts["length"] = NOTFOUND_LEN
client = self._get_client()
- client._checksum = lambda _: self.response_parts['crc']
- self.assertIsNone(client.get('some_key'))
+ client._checksum = lambda _: self.response_parts["crc"]
+ self.assertIsNone(client.get("some_key"))
def test_negotiate(self):
client = self._get_client()
@@ -883,55 +964,58 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):
def test_negotiate_short_response(self):
client = self._get_client()
# chopped '\n' from v2_ok.
- reader = ShortReader(self.v2_ok[:-1] + b'\0')
+ reader = ShortReader(self.v2_ok[:-1] + b"\0")
client.fp.read.side_effect = reader.read
- self.assertRaises(DataSourceSmartOS.JoyentMetadataTimeoutException,
- client._negotiate)
+ self.assertRaises(
+ DataSourceSmartOS.JoyentMetadataTimeoutException, client._negotiate
+ )
self.assertTrue(reader.emptied)
def test_negotiate_bad_response(self):
client = self._get_client()
- reader = ShortReader(b'garbage\n' + self.v2_ok)
+ reader = ShortReader(b"garbage\n" + self.v2_ok)
client.fp.read.side_effect = reader.read
- self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
- client._negotiate)
+ self.assertRaises(
+ DataSourceSmartOS.JoyentMetadataFetchException, client._negotiate
+ )
self.assertEqual(self.v2_ok, client.fp.read())
def test_serial_open_transport(self):
client = self._get_serial_client()
- reader = ShortReader(b'garbage\0' + self.invalid + self.v2_ok)
+ reader = ShortReader(b"garbage\0" + self.invalid + self.v2_ok)
client.fp.read.side_effect = reader.read
client.open_transport()
self.assertTrue(reader.emptied)
def test_flush_failure(self):
client = self._get_serial_client()
- reader = ShortReader(b'garbage' + b'\0' + self.failure +
- self.invalid + self.v2_ok)
+ reader = ShortReader(
+ b"garbage" + b"\0" + self.failure + self.invalid + self.v2_ok
+ )
client.fp.read.side_effect = reader.read
client.open_transport()
self.assertTrue(reader.emptied)
def test_flush_many_timeouts(self):
client = self._get_serial_client()
- reader = ShortReader(b'\0' * 100 + self.invalid + self.v2_ok)
+ reader = ShortReader(b"\0" * 100 + self.invalid + self.v2_ok)
client.fp.read.side_effect = reader.read
client.open_transport()
self.assertTrue(reader.emptied)
def test_list_metadata_returns_list(self):
- parts = ['foo', 'bar']
- value = b64e('\n'.join(parts))
- self.response_parts['payload'] = value
- self.response_parts['crc'] = '40873553'
- self.response_parts['length'] = SUCCESS_LEN + len(value)
+ parts = ["foo", "bar"]
+ value = b64e("\n".join(parts))
+ self.response_parts["payload"] = value
+ self.response_parts["crc"] = "40873553"
+ self.response_parts["length"] = SUCCESS_LEN + len(value)
client = self._get_client()
self.assertEqual(client.list(), parts)
def test_list_metadata_returns_empty_list_if_no_customer_metadata(self):
- del self.response_parts['payload']
- self.response_parts['length'] = SUCCESS_LEN - 1
- self.response_parts['crc'] = '14e563ba'
+ del self.response_parts["payload"]
+ self.response_parts["length"] = SUCCESS_LEN - 1
+ self.response_parts["crc"] = "14e563ba"
client = self._get_client()
self.assertEqual(client.list(), [])
@@ -939,181 +1023,354 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):
class TestNetworkConversion(CiTestCase):
def test_convert_simple(self):
expected = {
- 'version': 1,
- 'config': [
- {'name': 'net0', 'type': 'physical',
- 'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
- 'address': '8.12.42.102/24'}],
- 'mtu': 1500, 'mac_address': '90:b8:d0:f5:e4:f5'},
- {'name': 'net1', 'type': 'physical',
- 'subnets': [{'type': 'static',
- 'address': '192.168.128.93/22'}],
- 'mtu': 8500, 'mac_address': '90:b8:d0:a5:ff:cd'}]}
+ "version": 1,
+ "config": [
+ {
+ "name": "net0",
+ "type": "physical",
+ "subnets": [
+ {
+ "type": "static",
+ "gateway": "8.12.42.1",
+ "address": "8.12.42.102/24",
+ }
+ ],
+ "mtu": 1500,
+ "mac_address": "90:b8:d0:f5:e4:f5",
+ },
+ {
+ "name": "net1",
+ "type": "physical",
+ "subnets": [
+ {"type": "static", "address": "192.168.128.93/22"}
+ ],
+ "mtu": 8500,
+ "mac_address": "90:b8:d0:a5:ff:cd",
+ },
+ ],
+ }
found = convert_net(SDC_NICS)
self.assertEqual(expected, found)
def test_convert_simple_alt(self):
expected = {
- 'version': 1,
- 'config': [
- {'name': 'net0', 'type': 'physical',
- 'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
- 'address': '8.12.42.51/24'}],
- 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
- {'name': 'net1', 'type': 'physical',
- 'subnets': [{'type': 'static',
- 'address': '10.210.1.217/24'}],
- 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]}
+ "version": 1,
+ "config": [
+ {
+ "name": "net0",
+ "type": "physical",
+ "subnets": [
+ {
+ "type": "static",
+ "gateway": "8.12.42.1",
+ "address": "8.12.42.51/24",
+ }
+ ],
+ "mtu": 1500,
+ "mac_address": "90:b8:d0:ae:64:51",
+ },
+ {
+ "name": "net1",
+ "type": "physical",
+ "subnets": [
+ {"type": "static", "address": "10.210.1.217/24"}
+ ],
+ "mtu": 1500,
+ "mac_address": "90:b8:d0:bd:4f:9c",
+ },
+ ],
+ }
found = convert_net(SDC_NICS_ALT)
self.assertEqual(expected, found)
def test_convert_simple_dhcp(self):
expected = {
- 'version': 1,
- 'config': [
- {'name': 'net0', 'type': 'physical',
- 'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
- 'address': '8.12.42.51/24'}],
- 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
- {'name': 'net1', 'type': 'physical',
- 'subnets': [{'type': 'dhcp4'}],
- 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]}
+ "version": 1,
+ "config": [
+ {
+ "name": "net0",
+ "type": "physical",
+ "subnets": [
+ {
+ "type": "static",
+ "gateway": "8.12.42.1",
+ "address": "8.12.42.51/24",
+ }
+ ],
+ "mtu": 1500,
+ "mac_address": "90:b8:d0:ae:64:51",
+ },
+ {
+ "name": "net1",
+ "type": "physical",
+ "subnets": [{"type": "dhcp4"}],
+ "mtu": 1500,
+ "mac_address": "90:b8:d0:bd:4f:9c",
+ },
+ ],
+ }
found = convert_net(SDC_NICS_DHCP)
self.assertEqual(expected, found)
def test_convert_simple_multi_ip(self):
expected = {
- 'version': 1,
- 'config': [
- {'name': 'net0', 'type': 'physical',
- 'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
- 'address': '8.12.42.51/24'},
- {'type': 'static',
- 'address': '8.12.42.52/24'}],
- 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
- {'name': 'net1', 'type': 'physical',
- 'subnets': [{'type': 'static',
- 'address': '10.210.1.217/24'},
- {'type': 'static',
- 'address': '10.210.1.151/24'}],
- 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]}
+ "version": 1,
+ "config": [
+ {
+ "name": "net0",
+ "type": "physical",
+ "subnets": [
+ {
+ "type": "static",
+ "gateway": "8.12.42.1",
+ "address": "8.12.42.51/24",
+ },
+ {"type": "static", "address": "8.12.42.52/24"},
+ ],
+ "mtu": 1500,
+ "mac_address": "90:b8:d0:ae:64:51",
+ },
+ {
+ "name": "net1",
+ "type": "physical",
+ "subnets": [
+ {"type": "static", "address": "10.210.1.217/24"},
+ {"type": "static", "address": "10.210.1.151/24"},
+ ],
+ "mtu": 1500,
+ "mac_address": "90:b8:d0:bd:4f:9c",
+ },
+ ],
+ }
found = convert_net(SDC_NICS_MIP)
self.assertEqual(expected, found)
def test_convert_with_dns(self):
expected = {
- 'version': 1,
- 'config': [
- {'name': 'net0', 'type': 'physical',
- 'subnets': [{'type': 'static', 'gateway': '8.12.42.1',
- 'address': '8.12.42.51/24'}],
- 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
- {'name': 'net1', 'type': 'physical',
- 'subnets': [{'type': 'dhcp4'}],
- 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'},
- {'type': 'nameserver',
- 'address': ['8.8.8.8', '8.8.8.1'], 'search': ["local"]}]}
+ "version": 1,
+ "config": [
+ {
+ "name": "net0",
+ "type": "physical",
+ "subnets": [
+ {
+ "type": "static",
+ "gateway": "8.12.42.1",
+ "address": "8.12.42.51/24",
+ }
+ ],
+ "mtu": 1500,
+ "mac_address": "90:b8:d0:ae:64:51",
+ },
+ {
+ "name": "net1",
+ "type": "physical",
+ "subnets": [{"type": "dhcp4"}],
+ "mtu": 1500,
+ "mac_address": "90:b8:d0:bd:4f:9c",
+ },
+ {
+ "type": "nameserver",
+ "address": ["8.8.8.8", "8.8.8.1"],
+ "search": ["local"],
+ },
+ ],
+ }
found = convert_net(
- network_data=SDC_NICS_DHCP, dns_servers=['8.8.8.8', '8.8.8.1'],
- dns_domain="local")
+ network_data=SDC_NICS_DHCP,
+ dns_servers=["8.8.8.8", "8.8.8.1"],
+ dns_domain="local",
+ )
self.assertEqual(expected, found)
def test_convert_simple_multi_ipv6(self):
expected = {
- 'version': 1,
- 'config': [
- {'name': 'net0', 'type': 'physical',
- 'subnets': [{'type': 'static', 'address':
- '2001:4800:78ff:1b:be76:4eff:fe06:96b3/64'},
- {'type': 'static', 'gateway': '8.12.42.1',
- 'address': '8.12.42.51/24'}],
- 'mtu': 1500, 'mac_address': '90:b8:d0:ae:64:51'},
- {'name': 'net1', 'type': 'physical',
- 'subnets': [{'type': 'static',
- 'address': '10.210.1.217/24'}],
- 'mtu': 1500, 'mac_address': '90:b8:d0:bd:4f:9c'}]}
+ "version": 1,
+ "config": [
+ {
+ "name": "net0",
+ "type": "physical",
+ "subnets": [
+ {
+ "type": "static",
+ "address": (
+ "2001:4800:78ff:1b:be76:4eff:fe06:96b3/64"
+ ),
+ },
+ {
+ "type": "static",
+ "gateway": "8.12.42.1",
+ "address": "8.12.42.51/24",
+ },
+ ],
+ "mtu": 1500,
+ "mac_address": "90:b8:d0:ae:64:51",
+ },
+ {
+ "name": "net1",
+ "type": "physical",
+ "subnets": [
+ {"type": "static", "address": "10.210.1.217/24"}
+ ],
+ "mtu": 1500,
+ "mac_address": "90:b8:d0:bd:4f:9c",
+ },
+ ],
+ }
found = convert_net(SDC_NICS_MIP_IPV6)
self.assertEqual(expected, found)
def test_convert_simple_both_ipv4_ipv6(self):
expected = {
- 'version': 1,
- 'config': [
- {'mac_address': '90:b8:d0:ae:64:51', 'mtu': 1500,
- 'name': 'net0', 'type': 'physical',
- 'subnets': [{'address': '2001::10/64', 'gateway': '2001::1',
- 'type': 'static'},
- {'address': '8.12.42.51/24',
- 'gateway': '8.12.42.1',
- 'type': 'static'},
- {'address': '2001::11/64', 'type': 'static'},
- {'address': '8.12.42.52/32', 'type': 'static'}]},
- {'mac_address': '90:b8:d0:bd:4f:9c', 'mtu': 1500,
- 'name': 'net1', 'type': 'physical',
- 'subnets': [{'address': '10.210.1.217/24',
- 'type': 'static'}]}]}
+ "version": 1,
+ "config": [
+ {
+ "mac_address": "90:b8:d0:ae:64:51",
+ "mtu": 1500,
+ "name": "net0",
+ "type": "physical",
+ "subnets": [
+ {
+ "address": "2001::10/64",
+ "gateway": "2001::1",
+ "type": "static",
+ },
+ {
+ "address": "8.12.42.51/24",
+ "gateway": "8.12.42.1",
+ "type": "static",
+ },
+ {"address": "2001::11/64", "type": "static"},
+ {"address": "8.12.42.52/32", "type": "static"},
+ ],
+ },
+ {
+ "mac_address": "90:b8:d0:bd:4f:9c",
+ "mtu": 1500,
+ "name": "net1",
+ "type": "physical",
+ "subnets": [
+ {"address": "10.210.1.217/24", "type": "static"}
+ ],
+ },
+ ],
+ }
found = convert_net(SDC_NICS_IPV4_IPV6)
self.assertEqual(expected, found)
def test_gateways_not_on_all_nics(self):
expected = {
- 'version': 1,
- 'config': [
- {'mac_address': '90:b8:d0:d8:82:b4', 'mtu': 1500,
- 'name': 'net0', 'type': 'physical',
- 'subnets': [{'address': '8.12.42.26/24',
- 'gateway': '8.12.42.1', 'type': 'static'}]},
- {'mac_address': '90:b8:d0:0a:51:31', 'mtu': 1500,
- 'name': 'net1', 'type': 'physical',
- 'subnets': [{'address': '10.210.1.27/24',
- 'type': 'static'}]}]}
+ "version": 1,
+ "config": [
+ {
+ "mac_address": "90:b8:d0:d8:82:b4",
+ "mtu": 1500,
+ "name": "net0",
+ "type": "physical",
+ "subnets": [
+ {
+ "address": "8.12.42.26/24",
+ "gateway": "8.12.42.1",
+ "type": "static",
+ }
+ ],
+ },
+ {
+ "mac_address": "90:b8:d0:0a:51:31",
+ "mtu": 1500,
+ "name": "net1",
+ "type": "physical",
+ "subnets": [
+ {"address": "10.210.1.27/24", "type": "static"}
+ ],
+ },
+ ],
+ }
found = convert_net(SDC_NICS_SINGLE_GATEWAY)
self.assertEqual(expected, found)
def test_routes_on_all_nics(self):
routes = [
- {'linklocal': False, 'dst': '3.0.0.0/8', 'gateway': '8.12.42.3'},
- {'linklocal': False, 'dst': '4.0.0.0/8', 'gateway': '10.210.1.4'}]
+ {"linklocal": False, "dst": "3.0.0.0/8", "gateway": "8.12.42.3"},
+ {"linklocal": False, "dst": "4.0.0.0/8", "gateway": "10.210.1.4"},
+ ]
expected = {
- 'version': 1,
- 'config': [
- {'mac_address': '90:b8:d0:d8:82:b4', 'mtu': 1500,
- 'name': 'net0', 'type': 'physical',
- 'subnets': [{'address': '8.12.42.26/24',
- 'gateway': '8.12.42.1', 'type': 'static',
- 'routes': [{'network': '3.0.0.0/8',
- 'gateway': '8.12.42.3'},
- {'network': '4.0.0.0/8',
- 'gateway': '10.210.1.4'}]}]},
- {'mac_address': '90:b8:d0:0a:51:31', 'mtu': 1500,
- 'name': 'net1', 'type': 'physical',
- 'subnets': [{'address': '10.210.1.27/24', 'type': 'static',
- 'routes': [{'network': '3.0.0.0/8',
- 'gateway': '8.12.42.3'},
- {'network': '4.0.0.0/8',
- 'gateway': '10.210.1.4'}]}]}]}
+ "version": 1,
+ "config": [
+ {
+ "mac_address": "90:b8:d0:d8:82:b4",
+ "mtu": 1500,
+ "name": "net0",
+ "type": "physical",
+ "subnets": [
+ {
+ "address": "8.12.42.26/24",
+ "gateway": "8.12.42.1",
+ "type": "static",
+ "routes": [
+ {
+ "network": "3.0.0.0/8",
+ "gateway": "8.12.42.3",
+ },
+ {
+ "network": "4.0.0.0/8",
+ "gateway": "10.210.1.4",
+ },
+ ],
+ }
+ ],
+ },
+ {
+ "mac_address": "90:b8:d0:0a:51:31",
+ "mtu": 1500,
+ "name": "net1",
+ "type": "physical",
+ "subnets": [
+ {
+ "address": "10.210.1.27/24",
+ "type": "static",
+ "routes": [
+ {
+ "network": "3.0.0.0/8",
+ "gateway": "8.12.42.3",
+ },
+ {
+ "network": "4.0.0.0/8",
+ "gateway": "10.210.1.4",
+ },
+ ],
+ }
+ ],
+ },
+ ],
+ }
found = convert_net(SDC_NICS_SINGLE_GATEWAY, routes=routes)
self.maxDiff = None
self.assertEqual(expected, found)
-@unittest.skipUnless(get_smartos_environ() == SMARTOS_ENV_KVM,
- "Only supported on KVM and bhyve guests under SmartOS")
-@unittest.skipUnless(os.access(SERIAL_DEVICE, os.W_OK),
- "Requires write access to " + SERIAL_DEVICE)
+@unittest.skipUnless(
+ get_smartos_environ() == SMARTOS_ENV_KVM,
+ "Only supported on KVM and bhyve guests under SmartOS",
+)
+@unittest.skipUnless(
+ os.access(SERIAL_DEVICE, os.W_OK),
+ "Requires write access to " + SERIAL_DEVICE,
+)
@unittest.skipUnless(HAS_PYSERIAL is True, "pyserial not available")
class TestSerialConcurrency(CiTestCase):
"""
- This class tests locking on an actual serial port, and as such can only
- be run in a kvm or bhyve guest running on a SmartOS host. A test run on
- a metadata socket will not be valid because a metadata socket ensures
- there is only one session over a connection. In contrast, in the
- absence of proper locking multiple processes opening the same serial
- port can corrupt each others' exchanges with the metadata server.
-
- This takes on the order of 2 to 3 minutes to run.
+ This class tests locking on an actual serial port, and as such can only
+ be run in a kvm or bhyve guest running on a SmartOS host. A test run on
+ a metadata socket will not be valid because a metadata socket ensures
+ there is only one session over a connection. In contrast, in the
+ absence of proper locking multiple processes opening the same serial
+ port can corrupt each others' exchanges with the metadata server.
+
+ This takes on the order of 2 to 3 minutes to run.
"""
- allowed_subp = ['mdata-get']
+
+ allowed_subp = ["mdata-get"]
def setUp(self):
self.mdata_proc = multiprocessing.Process(target=self.start_mdata_loop)
@@ -1128,16 +1385,16 @@ class TestSerialConcurrency(CiTestCase):
def start_mdata_loop(self):
"""
- The mdata-get command is repeatedly run in a separate process so
- that it may try to race with metadata operations performed in the
- main test process. Use of mdata-get is better than two processes
- using the protocol implementation in DataSourceSmartOS because we
- are testing to be sure that cloud-init and mdata-get respect each
- others locks.
+ The mdata-get command is repeatedly run in a separate process so
+ that it may try to race with metadata operations performed in the
+ main test process. Use of mdata-get is better than two processes
+ using the protocol implementation in DataSourceSmartOS because we
+ are testing to be sure that cloud-init and mdata-get respect each
+ others locks.
"""
rcs = list(range(0, 256))
while True:
- subp(['mdata-get', 'sdc:routes'], rcs=rcs)
+ subp(["mdata-get", "sdc:routes"], rcs=rcs)
def test_all_keys(self):
self.assertIsNotNone(self.mdata_proc.pid)
@@ -1160,4 +1417,5 @@ class TestSerialConcurrency(CiTestCase):
self.assertIsNone(self.mdata_proc.exitcode)
+
# vi: ts=4 expandtab