summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource/test_gce.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource/test_gce.py')
-rw-r--r--tests/unittests/test_datasource/test_gce.py363
1 files changed, 0 insertions, 363 deletions
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
deleted file mode 100644
index 01f4cbd1..00000000
--- a/tests/unittests/test_datasource/test_gce.py
+++ /dev/null
@@ -1,363 +0,0 @@
-# Copyright (C) 2014 Vaidas Jablonskis
-#
-# Author: Vaidas Jablonskis <jablonskis@gmail.com>
-#
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import datetime
-import httpretty
-import json
-import re
-from unittest import mock
-from urllib.parse import urlparse
-
-from base64 import b64encode, b64decode
-
-from cloudinit import distros
-from cloudinit import helpers
-from cloudinit import settings
-from cloudinit.sources import DataSourceGCE
-
-from cloudinit.tests import helpers as test_helpers
-
-
-GCE_META = {
- 'instance/id': '123',
- 'instance/zone': 'foo/bar',
- 'instance/hostname': 'server.project-foo.local',
-}
-
-GCE_META_PARTIAL = {
- 'instance/id': '1234',
- 'instance/hostname': 'server.project-bar.local',
- 'instance/zone': 'bar/baz',
-}
-
-GCE_META_ENCODING = {
- 'instance/id': '12345',
- 'instance/hostname': 'server.project-baz.local',
- 'instance/zone': 'baz/bang',
- 'instance/attributes': {
- 'user-data': b64encode(b'#!/bin/echo baz\n').decode('utf-8'),
- 'user-data-encoding': 'base64',
- }
-}
-
-GCE_USER_DATA_TEXT = {
- 'instance/id': '12345',
- 'instance/hostname': 'server.project-baz.local',
- 'instance/zone': 'baz/bang',
- 'instance/attributes': {
- 'user-data': '#!/bin/sh\necho hi mom\ntouch /run/up-now\n',
- }
-}
-
-HEADERS = {'Metadata-Flavor': 'Google'}
-MD_URL_RE = re.compile(
- r'http://metadata.google.internal/computeMetadata/v1/.*')
-GUEST_ATTRIBUTES_URL = ('http://metadata.google.internal/computeMetadata/'
- 'v1/instance/guest-attributes/hostkeys/')
-
-
-def _set_mock_metadata(gce_meta=None):
- if gce_meta is None:
- gce_meta = GCE_META
-
- def _request_callback(method, uri, headers):
- url_path = urlparse(uri).path
- if url_path.startswith('/computeMetadata/v1/'):
- path = url_path.split('/computeMetadata/v1/')[1:][0]
- recursive = path.endswith('/')
- path = path.rstrip('/')
- else:
- path = None
- if path in gce_meta:
- response = gce_meta.get(path)
- if recursive:
- response = json.dumps(response)
- return (200, headers, response)
- else:
- return (404, headers, '')
-
- # reset is needed. https://github.com/gabrielfalcao/HTTPretty/issues/316
- httpretty.register_uri(httpretty.GET, MD_URL_RE, body=_request_callback)
-
-
-@httpretty.activate
-class TestDataSourceGCE(test_helpers.HttprettyTestCase):
-
- def _make_distro(self, dtype, def_user=None):
- cfg = dict(settings.CFG_BUILTIN)
- cfg['system_info']['distro'] = dtype
- paths = helpers.Paths(cfg['system_info']['paths'])
- distro_cls = distros.fetch(dtype)
- if def_user:
- cfg['system_info']['default_user'] = def_user.copy()
- distro = distro_cls(dtype, cfg['system_info'], paths)
- return distro
-
- def setUp(self):
- tmp = self.tmp_dir()
- self.ds = DataSourceGCE.DataSourceGCE(
- settings.CFG_BUILTIN, None,
- helpers.Paths({'run_dir': tmp}))
- ppatch = self.m_platform_reports_gce = mock.patch(
- 'cloudinit.sources.DataSourceGCE.platform_reports_gce')
- self.m_platform_reports_gce = ppatch.start()
- self.m_platform_reports_gce.return_value = True
- self.addCleanup(ppatch.stop)
- super(TestDataSourceGCE, self).setUp()
-
- def test_connection(self):
- _set_mock_metadata()
- success = self.ds.get_data()
- self.assertTrue(success)
-
- req_header = httpretty.last_request().headers
- for header_name, expected_value in HEADERS.items():
- self.assertEqual(expected_value, req_header.get(header_name))
-
- def test_metadata(self):
- # UnicodeDecodeError if set to ds.userdata instead of userdata_raw
- meta = GCE_META.copy()
- meta['instance/attributes/user-data'] = b'/bin/echo \xff\n'
-
- _set_mock_metadata()
- self.ds.get_data()
-
- shostname = GCE_META.get('instance/hostname').split('.')[0]
- self.assertEqual(shostname,
- self.ds.get_hostname())
-
- self.assertEqual(GCE_META.get('instance/id'),
- self.ds.get_instance_id())
-
- self.assertEqual(GCE_META.get('instance/attributes/user-data'),
- self.ds.get_userdata_raw())
-
- # test partial metadata (missing user-data in particular)
- def test_metadata_partial(self):
- _set_mock_metadata(GCE_META_PARTIAL)
- self.ds.get_data()
-
- self.assertEqual(GCE_META_PARTIAL.get('instance/id'),
- self.ds.get_instance_id())
-
- shostname = GCE_META_PARTIAL.get('instance/hostname').split('.')[0]
- self.assertEqual(shostname, self.ds.get_hostname())
-
- def test_userdata_no_encoding(self):
- """check that user-data is read."""
- _set_mock_metadata(GCE_USER_DATA_TEXT)
- self.ds.get_data()
- self.assertEqual(
- GCE_USER_DATA_TEXT['instance/attributes']['user-data'].encode(),
- self.ds.get_userdata_raw())
-
- def test_metadata_encoding(self):
- """user-data is base64 encoded if user-data-encoding is 'base64'."""
- _set_mock_metadata(GCE_META_ENCODING)
- self.ds.get_data()
-
- instance_data = GCE_META_ENCODING.get('instance/attributes')
- decoded = b64decode(instance_data.get('user-data'))
- self.assertEqual(decoded, self.ds.get_userdata_raw())
-
- def test_missing_required_keys_return_false(self):
- for required_key in ['instance/id', 'instance/zone',
- 'instance/hostname']:
- meta = GCE_META_PARTIAL.copy()
- del meta[required_key]
- _set_mock_metadata(meta)
- self.assertEqual(False, self.ds.get_data())
- httpretty.reset()
-
- def test_no_ssh_keys_metadata(self):
- _set_mock_metadata()
- self.ds.get_data()
- self.assertEqual([], self.ds.get_public_ssh_keys())
-
- def test_cloudinit_ssh_keys(self):
- valid_key = 'ssh-rsa VALID {0}'
- invalid_key = 'ssh-rsa INVALID {0}'
- project_attributes = {
- 'sshKeys': '\n'.join([
- 'cloudinit:{0}'.format(valid_key.format(0)),
- 'user:{0}'.format(invalid_key.format(0)),
- ]),
- 'ssh-keys': '\n'.join([
- 'cloudinit:{0}'.format(valid_key.format(1)),
- 'user:{0}'.format(invalid_key.format(1)),
- ]),
- }
- instance_attributes = {
- 'ssh-keys': '\n'.join([
- 'cloudinit:{0}'.format(valid_key.format(2)),
- 'user:{0}'.format(invalid_key.format(2)),
- ]),
- 'block-project-ssh-keys': 'False',
- }
-
- meta = GCE_META.copy()
- meta['project/attributes'] = project_attributes
- meta['instance/attributes'] = instance_attributes
-
- _set_mock_metadata(meta)
- self.ds.get_data()
-
- expected = [valid_key.format(key) for key in range(3)]
- self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys()))
-
- @mock.patch("cloudinit.sources.DataSourceGCE.ug_util")
- def test_default_user_ssh_keys(self, mock_ug_util):
- mock_ug_util.normalize_users_groups.return_value = None, None
- mock_ug_util.extract_default.return_value = 'ubuntu', None
- ubuntu_ds = DataSourceGCE.DataSourceGCE(
- settings.CFG_BUILTIN, self._make_distro('ubuntu'),
- helpers.Paths({'run_dir': self.tmp_dir()}))
-
- valid_key = 'ssh-rsa VALID {0}'
- invalid_key = 'ssh-rsa INVALID {0}'
- project_attributes = {
- 'sshKeys': '\n'.join([
- 'ubuntu:{0}'.format(valid_key.format(0)),
- 'user:{0}'.format(invalid_key.format(0)),
- ]),
- 'ssh-keys': '\n'.join([
- 'ubuntu:{0}'.format(valid_key.format(1)),
- 'user:{0}'.format(invalid_key.format(1)),
- ]),
- }
- instance_attributes = {
- 'ssh-keys': '\n'.join([
- 'ubuntu:{0}'.format(valid_key.format(2)),
- 'user:{0}'.format(invalid_key.format(2)),
- ]),
- 'block-project-ssh-keys': 'False',
- }
-
- meta = GCE_META.copy()
- meta['project/attributes'] = project_attributes
- meta['instance/attributes'] = instance_attributes
-
- _set_mock_metadata(meta)
- ubuntu_ds.get_data()
-
- expected = [valid_key.format(key) for key in range(3)]
- self.assertEqual(set(expected), set(ubuntu_ds.get_public_ssh_keys()))
-
- def test_instance_ssh_keys_override(self):
- valid_key = 'ssh-rsa VALID {0}'
- invalid_key = 'ssh-rsa INVALID {0}'
- project_attributes = {
- 'sshKeys': 'cloudinit:{0}'.format(invalid_key.format(0)),
- 'ssh-keys': 'cloudinit:{0}'.format(invalid_key.format(1)),
- }
- instance_attributes = {
- 'sshKeys': 'cloudinit:{0}'.format(valid_key.format(0)),
- 'ssh-keys': 'cloudinit:{0}'.format(valid_key.format(1)),
- 'block-project-ssh-keys': 'False',
- }
-
- meta = GCE_META.copy()
- meta['project/attributes'] = project_attributes
- meta['instance/attributes'] = instance_attributes
-
- _set_mock_metadata(meta)
- self.ds.get_data()
-
- expected = [valid_key.format(key) for key in range(2)]
- self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys()))
-
- def test_block_project_ssh_keys_override(self):
- valid_key = 'ssh-rsa VALID {0}'
- invalid_key = 'ssh-rsa INVALID {0}'
- project_attributes = {
- 'sshKeys': 'cloudinit:{0}'.format(invalid_key.format(0)),
- 'ssh-keys': 'cloudinit:{0}'.format(invalid_key.format(1)),
- }
- instance_attributes = {
- 'ssh-keys': 'cloudinit:{0}'.format(valid_key.format(0)),
- 'block-project-ssh-keys': 'True',
- }
-
- meta = GCE_META.copy()
- meta['project/attributes'] = project_attributes
- meta['instance/attributes'] = instance_attributes
-
- _set_mock_metadata(meta)
- self.ds.get_data()
-
- expected = [valid_key.format(0)]
- self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys()))
-
- def test_only_last_part_of_zone_used_for_availability_zone(self):
- _set_mock_metadata()
- r = self.ds.get_data()
- self.assertEqual(True, r)
- self.assertEqual('bar', self.ds.availability_zone)
-
- @mock.patch("cloudinit.sources.DataSourceGCE.GoogleMetadataFetcher")
- def test_get_data_returns_false_if_not_on_gce(self, m_fetcher):
- self.m_platform_reports_gce.return_value = False
- ret = self.ds.get_data()
- self.assertEqual(False, ret)
- m_fetcher.assert_not_called()
-
- def test_has_expired(self):
-
- def _get_timestamp(days):
- format_str = '%Y-%m-%dT%H:%M:%S+0000'
- today = datetime.datetime.now()
- timestamp = today + datetime.timedelta(days=days)
- return timestamp.strftime(format_str)
-
- past = _get_timestamp(-1)
- future = _get_timestamp(1)
- ssh_keys = {
- None: False,
- '': False,
- 'Invalid': False,
- 'user:ssh-rsa key user@domain.com': False,
- 'user:ssh-rsa key google {"expireOn":"%s"}' % past: False,
- 'user:ssh-rsa key google-ssh': False,
- 'user:ssh-rsa key google-ssh {invalid:json}': False,
- 'user:ssh-rsa key google-ssh {"userName":"user"}': False,
- 'user:ssh-rsa key google-ssh {"expireOn":"invalid"}': False,
- 'user:xyz key google-ssh {"expireOn":"%s"}' % future: False,
- 'user:xyz key google-ssh {"expireOn":"%s"}' % past: True,
- }
-
- for key, expired in ssh_keys.items():
- self.assertEqual(DataSourceGCE._has_expired(key), expired)
-
- def test_parse_public_keys_non_ascii(self):
- public_key_data = [
- 'cloudinit:rsa ssh-ke%s invalid' % chr(165),
- 'use%sname:rsa ssh-key' % chr(174),
- 'cloudinit:test 1',
- 'default:test 2',
- 'user:test 3',
- ]
- expected = ['test 1', 'test 2']
- found = DataSourceGCE._parse_public_keys(
- public_key_data, default_user='default')
- self.assertEqual(sorted(found), sorted(expected))
-
- @mock.patch("cloudinit.url_helper.readurl")
- def test_publish_host_keys(self, m_readurl):
- hostkeys = [('ssh-rsa', 'asdfasdf'),
- ('ssh-ed25519', 'qwerqwer')]
- readurl_expected_calls = [
- mock.call(check_status=False, data=b'asdfasdf', headers=HEADERS,
- request_method='PUT',
- url='%s%s' % (GUEST_ATTRIBUTES_URL, 'ssh-rsa')),
- mock.call(check_status=False, data=b'qwerqwer', headers=HEADERS,
- request_method='PUT',
- url='%s%s' % (GUEST_ATTRIBUTES_URL, 'ssh-ed25519')),
- ]
- self.ds.publish_host_keys(hostkeys)
- m_readurl.assert_has_calls(readurl_expected_calls, any_order=True)
-
-
-# vi: ts=4 expandtab