Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--  tests/unittests/test_datasource/test_aliyun.py       |    2
-rw-r--r--  tests/unittests/test_datasource/test_azure.py        |  362
-rw-r--r--  tests/unittests/test_datasource/test_azure_helper.py |  117
-rw-r--r--  tests/unittests/test_datasource/test_cloudsigma.py   |   20
-rw-r--r--  tests/unittests/test_datasource/test_cloudstack.py   |   21
-rw-r--r--  tests/unittests/test_datasource/test_common.py       |   24
-rw-r--r--  tests/unittests/test_datasource/test_configdrive.py  |   98
-rw-r--r--  tests/unittests/test_datasource/test_ec2.py          |   93
-rw-r--r--  tests/unittests/test_datasource/test_exoscale.py     |  211
-rw-r--r--  tests/unittests/test_datasource/test_gce.py          |   22
-rw-r--r--  tests/unittests/test_datasource/test_maas.py         |    2
-rw-r--r--  tests/unittests/test_datasource/test_nocloud.py      |   60
-rw-r--r--  tests/unittests/test_datasource/test_openstack.py    |    8
-rw-r--r--  tests/unittests/test_datasource/test_ovf.py          |   55
-rw-r--r--  tests/unittests/test_datasource/test_rbx.py          |  208
-rw-r--r--  tests/unittests/test_datasource/test_scaleway.py     |  125
-rw-r--r--  tests/unittests/test_datasource/test_smartos.py      |   13
17 files changed, 1323 insertions, 118 deletions
diff --git a/tests/unittests/test_datasource/test_aliyun.py b/tests/unittests/test_datasource/test_aliyun.py
index e9213ca1..1e66fcdb 100644
--- a/tests/unittests/test_datasource/test_aliyun.py
+++ b/tests/unittests/test_datasource/test_aliyun.py
@@ -2,8 +2,8 @@
import functools
import httpretty
-import mock
import os
+from unittest import mock
from cloudinit import helpers
from cloudinit.sources import DataSourceAliYun as ay
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 417d86a9..a809fd87 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -6,13 +6,13 @@ from cloudinit import url_helper
from cloudinit.sources import (
UNSET, DataSourceAzure as dsaz, InvalidMetaDataException)
from cloudinit.util import (b64e, decode_binary, load_file, write_file,
- find_freebsd_part, get_path_dev_freebsd,
- MountFailedError)
+ MountFailedError, json_dumps, load_json)
from cloudinit.version import version_string as vs
from cloudinit.tests.helpers import (
HttprettyTestCase, CiTestCase, populate_dir, mock, wrap_and_call,
- ExitStack, PY26, SkipTest)
+ ExitStack, resourceLocation)
+import copy
import crypt
import httpretty
import json
@@ -85,6 +85,25 @@ def construct_valid_ovf_env(data=None, pubkeys=None,
NETWORK_METADATA = {
+ "compute": {
+ "location": "eastus2",
+ "name": "my-hostname",
+ "offer": "UbuntuServer",
+ "osType": "Linux",
+ "placementGroupId": "",
+ "platformFaultDomain": "0",
+ "platformUpdateDomain": "0",
+ "publisher": "Canonical",
+ "resourceGroupName": "srugroup1",
+ "sku": "19.04-DAILY",
+ "subscriptionId": "12aad61c-6de4-4e53-a6c6-5aff52a83777",
+ "tags": "",
+ "version": "19.04.201906190",
+ "vmId": "ff702a6b-cb6a-4fcd-ad68-b4ce38227642",
+ "vmScaleSetName": "",
+ "vmSize": "Standard_DS1_v2",
+ "zone": ""
+ },
"network": {
"interface": [
{
@@ -111,9 +130,155 @@ NETWORK_METADATA = {
}
}
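+# A second NIC, as reported by the Azure IMDS; the multi-nic tests below
+# append (copies of) this entry to NETWORK_METADATA.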
+SECONDARY_INTERFACE = {
+ "macAddress": "220D3A047598",
+ "ipv6": {
+ "ipAddress": []
+ },
+ "ipv4": {
+ "subnet": [
+ {
+ "prefix": "24",
+ "address": "10.0.1.0"
+ }
+ ],
+ "ipAddress": [
+ {
+ "privateIpAddress": "10.0.1.5",
+ }
+ ]
+ }
+}
+
MOCKPATH = 'cloudinit.sources.DataSourceAzure.'
+class TestParseNetworkConfig(CiTestCase):
+
+ maxDiff = None
+
+ def test_single_ipv4_nic_configuration(self):
+ """parse_network_config emits dhcp on single nic with ipv4"""
+ expected = {'ethernets': {
+ 'eth0': {'dhcp4': True,
+ 'dhcp4-overrides': {'route-metric': 100},
+ 'dhcp6': False,
+ 'match': {'macaddress': '00:0d:3a:04:75:98'},
+ 'set-name': 'eth0'}}, 'version': 2}
+ self.assertEqual(expected, dsaz.parse_network_config(NETWORK_METADATA))
+
+ def test_increases_route_metric_for_non_primary_nics(self):
+ """parse_network_config increases route-metric for each nic"""
+ expected = {'ethernets': {
+ 'eth0': {'dhcp4': True,
+ 'dhcp4-overrides': {'route-metric': 100},
+ 'dhcp6': False,
+ 'match': {'macaddress': '00:0d:3a:04:75:98'},
+ 'set-name': 'eth0'},
+ 'eth1': {'set-name': 'eth1',
+ 'match': {'macaddress': '22:0d:3a:04:75:98'},
+ 'dhcp6': False,
+ 'dhcp4': True,
+ 'dhcp4-overrides': {'route-metric': 200}},
+ 'eth2': {'set-name': 'eth2',
+ 'match': {'macaddress': '33:0d:3a:04:75:98'},
+ 'dhcp6': False,
+ 'dhcp4': True,
+ 'dhcp4-overrides': {'route-metric': 300}}}, 'version': 2}
+ imds_data = copy.deepcopy(NETWORK_METADATA)
+ imds_data['network']['interface'].append(SECONDARY_INTERFACE)
+ third_intf = copy.deepcopy(SECONDARY_INTERFACE)
+ third_intf['macAddress'] = third_intf['macAddress'].replace('22', '33')
+ third_intf['ipv4']['subnet'][0]['address'] = '10.0.2.0'
+ third_intf['ipv4']['ipAddress'][0]['privateIpAddress'] = '10.0.2.6'
+ imds_data['network']['interface'].append(third_intf)
+ self.assertEqual(expected, dsaz.parse_network_config(imds_data))
+
+ def test_ipv4_and_ipv6_route_metrics_match_for_nics(self):
+ """parse_network_config emits matching ipv4 and ipv6 route-metrics."""
+ expected = {'ethernets': {
+ 'eth0': {'addresses': ['10.0.0.5/24', '2001:dead:beef::2/128'],
+ 'dhcp4': True,
+ 'dhcp4-overrides': {'route-metric': 100},
+ 'dhcp6': True,
+ 'dhcp6-overrides': {'route-metric': 100},
+ 'match': {'macaddress': '00:0d:3a:04:75:98'},
+ 'set-name': 'eth0'},
+ 'eth1': {'set-name': 'eth1',
+ 'match': {'macaddress': '22:0d:3a:04:75:98'},
+ 'dhcp4': True,
+ 'dhcp6': False,
+ 'dhcp4-overrides': {'route-metric': 200}},
+ 'eth2': {'set-name': 'eth2',
+ 'match': {'macaddress': '33:0d:3a:04:75:98'},
+ 'dhcp4': True,
+ 'dhcp4-overrides': {'route-metric': 300},
+ 'dhcp6': True,
+ 'dhcp6-overrides': {'route-metric': 300}}}, 'version': 2}
+ imds_data = copy.deepcopy(NETWORK_METADATA)
+ nic1 = imds_data['network']['interface'][0]
+ nic1['ipv4']['ipAddress'].append({'privateIpAddress': '10.0.0.5'})
+
+ nic1['ipv6'] = {
+ "subnet": [{"address": "2001:dead:beef::16"}],
+ "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"},
+ {"privateIpAddress": "2001:dead:beef::2"}]
+ }
+ imds_data['network']['interface'].append(SECONDARY_INTERFACE)
+ third_intf = copy.deepcopy(SECONDARY_INTERFACE)
+ third_intf['macAddress'] = third_intf['macAddress'].replace('22', '33')
+ third_intf['ipv4']['subnet'][0]['address'] = '10.0.2.0'
+ third_intf['ipv4']['ipAddress'][0]['privateIpAddress'] = '10.0.2.6'
+ third_intf['ipv6'] = {
+ "subnet": [{"prefix": "64", "address": "2001:dead:beef::2"}],
+ "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}]
+ }
+ imds_data['network']['interface'].append(third_intf)
+ self.assertEqual(expected, dsaz.parse_network_config(imds_data))
+
+ def test_ipv4_secondary_ips_will_be_static_addrs(self):
+ """parse_network_config emits primary ipv4 as dhcp others are static"""
+ expected = {'ethernets': {
+ 'eth0': {'addresses': ['10.0.0.5/24'],
+ 'dhcp4': True,
+ 'dhcp4-overrides': {'route-metric': 100},
+ 'dhcp6': True,
+ 'dhcp6-overrides': {'route-metric': 100},
+ 'match': {'macaddress': '00:0d:3a:04:75:98'},
+ 'set-name': 'eth0'}}, 'version': 2}
+ imds_data = copy.deepcopy(NETWORK_METADATA)
+ nic1 = imds_data['network']['interface'][0]
+ nic1['ipv4']['ipAddress'].append({'privateIpAddress': '10.0.0.5'})
+
+ nic1['ipv6'] = {
+ "subnet": [{"prefix": "10", "address": "2001:dead:beef::16"}],
+ "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}]
+ }
+ self.assertEqual(expected, dsaz.parse_network_config(imds_data))
+
+ def test_ipv6_secondary_ips_will_be_static_cidrs(self):
+ """parse_network_config emits primary ipv6 as dhcp others are static"""
+ expected = {'ethernets': {
+ 'eth0': {'addresses': ['10.0.0.5/24', '2001:dead:beef::2/10'],
+ 'dhcp4': True,
+ 'dhcp4-overrides': {'route-metric': 100},
+ 'dhcp6': True,
+ 'dhcp6-overrides': {'route-metric': 100},
+ 'match': {'macaddress': '00:0d:3a:04:75:98'},
+ 'set-name': 'eth0'}}, 'version': 2}
+ imds_data = copy.deepcopy(NETWORK_METADATA)
+ nic1 = imds_data['network']['interface'][0]
+ nic1['ipv4']['ipAddress'].append({'privateIpAddress': '10.0.0.5'})
+
+ # Secondary ipv6 addresses currently ignored/unconfigured
+ nic1['ipv6'] = {
+ "subnet": [{"prefix": "10", "address": "2001:dead:beef::16"}],
+ "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"},
+ {"privateIpAddress": "2001:dead:beef::2"}]
+ }
+ self.assertEqual(expected, dsaz.parse_network_config(imds_data))
+
+
class TestGetMetadataFromIMDS(HttprettyTestCase):
with_logs = True
@@ -142,7 +307,7 @@ class TestGetMetadataFromIMDS(HttprettyTestCase):
self.logs.getvalue())
@mock.patch(MOCKPATH + 'readurl')
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4')
+ @mock.patch(MOCKPATH + 'EphemeralDHCPv4WithReporting')
@mock.patch(MOCKPATH + 'net.is_up')
def test_get_metadata_performs_dhcp_when_network_is_down(
self, m_net_is_up, m_dhcp, m_readurl):
@@ -156,14 +321,15 @@ class TestGetMetadataFromIMDS(HttprettyTestCase):
dsaz.get_metadata_from_imds('eth9', retries=2))
m_net_is_up.assert_called_with('eth9')
- m_dhcp.assert_called_with('eth9')
+ m_dhcp.assert_called_with(mock.ANY, 'eth9')
self.assertIn(
"Crawl of Azure Instance Metadata Service (IMDS) took", # log_time
self.logs.getvalue())
m_readurl.assert_called_with(
self.network_md_url, exception_cb=mock.ANY,
- headers={'Metadata': 'true'}, retries=2, timeout=1)
+ headers={'Metadata': 'true'}, retries=2,
+ timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS)
@mock.patch('cloudinit.url_helper.time.sleep')
@mock.patch(MOCKPATH + 'net.is_up')
@@ -221,8 +387,6 @@ class TestAzureDataSource(CiTestCase):
def setUp(self):
super(TestAzureDataSource, self).setUp()
- if PY26:
- raise SkipTest("Does not work on python 2.6")
self.tmp = self.tmp_dir()
# patch cloud_dir, so our 'seed_dir' is guaranteed empty
@@ -313,7 +477,7 @@ scbus-1 on xpt0 bus 0
'public-keys': [],
})
- self.instance_id = 'test-instance-id'
+ self.instance_id = 'D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8'
def _dmi_mocks(key):
if key == 'system-uuid':
@@ -392,29 +556,6 @@ scbus-1 on xpt0 bus 0
dev = ds.get_resource_disk_on_freebsd(1)
self.assertEqual("da1", dev)
- @mock.patch('cloudinit.util.subp')
- def test_find_freebsd_part_on_Azure(self, mock_subp):
- glabel_out = '''
-gptid/fa52d426-c337-11e6-8911-00155d4c5e47 N/A da0p1
- label/rootfs N/A da0p2
- label/swap N/A da0p3
-'''
- mock_subp.return_value = (glabel_out, "")
- res = find_freebsd_part("/dev/label/rootfs")
- self.assertEqual("da0p2", res)
-
- def test_get_path_dev_freebsd_on_Azure(self):
- mnt_list = '''
-/dev/label/rootfs / ufs rw 1 1
-devfs /dev devfs rw,multilabel 0 0
-fdescfs /dev/fd fdescfs rw 0 0
-/dev/da1s1 /mnt/resource ufs rw 2 2
-'''
- with mock.patch.object(os.path, 'exists',
- return_value=True):
- res = get_path_dev_freebsd('/etc', mnt_list)
- self.assertIsNotNone(res)
-
@mock.patch(MOCKPATH + '_is_platform_viable')
def test_call_is_platform_viable_seed(self, m_is_platform_viable):
"""Check seed_dir using _is_platform_viable and return False."""
@@ -503,14 +644,8 @@ fdescfs /dev/fd fdescfs rw 0 0
expected_metadata = {
'azure_data': {
'configurationsettype': 'LinuxProvisioningConfiguration'},
- 'imds': {'network': {'interface': [{
- 'ipv4': {'ipAddress': [
- {'privateIpAddress': '10.0.0.4',
- 'publicIpAddress': '104.46.124.81'}],
- 'subnet': [{'address': '10.0.0.0', 'prefix': '24'}]},
- 'ipv6': {'ipAddress': []},
- 'macAddress': '000D3A047598'}]}},
- 'instance-id': 'test-instance-id',
+ 'imds': NETWORK_METADATA,
+ 'instance-id': 'D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8',
'local-hostname': u'myhost',
'random_seed': 'wild'}
@@ -543,7 +678,8 @@ fdescfs /dev/fd fdescfs rw 0 0
dsrc.crawl_metadata()
self.assertEqual(str(cm.exception), error_msg)
- @mock.patch('cloudinit.sources.DataSourceAzure.EphemeralDHCPv4')
+ @mock.patch(
+ 'cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting')
@mock.patch('cloudinit.sources.DataSourceAzure.util.write_file')
@mock.patch(
'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready')
@@ -631,12 +767,71 @@ fdescfs /dev/fd fdescfs rw 0 0
'ethernets': {
'eth0': {'set-name': 'eth0',
'match': {'macaddress': '00:0d:3a:04:75:98'},
- 'dhcp4': True}},
+ 'dhcp6': False,
+ 'dhcp4': True,
+ 'dhcp4-overrides': {'route-metric': 100}}},
'version': 2}
dsrc = self._get_ds(data)
dsrc.get_data()
self.assertEqual(expected_network_config, dsrc.network_config)
+ def test_network_config_set_from_imds_route_metric_for_secondary_nic(self):
+ """Datasource.network_config adds route-metric to secondary nics."""
+ sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
+ odata = {}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata),
+ 'sys_cfg': sys_cfg}
+ expected_network_config = {
+ 'ethernets': {
+ 'eth0': {'set-name': 'eth0',
+ 'match': {'macaddress': '00:0d:3a:04:75:98'},
+ 'dhcp6': False,
+ 'dhcp4': True,
+ 'dhcp4-overrides': {'route-metric': 100}},
+ 'eth1': {'set-name': 'eth1',
+ 'match': {'macaddress': '22:0d:3a:04:75:98'},
+ 'dhcp6': False,
+ 'dhcp4': True,
+ 'dhcp4-overrides': {'route-metric': 200}},
+ 'eth2': {'set-name': 'eth2',
+ 'match': {'macaddress': '33:0d:3a:04:75:98'},
+ 'dhcp6': False,
+ 'dhcp4': True,
+ 'dhcp4-overrides': {'route-metric': 300}}},
+ 'version': 2}
+ imds_data = copy.deepcopy(NETWORK_METADATA)
+ imds_data['network']['interface'].append(SECONDARY_INTERFACE)
+ third_intf = copy.deepcopy(SECONDARY_INTERFACE)
+ third_intf['macAddress'] = third_intf['macAddress'].replace('22', '33')
+ third_intf['ipv4']['subnet'][0]['address'] = '10.0.2.0'
+ third_intf['ipv4']['ipAddress'][0]['privateIpAddress'] = '10.0.2.6'
+ imds_data['network']['interface'].append(third_intf)
+
+ self.m_get_metadata_from_imds.return_value = imds_data
+ dsrc = self._get_ds(data)
+ dsrc.get_data()
+ self.assertEqual(expected_network_config, dsrc.network_config)
+
+ def test_availability_zone_set_from_imds(self):
+ """Datasource.availability returns IMDS platformFaultDomain."""
+ sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
+ odata = {}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata),
+ 'sys_cfg': sys_cfg}
+ dsrc = self._get_ds(data)
+ dsrc.get_data()
+ self.assertEqual('0', dsrc.availability_zone)
+
+ def test_region_set_from_imds(self):
+ """Datasource.region returns IMDS region location."""
+ sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
+ odata = {}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata),
+ 'sys_cfg': sys_cfg}
+ dsrc = self._get_ds(data)
+ dsrc.get_data()
+ self.assertEqual('eastus2', dsrc.region)
+
def test_user_cfg_set_agent_command(self):
# set dscfg in via base64 encoded yaml
cfg = {'agent_command': "my_command"}
@@ -704,6 +899,22 @@ fdescfs /dev/fd fdescfs rw 0 0
crypt.crypt(odata['UserPassword'],
defuser['passwd'][0:pos]))
+ def test_user_not_locked_if_password_redacted(self):
+ odata = {'HostName': "myhost", 'UserName': "myuser",
+ 'UserPassword': dsaz.DEF_PASSWD_REDACTION}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertTrue('default_user' in dsrc.cfg['system_info'])
+ defuser = dsrc.cfg['system_info']['default_user']
+
+        # default user should be the updated username and should not be locked.
+ self.assertEqual(defuser['name'], odata['UserName'])
+ self.assertIn('lock_passwd', defuser)
+ self.assertFalse(defuser['lock_passwd'])
+
def test_userdata_plain(self):
mydata = "FOOBAR"
odata = {'UserData': {'text': mydata, 'encoding': 'plain'}}
@@ -880,6 +1091,24 @@ fdescfs /dev/fd fdescfs rw 0 0
self.assertTrue(ret)
self.assertEqual('value', dsrc.metadata['test'])
+ def test_instance_id_endianness(self):
+ """Return the previous iid when dmi uuid is the byteswapped iid."""
+ ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ # byte-swapped previous
+ write_file(
+ os.path.join(self.paths.cloud_dir, 'data', 'instance-id'),
+ '544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8')
+ ds.get_data()
+ self.assertEqual(
+ '544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8', ds.metadata['instance-id'])
+ # not byte-swapped previous
+ write_file(
+ os.path.join(self.paths.cloud_dir, 'data', 'instance-id'),
+ '644CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8')
+ ds.get_data()
+ self.assertEqual(
+ 'D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8', ds.metadata['instance-id'])
+
def test_instance_id_from_dmidecode_used(self):
ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
ds.get_data()
@@ -917,6 +1146,8 @@ fdescfs /dev/fd fdescfs rw 0 0
expected_cfg = {
'ethernets': {
'eth0': {'dhcp4': True,
+ 'dhcp4-overrides': {'route-metric': 100},
+ 'dhcp6': False,
'match': {'macaddress': '00:0d:3a:04:75:98'},
'set-name': 'eth0'}},
'version': 2}
@@ -1079,7 +1310,7 @@ class TestAzureBounce(CiTestCase):
def _dmi_mocks(key):
if key == 'system-uuid':
- return 'test-instance-id'
+ return 'D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8'
elif key == 'chassis-asset-tag':
return '7783-7084-3265-9085-8269-3286-77'
raise RuntimeError('should not get here')
@@ -1243,7 +1474,9 @@ class TestAzureBounce(CiTestCase):
self.assertEqual(initial_host_name,
self.set_hostname.call_args_list[-1][0][0])
- def test_environment_correct_for_bounce_command(self):
+ @mock.patch.object(dsaz, 'get_boot_telemetry')
+ def test_environment_correct_for_bounce_command(
+ self, mock_get_boot_telemetry):
interface = 'int0'
hostname = 'my-new-host'
old_hostname = 'my-old-host'
@@ -1259,7 +1492,9 @@ class TestAzureBounce(CiTestCase):
self.assertEqual(hostname, bounce_env['hostname'])
self.assertEqual(old_hostname, bounce_env['old_hostname'])
- def test_default_bounce_command_ifup_used_by_default(self):
+ @mock.patch.object(dsaz, 'get_boot_telemetry')
+ def test_default_bounce_command_ifup_used_by_default(
+ self, mock_get_boot_telemetry):
cfg = {'hostname_bounce': {'policy': 'force'}}
data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
dsrc = self._get_ds(data, agent_command=['not', '__builtin__'])
@@ -1377,12 +1612,15 @@ class TestCanDevBeReformatted(CiTestCase):
self._domock(p + "util.mount_cb", 'm_mount_cb')
self._domock(p + "os.path.realpath", 'm_realpath')
self._domock(p + "os.path.exists", 'm_exists')
+ self._domock(p + "util.SeLinuxGuard", 'm_selguard')
self.m_exists.side_effect = lambda p: p in bypath
self.m_realpath.side_effect = realpath
self.m_has_ntfs_filesystem.side_effect = has_ntfs_fs
self.m_mount_cb.side_effect = mount_cb
self.m_partitions_on_device.side_effect = partitions_on_device
+ self.m_selguard.__enter__ = mock.Mock(return_value=False)
+ self.m_selguard.__exit__ = mock.Mock()
def test_three_partitions_is_false(self):
"""A disk with 3 partitions can not be formatted."""
@@ -1692,6 +1930,7 @@ class TestPreprovisioningPollIMDS(CiTestCase):
self.paths = helpers.Paths({'cloud_dir': self.tmp})
dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
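+    # Replace time.sleep with a no-op so the poll/retry loop runs without delay.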
+ @mock.patch('time.sleep', mock.MagicMock())
@mock.patch(MOCKPATH + 'EphemeralDHCPv4')
def test_poll_imds_re_dhcp_on_timeout(self, m_dhcpv4, report_ready_func,
fake_resp, m_media_switch, m_dhcp,
@@ -1789,12 +2028,14 @@ class TestAzureDataSourcePreprovisioning(CiTestCase):
headers={'Metadata': 'true',
'User-Agent':
'Cloud-Init/%s' % vs()
- }, method='GET', timeout=1,
+ }, method='GET',
+ timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS,
url=full_url)])
self.assertEqual(m_dhcp.call_count, 2)
m_net.assert_any_call(
broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9',
- prefix_or_mask='255.255.255.0', router='192.168.2.1')
+ prefix_or_mask='255.255.255.0', router='192.168.2.1',
+ static_routes=None)
self.assertEqual(m_net.call_count, 2)
def test__reprovision_calls__poll_imds(self, fake_resp,
@@ -1826,11 +2067,14 @@ class TestAzureDataSourcePreprovisioning(CiTestCase):
headers={'Metadata': 'true',
'User-Agent':
'Cloud-Init/%s' % vs()},
- method='GET', timeout=1, url=full_url)])
+ method='GET',
+ timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS,
+ url=full_url)])
self.assertEqual(m_dhcp.call_count, 2)
m_net.assert_any_call(
broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9',
- prefix_or_mask='255.255.255.0', router='192.168.2.1')
+ prefix_or_mask='255.255.255.0', router='192.168.2.1',
+ static_routes=None)
self.assertEqual(m_net.call_count, 2)
@@ -1924,4 +2168,24 @@ class TestWBIsPlatformViable(CiTestCase):
self.logs.getvalue())
+class TestRandomSeed(CiTestCase):
+ """Test proper handling of random_seed"""
+
+ def test_non_ascii_seed_is_serializable(self):
+ """Pass if a random string from the Azure infrastructure which
+ contains at least one non-Unicode character can be converted to/from
+ JSON without alteration and without throwing an exception.
+ """
+ path = resourceLocation("azure/non_unicode_random_string")
+ result = dsaz._get_random_seed(path)
+
+ obj = {'seed': result}
+ try:
+ serialized = json_dumps(obj)
+ deserialized = load_json(serialized)
+ except UnicodeDecodeError:
+ self.fail("Non-serializable random seed returned")
+
+ self.assertEqual(deserialized['seed'], result)
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py
index 26b2b93d..007df09f 100644
--- a/tests/unittests/test_datasource/test_azure_helper.py
+++ b/tests/unittests/test_datasource/test_azure_helper.py
@@ -1,11 +1,13 @@
# This file is part of cloud-init. See LICENSE file for license information.
import os
+import unittest2
from textwrap import dedent
from cloudinit.sources.helpers import azure as azure_helper
from cloudinit.tests.helpers import CiTestCase, ExitStack, mock, populate_dir
+from cloudinit.util import load_file
from cloudinit.sources.helpers.azure import WALinuxAgentShim as wa_shim
GOAL_STATE_TEMPLATE = """\
@@ -65,12 +67,17 @@ class TestFindEndpoint(CiTestCase):
self.networkd_leases.return_value = None
def test_missing_file(self):
- self.assertRaises(ValueError, wa_shim.find_endpoint)
+ """wa_shim find_endpoint uses default endpoint if leasefile not found
+ """
+ self.assertEqual(wa_shim.find_endpoint(), "168.63.129.16")
def test_missing_special_azure_line(self):
+ """wa_shim find_endpoint uses default endpoint if leasefile is found
+ but does not contain DHCP Option 245 (whose value is the endpoint)
+ """
self.load_file.return_value = ''
self.dhcp_options.return_value = {'eth0': {'key': 'value'}}
- self.assertRaises(ValueError, wa_shim.find_endpoint)
+ self.assertEqual(wa_shim.find_endpoint(), "168.63.129.16")
@staticmethod
def _build_lease_content(encoded_address):
@@ -163,6 +170,25 @@ class TestGoalStateParsing(CiTestCase):
goal_state = self._get_goal_state(instance_id=instance_id)
self.assertEqual(instance_id, goal_state.instance_id)
+ def test_instance_id_byte_swap(self):
+ """Return true when previous_iid is byteswapped current_iid"""
+ previous_iid = "D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
+ current_iid = "544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8"
+ self.assertTrue(
+ azure_helper.is_byte_swapped(previous_iid, current_iid))
+
+ def test_instance_id_no_byte_swap_same_instance_id(self):
+ previous_iid = "D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
+ current_iid = "D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
+ self.assertFalse(
+ azure_helper.is_byte_swapped(previous_iid, current_iid))
+
+ def test_instance_id_no_byte_swap_diff_instance_id(self):
+ previous_iid = "D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
+ current_iid = "G0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8"
+ self.assertFalse(
+ azure_helper.is_byte_swapped(previous_iid, current_iid))
+
def test_certificates_xml_parsed_and_fetched_correctly(self):
http_client = mock.MagicMock()
certificates_url = 'TestCertificatesUrl'
@@ -205,8 +231,10 @@ class TestAzureEndpointHttpClient(CiTestCase):
response = client.get(url, secure=False)
self.assertEqual(1, self.read_file_or_url.call_count)
self.assertEqual(self.read_file_or_url.return_value, response)
- self.assertEqual(mock.call(url, headers=self.regular_headers),
- self.read_file_or_url.call_args)
+ self.assertEqual(
+ mock.call(url, headers=self.regular_headers, retries=10,
+ timeout=5),
+ self.read_file_or_url.call_args)
def test_secure_get(self):
url = 'MyTestUrl'
@@ -220,8 +248,10 @@ class TestAzureEndpointHttpClient(CiTestCase):
response = client.get(url, secure=True)
self.assertEqual(1, self.read_file_or_url.call_count)
self.assertEqual(self.read_file_or_url.return_value, response)
- self.assertEqual(mock.call(url, headers=expected_headers),
- self.read_file_or_url.call_args)
+ self.assertEqual(
+ mock.call(url, headers=expected_headers, retries=10,
+ timeout=5),
+ self.read_file_or_url.call_args)
def test_post(self):
data = mock.MagicMock()
@@ -231,7 +261,8 @@ class TestAzureEndpointHttpClient(CiTestCase):
self.assertEqual(1, self.read_file_or_url.call_count)
self.assertEqual(self.read_file_or_url.return_value, response)
self.assertEqual(
- mock.call(url, data=data, headers=self.regular_headers),
+ mock.call(url, data=data, headers=self.regular_headers, retries=10,
+ timeout=5),
self.read_file_or_url.call_args)
def test_post_with_extra_headers(self):
@@ -243,7 +274,8 @@ class TestAzureEndpointHttpClient(CiTestCase):
expected_headers = self.regular_headers.copy()
expected_headers.update(extra_headers)
self.assertEqual(
- mock.call(mock.ANY, data=mock.ANY, headers=expected_headers),
+ mock.call(mock.ANY, data=mock.ANY, headers=expected_headers,
+ retries=10, timeout=5),
self.read_file_or_url.call_args)
@@ -289,6 +321,50 @@ class TestOpenSSLManager(CiTestCase):
self.assertEqual([mock.call(manager.tmpdir)], del_dir.call_args_list)
+class TestOpenSSLManagerActions(CiTestCase):
+
+ def setUp(self):
+ super(TestOpenSSLManagerActions, self).setUp()
+
+ self.allowed_subp = True
+
+ def _data_file(self, name):
+ path = 'tests/data/azure'
+ return os.path.join(path, name)
+
+ @unittest2.skip("todo move to cloud_test")
+ def test_pubkey_extract(self):
+ cert = load_file(self._data_file('pubkey_extract_cert'))
+ good_key = load_file(self._data_file('pubkey_extract_ssh_key'))
+ sslmgr = azure_helper.OpenSSLManager()
+ key = sslmgr._get_ssh_key_from_cert(cert)
+ self.assertEqual(good_key, key)
+
+ good_fingerprint = '073E19D14D1C799224C6A0FD8DDAB6A8BF27D473'
+ fingerprint = sslmgr._get_fingerprint_from_cert(cert)
+ self.assertEqual(good_fingerprint, fingerprint)
+
+ @unittest2.skip("todo move to cloud_test")
+ @mock.patch.object(azure_helper.OpenSSLManager, '_decrypt_certs_from_xml')
+ def test_parse_certificates(self, mock_decrypt_certs):
+ """Azure control plane puts private keys as well as certificates
+ into the Certificates XML object. Make sure only the public keys
+ from certs are extracted and that fingerprints are converted to
+ the form specified in the ovf-env.xml file.
+ """
+ cert_contents = load_file(self._data_file('parse_certificates_pem'))
+ fingerprints = load_file(self._data_file(
+ 'parse_certificates_fingerprints')
+ ).splitlines()
+ mock_decrypt_certs.return_value = cert_contents
+ sslmgr = azure_helper.OpenSSLManager()
+ keys_by_fp = sslmgr.parse_certificates('')
+ for fp in keys_by_fp.keys():
+ self.assertIn(fp, fingerprints)
+ for fp in fingerprints:
+ self.assertIn(fp, keys_by_fp)
+
+
class TestWALinuxAgentShim(CiTestCase):
def setUp(self):
@@ -329,18 +405,31 @@ class TestWALinuxAgentShim(CiTestCase):
def test_certificates_used_to_determine_public_keys(self):
shim = wa_shim()
- data = shim.register_with_azure_and_fetch_data()
+ """if register_with_azure_and_fetch_data() isn't passed some info about
+ the user's public keys, there's no point in even trying to parse
+ the certificates
+ """
+ mypk = [{'fingerprint': 'fp1', 'path': 'path1'},
+ {'fingerprint': 'fp3', 'path': 'path3', 'value': ''}]
+ certs = {'fp1': 'expected-key',
+ 'fp2': 'should-not-be-found',
+ 'fp3': 'expected-no-value-key',
+ }
+ sslmgr = self.OpenSSLManager.return_value
+ sslmgr.parse_certificates.return_value = certs
+ data = shim.register_with_azure_and_fetch_data(pubkey_info=mypk)
self.assertEqual(
[mock.call(self.GoalState.return_value.certificates_xml)],
- self.OpenSSLManager.return_value.parse_certificates.call_args_list)
- self.assertEqual(
- self.OpenSSLManager.return_value.parse_certificates.return_value,
- data['public-keys'])
+ sslmgr.parse_certificates.call_args_list)
+ self.assertIn('expected-key', data['public-keys'])
+ self.assertIn('expected-no-value-key', data['public-keys'])
+ self.assertNotIn('should-not-be-found', data['public-keys'])
def test_absent_certificates_produces_empty_public_keys(self):
+ mypk = [{'fingerprint': 'fp1', 'path': 'path1'}]
self.GoalState.return_value.certificates_xml = None
shim = wa_shim()
- data = shim.register_with_azure_and_fetch_data()
+ data = shim.register_with_azure_and_fetch_data(pubkey_info=mypk)
self.assertEqual([], data['public-keys'])
def test_correct_url_used_for_report_ready(self):
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index 3bf52e69..d62d542b 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -30,6 +30,8 @@ SERVER_CONTEXT = {
}
}
+DS_PATH = 'cloudinit.sources.DataSourceCloudSigma.DataSourceCloudSigma'
+
class CepkoMock(Cepko):
def __init__(self, mocked_context):
@@ -42,17 +44,15 @@ class CepkoMock(Cepko):
class DataSourceCloudSigmaTest(test_helpers.CiTestCase):
def setUp(self):
super(DataSourceCloudSigmaTest, self).setUp()
- self.add_patch(
- "cloudinit.sources.DataSourceCloudSigma.util.is_container",
- "m_is_container", return_value=False)
self.paths = helpers.Paths({'run_dir': self.tmp_dir()})
+ self.add_patch(DS_PATH + '.is_running_in_cloudsigma',
+ "m_is_container", return_value=True)
self.datasource = DataSourceCloudSigma.DataSourceCloudSigma(
"", "", paths=self.paths)
- self.datasource.is_running_in_cloudsigma = lambda: True
self.datasource.cepko = CepkoMock(SERVER_CONTEXT)
- self.datasource.get_data()
def test_get_hostname(self):
+ self.datasource.get_data()
self.assertEqual("test_server", self.datasource.get_hostname())
self.datasource.metadata['name'] = ''
self.assertEqual("65b2fb23", self.datasource.get_hostname())
@@ -61,23 +61,28 @@ class DataSourceCloudSigmaTest(test_helpers.CiTestCase):
self.assertEqual("65b2fb23", self.datasource.get_hostname())
def test_get_public_ssh_keys(self):
+ self.datasource.get_data()
self.assertEqual([SERVER_CONTEXT['meta']['ssh_public_key']],
self.datasource.get_public_ssh_keys())
def test_get_instance_id(self):
+ self.datasource.get_data()
self.assertEqual(SERVER_CONTEXT['uuid'],
self.datasource.get_instance_id())
def test_platform(self):
"""All platform-related attributes are set."""
+ self.datasource.get_data()
self.assertEqual(self.datasource.cloud_name, 'cloudsigma')
self.assertEqual(self.datasource.platform_type, 'cloudsigma')
self.assertEqual(self.datasource.subplatform, 'cepko (/dev/ttyS1)')
def test_metadata(self):
+ self.datasource.get_data()
self.assertEqual(self.datasource.metadata, SERVER_CONTEXT)
def test_user_data(self):
+ self.datasource.get_data()
self.assertEqual(self.datasource.userdata_raw,
SERVER_CONTEXT['meta']['cloudinit-user-data'])
@@ -91,14 +96,13 @@ class DataSourceCloudSigmaTest(test_helpers.CiTestCase):
self.assertEqual(self.datasource.userdata_raw, b'hi world\n')
def test_vendor_data(self):
+ self.datasource.get_data()
self.assertEqual(self.datasource.vendordata_raw,
SERVER_CONTEXT['vendor_data']['cloudinit'])
def test_lack_of_vendor_data(self):
stripped_context = copy.deepcopy(SERVER_CONTEXT)
del stripped_context["vendor_data"]
- self.datasource = DataSourceCloudSigma.DataSourceCloudSigma(
- "", "", paths=self.paths)
self.datasource.cepko = CepkoMock(stripped_context)
self.datasource.get_data()
@@ -107,8 +111,6 @@ class DataSourceCloudSigmaTest(test_helpers.CiTestCase):
def test_lack_of_cloudinit_key_in_vendor_data(self):
stripped_context = copy.deepcopy(SERVER_CONTEXT)
del stripped_context["vendor_data"]["cloudinit"]
- self.datasource = DataSourceCloudSigma.DataSourceCloudSigma(
- "", "", paths=self.paths)
self.datasource.cepko = CepkoMock(stripped_context)
self.datasource.get_data()
diff --git a/tests/unittests/test_datasource/test_cloudstack.py b/tests/unittests/test_datasource/test_cloudstack.py
index d6d2d6b2..83c2f753 100644
--- a/tests/unittests/test_datasource/test_cloudstack.py
+++ b/tests/unittests/test_datasource/test_cloudstack.py
@@ -10,6 +10,9 @@ from cloudinit.tests.helpers import CiTestCase, ExitStack, mock
import os
import time
+MOD_PATH = 'cloudinit.sources.DataSourceCloudStack'
+DS_PATH = MOD_PATH + '.DataSourceCloudStack'
+
class TestCloudStackPasswordFetching(CiTestCase):
@@ -17,7 +20,7 @@ class TestCloudStackPasswordFetching(CiTestCase):
super(TestCloudStackPasswordFetching, self).setUp()
self.patches = ExitStack()
self.addCleanup(self.patches.close)
- mod_name = 'cloudinit.sources.DataSourceCloudStack'
+ mod_name = MOD_PATH
self.patches.enter_context(mock.patch('{0}.ec2'.format(mod_name)))
self.patches.enter_context(mock.patch('{0}.uhelp'.format(mod_name)))
default_gw = "192.201.20.0"
@@ -56,7 +59,9 @@ class TestCloudStackPasswordFetching(CiTestCase):
ds.get_data()
self.assertEqual({}, ds.get_config_obj())
- def test_password_sets_password(self):
+ @mock.patch(DS_PATH + '.wait_for_metadata_service')
+ def test_password_sets_password(self, m_wait):
+ m_wait.return_value = True
password = 'SekritSquirrel'
self._set_password_server_response(password)
ds = DataSourceCloudStack(
@@ -64,7 +69,9 @@ class TestCloudStackPasswordFetching(CiTestCase):
ds.get_data()
self.assertEqual(password, ds.get_config_obj()['password'])
- def test_bad_request_doesnt_stop_ds_from_working(self):
+ @mock.patch(DS_PATH + '.wait_for_metadata_service')
+ def test_bad_request_doesnt_stop_ds_from_working(self, m_wait):
+ m_wait.return_value = True
self._set_password_server_response('bad_request')
ds = DataSourceCloudStack(
{}, None, helpers.Paths({'run_dir': self.tmp}))
@@ -79,7 +86,9 @@ class TestCloudStackPasswordFetching(CiTestCase):
request_types.append(arg.split()[1])
self.assertEqual(expected_request_types, request_types)
- def test_valid_response_means_password_marked_as_saved(self):
+ @mock.patch(DS_PATH + '.wait_for_metadata_service')
+ def test_valid_response_means_password_marked_as_saved(self, m_wait):
+ m_wait.return_value = True
password = 'SekritSquirrel'
subp = self._set_password_server_response(password)
ds = DataSourceCloudStack(
@@ -92,7 +101,9 @@ class TestCloudStackPasswordFetching(CiTestCase):
subp = self._set_password_server_response(response_string)
ds = DataSourceCloudStack(
{}, None, helpers.Paths({'run_dir': self.tmp}))
- ds.get_data()
+ with mock.patch(DS_PATH + '.wait_for_metadata_service') as m_wait:
+ m_wait.return_value = True
+ ds.get_data()
self.assertRequestTypesSent(subp, ['send_my_password'])
def test_password_not_saved_if_empty(self):
diff --git a/tests/unittests/test_datasource/test_common.py b/tests/unittests/test_datasource/test_common.py
index 6b01a4ea..4ab5d471 100644
--- a/tests/unittests/test_datasource/test_common.py
+++ b/tests/unittests/test_datasource/test_common.py
@@ -4,6 +4,7 @@ from cloudinit import settings
from cloudinit import sources
from cloudinit import type_utils
from cloudinit.sources import (
+ DataSource,
DataSourceAliYun as AliYun,
DataSourceAltCloud as AltCloud,
DataSourceAzure as Azure,
@@ -13,6 +14,7 @@ from cloudinit.sources import (
DataSourceConfigDrive as ConfigDrive,
DataSourceDigitalOcean as DigitalOcean,
DataSourceEc2 as Ec2,
+ DataSourceExoscale as Exoscale,
DataSourceGCE as GCE,
DataSourceHetzner as Hetzner,
DataSourceIBMCloud as IBMCloud,
@@ -22,6 +24,7 @@ from cloudinit.sources import (
DataSourceOpenStack as OpenStack,
DataSourceOracle as Oracle,
DataSourceOVF as OVF,
+ DataSourceRbxCloud as RbxCloud,
DataSourceScaleway as Scaleway,
DataSourceSmartOS as SmartOS,
)
@@ -43,6 +46,7 @@ DEFAULT_LOCAL = [
SmartOS.DataSourceSmartOS,
Ec2.DataSourceEc2Local,
OpenStack.DataSourceOpenStackLocal,
+ RbxCloud.DataSourceRbxCloud,
Scaleway.DataSourceScaleway,
]
@@ -53,6 +57,7 @@ DEFAULT_NETWORK = [
CloudStack.DataSourceCloudStack,
DSNone.DataSourceNone,
Ec2.DataSourceEc2,
+ Exoscale.DataSourceExoscale,
GCE.DataSourceGCE,
MAAS.DataSourceMAAS,
NoCloud.DataSourceNoCloudNet,
@@ -83,4 +88,23 @@ class ExpectedDataSources(test_helpers.TestCase):
self.assertEqual(set([AliYun.DataSourceAliYun]), set(found))
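+# Invariant checks applied to every datasource in DEFAULT_LOCAL + DEFAULT_NETWORK.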
+class TestDataSourceInvariants(test_helpers.TestCase):
+ def test_data_sources_have_valid_network_config_sources(self):
+ for ds in DEFAULT_LOCAL + DEFAULT_NETWORK:
+ for cfg_src in ds.network_config_sources:
+ fail_msg = ('{} has an invalid network_config_sources entry:'
+ ' {}'.format(str(ds), cfg_src))
+ self.assertTrue(hasattr(sources.NetworkConfigSource, cfg_src),
+ fail_msg)
+
+ def test_expected_dsname_defined(self):
+ for ds in DEFAULT_LOCAL + DEFAULT_NETWORK:
+ fail_msg = (
+ '{} has an invalid / missing dsname property: {}'.format(
+ str(ds), str(ds.dsname)
+ )
+ )
+ self.assertNotEqual(ds.dsname, DataSource.dsname, fail_msg)
+ self.assertIsNotNone(ds.dsname)
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index dcdabea5..6f830cc6 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -220,13 +220,15 @@ CFG_DRIVE_FILES_V2 = {
'openstack/2015-10-15/user_data': USER_DATA,
'openstack/2015-10-15/network_data.json': json.dumps(NETWORK_DATA)}
+M_PATH = "cloudinit.sources.DataSourceConfigDrive."
+
class TestConfigDriveDataSource(CiTestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
self.add_patch(
- "cloudinit.sources.DataSourceConfigDrive.util.find_devs_with",
+ M_PATH + "util.find_devs_with",
"m_find_devs_with", return_value=[])
self.tmp = self.tmp_dir()
@@ -268,8 +270,7 @@ class TestConfigDriveDataSource(CiTestCase):
exists_mock = mocks.enter_context(
mock.patch.object(os.path, 'exists',
side_effect=exists_side_effect()))
- device = cfg_ds.device_name_to_device(name)
- self.assertEqual(dev_name, device)
+ self.assertEqual(dev_name, cfg_ds.device_name_to_device(name))
find_mock.assert_called_once_with(mock.ANY)
self.assertEqual(exists_mock.call_count, 2)
@@ -296,8 +297,7 @@ class TestConfigDriveDataSource(CiTestCase):
exists_mock = mocks.enter_context(
mock.patch.object(os.path, 'exists',
return_value=True))
- device = cfg_ds.device_name_to_device(name)
- self.assertEqual(dev_name, device)
+ self.assertEqual(dev_name, cfg_ds.device_name_to_device(name))
find_mock.assert_called_once_with(mock.ANY)
exists_mock.assert_called_once_with(mock.ANY)
@@ -331,8 +331,7 @@ class TestConfigDriveDataSource(CiTestCase):
yield True
with mock.patch.object(os.path, 'exists',
side_effect=exists_side_effect()):
- device = cfg_ds.device_name_to_device(name)
- self.assertEqual(dev_name, device)
+ self.assertEqual(dev_name, cfg_ds.device_name_to_device(name))
# We don't assert the call count for os.path.exists() because
# not all of the entries in name_tests results in two calls to
# that function. Specifically, 'root2k' doesn't seem to call
@@ -359,8 +358,7 @@ class TestConfigDriveDataSource(CiTestCase):
}
for name, dev_name in name_tests.items():
with mock.patch.object(os.path, 'exists', return_value=True):
- device = cfg_ds.device_name_to_device(name)
- self.assertEqual(dev_name, device)
+ self.assertEqual(dev_name, cfg_ds.device_name_to_device(name))
def test_dir_valid(self):
"""Verify a dir is read as such."""
@@ -472,7 +470,7 @@ class TestConfigDriveDataSource(CiTestCase):
util.find_devs_with = orig_find_devs_with
util.is_partition = orig_is_partition
- @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
+ @mock.patch(M_PATH + 'on_first_boot')
def test_pubkeys_v2(self, on_first_boot):
"""Verify that public-keys work in config-drive-v2."""
myds = cfg_ds_from_dir(self.tmp, files=CFG_DRIVE_FILES_V2)
@@ -482,6 +480,19 @@ class TestConfigDriveDataSource(CiTestCase):
self.assertEqual('openstack', myds.platform)
self.assertEqual('seed-dir (%s/seed)' % self.tmp, myds.subplatform)
+ def test_subplatform_config_drive_when_starts_with_dev(self):
+ """subplatform reports config-drive when source starts with /dev/."""
+ cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
+ None,
+ helpers.Paths({}))
+ with mock.patch(M_PATH + 'find_candidate_devs') as m_find_devs:
+ with mock.patch(M_PATH + 'util.is_FreeBSD', return_value=False):
+ with mock.patch(M_PATH + 'util.mount_cb'):
+ with mock.patch(M_PATH + 'on_first_boot'):
+ m_find_devs.return_value = ['/dev/anything']
+ self.assertEqual(True, cfg_ds.get_data())
+ self.assertEqual('config-disk (/dev/anything)', cfg_ds.subplatform)
+
class TestNetJson(CiTestCase):
def setUp(self):
@@ -489,13 +500,13 @@ class TestNetJson(CiTestCase):
self.tmp = self.tmp_dir()
self.maxDiff = None
- @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
+ @mock.patch(M_PATH + 'on_first_boot')
def test_network_data_is_found(self, on_first_boot):
"""Verify that network_data is present in ds in config-drive-v2."""
myds = cfg_ds_from_dir(self.tmp, files=CFG_DRIVE_FILES_V2)
self.assertIsNotNone(myds.network_json)
- @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
+ @mock.patch(M_PATH + 'on_first_boot')
def test_network_config_is_converted(self, on_first_boot):
"""Verify that network_data is converted and present on ds object."""
myds = cfg_ds_from_dir(self.tmp, files=CFG_DRIVE_FILES_V2)
@@ -503,6 +514,46 @@ class TestNetJson(CiTestCase):
known_macs=KNOWN_MACS)
self.assertEqual(myds.network_config, network_config)
+ def test_network_config_conversion_dhcp6(self):
+ """Test some ipv6 input network json and check the expected
+ conversions."""
+ in_data = {
+ 'links': [
+ {'vif_id': '2ecc7709-b3f7-4448-9580-e1ec32d75bbd',
+ 'ethernet_mac_address': 'fa:16:3e:69:b0:58',
+ 'type': 'ovs', 'mtu': None, 'id': 'tap2ecc7709-b3'},
+ {'vif_id': '2f88d109-5b57-40e6-af32-2472df09dc33',
+ 'ethernet_mac_address': 'fa:16:3e:d4:57:ad',
+ 'type': 'ovs', 'mtu': None, 'id': 'tap2f88d109-5b'},
+ ],
+ 'networks': [
+ {'link': 'tap2ecc7709-b3', 'type': 'ipv6_dhcpv6-stateless',
+ 'network_id': '6d6357ac-0f70-4afa-8bd7-c274cc4ea235',
+ 'id': 'network0'},
+ {'link': 'tap2f88d109-5b', 'type': 'ipv6_dhcpv6-stateful',
+ 'network_id': 'd227a9b3-6960-4d94-8976-ee5788b44f54',
+ 'id': 'network1'},
+ ]
+ }
+ out_data = {
+ 'version': 1,
+ 'config': [
+ {'mac_address': 'fa:16:3e:69:b0:58',
+ 'mtu': None,
+ 'name': 'enp0s1',
+ 'subnets': [{'type': 'ipv6_dhcpv6-stateless'}],
+ 'type': 'physical'},
+ {'mac_address': 'fa:16:3e:d4:57:ad',
+ 'mtu': None,
+ 'name': 'enp0s2',
+ 'subnets': [{'type': 'ipv6_dhcpv6-stateful'}],
+ 'type': 'physical',
+ 'accept-ra': True}
+ ],
+ }
+ conv_data = openstack.convert_net_json(in_data, known_macs=KNOWN_MACS)
+ self.assertEqual(out_data, conv_data)
+
def test_network_config_conversions(self):
"""Tests a bunch of input network json and checks the
expected conversions."""
@@ -604,6 +655,9 @@ class TestNetJson(CiTestCase):
class TestConvertNetworkData(CiTestCase):
+
+ with_logs = True
+
def setUp(self):
super(TestConvertNetworkData, self).setUp()
self.tmp = self.tmp_dir()
@@ -730,6 +784,26 @@ class TestConvertNetworkData(CiTestCase):
'enp0s2': 'fa:16:3e:d4:57:ad'}
self.assertEqual(expected, config_name2mac)
+ def test_unknown_device_types_accepted(self):
+ # If we don't recognise a link, we should treat it as physical for a
+ # best-effort boot
+ my_netdata = deepcopy(NETWORK_DATA)
+ my_netdata['links'][0]['type'] = 'my-special-link-type'
+
+ ncfg = openstack.convert_net_json(my_netdata, known_macs=KNOWN_MACS)
+ config_name2mac = {}
+ for n in ncfg['config']:
+ if n['type'] == 'physical':
+ config_name2mac[n['name']] = n['mac_address']
+
+ expected = {'nic0': 'fa:16:3e:05:30:fe', 'enp0s1': 'fa:16:3e:69:b0:58',
+ 'enp0s2': 'fa:16:3e:d4:57:ad'}
+ self.assertEqual(expected, config_name2mac)
+
+ # We should, however, warn the user that we don't recognise the type
+ self.assertIn('Unknown network_data link type (my-special-link-type)',
+ self.logs.getvalue())
+
def cfg_ds_from_dir(base_d, files=None):
run = os.path.join(base_d, "run")
diff --git a/tests/unittests/test_datasource/test_ec2.py b/tests/unittests/test_datasource/test_ec2.py
index 1a5956d9..2a96122f 100644
--- a/tests/unittests/test_datasource/test_ec2.py
+++ b/tests/unittests/test_datasource/test_ec2.py
@@ -3,7 +3,7 @@
import copy
import httpretty
import json
-import mock
+from unittest import mock
from cloudinit import helpers
from cloudinit.sources import DataSourceEc2 as ec2
@@ -191,7 +191,9 @@ def register_mock_metaserver(base_url, data):
register(base_url, 'not found', status=404)
def myreg(*argc, **kwargs):
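+        # The EC2 API token route is mocked as PUT; all other metadata URLs as GET.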
- return httpretty.register_uri(httpretty.GET, *argc, **kwargs)
+ url = argc[0]
+ method = httpretty.PUT if ec2.API_TOKEN_ROUTE in url else httpretty.GET
+ return httpretty.register_uri(method, *argc, **kwargs)
register_helper(myreg, base_url, data)
@@ -237,6 +239,8 @@ class TestEc2(test_helpers.HttprettyTestCase):
if md:
all_versions = (
[ds.min_metadata_version] + ds.extended_metadata_versions)
+ token_url = self.data_url('latest', data_item='api/token')
+ register_mock_metaserver(token_url, 'API-TOKEN')
for version in all_versions:
metadata_url = self.data_url(version) + '/'
if version == md_version:
@@ -401,6 +405,47 @@ class TestEc2(test_helpers.HttprettyTestCase):
ds.metadata = DEFAULT_METADATA
self.assertEqual('my-identity-id', ds.get_instance_id())
+ def test_classic_instance_true(self):
+ """If no vpc-id in metadata, is_classic_instance must return true."""
+ md_copy = copy.deepcopy(DEFAULT_METADATA)
+ ifaces_md = md_copy.get('network', {}).get('interfaces', {})
+ for _mac, mac_data in ifaces_md.get('macs', {}).items():
+ if 'vpc-id' in mac_data:
+ del mac_data['vpc-id']
+
+ ds = self._setup_ds(
+ platform_data=self.valid_platform_data,
+ sys_cfg={'datasource': {'Ec2': {'strict_id': False}}},
+ md={'md': md_copy})
+ self.assertTrue(ds.get_data())
+ self.assertTrue(ds.is_classic_instance())
+
+ def test_classic_instance_false(self):
+ """If vpc-id in metadata, is_classic_instance must return false."""
+ ds = self._setup_ds(
+ platform_data=self.valid_platform_data,
+ sys_cfg={'datasource': {'Ec2': {'strict_id': False}}},
+ md={'md': DEFAULT_METADATA})
+ self.assertTrue(ds.get_data())
+ self.assertFalse(ds.is_classic_instance())
+
+ def test_aws_token_redacted(self):
+ """Verify that aws tokens are redacted when logged."""
+ ds = self._setup_ds(
+ platform_data=self.valid_platform_data,
+ sys_cfg={'datasource': {'Ec2': {'strict_id': False}}},
+ md={'md': DEFAULT_METADATA})
+ self.assertTrue(ds.get_data())
+ all_logs = self.logs.getvalue().splitlines()
+ REDACT_TTL = "'X-aws-ec2-metadata-token-ttl-seconds': 'REDACTED'"
+ REDACT_TOK = "'X-aws-ec2-metadata-token': 'REDACTED'"
+ logs_with_redacted_ttl = [log for log in all_logs if REDACT_TTL in log]
+ logs_with_redacted = [log for log in all_logs if REDACT_TOK in log]
+ logs_with_token = [log for log in all_logs if 'API-TOKEN' in log]
+ self.assertEqual(1, len(logs_with_redacted_ttl))
+ self.assertEqual(79, len(logs_with_redacted))
+ self.assertEqual(0, len(logs_with_token))
+
@mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
def test_valid_platform_with_strict_true(self, m_dhcp):
"""Valid platform data should return true with strict_id true."""
@@ -514,7 +559,8 @@ class TestEc2(test_helpers.HttprettyTestCase):
m_dhcp.assert_called_once_with('eth9')
m_net.assert_called_once_with(
broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9',
- prefix_or_mask='255.255.255.0', router='192.168.2.1')
+ prefix_or_mask='255.255.255.0', router='192.168.2.1',
+ static_routes=None)
self.assertIn('Crawl of metadata service took', self.logs.getvalue())
@@ -637,4 +683,45 @@ class TestConvertEc2MetadataNetworkConfig(test_helpers.CiTestCase):
expected,
ec2.convert_ec2_metadata_network_config(self.network_metadata))
+
+class TestIdentifyPlatform(test_helpers.CiTestCase):
+
+ def collmock(self, **kwargs):
+ """return non-special _collect_platform_data updated with changes."""
+ unspecial = {
+ 'asset_tag': '3857-0037-2746-7462-1818-3997-77',
+ 'serial': 'H23-C4J3JV-R6',
+ 'uuid': '81c7e555-6471-4833-9551-1ab366c4cfd2',
+ 'uuid_source': 'dmi',
+ 'vendor': 'tothecloud',
+ }
+ unspecial.update(**kwargs)
+ return unspecial
+
+ @mock.patch('cloudinit.sources.DataSourceEc2._collect_platform_data')
+ def test_identify_zstack(self, m_collect):
+ """zstack should be identified if chassis-asset-tag ends in .zstack.io
+ """
+ m_collect.return_value = self.collmock(asset_tag='123456.zstack.io')
+ self.assertEqual(ec2.CloudNames.ZSTACK, ec2.identify_platform())
+
+ @mock.patch('cloudinit.sources.DataSourceEc2._collect_platform_data')
+ def test_identify_zstack_full_domain_only(self, m_collect):
+ """zstack asset-tag matching should match only on full domain boundary.
+ """
+ m_collect.return_value = self.collmock(asset_tag='123456.buzzstack.io')
+ self.assertEqual(ec2.CloudNames.UNKNOWN, ec2.identify_platform())
+
+ @mock.patch('cloudinit.sources.DataSourceEc2._collect_platform_data')
+ def test_identify_e24cloud(self, m_collect):
+ """e24cloud identified if vendor is e24cloud"""
+ m_collect.return_value = self.collmock(vendor='e24cloud')
+ self.assertEqual(ec2.CloudNames.E24CLOUD, ec2.identify_platform())
+
+ @mock.patch('cloudinit.sources.DataSourceEc2._collect_platform_data')
+ def test_identify_e24cloud_negative(self, m_collect):
+ """e24cloud identified if vendor is e24cloud"""
+ m_collect.return_value = self.collmock(vendor='e24cloudyday')
+ self.assertEqual(ec2.CloudNames.UNKNOWN, ec2.identify_platform())
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_exoscale.py b/tests/unittests/test_datasource/test_exoscale.py
new file mode 100644
index 00000000..f0061199
--- /dev/null
+++ b/tests/unittests/test_datasource/test_exoscale.py
@@ -0,0 +1,211 @@
+# Author: Mathieu Corbin <mathieu.corbin@exoscale.com>
+# Author: Christopher Glass <christopher.glass@exoscale.com>
+#
+# This file is part of cloud-init. See LICENSE file for license information.
+from cloudinit import helpers
+from cloudinit.sources.DataSourceExoscale import (
+ API_VERSION,
+ DataSourceExoscale,
+ METADATA_URL,
+ get_password,
+ PASSWORD_SERVER_PORT,
+ read_metadata)
+from cloudinit.tests.helpers import HttprettyTestCase, mock
+from cloudinit import util
+
+import httpretty
+import os
+import requests
+
+
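+# Endpoints the mocked Exoscale datasource is expected to query.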
+TEST_PASSWORD_URL = "{}:{}/{}/".format(METADATA_URL,
+ PASSWORD_SERVER_PORT,
+ API_VERSION)
+
+TEST_METADATA_URL = "{}/{}/meta-data/".format(METADATA_URL,
+ API_VERSION)
+
+TEST_USERDATA_URL = "{}/{}/user-data".format(METADATA_URL,
+ API_VERSION)
+
+
+@httpretty.activate
+class TestDatasourceExoscale(HttprettyTestCase):
+
+ def setUp(self):
+ super(TestDatasourceExoscale, self).setUp()
+ self.tmp = self.tmp_dir()
+ self.password_url = TEST_PASSWORD_URL
+ self.metadata_url = TEST_METADATA_URL
+ self.userdata_url = TEST_USERDATA_URL
+
+ def test_password_saved(self):
+ """The password is not set when it is not found
+ in the metadata service."""
+ httpretty.register_uri(httpretty.GET,
+ self.password_url,
+ body="saved_password")
+ self.assertFalse(get_password())
+
+ def test_password_empty(self):
+ """No password is set if the metadata service returns
+ an empty string."""
+ httpretty.register_uri(httpretty.GET,
+ self.password_url,
+ body="")
+ self.assertFalse(get_password())
+
+ def test_password(self):
+ """The password is set to what is found in the metadata
+ service."""
+ expected_password = "p@ssw0rd"
+ httpretty.register_uri(httpretty.GET,
+ self.password_url,
+ body=expected_password)
+ password = get_password()
+ self.assertEqual(expected_password, password)
+
+ def test_activate_removes_set_passwords_semaphore(self):
+ """Allow set_passwords to run every boot by removing the semaphore."""
+ path = helpers.Paths({'cloud_dir': self.tmp})
+ sem_dir = self.tmp_path('instance/sem', dir=self.tmp)
+ util.ensure_dir(sem_dir)
+ sem_file = os.path.join(sem_dir, 'config_set_passwords')
+ with open(sem_file, 'w') as stream:
+ stream.write('')
+ ds = DataSourceExoscale({}, None, path)
+ ds.activate(None, None)
+ self.assertFalse(os.path.exists(sem_file))
+
+ def test_get_data(self):
+ """The datasource conforms to expected behavior when supplied
+ full test data."""
+ path = helpers.Paths({'run_dir': self.tmp})
+ ds = DataSourceExoscale({}, None, path)
+ ds._is_platform_viable = lambda: True
+ expected_password = "p@ssw0rd"
+ expected_id = "12345"
+ expected_hostname = "myname"
+ expected_userdata = "#cloud-config"
+ httpretty.register_uri(httpretty.GET,
+ self.userdata_url,
+ body=expected_userdata)
+ httpretty.register_uri(httpretty.GET,
+ self.password_url,
+ body=expected_password)
+ httpretty.register_uri(httpretty.GET,
+ self.metadata_url,
+ body="instance-id\nlocal-hostname")
+ httpretty.register_uri(httpretty.GET,
+ "{}local-hostname".format(self.metadata_url),
+ body=expected_hostname)
+ httpretty.register_uri(httpretty.GET,
+ "{}instance-id".format(self.metadata_url),
+ body=expected_id)
+ self.assertTrue(ds._get_data())
+ self.assertEqual(ds.userdata_raw.decode("utf-8"), "#cloud-config")
+ self.assertEqual(ds.metadata, {"instance-id": expected_id,
+ "local-hostname": expected_hostname})
+ self.assertEqual(ds.get_config_obj(),
+ {'ssh_pwauth': True,
+ 'password': expected_password,
+ 'chpasswd': {
+ 'expire': False,
+ }})
+
+ def test_get_data_saved_password(self):
+ """The datasource conforms to expected behavior when saved_password is
+ returned by the password server."""
+ path = helpers.Paths({'run_dir': self.tmp})
+ ds = DataSourceExoscale({}, None, path)
+ ds._is_platform_viable = lambda: True
+ expected_answer = "saved_password"
+ expected_id = "12345"
+ expected_hostname = "myname"
+ expected_userdata = "#cloud-config"
+ httpretty.register_uri(httpretty.GET,
+ self.userdata_url,
+ body=expected_userdata)
+ httpretty.register_uri(httpretty.GET,
+ self.password_url,
+ body=expected_answer)
+ httpretty.register_uri(httpretty.GET,
+ self.metadata_url,
+ body="instance-id\nlocal-hostname")
+ httpretty.register_uri(httpretty.GET,
+ "{}local-hostname".format(self.metadata_url),
+ body=expected_hostname)
+ httpretty.register_uri(httpretty.GET,
+ "{}instance-id".format(self.metadata_url),
+ body=expected_id)
+ self.assertTrue(ds._get_data())
+ self.assertEqual(ds.userdata_raw.decode("utf-8"), "#cloud-config")
+ self.assertEqual(ds.metadata, {"instance-id": expected_id,
+ "local-hostname": expected_hostname})
+ self.assertEqual(ds.get_config_obj(), {})
+
+ def test_get_data_no_password(self):
+ """The datasource conforms to expected behavior when no password is
+ returned by the password server."""
+ path = helpers.Paths({'run_dir': self.tmp})
+ ds = DataSourceExoscale({}, None, path)
+ ds._is_platform_viable = lambda: True
+ expected_answer = ""
+ expected_id = "12345"
+ expected_hostname = "myname"
+ expected_userdata = "#cloud-config"
+ httpretty.register_uri(httpretty.GET,
+ self.userdata_url,
+ body=expected_userdata)
+ httpretty.register_uri(httpretty.GET,
+ self.password_url,
+ body=expected_answer)
+ httpretty.register_uri(httpretty.GET,
+ self.metadata_url,
+ body="instance-id\nlocal-hostname")
+ httpretty.register_uri(httpretty.GET,
+ "{}local-hostname".format(self.metadata_url),
+ body=expected_hostname)
+ httpretty.register_uri(httpretty.GET,
+ "{}instance-id".format(self.metadata_url),
+ body=expected_id)
+ self.assertTrue(ds._get_data())
+ self.assertEqual(ds.userdata_raw.decode("utf-8"), "#cloud-config")
+ self.assertEqual(ds.metadata, {"instance-id": expected_id,
+ "local-hostname": expected_hostname})
+ self.assertEqual(ds.get_config_obj(), {})
+
+ @mock.patch('cloudinit.sources.DataSourceExoscale.get_password')
+ def test_read_metadata_when_password_server_unreachable(self, m_password):
+ """The read_metadata function returns partial results in case the
+ password server (only) is unreachable."""
+ expected_id = "12345"
+ expected_hostname = "myname"
+ expected_userdata = "#cloud-config"
+
+ m_password.side_effect = requests.Timeout('Fake Connection Timeout')
+ httpretty.register_uri(httpretty.GET,
+ self.userdata_url,
+ body=expected_userdata)
+ httpretty.register_uri(httpretty.GET,
+ self.metadata_url,
+ body="instance-id\nlocal-hostname")
+ httpretty.register_uri(httpretty.GET,
+ "{}local-hostname".format(self.metadata_url),
+ body=expected_hostname)
+ httpretty.register_uri(httpretty.GET,
+ "{}instance-id".format(self.metadata_url),
+ body=expected_id)
+
+ result = read_metadata()
+
+ self.assertIsNone(result.get("password"))
+ self.assertEqual(result.get("user-data").decode("utf-8"),
+ expected_userdata)
+
+ def test_non_viable_platform(self):
+ """The datasource fails fast when the platform is not viable."""
+ path = helpers.Paths({'run_dir': self.tmp})
+ ds = DataSourceExoscale({}, None, path)
+ ds._is_platform_viable = lambda: False
+ self.assertFalse(ds._get_data())
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index 41176c6a..4afbccff 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -7,11 +7,11 @@
import datetime
import httpretty
import json
-import mock
import re
+from unittest import mock
+from urllib.parse import urlparse
from base64 import b64encode, b64decode
-from six.moves.urllib_parse import urlparse
from cloudinit import distros
from cloudinit import helpers
@@ -55,6 +55,8 @@ GCE_USER_DATA_TEXT = {
HEADERS = {'Metadata-Flavor': 'Google'}
MD_URL_RE = re.compile(
r'http://metadata.google.internal/computeMetadata/v1/.*')
+GUEST_ATTRIBUTES_URL = ('http://metadata.google.internal/computeMetadata/'
+ 'v1/instance/guest-attributes/hostkeys/')
def _set_mock_metadata(gce_meta=None):
@@ -341,4 +343,20 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
public_key_data, default_user='default')
self.assertEqual(sorted(found), sorted(expected))
+ @mock.patch("cloudinit.url_helper.readurl")
+ def test_publish_host_keys(self, m_readurl):
+ hostkeys = [('ssh-rsa', 'asdfasdf'),
+ ('ssh-ed25519', 'qwerqwer')]
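+ # Each host key should be PUT to the guest-attributes URL under its key type.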
+ readurl_expected_calls = [
+ mock.call(check_status=False, data=b'asdfasdf', headers=HEADERS,
+ request_method='PUT',
+ url='%s%s' % (GUEST_ATTRIBUTES_URL, 'ssh-rsa')),
+ mock.call(check_status=False, data=b'qwerqwer', headers=HEADERS,
+ request_method='PUT',
+ url='%s%s' % (GUEST_ATTRIBUTES_URL, 'ssh-ed25519')),
+ ]
+ self.ds.publish_host_keys(hostkeys)
+ m_readurl.assert_has_calls(readurl_expected_calls, any_order=True)
+
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index c84d067e..2a81d3f5 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -1,11 +1,11 @@
# This file is part of cloud-init. See LICENSE file for license information.
from copy import copy
-import mock
import os
import shutil
import tempfile
import yaml
+from unittest import mock
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index 3429272c..18bea0b9 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -32,6 +32,36 @@ class TestNoCloudDataSource(CiTestCase):
self.mocks.enter_context(
mock.patch.object(util, 'read_dmi_data', return_value=None))
+ def _test_fs_config_is_read(self, fs_label, fs_label_to_search):
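+ # The fake vfat device carries the seed metadata; the datasource should
+ # find it regardless of the case of the configured fs_label.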
+ vfat_device = 'device-1'
+
+ def m_mount_cb(device, callback, mtype):
+ if (device == vfat_device):
+ return {'meta-data': yaml.dump({'instance-id': 'IID'})}
+ else:
+ return {}
+
+ def m_find_devs_with(query='', path=''):
+ if 'TYPE=vfat' == query:
+ return [vfat_device]
+ elif 'LABEL={}'.format(fs_label) == query:
+ return [vfat_device]
+ else:
+ return []
+
+ self.mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ side_effect=m_find_devs_with))
+ self.mocks.enter_context(
+ mock.patch.object(util, 'mount_cb',
+ side_effect=m_mount_cb))
+ sys_cfg = {'datasource': {'NoCloud': {'fs_label': fs_label_to_search}}}
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+
+ self.assertEqual(dsrc.metadata.get('instance-id'), 'IID')
+ self.assertTrue(ret)
+
def test_nocloud_seed_dir_on_lxd(self, m_is_lxd):
md = {'instance-id': 'IID', 'dsmode': 'local'}
ud = b"USER_DATA_HERE"
@@ -90,6 +120,18 @@ class TestNoCloudDataSource(CiTestCase):
ret = dsrc.get_data()
self.assertFalse(ret)
+ def test_fs_config_lowercase_label(self, m_is_lxd):
+ self._test_fs_config_is_read('cidata', 'cidata')
+
+ def test_fs_config_uppercase_label(self, m_is_lxd):
+ self._test_fs_config_is_read('CIDATA', 'cidata')
+
+ def test_fs_config_lowercase_label_search_uppercase(self, m_is_lxd):
+ self._test_fs_config_is_read('cidata', 'CIDATA')
+
+ def test_fs_config_uppercase_label_search_uppercase(self, m_is_lxd):
+ self._test_fs_config_is_read('CIDATA', 'CIDATA')
+
def test_no_datasource_expected(self, m_is_lxd):
# no source should be found if no cmdline, config, and fs_label=None
sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
@@ -236,6 +278,24 @@ class TestNoCloudDataSource(CiTestCase):
self.assertEqual(netconf, dsrc.network_config)
self.assertNotIn(gateway, str(dsrc.network_config))
+ @mock.patch("cloudinit.util.blkid")
+ def test_nocloud_get_devices_freebsd(self, m_is_lxd, fake_blkid):
+ populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
+ {'user-data': b"ud", 'meta-data': "instance-id: IID\n"})
+
+ sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
+
+ self.mocks.enter_context(
+ mock.patch.object(util, 'is_FreeBSD', return_value=True))
+
+ self.mocks.enter_context(
+ mock.patch.object(os.path, 'exists', return_value=True))
+
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc._get_devices('foo')
+ self.assertEqual(['/dev/msdosfs/foo', '/dev/iso9660/foo'], ret)
+ fake_blkid.assert_not_called()
+
class TestParseCommandLineData(CiTestCase):
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index a731f1ed..f754556f 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -8,12 +8,11 @@ import copy
import httpretty as hp
import json
import re
+from io import StringIO
+from urllib.parse import urlparse
from cloudinit.tests import helpers as test_helpers
-from six.moves.urllib.parse import urlparse
-from six import StringIO, text_type
-
from cloudinit import helpers
from cloudinit import settings
from cloudinit.sources import BrokenMetadata, convert_vendordata, UNSET
@@ -569,8 +568,7 @@ class TestMetadataReader(test_helpers.HttprettyTestCase):
'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'}
def register(self, path, body=None, status=200):
- content = (body if not isinstance(body, text_type)
- else body.encode('utf-8'))
+ content = body if not isinstance(body, str) else body.encode('utf-8')
hp.register_uri(
hp.GET, self.burl + "openstack" + path, status=status,
body=content)
diff --git a/tests/unittests/test_datasource/test_ovf.py b/tests/unittests/test_datasource/test_ovf.py
index 349d54cc..a19c35c8 100644
--- a/tests/unittests/test_datasource/test_ovf.py
+++ b/tests/unittests/test_datasource/test_ovf.py
@@ -169,19 +169,56 @@ class TestDatasourceOVF(CiTestCase):
MARKER-ID = 12345345
""")
util.write_file(conf_file, conf_content)
- with self.assertRaises(CustomScriptNotFound) as context:
- wrap_and_call(
- 'cloudinit.sources.DataSourceOVF',
- {'util.read_dmi_data': 'vmware',
- 'util.del_dir': True,
- 'search_file': self.tdir,
- 'wait_for_imc_cfg_file': conf_file,
- 'get_nics_to_enable': ''},
- ds.get_data)
+ with mock.patch(MPATH + 'get_tools_config', return_value='true'):
+ with self.assertRaises(CustomScriptNotFound) as context:
+ wrap_and_call(
+ 'cloudinit.sources.DataSourceOVF',
+ {'util.read_dmi_data': 'vmware',
+ 'util.del_dir': True,
+ 'search_file': self.tdir,
+ 'wait_for_imc_cfg_file': conf_file,
+ 'get_nics_to_enable': ''},
+ ds.get_data)
customscript = self.tmp_path('test-script', self.tdir)
self.assertIn('Script %s not found!!' % customscript,
str(context.exception))
+ def test_get_data_cust_script_disabled(self):
+ """If custom script is disabled by VMware tools configuration,
+ raise a RuntimeError.
+ """
+ paths = Paths({'cloud_dir': self.tdir})
+ ds = self.datasource(
+ sys_cfg={'disable_vmware_customization': False}, distro={},
+ paths=paths)
+ # Prepare the conf file
+ conf_file = self.tmp_path('test-cust', self.tdir)
+ conf_content = dedent("""\
+ [CUSTOM-SCRIPT]
+ SCRIPT-NAME = test-script
+ [MISC]
+ MARKER-ID = 12345346
+ """)
+ util.write_file(conf_file, conf_content)
+ # Prepare the custom script
+ customscript = self.tmp_path('test-script', self.tdir)
+ util.write_file(customscript, "This is the post cust script")
+
+ with mock.patch(MPATH + 'get_tools_config', return_value='invalid'):
+ with mock.patch(MPATH + 'set_customization_status',
+ return_value=('msg', b'')):
+ with self.assertRaises(RuntimeError) as context:
+ wrap_and_call(
+ 'cloudinit.sources.DataSourceOVF',
+ {'util.read_dmi_data': 'vmware',
+ 'util.del_dir': True,
+ 'search_file': self.tdir,
+ 'wait_for_imc_cfg_file': conf_file,
+ 'get_nics_to_enable': ''},
+ ds.get_data)
+ self.assertIn('Custom script is disabled by VM Administrator',
+ str(context.exception))
+
def test_get_data_non_vmware_seed_platform_info(self):
"""Platform info properly reports when on non-vmware platforms."""
paths = Paths({'cloud_dir': self.tdir, 'run_dir': self.tdir})
diff --git a/tests/unittests/test_datasource/test_rbx.py b/tests/unittests/test_datasource/test_rbx.py
new file mode 100644
index 00000000..aabf1f18
--- /dev/null
+++ b/tests/unittests/test_datasource/test_rbx.py
@@ -0,0 +1,208 @@
+import json
+
+from cloudinit import helpers
+from cloudinit import distros
+from cloudinit.sources import DataSourceRbxCloud as ds
+from cloudinit.tests.helpers import mock, CiTestCase, populate_dir
+
+DS_PATH = "cloudinit.sources.DataSourceRbxCloud"
+
+CRYPTO_PASS = "$6$uktth46t$FvpDzFD2iL9YNZIG1Epz7957hJqbH0f" \
+ "QKhnzcfBcUhEodGAWRqTy7tYG4nEW7SUOYBjxOSFIQW5" \
+ "tToyGP41.s1"
+
+CLOUD_METADATA = {
+ "vm": {
+ "memory": 4,
+ "cpu": 2,
+ "name": "vm-image-builder",
+ "_id": "5beab44f680cffd11f0e60fc"
+ },
+ "additionalMetadata": {
+ "username": "guru",
+ "sshKeys": ["ssh-rsa ..."],
+ "password": {
+ "sha512": CRYPTO_PASS
+ }
+ },
+ "disk": [
+ {"size": 10, "type": "ssd",
+ "name": "vm-image-builder-os",
+ "_id": "5beab450680cffd11f0e60fe"},
+ {"size": 2, "type": "ssd",
+ "name": "ubuntu-1804-bionic",
+ "_id": "5bef002c680cffd11f107590"}
+ ],
+ "netadp": [
+ {
+ "ip": [{"address": "62.181.8.174"}],
+ "network": {
+ "dns": {"nameservers": ["8.8.8.8", "8.8.4.4"]},
+ "routing": [],
+ "gateway": "62.181.8.1",
+ "netmask": "255.255.248.0",
+ "name": "public",
+ "type": "public",
+ "_id": "5784e97be2627505227b578c"
+ },
+ "speed": 1000,
+ "type": "hv",
+ "macaddress": "00:15:5D:FF:0F:03",
+ "_id": "5beab450680cffd11f0e6102"
+ },
+ {
+ "ip": [{"address": "10.209.78.11"}],
+ "network": {
+ "dns": {"nameservers": ["9.9.9.9", "8.8.8.8"]},
+ "routing": [],
+ "gateway": "10.209.78.1",
+ "netmask": "255.255.255.0",
+ "name": "network-determined-bardeen",
+ "type": "private",
+ "_id": "5beaec64680cffd11f0e7c31"
+ },
+ "speed": 1000,
+ "type": "hv",
+ "macaddress": "00:15:5D:FF:0F:24",
+ "_id": "5bec18c6680cffd11f0f0d8b"
+ }
+ ],
+ "dvddrive": [{"iso": {}}]
+}
+
+
+class TestRbxDataSource(CiTestCase):
+ parsed_user = None
+ allowed_subp = ['bash']
+
+ def _fetch_distro(self, kind):
+ cls = distros.fetch(kind)
+ paths = helpers.Paths({})
+ return cls(kind, {}, paths)
+
+ def setUp(self):
+ super(TestRbxDataSource, self).setUp()
+ self.tmp = self.tmp_dir()
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp}
+ )
+
+ # defaults for a few tests
+ self.ds = ds.DataSourceRbxCloud
+ self.seed_dir = self.paths.seed_dir
+ self.sys_cfg = {'datasource': {'RbxCloud': {'dsmode': 'local'}}}
+
+ def test_seed_read_user_data_callback_empty_file(self):
+ populate_user_metadata(self.seed_dir, '')
+ populate_cloud_metadata(self.seed_dir, {})
+ results = ds.read_user_data_callback(self.seed_dir)
+
+ self.assertIsNone(results)
+
+ def test_seed_read_user_data_callback_valid_disk(self):
+ populate_user_metadata(self.seed_dir, '')
+ populate_cloud_metadata(self.seed_dir, CLOUD_METADATA)
+ results = ds.read_user_data_callback(self.seed_dir)
+
+ self.assertIsNotNone(results)
+ self.assertIn('userdata', results)
+ self.assertIn('metadata', results)
+ self.assertIn('cfg', results)
+
+ def test_seed_read_user_data_callback_userdata(self):
+ userdata = "#!/bin/sh\nexit 1"
+ populate_user_metadata(self.seed_dir, userdata)
+ populate_cloud_metadata(self.seed_dir, CLOUD_METADATA)
+
+ results = ds.read_user_data_callback(self.seed_dir)
+
+ self.assertIsNotNone(results)
+ self.assertIn('userdata', results)
+ self.assertEqual(results['userdata'], userdata)
+
+ def test_generate_network_config(self):
+ expected = {
+ 'version': 1,
+ 'config': [
+ {
+ 'subnets': [
+ {'control': 'auto',
+ 'dns_nameservers': ['8.8.8.8', '8.8.4.4'],
+ 'netmask': '255.255.248.0',
+ 'address': '62.181.8.174',
+ 'type': 'static', 'gateway': '62.181.8.1'}
+ ],
+ 'type': 'physical',
+ 'name': 'eth0',
+ 'mac_address': '00:15:5d:ff:0f:03'
+ },
+ {
+ 'subnets': [
+ {'control': 'auto',
+ 'dns_nameservers': ['9.9.9.9', '8.8.8.8'],
+ 'netmask': '255.255.255.0',
+ 'address': '10.209.78.11',
+ 'type': 'static',
+ 'gateway': '10.209.78.1'}
+ ],
+ 'type': 'physical',
+ 'name': 'eth1',
+ 'mac_address': '00:15:5d:ff:0f:24'
+ }
+ ]
+ }
+ self.assertEqual(
+ ds.generate_network_config(CLOUD_METADATA['netadp']),
+ expected
+ )
+
+ @mock.patch(DS_PATH + '.util.subp')
+ def test_gratuitous_arp_run_standard_arping(self, m_subp):
+ """Test handle run arping & parameters."""
+ items = [
+ {
+ 'destination': '172.17.0.2',
+ 'source': '172.16.6.104'
+ },
+ {
+ 'destination': '172.17.0.2',
+ 'source': '172.16.6.104',
+ },
+ ]
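+ # Both items should produce the same arping call: -c 2 -S <source> <destination>.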
+ ds.gratuitous_arp(items, self._fetch_distro('ubuntu'))
+ self.assertEqual([
+ mock.call([
+ 'arping', '-c', '2', '-S',
+ '172.16.6.104', '172.17.0.2'
+ ]),
+ mock.call([
+ 'arping', '-c', '2', '-S',
+ '172.16.6.104', '172.17.0.2'
+ ])
+ ], m_subp.call_args_list
+ )
+
+ @mock.patch(DS_PATH + '.util.subp')
+ def test_handle_rhel_like_arping(self, m_subp):
+ """Test handle on RHEL-like distros."""
+ items = [
+ {
+ 'source': '172.16.6.104',
+ 'destination': '172.17.0.2',
+ }
+ ]
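+ # On RHEL-like distros the source address is passed via lowercase -s.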
+ ds.gratuitous_arp(items, self._fetch_distro('fedora'))
+ self.assertEqual([
+ mock.call(
+ ['arping', '-c', '2', '-s', '172.16.6.104', '172.17.0.2']
+ )],
+ m_subp.call_args_list
+ )
+
+
+def populate_cloud_metadata(path, data):
+ populate_dir(path, {'cloud.json': json.dumps(data)})
+
+
+def populate_user_metadata(path, data):
+ populate_dir(path, {'user.data': data})
diff --git a/tests/unittests/test_datasource/test_scaleway.py b/tests/unittests/test_datasource/test_scaleway.py
index c2bc7a00..1b4dd0ad 100644
--- a/tests/unittests/test_datasource/test_scaleway.py
+++ b/tests/unittests/test_datasource/test_scaleway.py
@@ -7,6 +7,7 @@ import requests
from cloudinit import helpers
from cloudinit import settings
+from cloudinit import sources
from cloudinit.sources import DataSourceScaleway
from cloudinit.tests.helpers import mock, HttprettyTestCase, CiTestCase
@@ -49,6 +50,9 @@ class MetadataResponses(object):
FAKE_METADATA = {
'id': '00000000-0000-0000-0000-000000000000',
'hostname': 'scaleway.host',
+ 'tags': [
+ "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD",
+ ],
'ssh_public_keys': [{
'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA',
'fingerprint': '2048 06:ae:... login (RSA)'
@@ -204,10 +208,11 @@ class TestDataSourceScaleway(HttprettyTestCase):
self.assertEqual(self.datasource.get_instance_id(),
MetadataResponses.FAKE_METADATA['id'])
- self.assertEqual(self.datasource.get_public_ssh_keys(), [
- elem['key'] for elem in
- MetadataResponses.FAKE_METADATA['ssh_public_keys']
- ])
+ self.assertEqual(sorted(self.datasource.get_public_ssh_keys()), sorted([
+ u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC',
+ u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD',
+ u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA',
+ ]))
self.assertEqual(self.datasource.get_hostname(),
MetadataResponses.FAKE_METADATA['hostname'])
self.assertEqual(self.datasource.get_userdata_raw(),
@@ -218,6 +223,70 @@ class TestDataSourceScaleway(HttprettyTestCase):
self.assertIsNone(self.datasource.region)
self.assertEqual(sleep.call_count, 0)
+ def test_ssh_keys_empty(self):
+ """
+ get_public_ssh_keys() should return an empty list if no ssh keys
+ are available
+ """
+ self.datasource.metadata['tags'] = []
+ self.datasource.metadata['ssh_public_keys'] = []
+ self.assertEqual(self.datasource.get_public_ssh_keys(), [])
+
+ def test_ssh_keys_only_tags(self):
+ """
+ get_public_ssh_keys() should return the list of keys available in tags
+ """
+ self.datasource.metadata['tags'] = [
+ "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD",
+ "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABCCCCC",
+ ]
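+ # Underscores in the AUTHORIZED_KEY tag value stand in for spaces in the key.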
+ self.datasource.metadata['ssh_public_keys'] = []
+ self.assertEqual(sorted(self.datasource.get_public_ssh_keys()), sorted([
+ u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD',
+ u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC',
+ ]))
+
+ def test_ssh_keys_only_conf(self):
+ """
+ get_public_ssh_keys() should return the list of keys available in
+ the ssh_public_keys field
+ """
+ self.datasource.metadata['tags'] = []
+ self.datasource.metadata['ssh_public_keys'] = [{
+ 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA',
+ 'fingerprint': '2048 06:ae:... login (RSA)'
+ }, {
+ 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC',
+ 'fingerprint': '2048 06:ff:... login2 (RSA)'
+ }]
+ self.assertEqual(sorted(self.datasource.get_public_ssh_keys()), sorted([
+ u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC',
+ u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA',
+ ]))
+
+ def test_ssh_keys_both(self):
+ """
+ get_public_ssh_keys() should return a merge of keys available
+ in ssh_public_keys and tags
+ """
+ self.datasource.metadata['tags'] = [
+ "AUTHORIZED_KEY=ssh-rsa_AAAAB3NzaC1yc2EAAAADAQABDDDDD",
+ ]
+
+ self.datasource.metadata['ssh_public_keys'] = [{
+ 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA',
+ 'fingerprint': '2048 06:ae:... login (RSA)'
+ }, {
+ 'key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC',
+ 'fingerprint': '2048 06:ff:... login2 (RSA)'
+ }]
+ self.assertEqual(sorted(self.datasource.get_public_ssh_keys()), sorted([
+ u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABCCCCC',
+ u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABDDDDD',
+ u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABA',
+ ]))
+
@mock.patch('cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4')
@mock.patch('cloudinit.sources.DataSourceScaleway.SourceAddressAdapter',
get_source_address_adapter)
@@ -335,3 +404,51 @@ class TestDataSourceScaleway(HttprettyTestCase):
netcfg = self.datasource.network_config
self.assertEqual(netcfg, '0xdeadbeef')
+
+ @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic')
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_network_config_unset(self, m_get_cmdline, fallback_nic):
+ """
+ _network_config will be set to sources.UNSET after the first boot.
+ Make sure it behaves correctly.
+ """
+ m_get_cmdline.return_value = 'scaleway'
+ fallback_nic.return_value = 'ens2'
+ self.datasource.metadata['ipv6'] = None
+ self.datasource._network_config = sources.UNSET
+
+ resp = {'version': 1,
+ 'config': [{
+ 'type': 'physical',
+ 'name': 'ens2',
+ 'subnets': [{'type': 'dhcp4'}]}]
+ }
+
+ netcfg = self.datasource.network_config
+ self.assertEqual(netcfg, resp)
+
+ @mock.patch('cloudinit.sources.DataSourceScaleway.LOG.warning')
+ @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic')
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_network_config_cached_none(self, m_get_cmdline, fallback_nic,
+ logwarning):
+ """
+ network_config() should return config data if cached data is None
+ rather than sources.UNSET
+ """
+ m_get_cmdline.return_value = 'scaleway'
+ fallback_nic.return_value = 'ens2'
+ self.datasource.metadata['ipv6'] = None
+ self.datasource._network_config = None
+
+ resp = {'version': 1,
+ 'config': [{
+ 'type': 'physical',
+ 'name': 'ens2',
+ 'subnets': [{'type': 'dhcp4'}]}]
+ }
+
+ netcfg = self.datasource.network_config
+ self.assertEqual(netcfg, resp)
+ logwarning.assert_called_with('Found None as cached _network_config. '
+ 'Resetting to %s', sources.UNSET)
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 42ac6971..62084de5 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -1,5 +1,5 @@
# Copyright (C) 2013 Canonical Ltd.
-# Copyright (c) 2018, Joyent, Inc.
+# Copyright 2019 Joyent, Inc.
#
# Author: Ben Howard <ben.howard@canonical.com>
#
@@ -31,8 +31,7 @@ from cloudinit.sources.DataSourceSmartOS import (
convert_smartos_network_data as convert_net,
SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ,
identify_file)
-
-import six
+from cloudinit.event import EventType
from cloudinit import helpers as c_helpers
from cloudinit.util import (
@@ -653,6 +652,12 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
self.assertEqual(dsrc.device_name_to_device('FOO'),
mydscfg['disk_aliases']['FOO'])
+ def test_reconfig_network_on_boot(self):
+ # Test to ensure that network is configured from metadata on each boot
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ self.assertSetEqual(set([EventType.BOOT_NEW_INSTANCE, EventType.BOOT]),
+ dsrc.update_events['network'])
+
class TestIdentifyFile(CiTestCase):
"""Test the 'identify_file' utility."""
@@ -791,7 +796,7 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):
return self.serial.write.call_args[0][0]
def test_get_metadata_writes_bytes(self):
- self.assertIsInstance(self._get_written_line(), six.binary_type)
+ self.assertIsInstance(self._get_written_line(), bytes)
def test_get_metadata_line_starts_with_v2(self):
foo = self._get_written_line()