summaryrefslogtreecommitdiff
path: root/cloudinit/sources/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/sources/tests')
-rw-r--r--cloudinit/sources/tests/__init__.py0
-rw-r--r--cloudinit/sources/tests/test_init.py759
-rw-r--r--cloudinit/sources/tests/test_oracle.py785
3 files changed, 0 insertions, 1544 deletions
diff --git a/cloudinit/sources/tests/__init__.py b/cloudinit/sources/tests/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/cloudinit/sources/tests/__init__.py
+++ /dev/null
diff --git a/cloudinit/sources/tests/test_init.py b/cloudinit/sources/tests/test_init.py
deleted file mode 100644
index 1420a988..00000000
--- a/cloudinit/sources/tests/test_init.py
+++ /dev/null
@@ -1,759 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import copy
-import inspect
-import os
-import stat
-
-from cloudinit.event import EventType
-from cloudinit.helpers import Paths
-from cloudinit import importer
-from cloudinit.sources import (
- EXPERIMENTAL_TEXT, INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE,
- METADATA_UNKNOWN, REDACT_SENSITIVE_VALUE, UNSET, DataSource,
- canonical_cloud_id, redact_sensitive_keys)
-from cloudinit.tests.helpers import CiTestCase, mock
-from cloudinit.user_data import UserDataProcessor
-from cloudinit import util
-
-
-class DataSourceTestSubclassNet(DataSource):
-
- dsname = 'MyTestSubclass'
- url_max_wait = 55
-
- def __init__(self, sys_cfg, distro, paths, custom_metadata=None,
- custom_userdata=None, get_data_retval=True):
- super(DataSourceTestSubclassNet, self).__init__(
- sys_cfg, distro, paths)
- self._custom_userdata = custom_userdata
- self._custom_metadata = custom_metadata
- self._get_data_retval = get_data_retval
-
- def _get_cloud_name(self):
- return 'SubclassCloudName'
-
- def _get_data(self):
- if self._custom_metadata:
- self.metadata = self._custom_metadata
- else:
- self.metadata = {'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion'}
- if self._custom_userdata:
- self.userdata_raw = self._custom_userdata
- else:
- self.userdata_raw = 'userdata_raw'
- self.vendordata_raw = 'vendordata_raw'
- return self._get_data_retval
-
-
-class InvalidDataSourceTestSubclassNet(DataSource):
- pass
-
-
-class TestDataSource(CiTestCase):
-
- with_logs = True
- maxDiff = None
-
- def setUp(self):
- super(TestDataSource, self).setUp()
- self.sys_cfg = {'datasource': {'_undef': {'key1': False}}}
- self.distro = 'distrotest' # generally should be a Distro object
- self.paths = Paths({})
- self.datasource = DataSource(self.sys_cfg, self.distro, self.paths)
-
- def test_datasource_init(self):
- """DataSource initializes metadata attributes, ds_cfg and ud_proc."""
- self.assertEqual(self.paths, self.datasource.paths)
- self.assertEqual(self.sys_cfg, self.datasource.sys_cfg)
- self.assertEqual(self.distro, self.datasource.distro)
- self.assertIsNone(self.datasource.userdata)
- self.assertEqual({}, self.datasource.metadata)
- self.assertIsNone(self.datasource.userdata_raw)
- self.assertIsNone(self.datasource.vendordata)
- self.assertIsNone(self.datasource.vendordata_raw)
- self.assertEqual({'key1': False}, self.datasource.ds_cfg)
- self.assertIsInstance(self.datasource.ud_proc, UserDataProcessor)
-
- def test_datasource_init_gets_ds_cfg_using_dsname(self):
- """Init uses DataSource.dsname for sourcing ds_cfg."""
- sys_cfg = {'datasource': {'MyTestSubclass': {'key2': False}}}
- distro = 'distrotest' # generally should be a Distro object
- datasource = DataSourceTestSubclassNet(sys_cfg, distro, self.paths)
- self.assertEqual({'key2': False}, datasource.ds_cfg)
-
- def test_str_is_classname(self):
- """The string representation of the datasource is the classname."""
- self.assertEqual('DataSource', str(self.datasource))
- self.assertEqual(
- 'DataSourceTestSubclassNet',
- str(DataSourceTestSubclassNet('', '', self.paths)))
-
- def test_datasource_get_url_params_defaults(self):
- """get_url_params default url config settings for the datasource."""
- params = self.datasource.get_url_params()
- self.assertEqual(params.max_wait_seconds, self.datasource.url_max_wait)
- self.assertEqual(params.timeout_seconds, self.datasource.url_timeout)
- self.assertEqual(params.num_retries, self.datasource.url_retries)
-
- def test_datasource_get_url_params_subclassed(self):
- """Subclasses can override get_url_params defaults."""
- sys_cfg = {'datasource': {'MyTestSubclass': {'key2': False}}}
- distro = 'distrotest' # generally should be a Distro object
- datasource = DataSourceTestSubclassNet(sys_cfg, distro, self.paths)
- expected = (datasource.url_max_wait, datasource.url_timeout,
- datasource.url_retries)
- url_params = datasource.get_url_params()
- self.assertNotEqual(self.datasource.get_url_params(), url_params)
- self.assertEqual(expected, url_params)
-
- def test_datasource_get_url_params_ds_config_override(self):
- """Datasource configuration options can override url param defaults."""
- sys_cfg = {
- 'datasource': {
- 'MyTestSubclass': {
- 'max_wait': '1', 'timeout': '2', 'retries': '3'}}}
- datasource = DataSourceTestSubclassNet(
- sys_cfg, self.distro, self.paths)
- expected = (1, 2, 3)
- url_params = datasource.get_url_params()
- self.assertNotEqual(
- (datasource.url_max_wait, datasource.url_timeout,
- datasource.url_retries),
- url_params)
- self.assertEqual(expected, url_params)
-
- def test_datasource_get_url_params_is_zero_or_greater(self):
- """get_url_params ignores timeouts with a value below 0."""
- # Set an override that is below 0 which gets ignored.
- sys_cfg = {'datasource': {'_undef': {'timeout': '-1'}}}
- datasource = DataSource(sys_cfg, self.distro, self.paths)
- (_max_wait, timeout, _retries) = datasource.get_url_params()
- self.assertEqual(0, timeout)
-
- def test_datasource_get_url_uses_defaults_on_errors(self):
- """On invalid system config values for url_params defaults are used."""
- # All invalid values should be logged
- sys_cfg = {'datasource': {
- '_undef': {
- 'max_wait': 'nope', 'timeout': 'bug', 'retries': 'nonint'}}}
- datasource = DataSource(sys_cfg, self.distro, self.paths)
- url_params = datasource.get_url_params()
- expected = (datasource.url_max_wait, datasource.url_timeout,
- datasource.url_retries)
- self.assertEqual(expected, url_params)
- logs = self.logs.getvalue()
- expected_logs = [
- "Config max_wait 'nope' is not an int, using default '-1'",
- "Config timeout 'bug' is not an int, using default '10'",
- "Config retries 'nonint' is not an int, using default '5'",
- ]
- for log in expected_logs:
- self.assertIn(log, logs)
-
- @mock.patch('cloudinit.sources.net.find_fallback_nic')
- def test_fallback_interface_is_discovered(self, m_get_fallback_nic):
- """The fallback_interface is discovered via find_fallback_nic."""
- m_get_fallback_nic.return_value = 'nic9'
- self.assertEqual('nic9', self.datasource.fallback_interface)
-
- @mock.patch('cloudinit.sources.net.find_fallback_nic')
- def test_fallback_interface_logs_undiscovered(self, m_get_fallback_nic):
- """Log a warning when fallback_interface can not discover the nic."""
- self.datasource._cloud_name = 'MySupahCloud'
- m_get_fallback_nic.return_value = None # Couldn't discover nic
- self.assertIsNone(self.datasource.fallback_interface)
- self.assertEqual(
- 'WARNING: Did not find a fallback interface on MySupahCloud.\n',
- self.logs.getvalue())
-
- @mock.patch('cloudinit.sources.net.find_fallback_nic')
- def test_wb_fallback_interface_is_cached(self, m_get_fallback_nic):
- """The fallback_interface is cached and won't be rediscovered."""
- self.datasource._fallback_interface = 'nic10'
- self.assertEqual('nic10', self.datasource.fallback_interface)
- m_get_fallback_nic.assert_not_called()
-
- def test__get_data_unimplemented(self):
- """Raise an error when _get_data is not implemented."""
- with self.assertRaises(NotImplementedError) as context_manager:
- self.datasource.get_data()
- self.assertIn(
- 'Subclasses of DataSource must implement _get_data',
- str(context_manager.exception))
- datasource2 = InvalidDataSourceTestSubclassNet(
- self.sys_cfg, self.distro, self.paths)
- with self.assertRaises(NotImplementedError) as context_manager:
- datasource2.get_data()
- self.assertIn(
- 'Subclasses of DataSource must implement _get_data',
- str(context_manager.exception))
-
- def test_get_data_calls_subclass__get_data(self):
- """Datasource.get_data uses the subclass' version of _get_data."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- self.assertTrue(datasource.get_data())
- self.assertEqual(
- {'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion'},
- datasource.metadata)
- self.assertEqual('userdata_raw', datasource.userdata_raw)
- self.assertEqual('vendordata_raw', datasource.vendordata_raw)
-
- def test_get_hostname_strips_local_hostname_without_domain(self):
- """Datasource.get_hostname strips metadata local-hostname of domain."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- self.assertTrue(datasource.get_data())
- self.assertEqual(
- 'test-subclass-hostname', datasource.metadata['local-hostname'])
- self.assertEqual('test-subclass-hostname', datasource.get_hostname())
- datasource.metadata['local-hostname'] = 'hostname.my.domain.com'
- self.assertEqual('hostname', datasource.get_hostname())
-
- def test_get_hostname_with_fqdn_returns_local_hostname_with_domain(self):
- """Datasource.get_hostname with fqdn set gets qualified hostname."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- self.assertTrue(datasource.get_data())
- datasource.metadata['local-hostname'] = 'hostname.my.domain.com'
- self.assertEqual(
- 'hostname.my.domain.com', datasource.get_hostname(fqdn=True))
-
- def test_get_hostname_without_metadata_uses_system_hostname(self):
- """Datasource.gethostname runs util.get_hostname when no metadata."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- self.assertEqual({}, datasource.metadata)
- mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts'
- with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost:
- with mock.patch(mock_fqdn) as m_fqdn:
- m_gethost.return_value = 'systemhostname.domain.com'
- m_fqdn.return_value = None # No maching fqdn in /etc/hosts
- self.assertEqual('systemhostname', datasource.get_hostname())
- self.assertEqual(
- 'systemhostname.domain.com',
- datasource.get_hostname(fqdn=True))
-
- def test_get_hostname_without_metadata_returns_none(self):
- """Datasource.gethostname returns None when metadata_only and no MD."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- self.assertEqual({}, datasource.metadata)
- mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts'
- with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost:
- with mock.patch(mock_fqdn) as m_fqdn:
- self.assertIsNone(datasource.get_hostname(metadata_only=True))
- self.assertIsNone(
- datasource.get_hostname(fqdn=True, metadata_only=True))
- self.assertEqual([], m_gethost.call_args_list)
- self.assertEqual([], m_fqdn.call_args_list)
-
- def test_get_hostname_without_metadata_prefers_etc_hosts(self):
- """Datasource.gethostname prefers /etc/hosts to util.get_hostname."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- self.assertEqual({}, datasource.metadata)
- mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts'
- with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost:
- with mock.patch(mock_fqdn) as m_fqdn:
- m_gethost.return_value = 'systemhostname.domain.com'
- m_fqdn.return_value = 'fqdnhostname.domain.com'
- self.assertEqual('fqdnhostname', datasource.get_hostname())
- self.assertEqual('fqdnhostname.domain.com',
- datasource.get_hostname(fqdn=True))
-
- def test_get_data_does_not_write_instance_data_on_failure(self):
- """get_data does not write INSTANCE_JSON_FILE on get_data False."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- get_data_retval=False)
- self.assertFalse(datasource.get_data())
- json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
- self.assertFalse(
- os.path.exists(json_file), 'Found unexpected file %s' % json_file)
-
- def test_get_data_writes_json_instance_data_on_success(self):
- """get_data writes INSTANCE_JSON_FILE to run_dir as world readable."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- sys_info = {
- "python": "3.7",
- "platform":
- "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal",
- "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah",
- "x86_64"],
- "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]}
- with mock.patch("cloudinit.util.system_info", return_value=sys_info):
- datasource.get_data()
- json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
- content = util.load_file(json_file)
- expected = {
- 'base64_encoded_keys': [],
- 'merged_cfg': REDACT_SENSITIVE_VALUE,
- 'sensitive_keys': ['merged_cfg'],
- 'sys_info': sys_info,
- 'v1': {
- '_beta_keys': ['subplatform'],
- 'availability-zone': 'myaz',
- 'availability_zone': 'myaz',
- 'cloud-name': 'subclasscloudname',
- 'cloud_name': 'subclasscloudname',
- 'distro': 'ubuntu',
- 'distro_release': 'focal',
- 'distro_version': '20.04',
- 'instance-id': 'iid-datasource',
- 'instance_id': 'iid-datasource',
- 'local-hostname': 'test-subclass-hostname',
- 'local_hostname': 'test-subclass-hostname',
- 'kernel_release': '5.4.0-24-generic',
- 'machine': 'x86_64',
- 'platform': 'mytestsubclass',
- 'public_ssh_keys': [],
- 'python_version': '3.7',
- 'region': 'myregion',
- 'system_platform':
- 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal',
- 'subplatform': 'unknown',
- 'variant': 'ubuntu'},
- 'ds': {
-
- '_doc': EXPERIMENTAL_TEXT,
- 'meta_data': {'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion'}}}
- self.assertEqual(expected, util.load_json(content))
- file_stat = os.stat(json_file)
- self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
- self.assertEqual(expected, util.load_json(content))
-
- def test_get_data_writes_redacted_public_json_instance_data(self):
- """get_data writes redacted content to public INSTANCE_JSON_FILE."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_metadata={
- 'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion',
- 'some': {'security-credentials': {
- 'cred1': 'sekret', 'cred2': 'othersekret'}}})
- self.assertCountEqual(
- ('merged_cfg', 'security-credentials',),
- datasource.sensitive_metadata_keys)
- sys_info = {
- "python": "3.7",
- "platform":
- "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal",
- "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah",
- "x86_64"],
- "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]}
- with mock.patch("cloudinit.util.system_info", return_value=sys_info):
- datasource.get_data()
- json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
- redacted = util.load_json(util.load_file(json_file))
- expected = {
- 'base64_encoded_keys': [],
- 'merged_cfg': REDACT_SENSITIVE_VALUE,
- 'sensitive_keys': [
- 'ds/meta_data/some/security-credentials', 'merged_cfg'],
- 'sys_info': sys_info,
- 'v1': {
- '_beta_keys': ['subplatform'],
- 'availability-zone': 'myaz',
- 'availability_zone': 'myaz',
- 'cloud-name': 'subclasscloudname',
- 'cloud_name': 'subclasscloudname',
- 'distro': 'ubuntu',
- 'distro_release': 'focal',
- 'distro_version': '20.04',
- 'instance-id': 'iid-datasource',
- 'instance_id': 'iid-datasource',
- 'local-hostname': 'test-subclass-hostname',
- 'local_hostname': 'test-subclass-hostname',
- 'kernel_release': '5.4.0-24-generic',
- 'machine': 'x86_64',
- 'platform': 'mytestsubclass',
- 'public_ssh_keys': [],
- 'python_version': '3.7',
- 'region': 'myregion',
- 'system_platform':
- 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal',
- 'subplatform': 'unknown',
- 'variant': 'ubuntu'},
- 'ds': {
- '_doc': EXPERIMENTAL_TEXT,
- 'meta_data': {
- 'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion',
- 'some': {'security-credentials': REDACT_SENSITIVE_VALUE}}}
- }
- self.assertCountEqual(expected, redacted)
- file_stat = os.stat(json_file)
- self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
-
- def test_get_data_writes_json_instance_data_sensitive(self):
- """
- get_data writes unmodified data to sensitive file as root-readonly.
- """
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_metadata={
- 'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion',
- 'some': {'security-credentials': {
- 'cred1': 'sekret', 'cred2': 'othersekret'}}})
- sys_info = {
- "python": "3.7",
- "platform":
- "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal",
- "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah",
- "x86_64"],
- "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]}
-
- self.assertCountEqual(
- ('merged_cfg', 'security-credentials',),
- datasource.sensitive_metadata_keys)
- with mock.patch("cloudinit.util.system_info", return_value=sys_info):
- datasource.get_data()
- sensitive_json_file = self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, tmp)
- content = util.load_file(sensitive_json_file)
- expected = {
- 'base64_encoded_keys': [],
- 'merged_cfg': {
- '_doc': (
- 'Merged cloud-init system config from '
- '/etc/cloud/cloud.cfg and /etc/cloud/cloud.cfg.d/'
- ),
- 'datasource': {'_undef': {'key1': False}}},
- 'sensitive_keys': [
- 'ds/meta_data/some/security-credentials', 'merged_cfg'],
- 'sys_info': sys_info,
- 'v1': {
- '_beta_keys': ['subplatform'],
- 'availability-zone': 'myaz',
- 'availability_zone': 'myaz',
- 'cloud-name': 'subclasscloudname',
- 'cloud_name': 'subclasscloudname',
- 'distro': 'ubuntu',
- 'distro_release': 'focal',
- 'distro_version': '20.04',
- 'instance-id': 'iid-datasource',
- 'instance_id': 'iid-datasource',
- 'kernel_release': '5.4.0-24-generic',
- 'local-hostname': 'test-subclass-hostname',
- 'local_hostname': 'test-subclass-hostname',
- 'machine': 'x86_64',
- 'platform': 'mytestsubclass',
- 'public_ssh_keys': [],
- 'python_version': '3.7',
- 'region': 'myregion',
- 'subplatform': 'unknown',
- 'system_platform':
- 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal',
- 'variant': 'ubuntu'},
- 'ds': {
- '_doc': EXPERIMENTAL_TEXT,
- 'meta_data': {
- 'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion',
- 'some': {
- 'security-credentials':
- {'cred1': 'sekret', 'cred2': 'othersekret'}}}}
- }
- self.assertCountEqual(expected, util.load_json(content))
- file_stat = os.stat(sensitive_json_file)
- self.assertEqual(0o600, stat.S_IMODE(file_stat.st_mode))
- self.assertEqual(expected, util.load_json(content))
-
- def test_get_data_handles_redacted_unserializable_content(self):
- """get_data warns unserializable content in INSTANCE_JSON_FILE."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_metadata={'key1': 'val1', 'key2': {'key2.1': self.paths}})
- datasource.get_data()
- json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
- content = util.load_file(json_file)
- expected_metadata = {
- 'key1': 'val1',
- 'key2': {
- 'key2.1': "Warning: redacted unserializable type <class"
- " 'cloudinit.helpers.Paths'>"}}
- instance_json = util.load_json(content)
- self.assertEqual(
- expected_metadata, instance_json['ds']['meta_data'])
-
- def test_persist_instance_data_writes_ec2_metadata_when_set(self):
- """When ec2_metadata class attribute is set, persist to json."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- datasource.ec2_metadata = UNSET
- datasource.get_data()
- json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
- instance_data = util.load_json(util.load_file(json_file))
- self.assertNotIn('ec2_metadata', instance_data['ds'])
- datasource.ec2_metadata = {'ec2stuff': 'is good'}
- datasource.persist_instance_data()
- instance_data = util.load_json(util.load_file(json_file))
- self.assertEqual(
- {'ec2stuff': 'is good'},
- instance_data['ds']['ec2_metadata'])
-
- def test_persist_instance_data_writes_network_json_when_set(self):
- """When network_data.json class attribute is set, persist to json."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
- datasource.get_data()
- json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
- instance_data = util.load_json(util.load_file(json_file))
- self.assertNotIn('network_json', instance_data['ds'])
- datasource.network_json = {'network_json': 'is good'}
- datasource.persist_instance_data()
- instance_data = util.load_json(util.load_file(json_file))
- self.assertEqual(
- {'network_json': 'is good'},
- instance_data['ds']['network_json'])
-
- def test_get_data_base64encodes_unserializable_bytes(self):
- """On py3, get_data base64encodes any unserializable content."""
- tmp = self.tmp_dir()
- datasource = DataSourceTestSubclassNet(
- self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_metadata={'key1': 'val1', 'key2': {'key2.1': b'\x123'}})
- self.assertTrue(datasource.get_data())
- json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
- content = util.load_file(json_file)
- instance_json = util.load_json(content)
- self.assertCountEqual(
- ['ds/meta_data/key2/key2.1'],
- instance_json['base64_encoded_keys'])
- self.assertEqual(
- {'key1': 'val1', 'key2': {'key2.1': 'EjM='}},
- instance_json['ds']['meta_data'])
-
- def test_get_hostname_subclass_support(self):
- """Validate get_hostname signature on all subclasses of DataSource."""
- base_args = inspect.getfullargspec(DataSource.get_hostname)
- # Import all DataSource subclasses so we can inspect them.
- modules = util.find_modules(os.path.dirname(os.path.dirname(__file__)))
- for _loc, name in modules.items():
- mod_locs, _ = importer.find_module(name, ['cloudinit.sources'], [])
- if mod_locs:
- importer.import_module(mod_locs[0])
- for child in DataSource.__subclasses__():
- if 'Test' in child.dsname:
- continue
- self.assertEqual(
- base_args,
- inspect.getfullargspec(child.get_hostname),
- '%s does not implement DataSource.get_hostname params'
- % child)
- for grandchild in child.__subclasses__():
- self.assertEqual(
- base_args,
- inspect.getfullargspec(grandchild.get_hostname),
- '%s does not implement DataSource.get_hostname params'
- % grandchild)
-
- def test_clear_cached_attrs_resets_cached_attr_class_attributes(self):
- """Class attributes listed in cached_attr_defaults are reset."""
- count = 0
- # Setup values for all cached class attributes
- for attr, value in self.datasource.cached_attr_defaults:
- setattr(self.datasource, attr, count)
- count += 1
- self.datasource._dirty_cache = True
- self.datasource.clear_cached_attrs()
- for attr, value in self.datasource.cached_attr_defaults:
- self.assertEqual(value, getattr(self.datasource, attr))
-
- def test_clear_cached_attrs_noops_on_clean_cache(self):
- """Class attributes listed in cached_attr_defaults are reset."""
- count = 0
- # Setup values for all cached class attributes
- for attr, _ in self.datasource.cached_attr_defaults:
- setattr(self.datasource, attr, count)
- count += 1
- self.datasource._dirty_cache = False # Fake clean cache
- self.datasource.clear_cached_attrs()
- count = 0
- for attr, _ in self.datasource.cached_attr_defaults:
- self.assertEqual(count, getattr(self.datasource, attr))
- count += 1
-
- def test_clear_cached_attrs_skips_non_attr_class_attributes(self):
- """Skip any cached_attr_defaults which aren't class attributes."""
- self.datasource._dirty_cache = True
- self.datasource.clear_cached_attrs()
- for attr in ('ec2_metadata', 'network_json'):
- self.assertFalse(hasattr(self.datasource, attr))
-
- def test_clear_cached_attrs_of_custom_attrs(self):
- """Custom attr_values can be passed to clear_cached_attrs."""
- self.datasource._dirty_cache = True
- cached_attr_name = self.datasource.cached_attr_defaults[0][0]
- setattr(self.datasource, cached_attr_name, 'himom')
- self.datasource.myattr = 'orig'
- self.datasource.clear_cached_attrs(
- attr_defaults=(('myattr', 'updated'),))
- self.assertEqual('himom', getattr(self.datasource, cached_attr_name))
- self.assertEqual('updated', self.datasource.myattr)
-
- def test_update_metadata_only_acts_on_supported_update_events(self):
- """update_metadata won't get_data on unsupported update events."""
- self.datasource.update_events['network'].discard(EventType.BOOT)
- self.assertEqual(
- {'network': set([EventType.BOOT_NEW_INSTANCE])},
- self.datasource.update_events)
-
- def fake_get_data():
- raise Exception('get_data should not be called')
-
- self.datasource.get_data = fake_get_data
- self.assertFalse(
- self.datasource.update_metadata(
- source_event_types=[EventType.BOOT]))
-
- def test_update_metadata_returns_true_on_supported_update_event(self):
- """update_metadata returns get_data response on supported events."""
-
- def fake_get_data():
- return True
-
- self.datasource.get_data = fake_get_data
- self.datasource._network_config = 'something'
- self.datasource._dirty_cache = True
- self.assertTrue(
- self.datasource.update_metadata(
- source_event_types=[
- EventType.BOOT, EventType.BOOT_NEW_INSTANCE]))
- self.assertEqual(UNSET, self.datasource._network_config)
- self.assertIn(
- "DEBUG: Update datasource metadata and network config due to"
- " events: New instance first boot",
- self.logs.getvalue())
-
-
-class TestRedactSensitiveData(CiTestCase):
-
- def test_redact_sensitive_data_noop_when_no_sensitive_keys_present(self):
- """When sensitive_keys is absent or empty from metadata do nothing."""
- md = {'my': 'data'}
- self.assertEqual(
- md, redact_sensitive_keys(md, redact_value='redacted'))
- md['sensitive_keys'] = []
- self.assertEqual(
- md, redact_sensitive_keys(md, redact_value='redacted'))
-
- def test_redact_sensitive_data_redacts_exact_match_name(self):
- """Only exact matched sensitive_keys are redacted from metadata."""
- md = {'sensitive_keys': ['md/secure'],
- 'md': {'secure': 's3kr1t', 'insecure': 'publik'}}
- secure_md = copy.deepcopy(md)
- secure_md['md']['secure'] = 'redacted'
- self.assertEqual(
- secure_md,
- redact_sensitive_keys(md, redact_value='redacted'))
-
- def test_redact_sensitive_data_does_redacts_with_default_string(self):
- """When redact_value is absent, REDACT_SENSITIVE_VALUE is used."""
- md = {'sensitive_keys': ['md/secure'],
- 'md': {'secure': 's3kr1t', 'insecure': 'publik'}}
- secure_md = copy.deepcopy(md)
- secure_md['md']['secure'] = 'redacted for non-root user'
- self.assertEqual(
- secure_md,
- redact_sensitive_keys(md))
-
-
-class TestCanonicalCloudID(CiTestCase):
-
- def test_cloud_id_returns_platform_on_unknowns(self):
- """When region and cloud_name are unknown, return platform."""
- self.assertEqual(
- 'platform',
- canonical_cloud_id(cloud_name=METADATA_UNKNOWN,
- region=METADATA_UNKNOWN,
- platform='platform'))
-
- def test_cloud_id_returns_platform_on_none(self):
- """When region and cloud_name are unknown, return platform."""
- self.assertEqual(
- 'platform',
- canonical_cloud_id(cloud_name=None,
- region=None,
- platform='platform'))
-
- def test_cloud_id_returns_cloud_name_on_unknown_region(self):
- """When region is unknown, return cloud_name."""
- for region in (None, METADATA_UNKNOWN):
- self.assertEqual(
- 'cloudname',
- canonical_cloud_id(cloud_name='cloudname',
- region=region,
- platform='platform'))
-
- def test_cloud_id_returns_platform_on_unknown_cloud_name(self):
- """When region is set but cloud_name is unknown return cloud_name."""
- self.assertEqual(
- 'platform',
- canonical_cloud_id(cloud_name=METADATA_UNKNOWN,
- region='region',
- platform='platform'))
-
- def test_cloud_id_aws_based_on_region_and_cloud_name(self):
- """When cloud_name is aws, return proper cloud-id based on region."""
- self.assertEqual(
- 'aws-china',
- canonical_cloud_id(cloud_name='aws',
- region='cn-north-1',
- platform='platform'))
- self.assertEqual(
- 'aws',
- canonical_cloud_id(cloud_name='aws',
- region='us-east-1',
- platform='platform'))
- self.assertEqual(
- 'aws-gov',
- canonical_cloud_id(cloud_name='aws',
- region='us-gov-1',
- platform='platform'))
- self.assertEqual( # Overrideen non-aws cloud_name is returned
- '!aws',
- canonical_cloud_id(cloud_name='!aws',
- region='us-gov-1',
- platform='platform'))
-
- def test_cloud_id_azure_based_on_region_and_cloud_name(self):
- """Report cloud-id when cloud_name is azure and region is in china."""
- self.assertEqual(
- 'azure-china',
- canonical_cloud_id(cloud_name='azure',
- region='chinaeast',
- platform='platform'))
- self.assertEqual(
- 'azure',
- canonical_cloud_id(cloud_name='azure',
- region='!chinaeast',
- platform='platform'))
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/sources/tests/test_oracle.py b/cloudinit/sources/tests/test_oracle.py
deleted file mode 100644
index a7bbdfd9..00000000
--- a/cloudinit/sources/tests/test_oracle.py
+++ /dev/null
@@ -1,785 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import base64
-import copy
-import json
-from contextlib import ExitStack
-from unittest import mock
-
-import pytest
-
-from cloudinit.sources import DataSourceOracle as oracle
-from cloudinit.sources import NetworkConfigSource
-from cloudinit.sources.DataSourceOracle import OpcMetadata
-from cloudinit.tests import helpers as test_helpers
-from cloudinit.url_helper import UrlError
-
-DS_PATH = "cloudinit.sources.DataSourceOracle"
-
-# `curl -L http://169.254.169.254/opc/v1/vnics/` on an Oracle Bare Metal Machine
-# with a secondary VNIC attached (vnicId truncated for Python line length)
-OPC_BM_SECONDARY_VNIC_RESPONSE = """\
-[ {
- "vnicId" : "ocid1.vnic.oc1.phx.abyhqljtyvcucqkhdqmgjszebxe4hrb!!TRUNCATED||",
- "privateIp" : "10.0.0.8",
- "vlanTag" : 0,
- "macAddr" : "90:e2:ba:d4:f1:68",
- "virtualRouterIp" : "10.0.0.1",
- "subnetCidrBlock" : "10.0.0.0/24",
- "nicIndex" : 0
-}, {
- "vnicId" : "ocid1.vnic.oc1.phx.abyhqljtfmkxjdy2sqidndiwrsg63zf!!TRUNCATED||",
- "privateIp" : "10.0.4.5",
- "vlanTag" : 1,
- "macAddr" : "02:00:17:05:CF:51",
- "virtualRouterIp" : "10.0.4.1",
- "subnetCidrBlock" : "10.0.4.0/24",
- "nicIndex" : 0
-} ]"""
-
-# `curl -L http://169.254.169.254/opc/v1/vnics/` on an Oracle Virtual Machine
-# with a secondary VNIC attached
-OPC_VM_SECONDARY_VNIC_RESPONSE = """\
-[ {
- "vnicId" : "ocid1.vnic.oc1.phx.abyhqljtch72z5pd76cc2636qeqh7z_truncated",
- "privateIp" : "10.0.0.230",
- "vlanTag" : 1039,
- "macAddr" : "02:00:17:05:D1:DB",
- "virtualRouterIp" : "10.0.0.1",
- "subnetCidrBlock" : "10.0.0.0/24"
-}, {
- "vnicId" : "ocid1.vnic.oc1.phx.abyhqljt4iew3gwmvrwrhhf3bp5drj_truncated",
- "privateIp" : "10.0.0.231",
- "vlanTag" : 1041,
- "macAddr" : "00:00:17:02:2B:B1",
- "virtualRouterIp" : "10.0.0.1",
- "subnetCidrBlock" : "10.0.0.0/24"
-} ]"""
-
-
-# Fetched with `curl http://169.254.169.254/opc/v1/instance/` (and then
-# truncated for line length)
-OPC_V2_METADATA = """\
-{
- "availabilityDomain" : "qIZq:PHX-AD-1",
- "faultDomain" : "FAULT-DOMAIN-2",
- "compartmentId" : "ocid1.tenancy.oc1..aaaaaaaao7f7cccogqrg5emjxkxmTRUNCATED",
- "displayName" : "instance-20200320-1400",
- "hostname" : "instance-20200320-1400",
- "id" : "ocid1.instance.oc1.phx.anyhqljtniwq6syc3nex55sep5w34qbwmw6TRUNCATED",
- "image" : "ocid1.image.oc1.phx.aaaaaaaagmkn4gdhvvx24kiahh2b2qchsicTRUNCATED",
- "metadata" : {
- "ssh_authorized_keys" : "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQ truncated",
- "user_data" : "IyEvYmluL3NoCnRvdWNoIC90bXAvZm9v"
- },
- "region" : "phx",
- "canonicalRegionName" : "us-phoenix-1",
- "ociAdName" : "phx-ad-3",
- "shape" : "VM.Standard2.1",
- "state" : "Running",
- "timeCreated" : 1584727285318,
- "agentConfig" : {
- "monitoringDisabled" : true,
- "managementDisabled" : true
- }
-}"""
-
-# Just a small meaningless change to differentiate the two metadatas
-OPC_V1_METADATA = OPC_V2_METADATA.replace("ocid1.instance", "ocid2.instance")
-
-
-@pytest.fixture
-def metadata_version():
- return 2
-
-
-@pytest.yield_fixture
-def oracle_ds(request, fixture_utils, paths, metadata_version):
- """
- Return an instantiated DataSourceOracle.
-
- This also performs the mocking required for the default test case:
- * ``_read_system_uuid`` returns something,
- * ``_is_platform_viable`` returns True,
- * ``_is_iscsi_root`` returns True (the simpler code path),
- * ``read_opc_metadata`` returns ``OPC_V2_METADATA``
-
- (This uses the paths fixture for the required helpers.Paths object, and the
- fixture_utils fixture for fetching markers.)
- """
- sys_cfg = fixture_utils.closest_marker_first_arg_or(
- request, "ds_sys_cfg", mock.MagicMock()
- )
- metadata = OpcMetadata(metadata_version, json.loads(OPC_V2_METADATA), None)
- with mock.patch(DS_PATH + "._read_system_uuid", return_value="someuuid"):
- with mock.patch(DS_PATH + "._is_platform_viable", return_value=True):
- with mock.patch(DS_PATH + "._is_iscsi_root", return_value=True):
- with mock.patch(
- DS_PATH + ".read_opc_metadata",
- return_value=metadata,
- ):
- yield oracle.DataSourceOracle(
- sys_cfg=sys_cfg, distro=mock.Mock(), paths=paths,
- )
-
-
-class TestDataSourceOracle:
- def test_platform_info(self, oracle_ds):
- assert "oracle" == oracle_ds.cloud_name
- assert "oracle" == oracle_ds.platform_type
-
- def test_subplatform_before_fetch(self, oracle_ds):
- assert 'unknown' == oracle_ds.subplatform
-
- def test_platform_info_after_fetch(self, oracle_ds):
- oracle_ds._get_data()
- assert 'metadata (http://169.254.169.254/opc/v2/)' == \
- oracle_ds.subplatform
-
- @pytest.mark.parametrize('metadata_version', [1])
- def test_v1_platform_info_after_fetch(self, oracle_ds):
- oracle_ds._get_data()
- assert 'metadata (http://169.254.169.254/opc/v1/)' == \
- oracle_ds.subplatform
-
- def test_secondary_nics_disabled_by_default(self, oracle_ds):
- assert not oracle_ds.ds_cfg["configure_secondary_nics"]
-
- @pytest.mark.ds_sys_cfg(
- {"datasource": {"Oracle": {"configure_secondary_nics": True}}}
- )
- def test_sys_cfg_can_enable_configure_secondary_nics(self, oracle_ds):
- assert oracle_ds.ds_cfg["configure_secondary_nics"]
-
-
-class TestIsPlatformViable(test_helpers.CiTestCase):
- @mock.patch(DS_PATH + ".dmi.read_dmi_data",
- return_value=oracle.CHASSIS_ASSET_TAG)
- def test_expected_viable(self, m_read_dmi_data):
- """System with known chassis tag is viable."""
- self.assertTrue(oracle._is_platform_viable())
- m_read_dmi_data.assert_has_calls([mock.call('chassis-asset-tag')])
-
- @mock.patch(DS_PATH + ".dmi.read_dmi_data", return_value=None)
- def test_expected_not_viable_dmi_data_none(self, m_read_dmi_data):
- """System without known chassis tag is not viable."""
- self.assertFalse(oracle._is_platform_viable())
- m_read_dmi_data.assert_has_calls([mock.call('chassis-asset-tag')])
-
- @mock.patch(DS_PATH + ".dmi.read_dmi_data", return_value="LetsGoCubs")
- def test_expected_not_viable_other(self, m_read_dmi_data):
- """System with unknown chassis tag is not viable."""
- self.assertFalse(oracle._is_platform_viable())
- m_read_dmi_data.assert_has_calls([mock.call('chassis-asset-tag')])
-
-
-class TestNetworkConfigFromOpcImds:
- def test_no_secondary_nics_does_not_mutate_input(self, oracle_ds):
- oracle_ds._vnics_data = [{}]
- # We test this by using a non-dict to ensure that no dict
- # operations are used; failure would be seen as exceptions
- oracle_ds._network_config = object()
- oracle_ds._add_network_config_from_opc_imds()
-
- def test_bare_metal_machine_skipped(self, oracle_ds, caplog):
- # nicIndex in the first entry indicates a bare metal machine
- oracle_ds._vnics_data = json.loads(OPC_BM_SECONDARY_VNIC_RESPONSE)
- # We test this by using a non-dict to ensure that no dict
- # operations are used
- oracle_ds._network_config = object()
- oracle_ds._add_network_config_from_opc_imds()
- assert 'bare metal machine' in caplog.text
-
- def test_missing_mac_skipped(self, oracle_ds, caplog):
- oracle_ds._vnics_data = json.loads(OPC_VM_SECONDARY_VNIC_RESPONSE)
-
- oracle_ds._network_config = {
- 'version': 1, 'config': [{'primary': 'nic'}]
- }
- with mock.patch(DS_PATH + ".get_interfaces_by_mac", return_value={}):
- oracle_ds._add_network_config_from_opc_imds()
-
- assert 1 == len(oracle_ds.network_config['config'])
- assert 'Interface with MAC 00:00:17:02:2b:b1 not found; skipping' in \
- caplog.text
-
- def test_missing_mac_skipped_v2(self, oracle_ds, caplog):
- oracle_ds._vnics_data = json.loads(OPC_VM_SECONDARY_VNIC_RESPONSE)
-
- oracle_ds._network_config = {
- 'version': 2, 'ethernets': {'primary': {'nic': {}}}
- }
- with mock.patch(DS_PATH + ".get_interfaces_by_mac", return_value={}):
- oracle_ds._add_network_config_from_opc_imds()
-
- assert 1 == len(oracle_ds.network_config['ethernets'])
- assert 'Interface with MAC 00:00:17:02:2b:b1 not found; skipping' in \
- caplog.text
-
- def test_secondary_nic(self, oracle_ds):
- oracle_ds._vnics_data = json.loads(OPC_VM_SECONDARY_VNIC_RESPONSE)
- oracle_ds._network_config = {
- 'version': 1, 'config': [{'primary': 'nic'}]
- }
- mac_addr, nic_name = '00:00:17:02:2b:b1', 'ens3'
- with mock.patch(DS_PATH + ".get_interfaces_by_mac",
- return_value={mac_addr: nic_name}):
- oracle_ds._add_network_config_from_opc_imds()
-
- # The input is mutated
- assert 2 == len(oracle_ds.network_config['config'])
-
- secondary_nic_cfg = oracle_ds.network_config['config'][1]
- assert nic_name == secondary_nic_cfg['name']
- assert 'physical' == secondary_nic_cfg['type']
- assert mac_addr == secondary_nic_cfg['mac_address']
- assert 9000 == secondary_nic_cfg['mtu']
-
- assert 1 == len(secondary_nic_cfg['subnets'])
- subnet_cfg = secondary_nic_cfg['subnets'][0]
- # These values are hard-coded in OPC_VM_SECONDARY_VNIC_RESPONSE
- assert '10.0.0.231' == subnet_cfg['address']
-
- def test_secondary_nic_v2(self, oracle_ds):
- oracle_ds._vnics_data = json.loads(OPC_VM_SECONDARY_VNIC_RESPONSE)
- oracle_ds._network_config = {
- 'version': 2, 'ethernets': {'primary': {'nic': {}}}
- }
- mac_addr, nic_name = '00:00:17:02:2b:b1', 'ens3'
- with mock.patch(DS_PATH + ".get_interfaces_by_mac",
- return_value={mac_addr: nic_name}):
- oracle_ds._add_network_config_from_opc_imds()
-
- # The input is mutated
- assert 2 == len(oracle_ds.network_config['ethernets'])
-
- secondary_nic_cfg = oracle_ds.network_config['ethernets']['ens3']
- assert secondary_nic_cfg['dhcp4'] is False
- assert secondary_nic_cfg['dhcp6'] is False
- assert mac_addr == secondary_nic_cfg['match']['macaddress']
- assert 9000 == secondary_nic_cfg['mtu']
-
- assert 1 == len(secondary_nic_cfg['addresses'])
- # These values are hard-coded in OPC_VM_SECONDARY_VNIC_RESPONSE
- assert '10.0.0.231' == secondary_nic_cfg['addresses'][0]
-
-
-class TestNetworkConfigFiltersNetFailover(test_helpers.CiTestCase):
-
- def setUp(self):
- super(TestNetworkConfigFiltersNetFailover, self).setUp()
- self.add_patch(DS_PATH + '.get_interfaces_by_mac',
- 'm_get_interfaces_by_mac')
- self.add_patch(DS_PATH + '.is_netfail_master', 'm_netfail_master')
-
- def test_ignore_bogus_network_config(self):
- netcfg = {'something': 'here'}
- passed_netcfg = copy.copy(netcfg)
- oracle._ensure_netfailover_safe(passed_netcfg)
- self.assertEqual(netcfg, passed_netcfg)
-
- def test_ignore_network_config_unknown_versions(self):
- netcfg = {'something': 'here', 'version': 3}
- passed_netcfg = copy.copy(netcfg)
- oracle._ensure_netfailover_safe(passed_netcfg)
- self.assertEqual(netcfg, passed_netcfg)
-
- def test_checks_v1_type_physical_interfaces(self):
- mac_addr, nic_name = '00:00:17:02:2b:b1', 'ens3'
- self.m_get_interfaces_by_mac.return_value = {
- mac_addr: nic_name,
- }
- netcfg = {'version': 1, 'config': [
- {'type': 'physical', 'name': nic_name, 'mac_address': mac_addr,
- 'subnets': [{'type': 'dhcp4'}]}]}
- passed_netcfg = copy.copy(netcfg)
- self.m_netfail_master.return_value = False
- oracle._ensure_netfailover_safe(passed_netcfg)
- self.assertEqual(netcfg, passed_netcfg)
- self.assertEqual([mock.call(nic_name)],
- self.m_netfail_master.call_args_list)
-
- def test_checks_v1_skips_non_phys_interfaces(self):
- mac_addr, nic_name = '00:00:17:02:2b:b1', 'bond0'
- self.m_get_interfaces_by_mac.return_value = {
- mac_addr: nic_name,
- }
- netcfg = {'version': 1, 'config': [
- {'type': 'bond', 'name': nic_name, 'mac_address': mac_addr,
- 'subnets': [{'type': 'dhcp4'}]}]}
- passed_netcfg = copy.copy(netcfg)
- oracle._ensure_netfailover_safe(passed_netcfg)
- self.assertEqual(netcfg, passed_netcfg)
- self.assertEqual(0, self.m_netfail_master.call_count)
-
- def test_removes_master_mac_property_v1(self):
- nic_master, mac_master = 'ens3', self.random_string()
- nic_other, mac_other = 'ens7', self.random_string()
- nic_extra, mac_extra = 'enp0s1f2', self.random_string()
- self.m_get_interfaces_by_mac.return_value = {
- mac_master: nic_master,
- mac_other: nic_other,
- mac_extra: nic_extra,
- }
- netcfg = {'version': 1, 'config': [
- {'type': 'physical', 'name': nic_master,
- 'mac_address': mac_master},
- {'type': 'physical', 'name': nic_other, 'mac_address': mac_other},
- {'type': 'physical', 'name': nic_extra, 'mac_address': mac_extra},
- ]}
-
- def _is_netfail_master(iface):
- if iface == 'ens3':
- return True
- return False
- self.m_netfail_master.side_effect = _is_netfail_master
- expected_cfg = {'version': 1, 'config': [
- {'type': 'physical', 'name': nic_master},
- {'type': 'physical', 'name': nic_other, 'mac_address': mac_other},
- {'type': 'physical', 'name': nic_extra, 'mac_address': mac_extra},
- ]}
- oracle._ensure_netfailover_safe(netcfg)
- self.assertEqual(expected_cfg, netcfg)
-
- def test_checks_v2_type_ethernet_interfaces(self):
- mac_addr, nic_name = '00:00:17:02:2b:b1', 'ens3'
- self.m_get_interfaces_by_mac.return_value = {
- mac_addr: nic_name,
- }
- netcfg = {'version': 2, 'ethernets': {
- nic_name: {'dhcp4': True, 'critical': True, 'set-name': nic_name,
- 'match': {'macaddress': mac_addr}}}}
- passed_netcfg = copy.copy(netcfg)
- self.m_netfail_master.return_value = False
- oracle._ensure_netfailover_safe(passed_netcfg)
- self.assertEqual(netcfg, passed_netcfg)
- self.assertEqual([mock.call(nic_name)],
- self.m_netfail_master.call_args_list)
-
- def test_skips_v2_non_ethernet_interfaces(self):
- mac_addr, nic_name = '00:00:17:02:2b:b1', 'wlps0'
- self.m_get_interfaces_by_mac.return_value = {
- mac_addr: nic_name,
- }
- netcfg = {'version': 2, 'wifis': {
- nic_name: {'dhcp4': True, 'critical': True, 'set-name': nic_name,
- 'match': {'macaddress': mac_addr}}}}
- passed_netcfg = copy.copy(netcfg)
- oracle._ensure_netfailover_safe(passed_netcfg)
- self.assertEqual(netcfg, passed_netcfg)
- self.assertEqual(0, self.m_netfail_master.call_count)
-
- def test_removes_master_mac_property_v2(self):
- nic_master, mac_master = 'ens3', self.random_string()
- nic_other, mac_other = 'ens7', self.random_string()
- nic_extra, mac_extra = 'enp0s1f2', self.random_string()
- self.m_get_interfaces_by_mac.return_value = {
- mac_master: nic_master,
- mac_other: nic_other,
- mac_extra: nic_extra,
- }
- netcfg = {'version': 2, 'ethernets': {
- nic_extra: {'dhcp4': True, 'set-name': nic_extra,
- 'match': {'macaddress': mac_extra}},
- nic_other: {'dhcp4': True, 'set-name': nic_other,
- 'match': {'macaddress': mac_other}},
- nic_master: {'dhcp4': True, 'set-name': nic_master,
- 'match': {'macaddress': mac_master}},
- }}
-
- def _is_netfail_master(iface):
- if iface == 'ens3':
- return True
- return False
- self.m_netfail_master.side_effect = _is_netfail_master
-
- expected_cfg = {'version': 2, 'ethernets': {
- nic_master: {'dhcp4': True, 'match': {'name': nic_master}},
- nic_extra: {'dhcp4': True, 'set-name': nic_extra,
- 'match': {'macaddress': mac_extra}},
- nic_other: {'dhcp4': True, 'set-name': nic_other,
- 'match': {'macaddress': mac_other}},
- }}
- oracle._ensure_netfailover_safe(netcfg)
- import pprint
- pprint.pprint(netcfg)
- print('---- ^^ modified ^^ ---- vv original vv ----')
- pprint.pprint(expected_cfg)
- self.assertEqual(expected_cfg, netcfg)
-
-
-def _mock_v2_urls(httpretty):
- def instance_callback(request, uri, response_headers):
- print(response_headers)
- assert request.headers.get("Authorization") == "Bearer Oracle"
- return [200, response_headers, OPC_V2_METADATA]
-
- def vnics_callback(request, uri, response_headers):
- assert request.headers.get("Authorization") == "Bearer Oracle"
- return [200, response_headers, OPC_BM_SECONDARY_VNIC_RESPONSE]
-
- httpretty.register_uri(
- httpretty.GET,
- "http://169.254.169.254/opc/v2/instance/",
- body=instance_callback
- )
- httpretty.register_uri(
- httpretty.GET,
- "http://169.254.169.254/opc/v2/vnics/",
- body=vnics_callback
- )
-
-
-def _mock_no_v2_urls(httpretty):
- httpretty.register_uri(
- httpretty.GET,
- "http://169.254.169.254/opc/v2/instance/",
- status=404,
- )
- httpretty.register_uri(
- httpretty.GET,
- "http://169.254.169.254/opc/v1/instance/",
- body=OPC_V1_METADATA
- )
- httpretty.register_uri(
- httpretty.GET,
- "http://169.254.169.254/opc/v1/vnics/",
- body=OPC_BM_SECONDARY_VNIC_RESPONSE
- )
-
-
-class TestReadOpcMetadata:
- # See https://docs.pytest.org/en/stable/example
- # /parametrize.html#parametrizing-conditional-raising
- does_not_raise = ExitStack
-
- @mock.patch("cloudinit.url_helper.time.sleep", lambda _: None)
- @pytest.mark.parametrize(
- 'version,setup_urls,instance_data,fetch_vnics,vnics_data', [
- (2, _mock_v2_urls, json.loads(OPC_V2_METADATA), True,
- json.loads(OPC_BM_SECONDARY_VNIC_RESPONSE)),
- (2, _mock_v2_urls, json.loads(OPC_V2_METADATA), False, None),
- (1, _mock_no_v2_urls, json.loads(OPC_V1_METADATA), True,
- json.loads(OPC_BM_SECONDARY_VNIC_RESPONSE)),
- (1, _mock_no_v2_urls, json.loads(OPC_V1_METADATA), False, None),
- ]
- )
- def test_metadata_returned(
- self, version, setup_urls, instance_data,
- fetch_vnics, vnics_data, httpretty
- ):
- setup_urls(httpretty)
- metadata = oracle.read_opc_metadata(fetch_vnics_data=fetch_vnics)
-
- assert version == metadata.version
- assert instance_data == metadata.instance_data
- assert vnics_data == metadata.vnics_data
-
- # No need to actually wait between retries in the tests
- @mock.patch("cloudinit.url_helper.time.sleep", lambda _: None)
- @pytest.mark.parametrize(
- "v2_failure_count,v1_failure_count,expected_body,expectation",
- [
- (1, 0, json.loads(OPC_V2_METADATA), does_not_raise()),
- (2, 0, json.loads(OPC_V2_METADATA), does_not_raise()),
- (3, 0, json.loads(OPC_V1_METADATA), does_not_raise()),
- (3, 1, json.loads(OPC_V1_METADATA), does_not_raise()),
- (3, 2, json.loads(OPC_V1_METADATA), does_not_raise()),
- (3, 3, None, pytest.raises(UrlError)),
- ]
- )
- def test_retries(self, v2_failure_count, v1_failure_count,
- expected_body, expectation, httpretty):
- v2_responses = [httpretty.Response("", status=404)] * v2_failure_count
- v2_responses.append(httpretty.Response(OPC_V2_METADATA))
- v1_responses = [httpretty.Response("", status=404)] * v1_failure_count
- v1_responses.append(httpretty.Response(OPC_V1_METADATA))
-
- httpretty.register_uri(
- httpretty.GET,
- "http://169.254.169.254/opc/v1/instance/",
- responses=v1_responses,
- )
- httpretty.register_uri(
- httpretty.GET,
- "http://169.254.169.254/opc/v2/instance/",
- responses=v2_responses,
- )
- with expectation:
- assert expected_body == oracle.read_opc_metadata().instance_data
-
-
-class TestCommon_GetDataBehaviour:
- """This test class tests behaviour common to iSCSI and non-iSCSI root.
-
- It defines a fixture, parameterized_oracle_ds, which is used in all the
- tests herein to test that the commonly expected behaviour is the same with
- iSCSI root and without.
-
- (As non-iSCSI root behaviour is a superset of iSCSI root behaviour this
- class is implicitly also testing all iSCSI root behaviour so there is no
- separate class for that case.)
- """
-
- @pytest.yield_fixture(params=[True, False])
- def parameterized_oracle_ds(self, request, oracle_ds):
- """oracle_ds parameterized for iSCSI and non-iSCSI root respectively"""
- is_iscsi_root = request.param
- with ExitStack() as stack:
- stack.enter_context(
- mock.patch(
- DS_PATH + "._is_iscsi_root", return_value=is_iscsi_root
- )
- )
- if not is_iscsi_root:
- stack.enter_context(
- mock.patch(DS_PATH + ".net.find_fallback_nic")
- )
- stack.enter_context(
- mock.patch(DS_PATH + ".dhcp.EphemeralDHCPv4")
- )
- yield oracle_ds
-
- @mock.patch(
- DS_PATH + "._is_platform_viable", mock.Mock(return_value=False)
- )
- def test_false_if_platform_not_viable(
- self, parameterized_oracle_ds,
- ):
- assert not parameterized_oracle_ds._get_data()
-
- @pytest.mark.parametrize(
- "keyname,expected_value",
- (
- ("availability-zone", "phx-ad-3"),
- ("launch-index", 0),
- ("local-hostname", "instance-20200320-1400"),
- (
- "instance-id",
- "ocid1.instance.oc1.phx"
- ".anyhqljtniwq6syc3nex55sep5w34qbwmw6TRUNCATED",
- ),
- ("name", "instance-20200320-1400"),
- (
- "public_keys",
- "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQ truncated",
- ),
- ),
- )
- def test_metadata_keys_set_correctly(
- self, keyname, expected_value, parameterized_oracle_ds,
- ):
- assert parameterized_oracle_ds._get_data()
- assert expected_value == parameterized_oracle_ds.metadata[keyname]
-
- @pytest.mark.parametrize(
- "attribute_name,expected_value",
- [
- ("_crawled_metadata", json.loads(OPC_V2_METADATA)),
- (
- "userdata_raw",
- base64.b64decode(b"IyEvYmluL3NoCnRvdWNoIC90bXAvZm9v"),
- ),
- ("system_uuid", "my-test-uuid"),
- ],
- )
- @mock.patch(
- DS_PATH + "._read_system_uuid", mock.Mock(return_value="my-test-uuid")
- )
- def test_attributes_set_correctly(
- self, attribute_name, expected_value, parameterized_oracle_ds,
- ):
- assert parameterized_oracle_ds._get_data()
- assert expected_value == getattr(
- parameterized_oracle_ds, attribute_name
- )
-
- @pytest.mark.parametrize(
- "ssh_keys,expected_value",
- [
- # No SSH keys in metadata => no keys detected
- (None, []),
- # Empty SSH keys in metadata => no keys detected
- ("", []),
- # Single SSH key in metadata => single key detected
- ("ssh-rsa ... test@test", ["ssh-rsa ... test@test"]),
- # Multiple SSH keys in metadata => multiple keys detected
- (
- "ssh-rsa ... test@test\nssh-rsa ... test2@test2",
- ["ssh-rsa ... test@test", "ssh-rsa ... test2@test2"],
- ),
- ],
- )
- def test_public_keys_handled_correctly(
- self, ssh_keys, expected_value, parameterized_oracle_ds
- ):
- instance_data = json.loads(OPC_V1_METADATA)
- if ssh_keys is None:
- del instance_data["metadata"]["ssh_authorized_keys"]
- else:
- instance_data["metadata"]["ssh_authorized_keys"] = ssh_keys
- metadata = OpcMetadata(None, instance_data, None)
- with mock.patch(
- DS_PATH + ".read_opc_metadata", mock.Mock(return_value=metadata),
- ):
- assert parameterized_oracle_ds._get_data()
- assert (
- expected_value == parameterized_oracle_ds.get_public_ssh_keys()
- )
-
- def test_missing_user_data_handled_gracefully(
- self, parameterized_oracle_ds
- ):
- instance_data = json.loads(OPC_V1_METADATA)
- del instance_data["metadata"]["user_data"]
- metadata = OpcMetadata(None, instance_data, None)
- with mock.patch(
- DS_PATH + ".read_opc_metadata", mock.Mock(return_value=metadata),
- ):
- assert parameterized_oracle_ds._get_data()
-
- assert parameterized_oracle_ds.userdata_raw is None
-
- def test_missing_metadata_handled_gracefully(
- self, parameterized_oracle_ds
- ):
- instance_data = json.loads(OPC_V1_METADATA)
- del instance_data["metadata"]
- metadata = OpcMetadata(None, instance_data, None)
- with mock.patch(
- DS_PATH + ".read_opc_metadata", mock.Mock(return_value=metadata),
- ):
- assert parameterized_oracle_ds._get_data()
-
- assert parameterized_oracle_ds.userdata_raw is None
- assert [] == parameterized_oracle_ds.get_public_ssh_keys()
-
-
-@mock.patch(DS_PATH + "._is_iscsi_root", lambda: False)
-class TestNonIscsiRoot_GetDataBehaviour:
- @mock.patch(DS_PATH + ".dhcp.EphemeralDHCPv4")
- @mock.patch(DS_PATH + ".net.find_fallback_nic")
- def test_read_opc_metadata_called_with_ephemeral_dhcp(
- self, m_find_fallback_nic, m_EphemeralDHCPv4, oracle_ds
- ):
- in_context_manager = False
-
- def enter_context_manager():
- nonlocal in_context_manager
- in_context_manager = True
-
- def exit_context_manager(*args):
- nonlocal in_context_manager
- in_context_manager = False
-
- m_EphemeralDHCPv4.return_value.__enter__.side_effect = (
- enter_context_manager
- )
- m_EphemeralDHCPv4.return_value.__exit__.side_effect = (
- exit_context_manager
- )
-
- def assert_in_context_manager(**kwargs):
- assert in_context_manager
- return mock.MagicMock()
-
- with mock.patch(
- DS_PATH + ".read_opc_metadata",
- mock.Mock(side_effect=assert_in_context_manager),
- ):
- assert oracle_ds._get_data()
-
- assert [
- mock.call(m_find_fallback_nic.return_value)
- ] == m_EphemeralDHCPv4.call_args_list
-
-
-@mock.patch(DS_PATH + ".get_interfaces_by_mac", lambda: {})
-@mock.patch(DS_PATH + ".cmdline.read_initramfs_config")
-class TestNetworkConfig:
- def test_network_config_cached(self, m_read_initramfs_config, oracle_ds):
- """.network_config should be cached"""
- assert 0 == m_read_initramfs_config.call_count
- oracle_ds.network_config # pylint: disable=pointless-statement
- assert 1 == m_read_initramfs_config.call_count
- oracle_ds.network_config # pylint: disable=pointless-statement
- assert 1 == m_read_initramfs_config.call_count
-
- def test_network_cmdline(self, m_read_initramfs_config, oracle_ds):
- """network_config should prefer initramfs config over fallback"""
- ncfg = {"version": 1, "config": [{"a": "b"}]}
- m_read_initramfs_config.return_value = copy.deepcopy(ncfg)
-
- assert ncfg == oracle_ds.network_config
- assert 0 == oracle_ds.distro.generate_fallback_config.call_count
-
- def test_network_fallback(self, m_read_initramfs_config, oracle_ds):
- """network_config should use distro fallback when initramfs config is absent"""
- ncfg = {"version": 1, "config": [{"a": "b"}]}
-
- m_read_initramfs_config.return_value = None
- oracle_ds.distro.generate_fallback_config.return_value = copy.deepcopy(
- ncfg
- )
-
- assert ncfg == oracle_ds.network_config
-
- @pytest.mark.parametrize(
- "configure_secondary_nics,expect_secondary_nics",
- [(True, True), (False, False), (None, False)],
- )
- def test_secondary_nic_addition(
- self,
- m_read_initramfs_config,
- configure_secondary_nics,
- expect_secondary_nics,
- oracle_ds,
- ):
- """Test that _add_network_config_from_opc_imds is called as expected
-
- (configure_secondary_nics=None is used to test the default behaviour.)
- """
- m_read_initramfs_config.return_value = {"version": 1, "config": []}
-
- if configure_secondary_nics is not None:
- oracle_ds.ds_cfg[
- "configure_secondary_nics"
- ] = configure_secondary_nics
-
- def side_effect(self):
- self._network_config["secondary_added"] = mock.sentinel.needle
-
- oracle_ds._vnics_data = 'DummyData'
- with mock.patch.object(
- oracle.DataSourceOracle, "_add_network_config_from_opc_imds",
- new=side_effect,
- ):
- was_secondary_added = "secondary_added" in oracle_ds.network_config
- assert expect_secondary_nics == was_secondary_added
-
- def test_secondary_nic_failure_isnt_blocking(
- self,
- m_read_initramfs_config,
- caplog,
- oracle_ds,
- ):
- oracle_ds.ds_cfg["configure_secondary_nics"] = True
- oracle_ds._vnics_data = "DummyData"
-
- with mock.patch.object(
- oracle.DataSourceOracle, "_add_network_config_from_opc_imds",
- side_effect=Exception()
- ):
- network_config = oracle_ds.network_config
- assert network_config == m_read_initramfs_config.return_value
- assert "Failed to parse secondary network configuration" in caplog.text
-
- def test_ds_network_cfg_preferred_over_initramfs(self, _m):
- """Ensure that DS net config is preferred over initramfs config"""
- config_sources = oracle.DataSourceOracle.network_config_sources
- ds_idx = config_sources.index(NetworkConfigSource.ds)
- initramfs_idx = config_sources.index(NetworkConfigSource.initramfs)
- assert ds_idx < initramfs_idx
-
-
-# vi: ts=4 expandtab