summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
authorChad Smith <chad.smith@canonical.com>2018-10-03 12:10:23 -0600
committerChad Smith <chad.smith@canonical.com>2018-10-03 12:10:23 -0600
commitd6347e1c439eda7f43d9620dac2b461e980e1ae9 (patch)
tree08410263488d11a2a29edcc620575ed1b028100e /tests/unittests/test_datasource
parent564793a76b9c9add1ee81bab4919c8dccd45a33d (diff)
parente28000457591bde9f22d6b7a538b1fc33349d780 (diff)
downloadvyos-cloud-init-d6347e1c439eda7f43d9620dac2b461e980e1ae9.tar.gz
vyos-cloud-init-d6347e1c439eda7f43d9620dac2b461e980e1ae9.zip
merge from master at 18.4
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py44
-rw-r--r--tests/unittests/test_datasource/test_azure.py399
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py3
-rw-r--r--tests/unittests/test_datasource/test_common.py4
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py15
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py2
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py409
-rw-r--r--tests/unittests/test_datasource/test_openstack.py121
-rw-r--r--tests/unittests/test_datasource/test_ovf.py8
-rw-r--r--tests/unittests/test_datasource/test_scaleway.py79
-rw-r--r--tests/unittests/test_datasource/test_smartos.py94
11 files changed, 1077 insertions, 101 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index 3253f3ad..ff35904e 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -262,64 +262,56 @@ class TestUserDataRhevm(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.user_data_rhevm()
'''
- cmd_pass = ['true']
- cmd_fail = ['false']
- cmd_not_found = ['bogus bad command']
-
def setUp(self):
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
- self.mount_dir = tempfile.mkdtemp()
+ self.mount_dir = self.tmp_dir()
_write_user_data_files(self.mount_dir, 'test user data')
-
- def tearDown(self):
- # Reset
-
- _remove_user_data_files(self.mount_dir)
-
- # Attempt to remove the temp dir ignoring errors
- try:
- shutil.rmtree(self.mount_dir)
- except OSError:
- pass
-
- dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
- dsac.CMD_PROBE_FLOPPY = ['modprobe', 'floppy']
- dsac.CMD_UDEVADM_SETTLE = ['udevadm', 'settle',
- '--quiet', '--timeout=5']
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.modprobe_floppy',
+ 'm_modprobe_floppy', return_value=None)
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.util.udevadm_settle',
+ 'm_udevadm_settle', return_value=('', ''))
+ self.add_patch(
+ 'cloudinit.sources.DataSourceAltCloud.util.mount_cb',
+ 'm_mount_cb')
def test_mount_cb_fails(self):
'''Test user_data_rhevm() where mount_cb fails.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_pass
+ self.m_mount_cb.side_effect = util.MountFailedError("Failed Mount")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_modprobe_fails(self):
'''Test user_data_rhevm() where modprobe fails.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_fail
+ self.m_modprobe_floppy.side_effect = util.ProcessExecutionError(
+ "Failed modprobe")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_no_modprobe_cmd(self):
'''Test user_data_rhevm() with no modprobe command.'''
- dsac.CMD_PROBE_FLOPPY = self.cmd_not_found
+ self.m_modprobe_floppy.side_effect = util.ProcessExecutionError(
+ "No such file or dir")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_udevadm_fails(self):
'''Test user_data_rhevm() where udevadm fails.'''
- dsac.CMD_UDEVADM_SETTLE = self.cmd_fail
+ self.m_udevadm_settle.side_effect = util.ProcessExecutionError(
+ "Failed settle.")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_no_udevadm_cmd(self):
'''Test user_data_rhevm() with no udevadm command.'''
- dsac.CMD_UDEVADM_SETTLE = self.cmd_not_found
+ self.m_udevadm_settle.side_effect = OSError("No such file or dir")
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index e82716eb..4e428b71 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -1,15 +1,21 @@
# This file is part of cloud-init. See LICENSE file for license information.
+from cloudinit import distros
from cloudinit import helpers
-from cloudinit.sources import DataSourceAzure as dsaz
+from cloudinit import url_helper
+from cloudinit.sources import (
+ UNSET, DataSourceAzure as dsaz, InvalidMetaDataException)
from cloudinit.util import (b64e, decode_binary, load_file, write_file,
find_freebsd_part, get_path_dev_freebsd,
MountFailedError)
from cloudinit.version import version_string as vs
-from cloudinit.tests.helpers import (CiTestCase, TestCase, populate_dir, mock,
- ExitStack, PY26, SkipTest)
+from cloudinit.tests.helpers import (
+ HttprettyTestCase, CiTestCase, populate_dir, mock, wrap_and_call,
+ ExitStack, PY26, SkipTest)
import crypt
+import httpretty
+import json
import os
import stat
import xml.etree.ElementTree as ET
@@ -77,6 +83,106 @@ def construct_valid_ovf_env(data=None, pubkeys=None,
return content
+NETWORK_METADATA = {
+ "network": {
+ "interface": [
+ {
+ "macAddress": "000D3A047598",
+ "ipv6": {
+ "ipAddress": []
+ },
+ "ipv4": {
+ "subnet": [
+ {
+ "prefix": "24",
+ "address": "10.0.0.0"
+ }
+ ],
+ "ipAddress": [
+ {
+ "privateIpAddress": "10.0.0.4",
+ "publicIpAddress": "104.46.124.81"
+ }
+ ]
+ }
+ }
+ ]
+ }
+}
+
+
+class TestGetMetadataFromIMDS(HttprettyTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestGetMetadataFromIMDS, self).setUp()
+ self.network_md_url = dsaz.IMDS_URL + "instance?api-version=2017-12-01"
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.readurl')
+ @mock.patch('cloudinit.sources.DataSourceAzure.EphemeralDHCPv4')
+ @mock.patch('cloudinit.sources.DataSourceAzure.net.is_up')
+ def test_get_metadata_does_not_dhcp_if_network_is_up(
+ self, m_net_is_up, m_dhcp, m_readurl):
+ """Do not perform DHCP setup when nic is already up."""
+ m_net_is_up.return_value = True
+ m_readurl.return_value = url_helper.StringResponse(
+ json.dumps(NETWORK_METADATA).encode('utf-8'))
+ self.assertEqual(
+ NETWORK_METADATA,
+ dsaz.get_metadata_from_imds('eth9', retries=3))
+
+ m_net_is_up.assert_called_with('eth9')
+ m_dhcp.assert_not_called()
+ self.assertIn(
+ "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time
+ self.logs.getvalue())
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.readurl')
+ @mock.patch('cloudinit.sources.DataSourceAzure.EphemeralDHCPv4')
+ @mock.patch('cloudinit.sources.DataSourceAzure.net.is_up')
+ def test_get_metadata_performs_dhcp_when_network_is_down(
+ self, m_net_is_up, m_dhcp, m_readurl):
+ """Perform DHCP setup when nic is not up."""
+ m_net_is_up.return_value = False
+ m_readurl.return_value = url_helper.StringResponse(
+ json.dumps(NETWORK_METADATA).encode('utf-8'))
+
+ self.assertEqual(
+ NETWORK_METADATA,
+ dsaz.get_metadata_from_imds('eth9', retries=2))
+
+ m_net_is_up.assert_called_with('eth9')
+ m_dhcp.assert_called_with('eth9')
+ self.assertIn(
+ "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time
+ self.logs.getvalue())
+
+ m_readurl.assert_called_with(
+ self.network_md_url, exception_cb=mock.ANY,
+ headers={'Metadata': 'true'}, retries=2, timeout=1)
+
+ @mock.patch('cloudinit.url_helper.time.sleep')
+ @mock.patch('cloudinit.sources.DataSourceAzure.net.is_up')
+ def test_get_metadata_from_imds_empty_when_no_imds_present(
+ self, m_net_is_up, m_sleep):
+ """Return empty dict when IMDS network metadata is absent."""
+ httpretty.register_uri(
+ httpretty.GET,
+ dsaz.IMDS_URL + 'instance?api-version=2017-12-01',
+ body={}, status=404)
+
+ m_net_is_up.return_value = True # skips dhcp
+
+ self.assertEqual({}, dsaz.get_metadata_from_imds('eth9', retries=2))
+
+ m_net_is_up.assert_called_with('eth9')
+ self.assertEqual([mock.call(1), mock.call(1)], m_sleep.call_args_list)
+ self.assertIn(
+ "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time
+ self.logs.getvalue())
+
+
class TestAzureDataSource(CiTestCase):
with_logs = True
@@ -95,8 +201,19 @@ class TestAzureDataSource(CiTestCase):
self.patches = ExitStack()
self.addCleanup(self.patches.close)
- self.patches.enter_context(mock.patch.object(dsaz, '_get_random_seed'))
-
+ self.patches.enter_context(mock.patch.object(
+ dsaz, '_get_random_seed', return_value='wild'))
+ self.m_get_metadata_from_imds = self.patches.enter_context(
+ mock.patch.object(
+ dsaz, 'get_metadata_from_imds',
+ mock.MagicMock(return_value=NETWORK_METADATA)))
+ self.m_fallback_nic = self.patches.enter_context(
+ mock.patch('cloudinit.sources.net.find_fallback_nic',
+ return_value='eth9'))
+ self.m_remove_ubuntu_network_scripts = self.patches.enter_context(
+ mock.patch.object(
+ dsaz, 'maybe_remove_ubuntu_network_config_scripts',
+ mock.MagicMock()))
super(TestAzureDataSource, self).setUp()
def apply_patches(self, patches):
@@ -137,7 +254,7 @@ scbus-1 on xpt0 bus 0
])
return dsaz
- def _get_ds(self, data, agent_command=None):
+ def _get_ds(self, data, agent_command=None, distro=None):
def dsdevs():
return data.get('dsdevs', [])
@@ -186,8 +303,11 @@ scbus-1 on xpt0 bus 0
side_effect=_wait_for_files)),
])
+ if distro is not None:
+ distro_cls = distros.fetch(distro)
+ distro = distro_cls(distro, data.get('sys_cfg', {}), self.paths)
dsrc = dsaz.DataSourceAzure(
- data.get('sys_cfg', {}), distro=None, paths=self.paths)
+ data.get('sys_cfg', {}), distro=distro, paths=self.paths)
if agent_command is not None:
dsrc.ds_cfg['agent_command'] = agent_command
@@ -260,29 +380,20 @@ fdescfs /dev/fd fdescfs rw 0 0
res = get_path_dev_freebsd('/etc', mnt_list)
self.assertIsNotNone(res)
- @mock.patch('cloudinit.sources.DataSourceAzure.util.read_dmi_data')
- def test_non_azure_dmi_chassis_asset_tag(self, m_read_dmi_data):
- """Report non-azure when DMI's chassis asset tag doesn't match.
-
- Return False when the asset tag doesn't match Azure's static
- AZURE_CHASSIS_ASSET_TAG.
- """
+ @mock.patch('cloudinit.sources.DataSourceAzure._is_platform_viable')
+ def test_call_is_platform_viable_seed(self, m_is_platform_viable):
+ """Check seed_dir using _is_platform_viable and return False."""
# Return a non-matching asset tag value
- nonazure_tag = dsaz.AZURE_CHASSIS_ASSET_TAG + 'X'
- m_read_dmi_data.return_value = nonazure_tag
+ m_is_platform_viable.return_value = False
dsrc = dsaz.DataSourceAzure(
{}, distro=None, paths=self.paths)
self.assertFalse(dsrc.get_data())
- self.assertEqual(
- "DEBUG: Non-Azure DMI asset tag '{0}' discovered.\n".format(
- nonazure_tag),
- self.logs.getvalue())
+ m_is_platform_viable.assert_called_with(dsrc.seed_dir)
def test_basic_seed_dir(self):
odata = {'HostName': "myhost", 'UserName': "myuser"}
data = {'ovfcontent': construct_valid_ovf_env(data=odata),
'sys_cfg': {}}
-
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
@@ -291,6 +402,82 @@ fdescfs /dev/fd fdescfs rw 0 0
self.assertTrue(os.path.isfile(
os.path.join(self.waagent_d, 'ovf-env.xml')))
+ def test_get_data_non_ubuntu_will_not_remove_network_scripts(self):
+ """get_data on non-Ubuntu will not remove ubuntu net scripts."""
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata),
+ 'sys_cfg': {}}
+
+ dsrc = self._get_ds(data, distro='debian')
+ dsrc.get_data()
+ self.m_remove_ubuntu_network_scripts.assert_not_called()
+
+ def test_get_data_on_ubuntu_will_remove_network_scripts(self):
+ """get_data will remove ubuntu net scripts on Ubuntu distro."""
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata),
+ 'sys_cfg': {}}
+
+ dsrc = self._get_ds(data, distro='ubuntu')
+ dsrc.get_data()
+ self.m_remove_ubuntu_network_scripts.assert_called_once_with()
+
+ def test_crawl_metadata_returns_structured_data_and_caches_nothing(self):
+ """Return all structured metadata and cache no class attributes."""
+ yaml_cfg = "{agent_command: my_command}\n"
+ odata = {'HostName': "myhost", 'UserName': "myuser",
+ 'UserData': {'text': 'FOOBAR', 'encoding': 'plain'},
+ 'dscfg': {'text': yaml_cfg, 'encoding': 'plain'}}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata),
+ 'sys_cfg': {}}
+ dsrc = self._get_ds(data)
+ expected_cfg = {
+ 'PreprovisionedVm': False,
+ 'datasource': {'Azure': {'agent_command': 'my_command'}},
+ 'system_info': {'default_user': {'name': u'myuser'}}}
+ expected_metadata = {
+ 'azure_data': {
+ 'configurationsettype': 'LinuxProvisioningConfiguration'},
+ 'imds': {'network': {'interface': [{
+ 'ipv4': {'ipAddress': [
+ {'privateIpAddress': '10.0.0.4',
+ 'publicIpAddress': '104.46.124.81'}],
+ 'subnet': [{'address': '10.0.0.0', 'prefix': '24'}]},
+ 'ipv6': {'ipAddress': []},
+ 'macAddress': '000D3A047598'}]}},
+ 'instance-id': 'test-instance-id',
+ 'local-hostname': u'myhost',
+ 'random_seed': 'wild'}
+
+ crawled_metadata = dsrc.crawl_metadata()
+
+ self.assertItemsEqual(
+ crawled_metadata.keys(),
+ ['cfg', 'files', 'metadata', 'userdata_raw'])
+ self.assertEqual(crawled_metadata['cfg'], expected_cfg)
+ self.assertEqual(
+ list(crawled_metadata['files'].keys()), ['ovf-env.xml'])
+ self.assertIn(
+ b'<HostName>myhost</HostName>',
+ crawled_metadata['files']['ovf-env.xml'])
+ self.assertEqual(crawled_metadata['metadata'], expected_metadata)
+ self.assertEqual(crawled_metadata['userdata_raw'], 'FOOBAR')
+ self.assertEqual(dsrc.userdata_raw, None)
+ self.assertEqual(dsrc.metadata, {})
+ self.assertEqual(dsrc._metadata_imds, UNSET)
+ self.assertFalse(os.path.isfile(
+ os.path.join(self.waagent_d, 'ovf-env.xml')))
+
+ def test_crawl_metadata_raises_invalid_metadata_on_error(self):
+ """crawl_metadata raises an exception on invalid ovf-env.xml."""
+ data = {'ovfcontent': "BOGUS", 'sys_cfg': {}}
+ dsrc = self._get_ds(data)
+ error_msg = ('BrokenAzureDataSource: Invalid ovf-env.xml:'
+ ' syntax error: line 1, column 0')
+ with self.assertRaises(InvalidMetaDataException) as cm:
+ dsrc.crawl_metadata()
+ self.assertEqual(str(cm.exception), error_msg)
+
def test_waagent_d_has_0700_perms(self):
# we expect /var/lib/waagent to be created 0700
dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
@@ -314,6 +501,20 @@ fdescfs /dev/fd fdescfs rw 0 0
self.assertTrue(ret)
self.assertEqual(data['agent_invoked'], cfg['agent_command'])
+ def test_network_config_set_from_imds(self):
+ """Datasource.network_config returns IMDS network data."""
+ odata = {}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ expected_network_config = {
+ 'ethernets': {
+ 'eth0': {'set-name': 'eth0',
+ 'match': {'macaddress': '00:0d:3a:04:75:98'},
+ 'dhcp4': True}},
+ 'version': 2}
+ dsrc = self._get_ds(data)
+ dsrc.get_data()
+ self.assertEqual(expected_network_config, dsrc.network_config)
+
def test_user_cfg_set_agent_command(self):
# set dscfg in via base64 encoded yaml
cfg = {'agent_command': "my_command"}
@@ -579,12 +780,34 @@ fdescfs /dev/fd fdescfs rw 0 0
self.assertEqual(
[mock.call("/dev/cd0")], m_check_fbsd_cdrom.call_args_list)
+ @mock.patch('cloudinit.net.generate_fallback_config')
+ def test_imds_network_config(self, mock_fallback):
+ """Network config is generated from IMDS network data when present."""
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata),
+ 'sys_cfg': {}}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+
+ expected_cfg = {
+ 'ethernets': {
+ 'eth0': {'dhcp4': True,
+ 'match': {'macaddress': '00:0d:3a:04:75:98'},
+ 'set-name': 'eth0'}},
+ 'version': 2}
+
+ self.assertEqual(expected_cfg, dsrc.network_config)
+ mock_fallback.assert_not_called()
+
@mock.patch('cloudinit.net.get_interface_mac')
@mock.patch('cloudinit.net.get_devicelist')
@mock.patch('cloudinit.net.device_driver')
@mock.patch('cloudinit.net.generate_fallback_config')
- def test_network_config(self, mock_fallback, mock_dd,
- mock_devlist, mock_get_mac):
+ def test_fallback_network_config(self, mock_fallback, mock_dd,
+ mock_devlist, mock_get_mac):
+ """On absent IMDS network data, generate network fallback config."""
odata = {'HostName': "myhost", 'UserName': "myuser"}
data = {'ovfcontent': construct_valid_ovf_env(data=odata),
'sys_cfg': {}}
@@ -605,6 +828,8 @@ fdescfs /dev/fd fdescfs rw 0 0
mock_get_mac.return_value = '00:11:22:33:44:55'
dsrc = self._get_ds(data)
+ # Represent empty response from network imds
+ self.m_get_metadata_from_imds.return_value = {}
ret = dsrc.get_data()
self.assertTrue(ret)
@@ -617,8 +842,9 @@ fdescfs /dev/fd fdescfs rw 0 0
@mock.patch('cloudinit.net.get_devicelist')
@mock.patch('cloudinit.net.device_driver')
@mock.patch('cloudinit.net.generate_fallback_config')
- def test_network_config_blacklist(self, mock_fallback, mock_dd,
- mock_devlist, mock_get_mac):
+ def test_fallback_network_config_blacklist(self, mock_fallback, mock_dd,
+ mock_devlist, mock_get_mac):
+ """On absent network metadata, blacklist mlx from fallback config."""
odata = {'HostName': "myhost", 'UserName': "myuser"}
data = {'ovfcontent': construct_valid_ovf_env(data=odata),
'sys_cfg': {}}
@@ -649,6 +875,8 @@ fdescfs /dev/fd fdescfs rw 0 0
mock_get_mac.return_value = '00:11:22:33:44:55'
dsrc = self._get_ds(data)
+ # Represent empty response from network imds
+ self.m_get_metadata_from_imds.return_value = {}
ret = dsrc.get_data()
self.assertTrue(ret)
@@ -689,9 +917,12 @@ class TestAzureBounce(CiTestCase):
mock.patch.object(dsaz, 'get_metadata_from_fabric',
mock.MagicMock(return_value={})))
self.patches.enter_context(
- mock.patch.object(dsaz.util, 'which', lambda x: True))
+ mock.patch.object(dsaz, 'get_metadata_from_imds',
+ mock.MagicMock(return_value={})))
self.patches.enter_context(
- mock.patch.object(dsaz, '_get_random_seed'))
+ mock.patch.object(dsaz.util, 'which', lambda x: True))
+ self.patches.enter_context(mock.patch.object(
+ dsaz, '_get_random_seed', return_value='wild'))
def _dmi_mocks(key):
if key == 'system-uuid':
@@ -719,9 +950,12 @@ class TestAzureBounce(CiTestCase):
mock.patch.object(dsaz, 'set_hostname'))
self.subp = self.patches.enter_context(
mock.patch('cloudinit.sources.DataSourceAzure.util.subp'))
+ self.find_fallback_nic = self.patches.enter_context(
+ mock.patch('cloudinit.net.find_fallback_nic', return_value='eth9'))
def tearDown(self):
self.patches.close()
+ super(TestAzureBounce, self).tearDown()
def _get_ds(self, ovfcontent=None, agent_command=None):
if ovfcontent is not None:
@@ -927,7 +1161,7 @@ class TestLoadAzureDsDir(CiTestCase):
str(context_manager.exception))
-class TestReadAzureOvf(TestCase):
+class TestReadAzureOvf(CiTestCase):
def test_invalid_xml_raises_non_azure_ds(self):
invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
@@ -1188,6 +1422,25 @@ class TestCanDevBeReformatted(CiTestCase):
"(datasource.Azure.never_destroy_ntfs)", msg)
+class TestClearCachedData(CiTestCase):
+
+ def test_clear_cached_attrs_clears_imds(self):
+ """All class attributes are reset to defaults, including imds data."""
+ tmp = self.tmp_dir()
+ paths = helpers.Paths(
+ {'cloud_dir': tmp, 'run_dir': tmp})
+ dsrc = dsaz.DataSourceAzure({}, distro=None, paths=paths)
+ clean_values = [dsrc.metadata, dsrc.userdata, dsrc._metadata_imds]
+ dsrc.metadata = 'md'
+ dsrc.userdata = 'ud'
+ dsrc._metadata_imds = 'imds'
+ dsrc._dirty_cache = True
+ dsrc.clear_cached_attrs()
+ self.assertEqual(
+ [dsrc.metadata, dsrc.userdata, dsrc._metadata_imds],
+ clean_values)
+
+
class TestAzureNetExists(CiTestCase):
def test_azure_net_must_exist_for_legacy_objpkl(self):
@@ -1398,4 +1651,94 @@ class TestAzureDataSourcePreprovisioning(CiTestCase):
self.assertEqual(m_net.call_count, 1)
+class TestRemoveUbuntuNetworkConfigScripts(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestRemoveUbuntuNetworkConfigScripts, self).setUp()
+ self.tmp = self.tmp_dir()
+
+ def test_remove_network_scripts_removes_both_files_and_directories(self):
+ """Any files or directories in paths are removed when present."""
+ file1 = self.tmp_path('file1', dir=self.tmp)
+ subdir = self.tmp_path('sub1', dir=self.tmp)
+ subfile = self.tmp_path('leaf1', dir=subdir)
+ write_file(file1, 'file1content')
+ write_file(subfile, 'leafcontent')
+ dsaz.maybe_remove_ubuntu_network_config_scripts(paths=[subdir, file1])
+
+ for path in (file1, subdir, subfile):
+ self.assertFalse(os.path.exists(path),
+ 'Found unremoved: %s' % path)
+
+ expected_logs = [
+ 'INFO: Removing Ubuntu extended network scripts because cloud-init'
+ ' updates Azure network configuration on the following event:'
+ ' System boot.',
+ 'Recursively deleting %s' % subdir,
+ 'Attempting to remove %s' % file1]
+ for log in expected_logs:
+ self.assertIn(log, self.logs.getvalue())
+
+ def test_remove_network_scripts_only_attempts_removal_if_path_exists(self):
+ """Any files or directories absent are skipped without error."""
+ dsaz.maybe_remove_ubuntu_network_config_scripts(paths=[
+ self.tmp_path('nodirhere/', dir=self.tmp),
+ self.tmp_path('notfilehere', dir=self.tmp)])
+ self.assertNotIn('/not/a', self.logs.getvalue()) # No delete logs
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.os.path.exists')
+ def test_remove_network_scripts_default_removes_stock_scripts(self,
+ m_exists):
+ """Azure's stock ubuntu image scripts and artifacts are removed."""
+ # Report path absent on all to avoid delete operation
+ m_exists.return_value = False
+ dsaz.maybe_remove_ubuntu_network_config_scripts()
+ calls = m_exists.call_args_list
+ for path in dsaz.UBUNTU_EXTENDED_NETWORK_SCRIPTS:
+ self.assertIn(mock.call(path), calls)
+
+
+class TestWBIsPlatformViable(CiTestCase):
+ """White box tests for _is_platform_viable."""
+ with_logs = True
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.util.read_dmi_data')
+ def test_true_on_non_azure_chassis(self, m_read_dmi_data):
+ """Return True if DMI chassis-asset-tag is AZURE_CHASSIS_ASSET_TAG."""
+ m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG
+ self.assertTrue(dsaz._is_platform_viable('doesnotmatter'))
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.os.path.exists')
+ @mock.patch('cloudinit.sources.DataSourceAzure.util.read_dmi_data')
+ def test_true_on_azure_ovf_env_in_seed_dir(self, m_read_dmi_data, m_exist):
+ """Return True if ovf-env.xml exists in known seed dirs."""
+ # Non-matching Azure chassis-asset-tag
+ m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG + 'X'
+
+ m_exist.return_value = True
+ self.assertTrue(dsaz._is_platform_viable('/some/seed/dir'))
+ m_exist.called_once_with('/other/seed/dir')
+
+ def test_false_on_no_matching_azure_criteria(self):
+ """Report non-azure on unmatched asset tag, ovf-env absent and no dev.
+
+ Return False when the asset tag doesn't match Azure's static
+ AZURE_CHASSIS_ASSET_TAG, no ovf-env.xml files exist in known seed dirs
+ and no devices have a label starting with prefix 'rd_rdfe_'.
+ """
+ self.assertFalse(wrap_and_call(
+ 'cloudinit.sources.DataSourceAzure',
+ {'os.path.exists': False,
+ # Non-matching Azure chassis-asset-tag
+ 'util.read_dmi_data': dsaz.AZURE_CHASSIS_ASSET_TAG + 'X',
+ 'util.which': None},
+ dsaz._is_platform_viable, 'doesnotmatter'))
+ self.assertIn(
+ "DEBUG: Non-Azure DMI asset tag '{0}' discovered.\n".format(
+ dsaz.AZURE_CHASSIS_ASSET_TAG + 'X'),
+ self.logs.getvalue())
+
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index f6a59b6b..380ad1b5 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -42,6 +42,9 @@ class CepkoMock(Cepko):
class DataSourceCloudSigmaTest(test_helpers.CiTestCase):
def setUp(self):
super(DataSourceCloudSigmaTest, self).setUp()
+ self.add_patch(
+ "cloudinit.sources.DataSourceCloudSigma.util.is_container",
+ "m_is_container", return_value=False)
self.paths = helpers.Paths({'run_dir': self.tmp_dir()})
self.datasource = DataSourceCloudSigma.DataSourceCloudSigma(
"", "", paths=self.paths)
diff --git a/tests/unittests/test_datasource/test_common.py b/tests/unittests/test_datasource/test_common.py
index 0d35dc29..6b01a4ea 100644
--- a/tests/unittests/test_datasource/test_common.py
+++ b/tests/unittests/test_datasource/test_common.py
@@ -20,6 +20,7 @@ from cloudinit.sources import (
DataSourceNoCloud as NoCloud,
DataSourceOpenNebula as OpenNebula,
DataSourceOpenStack as OpenStack,
+ DataSourceOracle as Oracle,
DataSourceOVF as OVF,
DataSourceScaleway as Scaleway,
DataSourceSmartOS as SmartOS,
@@ -37,10 +38,12 @@ DEFAULT_LOCAL = [
IBMCloud.DataSourceIBMCloud,
NoCloud.DataSourceNoCloud,
OpenNebula.DataSourceOpenNebula,
+ Oracle.DataSourceOracle,
OVF.DataSourceOVF,
SmartOS.DataSourceSmartOS,
Ec2.DataSourceEc2Local,
OpenStack.DataSourceOpenStackLocal,
+ Scaleway.DataSourceScaleway,
]
DEFAULT_NETWORK = [
@@ -55,7 +58,6 @@ DEFAULT_NETWORK = [
NoCloud.DataSourceNoCloudNet,
OpenStack.DataSourceOpenStack,
OVF.DataSourceOVFNet,
- Scaleway.DataSourceScaleway,
]
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 68400f22..231619c9 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -136,6 +136,7 @@ NETWORK_DATA_3 = {
]
}
+BOND_MAC = "fa:16:3e:b3:72:36"
NETWORK_DATA_BOND = {
"services": [
{"type": "dns", "address": "1.1.1.191"},
@@ -163,7 +164,7 @@ NETWORK_DATA_BOND = {
{"bond_links": ["eth0", "eth1"],
"bond_miimon": 100, "bond_mode": "4",
"bond_xmit_hash_policy": "layer3+4",
- "ethernet_mac_address": "0c:c4:7a:34:6e:3c",
+ "ethernet_mac_address": BOND_MAC,
"id": "bond0", "type": "bond"},
{"ethernet_mac_address": "fa:16:3e:b3:72:30",
"id": "vlan2", "type": "vlan", "vlan_id": 602,
@@ -224,6 +225,9 @@ class TestConfigDriveDataSource(CiTestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
+ self.add_patch(
+ "cloudinit.sources.DataSourceConfigDrive.util.find_devs_with",
+ "m_find_devs_with", return_value=[])
self.tmp = self.tmp_dir()
def test_ec2_metadata(self):
@@ -642,7 +646,7 @@ class TestConvertNetworkData(CiTestCase):
routes)
eni_renderer = eni.Renderer()
eni_renderer.render_network_state(
- network_state.parse_net_config_data(ncfg), self.tmp)
+ network_state.parse_net_config_data(ncfg), target=self.tmp)
with open(os.path.join(self.tmp, "etc",
"network", "interfaces"), 'r') as f:
eni_rendering = f.read()
@@ -664,7 +668,7 @@ class TestConvertNetworkData(CiTestCase):
eni_renderer = eni.Renderer()
eni_renderer.render_network_state(
- network_state.parse_net_config_data(ncfg), self.tmp)
+ network_state.parse_net_config_data(ncfg), target=self.tmp)
with open(os.path.join(self.tmp, "etc",
"network", "interfaces"), 'r') as f:
eni_rendering = f.read()
@@ -688,6 +692,9 @@ class TestConvertNetworkData(CiTestCase):
self.assertIn("auto oeth0", eni_rendering)
self.assertIn("auto oeth1", eni_rendering)
self.assertIn("auto bond0", eni_rendering)
+ # The bond should have the given mac address
+ pos = eni_rendering.find("auto bond0")
+ self.assertIn(BOND_MAC, eni_rendering[pos:])
def test_vlan(self):
# light testing of vlan config conversion and eni rendering
@@ -695,7 +702,7 @@ class TestConvertNetworkData(CiTestCase):
known_macs=KNOWN_MACS)
eni_renderer = eni.Renderer()
eni_renderer.render_network_state(
- network_state.parse_net_config_data(ncfg), self.tmp)
+ network_state.parse_net_config_data(ncfg), target=self.tmp)
with open(os.path.join(self.tmp, "etc",
"network", "interfaces"), 'r') as f:
eni_rendering = f.read()
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index cdbd1e1a..21931eb7 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -25,6 +25,8 @@ class TestNoCloudDataSource(CiTestCase):
self.mocks.enter_context(
mock.patch.object(util, 'get_cmdline', return_value=self.cmdline))
+ self.mocks.enter_context(
+ mock.patch.object(util, 'read_dmi_data', return_value=None))
def test_nocloud_seed_dir(self):
md = {'instance-id': 'IID', 'dsmode': 'local'}
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index ab42f344..61591017 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -43,6 +43,7 @@ DS_PATH = "cloudinit.sources.DataSourceOpenNebula"
class TestOpenNebulaDataSource(CiTestCase):
parsed_user = None
+ allowed_subp = ['bash']
def setUp(self):
super(TestOpenNebulaDataSource, self).setUp()
@@ -354,6 +355,412 @@ class TestOpenNebulaNetwork(unittest.TestCase):
system_nics = ('eth0', 'ens3')
+ def test_context_devname(self):
+ """Verify context_devname correctly returns mac and name."""
+ context = {
+ 'ETH0_MAC': '02:00:0a:12:01:01',
+ 'ETH1_MAC': '02:00:0a:12:0f:0f', }
+ expected = {
+ '02:00:0a:12:01:01': 'ETH0',
+ '02:00:0a:12:0f:0f': 'ETH1', }
+ net = ds.OpenNebulaNetwork(context)
+ self.assertEqual(expected, net.context_devname)
+
+ def test_get_nameservers(self):
+ """
+ Verify get_nameservers('device') correctly returns DNS server addresses
+ and search domains.
+ """
+ context = {
+ 'DNS': '1.2.3.8',
+ 'ETH0_DNS': '1.2.3.6 1.2.3.7',
+ 'ETH0_SEARCH_DOMAIN': 'example.com example.org', }
+ expected = {
+ 'addresses': ['1.2.3.6', '1.2.3.7', '1.2.3.8'],
+ 'search': ['example.com', 'example.org']}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_nameservers('eth0')
+ self.assertEqual(expected, val)
+
+ def test_get_mtu(self):
+ """Verify get_mtu('device') correctly returns MTU size."""
+ context = {'ETH0_MTU': '1280'}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_mtu('eth0')
+ self.assertEqual('1280', val)
+
+ def test_get_ip(self):
+ """Verify get_ip('device') correctly returns IPv4 address."""
+ context = {'ETH0_IP': PUBLIC_IP}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_ip('eth0', MACADDR)
+ self.assertEqual(PUBLIC_IP, val)
+
+ def test_get_ip_emptystring(self):
+ """
+ Verify get_ip('device') correctly returns IPv4 address.
+ It returns IP address created by MAC address if ETH0_IP has empty
+ string.
+ """
+ context = {'ETH0_IP': ''}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_ip('eth0', MACADDR)
+ self.assertEqual(IP_BY_MACADDR, val)
+
+ def test_get_ip6(self):
+ """
+ Verify get_ip6('device') correctly returns IPv6 address.
+ In this case, IPv6 address is Given by ETH0_IP6.
+ """
+ context = {
+ 'ETH0_IP6': IP6_GLOBAL,
+ 'ETH0_IP6_ULA': '', }
+ expected = [IP6_GLOBAL]
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_ip6('eth0')
+ self.assertEqual(expected, val)
+
+ def test_get_ip6_ula(self):
+ """
+ Verify get_ip6('device') correctly returns IPv6 address.
+ In this case, IPv6 address is Given by ETH0_IP6_ULA.
+ """
+ context = {
+ 'ETH0_IP6': '',
+ 'ETH0_IP6_ULA': IP6_ULA, }
+ expected = [IP6_ULA]
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_ip6('eth0')
+ self.assertEqual(expected, val)
+
+ def test_get_ip6_dual(self):
+ """
+ Verify get_ip6('device') correctly returns IPv6 address.
+ In this case, IPv6 addresses are Given by ETH0_IP6 and ETH0_IP6_ULA.
+ """
+ context = {
+ 'ETH0_IP6': IP6_GLOBAL,
+ 'ETH0_IP6_ULA': IP6_ULA, }
+ expected = [IP6_GLOBAL, IP6_ULA]
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_ip6('eth0')
+ self.assertEqual(expected, val)
+
+ def test_get_ip6_prefix(self):
+ """
+ Verify get_ip6_prefix('device') correctly returns IPv6 prefix.
+ """
+ context = {'ETH0_IP6_PREFIX_LENGTH': IP6_PREFIX}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_ip6_prefix('eth0')
+ self.assertEqual(IP6_PREFIX, val)
+
+ def test_get_ip6_prefix_emptystring(self):
+ """
+ Verify get_ip6_prefix('device') correctly returns IPv6 prefix.
+ It returns default value '64' if ETH0_IP6_PREFIX_LENGTH has empty
+ string.
+ """
+ context = {'ETH0_IP6_PREFIX_LENGTH': ''}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_ip6_prefix('eth0')
+ self.assertEqual('64', val)
+
+ def test_get_gateway(self):
+ """
+ Verify get_gateway('device') correctly returns IPv4 default gateway
+ address.
+ """
+ context = {'ETH0_GATEWAY': '1.2.3.5'}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_gateway('eth0')
+ self.assertEqual('1.2.3.5', val)
+
+ def test_get_gateway6(self):
+ """
+ Verify get_gateway6('device') correctly returns IPv6 default gateway
+ address.
+ """
+ context = {'ETH0_GATEWAY6': IP6_GW}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_gateway6('eth0')
+ self.assertEqual(IP6_GW, val)
+
+ def test_get_mask(self):
+ """
+ Verify get_mask('device') correctly returns IPv4 subnet mask.
+ """
+ context = {'ETH0_MASK': '255.255.0.0'}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_mask('eth0')
+ self.assertEqual('255.255.0.0', val)
+
+ def test_get_mask_emptystring(self):
+ """
+ Verify get_mask('device') correctly returns IPv4 subnet mask.
+ It returns default value '255.255.255.0' if ETH0_MASK has empty string.
+ """
+ context = {'ETH0_MASK': ''}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_mask('eth0')
+ self.assertEqual('255.255.255.0', val)
+
+ def test_get_network(self):
+ """
+ Verify get_network('device') correctly returns IPv4 network address.
+ """
+ context = {'ETH0_NETWORK': '1.2.3.0'}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_network('eth0', MACADDR)
+ self.assertEqual('1.2.3.0', val)
+
+ def test_get_network_emptystring(self):
+ """
+ Verify get_network('device') correctly returns IPv4 network address.
+ It returns network address created by MAC address if ETH0_NETWORK has
+ empty string.
+ """
+ context = {'ETH0_NETWORK': ''}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_network('eth0', MACADDR)
+ self.assertEqual('10.18.1.0', val)
+
+ def test_get_field(self):
+ """
+ Verify get_field('device', 'name') returns *context* value.
+ """
+ context = {'ETH9_DUMMY': 'DUMMY_VALUE'}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_field('eth9', 'dummy')
+ self.assertEqual('DUMMY_VALUE', val)
+
+ def test_get_field_withdefaultvalue(self):
+ """
+ Verify get_field('device', 'name', 'default value') returns *context*
+ value.
+ """
+ context = {'ETH9_DUMMY': 'DUMMY_VALUE'}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_field('eth9', 'dummy', 'DEFAULT_VALUE')
+ self.assertEqual('DUMMY_VALUE', val)
+
+ def test_get_field_withdefaultvalue_emptycontext(self):
+ """
+ Verify get_field('device', 'name', 'default value') returns *default*
+ value if context value is empty string.
+ """
+ context = {'ETH9_DUMMY': ''}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_field('eth9', 'dummy', 'DEFAULT_VALUE')
+ self.assertEqual('DEFAULT_VALUE', val)
+
+ def test_get_field_emptycontext(self):
+ """
+ Verify get_field('device', 'name') returns None if context value is
+ empty string.
+ """
+ context = {'ETH9_DUMMY': ''}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_field('eth9', 'dummy')
+ self.assertEqual(None, val)
+
+ def test_get_field_nonecontext(self):
+ """
+ Verify get_field('device', 'name') returns None if context value is
+ None.
+ """
+ context = {'ETH9_DUMMY': None}
+ net = ds.OpenNebulaNetwork(context)
+ val = net.get_field('eth9', 'dummy')
+ self.assertEqual(None, val)
+
+ @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
+ def test_gen_conf_gateway(self, m_get_phys_by_mac):
+ """Test rendering with/without IPv4 gateway"""
+ self.maxDiff = None
+ # empty ETH0_GATEWAY
+ context = {
+ 'ETH0_MAC': '02:00:0a:12:01:01',
+ 'ETH0_GATEWAY': '', }
+ for nic in self.system_nics:
+ expected = {
+ 'version': 2,
+ 'ethernets': {
+ nic: {
+ 'match': {'macaddress': MACADDR},
+ 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}}
+ m_get_phys_by_mac.return_value = {MACADDR: nic}
+ net = ds.OpenNebulaNetwork(context)
+ self.assertEqual(net.gen_conf(), expected)
+
+ # set ETH0_GATEWAY
+ context = {
+ 'ETH0_MAC': '02:00:0a:12:01:01',
+ 'ETH0_GATEWAY': '1.2.3.5', }
+ for nic in self.system_nics:
+ expected = {
+ 'version': 2,
+ 'ethernets': {
+ nic: {
+ 'gateway4': '1.2.3.5',
+ 'match': {'macaddress': MACADDR},
+ 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}}
+ m_get_phys_by_mac.return_value = {MACADDR: nic}
+ net = ds.OpenNebulaNetwork(context)
+ self.assertEqual(net.gen_conf(), expected)
+
+ @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
+ def test_gen_conf_gateway6(self, m_get_phys_by_mac):
+ """Test rendering with/without IPv6 gateway"""
+ self.maxDiff = None
+ # empty ETH0_GATEWAY6
+ context = {
+ 'ETH0_MAC': '02:00:0a:12:01:01',
+ 'ETH0_GATEWAY6': '', }
+ for nic in self.system_nics:
+ expected = {
+ 'version': 2,
+ 'ethernets': {
+ nic: {
+ 'match': {'macaddress': MACADDR},
+ 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}}
+ m_get_phys_by_mac.return_value = {MACADDR: nic}
+ net = ds.OpenNebulaNetwork(context)
+ self.assertEqual(net.gen_conf(), expected)
+
+ # set ETH0_GATEWAY6
+ context = {
+ 'ETH0_MAC': '02:00:0a:12:01:01',
+ 'ETH0_GATEWAY6': IP6_GW, }
+ for nic in self.system_nics:
+ expected = {
+ 'version': 2,
+ 'ethernets': {
+ nic: {
+ 'gateway6': IP6_GW,
+ 'match': {'macaddress': MACADDR},
+ 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}}
+ m_get_phys_by_mac.return_value = {MACADDR: nic}
+ net = ds.OpenNebulaNetwork(context)
+ self.assertEqual(net.gen_conf(), expected)
+
+ @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
+ def test_gen_conf_ipv6address(self, m_get_phys_by_mac):
+ """Test rendering with/without IPv6 address"""
+ self.maxDiff = None
+ # empty ETH0_IP6, ETH0_IP6_ULA, ETH0_IP6_PREFIX_LENGTH
+ context = {
+ 'ETH0_MAC': '02:00:0a:12:01:01',
+ 'ETH0_IP6': '',
+ 'ETH0_IP6_ULA': '',
+ 'ETH0_IP6_PREFIX_LENGTH': '', }
+ for nic in self.system_nics:
+ expected = {
+ 'version': 2,
+ 'ethernets': {
+ nic: {
+ 'match': {'macaddress': MACADDR},
+ 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}}
+ m_get_phys_by_mac.return_value = {MACADDR: nic}
+ net = ds.OpenNebulaNetwork(context)
+ self.assertEqual(net.gen_conf(), expected)
+
+ # set ETH0_IP6, ETH0_IP6_ULA, ETH0_IP6_PREFIX_LENGTH
+ context = {
+ 'ETH0_MAC': '02:00:0a:12:01:01',
+ 'ETH0_IP6': IP6_GLOBAL,
+ 'ETH0_IP6_PREFIX_LENGTH': IP6_PREFIX,
+ 'ETH0_IP6_ULA': IP6_ULA, }
+ for nic in self.system_nics:
+ expected = {
+ 'version': 2,
+ 'ethernets': {
+ nic: {
+ 'match': {'macaddress': MACADDR},
+ 'addresses': [
+ IP_BY_MACADDR + '/' + IP4_PREFIX,
+ IP6_GLOBAL + '/' + IP6_PREFIX,
+ IP6_ULA + '/' + IP6_PREFIX]}}}
+ m_get_phys_by_mac.return_value = {MACADDR: nic}
+ net = ds.OpenNebulaNetwork(context)
+ self.assertEqual(net.gen_conf(), expected)
+
+ @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
+ def test_gen_conf_dns(self, m_get_phys_by_mac):
+ """Test rendering with/without DNS server, search domain"""
+ self.maxDiff = None
+ # empty DNS, ETH0_DNS, ETH0_SEARCH_DOMAIN
+ context = {
+ 'ETH0_MAC': '02:00:0a:12:01:01',
+ 'DNS': '',
+ 'ETH0_DNS': '',
+ 'ETH0_SEARCH_DOMAIN': '', }
+ for nic in self.system_nics:
+ expected = {
+ 'version': 2,
+ 'ethernets': {
+ nic: {
+ 'match': {'macaddress': MACADDR},
+ 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}}
+ m_get_phys_by_mac.return_value = {MACADDR: nic}
+ net = ds.OpenNebulaNetwork(context)
+ self.assertEqual(net.gen_conf(), expected)
+
+ # set DNS, ETH0_DNS, ETH0_SEARCH_DOMAIN
+ context = {
+ 'ETH0_MAC': '02:00:0a:12:01:01',
+ 'DNS': '1.2.3.8',
+ 'ETH0_DNS': '1.2.3.6 1.2.3.7',
+ 'ETH0_SEARCH_DOMAIN': 'example.com example.org', }
+ for nic in self.system_nics:
+ expected = {
+ 'version': 2,
+ 'ethernets': {
+ nic: {
+ 'nameservers': {
+ 'addresses': ['1.2.3.6', '1.2.3.7', '1.2.3.8'],
+ 'search': ['example.com', 'example.org']},
+ 'match': {'macaddress': MACADDR},
+ 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}}
+ m_get_phys_by_mac.return_value = {MACADDR: nic}
+ net = ds.OpenNebulaNetwork(context)
+ self.assertEqual(net.gen_conf(), expected)
+
+ @mock.patch(DS_PATH + ".get_physical_nics_by_mac")
+ def test_gen_conf_mtu(self, m_get_phys_by_mac):
+ """Test rendering with/without MTU"""
+ self.maxDiff = None
+ # empty ETH0_MTU
+ context = {
+ 'ETH0_MAC': '02:00:0a:12:01:01',
+ 'ETH0_MTU': '', }
+ for nic in self.system_nics:
+ expected = {
+ 'version': 2,
+ 'ethernets': {
+ nic: {
+ 'match': {'macaddress': MACADDR},
+ 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}}
+ m_get_phys_by_mac.return_value = {MACADDR: nic}
+ net = ds.OpenNebulaNetwork(context)
+ self.assertEqual(net.gen_conf(), expected)
+
+ # set ETH0_MTU
+ context = {
+ 'ETH0_MAC': '02:00:0a:12:01:01',
+ 'ETH0_MTU': '1280', }
+ for nic in self.system_nics:
+ expected = {
+ 'version': 2,
+ 'ethernets': {
+ nic: {
+ 'mtu': '1280',
+ 'match': {'macaddress': MACADDR},
+ 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}}
+ m_get_phys_by_mac.return_value = {MACADDR: nic}
+ net = ds.OpenNebulaNetwork(context)
+ self.assertEqual(net.gen_conf(), expected)
+
@mock.patch(DS_PATH + ".get_physical_nics_by_mac")
def test_eth0(self, m_get_phys_by_mac):
for nic in self.system_nics:
@@ -395,7 +802,6 @@ class TestOpenNebulaNetwork(unittest.TestCase):
'match': {'macaddress': MACADDR},
'addresses': [IP_BY_MACADDR + '/16'],
'gateway4': '1.2.3.5',
- 'gateway6': None,
'nameservers': {
'addresses': ['1.2.3.6', '1.2.3.7', '1.2.3.8']}}}}
@@ -494,7 +900,6 @@ class TestOpenNebulaNetwork(unittest.TestCase):
'match': {'macaddress': MAC_1},
'addresses': ['10.3.1.3/16'],
'gateway4': '10.3.0.1',
- 'gateway6': None,
'nameservers': {
'addresses': ['10.3.1.2', '1.2.3.8'],
'search': [
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index 585acc33..a731f1ed 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -12,11 +12,11 @@ import re
from cloudinit.tests import helpers as test_helpers
from six.moves.urllib.parse import urlparse
-from six import StringIO
+from six import StringIO, text_type
from cloudinit import helpers
from cloudinit import settings
-from cloudinit.sources import convert_vendordata, UNSET
+from cloudinit.sources import BrokenMetadata, convert_vendordata, UNSET
from cloudinit.sources import DataSourceOpenStack as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
@@ -186,7 +186,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('meta_data.json'):
os_files[k] = json.dumps(os_meta)
_register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.BrokenMetadata, _read_metadata_service)
+ self.assertRaises(BrokenMetadata, _read_metadata_service)
def test_userdata_empty(self):
os_files = copy.deepcopy(OS_FILES)
@@ -217,7 +217,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('vendor_data.json'):
os_files[k] = '{' # some invalid json
_register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.BrokenMetadata, _read_metadata_service)
+ self.assertRaises(BrokenMetadata, _read_metadata_service)
def test_metadata_invalid(self):
os_files = copy.deepcopy(OS_FILES)
@@ -225,7 +225,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('meta_data.json'):
os_files[k] = '{' # some invalid json
_register_uris(self.VERSION, {}, {}, os_files)
- self.assertRaises(openstack.BrokenMetadata, _read_metadata_service)
+ self.assertRaises(BrokenMetadata, _read_metadata_service)
@test_helpers.mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
def test_datasource(self, m_dhcp):
@@ -510,6 +510,27 @@ class TestDetectOpenStack(test_helpers.CiTestCase):
ds.detect_openstack(),
'Expected detect_openstack == True on OpenTelekomCloud')
+ @test_helpers.mock.patch(MOCK_PATH + 'util.read_dmi_data')
+ def test_detect_openstack_oraclecloud_chassis_asset_tag(self, m_dmi,
+ m_is_x86):
+ """Return True on OpenStack reporting Oracle cloud asset-tag."""
+ m_is_x86.return_value = True
+
+ def fake_dmi_read(dmi_key):
+ if dmi_key == 'system-product-name':
+ return 'Standard PC (i440FX + PIIX, 1996)' # No match
+ if dmi_key == 'chassis-asset-tag':
+ return 'OracleCloud.com'
+ assert False, 'Unexpected dmi read of %s' % dmi_key
+
+ m_dmi.side_effect = fake_dmi_read
+ self.assertTrue(
+ ds.detect_openstack(accept_oracle=True),
+ 'Expected detect_openstack == True on OracleCloud.com')
+ self.assertFalse(
+ ds.detect_openstack(accept_oracle=False),
+ 'Expected detect_openstack == False.')
+
@test_helpers.mock.patch(MOCK_PATH + 'util.get_proc_env')
@test_helpers.mock.patch(MOCK_PATH + 'util.read_dmi_data')
def test_detect_openstack_by_proc_1_environ(self, m_dmi, m_proc_env,
@@ -534,4 +555,94 @@ class TestDetectOpenStack(test_helpers.CiTestCase):
m_proc_env.assert_called_with(1)
+class TestMetadataReader(test_helpers.HttprettyTestCase):
+ """Test the MetadataReader."""
+ burl = 'http://169.254.169.254/'
+ md_base = {
+ 'availability_zone': 'myaz1',
+ 'hostname': 'sm-foo-test.novalocal',
+ "keys": [{"data": PUBKEY, "name": "brickies", "type": "ssh"}],
+ 'launch_index': 0,
+ 'name': 'sm-foo-test',
+ 'public_keys': {'mykey': PUBKEY},
+ 'project_id': '6a103f813b774b9fb15a4fcd36e1c056',
+ 'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'}
+
+ def register(self, path, body=None, status=200):
+ content = (body if not isinstance(body, text_type)
+ else body.encode('utf-8'))
+ hp.register_uri(
+ hp.GET, self.burl + "openstack" + path, status=status,
+ body=content)
+
+ def register_versions(self, versions):
+ self.register("", '\n'.join(versions))
+ self.register("/", '\n'.join(versions))
+
+ def register_version(self, version, data):
+ content = '\n'.join(sorted(data.keys()))
+ self.register(version, content)
+ self.register(version + "/", content)
+ for path, content in data.items():
+ self.register("/%s/%s" % (version, path), content)
+ self.register("/%s/%s" % (version, path), content)
+ if 'user_data' not in data:
+ self.register("/%s/user_data" % version, "nodata", status=404)
+
+ def test__find_working_version(self):
+ """Test a working version ignores unsupported."""
+ unsup = "2016-11-09"
+ self.register_versions(
+ [openstack.OS_FOLSOM, openstack.OS_LIBERTY, unsup,
+ openstack.OS_LATEST])
+ self.assertEqual(
+ openstack.OS_LIBERTY,
+ openstack.MetadataReader(self.burl)._find_working_version())
+
+ def test__find_working_version_uses_latest(self):
+ """'latest' should be used if no supported versions."""
+ unsup1, unsup2 = ("2016-11-09", '2017-06-06')
+ self.register_versions([unsup1, unsup2, openstack.OS_LATEST])
+ self.assertEqual(
+ openstack.OS_LATEST,
+ openstack.MetadataReader(self.burl)._find_working_version())
+
+ def test_read_v2_os_ocata(self):
+ """Validate return value of read_v2 for os_ocata data."""
+ md = copy.deepcopy(self.md_base)
+ md['devices'] = []
+ network_data = {'links': [], 'networks': [], 'services': []}
+ vendor_data = {}
+ vendor_data2 = {"static": {}}
+
+ data = {
+ 'meta_data.json': json.dumps(md),
+ 'network_data.json': json.dumps(network_data),
+ 'vendor_data.json': json.dumps(vendor_data),
+ 'vendor_data2.json': json.dumps(vendor_data2),
+ }
+
+ self.register_versions([openstack.OS_OCATA, openstack.OS_LATEST])
+ self.register_version(openstack.OS_OCATA, data)
+
+ mock_read_ec2 = test_helpers.mock.MagicMock(
+ return_value={'instance-id': 'unused-ec2'})
+ expected_md = copy.deepcopy(md)
+ expected_md.update(
+ {'instance-id': md['uuid'], 'local-hostname': md['hostname']})
+ expected = {
+ 'userdata': '', # Annoying, no user-data results in empty string.
+ 'version': 2,
+ 'metadata': expected_md,
+ 'vendordata': vendor_data,
+ 'networkdata': network_data,
+ 'ec2-metadata': mock_read_ec2.return_value,
+ 'files': {},
+ }
+ reader = openstack.MetadataReader(self.burl)
+ reader._read_ec2_metadata = mock_read_ec2
+ self.assertEqual(expected, reader.read_v2())
+ self.assertEqual(1, mock_read_ec2.call_count)
+
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_ovf.py b/tests/unittests/test_datasource/test_ovf.py
index fc4eb36e..9d52eb99 100644
--- a/tests/unittests/test_datasource/test_ovf.py
+++ b/tests/unittests/test_datasource/test_ovf.py
@@ -124,7 +124,9 @@ class TestDatasourceOVF(CiTestCase):
ds = self.datasource(sys_cfg={}, distro={}, paths=paths)
retcode = wrap_and_call(
'cloudinit.sources.DataSourceOVF',
- {'util.read_dmi_data': None},
+ {'util.read_dmi_data': None,
+ 'transport_iso9660': (False, None, None),
+ 'transport_vmware_guestd': (False, None, None)},
ds.get_data)
self.assertFalse(retcode, 'Expected False return from ds.get_data')
self.assertIn(
@@ -138,7 +140,9 @@ class TestDatasourceOVF(CiTestCase):
paths=paths)
retcode = wrap_and_call(
'cloudinit.sources.DataSourceOVF',
- {'util.read_dmi_data': 'vmware'},
+ {'util.read_dmi_data': 'vmware',
+ 'transport_iso9660': (False, None, None),
+ 'transport_vmware_guestd': (False, None, None)},
ds.get_data)
self.assertFalse(retcode, 'Expected False return from ds.get_data')
self.assertIn(
diff --git a/tests/unittests/test_datasource/test_scaleway.py b/tests/unittests/test_datasource/test_scaleway.py
index e4e9bb20..c2bc7a00 100644
--- a/tests/unittests/test_datasource/test_scaleway.py
+++ b/tests/unittests/test_datasource/test_scaleway.py
@@ -176,11 +176,18 @@ class TestDataSourceScaleway(HttprettyTestCase):
self.vendordata_url = \
DataSourceScaleway.BUILTIN_DS_CONFIG['vendordata_url']
+ self.add_patch('cloudinit.sources.DataSourceScaleway.on_scaleway',
+ '_m_on_scaleway', return_value=True)
+ self.add_patch(
+ 'cloudinit.sources.DataSourceScaleway.net.find_fallback_nic',
+ '_m_find_fallback_nic', return_value='scalewaynic0')
+
+ @mock.patch('cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4')
@mock.patch('cloudinit.sources.DataSourceScaleway.SourceAddressAdapter',
get_source_address_adapter)
@mock.patch('cloudinit.util.get_cmdline')
@mock.patch('time.sleep', return_value=None)
- def test_metadata_ok(self, sleep, m_get_cmdline):
+ def test_metadata_ok(self, sleep, m_get_cmdline, dhcpv4):
"""
get_data() returns metadata, user data and vendor data.
"""
@@ -211,11 +218,12 @@ class TestDataSourceScaleway(HttprettyTestCase):
self.assertIsNone(self.datasource.region)
self.assertEqual(sleep.call_count, 0)
+ @mock.patch('cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4')
@mock.patch('cloudinit.sources.DataSourceScaleway.SourceAddressAdapter',
get_source_address_adapter)
@mock.patch('cloudinit.util.get_cmdline')
@mock.patch('time.sleep', return_value=None)
- def test_metadata_404(self, sleep, m_get_cmdline):
+ def test_metadata_404(self, sleep, m_get_cmdline, dhcpv4):
"""
get_data() returns metadata, but no user data nor vendor data.
"""
@@ -234,11 +242,12 @@ class TestDataSourceScaleway(HttprettyTestCase):
self.assertIsNone(self.datasource.get_vendordata_raw())
self.assertEqual(sleep.call_count, 0)
+ @mock.patch('cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4')
@mock.patch('cloudinit.sources.DataSourceScaleway.SourceAddressAdapter',
get_source_address_adapter)
@mock.patch('cloudinit.util.get_cmdline')
@mock.patch('time.sleep', return_value=None)
- def test_metadata_rate_limit(self, sleep, m_get_cmdline):
+ def test_metadata_rate_limit(self, sleep, m_get_cmdline, dhcpv4):
"""
get_data() is rate limited two times by the metadata API when fetching
user data.
@@ -262,3 +271,67 @@ class TestDataSourceScaleway(HttprettyTestCase):
self.assertEqual(self.datasource.get_userdata_raw(),
DataResponses.FAKE_USER_DATA)
self.assertEqual(sleep.call_count, 2)
+
+ @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic')
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_network_config_ok(self, m_get_cmdline, fallback_nic):
+ """
+ network_config will only generate IPv4 config if no ipv6 data is
+ available in the metadata
+ """
+ m_get_cmdline.return_value = 'scaleway'
+ fallback_nic.return_value = 'ens2'
+ self.datasource.metadata['ipv6'] = None
+
+ netcfg = self.datasource.network_config
+ resp = {'version': 1,
+ 'config': [{
+ 'type': 'physical',
+ 'name': 'ens2',
+ 'subnets': [{'type': 'dhcp4'}]}]
+ }
+ self.assertEqual(netcfg, resp)
+
+ @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic')
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_network_config_ipv6_ok(self, m_get_cmdline, fallback_nic):
+ """
+ network_config will only generate IPv4/v6 configs if ipv6 data is
+ available in the metadata
+ """
+ m_get_cmdline.return_value = 'scaleway'
+ fallback_nic.return_value = 'ens2'
+ self.datasource.metadata['ipv6'] = {
+ 'address': '2000:abc:4444:9876::42:999',
+ 'gateway': '2000:abc:4444:9876::42:000',
+ 'netmask': '127',
+ }
+
+ netcfg = self.datasource.network_config
+ resp = {'version': 1,
+ 'config': [{
+ 'type': 'physical',
+ 'name': 'ens2',
+ 'subnets': [{'type': 'dhcp4'},
+ {'type': 'static',
+ 'address': '2000:abc:4444:9876::42:999',
+ 'gateway': '2000:abc:4444:9876::42:000',
+ 'netmask': '127', }
+ ]
+
+ }]
+ }
+ self.assertEqual(netcfg, resp)
+
+ @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic')
+ @mock.patch('cloudinit.util.get_cmdline')
+ def test_network_config_existing(self, m_get_cmdline, fallback_nic):
+ """
+ network_config() should return the same data if a network config
+ already exists
+ """
+ m_get_cmdline.return_value = 'scaleway'
+ self.datasource._network_config = '0xdeadbeef'
+
+ netcfg = self.datasource.network_config
+ self.assertEqual(netcfg, '0xdeadbeef')
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index dca0b3d4..46d67b94 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -20,10 +20,8 @@ import multiprocessing
import os
import os.path
import re
-import shutil
import signal
import stat
-import tempfile
import unittest2
import uuid
@@ -31,15 +29,27 @@ from cloudinit import serial
from cloudinit.sources import DataSourceSmartOS
from cloudinit.sources.DataSourceSmartOS import (
convert_smartos_network_data as convert_net,
- SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ)
+ SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ,
+ identify_file)
import six
from cloudinit import helpers as c_helpers
-from cloudinit.util import (b64e, subp)
+from cloudinit.util import (
+ b64e, subp, ProcessExecutionError, which, write_file)
-from cloudinit.tests.helpers import mock, FilesystemMockingTestCase, TestCase
+from cloudinit.tests.helpers import (
+ CiTestCase, mock, FilesystemMockingTestCase, skipIf)
+
+try:
+ import serial as _pyserial
+ assert _pyserial # avoid pyflakes error F401: import unused
+ HAS_PYSERIAL = True
+except ImportError:
+ HAS_PYSERIAL = False
+
+DSMOS = 'cloudinit.sources.DataSourceSmartOS'
SDC_NICS = json.loads("""
[
{
@@ -366,37 +376,33 @@ class PsuedoJoyentClient(object):
class TestSmartOSDataSource(FilesystemMockingTestCase):
+ jmc_cfact = None
+ get_smartos_environ = None
+
def setUp(self):
super(TestSmartOSDataSource, self).setUp()
- dsmos = 'cloudinit.sources.DataSourceSmartOS'
- patcher = mock.patch(dsmos + ".jmc_client_factory")
- self.jmc_cfact = patcher.start()
- self.addCleanup(patcher.stop)
- patcher = mock.patch(dsmos + ".get_smartos_environ")
- self.get_smartos_environ = patcher.start()
- self.addCleanup(patcher.stop)
-
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = c_helpers.Paths(
- {'cloud_dir': self.tmp, 'run_dir': self.tmp})
-
- self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp')
+ self.add_patch(DSMOS + ".get_smartos_environ", "get_smartos_environ")
+ self.add_patch(DSMOS + ".jmc_client_factory", "jmc_cfact")
+ self.legacy_user_d = self.tmp_path('legacy_user_tmp')
os.mkdir(self.legacy_user_d)
-
- self.orig_lud = DataSourceSmartOS.LEGACY_USER_D
- DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d
-
- def tearDown(self):
- DataSourceSmartOS.LEGACY_USER_D = self.orig_lud
- super(TestSmartOSDataSource, self).tearDown()
+ self.add_patch(DSMOS + ".LEGACY_USER_D", "m_legacy_user_d",
+ autospec=False, new=self.legacy_user_d)
+ self.add_patch(DSMOS + ".identify_file", "m_identify_file",
+ return_value="text/plain")
def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM,
sys_cfg=None, ds_cfg=None):
self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata)
self.get_smartos_environ.return_value = mode
+ tmpd = self.tmp_dir()
+ dirs = {'cloud_dir': self.tmp_path('cloud_dir', tmpd),
+ 'run_dir': self.tmp_path('run_dir')}
+ for d in dirs.values():
+ os.mkdir(d)
+ paths = c_helpers.Paths(dirs)
+
if sys_cfg is None:
sys_cfg = {}
@@ -405,7 +411,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
sys_cfg['datasource']['SmartOS'] = ds_cfg
return DataSourceSmartOS.DataSourceSmartOS(
- sys_cfg, distro=None, paths=self.paths)
+ sys_cfg, distro=None, paths=paths)
def test_no_base64(self):
ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
@@ -493,6 +499,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
dsrc.metadata['user-script'])
legacy_script_f = "%s/user-script" % self.legacy_user_d
+ print("legacy_script_f=%s" % legacy_script_f)
self.assertTrue(os.path.exists(legacy_script_f))
self.assertTrue(os.path.islink(legacy_script_f))
user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:]
@@ -640,6 +647,28 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
mydscfg['disk_aliases']['FOO'])
+class TestIdentifyFile(CiTestCase):
+ """Test the 'identify_file' utility."""
+ @skipIf(not which("file"), "command 'file' not available.")
+ def test_file_happy_path(self):
+ """Test file is available and functional on plain text."""
+ fname = self.tmp_path("myfile")
+ write_file(fname, "plain text content here\n")
+ with self.allow_subp(["file"]):
+ self.assertEqual("text/plain", identify_file(fname))
+
+ @mock.patch(DSMOS + ".util.subp")
+ def test_returns_none_on_error(self, m_subp):
+ """On 'file' execution error, None should be returned."""
+ m_subp.side_effect = ProcessExecutionError("FILE_FAILED", exit_code=99)
+ fname = self.tmp_path("myfile")
+ write_file(fname, "plain text content here\n")
+ self.assertEqual(None, identify_file(fname))
+ self.assertEqual(
+ [mock.call(["file", "--brief", "--mime-type", fname])],
+ m_subp.call_args_list)
+
+
class ShortReader(object):
"""Implements a 'read' interface for bytes provided.
much like io.BytesIO but the 'endbyte' acts as if EOF.
@@ -893,7 +922,7 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase):
self.assertEqual(client.list(), [])
-class TestNetworkConversion(TestCase):
+class TestNetworkConversion(CiTestCase):
def test_convert_simple(self):
expected = {
'version': 1,
@@ -1058,7 +1087,8 @@ class TestNetworkConversion(TestCase):
"Only supported on KVM and bhyve guests under SmartOS")
@unittest2.skipUnless(os.access(SERIAL_DEVICE, os.W_OK),
"Requires write access to " + SERIAL_DEVICE)
-class TestSerialConcurrency(TestCase):
+@unittest2.skipUnless(HAS_PYSERIAL is True, "pyserial not available")
+class TestSerialConcurrency(CiTestCase):
"""
This class tests locking on an actual serial port, and as such can only
be run in a kvm or bhyve guest running on a SmartOS host. A test run on
@@ -1066,7 +1096,11 @@ class TestSerialConcurrency(TestCase):
there is only one session over a connection. In contrast, in the
absence of proper locking multiple processes opening the same serial
port can corrupt each others' exchanges with the metadata server.
+
+ This takes on the order of 2 to 3 minutes to run.
"""
+ allowed_subp = ['mdata-get']
+
def setUp(self):
self.mdata_proc = multiprocessing.Process(target=self.start_mdata_loop)
self.mdata_proc.start()
@@ -1097,7 +1131,7 @@ class TestSerialConcurrency(TestCase):
keys = [tup[0] for tup in ds.SMARTOS_ATTRIB_MAP.values()]
keys.extend(ds.SMARTOS_ATTRIB_JSON.values())
- client = ds.jmc_client_factory()
+ client = ds.jmc_client_factory(smartos_type=SMARTOS_ENV_KVM)
self.assertIsNotNone(client)
# The behavior that we are testing for was observed mdata-get running