summaryrefslogtreecommitdiff
path: root/cloudinit/sources/tests/test_init.py
diff options
context:
space:
mode:
authorChad Smith <chad.smith@canonical.com>2018-10-03 12:10:23 -0600
committerChad Smith <chad.smith@canonical.com>2018-10-03 12:10:23 -0600
commitd6347e1c439eda7f43d9620dac2b461e980e1ae9 (patch)
tree08410263488d11a2a29edcc620575ed1b028100e /cloudinit/sources/tests/test_init.py
parent564793a76b9c9add1ee81bab4919c8dccd45a33d (diff)
parente28000457591bde9f22d6b7a538b1fc33349d780 (diff)
downloadvyos-cloud-init-d6347e1c439eda7f43d9620dac2b461e980e1ae9.tar.gz
vyos-cloud-init-d6347e1c439eda7f43d9620dac2b461e980e1ae9.zip
merge from master at 18.4
Diffstat (limited to 'cloudinit/sources/tests/test_init.py')
-rw-r--r--cloudinit/sources/tests/test_init.py192
1 files changed, 165 insertions, 27 deletions
diff --git a/cloudinit/sources/tests/test_init.py b/cloudinit/sources/tests/test_init.py
index dcd221be..8082019e 100644
--- a/cloudinit/sources/tests/test_init.py
+++ b/cloudinit/sources/tests/test_init.py
@@ -1,5 +1,6 @@
# This file is part of cloud-init. See LICENSE file for license information.
+import copy
import inspect
import os
import six
@@ -9,7 +10,8 @@ from cloudinit.event import EventType
from cloudinit.helpers import Paths
from cloudinit import importer
from cloudinit.sources import (
- INSTANCE_JSON_FILE, DataSource, UNSET)
+ EXPERIMENTAL_TEXT, INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE,
+ REDACT_SENSITIVE_VALUE, UNSET, DataSource, redact_sensitive_keys)
from cloudinit.tests.helpers import CiTestCase, skipIf, mock
from cloudinit.user_data import UserDataProcessor
from cloudinit import util
@@ -20,24 +22,30 @@ class DataSourceTestSubclassNet(DataSource):
dsname = 'MyTestSubclass'
url_max_wait = 55
- def __init__(self, sys_cfg, distro, paths, custom_userdata=None):
+ def __init__(self, sys_cfg, distro, paths, custom_metadata=None,
+ custom_userdata=None, get_data_retval=True):
super(DataSourceTestSubclassNet, self).__init__(
sys_cfg, distro, paths)
self._custom_userdata = custom_userdata
+ self._custom_metadata = custom_metadata
+ self._get_data_retval = get_data_retval
def _get_cloud_name(self):
return 'SubclassCloudName'
def _get_data(self):
- self.metadata = {'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion'}
+ if self._custom_metadata:
+ self.metadata = self._custom_metadata
+ else:
+ self.metadata = {'availability_zone': 'myaz',
+ 'local-hostname': 'test-subclass-hostname',
+ 'region': 'myregion'}
if self._custom_userdata:
self.userdata_raw = self._custom_userdata
else:
self.userdata_raw = 'userdata_raw'
self.vendordata_raw = 'vendordata_raw'
- return True
+ return self._get_data_retval
class InvalidDataSourceTestSubclassNet(DataSource):
@@ -264,8 +272,19 @@ class TestDataSource(CiTestCase):
self.assertEqual('fqdnhostname.domain.com',
datasource.get_hostname(fqdn=True))
- def test_get_data_write_json_instance_data(self):
- """get_data writes INSTANCE_JSON_FILE to run_dir as readonly root."""
+ def test_get_data_does_not_write_instance_data_on_failure(self):
+ """get_data does not write INSTANCE_JSON_FILE on get_data False."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
+ get_data_retval=False)
+ self.assertFalse(datasource.get_data())
+ json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
+ self.assertFalse(
+ os.path.exists(json_file), 'Found unexpected file %s' % json_file)
+
+ def test_get_data_writes_json_instance_data_on_success(self):
+ """get_data writes INSTANCE_JSON_FILE to run_dir as world readable."""
tmp = self.tmp_dir()
datasource = DataSourceTestSubclassNet(
self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
@@ -273,40 +292,126 @@ class TestDataSource(CiTestCase):
json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
content = util.load_file(json_file)
expected = {
- 'base64-encoded-keys': [],
+ 'base64_encoded_keys': [],
+ 'sensitive_keys': [],
'v1': {
'availability-zone': 'myaz',
+ 'availability_zone': 'myaz',
'cloud-name': 'subclasscloudname',
+ 'cloud_name': 'subclasscloudname',
'instance-id': 'iid-datasource',
+ 'instance_id': 'iid-datasource',
'local-hostname': 'test-subclass-hostname',
+ 'local_hostname': 'test-subclass-hostname',
'region': 'myregion'},
'ds': {
- 'meta-data': {'availability_zone': 'myaz',
+ '_doc': EXPERIMENTAL_TEXT,
+ 'meta_data': {'availability_zone': 'myaz',
'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion'},
- 'user-data': 'userdata_raw',
- 'vendor-data': 'vendordata_raw'}}
+ 'region': 'myregion'}}}
self.assertEqual(expected, util.load_json(content))
file_stat = os.stat(json_file)
+ self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
+ self.assertEqual(expected, util.load_json(content))
+
+ def test_get_data_writes_json_instance_data_sensitive(self):
+ """get_data writes INSTANCE_JSON_SENSITIVE_FILE as readonly root."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
+ custom_metadata={
+ 'availability_zone': 'myaz',
+ 'local-hostname': 'test-subclass-hostname',
+ 'region': 'myregion',
+ 'some': {'security-credentials': {
+ 'cred1': 'sekret', 'cred2': 'othersekret'}}})
+ self.assertEqual(
+ ('security-credentials',), datasource.sensitive_metadata_keys)
+ datasource.get_data()
+ json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
+ sensitive_json_file = self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, tmp)
+ redacted = util.load_json(util.load_file(json_file))
+ self.assertEqual(
+ {'cred1': 'sekret', 'cred2': 'othersekret'},
+ redacted['ds']['meta_data']['some']['security-credentials'])
+ content = util.load_file(sensitive_json_file)
+ expected = {
+ 'base64_encoded_keys': [],
+ 'sensitive_keys': ['ds/meta_data/some/security-credentials'],
+ 'v1': {
+ 'availability-zone': 'myaz',
+ 'availability_zone': 'myaz',
+ 'cloud-name': 'subclasscloudname',
+ 'cloud_name': 'subclasscloudname',
+ 'instance-id': 'iid-datasource',
+ 'instance_id': 'iid-datasource',
+ 'local-hostname': 'test-subclass-hostname',
+ 'local_hostname': 'test-subclass-hostname',
+ 'region': 'myregion'},
+ 'ds': {
+ '_doc': EXPERIMENTAL_TEXT,
+ 'meta_data': {
+ 'availability_zone': 'myaz',
+ 'local-hostname': 'test-subclass-hostname',
+ 'region': 'myregion',
+ 'some': {'security-credentials': REDACT_SENSITIVE_VALUE}}}
+ }
+ self.maxDiff = None
+ self.assertEqual(expected, util.load_json(content))
+ file_stat = os.stat(sensitive_json_file)
self.assertEqual(0o600, stat.S_IMODE(file_stat.st_mode))
+ self.assertEqual(expected, util.load_json(content))
def test_get_data_handles_redacted_unserializable_content(self):
"""get_data warns unserializable content in INSTANCE_JSON_FILE."""
tmp = self.tmp_dir()
datasource = DataSourceTestSubclassNet(
self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_userdata={'key1': 'val1', 'key2': {'key2.1': self.paths}})
- self.assertTrue(datasource.get_data())
+ custom_metadata={'key1': 'val1', 'key2': {'key2.1': self.paths}})
+ datasource.get_data()
json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
content = util.load_file(json_file)
- expected_userdata = {
+ expected_metadata = {
'key1': 'val1',
'key2': {
'key2.1': "Warning: redacted unserializable type <class"
" 'cloudinit.helpers.Paths'>"}}
instance_json = util.load_json(content)
self.assertEqual(
- expected_userdata, instance_json['ds']['user-data'])
+ expected_metadata, instance_json['ds']['meta_data'])
+
+ def test_persist_instance_data_writes_ec2_metadata_when_set(self):
+ """When ec2_metadata class attribute is set, persist to json."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
+ datasource.ec2_metadata = UNSET
+ datasource.get_data()
+ json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
+ instance_data = util.load_json(util.load_file(json_file))
+ self.assertNotIn('ec2_metadata', instance_data['ds'])
+ datasource.ec2_metadata = {'ec2stuff': 'is good'}
+ datasource.persist_instance_data()
+ instance_data = util.load_json(util.load_file(json_file))
+ self.assertEqual(
+ {'ec2stuff': 'is good'},
+ instance_data['ds']['ec2_metadata'])
+
+ def test_persist_instance_data_writes_network_json_when_set(self):
+ """When network_data.json class attribute is set, persist to json."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
+ datasource.get_data()
+ json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
+ instance_data = util.load_json(util.load_file(json_file))
+ self.assertNotIn('network_json', instance_data['ds'])
+ datasource.network_json = {'network_json': 'is good'}
+ datasource.persist_instance_data()
+ instance_data = util.load_json(util.load_file(json_file))
+ self.assertEqual(
+ {'network_json': 'is good'},
+ instance_data['ds']['network_json'])
@skipIf(not six.PY3, "json serialization on <= py2.7 handles bytes")
def test_get_data_base64encodes_unserializable_bytes(self):
@@ -314,17 +419,17 @@ class TestDataSource(CiTestCase):
tmp = self.tmp_dir()
datasource = DataSourceTestSubclassNet(
self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_userdata={'key1': 'val1', 'key2': {'key2.1': b'\x123'}})
+ custom_metadata={'key1': 'val1', 'key2': {'key2.1': b'\x123'}})
self.assertTrue(datasource.get_data())
json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
content = util.load_file(json_file)
instance_json = util.load_json(content)
- self.assertEqual(
- ['ds/user-data/key2/key2.1'],
- instance_json['base64-encoded-keys'])
+ self.assertItemsEqual(
+ ['ds/meta_data/key2/key2.1'],
+ instance_json['base64_encoded_keys'])
self.assertEqual(
{'key1': 'val1', 'key2': {'key2.1': 'EjM='}},
- instance_json['ds']['user-data'])
+ instance_json['ds']['meta_data'])
@skipIf(not six.PY2, "json serialization on <= py2.7 handles bytes")
def test_get_data_handles_bytes_values(self):
@@ -332,15 +437,15 @@ class TestDataSource(CiTestCase):
tmp = self.tmp_dir()
datasource = DataSourceTestSubclassNet(
self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_userdata={'key1': 'val1', 'key2': {'key2.1': b'\x123'}})
+ custom_metadata={'key1': 'val1', 'key2': {'key2.1': b'\x123'}})
self.assertTrue(datasource.get_data())
json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
content = util.load_file(json_file)
instance_json = util.load_json(content)
- self.assertEqual([], instance_json['base64-encoded-keys'])
+ self.assertEqual([], instance_json['base64_encoded_keys'])
self.assertEqual(
{'key1': 'val1', 'key2': {'key2.1': '\x123'}},
- instance_json['ds']['user-data'])
+ instance_json['ds']['meta_data'])
@skipIf(not six.PY2, "Only python2 hits UnicodeDecodeErrors on non-utf8")
def test_non_utf8_encoding_logs_warning(self):
@@ -348,7 +453,7 @@ class TestDataSource(CiTestCase):
tmp = self.tmp_dir()
datasource = DataSourceTestSubclassNet(
self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_userdata={'key1': 'val1', 'key2': {'key2.1': b'ab\xaadef'}})
+ custom_metadata={'key1': 'val1', 'key2': {'key2.1': b'ab\xaadef'}})
self.assertTrue(datasource.get_data())
json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
self.assertFalse(os.path.exists(json_file))
@@ -429,8 +534,9 @@ class TestDataSource(CiTestCase):
def test_update_metadata_only_acts_on_supported_update_events(self):
"""update_metadata won't get_data on unsupported update events."""
+ self.datasource.update_events['network'].discard(EventType.BOOT)
self.assertEqual(
- {'network': [EventType.BOOT_NEW_INSTANCE]},
+ {'network': set([EventType.BOOT_NEW_INSTANCE])},
self.datasource.update_events)
def fake_get_data():
@@ -461,4 +567,36 @@ class TestDataSource(CiTestCase):
self.logs.getvalue())
+class TestRedactSensitiveData(CiTestCase):
+
+ def test_redact_sensitive_data_noop_when_no_sensitive_keys_present(self):
+ """When sensitive_keys is absent or empty from metadata do nothing."""
+ md = {'my': 'data'}
+ self.assertEqual(
+ md, redact_sensitive_keys(md, redact_value='redacted'))
+ md['sensitive_keys'] = []
+ self.assertEqual(
+ md, redact_sensitive_keys(md, redact_value='redacted'))
+
+ def test_redact_sensitive_data_redacts_exact_match_name(self):
+ """Only exact matched sensitive_keys are redacted from metadata."""
+ md = {'sensitive_keys': ['md/secure'],
+ 'md': {'secure': 's3kr1t', 'insecure': 'publik'}}
+ secure_md = copy.deepcopy(md)
+ secure_md['md']['secure'] = 'redacted'
+ self.assertEqual(
+ secure_md,
+ redact_sensitive_keys(md, redact_value='redacted'))
+
+ def test_redact_sensitive_data_does_redacts_with_default_string(self):
+ """When redact_value is absent, REDACT_SENSITIVE_VALUE is used."""
+ md = {'sensitive_keys': ['md/secure'],
+ 'md': {'secure': 's3kr1t', 'insecure': 'publik'}}
+ secure_md = copy.deepcopy(md)
+ secure_md['md']['secure'] = 'redacted for non-root user'
+ self.assertEqual(
+ secure_md,
+ redact_sensitive_keys(md))
+
+
# vi: ts=4 expandtab