diff options
Diffstat (limited to 'tests/unittests')
-rw-r--r-- | tests/unittests/helpers.py | 5 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_digitalocean.py | 126 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_openstack.py | 3 | ||||
-rw-r--r-- | tests/unittests/test_distros/test_netconfig.py | 7 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_chef.py | 121 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_set_hostname.py | 9 | ||||
-rw-r--r-- | tests/unittests/test_sshutil.py | 73 |
7 files changed, 336 insertions, 8 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py index 9700a4ca..52305397 100644 --- a/tests/unittests/helpers.py +++ b/tests/unittests/helpers.py @@ -35,6 +35,11 @@ else: if PY26: # For now add these on, taken from python 2.7 + slightly adjusted class TestCase(unittest.TestCase): + def assertIs(self, expr1, expr2, msg=None): + if expr1 is not expr2: + standardMsg = '%r is not %r' % (expr1, expr2) + self.fail(self._formatMessage(msg, standardMsg)) + def assertIn(self, member, container, msg=None): if member not in container: standardMsg = '%r not found in %r' % (member, container) diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py new file mode 100644 index 00000000..04bee340 --- /dev/null +++ b/tests/unittests/test_datasource/test_digitalocean.py @@ -0,0 +1,126 @@ +# +# Copyright (C) 2014 Neal Shrader +# +# Author: Neal Shrader <neal@digitalocean.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import httpretty +import re + +from types import ListType +from urlparse import urlparse + +from cloudinit import settings +from cloudinit import helpers +from cloudinit.sources import DataSourceDigitalOcean + +from .. import helpers as test_helpers + +# Abbreviated for the test +DO_INDEX = """id + hostname + user-data + vendor-data + public-keys + region""" + +DO_MULTIPLE_KEYS = """ssh-rsa AAAAB3NzaC1yc2EAAAA... neal@digitalocean.com + ssh-rsa AAAAB3NzaC1yc2EAAAA... neal2@digitalocean.com""" +DO_SINGLE_KEY = "ssh-rsa AAAAB3NzaC1yc2EAAAA... neal@digitalocean.com" + +DO_META = { + '': DO_INDEX, + 'user-data': '#!/bin/bash\necho "user-data"', + 'vendor-data': '#!/bin/bash\necho "vendor-data"', + 'public-keys': DO_SINGLE_KEY, + 'region': 'nyc3', + 'id': '2000000', + 'hostname': 'cloudinit-test', +} + +MD_URL_RE = re.compile(r'http://169.254.169.254/metadata/v1/.*') + +def _request_callback(method, uri, headers): + url_path = urlparse(uri).path + if url_path.startswith('/metadata/v1/'): + path = url_path.split('/metadata/v1/')[1:][0] + else: + path = None + if path in DO_META: + return (200, headers, DO_META.get(path)) + else: + return (404, headers, '') + + +class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase): + + def setUp(self): + self.ds = DataSourceDigitalOcean.DataSourceDigitalOcean( + settings.CFG_BUILTIN, None, + helpers.Paths({})) + super(TestDataSourceDigitalOcean, self).setUp() + + @httpretty.activate + def test_connection(self): + httpretty.register_uri( + httpretty.GET, MD_URL_RE, + body=_request_callback) + + success = self.ds.get_data() + self.assertTrue(success) + + @httpretty.activate + def test_metadata(self): + httpretty.register_uri( + httpretty.GET, MD_URL_RE, + body=_request_callback) + self.ds.get_data() + + self.assertEqual(DO_META.get('user-data'), + self.ds.get_userdata_raw()) + + self.assertEqual(DO_META.get('vendor-data'), + self.ds.get_vendordata_raw()) + + self.assertEqual(DO_META.get('region'), + self.ds.availability_zone) + + self.assertEqual(DO_META.get('id'), + self.ds.get_instance_id()) + + self.assertEqual(DO_META.get('hostname'), + self.ds.get_hostname()) + + self.assertEqual('http://mirrors.digitalocean.com/', + self.ds.get_package_mirror_info()) + + # Single key + self.assertEqual([DO_META.get('public-keys')], + self.ds.get_public_ssh_keys()) + + self.assertIs(type(self.ds.get_public_ssh_keys()), ListType) + + @httpretty.activate + def test_multiple_ssh_keys(self): + DO_META['public_keys'] = DO_MULTIPLE_KEYS + httpretty.register_uri( + httpretty.GET, MD_URL_RE, + body=_request_callback) + self.ds.get_data() + + # Multiple keys + self.assertEqual(DO_META.get('public-keys').splitlines(), + self.ds.get_public_ssh_keys()) + + self.assertIs(type(self.ds.get_public_ssh_keys()), ListType) diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py index 7b4e651a..49894e51 100644 --- a/tests/unittests/test_datasource/test_openstack.py +++ b/tests/unittests/test_datasource/test_openstack.py @@ -19,7 +19,6 @@ import copy import json import re -import unittest from StringIO import StringIO @@ -318,7 +317,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): self.assertIsNone(ds_os.version) -class TestVendorDataLoading(unittest.TestCase): +class TestVendorDataLoading(test_helpers.TestCase): def cvj(self, data): return openstack.convert_vendordata_json(data) diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py index fbdb7b3f..35cc1f43 100644 --- a/tests/unittests/test_distros/test_netconfig.py +++ b/tests/unittests/test_distros/test_netconfig.py @@ -182,6 +182,12 @@ NETWORKING=yes spec=False, passthrough=False) load_mock = self.mocker.replace(util.load_file, spec=False, passthrough=False) + subp_mock = self.mocker.replace(util.subp, + spec=False, passthrough=False) + + subp_mock(['ifconfig', '-a']) + self.mocker.count(0, None) + self.mocker.result(('vtnet0', '')) exists_mock(mocker.ARGS) self.mocker.count(0, None) @@ -190,6 +196,7 @@ NETWORKING=yes write_bufs = {} read_bufs = { '/etc/rc.conf': '', + '/etc/resolv.conf': '', } def replace_write(filename, content, mode=0644, omode="wb"): diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py new file mode 100644 index 00000000..ef1aa208 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_chef.py @@ -0,0 +1,121 @@ +import json +import os + +from cloudinit.config import cc_chef + +from cloudinit import cloud +from cloudinit import distros +from cloudinit import helpers +from cloudinit import util +from cloudinit.sources import DataSourceNone + +from .. import helpers as t_help + +import logging + +LOG = logging.getLogger(__name__) + + +class TestChef(t_help.FilesystemMockingTestCase): + def setUp(self): + super(TestChef, self).setUp() + self.tmp = self.makeDir(prefix="unittest_") + + def fetch_cloud(self, distro_kind): + cls = distros.fetch(distro_kind) + paths = helpers.Paths({}) + distro = cls(distro_kind, {}, paths) + ds = DataSourceNone.DataSourceNone({}, distro, paths, None) + return cloud.Cloud(ds, paths, {}, distro, None) + + def test_no_config(self): + self.patchUtils(self.tmp) + self.patchOS(self.tmp) + + cfg = {} + cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, []) + for d in cc_chef.CHEF_DIRS: + self.assertFalse(os.path.isdir(d)) + + def test_basic_config(self): + # This should create a file of the format... + """ + # Created by cloud-init v. 0.7.6 on Sat, 11 Oct 2014 23:57:21 +0000 + log_level :info + ssl_verify_mode :verify_none + log_location "/var/log/chef/client.log" + validation_client_name "bob" + validation_key "/etc/chef/validation.pem" + client_key "/etc/chef/client.pem" + chef_server_url "localhost" + environment "_default" + node_name "iid-datasource-none" + json_attribs "/etc/chef/firstboot.json" + file_cache_path "/var/cache/chef" + file_backup_path "/var/backups/chef" + pid_file "/var/run/chef/client.pid" + Chef::Log::Formatter.show_time = true + """ + tpl_file = util.load_file('templates/chef_client.rb.tmpl') + self.patchUtils(self.tmp) + self.patchOS(self.tmp) + + util.write_file('/etc/cloud/templates/chef_client.rb.tmpl', tpl_file) + cfg = { + 'chef': { + 'server_url': 'localhost', + 'validation_name': 'bob', + }, + } + cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, []) + for d in cc_chef.CHEF_DIRS: + self.assertTrue(os.path.isdir(d)) + c = util.load_file(cc_chef.CHEF_RB_PATH) + for k, v in cfg['chef'].items(): + self.assertIn(v, c) + for k, v in cc_chef.CHEF_RB_TPL_DEFAULTS.items(): + if isinstance(v, basestring): + self.assertIn(v, c) + c = util.load_file(cc_chef.CHEF_FB_PATH) + self.assertEqual({}, json.loads(c)) + + def test_firstboot_json(self): + self.patchUtils(self.tmp) + self.patchOS(self.tmp) + + cfg = { + 'chef': { + 'server_url': 'localhost', + 'validation_name': 'bob', + 'run_list': ['a', 'b', 'c'], + 'initial_attributes': { + 'c': 'd', + } + }, + } + cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, []) + c = util.load_file(cc_chef.CHEF_FB_PATH) + self.assertEqual( + { + 'run_list': ['a', 'b', 'c'], + 'c': 'd', + }, json.loads(c)) + + def test_template_deletes(self): + tpl_file = util.load_file('templates/chef_client.rb.tmpl') + self.patchUtils(self.tmp) + self.patchOS(self.tmp) + + util.write_file('/etc/cloud/templates/chef_client.rb.tmpl', tpl_file) + cfg = { + 'chef': { + 'server_url': 'localhost', + 'validation_name': 'bob', + 'json_attribs': None, + 'show_time': None, + }, + } + cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, []) + c = util.load_file(cc_chef.CHEF_RB_PATH) + self.assertNotIn('json_attribs', c) + self.assertNotIn('Formatter.show_time', c) diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py index 03004ab9..e1530e30 100644 --- a/tests/unittests/test_handler/test_handler_set_hostname.py +++ b/tests/unittests/test_handler/test_handler_set_hostname.py @@ -37,10 +37,11 @@ class TestHostname(t_help.FilesystemMockingTestCase): self.patchUtils(self.tmp) cc_set_hostname.handle('cc_set_hostname', cfg, cc, LOG, []) - contents = util.load_file("/etc/sysconfig/network") - n_cfg = ConfigObj(StringIO(contents)) - self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'}, - dict(n_cfg)) + if not distro.uses_systemd(): + contents = util.load_file("/etc/sysconfig/network") + n_cfg = ConfigObj(StringIO(contents)) + self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'}, + dict(n_cfg)) def test_write_hostname_debian(self): cfg = { diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py index d8662cac..3b317121 100644 --- a/tests/unittests/test_sshutil.py +++ b/tests/unittests/test_sshutil.py @@ -1,5 +1,7 @@ +from mock import patch + +from . import helpers as test_helpers from cloudinit import ssh_util -from unittest import TestCase VALID_CONTENT = { @@ -35,7 +37,7 @@ TEST_OPTIONS = ("no-port-forwarding,no-agent-forwarding,no-X11-forwarding," 'user \"root\".\';echo;sleep 10"') -class TestAuthKeyLineParser(TestCase): +class TestAuthKeyLineParser(test_helpers.TestCase): def test_simple_parse(self): # test key line with common 3 fields (keytype, base64, comment) parser = ssh_util.AuthKeyLineParser() @@ -98,4 +100,71 @@ class TestAuthKeyLineParser(TestCase): self.assertFalse(key.valid()) +class TestParseSSHConfig(test_helpers.TestCase): + + def setUp(self): + self.load_file_patch = patch('cloudinit.ssh_util.util.load_file') + self.load_file = self.load_file_patch.start() + self.isfile_patch = patch('cloudinit.ssh_util.os.path.isfile') + self.isfile = self.isfile_patch.start() + self.isfile.return_value = True + + def tearDown(self): + self.load_file_patch.stop() + self.isfile_patch.stop() + + def test_not_a_file(self): + self.isfile.return_value = False + self.load_file.side_effect = IOError + ret = ssh_util.parse_ssh_config('not a real file') + self.assertEqual([], ret) + + def test_empty_file(self): + self.load_file.return_value = '' + ret = ssh_util.parse_ssh_config('some real file') + self.assertEqual([], ret) + + def test_comment_line(self): + comment_line = '# This is a comment' + self.load_file.return_value = comment_line + ret = ssh_util.parse_ssh_config('some real file') + self.assertEqual(1, len(ret)) + self.assertEqual(comment_line, ret[0].line) + + def test_blank_lines(self): + lines = ['', '\t', ' '] + self.load_file.return_value = '\n'.join(lines) + ret = ssh_util.parse_ssh_config('some real file') + self.assertEqual(len(lines), len(ret)) + for line in ret: + self.assertEqual('', line.line) + + def test_lower_case_config(self): + self.load_file.return_value = 'foo bar' + ret = ssh_util.parse_ssh_config('some real file') + self.assertEqual(1, len(ret)) + self.assertEqual('foo', ret[0].key) + self.assertEqual('bar', ret[0].value) + + def test_upper_case_config(self): + self.load_file.return_value = 'Foo Bar' + ret = ssh_util.parse_ssh_config('some real file') + self.assertEqual(1, len(ret)) + self.assertEqual('foo', ret[0].key) + self.assertEqual('Bar', ret[0].value) + + def test_lower_case_with_equals(self): + self.load_file.return_value = 'foo=bar' + ret = ssh_util.parse_ssh_config('some real file') + self.assertEqual(1, len(ret)) + self.assertEqual('foo', ret[0].key) + self.assertEqual('bar', ret[0].value) + + def test_upper_case_with_equals(self): + self.load_file.return_value = 'Foo=bar' + ret = ssh_util.parse_ssh_config('some real file') + self.assertEqual(1, len(ret)) + self.assertEqual('foo', ret[0].key) + self.assertEqual('bar', ret[0].value) + # vi: ts=4 expandtab |