summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_gce.py193
1 files changed, 172 insertions, 21 deletions
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index 82c788dc..12d68009 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -4,13 +4,16 @@
#
# This file is part of cloud-init. See LICENSE file for license information.
+import datetime
import httpretty
+import json
import mock
import re
from base64 import b64encode, b64decode
from six.moves.urllib_parse import urlparse
+from cloudinit import distros
from cloudinit import helpers
from cloudinit import settings
from cloudinit.sources import DataSourceGCE
@@ -21,10 +24,7 @@ from cloudinit.tests import helpers as test_helpers
GCE_META = {
'instance/id': '123',
'instance/zone': 'foo/bar',
- 'project/attributes/sshKeys': 'user:ssh-rsa AA2..+aRD0fyVw== root@server',
'instance/hostname': 'server.project-foo.local',
- # UnicodeDecodeError below if set to ds.userdata instead of userdata_raw
- 'instance/attributes/user-data': b'/bin/echo \xff\n',
}
GCE_META_PARTIAL = {
@@ -37,11 +37,13 @@ GCE_META_ENCODING = {
'instance/id': '12345',
'instance/hostname': 'server.project-baz.local',
'instance/zone': 'baz/bang',
- 'instance/attributes/user-data': b64encode(b'/bin/echo baz\n'),
- 'instance/attributes/user-data-encoding': 'base64',
+ 'instance/attributes': {
+ 'user-data': b64encode(b'/bin/echo baz\n').decode('utf-8'),
+ 'user-data-encoding': 'base64',
+ }
}
-HEADERS = {'X-Google-Metadata-Request': 'True'}
+HEADERS = {'Metadata-Flavor': 'Google'}
MD_URL_RE = re.compile(
r'http://metadata.google.internal/computeMetadata/v1/.*')
@@ -54,10 +56,15 @@ def _set_mock_metadata(gce_meta=None):
url_path = urlparse(uri).path
if url_path.startswith('/computeMetadata/v1/'):
path = url_path.split('/computeMetadata/v1/')[1:][0]
+ recursive = path.endswith('/')
+ path = path.rstrip('/')
else:
path = None
if path in gce_meta:
- return (200, headers, gce_meta.get(path))
+ response = gce_meta.get(path)
+ if recursive:
+ response = json.dumps(response)
+ return (200, headers, response)
else:
return (404, headers, '')
@@ -69,6 +76,16 @@ def _set_mock_metadata(gce_meta=None):
@httpretty.activate
class TestDataSourceGCE(test_helpers.HttprettyTestCase):
+ def _make_distro(self, dtype, def_user=None):
+ cfg = dict(settings.CFG_BUILTIN)
+ cfg['system_info']['distro'] = dtype
+ paths = helpers.Paths(cfg['system_info']['paths'])
+ distro_cls = distros.fetch(dtype)
+ if def_user:
+ cfg['system_info']['default_user'] = def_user.copy()
+ distro = distro_cls(dtype, cfg['system_info'], paths)
+ return distro
+
def setUp(self):
tmp = self.tmp_dir()
self.ds = DataSourceGCE.DataSourceGCE(
@@ -90,6 +107,10 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
self.assertDictContainsSubset(HEADERS, req_header)
def test_metadata(self):
+ # UnicodeDecodeError if set to ds.userdata instead of userdata_raw
+ meta = GCE_META.copy()
+ meta['instance/attributes/user-data'] = b'/bin/echo \xff\n'
+
_set_mock_metadata()
self.ds.get_data()
@@ -118,8 +139,8 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
_set_mock_metadata(GCE_META_ENCODING)
self.ds.get_data()
- decoded = b64decode(
- GCE_META_ENCODING.get('instance/attributes/user-data'))
+ instance_data = GCE_META_ENCODING.get('instance/attributes')
+ decoded = b64decode(instance_data.get('user-data'))
self.assertEqual(decoded, self.ds.get_userdata_raw())
def test_missing_required_keys_return_false(self):
@@ -131,33 +152,124 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
self.assertEqual(False, self.ds.get_data())
httpretty.reset()
- def test_project_level_ssh_keys_are_used(self):
+ def test_no_ssh_keys_metadata(self):
_set_mock_metadata()
self.ds.get_data()
+ self.assertEqual([], self.ds.get_public_ssh_keys())
+
+ def test_cloudinit_ssh_keys(self):
+ valid_key = 'ssh-rsa VALID {0}'
+ invalid_key = 'ssh-rsa INVALID {0}'
+ project_attributes = {
+ 'sshKeys': '\n'.join([
+ 'cloudinit:{0}'.format(valid_key.format(0)),
+ 'user:{0}'.format(invalid_key.format(0)),
+ ]),
+ 'ssh-keys': '\n'.join([
+ 'cloudinit:{0}'.format(valid_key.format(1)),
+ 'user:{0}'.format(invalid_key.format(1)),
+ ]),
+ }
+ instance_attributes = {
+ 'ssh-keys': '\n'.join([
+ 'cloudinit:{0}'.format(valid_key.format(2)),
+ 'user:{0}'.format(invalid_key.format(2)),
+ ]),
+ 'block-project-ssh-keys': 'False',
+ }
+
+ meta = GCE_META.copy()
+ meta['project/attributes'] = project_attributes
+ meta['instance/attributes'] = instance_attributes
+
+ _set_mock_metadata(meta)
+ self.ds.get_data()
+
+ expected = [valid_key.format(key) for key in range(3)]
+ self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys()))
+
+ @mock.patch("cloudinit.sources.DataSourceGCE.ug_util")
+ def test_default_user_ssh_keys(self, mock_ug_util):
+ mock_ug_util.normalize_users_groups.return_value = None, None
+ mock_ug_util.extract_default.return_value = 'ubuntu', None
+ ubuntu_ds = DataSourceGCE.DataSourceGCE(
+ settings.CFG_BUILTIN, self._make_distro('ubuntu'),
+ helpers.Paths({}))
+
+ valid_key = 'ssh-rsa VALID {0}'
+ invalid_key = 'ssh-rsa INVALID {0}'
+ project_attributes = {
+ 'sshKeys': '\n'.join([
+ 'ubuntu:{0}'.format(valid_key.format(0)),
+ 'user:{0}'.format(invalid_key.format(0)),
+ ]),
+ 'ssh-keys': '\n'.join([
+ 'ubuntu:{0}'.format(valid_key.format(1)),
+ 'user:{0}'.format(invalid_key.format(1)),
+ ]),
+ }
+ instance_attributes = {
+ 'ssh-keys': '\n'.join([
+ 'ubuntu:{0}'.format(valid_key.format(2)),
+ 'user:{0}'.format(invalid_key.format(2)),
+ ]),
+ 'block-project-ssh-keys': 'False',
+ }
- # we expect a list of public ssh keys with user names stripped
- self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'],
- self.ds.get_public_ssh_keys())
+ meta = GCE_META.copy()
+ meta['project/attributes'] = project_attributes
+ meta['instance/attributes'] = instance_attributes
+
+ _set_mock_metadata(meta)
+ ubuntu_ds.get_data()
+
+ expected = [valid_key.format(key) for key in range(3)]
+ self.assertEqual(set(expected), set(ubuntu_ds.get_public_ssh_keys()))
+
+ def test_instance_ssh_keys_override(self):
+ valid_key = 'ssh-rsa VALID {0}'
+ invalid_key = 'ssh-rsa INVALID {0}'
+ project_attributes = {
+ 'sshKeys': 'cloudinit:{0}'.format(invalid_key.format(0)),
+ 'ssh-keys': 'cloudinit:{0}'.format(invalid_key.format(1)),
+ }
+ instance_attributes = {
+ 'sshKeys': 'cloudinit:{0}'.format(valid_key.format(0)),
+ 'ssh-keys': 'cloudinit:{0}'.format(valid_key.format(1)),
+ 'block-project-ssh-keys': 'False',
+ }
- def test_instance_level_ssh_keys_are_used(self):
- key_content = 'ssh-rsa JustAUser root@server'
meta = GCE_META.copy()
- meta['instance/attributes/ssh-keys'] = 'user:{0}'.format(key_content)
+ meta['project/attributes'] = project_attributes
+ meta['instance/attributes'] = instance_attributes
_set_mock_metadata(meta)
self.ds.get_data()
- self.assertIn(key_content, self.ds.get_public_ssh_keys())
+ expected = [valid_key.format(key) for key in range(2)]
+ self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys()))
+
+ def test_block_project_ssh_keys_override(self):
+ valid_key = 'ssh-rsa VALID {0}'
+ invalid_key = 'ssh-rsa INVALID {0}'
+ project_attributes = {
+ 'sshKeys': 'cloudinit:{0}'.format(invalid_key.format(0)),
+ 'ssh-keys': 'cloudinit:{0}'.format(invalid_key.format(1)),
+ }
+ instance_attributes = {
+ 'ssh-keys': 'cloudinit:{0}'.format(valid_key.format(0)),
+ 'block-project-ssh-keys': 'True',
+ }
- def test_instance_level_keys_replace_project_level_keys(self):
- key_content = 'ssh-rsa JustAUser root@server'
meta = GCE_META.copy()
- meta['instance/attributes/ssh-keys'] = 'user:{0}'.format(key_content)
+ meta['project/attributes'] = project_attributes
+ meta['instance/attributes'] = instance_attributes
_set_mock_metadata(meta)
self.ds.get_data()
- self.assertEqual([key_content], self.ds.get_public_ssh_keys())
+ expected = [valid_key.format(0)]
+ self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys()))
def test_only_last_part_of_zone_used_for_availability_zone(self):
_set_mock_metadata()
@@ -172,5 +284,44 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
self.assertEqual(False, ret)
m_fetcher.assert_not_called()
+ def test_has_expired(self):
+
+ def _get_timestamp(days):
+ format_str = '%Y-%m-%dT%H:%M:%S+0000'
+ today = datetime.datetime.now()
+ timestamp = today + datetime.timedelta(days=days)
+ return timestamp.strftime(format_str)
+
+ past = _get_timestamp(-1)
+ future = _get_timestamp(1)
+ ssh_keys = {
+ None: False,
+ '': False,
+ 'Invalid': False,
+ 'user:ssh-rsa key user@domain.com': False,
+ 'user:ssh-rsa key google {"expireOn":"%s"}' % past: False,
+ 'user:ssh-rsa key google-ssh': False,
+ 'user:ssh-rsa key google-ssh {invalid:json}': False,
+ 'user:ssh-rsa key google-ssh {"userName":"user"}': False,
+ 'user:ssh-rsa key google-ssh {"expireOn":"invalid"}': False,
+ 'user:xyz key google-ssh {"expireOn":"%s"}' % future: False,
+ 'user:xyz key google-ssh {"expireOn":"%s"}' % past: True,
+ }
+
+ for key, expired in ssh_keys.items():
+ self.assertEqual(DataSourceGCE._has_expired(key), expired)
+
+ def test_parse_public_keys_non_ascii(self):
+ public_key_data = [
+ 'cloudinit:rsa ssh-ke%s invalid' % chr(165),
+ 'use%sname:rsa ssh-key' % chr(174),
+ 'cloudinit:test 1',
+ 'default:test 2',
+ 'user:test 3',
+ ]
+ expected = ['test 1', 'test 2']
+ found = DataSourceGCE._parse_public_keys(
+ public_key_data, default_user='default')
+ self.assertEqual(sorted(found), sorted(expected))
# vi: ts=4 expandtab