diff options
Diffstat (limited to 'tests/unittests/sources/test_init.py')
-rw-r--r-- | tests/unittests/sources/test_init.py | 771 |
1 files changed, 771 insertions, 0 deletions
diff --git a/tests/unittests/sources/test_init.py b/tests/unittests/sources/test_init.py new file mode 100644 index 00000000..a1d19518 --- /dev/null +++ b/tests/unittests/sources/test_init.py @@ -0,0 +1,771 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import copy +import inspect +import os +import stat + +from cloudinit.event import EventScope, EventType +from cloudinit.helpers import Paths +from cloudinit import importer +from cloudinit.sources import ( + EXPERIMENTAL_TEXT, INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE, + METADATA_UNKNOWN, REDACT_SENSITIVE_VALUE, UNSET, DataSource, + canonical_cloud_id, redact_sensitive_keys) +from tests.unittests.helpers import CiTestCase, mock +from cloudinit.user_data import UserDataProcessor +from cloudinit import util + + +class DataSourceTestSubclassNet(DataSource): + + dsname = 'MyTestSubclass' + url_max_wait = 55 + + def __init__(self, sys_cfg, distro, paths, custom_metadata=None, + custom_userdata=None, get_data_retval=True): + super(DataSourceTestSubclassNet, self).__init__( + sys_cfg, distro, paths) + self._custom_userdata = custom_userdata + self._custom_metadata = custom_metadata + self._get_data_retval = get_data_retval + + def _get_cloud_name(self): + return 'SubclassCloudName' + + def _get_data(self): + if self._custom_metadata: + self.metadata = self._custom_metadata + else: + self.metadata = {'availability_zone': 'myaz', + 'local-hostname': 'test-subclass-hostname', + 'region': 'myregion'} + if self._custom_userdata: + self.userdata_raw = self._custom_userdata + else: + self.userdata_raw = 'userdata_raw' + self.vendordata_raw = 'vendordata_raw' + return self._get_data_retval + + +class InvalidDataSourceTestSubclassNet(DataSource): + pass + + +class TestDataSource(CiTestCase): + + with_logs = True + maxDiff = None + + def setUp(self): + super(TestDataSource, self).setUp() + self.sys_cfg = {'datasource': {'_undef': {'key1': False}}} + self.distro = 'distrotest' # generally should be a Distro object + self.paths = Paths({}) + self.datasource = DataSource(self.sys_cfg, self.distro, self.paths) + + def test_datasource_init(self): + """DataSource initializes metadata attributes, ds_cfg and ud_proc.""" + self.assertEqual(self.paths, self.datasource.paths) + self.assertEqual(self.sys_cfg, self.datasource.sys_cfg) + self.assertEqual(self.distro, self.datasource.distro) + self.assertIsNone(self.datasource.userdata) + self.assertEqual({}, self.datasource.metadata) + self.assertIsNone(self.datasource.userdata_raw) + self.assertIsNone(self.datasource.vendordata) + self.assertIsNone(self.datasource.vendordata_raw) + self.assertEqual({'key1': False}, self.datasource.ds_cfg) + self.assertIsInstance(self.datasource.ud_proc, UserDataProcessor) + + def test_datasource_init_gets_ds_cfg_using_dsname(self): + """Init uses DataSource.dsname for sourcing ds_cfg.""" + sys_cfg = {'datasource': {'MyTestSubclass': {'key2': False}}} + distro = 'distrotest' # generally should be a Distro object + datasource = DataSourceTestSubclassNet(sys_cfg, distro, self.paths) + self.assertEqual({'key2': False}, datasource.ds_cfg) + + def test_str_is_classname(self): + """The string representation of the datasource is the classname.""" + self.assertEqual('DataSource', str(self.datasource)) + self.assertEqual( + 'DataSourceTestSubclassNet', + str(DataSourceTestSubclassNet('', '', self.paths))) + + def test_datasource_get_url_params_defaults(self): + """get_url_params default url config settings for the datasource.""" + params = self.datasource.get_url_params() + self.assertEqual(params.max_wait_seconds, self.datasource.url_max_wait) + self.assertEqual(params.timeout_seconds, self.datasource.url_timeout) + self.assertEqual(params.num_retries, self.datasource.url_retries) + self.assertEqual(params.sec_between_retries, + self.datasource.url_sec_between_retries) + + def test_datasource_get_url_params_subclassed(self): + """Subclasses can override get_url_params defaults.""" + sys_cfg = {'datasource': {'MyTestSubclass': {'key2': False}}} + distro = 'distrotest' # generally should be a Distro object + datasource = DataSourceTestSubclassNet(sys_cfg, distro, self.paths) + expected = (datasource.url_max_wait, datasource.url_timeout, + datasource.url_retries, datasource.url_sec_between_retries) + url_params = datasource.get_url_params() + self.assertNotEqual(self.datasource.get_url_params(), url_params) + self.assertEqual(expected, url_params) + + def test_datasource_get_url_params_ds_config_override(self): + """Datasource configuration options can override url param defaults.""" + sys_cfg = { + 'datasource': { + 'MyTestSubclass': { + 'max_wait': '1', 'timeout': '2', + 'retries': '3', 'sec_between_retries': 4 + }}} + datasource = DataSourceTestSubclassNet( + sys_cfg, self.distro, self.paths) + expected = (1, 2, 3, 4) + url_params = datasource.get_url_params() + self.assertNotEqual( + (datasource.url_max_wait, datasource.url_timeout, + datasource.url_retries, datasource.url_sec_between_retries), + url_params) + self.assertEqual(expected, url_params) + + def test_datasource_get_url_params_is_zero_or_greater(self): + """get_url_params ignores timeouts with a value below 0.""" + # Set an override that is below 0 which gets ignored. + sys_cfg = {'datasource': {'_undef': {'timeout': '-1'}}} + datasource = DataSource(sys_cfg, self.distro, self.paths) + (_max_wait, timeout, _retries, + _sec_between_retries) = datasource.get_url_params() + self.assertEqual(0, timeout) + + def test_datasource_get_url_uses_defaults_on_errors(self): + """On invalid system config values for url_params defaults are used.""" + # All invalid values should be logged + sys_cfg = {'datasource': { + '_undef': { + 'max_wait': 'nope', 'timeout': 'bug', 'retries': 'nonint'}}} + datasource = DataSource(sys_cfg, self.distro, self.paths) + url_params = datasource.get_url_params() + expected = (datasource.url_max_wait, datasource.url_timeout, + datasource.url_retries, datasource.url_sec_between_retries) + self.assertEqual(expected, url_params) + logs = self.logs.getvalue() + expected_logs = [ + "Config max_wait 'nope' is not an int, using default '-1'", + "Config timeout 'bug' is not an int, using default '10'", + "Config retries 'nonint' is not an int, using default '5'", + ] + for log in expected_logs: + self.assertIn(log, logs) + + @mock.patch('cloudinit.sources.net.find_fallback_nic') + def test_fallback_interface_is_discovered(self, m_get_fallback_nic): + """The fallback_interface is discovered via find_fallback_nic.""" + m_get_fallback_nic.return_value = 'nic9' + self.assertEqual('nic9', self.datasource.fallback_interface) + + @mock.patch('cloudinit.sources.net.find_fallback_nic') + def test_fallback_interface_logs_undiscovered(self, m_get_fallback_nic): + """Log a warning when fallback_interface can not discover the nic.""" + self.datasource._cloud_name = 'MySupahCloud' + m_get_fallback_nic.return_value = None # Couldn't discover nic + self.assertIsNone(self.datasource.fallback_interface) + self.assertEqual( + 'WARNING: Did not find a fallback interface on MySupahCloud.\n', + self.logs.getvalue()) + + @mock.patch('cloudinit.sources.net.find_fallback_nic') + def test_wb_fallback_interface_is_cached(self, m_get_fallback_nic): + """The fallback_interface is cached and won't be rediscovered.""" + self.datasource._fallback_interface = 'nic10' + self.assertEqual('nic10', self.datasource.fallback_interface) + m_get_fallback_nic.assert_not_called() + + def test__get_data_unimplemented(self): + """Raise an error when _get_data is not implemented.""" + with self.assertRaises(NotImplementedError) as context_manager: + self.datasource.get_data() + self.assertIn( + 'Subclasses of DataSource must implement _get_data', + str(context_manager.exception)) + datasource2 = InvalidDataSourceTestSubclassNet( + self.sys_cfg, self.distro, self.paths) + with self.assertRaises(NotImplementedError) as context_manager: + datasource2.get_data() + self.assertIn( + 'Subclasses of DataSource must implement _get_data', + str(context_manager.exception)) + + def test_get_data_calls_subclass__get_data(self): + """Datasource.get_data uses the subclass' version of _get_data.""" + tmp = self.tmp_dir() + datasource = DataSourceTestSubclassNet( + self.sys_cfg, self.distro, Paths({'run_dir': tmp})) + self.assertTrue(datasource.get_data()) + self.assertEqual( + {'availability_zone': 'myaz', + 'local-hostname': 'test-subclass-hostname', + 'region': 'myregion'}, + datasource.metadata) + self.assertEqual('userdata_raw', datasource.userdata_raw) + self.assertEqual('vendordata_raw', datasource.vendordata_raw) + + def test_get_hostname_strips_local_hostname_without_domain(self): + """Datasource.get_hostname strips metadata local-hostname of domain.""" + tmp = self.tmp_dir() + datasource = DataSourceTestSubclassNet( + self.sys_cfg, self.distro, Paths({'run_dir': tmp})) + self.assertTrue(datasource.get_data()) + self.assertEqual( + 'test-subclass-hostname', datasource.metadata['local-hostname']) + self.assertEqual('test-subclass-hostname', datasource.get_hostname()) + datasource.metadata['local-hostname'] = 'hostname.my.domain.com' + self.assertEqual('hostname', datasource.get_hostname()) + + def test_get_hostname_with_fqdn_returns_local_hostname_with_domain(self): + """Datasource.get_hostname with fqdn set gets qualified hostname.""" + tmp = self.tmp_dir() + datasource = DataSourceTestSubclassNet( + self.sys_cfg, self.distro, Paths({'run_dir': tmp})) + self.assertTrue(datasource.get_data()) + datasource.metadata['local-hostname'] = 'hostname.my.domain.com' + self.assertEqual( + 'hostname.my.domain.com', datasource.get_hostname(fqdn=True)) + + def test_get_hostname_without_metadata_uses_system_hostname(self): + """Datasource.gethostname runs util.get_hostname when no metadata.""" + tmp = self.tmp_dir() + datasource = DataSourceTestSubclassNet( + self.sys_cfg, self.distro, Paths({'run_dir': tmp})) + self.assertEqual({}, datasource.metadata) + mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts' + with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost: + with mock.patch(mock_fqdn) as m_fqdn: + m_gethost.return_value = 'systemhostname.domain.com' + m_fqdn.return_value = None # No maching fqdn in /etc/hosts + self.assertEqual('systemhostname', datasource.get_hostname()) + self.assertEqual( + 'systemhostname.domain.com', + datasource.get_hostname(fqdn=True)) + + def test_get_hostname_without_metadata_returns_none(self): + """Datasource.gethostname returns None when metadata_only and no MD.""" + tmp = self.tmp_dir() + datasource = DataSourceTestSubclassNet( + self.sys_cfg, self.distro, Paths({'run_dir': tmp})) + self.assertEqual({}, datasource.metadata) + mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts' + with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost: + with mock.patch(mock_fqdn) as m_fqdn: + self.assertIsNone(datasource.get_hostname(metadata_only=True)) + self.assertIsNone( + datasource.get_hostname(fqdn=True, metadata_only=True)) + self.assertEqual([], m_gethost.call_args_list) + self.assertEqual([], m_fqdn.call_args_list) + + def test_get_hostname_without_metadata_prefers_etc_hosts(self): + """Datasource.gethostname prefers /etc/hosts to util.get_hostname.""" + tmp = self.tmp_dir() + datasource = DataSourceTestSubclassNet( + self.sys_cfg, self.distro, Paths({'run_dir': tmp})) + self.assertEqual({}, datasource.metadata) + mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts' + with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost: + with mock.patch(mock_fqdn) as m_fqdn: + m_gethost.return_value = 'systemhostname.domain.com' + m_fqdn.return_value = 'fqdnhostname.domain.com' + self.assertEqual('fqdnhostname', datasource.get_hostname()) + self.assertEqual('fqdnhostname.domain.com', + datasource.get_hostname(fqdn=True)) + + def test_get_data_does_not_write_instance_data_on_failure(self): + """get_data does not write INSTANCE_JSON_FILE on get_data False.""" + tmp = self.tmp_dir() + datasource = DataSourceTestSubclassNet( + self.sys_cfg, self.distro, Paths({'run_dir': tmp}), + get_data_retval=False) + self.assertFalse(datasource.get_data()) + json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp) + self.assertFalse( + os.path.exists(json_file), 'Found unexpected file %s' % json_file) + + def test_get_data_writes_json_instance_data_on_success(self): + """get_data writes INSTANCE_JSON_FILE to run_dir as world readable.""" + tmp = self.tmp_dir() + datasource = DataSourceTestSubclassNet( + self.sys_cfg, self.distro, Paths({'run_dir': tmp})) + sys_info = { + "python": "3.7", + "platform": + "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal", + "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah", + "x86_64"], + "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]} + with mock.patch("cloudinit.util.system_info", return_value=sys_info): + datasource.get_data() + json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp) + content = util.load_file(json_file) + expected = { + 'base64_encoded_keys': [], + 'merged_cfg': REDACT_SENSITIVE_VALUE, + 'sensitive_keys': ['merged_cfg'], + 'sys_info': sys_info, + 'v1': { + '_beta_keys': ['subplatform'], + 'availability-zone': 'myaz', + 'availability_zone': 'myaz', + 'cloud-name': 'subclasscloudname', + 'cloud_name': 'subclasscloudname', + 'distro': 'ubuntu', + 'distro_release': 'focal', + 'distro_version': '20.04', + 'instance-id': 'iid-datasource', + 'instance_id': 'iid-datasource', + 'local-hostname': 'test-subclass-hostname', + 'local_hostname': 'test-subclass-hostname', + 'kernel_release': '5.4.0-24-generic', + 'machine': 'x86_64', + 'platform': 'mytestsubclass', + 'public_ssh_keys': [], + 'python_version': '3.7', + 'region': 'myregion', + 'system_platform': + 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal', + 'subplatform': 'unknown', + 'variant': 'ubuntu'}, + 'ds': { + + '_doc': EXPERIMENTAL_TEXT, + 'meta_data': {'availability_zone': 'myaz', + 'local-hostname': 'test-subclass-hostname', + 'region': 'myregion'}}} + self.assertEqual(expected, util.load_json(content)) + file_stat = os.stat(json_file) + self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode)) + self.assertEqual(expected, util.load_json(content)) + + def test_get_data_writes_redacted_public_json_instance_data(self): + """get_data writes redacted content to public INSTANCE_JSON_FILE.""" + tmp = self.tmp_dir() + datasource = DataSourceTestSubclassNet( + self.sys_cfg, self.distro, Paths({'run_dir': tmp}), + custom_metadata={ + 'availability_zone': 'myaz', + 'local-hostname': 'test-subclass-hostname', + 'region': 'myregion', + 'some': {'security-credentials': { + 'cred1': 'sekret', 'cred2': 'othersekret'}}}) + self.assertCountEqual( + ('merged_cfg', 'security-credentials',), + datasource.sensitive_metadata_keys) + sys_info = { + "python": "3.7", + "platform": + "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal", + "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah", + "x86_64"], + "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]} + with mock.patch("cloudinit.util.system_info", return_value=sys_info): + datasource.get_data() + json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp) + redacted = util.load_json(util.load_file(json_file)) + expected = { + 'base64_encoded_keys': [], + 'merged_cfg': REDACT_SENSITIVE_VALUE, + 'sensitive_keys': [ + 'ds/meta_data/some/security-credentials', 'merged_cfg'], + 'sys_info': sys_info, + 'v1': { + '_beta_keys': ['subplatform'], + 'availability-zone': 'myaz', + 'availability_zone': 'myaz', + 'cloud-name': 'subclasscloudname', + 'cloud_name': 'subclasscloudname', + 'distro': 'ubuntu', + 'distro_release': 'focal', + 'distro_version': '20.04', + 'instance-id': 'iid-datasource', + 'instance_id': 'iid-datasource', + 'local-hostname': 'test-subclass-hostname', + 'local_hostname': 'test-subclass-hostname', + 'kernel_release': '5.4.0-24-generic', + 'machine': 'x86_64', + 'platform': 'mytestsubclass', + 'public_ssh_keys': [], + 'python_version': '3.7', + 'region': 'myregion', + 'system_platform': + 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal', + 'subplatform': 'unknown', + 'variant': 'ubuntu'}, + 'ds': { + '_doc': EXPERIMENTAL_TEXT, + 'meta_data': { + 'availability_zone': 'myaz', + 'local-hostname': 'test-subclass-hostname', + 'region': 'myregion', + 'some': {'security-credentials': REDACT_SENSITIVE_VALUE}}} + } + self.assertCountEqual(expected, redacted) + file_stat = os.stat(json_file) + self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode)) + + def test_get_data_writes_json_instance_data_sensitive(self): + """ + get_data writes unmodified data to sensitive file as root-readonly. + """ + tmp = self.tmp_dir() + datasource = DataSourceTestSubclassNet( + self.sys_cfg, self.distro, Paths({'run_dir': tmp}), + custom_metadata={ + 'availability_zone': 'myaz', + 'local-hostname': 'test-subclass-hostname', + 'region': 'myregion', + 'some': {'security-credentials': { + 'cred1': 'sekret', 'cred2': 'othersekret'}}}) + sys_info = { + "python": "3.7", + "platform": + "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal", + "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah", + "x86_64"], + "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]} + + self.assertCountEqual( + ('merged_cfg', 'security-credentials',), + datasource.sensitive_metadata_keys) + with mock.patch("cloudinit.util.system_info", return_value=sys_info): + datasource.get_data() + sensitive_json_file = self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, tmp) + content = util.load_file(sensitive_json_file) + expected = { + 'base64_encoded_keys': [], + 'merged_cfg': { + '_doc': ( + 'Merged cloud-init system config from ' + '/etc/cloud/cloud.cfg and /etc/cloud/cloud.cfg.d/' + ), + 'datasource': {'_undef': {'key1': False}}}, + 'sensitive_keys': [ + 'ds/meta_data/some/security-credentials', 'merged_cfg'], + 'sys_info': sys_info, + 'v1': { + '_beta_keys': ['subplatform'], + 'availability-zone': 'myaz', + 'availability_zone': 'myaz', + 'cloud-name': 'subclasscloudname', + 'cloud_name': 'subclasscloudname', + 'distro': 'ubuntu', + 'distro_release': 'focal', + 'distro_version': '20.04', + 'instance-id': 'iid-datasource', + 'instance_id': 'iid-datasource', + 'kernel_release': '5.4.0-24-generic', + 'local-hostname': 'test-subclass-hostname', + 'local_hostname': 'test-subclass-hostname', + 'machine': 'x86_64', + 'platform': 'mytestsubclass', + 'public_ssh_keys': [], + 'python_version': '3.7', + 'region': 'myregion', + 'subplatform': 'unknown', + 'system_platform': + 'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal', + 'variant': 'ubuntu'}, + 'ds': { + '_doc': EXPERIMENTAL_TEXT, + 'meta_data': { + 'availability_zone': 'myaz', + 'local-hostname': 'test-subclass-hostname', + 'region': 'myregion', + 'some': { + 'security-credentials': + {'cred1': 'sekret', 'cred2': 'othersekret'}}}} + } + self.assertCountEqual(expected, util.load_json(content)) + file_stat = os.stat(sensitive_json_file) + self.assertEqual(0o600, stat.S_IMODE(file_stat.st_mode)) + self.assertEqual(expected, util.load_json(content)) + + def test_get_data_handles_redacted_unserializable_content(self): + """get_data warns unserializable content in INSTANCE_JSON_FILE.""" + tmp = self.tmp_dir() + datasource = DataSourceTestSubclassNet( + self.sys_cfg, self.distro, Paths({'run_dir': tmp}), + custom_metadata={'key1': 'val1', 'key2': {'key2.1': self.paths}}) + datasource.get_data() + json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp) + content = util.load_file(json_file) + expected_metadata = { + 'key1': 'val1', + 'key2': { + 'key2.1': "Warning: redacted unserializable type <class" + " 'cloudinit.helpers.Paths'>"}} + instance_json = util.load_json(content) + self.assertEqual( + expected_metadata, instance_json['ds']['meta_data']) + + def test_persist_instance_data_writes_ec2_metadata_when_set(self): + """When ec2_metadata class attribute is set, persist to json.""" + tmp = self.tmp_dir() + datasource = DataSourceTestSubclassNet( + self.sys_cfg, self.distro, Paths({'run_dir': tmp})) + datasource.ec2_metadata = UNSET + datasource.get_data() + json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp) + instance_data = util.load_json(util.load_file(json_file)) + self.assertNotIn('ec2_metadata', instance_data['ds']) + datasource.ec2_metadata = {'ec2stuff': 'is good'} + datasource.persist_instance_data() + instance_data = util.load_json(util.load_file(json_file)) + self.assertEqual( + {'ec2stuff': 'is good'}, + instance_data['ds']['ec2_metadata']) + + def test_persist_instance_data_writes_network_json_when_set(self): + """When network_data.json class attribute is set, persist to json.""" + tmp = self.tmp_dir() + datasource = DataSourceTestSubclassNet( + self.sys_cfg, self.distro, Paths({'run_dir': tmp})) + datasource.get_data() + json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp) + instance_data = util.load_json(util.load_file(json_file)) + self.assertNotIn('network_json', instance_data['ds']) + datasource.network_json = {'network_json': 'is good'} + datasource.persist_instance_data() + instance_data = util.load_json(util.load_file(json_file)) + self.assertEqual( + {'network_json': 'is good'}, + instance_data['ds']['network_json']) + + def test_get_data_base64encodes_unserializable_bytes(self): + """On py3, get_data base64encodes any unserializable content.""" + tmp = self.tmp_dir() + datasource = DataSourceTestSubclassNet( + self.sys_cfg, self.distro, Paths({'run_dir': tmp}), + custom_metadata={'key1': 'val1', 'key2': {'key2.1': b'\x123'}}) + self.assertTrue(datasource.get_data()) + json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp) + content = util.load_file(json_file) + instance_json = util.load_json(content) + self.assertCountEqual( + ['ds/meta_data/key2/key2.1'], + instance_json['base64_encoded_keys']) + self.assertEqual( + {'key1': 'val1', 'key2': {'key2.1': 'EjM='}}, + instance_json['ds']['meta_data']) + + def test_get_hostname_subclass_support(self): + """Validate get_hostname signature on all subclasses of DataSource.""" + base_args = inspect.getfullargspec(DataSource.get_hostname) + # Import all DataSource subclasses so we can inspect them. + modules = util.find_modules(os.path.dirname(os.path.dirname(__file__))) + for _loc, name in modules.items(): + mod_locs, _ = importer.find_module(name, ['cloudinit.sources'], []) + if mod_locs: + importer.import_module(mod_locs[0]) + for child in DataSource.__subclasses__(): + if 'Test' in child.dsname: + continue + self.assertEqual( + base_args, + inspect.getfullargspec(child.get_hostname), + '%s does not implement DataSource.get_hostname params' + % child) + for grandchild in child.__subclasses__(): + self.assertEqual( + base_args, + inspect.getfullargspec(grandchild.get_hostname), + '%s does not implement DataSource.get_hostname params' + % grandchild) + + def test_clear_cached_attrs_resets_cached_attr_class_attributes(self): + """Class attributes listed in cached_attr_defaults are reset.""" + count = 0 + # Setup values for all cached class attributes + for attr, value in self.datasource.cached_attr_defaults: + setattr(self.datasource, attr, count) + count += 1 + self.datasource._dirty_cache = True + self.datasource.clear_cached_attrs() + for attr, value in self.datasource.cached_attr_defaults: + self.assertEqual(value, getattr(self.datasource, attr)) + + def test_clear_cached_attrs_noops_on_clean_cache(self): + """Class attributes listed in cached_attr_defaults are reset.""" + count = 0 + # Setup values for all cached class attributes + for attr, _ in self.datasource.cached_attr_defaults: + setattr(self.datasource, attr, count) + count += 1 + self.datasource._dirty_cache = False # Fake clean cache + self.datasource.clear_cached_attrs() + count = 0 + for attr, _ in self.datasource.cached_attr_defaults: + self.assertEqual(count, getattr(self.datasource, attr)) + count += 1 + + def test_clear_cached_attrs_skips_non_attr_class_attributes(self): + """Skip any cached_attr_defaults which aren't class attributes.""" + self.datasource._dirty_cache = True + self.datasource.clear_cached_attrs() + for attr in ('ec2_metadata', 'network_json'): + self.assertFalse(hasattr(self.datasource, attr)) + + def test_clear_cached_attrs_of_custom_attrs(self): + """Custom attr_values can be passed to clear_cached_attrs.""" + self.datasource._dirty_cache = True + cached_attr_name = self.datasource.cached_attr_defaults[0][0] + setattr(self.datasource, cached_attr_name, 'himom') + self.datasource.myattr = 'orig' + self.datasource.clear_cached_attrs( + attr_defaults=(('myattr', 'updated'),)) + self.assertEqual('himom', getattr(self.datasource, cached_attr_name)) + self.assertEqual('updated', self.datasource.myattr) + + @mock.patch.dict(DataSource.default_update_events, { + EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}}) + @mock.patch.dict(DataSource.supported_update_events, { + EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}}) + def test_update_metadata_only_acts_on_supported_update_events(self): + """update_metadata_if_supported wont get_data on unsupported events.""" + self.assertEqual( + {EventScope.NETWORK: set([EventType.BOOT_NEW_INSTANCE])}, + self.datasource.default_update_events + ) + + def fake_get_data(): + raise Exception('get_data should not be called') + + self.datasource.get_data = fake_get_data + self.assertFalse( + self.datasource.update_metadata_if_supported( + source_event_types=[EventType.BOOT])) + + @mock.patch.dict(DataSource.supported_update_events, { + EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}}) + def test_update_metadata_returns_true_on_supported_update_event(self): + """update_metadata_if_supported returns get_data on supported events""" + def fake_get_data(): + return True + + self.datasource.get_data = fake_get_data + self.datasource._network_config = 'something' + self.datasource._dirty_cache = True + self.assertTrue( + self.datasource.update_metadata_if_supported( + source_event_types=[ + EventType.BOOT, EventType.BOOT_NEW_INSTANCE])) + self.assertEqual(UNSET, self.datasource._network_config) + + self.assertIn( + "DEBUG: Update datasource metadata and network config due to" + " events: boot-new-instance", + self.logs.getvalue() + ) + + +class TestRedactSensitiveData(CiTestCase): + + def test_redact_sensitive_data_noop_when_no_sensitive_keys_present(self): + """When sensitive_keys is absent or empty from metadata do nothing.""" + md = {'my': 'data'} + self.assertEqual( + md, redact_sensitive_keys(md, redact_value='redacted')) + md['sensitive_keys'] = [] + self.assertEqual( + md, redact_sensitive_keys(md, redact_value='redacted')) + + def test_redact_sensitive_data_redacts_exact_match_name(self): + """Only exact matched sensitive_keys are redacted from metadata.""" + md = {'sensitive_keys': ['md/secure'], + 'md': {'secure': 's3kr1t', 'insecure': 'publik'}} + secure_md = copy.deepcopy(md) + secure_md['md']['secure'] = 'redacted' + self.assertEqual( + secure_md, + redact_sensitive_keys(md, redact_value='redacted')) + + def test_redact_sensitive_data_does_redacts_with_default_string(self): + """When redact_value is absent, REDACT_SENSITIVE_VALUE is used.""" + md = {'sensitive_keys': ['md/secure'], + 'md': {'secure': 's3kr1t', 'insecure': 'publik'}} + secure_md = copy.deepcopy(md) + secure_md['md']['secure'] = 'redacted for non-root user' + self.assertEqual( + secure_md, + redact_sensitive_keys(md)) + + +class TestCanonicalCloudID(CiTestCase): + + def test_cloud_id_returns_platform_on_unknowns(self): + """When region and cloud_name are unknown, return platform.""" + self.assertEqual( + 'platform', + canonical_cloud_id(cloud_name=METADATA_UNKNOWN, + region=METADATA_UNKNOWN, + platform='platform')) + + def test_cloud_id_returns_platform_on_none(self): + """When region and cloud_name are unknown, return platform.""" + self.assertEqual( + 'platform', + canonical_cloud_id(cloud_name=None, + region=None, + platform='platform')) + + def test_cloud_id_returns_cloud_name_on_unknown_region(self): + """When region is unknown, return cloud_name.""" + for region in (None, METADATA_UNKNOWN): + self.assertEqual( + 'cloudname', + canonical_cloud_id(cloud_name='cloudname', + region=region, + platform='platform')) + + def test_cloud_id_returns_platform_on_unknown_cloud_name(self): + """When region is set but cloud_name is unknown return cloud_name.""" + self.assertEqual( + 'platform', + canonical_cloud_id(cloud_name=METADATA_UNKNOWN, + region='region', + platform='platform')) + + def test_cloud_id_aws_based_on_region_and_cloud_name(self): + """When cloud_name is aws, return proper cloud-id based on region.""" + self.assertEqual( + 'aws-china', + canonical_cloud_id(cloud_name='aws', + region='cn-north-1', + platform='platform')) + self.assertEqual( + 'aws', + canonical_cloud_id(cloud_name='aws', + region='us-east-1', + platform='platform')) + self.assertEqual( + 'aws-gov', + canonical_cloud_id(cloud_name='aws', + region='us-gov-1', + platform='platform')) + self.assertEqual( # Overrideen non-aws cloud_name is returned + '!aws', + canonical_cloud_id(cloud_name='!aws', + region='us-gov-1', + platform='platform')) + + def test_cloud_id_azure_based_on_region_and_cloud_name(self): + """Report cloud-id when cloud_name is azure and region is in china.""" + self.assertEqual( + 'azure-china', + canonical_cloud_id(cloud_name='azure', + region='chinaeast', + platform='platform')) + self.assertEqual( + 'azure', + canonical_cloud_id(cloud_name='azure', + region='!chinaeast', + platform='platform')) + +# vi: ts=4 expandtab |