diff options
author | Chad Smith <chad.smith@canonical.com> | 2018-10-03 12:10:23 -0600 |
---|---|---|
committer | Chad Smith <chad.smith@canonical.com> | 2018-10-03 12:10:23 -0600 |
commit | d6347e1c439eda7f43d9620dac2b461e980e1ae9 (patch) | |
tree | 08410263488d11a2a29edcc620575ed1b028100e /tests/unittests/test_datasource | |
parent | 564793a76b9c9add1ee81bab4919c8dccd45a33d (diff) | |
parent | e28000457591bde9f22d6b7a538b1fc33349d780 (diff) | |
download | vyos-cloud-init-d6347e1c439eda7f43d9620dac2b461e980e1ae9.tar.gz vyos-cloud-init-d6347e1c439eda7f43d9620dac2b461e980e1ae9.zip |
merge from master at 18.4
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r-- | tests/unittests/test_datasource/test_altcloud.py | 44 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_azure.py | 399 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_cloudsigma.py | 3 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_common.py | 4 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_configdrive.py | 15 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_nocloud.py | 2 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_opennebula.py | 409 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_openstack.py | 121 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_ovf.py | 8 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_scaleway.py | 79 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 94 |
11 files changed, 1077 insertions, 101 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py index 3253f3ad..ff35904e 100644 --- a/tests/unittests/test_datasource/test_altcloud.py +++ b/tests/unittests/test_datasource/test_altcloud.py @@ -262,64 +262,56 @@ class TestUserDataRhevm(CiTestCase): ''' Test to exercise method: DataSourceAltCloud.user_data_rhevm() ''' - cmd_pass = ['true'] - cmd_fail = ['false'] - cmd_not_found = ['bogus bad command'] - def setUp(self): '''Set up.''' self.paths = helpers.Paths({'cloud_dir': '/tmp'}) - self.mount_dir = tempfile.mkdtemp() + self.mount_dir = self.tmp_dir() _write_user_data_files(self.mount_dir, 'test user data') - - def tearDown(self): - # Reset - - _remove_user_data_files(self.mount_dir) - - # Attempt to remove the temp dir ignoring errors - try: - shutil.rmtree(self.mount_dir) - except OSError: - pass - - dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' - dsac.CMD_PROBE_FLOPPY = ['modprobe', 'floppy'] - dsac.CMD_UDEVADM_SETTLE = ['udevadm', 'settle', - '--quiet', '--timeout=5'] + self.add_patch( + 'cloudinit.sources.DataSourceAltCloud.modprobe_floppy', + 'm_modprobe_floppy', return_value=None) + self.add_patch( + 'cloudinit.sources.DataSourceAltCloud.util.udevadm_settle', + 'm_udevadm_settle', return_value=('', '')) + self.add_patch( + 'cloudinit.sources.DataSourceAltCloud.util.mount_cb', + 'm_mount_cb') def test_mount_cb_fails(self): '''Test user_data_rhevm() where mount_cb fails.''' - dsac.CMD_PROBE_FLOPPY = self.cmd_pass + self.m_mount_cb.side_effect = util.MountFailedError("Failed Mount") dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) def test_modprobe_fails(self): '''Test user_data_rhevm() where modprobe fails.''' - dsac.CMD_PROBE_FLOPPY = self.cmd_fail + self.m_modprobe_floppy.side_effect = util.ProcessExecutionError( + "Failed modprobe") dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) def test_no_modprobe_cmd(self): '''Test user_data_rhevm() with no modprobe command.''' - dsac.CMD_PROBE_FLOPPY = self.cmd_not_found + self.m_modprobe_floppy.side_effect = util.ProcessExecutionError( + "No such file or dir") dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) def test_udevadm_fails(self): '''Test user_data_rhevm() where udevadm fails.''' - dsac.CMD_UDEVADM_SETTLE = self.cmd_fail + self.m_udevadm_settle.side_effect = util.ProcessExecutionError( + "Failed settle.") dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) def test_no_udevadm_cmd(self): '''Test user_data_rhevm() with no udevadm command.''' - dsac.CMD_UDEVADM_SETTLE = self.cmd_not_found + self.m_udevadm_settle.side_effect = OSError("No such file or dir") dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py index e82716eb..4e428b71 100644 --- a/tests/unittests/test_datasource/test_azure.py +++ b/tests/unittests/test_datasource/test_azure.py @@ -1,15 +1,21 @@ # This file is part of cloud-init. See LICENSE file for license information. +from cloudinit import distros from cloudinit import helpers -from cloudinit.sources import DataSourceAzure as dsaz +from cloudinit import url_helper +from cloudinit.sources import ( + UNSET, DataSourceAzure as dsaz, InvalidMetaDataException) from cloudinit.util import (b64e, decode_binary, load_file, write_file, find_freebsd_part, get_path_dev_freebsd, MountFailedError) from cloudinit.version import version_string as vs -from cloudinit.tests.helpers import (CiTestCase, TestCase, populate_dir, mock, - ExitStack, PY26, SkipTest) +from cloudinit.tests.helpers import ( + HttprettyTestCase, CiTestCase, populate_dir, mock, wrap_and_call, + ExitStack, PY26, SkipTest) import crypt +import httpretty +import json import os import stat import xml.etree.ElementTree as ET @@ -77,6 +83,106 @@ def construct_valid_ovf_env(data=None, pubkeys=None, return content +NETWORK_METADATA = { + "network": { + "interface": [ + { + "macAddress": "000D3A047598", + "ipv6": { + "ipAddress": [] + }, + "ipv4": { + "subnet": [ + { + "prefix": "24", + "address": "10.0.0.0" + } + ], + "ipAddress": [ + { + "privateIpAddress": "10.0.0.4", + "publicIpAddress": "104.46.124.81" + } + ] + } + } + ] + } +} + + +class TestGetMetadataFromIMDS(HttprettyTestCase): + + with_logs = True + + def setUp(self): + super(TestGetMetadataFromIMDS, self).setUp() + self.network_md_url = dsaz.IMDS_URL + "instance?api-version=2017-12-01" + + @mock.patch('cloudinit.sources.DataSourceAzure.readurl') + @mock.patch('cloudinit.sources.DataSourceAzure.EphemeralDHCPv4') + @mock.patch('cloudinit.sources.DataSourceAzure.net.is_up') + def test_get_metadata_does_not_dhcp_if_network_is_up( + self, m_net_is_up, m_dhcp, m_readurl): + """Do not perform DHCP setup when nic is already up.""" + m_net_is_up.return_value = True + m_readurl.return_value = url_helper.StringResponse( + json.dumps(NETWORK_METADATA).encode('utf-8')) + self.assertEqual( + NETWORK_METADATA, + dsaz.get_metadata_from_imds('eth9', retries=3)) + + m_net_is_up.assert_called_with('eth9') + m_dhcp.assert_not_called() + self.assertIn( + "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time + self.logs.getvalue()) + + @mock.patch('cloudinit.sources.DataSourceAzure.readurl') + @mock.patch('cloudinit.sources.DataSourceAzure.EphemeralDHCPv4') + @mock.patch('cloudinit.sources.DataSourceAzure.net.is_up') + def test_get_metadata_performs_dhcp_when_network_is_down( + self, m_net_is_up, m_dhcp, m_readurl): + """Perform DHCP setup when nic is not up.""" + m_net_is_up.return_value = False + m_readurl.return_value = url_helper.StringResponse( + json.dumps(NETWORK_METADATA).encode('utf-8')) + + self.assertEqual( + NETWORK_METADATA, + dsaz.get_metadata_from_imds('eth9', retries=2)) + + m_net_is_up.assert_called_with('eth9') + m_dhcp.assert_called_with('eth9') + self.assertIn( + "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time + self.logs.getvalue()) + + m_readurl.assert_called_with( + self.network_md_url, exception_cb=mock.ANY, + headers={'Metadata': 'true'}, retries=2, timeout=1) + + @mock.patch('cloudinit.url_helper.time.sleep') + @mock.patch('cloudinit.sources.DataSourceAzure.net.is_up') + def test_get_metadata_from_imds_empty_when_no_imds_present( + self, m_net_is_up, m_sleep): + """Return empty dict when IMDS network metadata is absent.""" + httpretty.register_uri( + httpretty.GET, + dsaz.IMDS_URL + 'instance?api-version=2017-12-01', + body={}, status=404) + + m_net_is_up.return_value = True # skips dhcp + + self.assertEqual({}, dsaz.get_metadata_from_imds('eth9', retries=2)) + + m_net_is_up.assert_called_with('eth9') + self.assertEqual([mock.call(1), mock.call(1)], m_sleep.call_args_list) + self.assertIn( + "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time + self.logs.getvalue()) + + class TestAzureDataSource(CiTestCase): with_logs = True @@ -95,8 +201,19 @@ class TestAzureDataSource(CiTestCase): self.patches = ExitStack() self.addCleanup(self.patches.close) - self.patches.enter_context(mock.patch.object(dsaz, '_get_random_seed')) - + self.patches.enter_context(mock.patch.object( + dsaz, '_get_random_seed', return_value='wild')) + self.m_get_metadata_from_imds = self.patches.enter_context( + mock.patch.object( + dsaz, 'get_metadata_from_imds', + mock.MagicMock(return_value=NETWORK_METADATA))) + self.m_fallback_nic = self.patches.enter_context( + mock.patch('cloudinit.sources.net.find_fallback_nic', + return_value='eth9')) + self.m_remove_ubuntu_network_scripts = self.patches.enter_context( + mock.patch.object( + dsaz, 'maybe_remove_ubuntu_network_config_scripts', + mock.MagicMock())) super(TestAzureDataSource, self).setUp() def apply_patches(self, patches): @@ -137,7 +254,7 @@ scbus-1 on xpt0 bus 0 ]) return dsaz - def _get_ds(self, data, agent_command=None): + def _get_ds(self, data, agent_command=None, distro=None): def dsdevs(): return data.get('dsdevs', []) @@ -186,8 +303,11 @@ scbus-1 on xpt0 bus 0 side_effect=_wait_for_files)), ]) + if distro is not None: + distro_cls = distros.fetch(distro) + distro = distro_cls(distro, data.get('sys_cfg', {}), self.paths) dsrc = dsaz.DataSourceAzure( - data.get('sys_cfg', {}), distro=None, paths=self.paths) + data.get('sys_cfg', {}), distro=distro, paths=self.paths) if agent_command is not None: dsrc.ds_cfg['agent_command'] = agent_command @@ -260,29 +380,20 @@ fdescfs /dev/fd fdescfs rw 0 0 res = get_path_dev_freebsd('/etc', mnt_list) self.assertIsNotNone(res) - @mock.patch('cloudinit.sources.DataSourceAzure.util.read_dmi_data') - def test_non_azure_dmi_chassis_asset_tag(self, m_read_dmi_data): - """Report non-azure when DMI's chassis asset tag doesn't match. - - Return False when the asset tag doesn't match Azure's static - AZURE_CHASSIS_ASSET_TAG. - """ + @mock.patch('cloudinit.sources.DataSourceAzure._is_platform_viable') + def test_call_is_platform_viable_seed(self, m_is_platform_viable): + """Check seed_dir using _is_platform_viable and return False.""" # Return a non-matching asset tag value - nonazure_tag = dsaz.AZURE_CHASSIS_ASSET_TAG + 'X' - m_read_dmi_data.return_value = nonazure_tag + m_is_platform_viable.return_value = False dsrc = dsaz.DataSourceAzure( {}, distro=None, paths=self.paths) self.assertFalse(dsrc.get_data()) - self.assertEqual( - "DEBUG: Non-Azure DMI asset tag '{0}' discovered.\n".format( - nonazure_tag), - self.logs.getvalue()) + m_is_platform_viable.assert_called_with(dsrc.seed_dir) def test_basic_seed_dir(self): odata = {'HostName': "myhost", 'UserName': "myuser"} data = {'ovfcontent': construct_valid_ovf_env(data=odata), 'sys_cfg': {}} - dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) @@ -291,6 +402,82 @@ fdescfs /dev/fd fdescfs rw 0 0 self.assertTrue(os.path.isfile( os.path.join(self.waagent_d, 'ovf-env.xml'))) + def test_get_data_non_ubuntu_will_not_remove_network_scripts(self): + """get_data on non-Ubuntu will not remove ubuntu net scripts.""" + odata = {'HostName': "myhost", 'UserName': "myuser"} + data = {'ovfcontent': construct_valid_ovf_env(data=odata), + 'sys_cfg': {}} + + dsrc = self._get_ds(data, distro='debian') + dsrc.get_data() + self.m_remove_ubuntu_network_scripts.assert_not_called() + + def test_get_data_on_ubuntu_will_remove_network_scripts(self): + """get_data will remove ubuntu net scripts on Ubuntu distro.""" + odata = {'HostName': "myhost", 'UserName': "myuser"} + data = {'ovfcontent': construct_valid_ovf_env(data=odata), + 'sys_cfg': {}} + + dsrc = self._get_ds(data, distro='ubuntu') + dsrc.get_data() + self.m_remove_ubuntu_network_scripts.assert_called_once_with() + + def test_crawl_metadata_returns_structured_data_and_caches_nothing(self): + """Return all structured metadata and cache no class attributes.""" + yaml_cfg = "{agent_command: my_command}\n" + odata = {'HostName': "myhost", 'UserName': "myuser", + 'UserData': {'text': 'FOOBAR', 'encoding': 'plain'}, + 'dscfg': {'text': yaml_cfg, 'encoding': 'plain'}} + data = {'ovfcontent': construct_valid_ovf_env(data=odata), + 'sys_cfg': {}} + dsrc = self._get_ds(data) + expected_cfg = { + 'PreprovisionedVm': False, + 'datasource': {'Azure': {'agent_command': 'my_command'}}, + 'system_info': {'default_user': {'name': u'myuser'}}} + expected_metadata = { + 'azure_data': { + 'configurationsettype': 'LinuxProvisioningConfiguration'}, + 'imds': {'network': {'interface': [{ + 'ipv4': {'ipAddress': [ + {'privateIpAddress': '10.0.0.4', + 'publicIpAddress': '104.46.124.81'}], + 'subnet': [{'address': '10.0.0.0', 'prefix': '24'}]}, + 'ipv6': {'ipAddress': []}, + 'macAddress': '000D3A047598'}]}}, + 'instance-id': 'test-instance-id', + 'local-hostname': u'myhost', + 'random_seed': 'wild'} + + crawled_metadata = dsrc.crawl_metadata() + + self.assertItemsEqual( + crawled_metadata.keys(), + ['cfg', 'files', 'metadata', 'userdata_raw']) + self.assertEqual(crawled_metadata['cfg'], expected_cfg) + self.assertEqual( + list(crawled_metadata['files'].keys()), ['ovf-env.xml']) + self.assertIn( + b'<HostName>myhost</HostName>', + crawled_metadata['files']['ovf-env.xml']) + self.assertEqual(crawled_metadata['metadata'], expected_metadata) + self.assertEqual(crawled_metadata['userdata_raw'], 'FOOBAR') + self.assertEqual(dsrc.userdata_raw, None) + self.assertEqual(dsrc.metadata, {}) + self.assertEqual(dsrc._metadata_imds, UNSET) + self.assertFalse(os.path.isfile( + os.path.join(self.waagent_d, 'ovf-env.xml'))) + + def test_crawl_metadata_raises_invalid_metadata_on_error(self): + """crawl_metadata raises an exception on invalid ovf-env.xml.""" + data = {'ovfcontent': "BOGUS", 'sys_cfg': {}} + dsrc = self._get_ds(data) + error_msg = ('BrokenAzureDataSource: Invalid ovf-env.xml:' + ' syntax error: line 1, column 0') + with self.assertRaises(InvalidMetaDataException) as cm: + dsrc.crawl_metadata() + self.assertEqual(str(cm.exception), error_msg) + def test_waagent_d_has_0700_perms(self): # we expect /var/lib/waagent to be created 0700 dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) @@ -314,6 +501,20 @@ fdescfs /dev/fd fdescfs rw 0 0 self.assertTrue(ret) self.assertEqual(data['agent_invoked'], cfg['agent_command']) + def test_network_config_set_from_imds(self): + """Datasource.network_config returns IMDS network data.""" + odata = {} + data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + expected_network_config = { + 'ethernets': { + 'eth0': {'set-name': 'eth0', + 'match': {'macaddress': '00:0d:3a:04:75:98'}, + 'dhcp4': True}}, + 'version': 2} + dsrc = self._get_ds(data) + dsrc.get_data() + self.assertEqual(expected_network_config, dsrc.network_config) + def test_user_cfg_set_agent_command(self): # set dscfg in via base64 encoded yaml cfg = {'agent_command': "my_command"} @@ -579,12 +780,34 @@ fdescfs /dev/fd fdescfs rw 0 0 self.assertEqual( [mock.call("/dev/cd0")], m_check_fbsd_cdrom.call_args_list) + @mock.patch('cloudinit.net.generate_fallback_config') + def test_imds_network_config(self, mock_fallback): + """Network config is generated from IMDS network data when present.""" + odata = {'HostName': "myhost", 'UserName': "myuser"} + data = {'ovfcontent': construct_valid_ovf_env(data=odata), + 'sys_cfg': {}} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + + expected_cfg = { + 'ethernets': { + 'eth0': {'dhcp4': True, + 'match': {'macaddress': '00:0d:3a:04:75:98'}, + 'set-name': 'eth0'}}, + 'version': 2} + + self.assertEqual(expected_cfg, dsrc.network_config) + mock_fallback.assert_not_called() + @mock.patch('cloudinit.net.get_interface_mac') @mock.patch('cloudinit.net.get_devicelist') @mock.patch('cloudinit.net.device_driver') @mock.patch('cloudinit.net.generate_fallback_config') - def test_network_config(self, mock_fallback, mock_dd, - mock_devlist, mock_get_mac): + def test_fallback_network_config(self, mock_fallback, mock_dd, + mock_devlist, mock_get_mac): + """On absent IMDS network data, generate network fallback config.""" odata = {'HostName': "myhost", 'UserName': "myuser"} data = {'ovfcontent': construct_valid_ovf_env(data=odata), 'sys_cfg': {}} @@ -605,6 +828,8 @@ fdescfs /dev/fd fdescfs rw 0 0 mock_get_mac.return_value = '00:11:22:33:44:55' dsrc = self._get_ds(data) + # Represent empty response from network imds + self.m_get_metadata_from_imds.return_value = {} ret = dsrc.get_data() self.assertTrue(ret) @@ -617,8 +842,9 @@ fdescfs /dev/fd fdescfs rw 0 0 @mock.patch('cloudinit.net.get_devicelist') @mock.patch('cloudinit.net.device_driver') @mock.patch('cloudinit.net.generate_fallback_config') - def test_network_config_blacklist(self, mock_fallback, mock_dd, - mock_devlist, mock_get_mac): + def test_fallback_network_config_blacklist(self, mock_fallback, mock_dd, + mock_devlist, mock_get_mac): + """On absent network metadata, blacklist mlx from fallback config.""" odata = {'HostName': "myhost", 'UserName': "myuser"} data = {'ovfcontent': construct_valid_ovf_env(data=odata), 'sys_cfg': {}} @@ -649,6 +875,8 @@ fdescfs /dev/fd fdescfs rw 0 0 mock_get_mac.return_value = '00:11:22:33:44:55' dsrc = self._get_ds(data) + # Represent empty response from network imds + self.m_get_metadata_from_imds.return_value = {} ret = dsrc.get_data() self.assertTrue(ret) @@ -689,9 +917,12 @@ class TestAzureBounce(CiTestCase): mock.patch.object(dsaz, 'get_metadata_from_fabric', mock.MagicMock(return_value={}))) self.patches.enter_context( - mock.patch.object(dsaz.util, 'which', lambda x: True)) + mock.patch.object(dsaz, 'get_metadata_from_imds', + mock.MagicMock(return_value={}))) self.patches.enter_context( - mock.patch.object(dsaz, '_get_random_seed')) + mock.patch.object(dsaz.util, 'which', lambda x: True)) + self.patches.enter_context(mock.patch.object( + dsaz, '_get_random_seed', return_value='wild')) def _dmi_mocks(key): if key == 'system-uuid': @@ -719,9 +950,12 @@ class TestAzureBounce(CiTestCase): mock.patch.object(dsaz, 'set_hostname')) self.subp = self.patches.enter_context( mock.patch('cloudinit.sources.DataSourceAzure.util.subp')) + self.find_fallback_nic = self.patches.enter_context( + mock.patch('cloudinit.net.find_fallback_nic', return_value='eth9')) def tearDown(self): self.patches.close() + super(TestAzureBounce, self).tearDown() def _get_ds(self, ovfcontent=None, agent_command=None): if ovfcontent is not None: @@ -927,7 +1161,7 @@ class TestLoadAzureDsDir(CiTestCase): str(context_manager.exception)) -class TestReadAzureOvf(TestCase): +class TestReadAzureOvf(CiTestCase): def test_invalid_xml_raises_non_azure_ds(self): invalid_xml = "<foo>" + construct_valid_ovf_env(data={}) @@ -1188,6 +1422,25 @@ class TestCanDevBeReformatted(CiTestCase): "(datasource.Azure.never_destroy_ntfs)", msg) +class TestClearCachedData(CiTestCase): + + def test_clear_cached_attrs_clears_imds(self): + """All class attributes are reset to defaults, including imds data.""" + tmp = self.tmp_dir() + paths = helpers.Paths( + {'cloud_dir': tmp, 'run_dir': tmp}) + dsrc = dsaz.DataSourceAzure({}, distro=None, paths=paths) + clean_values = [dsrc.metadata, dsrc.userdata, dsrc._metadata_imds] + dsrc.metadata = 'md' + dsrc.userdata = 'ud' + dsrc._metadata_imds = 'imds' + dsrc._dirty_cache = True + dsrc.clear_cached_attrs() + self.assertEqual( + [dsrc.metadata, dsrc.userdata, dsrc._metadata_imds], + clean_values) + + class TestAzureNetExists(CiTestCase): def test_azure_net_must_exist_for_legacy_objpkl(self): @@ -1398,4 +1651,94 @@ class TestAzureDataSourcePreprovisioning(CiTestCase): self.assertEqual(m_net.call_count, 1) +class TestRemoveUbuntuNetworkConfigScripts(CiTestCase): + + with_logs = True + + def setUp(self): + super(TestRemoveUbuntuNetworkConfigScripts, self).setUp() + self.tmp = self.tmp_dir() + + def test_remove_network_scripts_removes_both_files_and_directories(self): + """Any files or directories in paths are removed when present.""" + file1 = self.tmp_path('file1', dir=self.tmp) + subdir = self.tmp_path('sub1', dir=self.tmp) + subfile = self.tmp_path('leaf1', dir=subdir) + write_file(file1, 'file1content') + write_file(subfile, 'leafcontent') + dsaz.maybe_remove_ubuntu_network_config_scripts(paths=[subdir, file1]) + + for path in (file1, subdir, subfile): + self.assertFalse(os.path.exists(path), + 'Found unremoved: %s' % path) + + expected_logs = [ + 'INFO: Removing Ubuntu extended network scripts because cloud-init' + ' updates Azure network configuration on the following event:' + ' System boot.', + 'Recursively deleting %s' % subdir, + 'Attempting to remove %s' % file1] + for log in expected_logs: + self.assertIn(log, self.logs.getvalue()) + + def test_remove_network_scripts_only_attempts_removal_if_path_exists(self): + """Any files or directories absent are skipped without error.""" + dsaz.maybe_remove_ubuntu_network_config_scripts(paths=[ + self.tmp_path('nodirhere/', dir=self.tmp), + self.tmp_path('notfilehere', dir=self.tmp)]) + self.assertNotIn('/not/a', self.logs.getvalue()) # No delete logs + + @mock.patch('cloudinit.sources.DataSourceAzure.os.path.exists') + def test_remove_network_scripts_default_removes_stock_scripts(self, + m_exists): + """Azure's stock ubuntu image scripts and artifacts are removed.""" + # Report path absent on all to avoid delete operation + m_exists.return_value = False + dsaz.maybe_remove_ubuntu_network_config_scripts() + calls = m_exists.call_args_list + for path in dsaz.UBUNTU_EXTENDED_NETWORK_SCRIPTS: + self.assertIn(mock.call(path), calls) + + +class TestWBIsPlatformViable(CiTestCase): + """White box tests for _is_platform_viable.""" + with_logs = True + + @mock.patch('cloudinit.sources.DataSourceAzure.util.read_dmi_data') + def test_true_on_non_azure_chassis(self, m_read_dmi_data): + """Return True if DMI chassis-asset-tag is AZURE_CHASSIS_ASSET_TAG.""" + m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG + self.assertTrue(dsaz._is_platform_viable('doesnotmatter')) + + @mock.patch('cloudinit.sources.DataSourceAzure.os.path.exists') + @mock.patch('cloudinit.sources.DataSourceAzure.util.read_dmi_data') + def test_true_on_azure_ovf_env_in_seed_dir(self, m_read_dmi_data, m_exist): + """Return True if ovf-env.xml exists in known seed dirs.""" + # Non-matching Azure chassis-asset-tag + m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG + 'X' + + m_exist.return_value = True + self.assertTrue(dsaz._is_platform_viable('/some/seed/dir')) + m_exist.called_once_with('/other/seed/dir') + + def test_false_on_no_matching_azure_criteria(self): + """Report non-azure on unmatched asset tag, ovf-env absent and no dev. + + Return False when the asset tag doesn't match Azure's static + AZURE_CHASSIS_ASSET_TAG, no ovf-env.xml files exist in known seed dirs + and no devices have a label starting with prefix 'rd_rdfe_'. + """ + self.assertFalse(wrap_and_call( + 'cloudinit.sources.DataSourceAzure', + {'os.path.exists': False, + # Non-matching Azure chassis-asset-tag + 'util.read_dmi_data': dsaz.AZURE_CHASSIS_ASSET_TAG + 'X', + 'util.which': None}, + dsaz._is_platform_viable, 'doesnotmatter')) + self.assertIn( + "DEBUG: Non-Azure DMI asset tag '{0}' discovered.\n".format( + dsaz.AZURE_CHASSIS_ASSET_TAG + 'X'), + self.logs.getvalue()) + + # vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py index f6a59b6b..380ad1b5 100644 --- a/tests/unittests/test_datasource/test_cloudsigma.py +++ b/tests/unittests/test_datasource/test_cloudsigma.py @@ -42,6 +42,9 @@ class CepkoMock(Cepko): class DataSourceCloudSigmaTest(test_helpers.CiTestCase): def setUp(self): super(DataSourceCloudSigmaTest, self).setUp() + self.add_patch( + "cloudinit.sources.DataSourceCloudSigma.util.is_container", + "m_is_container", return_value=False) self.paths = helpers.Paths({'run_dir': self.tmp_dir()}) self.datasource = DataSourceCloudSigma.DataSourceCloudSigma( "", "", paths=self.paths) diff --git a/tests/unittests/test_datasource/test_common.py b/tests/unittests/test_datasource/test_common.py index 0d35dc29..6b01a4ea 100644 --- a/tests/unittests/test_datasource/test_common.py +++ b/tests/unittests/test_datasource/test_common.py @@ -20,6 +20,7 @@ from cloudinit.sources import ( DataSourceNoCloud as NoCloud, DataSourceOpenNebula as OpenNebula, DataSourceOpenStack as OpenStack, + DataSourceOracle as Oracle, DataSourceOVF as OVF, DataSourceScaleway as Scaleway, DataSourceSmartOS as SmartOS, @@ -37,10 +38,12 @@ DEFAULT_LOCAL = [ IBMCloud.DataSourceIBMCloud, NoCloud.DataSourceNoCloud, OpenNebula.DataSourceOpenNebula, + Oracle.DataSourceOracle, OVF.DataSourceOVF, SmartOS.DataSourceSmartOS, Ec2.DataSourceEc2Local, OpenStack.DataSourceOpenStackLocal, + Scaleway.DataSourceScaleway, ] DEFAULT_NETWORK = [ @@ -55,7 +58,6 @@ DEFAULT_NETWORK = [ NoCloud.DataSourceNoCloudNet, OpenStack.DataSourceOpenStack, OVF.DataSourceOVFNet, - Scaleway.DataSourceScaleway, ] diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py index 68400f22..231619c9 100644 --- a/tests/unittests/test_datasource/test_configdrive.py +++ b/tests/unittests/test_datasource/test_configdrive.py @@ -136,6 +136,7 @@ NETWORK_DATA_3 = { ] } +BOND_MAC = "fa:16:3e:b3:72:36" NETWORK_DATA_BOND = { "services": [ {"type": "dns", "address": "1.1.1.191"}, @@ -163,7 +164,7 @@ NETWORK_DATA_BOND = { {"bond_links": ["eth0", "eth1"], "bond_miimon": 100, "bond_mode": "4", "bond_xmit_hash_policy": "layer3+4", - "ethernet_mac_address": "0c:c4:7a:34:6e:3c", + "ethernet_mac_address": BOND_MAC, "id": "bond0", "type": "bond"}, {"ethernet_mac_address": "fa:16:3e:b3:72:30", "id": "vlan2", "type": "vlan", "vlan_id": 602, @@ -224,6 +225,9 @@ class TestConfigDriveDataSource(CiTestCase): def setUp(self): super(TestConfigDriveDataSource, self).setUp() + self.add_patch( + "cloudinit.sources.DataSourceConfigDrive.util.find_devs_with", + "m_find_devs_with", return_value=[]) self.tmp = self.tmp_dir() def test_ec2_metadata(self): @@ -642,7 +646,7 @@ class TestConvertNetworkData(CiTestCase): routes) eni_renderer = eni.Renderer() eni_renderer.render_network_state( - network_state.parse_net_config_data(ncfg), self.tmp) + network_state.parse_net_config_data(ncfg), target=self.tmp) with open(os.path.join(self.tmp, "etc", "network", "interfaces"), 'r') as f: eni_rendering = f.read() @@ -664,7 +668,7 @@ class TestConvertNetworkData(CiTestCase): eni_renderer = eni.Renderer() eni_renderer.render_network_state( - network_state.parse_net_config_data(ncfg), self.tmp) + network_state.parse_net_config_data(ncfg), target=self.tmp) with open(os.path.join(self.tmp, "etc", "network", "interfaces"), 'r') as f: eni_rendering = f.read() @@ -688,6 +692,9 @@ class TestConvertNetworkData(CiTestCase): self.assertIn("auto oeth0", eni_rendering) self.assertIn("auto oeth1", eni_rendering) self.assertIn("auto bond0", eni_rendering) + # The bond should have the given mac address + pos = eni_rendering.find("auto bond0") + self.assertIn(BOND_MAC, eni_rendering[pos:]) def test_vlan(self): # light testing of vlan config conversion and eni rendering @@ -695,7 +702,7 @@ class TestConvertNetworkData(CiTestCase): known_macs=KNOWN_MACS) eni_renderer = eni.Renderer() eni_renderer.render_network_state( - network_state.parse_net_config_data(ncfg), self.tmp) + network_state.parse_net_config_data(ncfg), target=self.tmp) with open(os.path.join(self.tmp, "etc", "network", "interfaces"), 'r') as f: eni_rendering = f.read() diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py index cdbd1e1a..21931eb7 100644 --- a/tests/unittests/test_datasource/test_nocloud.py +++ b/tests/unittests/test_datasource/test_nocloud.py @@ -25,6 +25,8 @@ class TestNoCloudDataSource(CiTestCase): self.mocks.enter_context( mock.patch.object(util, 'get_cmdline', return_value=self.cmdline)) + self.mocks.enter_context( + mock.patch.object(util, 'read_dmi_data', return_value=None)) def test_nocloud_seed_dir(self): md = {'instance-id': 'IID', 'dsmode': 'local'} diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py index ab42f344..61591017 100644 --- a/tests/unittests/test_datasource/test_opennebula.py +++ b/tests/unittests/test_datasource/test_opennebula.py @@ -43,6 +43,7 @@ DS_PATH = "cloudinit.sources.DataSourceOpenNebula" class TestOpenNebulaDataSource(CiTestCase): parsed_user = None + allowed_subp = ['bash'] def setUp(self): super(TestOpenNebulaDataSource, self).setUp() @@ -354,6 +355,412 @@ class TestOpenNebulaNetwork(unittest.TestCase): system_nics = ('eth0', 'ens3') + def test_context_devname(self): + """Verify context_devname correctly returns mac and name.""" + context = { + 'ETH0_MAC': '02:00:0a:12:01:01', + 'ETH1_MAC': '02:00:0a:12:0f:0f', } + expected = { + '02:00:0a:12:01:01': 'ETH0', + '02:00:0a:12:0f:0f': 'ETH1', } + net = ds.OpenNebulaNetwork(context) + self.assertEqual(expected, net.context_devname) + + def test_get_nameservers(self): + """ + Verify get_nameservers('device') correctly returns DNS server addresses + and search domains. + """ + context = { + 'DNS': '1.2.3.8', + 'ETH0_DNS': '1.2.3.6 1.2.3.7', + 'ETH0_SEARCH_DOMAIN': 'example.com example.org', } + expected = { + 'addresses': ['1.2.3.6', '1.2.3.7', '1.2.3.8'], + 'search': ['example.com', 'example.org']} + net = ds.OpenNebulaNetwork(context) + val = net.get_nameservers('eth0') + self.assertEqual(expected, val) + + def test_get_mtu(self): + """Verify get_mtu('device') correctly returns MTU size.""" + context = {'ETH0_MTU': '1280'} + net = ds.OpenNebulaNetwork(context) + val = net.get_mtu('eth0') + self.assertEqual('1280', val) + + def test_get_ip(self): + """Verify get_ip('device') correctly returns IPv4 address.""" + context = {'ETH0_IP': PUBLIC_IP} + net = ds.OpenNebulaNetwork(context) + val = net.get_ip('eth0', MACADDR) + self.assertEqual(PUBLIC_IP, val) + + def test_get_ip_emptystring(self): + """ + Verify get_ip('device') correctly returns IPv4 address. + It returns IP address created by MAC address if ETH0_IP has empty + string. + """ + context = {'ETH0_IP': ''} + net = ds.OpenNebulaNetwork(context) + val = net.get_ip('eth0', MACADDR) + self.assertEqual(IP_BY_MACADDR, val) + + def test_get_ip6(self): + """ + Verify get_ip6('device') correctly returns IPv6 address. + In this case, IPv6 address is Given by ETH0_IP6. + """ + context = { + 'ETH0_IP6': IP6_GLOBAL, + 'ETH0_IP6_ULA': '', } + expected = [IP6_GLOBAL] + net = ds.OpenNebulaNetwork(context) + val = net.get_ip6('eth0') + self.assertEqual(expected, val) + + def test_get_ip6_ula(self): + """ + Verify get_ip6('device') correctly returns IPv6 address. + In this case, IPv6 address is Given by ETH0_IP6_ULA. + """ + context = { + 'ETH0_IP6': '', + 'ETH0_IP6_ULA': IP6_ULA, } + expected = [IP6_ULA] + net = ds.OpenNebulaNetwork(context) + val = net.get_ip6('eth0') + self.assertEqual(expected, val) + + def test_get_ip6_dual(self): + """ + Verify get_ip6('device') correctly returns IPv6 address. + In this case, IPv6 addresses are Given by ETH0_IP6 and ETH0_IP6_ULA. + """ + context = { + 'ETH0_IP6': IP6_GLOBAL, + 'ETH0_IP6_ULA': IP6_ULA, } + expected = [IP6_GLOBAL, IP6_ULA] + net = ds.OpenNebulaNetwork(context) + val = net.get_ip6('eth0') + self.assertEqual(expected, val) + + def test_get_ip6_prefix(self): + """ + Verify get_ip6_prefix('device') correctly returns IPv6 prefix. + """ + context = {'ETH0_IP6_PREFIX_LENGTH': IP6_PREFIX} + net = ds.OpenNebulaNetwork(context) + val = net.get_ip6_prefix('eth0') + self.assertEqual(IP6_PREFIX, val) + + def test_get_ip6_prefix_emptystring(self): + """ + Verify get_ip6_prefix('device') correctly returns IPv6 prefix. + It returns default value '64' if ETH0_IP6_PREFIX_LENGTH has empty + string. + """ + context = {'ETH0_IP6_PREFIX_LENGTH': ''} + net = ds.OpenNebulaNetwork(context) + val = net.get_ip6_prefix('eth0') + self.assertEqual('64', val) + + def test_get_gateway(self): + """ + Verify get_gateway('device') correctly returns IPv4 default gateway + address. + """ + context = {'ETH0_GATEWAY': '1.2.3.5'} + net = ds.OpenNebulaNetwork(context) + val = net.get_gateway('eth0') + self.assertEqual('1.2.3.5', val) + + def test_get_gateway6(self): + """ + Verify get_gateway6('device') correctly returns IPv6 default gateway + address. + """ + context = {'ETH0_GATEWAY6': IP6_GW} + net = ds.OpenNebulaNetwork(context) + val = net.get_gateway6('eth0') + self.assertEqual(IP6_GW, val) + + def test_get_mask(self): + """ + Verify get_mask('device') correctly returns IPv4 subnet mask. + """ + context = {'ETH0_MASK': '255.255.0.0'} + net = ds.OpenNebulaNetwork(context) + val = net.get_mask('eth0') + self.assertEqual('255.255.0.0', val) + + def test_get_mask_emptystring(self): + """ + Verify get_mask('device') correctly returns IPv4 subnet mask. + It returns default value '255.255.255.0' if ETH0_MASK has empty string. + """ + context = {'ETH0_MASK': ''} + net = ds.OpenNebulaNetwork(context) + val = net.get_mask('eth0') + self.assertEqual('255.255.255.0', val) + + def test_get_network(self): + """ + Verify get_network('device') correctly returns IPv4 network address. + """ + context = {'ETH0_NETWORK': '1.2.3.0'} + net = ds.OpenNebulaNetwork(context) + val = net.get_network('eth0', MACADDR) + self.assertEqual('1.2.3.0', val) + + def test_get_network_emptystring(self): + """ + Verify get_network('device') correctly returns IPv4 network address. + It returns network address created by MAC address if ETH0_NETWORK has + empty string. + """ + context = {'ETH0_NETWORK': ''} + net = ds.OpenNebulaNetwork(context) + val = net.get_network('eth0', MACADDR) + self.assertEqual('10.18.1.0', val) + + def test_get_field(self): + """ + Verify get_field('device', 'name') returns *context* value. + """ + context = {'ETH9_DUMMY': 'DUMMY_VALUE'} + net = ds.OpenNebulaNetwork(context) + val = net.get_field('eth9', 'dummy') + self.assertEqual('DUMMY_VALUE', val) + + def test_get_field_withdefaultvalue(self): + """ + Verify get_field('device', 'name', 'default value') returns *context* + value. + """ + context = {'ETH9_DUMMY': 'DUMMY_VALUE'} + net = ds.OpenNebulaNetwork(context) + val = net.get_field('eth9', 'dummy', 'DEFAULT_VALUE') + self.assertEqual('DUMMY_VALUE', val) + + def test_get_field_withdefaultvalue_emptycontext(self): + """ + Verify get_field('device', 'name', 'default value') returns *default* + value if context value is empty string. + """ + context = {'ETH9_DUMMY': ''} + net = ds.OpenNebulaNetwork(context) + val = net.get_field('eth9', 'dummy', 'DEFAULT_VALUE') + self.assertEqual('DEFAULT_VALUE', val) + + def test_get_field_emptycontext(self): + """ + Verify get_field('device', 'name') returns None if context value is + empty string. + """ + context = {'ETH9_DUMMY': ''} + net = ds.OpenNebulaNetwork(context) + val = net.get_field('eth9', 'dummy') + self.assertEqual(None, val) + + def test_get_field_nonecontext(self): + """ + Verify get_field('device', 'name') returns None if context value is + None. + """ + context = {'ETH9_DUMMY': None} + net = ds.OpenNebulaNetwork(context) + val = net.get_field('eth9', 'dummy') + self.assertEqual(None, val) + + @mock.patch(DS_PATH + ".get_physical_nics_by_mac") + def test_gen_conf_gateway(self, m_get_phys_by_mac): + """Test rendering with/without IPv4 gateway""" + self.maxDiff = None + # empty ETH0_GATEWAY + context = { + 'ETH0_MAC': '02:00:0a:12:01:01', + 'ETH0_GATEWAY': '', } + for nic in self.system_nics: + expected = { + 'version': 2, + 'ethernets': { + nic: { + 'match': {'macaddress': MACADDR}, + 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} + m_get_phys_by_mac.return_value = {MACADDR: nic} + net = ds.OpenNebulaNetwork(context) + self.assertEqual(net.gen_conf(), expected) + + # set ETH0_GATEWAY + context = { + 'ETH0_MAC': '02:00:0a:12:01:01', + 'ETH0_GATEWAY': '1.2.3.5', } + for nic in self.system_nics: + expected = { + 'version': 2, + 'ethernets': { + nic: { + 'gateway4': '1.2.3.5', + 'match': {'macaddress': MACADDR}, + 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} + m_get_phys_by_mac.return_value = {MACADDR: nic} + net = ds.OpenNebulaNetwork(context) + self.assertEqual(net.gen_conf(), expected) + + @mock.patch(DS_PATH + ".get_physical_nics_by_mac") + def test_gen_conf_gateway6(self, m_get_phys_by_mac): + """Test rendering with/without IPv6 gateway""" + self.maxDiff = None + # empty ETH0_GATEWAY6 + context = { + 'ETH0_MAC': '02:00:0a:12:01:01', + 'ETH0_GATEWAY6': '', } + for nic in self.system_nics: + expected = { + 'version': 2, + 'ethernets': { + nic: { + 'match': {'macaddress': MACADDR}, + 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} + m_get_phys_by_mac.return_value = {MACADDR: nic} + net = ds.OpenNebulaNetwork(context) + self.assertEqual(net.gen_conf(), expected) + + # set ETH0_GATEWAY6 + context = { + 'ETH0_MAC': '02:00:0a:12:01:01', + 'ETH0_GATEWAY6': IP6_GW, } + for nic in self.system_nics: + expected = { + 'version': 2, + 'ethernets': { + nic: { + 'gateway6': IP6_GW, + 'match': {'macaddress': MACADDR}, + 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} + m_get_phys_by_mac.return_value = {MACADDR: nic} + net = ds.OpenNebulaNetwork(context) + self.assertEqual(net.gen_conf(), expected) + + @mock.patch(DS_PATH + ".get_physical_nics_by_mac") + def test_gen_conf_ipv6address(self, m_get_phys_by_mac): + """Test rendering with/without IPv6 address""" + self.maxDiff = None + # empty ETH0_IP6, ETH0_IP6_ULA, ETH0_IP6_PREFIX_LENGTH + context = { + 'ETH0_MAC': '02:00:0a:12:01:01', + 'ETH0_IP6': '', + 'ETH0_IP6_ULA': '', + 'ETH0_IP6_PREFIX_LENGTH': '', } + for nic in self.system_nics: + expected = { + 'version': 2, + 'ethernets': { + nic: { + 'match': {'macaddress': MACADDR}, + 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} + m_get_phys_by_mac.return_value = {MACADDR: nic} + net = ds.OpenNebulaNetwork(context) + self.assertEqual(net.gen_conf(), expected) + + # set ETH0_IP6, ETH0_IP6_ULA, ETH0_IP6_PREFIX_LENGTH + context = { + 'ETH0_MAC': '02:00:0a:12:01:01', + 'ETH0_IP6': IP6_GLOBAL, + 'ETH0_IP6_PREFIX_LENGTH': IP6_PREFIX, + 'ETH0_IP6_ULA': IP6_ULA, } + for nic in self.system_nics: + expected = { + 'version': 2, + 'ethernets': { + nic: { + 'match': {'macaddress': MACADDR}, + 'addresses': [ + IP_BY_MACADDR + '/' + IP4_PREFIX, + IP6_GLOBAL + '/' + IP6_PREFIX, + IP6_ULA + '/' + IP6_PREFIX]}}} + m_get_phys_by_mac.return_value = {MACADDR: nic} + net = ds.OpenNebulaNetwork(context) + self.assertEqual(net.gen_conf(), expected) + + @mock.patch(DS_PATH + ".get_physical_nics_by_mac") + def test_gen_conf_dns(self, m_get_phys_by_mac): + """Test rendering with/without DNS server, search domain""" + self.maxDiff = None + # empty DNS, ETH0_DNS, ETH0_SEARCH_DOMAIN + context = { + 'ETH0_MAC': '02:00:0a:12:01:01', + 'DNS': '', + 'ETH0_DNS': '', + 'ETH0_SEARCH_DOMAIN': '', } + for nic in self.system_nics: + expected = { + 'version': 2, + 'ethernets': { + nic: { + 'match': {'macaddress': MACADDR}, + 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} + m_get_phys_by_mac.return_value = {MACADDR: nic} + net = ds.OpenNebulaNetwork(context) + self.assertEqual(net.gen_conf(), expected) + + # set DNS, ETH0_DNS, ETH0_SEARCH_DOMAIN + context = { + 'ETH0_MAC': '02:00:0a:12:01:01', + 'DNS': '1.2.3.8', + 'ETH0_DNS': '1.2.3.6 1.2.3.7', + 'ETH0_SEARCH_DOMAIN': 'example.com example.org', } + for nic in self.system_nics: + expected = { + 'version': 2, + 'ethernets': { + nic: { + 'nameservers': { + 'addresses': ['1.2.3.6', '1.2.3.7', '1.2.3.8'], + 'search': ['example.com', 'example.org']}, + 'match': {'macaddress': MACADDR}, + 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} + m_get_phys_by_mac.return_value = {MACADDR: nic} + net = ds.OpenNebulaNetwork(context) + self.assertEqual(net.gen_conf(), expected) + + @mock.patch(DS_PATH + ".get_physical_nics_by_mac") + def test_gen_conf_mtu(self, m_get_phys_by_mac): + """Test rendering with/without MTU""" + self.maxDiff = None + # empty ETH0_MTU + context = { + 'ETH0_MAC': '02:00:0a:12:01:01', + 'ETH0_MTU': '', } + for nic in self.system_nics: + expected = { + 'version': 2, + 'ethernets': { + nic: { + 'match': {'macaddress': MACADDR}, + 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} + m_get_phys_by_mac.return_value = {MACADDR: nic} + net = ds.OpenNebulaNetwork(context) + self.assertEqual(net.gen_conf(), expected) + + # set ETH0_MTU + context = { + 'ETH0_MAC': '02:00:0a:12:01:01', + 'ETH0_MTU': '1280', } + for nic in self.system_nics: + expected = { + 'version': 2, + 'ethernets': { + nic: { + 'mtu': '1280', + 'match': {'macaddress': MACADDR}, + 'addresses': [IP_BY_MACADDR + '/' + IP4_PREFIX]}}} + m_get_phys_by_mac.return_value = {MACADDR: nic} + net = ds.OpenNebulaNetwork(context) + self.assertEqual(net.gen_conf(), expected) + @mock.patch(DS_PATH + ".get_physical_nics_by_mac") def test_eth0(self, m_get_phys_by_mac): for nic in self.system_nics: @@ -395,7 +802,6 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'match': {'macaddress': MACADDR}, 'addresses': [IP_BY_MACADDR + '/16'], 'gateway4': '1.2.3.5', - 'gateway6': None, 'nameservers': { 'addresses': ['1.2.3.6', '1.2.3.7', '1.2.3.8']}}}} @@ -494,7 +900,6 @@ class TestOpenNebulaNetwork(unittest.TestCase): 'match': {'macaddress': MAC_1}, 'addresses': ['10.3.1.3/16'], 'gateway4': '10.3.0.1', - 'gateway6': None, 'nameservers': { 'addresses': ['10.3.1.2', '1.2.3.8'], 'search': [ diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py index 585acc33..a731f1ed 100644 --- a/tests/unittests/test_datasource/test_openstack.py +++ b/tests/unittests/test_datasource/test_openstack.py @@ -12,11 +12,11 @@ import re from cloudinit.tests import helpers as test_helpers from six.moves.urllib.parse import urlparse -from six import StringIO +from six import StringIO, text_type from cloudinit import helpers from cloudinit import settings -from cloudinit.sources import convert_vendordata, UNSET +from cloudinit.sources import BrokenMetadata, convert_vendordata, UNSET from cloudinit.sources import DataSourceOpenStack as ds from cloudinit.sources.helpers import openstack from cloudinit import util @@ -186,7 +186,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('meta_data.json'): os_files[k] = json.dumps(os_meta) _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.BrokenMetadata, _read_metadata_service) + self.assertRaises(BrokenMetadata, _read_metadata_service) def test_userdata_empty(self): os_files = copy.deepcopy(OS_FILES) @@ -217,7 +217,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('vendor_data.json'): os_files[k] = '{' # some invalid json _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.BrokenMetadata, _read_metadata_service) + self.assertRaises(BrokenMetadata, _read_metadata_service) def test_metadata_invalid(self): os_files = copy.deepcopy(OS_FILES) @@ -225,7 +225,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('meta_data.json'): os_files[k] = '{' # some invalid json _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.BrokenMetadata, _read_metadata_service) + self.assertRaises(BrokenMetadata, _read_metadata_service) @test_helpers.mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery') def test_datasource(self, m_dhcp): @@ -510,6 +510,27 @@ class TestDetectOpenStack(test_helpers.CiTestCase): ds.detect_openstack(), 'Expected detect_openstack == True on OpenTelekomCloud') + @test_helpers.mock.patch(MOCK_PATH + 'util.read_dmi_data') + def test_detect_openstack_oraclecloud_chassis_asset_tag(self, m_dmi, + m_is_x86): + """Return True on OpenStack reporting Oracle cloud asset-tag.""" + m_is_x86.return_value = True + + def fake_dmi_read(dmi_key): + if dmi_key == 'system-product-name': + return 'Standard PC (i440FX + PIIX, 1996)' # No match + if dmi_key == 'chassis-asset-tag': + return 'OracleCloud.com' + assert False, 'Unexpected dmi read of %s' % dmi_key + + m_dmi.side_effect = fake_dmi_read + self.assertTrue( + ds.detect_openstack(accept_oracle=True), + 'Expected detect_openstack == True on OracleCloud.com') + self.assertFalse( + ds.detect_openstack(accept_oracle=False), + 'Expected detect_openstack == False.') + @test_helpers.mock.patch(MOCK_PATH + 'util.get_proc_env') @test_helpers.mock.patch(MOCK_PATH + 'util.read_dmi_data') def test_detect_openstack_by_proc_1_environ(self, m_dmi, m_proc_env, @@ -534,4 +555,94 @@ class TestDetectOpenStack(test_helpers.CiTestCase): m_proc_env.assert_called_with(1) +class TestMetadataReader(test_helpers.HttprettyTestCase): + """Test the MetadataReader.""" + burl = 'http://169.254.169.254/' + md_base = { + 'availability_zone': 'myaz1', + 'hostname': 'sm-foo-test.novalocal', + "keys": [{"data": PUBKEY, "name": "brickies", "type": "ssh"}], + 'launch_index': 0, + 'name': 'sm-foo-test', + 'public_keys': {'mykey': PUBKEY}, + 'project_id': '6a103f813b774b9fb15a4fcd36e1c056', + 'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'} + + def register(self, path, body=None, status=200): + content = (body if not isinstance(body, text_type) + else body.encode('utf-8')) + hp.register_uri( + hp.GET, self.burl + "openstack" + path, status=status, + body=content) + + def register_versions(self, versions): + self.register("", '\n'.join(versions)) + self.register("/", '\n'.join(versions)) + + def register_version(self, version, data): + content = '\n'.join(sorted(data.keys())) + self.register(version, content) + self.register(version + "/", content) + for path, content in data.items(): + self.register("/%s/%s" % (version, path), content) + self.register("/%s/%s" % (version, path), content) + if 'user_data' not in data: + self.register("/%s/user_data" % version, "nodata", status=404) + + def test__find_working_version(self): + """Test a working version ignores unsupported.""" + unsup = "2016-11-09" + self.register_versions( + [openstack.OS_FOLSOM, openstack.OS_LIBERTY, unsup, + openstack.OS_LATEST]) + self.assertEqual( + openstack.OS_LIBERTY, + openstack.MetadataReader(self.burl)._find_working_version()) + + def test__find_working_version_uses_latest(self): + """'latest' should be used if no supported versions.""" + unsup1, unsup2 = ("2016-11-09", '2017-06-06') + self.register_versions([unsup1, unsup2, openstack.OS_LATEST]) + self.assertEqual( + openstack.OS_LATEST, + openstack.MetadataReader(self.burl)._find_working_version()) + + def test_read_v2_os_ocata(self): + """Validate return value of read_v2 for os_ocata data.""" + md = copy.deepcopy(self.md_base) + md['devices'] = [] + network_data = {'links': [], 'networks': [], 'services': []} + vendor_data = {} + vendor_data2 = {"static": {}} + + data = { + 'meta_data.json': json.dumps(md), + 'network_data.json': json.dumps(network_data), + 'vendor_data.json': json.dumps(vendor_data), + 'vendor_data2.json': json.dumps(vendor_data2), + } + + self.register_versions([openstack.OS_OCATA, openstack.OS_LATEST]) + self.register_version(openstack.OS_OCATA, data) + + mock_read_ec2 = test_helpers.mock.MagicMock( + return_value={'instance-id': 'unused-ec2'}) + expected_md = copy.deepcopy(md) + expected_md.update( + {'instance-id': md['uuid'], 'local-hostname': md['hostname']}) + expected = { + 'userdata': '', # Annoying, no user-data results in empty string. + 'version': 2, + 'metadata': expected_md, + 'vendordata': vendor_data, + 'networkdata': network_data, + 'ec2-metadata': mock_read_ec2.return_value, + 'files': {}, + } + reader = openstack.MetadataReader(self.burl) + reader._read_ec2_metadata = mock_read_ec2 + self.assertEqual(expected, reader.read_v2()) + self.assertEqual(1, mock_read_ec2.call_count) + + # vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_ovf.py b/tests/unittests/test_datasource/test_ovf.py index fc4eb36e..9d52eb99 100644 --- a/tests/unittests/test_datasource/test_ovf.py +++ b/tests/unittests/test_datasource/test_ovf.py @@ -124,7 +124,9 @@ class TestDatasourceOVF(CiTestCase): ds = self.datasource(sys_cfg={}, distro={}, paths=paths) retcode = wrap_and_call( 'cloudinit.sources.DataSourceOVF', - {'util.read_dmi_data': None}, + {'util.read_dmi_data': None, + 'transport_iso9660': (False, None, None), + 'transport_vmware_guestd': (False, None, None)}, ds.get_data) self.assertFalse(retcode, 'Expected False return from ds.get_data') self.assertIn( @@ -138,7 +140,9 @@ class TestDatasourceOVF(CiTestCase): paths=paths) retcode = wrap_and_call( 'cloudinit.sources.DataSourceOVF', - {'util.read_dmi_data': 'vmware'}, + {'util.read_dmi_data': 'vmware', + 'transport_iso9660': (False, None, None), + 'transport_vmware_guestd': (False, None, None)}, ds.get_data) self.assertFalse(retcode, 'Expected False return from ds.get_data') self.assertIn( diff --git a/tests/unittests/test_datasource/test_scaleway.py b/tests/unittests/test_datasource/test_scaleway.py index e4e9bb20..c2bc7a00 100644 --- a/tests/unittests/test_datasource/test_scaleway.py +++ b/tests/unittests/test_datasource/test_scaleway.py @@ -176,11 +176,18 @@ class TestDataSourceScaleway(HttprettyTestCase): self.vendordata_url = \ DataSourceScaleway.BUILTIN_DS_CONFIG['vendordata_url'] + self.add_patch('cloudinit.sources.DataSourceScaleway.on_scaleway', + '_m_on_scaleway', return_value=True) + self.add_patch( + 'cloudinit.sources.DataSourceScaleway.net.find_fallback_nic', + '_m_find_fallback_nic', return_value='scalewaynic0') + + @mock.patch('cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4') @mock.patch('cloudinit.sources.DataSourceScaleway.SourceAddressAdapter', get_source_address_adapter) @mock.patch('cloudinit.util.get_cmdline') @mock.patch('time.sleep', return_value=None) - def test_metadata_ok(self, sleep, m_get_cmdline): + def test_metadata_ok(self, sleep, m_get_cmdline, dhcpv4): """ get_data() returns metadata, user data and vendor data. """ @@ -211,11 +218,12 @@ class TestDataSourceScaleway(HttprettyTestCase): self.assertIsNone(self.datasource.region) self.assertEqual(sleep.call_count, 0) + @mock.patch('cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4') @mock.patch('cloudinit.sources.DataSourceScaleway.SourceAddressAdapter', get_source_address_adapter) @mock.patch('cloudinit.util.get_cmdline') @mock.patch('time.sleep', return_value=None) - def test_metadata_404(self, sleep, m_get_cmdline): + def test_metadata_404(self, sleep, m_get_cmdline, dhcpv4): """ get_data() returns metadata, but no user data nor vendor data. """ @@ -234,11 +242,12 @@ class TestDataSourceScaleway(HttprettyTestCase): self.assertIsNone(self.datasource.get_vendordata_raw()) self.assertEqual(sleep.call_count, 0) + @mock.patch('cloudinit.sources.DataSourceScaleway.EphemeralDHCPv4') @mock.patch('cloudinit.sources.DataSourceScaleway.SourceAddressAdapter', get_source_address_adapter) @mock.patch('cloudinit.util.get_cmdline') @mock.patch('time.sleep', return_value=None) - def test_metadata_rate_limit(self, sleep, m_get_cmdline): + def test_metadata_rate_limit(self, sleep, m_get_cmdline, dhcpv4): """ get_data() is rate limited two times by the metadata API when fetching user data. @@ -262,3 +271,67 @@ class TestDataSourceScaleway(HttprettyTestCase): self.assertEqual(self.datasource.get_userdata_raw(), DataResponses.FAKE_USER_DATA) self.assertEqual(sleep.call_count, 2) + + @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic') + @mock.patch('cloudinit.util.get_cmdline') + def test_network_config_ok(self, m_get_cmdline, fallback_nic): + """ + network_config will only generate IPv4 config if no ipv6 data is + available in the metadata + """ + m_get_cmdline.return_value = 'scaleway' + fallback_nic.return_value = 'ens2' + self.datasource.metadata['ipv6'] = None + + netcfg = self.datasource.network_config + resp = {'version': 1, + 'config': [{ + 'type': 'physical', + 'name': 'ens2', + 'subnets': [{'type': 'dhcp4'}]}] + } + self.assertEqual(netcfg, resp) + + @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic') + @mock.patch('cloudinit.util.get_cmdline') + def test_network_config_ipv6_ok(self, m_get_cmdline, fallback_nic): + """ + network_config will only generate IPv4/v6 configs if ipv6 data is + available in the metadata + """ + m_get_cmdline.return_value = 'scaleway' + fallback_nic.return_value = 'ens2' + self.datasource.metadata['ipv6'] = { + 'address': '2000:abc:4444:9876::42:999', + 'gateway': '2000:abc:4444:9876::42:000', + 'netmask': '127', + } + + netcfg = self.datasource.network_config + resp = {'version': 1, + 'config': [{ + 'type': 'physical', + 'name': 'ens2', + 'subnets': [{'type': 'dhcp4'}, + {'type': 'static', + 'address': '2000:abc:4444:9876::42:999', + 'gateway': '2000:abc:4444:9876::42:000', + 'netmask': '127', } + ] + + }] + } + self.assertEqual(netcfg, resp) + + @mock.patch('cloudinit.sources.DataSourceScaleway.net.find_fallback_nic') + @mock.patch('cloudinit.util.get_cmdline') + def test_network_config_existing(self, m_get_cmdline, fallback_nic): + """ + network_config() should return the same data if a network config + already exists + """ + m_get_cmdline.return_value = 'scaleway' + self.datasource._network_config = '0xdeadbeef' + + netcfg = self.datasource.network_config + self.assertEqual(netcfg, '0xdeadbeef') diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index dca0b3d4..46d67b94 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -20,10 +20,8 @@ import multiprocessing import os import os.path import re -import shutil import signal import stat -import tempfile import unittest2 import uuid @@ -31,15 +29,27 @@ from cloudinit import serial from cloudinit.sources import DataSourceSmartOS from cloudinit.sources.DataSourceSmartOS import ( convert_smartos_network_data as convert_net, - SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ) + SMARTOS_ENV_KVM, SERIAL_DEVICE, get_smartos_environ, + identify_file) import six from cloudinit import helpers as c_helpers -from cloudinit.util import (b64e, subp) +from cloudinit.util import ( + b64e, subp, ProcessExecutionError, which, write_file) -from cloudinit.tests.helpers import mock, FilesystemMockingTestCase, TestCase +from cloudinit.tests.helpers import ( + CiTestCase, mock, FilesystemMockingTestCase, skipIf) + +try: + import serial as _pyserial + assert _pyserial # avoid pyflakes error F401: import unused + HAS_PYSERIAL = True +except ImportError: + HAS_PYSERIAL = False + +DSMOS = 'cloudinit.sources.DataSourceSmartOS' SDC_NICS = json.loads(""" [ { @@ -366,37 +376,33 @@ class PsuedoJoyentClient(object): class TestSmartOSDataSource(FilesystemMockingTestCase): + jmc_cfact = None + get_smartos_environ = None + def setUp(self): super(TestSmartOSDataSource, self).setUp() - dsmos = 'cloudinit.sources.DataSourceSmartOS' - patcher = mock.patch(dsmos + ".jmc_client_factory") - self.jmc_cfact = patcher.start() - self.addCleanup(patcher.stop) - patcher = mock.patch(dsmos + ".get_smartos_environ") - self.get_smartos_environ = patcher.start() - self.addCleanup(patcher.stop) - - self.tmp = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.tmp) - self.paths = c_helpers.Paths( - {'cloud_dir': self.tmp, 'run_dir': self.tmp}) - - self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp') + self.add_patch(DSMOS + ".get_smartos_environ", "get_smartos_environ") + self.add_patch(DSMOS + ".jmc_client_factory", "jmc_cfact") + self.legacy_user_d = self.tmp_path('legacy_user_tmp') os.mkdir(self.legacy_user_d) - - self.orig_lud = DataSourceSmartOS.LEGACY_USER_D - DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d - - def tearDown(self): - DataSourceSmartOS.LEGACY_USER_D = self.orig_lud - super(TestSmartOSDataSource, self).tearDown() + self.add_patch(DSMOS + ".LEGACY_USER_D", "m_legacy_user_d", + autospec=False, new=self.legacy_user_d) + self.add_patch(DSMOS + ".identify_file", "m_identify_file", + return_value="text/plain") def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM, sys_cfg=None, ds_cfg=None): self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata) self.get_smartos_environ.return_value = mode + tmpd = self.tmp_dir() + dirs = {'cloud_dir': self.tmp_path('cloud_dir', tmpd), + 'run_dir': self.tmp_path('run_dir')} + for d in dirs.values(): + os.mkdir(d) + paths = c_helpers.Paths(dirs) + if sys_cfg is None: sys_cfg = {} @@ -405,7 +411,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase): sys_cfg['datasource']['SmartOS'] = ds_cfg return DataSourceSmartOS.DataSourceSmartOS( - sys_cfg, distro=None, paths=self.paths) + sys_cfg, distro=None, paths=paths) def test_no_base64(self): ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True} @@ -493,6 +499,7 @@ class TestSmartOSDataSource(FilesystemMockingTestCase): dsrc.metadata['user-script']) legacy_script_f = "%s/user-script" % self.legacy_user_d + print("legacy_script_f=%s" % legacy_script_f) self.assertTrue(os.path.exists(legacy_script_f)) self.assertTrue(os.path.islink(legacy_script_f)) user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] @@ -640,6 +647,28 @@ class TestSmartOSDataSource(FilesystemMockingTestCase): mydscfg['disk_aliases']['FOO']) +class TestIdentifyFile(CiTestCase): + """Test the 'identify_file' utility.""" + @skipIf(not which("file"), "command 'file' not available.") + def test_file_happy_path(self): + """Test file is available and functional on plain text.""" + fname = self.tmp_path("myfile") + write_file(fname, "plain text content here\n") + with self.allow_subp(["file"]): + self.assertEqual("text/plain", identify_file(fname)) + + @mock.patch(DSMOS + ".util.subp") + def test_returns_none_on_error(self, m_subp): + """On 'file' execution error, None should be returned.""" + m_subp.side_effect = ProcessExecutionError("FILE_FAILED", exit_code=99) + fname = self.tmp_path("myfile") + write_file(fname, "plain text content here\n") + self.assertEqual(None, identify_file(fname)) + self.assertEqual( + [mock.call(["file", "--brief", "--mime-type", fname])], + m_subp.call_args_list) + + class ShortReader(object): """Implements a 'read' interface for bytes provided. much like io.BytesIO but the 'endbyte' acts as if EOF. @@ -893,7 +922,7 @@ class TestJoyentMetadataClient(FilesystemMockingTestCase): self.assertEqual(client.list(), []) -class TestNetworkConversion(TestCase): +class TestNetworkConversion(CiTestCase): def test_convert_simple(self): expected = { 'version': 1, @@ -1058,7 +1087,8 @@ class TestNetworkConversion(TestCase): "Only supported on KVM and bhyve guests under SmartOS") @unittest2.skipUnless(os.access(SERIAL_DEVICE, os.W_OK), "Requires write access to " + SERIAL_DEVICE) -class TestSerialConcurrency(TestCase): +@unittest2.skipUnless(HAS_PYSERIAL is True, "pyserial not available") +class TestSerialConcurrency(CiTestCase): """ This class tests locking on an actual serial port, and as such can only be run in a kvm or bhyve guest running on a SmartOS host. A test run on @@ -1066,7 +1096,11 @@ class TestSerialConcurrency(TestCase): there is only one session over a connection. In contrast, in the absence of proper locking multiple processes opening the same serial port can corrupt each others' exchanges with the metadata server. + + This takes on the order of 2 to 3 minutes to run. """ + allowed_subp = ['mdata-get'] + def setUp(self): self.mdata_proc = multiprocessing.Process(target=self.start_mdata_loop) self.mdata_proc.start() @@ -1097,7 +1131,7 @@ class TestSerialConcurrency(TestCase): keys = [tup[0] for tup in ds.SMARTOS_ATTRIB_MAP.values()] keys.extend(ds.SMARTOS_ATTRIB_JSON.values()) - client = ds.jmc_client_factory() + client = ds.jmc_client_factory(smartos_type=SMARTOS_ENV_KVM) self.assertIsNotNone(client) # The behavior that we are testing for was observed mdata-get running |