summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/helpers.py5
-rw-r--r--tests/unittests/test_datasource/test_digitalocean.py126
-rw-r--r--tests/unittests/test_datasource/test_openstack.py3
-rw-r--r--tests/unittests/test_distros/test_netconfig.py7
-rw-r--r--tests/unittests/test_handler/test_handler_chef.py121
-rw-r--r--tests/unittests/test_handler/test_handler_set_hostname.py9
-rw-r--r--tests/unittests/test_sshutil.py73
7 files changed, 336 insertions, 8 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 9700a4ca..52305397 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -35,6 +35,11 @@ else:
if PY26:
# For now add these on, taken from python 2.7 + slightly adjusted
class TestCase(unittest.TestCase):
+ def assertIs(self, expr1, expr2, msg=None):
+ if expr1 is not expr2:
+ standardMsg = '%r is not %r' % (expr1, expr2)
+ self.fail(self._formatMessage(msg, standardMsg))
+
def assertIn(self, member, container, msg=None):
if member not in container:
standardMsg = '%r not found in %r' % (member, container)
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
new file mode 100644
index 00000000..04bee340
--- /dev/null
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -0,0 +1,126 @@
+#
+# Copyright (C) 2014 Neal Shrader
+#
+# Author: Neal Shrader <neal@digitalocean.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import httpretty
+import re
+
+from types import ListType
+from urlparse import urlparse
+
+from cloudinit import settings
+from cloudinit import helpers
+from cloudinit.sources import DataSourceDigitalOcean
+
+from .. import helpers as test_helpers
+
+# Abbreviated for the test
+DO_INDEX = """id
+ hostname
+ user-data
+ vendor-data
+ public-keys
+ region"""
+
+DO_MULTIPLE_KEYS = """ssh-rsa AAAAB3NzaC1yc2EAAAA... neal@digitalocean.com
+ ssh-rsa AAAAB3NzaC1yc2EAAAA... neal2@digitalocean.com"""
+DO_SINGLE_KEY = "ssh-rsa AAAAB3NzaC1yc2EAAAA... neal@digitalocean.com"
+
+DO_META = {
+ '': DO_INDEX,
+ 'user-data': '#!/bin/bash\necho "user-data"',
+ 'vendor-data': '#!/bin/bash\necho "vendor-data"',
+ 'public-keys': DO_SINGLE_KEY,
+ 'region': 'nyc3',
+ 'id': '2000000',
+ 'hostname': 'cloudinit-test',
+}
+
+MD_URL_RE = re.compile(r'http://169.254.169.254/metadata/v1/.*')
+
+def _request_callback(method, uri, headers):
+ url_path = urlparse(uri).path
+ if url_path.startswith('/metadata/v1/'):
+ path = url_path.split('/metadata/v1/')[1:][0]
+ else:
+ path = None
+ if path in DO_META:
+ return (200, headers, DO_META.get(path))
+ else:
+ return (404, headers, '')
+
+
+class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
+
+ def setUp(self):
+ self.ds = DataSourceDigitalOcean.DataSourceDigitalOcean(
+ settings.CFG_BUILTIN, None,
+ helpers.Paths({}))
+ super(TestDataSourceDigitalOcean, self).setUp()
+
+ @httpretty.activate
+ def test_connection(self):
+ httpretty.register_uri(
+ httpretty.GET, MD_URL_RE,
+ body=_request_callback)
+
+ success = self.ds.get_data()
+ self.assertTrue(success)
+
+ @httpretty.activate
+ def test_metadata(self):
+ httpretty.register_uri(
+ httpretty.GET, MD_URL_RE,
+ body=_request_callback)
+ self.ds.get_data()
+
+ self.assertEqual(DO_META.get('user-data'),
+ self.ds.get_userdata_raw())
+
+ self.assertEqual(DO_META.get('vendor-data'),
+ self.ds.get_vendordata_raw())
+
+ self.assertEqual(DO_META.get('region'),
+ self.ds.availability_zone)
+
+ self.assertEqual(DO_META.get('id'),
+ self.ds.get_instance_id())
+
+ self.assertEqual(DO_META.get('hostname'),
+ self.ds.get_hostname())
+
+ self.assertEqual('http://mirrors.digitalocean.com/',
+ self.ds.get_package_mirror_info())
+
+ # Single key
+ self.assertEqual([DO_META.get('public-keys')],
+ self.ds.get_public_ssh_keys())
+
+ self.assertIs(type(self.ds.get_public_ssh_keys()), ListType)
+
+ @httpretty.activate
+ def test_multiple_ssh_keys(self):
+ DO_META['public_keys'] = DO_MULTIPLE_KEYS
+ httpretty.register_uri(
+ httpretty.GET, MD_URL_RE,
+ body=_request_callback)
+ self.ds.get_data()
+
+ # Multiple keys
+ self.assertEqual(DO_META.get('public-keys').splitlines(),
+ self.ds.get_public_ssh_keys())
+
+ self.assertIs(type(self.ds.get_public_ssh_keys()), ListType)
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index 7b4e651a..49894e51 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -19,7 +19,6 @@
import copy
import json
import re
-import unittest
from StringIO import StringIO
@@ -318,7 +317,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
self.assertIsNone(ds_os.version)
-class TestVendorDataLoading(unittest.TestCase):
+class TestVendorDataLoading(test_helpers.TestCase):
def cvj(self, data):
return openstack.convert_vendordata_json(data)
diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py
index fbdb7b3f..35cc1f43 100644
--- a/tests/unittests/test_distros/test_netconfig.py
+++ b/tests/unittests/test_distros/test_netconfig.py
@@ -182,6 +182,12 @@ NETWORKING=yes
spec=False, passthrough=False)
load_mock = self.mocker.replace(util.load_file,
spec=False, passthrough=False)
+ subp_mock = self.mocker.replace(util.subp,
+ spec=False, passthrough=False)
+
+ subp_mock(['ifconfig', '-a'])
+ self.mocker.count(0, None)
+ self.mocker.result(('vtnet0', ''))
exists_mock(mocker.ARGS)
self.mocker.count(0, None)
@@ -190,6 +196,7 @@ NETWORKING=yes
write_bufs = {}
read_bufs = {
'/etc/rc.conf': '',
+ '/etc/resolv.conf': '',
}
def replace_write(filename, content, mode=0644, omode="wb"):
diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py
new file mode 100644
index 00000000..ef1aa208
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_chef.py
@@ -0,0 +1,121 @@
+import json
+import os
+
+from cloudinit.config import cc_chef
+
+from cloudinit import cloud
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import util
+from cloudinit.sources import DataSourceNone
+
+from .. import helpers as t_help
+
+import logging
+
+LOG = logging.getLogger(__name__)
+
+
+class TestChef(t_help.FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestChef, self).setUp()
+ self.tmp = self.makeDir(prefix="unittest_")
+
+ def fetch_cloud(self, distro_kind):
+ cls = distros.fetch(distro_kind)
+ paths = helpers.Paths({})
+ distro = cls(distro_kind, {}, paths)
+ ds = DataSourceNone.DataSourceNone({}, distro, paths, None)
+ return cloud.Cloud(ds, paths, {}, distro, None)
+
+ def test_no_config(self):
+ self.patchUtils(self.tmp)
+ self.patchOS(self.tmp)
+
+ cfg = {}
+ cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, [])
+ for d in cc_chef.CHEF_DIRS:
+ self.assertFalse(os.path.isdir(d))
+
+ def test_basic_config(self):
+ # This should create a file of the format...
+ """
+ # Created by cloud-init v. 0.7.6 on Sat, 11 Oct 2014 23:57:21 +0000
+ log_level :info
+ ssl_verify_mode :verify_none
+ log_location "/var/log/chef/client.log"
+ validation_client_name "bob"
+ validation_key "/etc/chef/validation.pem"
+ client_key "/etc/chef/client.pem"
+ chef_server_url "localhost"
+ environment "_default"
+ node_name "iid-datasource-none"
+ json_attribs "/etc/chef/firstboot.json"
+ file_cache_path "/var/cache/chef"
+ file_backup_path "/var/backups/chef"
+ pid_file "/var/run/chef/client.pid"
+ Chef::Log::Formatter.show_time = true
+ """
+ tpl_file = util.load_file('templates/chef_client.rb.tmpl')
+ self.patchUtils(self.tmp)
+ self.patchOS(self.tmp)
+
+ util.write_file('/etc/cloud/templates/chef_client.rb.tmpl', tpl_file)
+ cfg = {
+ 'chef': {
+ 'server_url': 'localhost',
+ 'validation_name': 'bob',
+ },
+ }
+ cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, [])
+ for d in cc_chef.CHEF_DIRS:
+ self.assertTrue(os.path.isdir(d))
+ c = util.load_file(cc_chef.CHEF_RB_PATH)
+ for k, v in cfg['chef'].items():
+ self.assertIn(v, c)
+ for k, v in cc_chef.CHEF_RB_TPL_DEFAULTS.items():
+ if isinstance(v, basestring):
+ self.assertIn(v, c)
+ c = util.load_file(cc_chef.CHEF_FB_PATH)
+ self.assertEqual({}, json.loads(c))
+
+ def test_firstboot_json(self):
+ self.patchUtils(self.tmp)
+ self.patchOS(self.tmp)
+
+ cfg = {
+ 'chef': {
+ 'server_url': 'localhost',
+ 'validation_name': 'bob',
+ 'run_list': ['a', 'b', 'c'],
+ 'initial_attributes': {
+ 'c': 'd',
+ }
+ },
+ }
+ cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, [])
+ c = util.load_file(cc_chef.CHEF_FB_PATH)
+ self.assertEqual(
+ {
+ 'run_list': ['a', 'b', 'c'],
+ 'c': 'd',
+ }, json.loads(c))
+
+ def test_template_deletes(self):
+ tpl_file = util.load_file('templates/chef_client.rb.tmpl')
+ self.patchUtils(self.tmp)
+ self.patchOS(self.tmp)
+
+ util.write_file('/etc/cloud/templates/chef_client.rb.tmpl', tpl_file)
+ cfg = {
+ 'chef': {
+ 'server_url': 'localhost',
+ 'validation_name': 'bob',
+ 'json_attribs': None,
+ 'show_time': None,
+ },
+ }
+ cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, [])
+ c = util.load_file(cc_chef.CHEF_RB_PATH)
+ self.assertNotIn('json_attribs', c)
+ self.assertNotIn('Formatter.show_time', c)
diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py
index 03004ab9..e1530e30 100644
--- a/tests/unittests/test_handler/test_handler_set_hostname.py
+++ b/tests/unittests/test_handler/test_handler_set_hostname.py
@@ -37,10 +37,11 @@ class TestHostname(t_help.FilesystemMockingTestCase):
self.patchUtils(self.tmp)
cc_set_hostname.handle('cc_set_hostname',
cfg, cc, LOG, [])
- contents = util.load_file("/etc/sysconfig/network")
- n_cfg = ConfigObj(StringIO(contents))
- self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'},
- dict(n_cfg))
+ if not distro.uses_systemd():
+ contents = util.load_file("/etc/sysconfig/network")
+ n_cfg = ConfigObj(StringIO(contents))
+ self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'},
+ dict(n_cfg))
def test_write_hostname_debian(self):
cfg = {
diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py
index d8662cac..3b317121 100644
--- a/tests/unittests/test_sshutil.py
+++ b/tests/unittests/test_sshutil.py
@@ -1,5 +1,7 @@
+from mock import patch
+
+from . import helpers as test_helpers
from cloudinit import ssh_util
-from unittest import TestCase
VALID_CONTENT = {
@@ -35,7 +37,7 @@ TEST_OPTIONS = ("no-port-forwarding,no-agent-forwarding,no-X11-forwarding,"
'user \"root\".\';echo;sleep 10"')
-class TestAuthKeyLineParser(TestCase):
+class TestAuthKeyLineParser(test_helpers.TestCase):
def test_simple_parse(self):
# test key line with common 3 fields (keytype, base64, comment)
parser = ssh_util.AuthKeyLineParser()
@@ -98,4 +100,71 @@ class TestAuthKeyLineParser(TestCase):
self.assertFalse(key.valid())
+class TestParseSSHConfig(test_helpers.TestCase):
+
+ def setUp(self):
+ self.load_file_patch = patch('cloudinit.ssh_util.util.load_file')
+ self.load_file = self.load_file_patch.start()
+ self.isfile_patch = patch('cloudinit.ssh_util.os.path.isfile')
+ self.isfile = self.isfile_patch.start()
+ self.isfile.return_value = True
+
+ def tearDown(self):
+ self.load_file_patch.stop()
+ self.isfile_patch.stop()
+
+ def test_not_a_file(self):
+ self.isfile.return_value = False
+ self.load_file.side_effect = IOError
+ ret = ssh_util.parse_ssh_config('not a real file')
+ self.assertEqual([], ret)
+
+ def test_empty_file(self):
+ self.load_file.return_value = ''
+ ret = ssh_util.parse_ssh_config('some real file')
+ self.assertEqual([], ret)
+
+ def test_comment_line(self):
+ comment_line = '# This is a comment'
+ self.load_file.return_value = comment_line
+ ret = ssh_util.parse_ssh_config('some real file')
+ self.assertEqual(1, len(ret))
+ self.assertEqual(comment_line, ret[0].line)
+
+ def test_blank_lines(self):
+ lines = ['', '\t', ' ']
+ self.load_file.return_value = '\n'.join(lines)
+ ret = ssh_util.parse_ssh_config('some real file')
+ self.assertEqual(len(lines), len(ret))
+ for line in ret:
+ self.assertEqual('', line.line)
+
+ def test_lower_case_config(self):
+ self.load_file.return_value = 'foo bar'
+ ret = ssh_util.parse_ssh_config('some real file')
+ self.assertEqual(1, len(ret))
+ self.assertEqual('foo', ret[0].key)
+ self.assertEqual('bar', ret[0].value)
+
+ def test_upper_case_config(self):
+ self.load_file.return_value = 'Foo Bar'
+ ret = ssh_util.parse_ssh_config('some real file')
+ self.assertEqual(1, len(ret))
+ self.assertEqual('foo', ret[0].key)
+ self.assertEqual('Bar', ret[0].value)
+
+ def test_lower_case_with_equals(self):
+ self.load_file.return_value = 'foo=bar'
+ ret = ssh_util.parse_ssh_config('some real file')
+ self.assertEqual(1, len(ret))
+ self.assertEqual('foo', ret[0].key)
+ self.assertEqual('bar', ret[0].value)
+
+ def test_upper_case_with_equals(self):
+ self.load_file.return_value = 'Foo=bar'
+ ret = ssh_util.parse_ssh_config('some real file')
+ self.assertEqual(1, len(ret))
+ self.assertEqual('foo', ret[0].key)
+ self.assertEqual('bar', ret[0].value)
+
# vi: ts=4 expandtab