summaryrefslogtreecommitdiff
path: root/tests/unittests/sources/test_init.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/sources/test_init.py')
-rw-r--r--tests/unittests/sources/test_init.py771
1 files changed, 771 insertions, 0 deletions
diff --git a/tests/unittests/sources/test_init.py b/tests/unittests/sources/test_init.py
new file mode 100644
index 00000000..a1d19518
--- /dev/null
+++ b/tests/unittests/sources/test_init.py
@@ -0,0 +1,771 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import copy
+import inspect
+import os
+import stat
+
+from cloudinit.event import EventScope, EventType
+from cloudinit.helpers import Paths
+from cloudinit import importer
+from cloudinit.sources import (
+ EXPERIMENTAL_TEXT, INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE,
+ METADATA_UNKNOWN, REDACT_SENSITIVE_VALUE, UNSET, DataSource,
+ canonical_cloud_id, redact_sensitive_keys)
+from tests.unittests.helpers import CiTestCase, mock
+from cloudinit.user_data import UserDataProcessor
+from cloudinit import util
+
+
+class DataSourceTestSubclassNet(DataSource):
+
+ dsname = 'MyTestSubclass'
+ url_max_wait = 55
+
+ def __init__(self, sys_cfg, distro, paths, custom_metadata=None,
+ custom_userdata=None, get_data_retval=True):
+ super(DataSourceTestSubclassNet, self).__init__(
+ sys_cfg, distro, paths)
+ self._custom_userdata = custom_userdata
+ self._custom_metadata = custom_metadata
+ self._get_data_retval = get_data_retval
+
+ def _get_cloud_name(self):
+ return 'SubclassCloudName'
+
+ def _get_data(self):
+ if self._custom_metadata:
+ self.metadata = self._custom_metadata
+ else:
+ self.metadata = {'availability_zone': 'myaz',
+ 'local-hostname': 'test-subclass-hostname',
+ 'region': 'myregion'}
+ if self._custom_userdata:
+ self.userdata_raw = self._custom_userdata
+ else:
+ self.userdata_raw = 'userdata_raw'
+ self.vendordata_raw = 'vendordata_raw'
+ return self._get_data_retval
+
+
+class InvalidDataSourceTestSubclassNet(DataSource):
+ pass
+
+
+class TestDataSource(CiTestCase):
+
+ with_logs = True
+ maxDiff = None
+
+ def setUp(self):
+ super(TestDataSource, self).setUp()
+ self.sys_cfg = {'datasource': {'_undef': {'key1': False}}}
+ self.distro = 'distrotest' # generally should be a Distro object
+ self.paths = Paths({})
+ self.datasource = DataSource(self.sys_cfg, self.distro, self.paths)
+
+ def test_datasource_init(self):
+ """DataSource initializes metadata attributes, ds_cfg and ud_proc."""
+ self.assertEqual(self.paths, self.datasource.paths)
+ self.assertEqual(self.sys_cfg, self.datasource.sys_cfg)
+ self.assertEqual(self.distro, self.datasource.distro)
+ self.assertIsNone(self.datasource.userdata)
+ self.assertEqual({}, self.datasource.metadata)
+ self.assertIsNone(self.datasource.userdata_raw)
+ self.assertIsNone(self.datasource.vendordata)
+ self.assertIsNone(self.datasource.vendordata_raw)
+ self.assertEqual({'key1': False}, self.datasource.ds_cfg)
+ self.assertIsInstance(self.datasource.ud_proc, UserDataProcessor)
+
+ def test_datasource_init_gets_ds_cfg_using_dsname(self):
+ """Init uses DataSource.dsname for sourcing ds_cfg."""
+ sys_cfg = {'datasource': {'MyTestSubclass': {'key2': False}}}
+ distro = 'distrotest' # generally should be a Distro object
+ datasource = DataSourceTestSubclassNet(sys_cfg, distro, self.paths)
+ self.assertEqual({'key2': False}, datasource.ds_cfg)
+
+ def test_str_is_classname(self):
+ """The string representation of the datasource is the classname."""
+ self.assertEqual('DataSource', str(self.datasource))
+ self.assertEqual(
+ 'DataSourceTestSubclassNet',
+ str(DataSourceTestSubclassNet('', '', self.paths)))
+
+ def test_datasource_get_url_params_defaults(self):
+ """get_url_params default url config settings for the datasource."""
+ params = self.datasource.get_url_params()
+ self.assertEqual(params.max_wait_seconds, self.datasource.url_max_wait)
+ self.assertEqual(params.timeout_seconds, self.datasource.url_timeout)
+ self.assertEqual(params.num_retries, self.datasource.url_retries)
+ self.assertEqual(params.sec_between_retries,
+ self.datasource.url_sec_between_retries)
+
+ def test_datasource_get_url_params_subclassed(self):
+ """Subclasses can override get_url_params defaults."""
+ sys_cfg = {'datasource': {'MyTestSubclass': {'key2': False}}}
+ distro = 'distrotest' # generally should be a Distro object
+ datasource = DataSourceTestSubclassNet(sys_cfg, distro, self.paths)
+ expected = (datasource.url_max_wait, datasource.url_timeout,
+ datasource.url_retries, datasource.url_sec_between_retries)
+ url_params = datasource.get_url_params()
+ self.assertNotEqual(self.datasource.get_url_params(), url_params)
+ self.assertEqual(expected, url_params)
+
+ def test_datasource_get_url_params_ds_config_override(self):
+ """Datasource configuration options can override url param defaults."""
+ sys_cfg = {
+ 'datasource': {
+ 'MyTestSubclass': {
+ 'max_wait': '1', 'timeout': '2',
+ 'retries': '3', 'sec_between_retries': 4
+ }}}
+ datasource = DataSourceTestSubclassNet(
+ sys_cfg, self.distro, self.paths)
+ expected = (1, 2, 3, 4)
+ url_params = datasource.get_url_params()
+ self.assertNotEqual(
+ (datasource.url_max_wait, datasource.url_timeout,
+ datasource.url_retries, datasource.url_sec_between_retries),
+ url_params)
+ self.assertEqual(expected, url_params)
+
+ def test_datasource_get_url_params_is_zero_or_greater(self):
+ """get_url_params ignores timeouts with a value below 0."""
+ # Set an override that is below 0 which gets ignored.
+ sys_cfg = {'datasource': {'_undef': {'timeout': '-1'}}}
+ datasource = DataSource(sys_cfg, self.distro, self.paths)
+ (_max_wait, timeout, _retries,
+ _sec_between_retries) = datasource.get_url_params()
+ self.assertEqual(0, timeout)
+
+ def test_datasource_get_url_uses_defaults_on_errors(self):
+ """On invalid system config values for url_params defaults are used."""
+ # All invalid values should be logged
+ sys_cfg = {'datasource': {
+ '_undef': {
+ 'max_wait': 'nope', 'timeout': 'bug', 'retries': 'nonint'}}}
+ datasource = DataSource(sys_cfg, self.distro, self.paths)
+ url_params = datasource.get_url_params()
+ expected = (datasource.url_max_wait, datasource.url_timeout,
+ datasource.url_retries, datasource.url_sec_between_retries)
+ self.assertEqual(expected, url_params)
+ logs = self.logs.getvalue()
+ expected_logs = [
+ "Config max_wait 'nope' is not an int, using default '-1'",
+ "Config timeout 'bug' is not an int, using default '10'",
+ "Config retries 'nonint' is not an int, using default '5'",
+ ]
+ for log in expected_logs:
+ self.assertIn(log, logs)
+
+ @mock.patch('cloudinit.sources.net.find_fallback_nic')
+ def test_fallback_interface_is_discovered(self, m_get_fallback_nic):
+ """The fallback_interface is discovered via find_fallback_nic."""
+ m_get_fallback_nic.return_value = 'nic9'
+ self.assertEqual('nic9', self.datasource.fallback_interface)
+
+ @mock.patch('cloudinit.sources.net.find_fallback_nic')
+ def test_fallback_interface_logs_undiscovered(self, m_get_fallback_nic):
+ """Log a warning when fallback_interface can not discover the nic."""
+ self.datasource._cloud_name = 'MySupahCloud'
+ m_get_fallback_nic.return_value = None # Couldn't discover nic
+ self.assertIsNone(self.datasource.fallback_interface)
+ self.assertEqual(
+ 'WARNING: Did not find a fallback interface on MySupahCloud.\n',
+ self.logs.getvalue())
+
+ @mock.patch('cloudinit.sources.net.find_fallback_nic')
+ def test_wb_fallback_interface_is_cached(self, m_get_fallback_nic):
+ """The fallback_interface is cached and won't be rediscovered."""
+ self.datasource._fallback_interface = 'nic10'
+ self.assertEqual('nic10', self.datasource.fallback_interface)
+ m_get_fallback_nic.assert_not_called()
+
+ def test__get_data_unimplemented(self):
+ """Raise an error when _get_data is not implemented."""
+ with self.assertRaises(NotImplementedError) as context_manager:
+ self.datasource.get_data()
+ self.assertIn(
+ 'Subclasses of DataSource must implement _get_data',
+ str(context_manager.exception))
+ datasource2 = InvalidDataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, self.paths)
+ with self.assertRaises(NotImplementedError) as context_manager:
+ datasource2.get_data()
+ self.assertIn(
+ 'Subclasses of DataSource must implement _get_data',
+ str(context_manager.exception))
+
+ def test_get_data_calls_subclass__get_data(self):
+ """Datasource.get_data uses the subclass' version of _get_data."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
+ self.assertTrue(datasource.get_data())
+ self.assertEqual(
+ {'availability_zone': 'myaz',
+ 'local-hostname': 'test-subclass-hostname',
+ 'region': 'myregion'},
+ datasource.metadata)
+ self.assertEqual('userdata_raw', datasource.userdata_raw)
+ self.assertEqual('vendordata_raw', datasource.vendordata_raw)
+
+ def test_get_hostname_strips_local_hostname_without_domain(self):
+ """Datasource.get_hostname strips metadata local-hostname of domain."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
+ self.assertTrue(datasource.get_data())
+ self.assertEqual(
+ 'test-subclass-hostname', datasource.metadata['local-hostname'])
+ self.assertEqual('test-subclass-hostname', datasource.get_hostname())
+ datasource.metadata['local-hostname'] = 'hostname.my.domain.com'
+ self.assertEqual('hostname', datasource.get_hostname())
+
+ def test_get_hostname_with_fqdn_returns_local_hostname_with_domain(self):
+ """Datasource.get_hostname with fqdn set gets qualified hostname."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
+ self.assertTrue(datasource.get_data())
+ datasource.metadata['local-hostname'] = 'hostname.my.domain.com'
+ self.assertEqual(
+ 'hostname.my.domain.com', datasource.get_hostname(fqdn=True))
+
+ def test_get_hostname_without_metadata_uses_system_hostname(self):
+ """Datasource.gethostname runs util.get_hostname when no metadata."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
+ self.assertEqual({}, datasource.metadata)
+ mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts'
+ with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost:
+ with mock.patch(mock_fqdn) as m_fqdn:
+ m_gethost.return_value = 'systemhostname.domain.com'
+ m_fqdn.return_value = None # No maching fqdn in /etc/hosts
+ self.assertEqual('systemhostname', datasource.get_hostname())
+ self.assertEqual(
+ 'systemhostname.domain.com',
+ datasource.get_hostname(fqdn=True))
+
+ def test_get_hostname_without_metadata_returns_none(self):
+ """Datasource.gethostname returns None when metadata_only and no MD."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
+ self.assertEqual({}, datasource.metadata)
+ mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts'
+ with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost:
+ with mock.patch(mock_fqdn) as m_fqdn:
+ self.assertIsNone(datasource.get_hostname(metadata_only=True))
+ self.assertIsNone(
+ datasource.get_hostname(fqdn=True, metadata_only=True))
+ self.assertEqual([], m_gethost.call_args_list)
+ self.assertEqual([], m_fqdn.call_args_list)
+
+ def test_get_hostname_without_metadata_prefers_etc_hosts(self):
+ """Datasource.gethostname prefers /etc/hosts to util.get_hostname."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
+ self.assertEqual({}, datasource.metadata)
+ mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts'
+ with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost:
+ with mock.patch(mock_fqdn) as m_fqdn:
+ m_gethost.return_value = 'systemhostname.domain.com'
+ m_fqdn.return_value = 'fqdnhostname.domain.com'
+ self.assertEqual('fqdnhostname', datasource.get_hostname())
+ self.assertEqual('fqdnhostname.domain.com',
+ datasource.get_hostname(fqdn=True))
+
+ def test_get_data_does_not_write_instance_data_on_failure(self):
+ """get_data does not write INSTANCE_JSON_FILE on get_data False."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
+ get_data_retval=False)
+ self.assertFalse(datasource.get_data())
+ json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
+ self.assertFalse(
+ os.path.exists(json_file), 'Found unexpected file %s' % json_file)
+
+ def test_get_data_writes_json_instance_data_on_success(self):
+ """get_data writes INSTANCE_JSON_FILE to run_dir as world readable."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
+ sys_info = {
+ "python": "3.7",
+ "platform":
+ "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal",
+ "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah",
+ "x86_64"],
+ "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]}
+ with mock.patch("cloudinit.util.system_info", return_value=sys_info):
+ datasource.get_data()
+ json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
+ content = util.load_file(json_file)
+ expected = {
+ 'base64_encoded_keys': [],
+ 'merged_cfg': REDACT_SENSITIVE_VALUE,
+ 'sensitive_keys': ['merged_cfg'],
+ 'sys_info': sys_info,
+ 'v1': {
+ '_beta_keys': ['subplatform'],
+ 'availability-zone': 'myaz',
+ 'availability_zone': 'myaz',
+ 'cloud-name': 'subclasscloudname',
+ 'cloud_name': 'subclasscloudname',
+ 'distro': 'ubuntu',
+ 'distro_release': 'focal',
+ 'distro_version': '20.04',
+ 'instance-id': 'iid-datasource',
+ 'instance_id': 'iid-datasource',
+ 'local-hostname': 'test-subclass-hostname',
+ 'local_hostname': 'test-subclass-hostname',
+ 'kernel_release': '5.4.0-24-generic',
+ 'machine': 'x86_64',
+ 'platform': 'mytestsubclass',
+ 'public_ssh_keys': [],
+ 'python_version': '3.7',
+ 'region': 'myregion',
+ 'system_platform':
+ 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal',
+ 'subplatform': 'unknown',
+ 'variant': 'ubuntu'},
+ 'ds': {
+
+ '_doc': EXPERIMENTAL_TEXT,
+ 'meta_data': {'availability_zone': 'myaz',
+ 'local-hostname': 'test-subclass-hostname',
+ 'region': 'myregion'}}}
+ self.assertEqual(expected, util.load_json(content))
+ file_stat = os.stat(json_file)
+ self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
+ self.assertEqual(expected, util.load_json(content))
+
+ def test_get_data_writes_redacted_public_json_instance_data(self):
+ """get_data writes redacted content to public INSTANCE_JSON_FILE."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
+ custom_metadata={
+ 'availability_zone': 'myaz',
+ 'local-hostname': 'test-subclass-hostname',
+ 'region': 'myregion',
+ 'some': {'security-credentials': {
+ 'cred1': 'sekret', 'cred2': 'othersekret'}}})
+ self.assertCountEqual(
+ ('merged_cfg', 'security-credentials',),
+ datasource.sensitive_metadata_keys)
+ sys_info = {
+ "python": "3.7",
+ "platform":
+ "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal",
+ "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah",
+ "x86_64"],
+ "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]}
+ with mock.patch("cloudinit.util.system_info", return_value=sys_info):
+ datasource.get_data()
+ json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
+ redacted = util.load_json(util.load_file(json_file))
+ expected = {
+ 'base64_encoded_keys': [],
+ 'merged_cfg': REDACT_SENSITIVE_VALUE,
+ 'sensitive_keys': [
+ 'ds/meta_data/some/security-credentials', 'merged_cfg'],
+ 'sys_info': sys_info,
+ 'v1': {
+ '_beta_keys': ['subplatform'],
+ 'availability-zone': 'myaz',
+ 'availability_zone': 'myaz',
+ 'cloud-name': 'subclasscloudname',
+ 'cloud_name': 'subclasscloudname',
+ 'distro': 'ubuntu',
+ 'distro_release': 'focal',
+ 'distro_version': '20.04',
+ 'instance-id': 'iid-datasource',
+ 'instance_id': 'iid-datasource',
+ 'local-hostname': 'test-subclass-hostname',
+ 'local_hostname': 'test-subclass-hostname',
+ 'kernel_release': '5.4.0-24-generic',
+ 'machine': 'x86_64',
+ 'platform': 'mytestsubclass',
+ 'public_ssh_keys': [],
+ 'python_version': '3.7',
+ 'region': 'myregion',
+ 'system_platform':
+ 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal',
+ 'subplatform': 'unknown',
+ 'variant': 'ubuntu'},
+ 'ds': {
+ '_doc': EXPERIMENTAL_TEXT,
+ 'meta_data': {
+ 'availability_zone': 'myaz',
+ 'local-hostname': 'test-subclass-hostname',
+ 'region': 'myregion',
+ 'some': {'security-credentials': REDACT_SENSITIVE_VALUE}}}
+ }
+ self.assertCountEqual(expected, redacted)
+ file_stat = os.stat(json_file)
+ self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
+
+ def test_get_data_writes_json_instance_data_sensitive(self):
+ """
+ get_data writes unmodified data to sensitive file as root-readonly.
+ """
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
+ custom_metadata={
+ 'availability_zone': 'myaz',
+ 'local-hostname': 'test-subclass-hostname',
+ 'region': 'myregion',
+ 'some': {'security-credentials': {
+ 'cred1': 'sekret', 'cred2': 'othersekret'}}})
+ sys_info = {
+ "python": "3.7",
+ "platform":
+ "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal",
+ "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah",
+ "x86_64"],
+ "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]}
+
+ self.assertCountEqual(
+ ('merged_cfg', 'security-credentials',),
+ datasource.sensitive_metadata_keys)
+ with mock.patch("cloudinit.util.system_info", return_value=sys_info):
+ datasource.get_data()
+ sensitive_json_file = self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, tmp)
+ content = util.load_file(sensitive_json_file)
+ expected = {
+ 'base64_encoded_keys': [],
+ 'merged_cfg': {
+ '_doc': (
+ 'Merged cloud-init system config from '
+ '/etc/cloud/cloud.cfg and /etc/cloud/cloud.cfg.d/'
+ ),
+ 'datasource': {'_undef': {'key1': False}}},
+ 'sensitive_keys': [
+ 'ds/meta_data/some/security-credentials', 'merged_cfg'],
+ 'sys_info': sys_info,
+ 'v1': {
+ '_beta_keys': ['subplatform'],
+ 'availability-zone': 'myaz',
+ 'availability_zone': 'myaz',
+ 'cloud-name': 'subclasscloudname',
+ 'cloud_name': 'subclasscloudname',
+ 'distro': 'ubuntu',
+ 'distro_release': 'focal',
+ 'distro_version': '20.04',
+ 'instance-id': 'iid-datasource',
+ 'instance_id': 'iid-datasource',
+ 'kernel_release': '5.4.0-24-generic',
+ 'local-hostname': 'test-subclass-hostname',
+ 'local_hostname': 'test-subclass-hostname',
+ 'machine': 'x86_64',
+ 'platform': 'mytestsubclass',
+ 'public_ssh_keys': [],
+ 'python_version': '3.7',
+ 'region': 'myregion',
+ 'subplatform': 'unknown',
+ 'system_platform':
+ 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal',
+ 'variant': 'ubuntu'},
+ 'ds': {
+ '_doc': EXPERIMENTAL_TEXT,
+ 'meta_data': {
+ 'availability_zone': 'myaz',
+ 'local-hostname': 'test-subclass-hostname',
+ 'region': 'myregion',
+ 'some': {
+ 'security-credentials':
+ {'cred1': 'sekret', 'cred2': 'othersekret'}}}}
+ }
+ self.assertCountEqual(expected, util.load_json(content))
+ file_stat = os.stat(sensitive_json_file)
+ self.assertEqual(0o600, stat.S_IMODE(file_stat.st_mode))
+ self.assertEqual(expected, util.load_json(content))
+
+ def test_get_data_handles_redacted_unserializable_content(self):
+ """get_data warns unserializable content in INSTANCE_JSON_FILE."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
+ custom_metadata={'key1': 'val1', 'key2': {'key2.1': self.paths}})
+ datasource.get_data()
+ json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
+ content = util.load_file(json_file)
+ expected_metadata = {
+ 'key1': 'val1',
+ 'key2': {
+ 'key2.1': "Warning: redacted unserializable type <class"
+ " 'cloudinit.helpers.Paths'>"}}
+ instance_json = util.load_json(content)
+ self.assertEqual(
+ expected_metadata, instance_json['ds']['meta_data'])
+
+ def test_persist_instance_data_writes_ec2_metadata_when_set(self):
+ """When ec2_metadata class attribute is set, persist to json."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
+ datasource.ec2_metadata = UNSET
+ datasource.get_data()
+ json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
+ instance_data = util.load_json(util.load_file(json_file))
+ self.assertNotIn('ec2_metadata', instance_data['ds'])
+ datasource.ec2_metadata = {'ec2stuff': 'is good'}
+ datasource.persist_instance_data()
+ instance_data = util.load_json(util.load_file(json_file))
+ self.assertEqual(
+ {'ec2stuff': 'is good'},
+ instance_data['ds']['ec2_metadata'])
+
+ def test_persist_instance_data_writes_network_json_when_set(self):
+ """When network_data.json class attribute is set, persist to json."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
+ datasource.get_data()
+ json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
+ instance_data = util.load_json(util.load_file(json_file))
+ self.assertNotIn('network_json', instance_data['ds'])
+ datasource.network_json = {'network_json': 'is good'}
+ datasource.persist_instance_data()
+ instance_data = util.load_json(util.load_file(json_file))
+ self.assertEqual(
+ {'network_json': 'is good'},
+ instance_data['ds']['network_json'])
+
+ def test_get_data_base64encodes_unserializable_bytes(self):
+ """On py3, get_data base64encodes any unserializable content."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
+ custom_metadata={'key1': 'val1', 'key2': {'key2.1': b'\x123'}})
+ self.assertTrue(datasource.get_data())
+ json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
+ content = util.load_file(json_file)
+ instance_json = util.load_json(content)
+ self.assertCountEqual(
+ ['ds/meta_data/key2/key2.1'],
+ instance_json['base64_encoded_keys'])
+ self.assertEqual(
+ {'key1': 'val1', 'key2': {'key2.1': 'EjM='}},
+ instance_json['ds']['meta_data'])
+
+ def test_get_hostname_subclass_support(self):
+ """Validate get_hostname signature on all subclasses of DataSource."""
+ base_args = inspect.getfullargspec(DataSource.get_hostname)
+ # Import all DataSource subclasses so we can inspect them.
+ modules = util.find_modules(os.path.dirname(os.path.dirname(__file__)))
+ for _loc, name in modules.items():
+ mod_locs, _ = importer.find_module(name, ['cloudinit.sources'], [])
+ if mod_locs:
+ importer.import_module(mod_locs[0])
+ for child in DataSource.__subclasses__():
+ if 'Test' in child.dsname:
+ continue
+ self.assertEqual(
+ base_args,
+ inspect.getfullargspec(child.get_hostname),
+ '%s does not implement DataSource.get_hostname params'
+ % child)
+ for grandchild in child.__subclasses__():
+ self.assertEqual(
+ base_args,
+ inspect.getfullargspec(grandchild.get_hostname),
+ '%s does not implement DataSource.get_hostname params'
+ % grandchild)
+
+ def test_clear_cached_attrs_resets_cached_attr_class_attributes(self):
+ """Class attributes listed in cached_attr_defaults are reset."""
+ count = 0
+ # Setup values for all cached class attributes
+ for attr, value in self.datasource.cached_attr_defaults:
+ setattr(self.datasource, attr, count)
+ count += 1
+ self.datasource._dirty_cache = True
+ self.datasource.clear_cached_attrs()
+ for attr, value in self.datasource.cached_attr_defaults:
+ self.assertEqual(value, getattr(self.datasource, attr))
+
+ def test_clear_cached_attrs_noops_on_clean_cache(self):
+ """Class attributes listed in cached_attr_defaults are reset."""
+ count = 0
+ # Setup values for all cached class attributes
+ for attr, _ in self.datasource.cached_attr_defaults:
+ setattr(self.datasource, attr, count)
+ count += 1
+ self.datasource._dirty_cache = False # Fake clean cache
+ self.datasource.clear_cached_attrs()
+ count = 0
+ for attr, _ in self.datasource.cached_attr_defaults:
+ self.assertEqual(count, getattr(self.datasource, attr))
+ count += 1
+
+ def test_clear_cached_attrs_skips_non_attr_class_attributes(self):
+ """Skip any cached_attr_defaults which aren't class attributes."""
+ self.datasource._dirty_cache = True
+ self.datasource.clear_cached_attrs()
+ for attr in ('ec2_metadata', 'network_json'):
+ self.assertFalse(hasattr(self.datasource, attr))
+
+ def test_clear_cached_attrs_of_custom_attrs(self):
+ """Custom attr_values can be passed to clear_cached_attrs."""
+ self.datasource._dirty_cache = True
+ cached_attr_name = self.datasource.cached_attr_defaults[0][0]
+ setattr(self.datasource, cached_attr_name, 'himom')
+ self.datasource.myattr = 'orig'
+ self.datasource.clear_cached_attrs(
+ attr_defaults=(('myattr', 'updated'),))
+ self.assertEqual('himom', getattr(self.datasource, cached_attr_name))
+ self.assertEqual('updated', self.datasource.myattr)
+
+ @mock.patch.dict(DataSource.default_update_events, {
+ EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}})
+ @mock.patch.dict(DataSource.supported_update_events, {
+ EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}})
+ def test_update_metadata_only_acts_on_supported_update_events(self):
+ """update_metadata_if_supported wont get_data on unsupported events."""
+ self.assertEqual(
+ {EventScope.NETWORK: set([EventType.BOOT_NEW_INSTANCE])},
+ self.datasource.default_update_events
+ )
+
+ def fake_get_data():
+ raise Exception('get_data should not be called')
+
+ self.datasource.get_data = fake_get_data
+ self.assertFalse(
+ self.datasource.update_metadata_if_supported(
+ source_event_types=[EventType.BOOT]))
+
+ @mock.patch.dict(DataSource.supported_update_events, {
+ EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}})
+ def test_update_metadata_returns_true_on_supported_update_event(self):
+ """update_metadata_if_supported returns get_data on supported events"""
+ def fake_get_data():
+ return True
+
+ self.datasource.get_data = fake_get_data
+ self.datasource._network_config = 'something'
+ self.datasource._dirty_cache = True
+ self.assertTrue(
+ self.datasource.update_metadata_if_supported(
+ source_event_types=[
+ EventType.BOOT, EventType.BOOT_NEW_INSTANCE]))
+ self.assertEqual(UNSET, self.datasource._network_config)
+
+ self.assertIn(
+ "DEBUG: Update datasource metadata and network config due to"
+ " events: boot-new-instance",
+ self.logs.getvalue()
+ )
+
+
+class TestRedactSensitiveData(CiTestCase):
+
+ def test_redact_sensitive_data_noop_when_no_sensitive_keys_present(self):
+ """When sensitive_keys is absent or empty from metadata do nothing."""
+ md = {'my': 'data'}
+ self.assertEqual(
+ md, redact_sensitive_keys(md, redact_value='redacted'))
+ md['sensitive_keys'] = []
+ self.assertEqual(
+ md, redact_sensitive_keys(md, redact_value='redacted'))
+
+ def test_redact_sensitive_data_redacts_exact_match_name(self):
+ """Only exact matched sensitive_keys are redacted from metadata."""
+ md = {'sensitive_keys': ['md/secure'],
+ 'md': {'secure': 's3kr1t', 'insecure': 'publik'}}
+ secure_md = copy.deepcopy(md)
+ secure_md['md']['secure'] = 'redacted'
+ self.assertEqual(
+ secure_md,
+ redact_sensitive_keys(md, redact_value='redacted'))
+
+ def test_redact_sensitive_data_does_redacts_with_default_string(self):
+ """When redact_value is absent, REDACT_SENSITIVE_VALUE is used."""
+ md = {'sensitive_keys': ['md/secure'],
+ 'md': {'secure': 's3kr1t', 'insecure': 'publik'}}
+ secure_md = copy.deepcopy(md)
+ secure_md['md']['secure'] = 'redacted for non-root user'
+ self.assertEqual(
+ secure_md,
+ redact_sensitive_keys(md))
+
+
+class TestCanonicalCloudID(CiTestCase):
+
+ def test_cloud_id_returns_platform_on_unknowns(self):
+ """When region and cloud_name are unknown, return platform."""
+ self.assertEqual(
+ 'platform',
+ canonical_cloud_id(cloud_name=METADATA_UNKNOWN,
+ region=METADATA_UNKNOWN,
+ platform='platform'))
+
+ def test_cloud_id_returns_platform_on_none(self):
+ """When region and cloud_name are unknown, return platform."""
+ self.assertEqual(
+ 'platform',
+ canonical_cloud_id(cloud_name=None,
+ region=None,
+ platform='platform'))
+
+ def test_cloud_id_returns_cloud_name_on_unknown_region(self):
+ """When region is unknown, return cloud_name."""
+ for region in (None, METADATA_UNKNOWN):
+ self.assertEqual(
+ 'cloudname',
+ canonical_cloud_id(cloud_name='cloudname',
+ region=region,
+ platform='platform'))
+
+ def test_cloud_id_returns_platform_on_unknown_cloud_name(self):
+ """When region is set but cloud_name is unknown return cloud_name."""
+ self.assertEqual(
+ 'platform',
+ canonical_cloud_id(cloud_name=METADATA_UNKNOWN,
+ region='region',
+ platform='platform'))
+
+ def test_cloud_id_aws_based_on_region_and_cloud_name(self):
+ """When cloud_name is aws, return proper cloud-id based on region."""
+ self.assertEqual(
+ 'aws-china',
+ canonical_cloud_id(cloud_name='aws',
+ region='cn-north-1',
+ platform='platform'))
+ self.assertEqual(
+ 'aws',
+ canonical_cloud_id(cloud_name='aws',
+ region='us-east-1',
+ platform='platform'))
+ self.assertEqual(
+ 'aws-gov',
+ canonical_cloud_id(cloud_name='aws',
+ region='us-gov-1',
+ platform='platform'))
+ self.assertEqual( # Overrideen non-aws cloud_name is returned
+ '!aws',
+ canonical_cloud_id(cloud_name='!aws',
+ region='us-gov-1',
+ platform='platform'))
+
+ def test_cloud_id_azure_based_on_region_and_cloud_name(self):
+ """Report cloud-id when cloud_name is azure and region is in china."""
+ self.assertEqual(
+ 'azure-china',
+ canonical_cloud_id(cloud_name='azure',
+ region='chinaeast',
+ platform='platform'))
+ self.assertEqual(
+ 'azure',
+ canonical_cloud_id(cloud_name='azure',
+ region='!chinaeast',
+ platform='platform'))
+
+# vi: ts=4 expandtab