diff options
Diffstat (limited to 'tests/unittests/sources/test_gce.py')
-rw-r--r-- | tests/unittests/sources/test_gce.py | 388 |
1 files changed, 388 insertions, 0 deletions
diff --git a/tests/unittests/sources/test_gce.py b/tests/unittests/sources/test_gce.py new file mode 100644 index 00000000..dc768e99 --- /dev/null +++ b/tests/unittests/sources/test_gce.py @@ -0,0 +1,388 @@ +# Copyright (C) 2014 Vaidas Jablonskis +# +# Author: Vaidas Jablonskis <jablonskis@gmail.com> +# +# This file is part of cloud-init. See LICENSE file for license information. + +import datetime +import httpretty +import json +import re +from unittest import mock +from urllib.parse import urlparse + +from base64 import b64encode, b64decode + +from cloudinit import distros +from cloudinit import helpers +from cloudinit import settings +from cloudinit.sources import DataSourceGCE + +from tests.unittests import helpers as test_helpers + + +GCE_META = { + 'instance/id': '123', + 'instance/zone': 'foo/bar', + 'instance/hostname': 'server.project-foo.local', +} + +GCE_META_PARTIAL = { + 'instance/id': '1234', + 'instance/hostname': 'server.project-bar.local', + 'instance/zone': 'bar/baz', +} + +GCE_META_ENCODING = { + 'instance/id': '12345', + 'instance/hostname': 'server.project-baz.local', + 'instance/zone': 'baz/bang', + 'instance/attributes': { + 'user-data': b64encode(b'#!/bin/echo baz\n').decode('utf-8'), + 'user-data-encoding': 'base64', + } +} + +GCE_USER_DATA_TEXT = { + 'instance/id': '12345', + 'instance/hostname': 'server.project-baz.local', + 'instance/zone': 'baz/bang', + 'instance/attributes': { + 'user-data': '#!/bin/sh\necho hi mom\ntouch /run/up-now\n', + } +} + +HEADERS = {'Metadata-Flavor': 'Google'} +MD_URL_RE = re.compile( + r'http://metadata.google.internal/computeMetadata/v1/.*') +GUEST_ATTRIBUTES_URL = ('http://metadata.google.internal/computeMetadata/' + 'v1/instance/guest-attributes/hostkeys/') + + +def _set_mock_metadata(gce_meta=None): + if gce_meta is None: + gce_meta = GCE_META + + def _request_callback(method, uri, headers): + url_path = urlparse(uri).path + if url_path.startswith('/computeMetadata/v1/'): + path = url_path.split('/computeMetadata/v1/')[1:][0] + recursive = path.endswith('/') + path = path.rstrip('/') + else: + path = None + if path in gce_meta: + response = gce_meta.get(path) + if recursive: + response = json.dumps(response) + return (200, headers, response) + else: + return (404, headers, '') + + # reset is needed. https://github.com/gabrielfalcao/HTTPretty/issues/316 + httpretty.register_uri(httpretty.GET, MD_URL_RE, body=_request_callback) + + +@httpretty.activate +class TestDataSourceGCE(test_helpers.HttprettyTestCase): + + def _make_distro(self, dtype, def_user=None): + cfg = dict(settings.CFG_BUILTIN) + cfg['system_info']['distro'] = dtype + paths = helpers.Paths(cfg['system_info']['paths']) + distro_cls = distros.fetch(dtype) + if def_user: + cfg['system_info']['default_user'] = def_user.copy() + distro = distro_cls(dtype, cfg['system_info'], paths) + return distro + + def setUp(self): + tmp = self.tmp_dir() + self.ds = DataSourceGCE.DataSourceGCE( + settings.CFG_BUILTIN, None, + helpers.Paths({'run_dir': tmp})) + ppatch = self.m_platform_reports_gce = mock.patch( + 'cloudinit.sources.DataSourceGCE.platform_reports_gce') + self.m_platform_reports_gce = ppatch.start() + self.m_platform_reports_gce.return_value = True + self.addCleanup(ppatch.stop) + self.add_patch('time.sleep', 'm_sleep') # just to speed up tests + super(TestDataSourceGCE, self).setUp() + + def test_connection(self): + _set_mock_metadata() + success = self.ds.get_data() + self.assertTrue(success) + + req_header = httpretty.last_request().headers + for header_name, expected_value in HEADERS.items(): + self.assertEqual(expected_value, req_header.get(header_name)) + + def test_metadata(self): + # UnicodeDecodeError if set to ds.userdata instead of userdata_raw + meta = GCE_META.copy() + meta['instance/attributes/user-data'] = b'/bin/echo \xff\n' + + _set_mock_metadata() + self.ds.get_data() + + shostname = GCE_META.get('instance/hostname').split('.')[0] + self.assertEqual(shostname, + self.ds.get_hostname()) + + self.assertEqual(GCE_META.get('instance/id'), + self.ds.get_instance_id()) + + self.assertEqual(GCE_META.get('instance/attributes/user-data'), + self.ds.get_userdata_raw()) + + # test partial metadata (missing user-data in particular) + def test_metadata_partial(self): + _set_mock_metadata(GCE_META_PARTIAL) + self.ds.get_data() + + self.assertEqual(GCE_META_PARTIAL.get('instance/id'), + self.ds.get_instance_id()) + + shostname = GCE_META_PARTIAL.get('instance/hostname').split('.')[0] + self.assertEqual(shostname, self.ds.get_hostname()) + + def test_userdata_no_encoding(self): + """check that user-data is read.""" + _set_mock_metadata(GCE_USER_DATA_TEXT) + self.ds.get_data() + self.assertEqual( + GCE_USER_DATA_TEXT['instance/attributes']['user-data'].encode(), + self.ds.get_userdata_raw()) + + def test_metadata_encoding(self): + """user-data is base64 encoded if user-data-encoding is 'base64'.""" + _set_mock_metadata(GCE_META_ENCODING) + self.ds.get_data() + + instance_data = GCE_META_ENCODING.get('instance/attributes') + decoded = b64decode(instance_data.get('user-data')) + self.assertEqual(decoded, self.ds.get_userdata_raw()) + + def test_missing_required_keys_return_false(self): + for required_key in ['instance/id', 'instance/zone', + 'instance/hostname']: + meta = GCE_META_PARTIAL.copy() + del meta[required_key] + _set_mock_metadata(meta) + self.assertEqual(False, self.ds.get_data()) + httpretty.reset() + + def test_no_ssh_keys_metadata(self): + _set_mock_metadata() + self.ds.get_data() + self.assertEqual([], self.ds.get_public_ssh_keys()) + + def test_cloudinit_ssh_keys(self): + valid_key = 'ssh-rsa VALID {0}' + invalid_key = 'ssh-rsa INVALID {0}' + project_attributes = { + 'sshKeys': '\n'.join([ + 'cloudinit:{0}'.format(valid_key.format(0)), + 'user:{0}'.format(invalid_key.format(0)), + ]), + 'ssh-keys': '\n'.join([ + 'cloudinit:{0}'.format(valid_key.format(1)), + 'user:{0}'.format(invalid_key.format(1)), + ]), + } + instance_attributes = { + 'ssh-keys': '\n'.join([ + 'cloudinit:{0}'.format(valid_key.format(2)), + 'user:{0}'.format(invalid_key.format(2)), + ]), + 'block-project-ssh-keys': 'False', + } + + meta = GCE_META.copy() + meta['project/attributes'] = project_attributes + meta['instance/attributes'] = instance_attributes + + _set_mock_metadata(meta) + self.ds.get_data() + + expected = [valid_key.format(key) for key in range(3)] + self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys())) + + @mock.patch("cloudinit.sources.DataSourceGCE.ug_util") + def test_default_user_ssh_keys(self, mock_ug_util): + mock_ug_util.normalize_users_groups.return_value = None, None + mock_ug_util.extract_default.return_value = 'ubuntu', None + ubuntu_ds = DataSourceGCE.DataSourceGCE( + settings.CFG_BUILTIN, self._make_distro('ubuntu'), + helpers.Paths({'run_dir': self.tmp_dir()})) + + valid_key = 'ssh-rsa VALID {0}' + invalid_key = 'ssh-rsa INVALID {0}' + project_attributes = { + 'sshKeys': '\n'.join([ + 'ubuntu:{0}'.format(valid_key.format(0)), + 'user:{0}'.format(invalid_key.format(0)), + ]), + 'ssh-keys': '\n'.join([ + 'ubuntu:{0}'.format(valid_key.format(1)), + 'user:{0}'.format(invalid_key.format(1)), + ]), + } + instance_attributes = { + 'ssh-keys': '\n'.join([ + 'ubuntu:{0}'.format(valid_key.format(2)), + 'user:{0}'.format(invalid_key.format(2)), + ]), + 'block-project-ssh-keys': 'False', + } + + meta = GCE_META.copy() + meta['project/attributes'] = project_attributes + meta['instance/attributes'] = instance_attributes + + _set_mock_metadata(meta) + ubuntu_ds.get_data() + + expected = [valid_key.format(key) for key in range(3)] + self.assertEqual(set(expected), set(ubuntu_ds.get_public_ssh_keys())) + + def test_instance_ssh_keys_override(self): + valid_key = 'ssh-rsa VALID {0}' + invalid_key = 'ssh-rsa INVALID {0}' + project_attributes = { + 'sshKeys': 'cloudinit:{0}'.format(invalid_key.format(0)), + 'ssh-keys': 'cloudinit:{0}'.format(invalid_key.format(1)), + } + instance_attributes = { + 'sshKeys': 'cloudinit:{0}'.format(valid_key.format(0)), + 'ssh-keys': 'cloudinit:{0}'.format(valid_key.format(1)), + 'block-project-ssh-keys': 'False', + } + + meta = GCE_META.copy() + meta['project/attributes'] = project_attributes + meta['instance/attributes'] = instance_attributes + + _set_mock_metadata(meta) + self.ds.get_data() + + expected = [valid_key.format(key) for key in range(2)] + self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys())) + + def test_block_project_ssh_keys_override(self): + valid_key = 'ssh-rsa VALID {0}' + invalid_key = 'ssh-rsa INVALID {0}' + project_attributes = { + 'sshKeys': 'cloudinit:{0}'.format(invalid_key.format(0)), + 'ssh-keys': 'cloudinit:{0}'.format(invalid_key.format(1)), + } + instance_attributes = { + 'ssh-keys': 'cloudinit:{0}'.format(valid_key.format(0)), + 'block-project-ssh-keys': 'True', + } + + meta = GCE_META.copy() + meta['project/attributes'] = project_attributes + meta['instance/attributes'] = instance_attributes + + _set_mock_metadata(meta) + self.ds.get_data() + + expected = [valid_key.format(0)] + self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys())) + + def test_only_last_part_of_zone_used_for_availability_zone(self): + _set_mock_metadata() + r = self.ds.get_data() + self.assertEqual(True, r) + self.assertEqual('bar', self.ds.availability_zone) + + @mock.patch("cloudinit.sources.DataSourceGCE.GoogleMetadataFetcher") + def test_get_data_returns_false_if_not_on_gce(self, m_fetcher): + self.m_platform_reports_gce.return_value = False + ret = self.ds.get_data() + self.assertEqual(False, ret) + m_fetcher.assert_not_called() + + def test_has_expired(self): + + def _get_timestamp(days): + format_str = '%Y-%m-%dT%H:%M:%S+0000' + today = datetime.datetime.now() + timestamp = today + datetime.timedelta(days=days) + return timestamp.strftime(format_str) + + past = _get_timestamp(-1) + future = _get_timestamp(1) + ssh_keys = { + None: False, + '': False, + 'Invalid': False, + 'user:ssh-rsa key user@domain.com': False, + 'user:ssh-rsa key google {"expireOn":"%s"}' % past: False, + 'user:ssh-rsa key google-ssh': False, + 'user:ssh-rsa key google-ssh {invalid:json}': False, + 'user:ssh-rsa key google-ssh {"userName":"user"}': False, + 'user:ssh-rsa key google-ssh {"expireOn":"invalid"}': False, + 'user:xyz key google-ssh {"expireOn":"%s"}' % future: False, + 'user:xyz key google-ssh {"expireOn":"%s"}' % past: True, + } + + for key, expired in ssh_keys.items(): + self.assertEqual(DataSourceGCE._has_expired(key), expired) + + def test_parse_public_keys_non_ascii(self): + public_key_data = [ + 'cloudinit:rsa ssh-ke%s invalid' % chr(165), + 'use%sname:rsa ssh-key' % chr(174), + 'cloudinit:test 1', + 'default:test 2', + 'user:test 3', + ] + expected = ['test 1', 'test 2'] + found = DataSourceGCE._parse_public_keys( + public_key_data, default_user='default') + self.assertEqual(sorted(found), sorted(expected)) + + @mock.patch("cloudinit.url_helper.readurl") + def test_publish_host_keys(self, m_readurl): + hostkeys = [('ssh-rsa', 'asdfasdf'), + ('ssh-ed25519', 'qwerqwer')] + readurl_expected_calls = [ + mock.call(check_status=False, data=b'asdfasdf', headers=HEADERS, + request_method='PUT', + url='%s%s' % (GUEST_ATTRIBUTES_URL, 'ssh-rsa')), + mock.call(check_status=False, data=b'qwerqwer', headers=HEADERS, + request_method='PUT', + url='%s%s' % (GUEST_ATTRIBUTES_URL, 'ssh-ed25519')), + ] + self.ds.publish_host_keys(hostkeys) + m_readurl.assert_has_calls(readurl_expected_calls, any_order=True) + + @mock.patch( + "cloudinit.sources.DataSourceGCE.EphemeralDHCPv4", + autospec=True, + ) + @mock.patch( + "cloudinit.sources.DataSourceGCE.DataSourceGCELocal.fallback_interface" + ) + def test_local_datasource_uses_ephemeral_dhcp(self, _m_fallback, m_dhcp): + _set_mock_metadata() + ds = DataSourceGCE.DataSourceGCELocal( + sys_cfg={}, distro=None, paths=None + ) + ds._get_data() + assert m_dhcp.call_count == 1 + + @mock.patch( + "cloudinit.sources.DataSourceGCE.EphemeralDHCPv4", + autospec=True, + ) + def test_datasource_doesnt_use_ephemeral_dhcp(self, m_dhcp): + _set_mock_metadata() + ds = DataSourceGCE.DataSourceGCE(sys_cfg={}, distro=None, paths=None) + ds._get_data() + assert m_dhcp.call_count == 0 + +# vi: ts=4 expandtab |