summaryrefslogtreecommitdiff
path: root/cloudinit/sources/tests/test_init.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/sources/tests/test_init.py')
-rw-r--r--cloudinit/sources/tests/test_init.py130
1 files changed, 109 insertions, 21 deletions
diff --git a/cloudinit/sources/tests/test_init.py b/cloudinit/sources/tests/test_init.py
index 8299af23..6b965750 100644
--- a/cloudinit/sources/tests/test_init.py
+++ b/cloudinit/sources/tests/test_init.py
@@ -1,5 +1,6 @@
# This file is part of cloud-init. See LICENSE file for license information.
+import copy
import inspect
import os
import six
@@ -9,7 +10,8 @@ from cloudinit.event import EventType
from cloudinit.helpers import Paths
from cloudinit import importer
from cloudinit.sources import (
- INSTANCE_JSON_FILE, DataSource, UNSET)
+ INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE, REDACT_SENSITIVE_VALUE,
+ UNSET, DataSource, redact_sensitive_keys)
from cloudinit.tests.helpers import CiTestCase, skipIf, mock
from cloudinit.user_data import UserDataProcessor
from cloudinit import util
@@ -20,20 +22,24 @@ class DataSourceTestSubclassNet(DataSource):
dsname = 'MyTestSubclass'
url_max_wait = 55
- def __init__(self, sys_cfg, distro, paths, custom_userdata=None,
- get_data_retval=True):
+ def __init__(self, sys_cfg, distro, paths, custom_metadata=None,
+ custom_userdata=None, get_data_retval=True):
super(DataSourceTestSubclassNet, self).__init__(
sys_cfg, distro, paths)
self._custom_userdata = custom_userdata
+ self._custom_metadata = custom_metadata
self._get_data_retval = get_data_retval
def _get_cloud_name(self):
return 'SubclassCloudName'
def _get_data(self):
- self.metadata = {'availability_zone': 'myaz',
- 'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion'}
+ if self._custom_metadata:
+ self.metadata = self._custom_metadata
+ else:
+ self.metadata = {'availability_zone': 'myaz',
+ 'local-hostname': 'test-subclass-hostname',
+ 'region': 'myregion'}
if self._custom_userdata:
self.userdata_raw = self._custom_userdata
else:
@@ -278,7 +284,7 @@ class TestDataSource(CiTestCase):
os.path.exists(json_file), 'Found unexpected file %s' % json_file)
def test_get_data_writes_json_instance_data_on_success(self):
- """get_data writes INSTANCE_JSON_FILE to run_dir as readonly root."""
+ """get_data writes INSTANCE_JSON_FILE to run_dir as world readable."""
tmp = self.tmp_dir()
datasource = DataSourceTestSubclassNet(
self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
@@ -287,40 +293,90 @@ class TestDataSource(CiTestCase):
content = util.load_file(json_file)
expected = {
'base64_encoded_keys': [],
+ 'sensitive_keys': [],
'v1': {
'availability-zone': 'myaz',
+ 'availability_zone': 'myaz',
'cloud-name': 'subclasscloudname',
+ 'cloud_name': 'subclasscloudname',
'instance-id': 'iid-datasource',
+ 'instance_id': 'iid-datasource',
'local-hostname': 'test-subclass-hostname',
+ 'local_hostname': 'test-subclass-hostname',
'region': 'myregion'},
'ds': {
'meta_data': {'availability_zone': 'myaz',
'local-hostname': 'test-subclass-hostname',
- 'region': 'myregion'},
- 'user_data': 'userdata_raw',
- 'vendor_data': 'vendordata_raw'}}
- self.maxDiff = None
+ 'region': 'myregion'}}}
self.assertEqual(expected, util.load_json(content))
file_stat = os.stat(json_file)
+ self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
+ self.assertEqual(expected, util.load_json(content))
+
+ def test_get_data_writes_json_instance_data_sensitive(self):
+ """get_data writes INSTANCE_JSON_SENSITIVE_FILE as readonly root."""
+ tmp = self.tmp_dir()
+ datasource = DataSourceTestSubclassNet(
+ self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
+ custom_metadata={
+ 'availability_zone': 'myaz',
+ 'local-hostname': 'test-subclass-hostname',
+ 'region': 'myregion',
+ 'some': {'security-credentials': {
+ 'cred1': 'sekret', 'cred2': 'othersekret'}}})
+ self.assertEqual(
+ ('security-credentials',), datasource.sensitive_metadata_keys)
+ datasource.get_data()
+ json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
+ sensitive_json_file = self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, tmp)
+ redacted = util.load_json(util.load_file(json_file))
+ self.assertEqual(
+ {'cred1': 'sekret', 'cred2': 'othersekret'},
+ redacted['ds']['meta_data']['some']['security-credentials'])
+ content = util.load_file(sensitive_json_file)
+ expected = {
+ 'base64_encoded_keys': [],
+ 'sensitive_keys': ['ds/meta_data/some/security-credentials'],
+ 'v1': {
+ 'availability-zone': 'myaz',
+ 'availability_zone': 'myaz',
+ 'cloud-name': 'subclasscloudname',
+ 'cloud_name': 'subclasscloudname',
+ 'instance-id': 'iid-datasource',
+ 'instance_id': 'iid-datasource',
+ 'local-hostname': 'test-subclass-hostname',
+ 'local_hostname': 'test-subclass-hostname',
+ 'region': 'myregion'},
+ 'ds': {
+ 'meta_data': {
+ 'availability_zone': 'myaz',
+ 'local-hostname': 'test-subclass-hostname',
+ 'region': 'myregion',
+ 'some': {'security-credentials': REDACT_SENSITIVE_VALUE}}}
+ }
+ self.maxDiff = None
+ self.assertEqual(expected, util.load_json(content))
+ file_stat = os.stat(sensitive_json_file)
self.assertEqual(0o600, stat.S_IMODE(file_stat.st_mode))
+ self.assertEqual(expected, util.load_json(content))
def test_get_data_handles_redacted_unserializable_content(self):
"""get_data warns unserializable content in INSTANCE_JSON_FILE."""
tmp = self.tmp_dir()
datasource = DataSourceTestSubclassNet(
self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_userdata={'key1': 'val1', 'key2': {'key2.1': self.paths}})
+ custom_metadata={'key1': 'val1', 'key2': {'key2.1': self.paths}})
datasource.get_data()
json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
content = util.load_file(json_file)
- expected_userdata = {
+ expected_metadata = {
'key1': 'val1',
'key2': {
'key2.1': "Warning: redacted unserializable type <class"
" 'cloudinit.helpers.Paths'>"}}
instance_json = util.load_json(content)
self.assertEqual(
- expected_userdata, instance_json['ds']['user_data'])
+ expected_metadata, instance_json['ds']['meta_data'])
def test_persist_instance_data_writes_ec2_metadata_when_set(self):
"""When ec2_metadata class attribute is set, persist to json."""
@@ -361,17 +417,17 @@ class TestDataSource(CiTestCase):
tmp = self.tmp_dir()
datasource = DataSourceTestSubclassNet(
self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_userdata={'key1': 'val1', 'key2': {'key2.1': b'\x123'}})
+ custom_metadata={'key1': 'val1', 'key2': {'key2.1': b'\x123'}})
self.assertTrue(datasource.get_data())
json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
content = util.load_file(json_file)
instance_json = util.load_json(content)
- self.assertEqual(
- ['ds/user_data/key2/key2.1'],
+ self.assertItemsEqual(
+ ['ds/meta_data/key2/key2.1'],
instance_json['base64_encoded_keys'])
self.assertEqual(
{'key1': 'val1', 'key2': {'key2.1': 'EjM='}},
- instance_json['ds']['user_data'])
+ instance_json['ds']['meta_data'])
@skipIf(not six.PY2, "json serialization on <= py2.7 handles bytes")
def test_get_data_handles_bytes_values(self):
@@ -379,7 +435,7 @@ class TestDataSource(CiTestCase):
tmp = self.tmp_dir()
datasource = DataSourceTestSubclassNet(
self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_userdata={'key1': 'val1', 'key2': {'key2.1': b'\x123'}})
+ custom_metadata={'key1': 'val1', 'key2': {'key2.1': b'\x123'}})
self.assertTrue(datasource.get_data())
json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
content = util.load_file(json_file)
@@ -387,7 +443,7 @@ class TestDataSource(CiTestCase):
self.assertEqual([], instance_json['base64_encoded_keys'])
self.assertEqual(
{'key1': 'val1', 'key2': {'key2.1': '\x123'}},
- instance_json['ds']['user_data'])
+ instance_json['ds']['meta_data'])
@skipIf(not six.PY2, "Only python2 hits UnicodeDecodeErrors on non-utf8")
def test_non_utf8_encoding_logs_warning(self):
@@ -395,7 +451,7 @@ class TestDataSource(CiTestCase):
tmp = self.tmp_dir()
datasource = DataSourceTestSubclassNet(
self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
- custom_userdata={'key1': 'val1', 'key2': {'key2.1': b'ab\xaadef'}})
+ custom_metadata={'key1': 'val1', 'key2': {'key2.1': b'ab\xaadef'}})
self.assertTrue(datasource.get_data())
json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
self.assertFalse(os.path.exists(json_file))
@@ -509,4 +565,36 @@ class TestDataSource(CiTestCase):
self.logs.getvalue())
+class TestRedactSensitiveData(CiTestCase):
+
+ def test_redact_sensitive_data_noop_when_no_sensitive_keys_present(self):
+ """When sensitive_keys is absent or empty from metadata do nothing."""
+ md = {'my': 'data'}
+ self.assertEqual(
+ md, redact_sensitive_keys(md, redact_value='redacted'))
+ md['sensitive_keys'] = []
+ self.assertEqual(
+ md, redact_sensitive_keys(md, redact_value='redacted'))
+
+ def test_redact_sensitive_data_redacts_exact_match_name(self):
+ """Only exact matched sensitive_keys are redacted from metadata."""
+ md = {'sensitive_keys': ['md/secure'],
+ 'md': {'secure': 's3kr1t', 'insecure': 'publik'}}
+ secure_md = copy.deepcopy(md)
+ secure_md['md']['secure'] = 'redacted'
+ self.assertEqual(
+ secure_md,
+ redact_sensitive_keys(md, redact_value='redacted'))
+
+ def test_redact_sensitive_data_does_redacts_with_default_string(self):
+ """When redact_value is absent, REDACT_SENSITIVE_VALUE is used."""
+ md = {'sensitive_keys': ['md/secure'],
+ 'md': {'secure': 's3kr1t', 'insecure': 'publik'}}
+ secure_md = copy.deepcopy(md)
+ secure_md['md']['secure'] = 'redacted for non-root user'
+ self.assertEqual(
+ secure_md,
+ redact_sensitive_keys(md))
+
+
# vi: ts=4 expandtab