summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/unittests/helpers.py5
-rw-r--r--tests/unittests/test_data.py9
-rw-r--r--tests/unittests/test_datasource/test_digitalocean.py126
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py1
-rw-r--r--tests/unittests/test_datasource/test_openstack.py50
-rw-r--r--tests/unittests/test_distros/test_netconfig.py11
-rw-r--r--tests/unittests/test_handler/test_handler_chef.py121
-rw-r--r--tests/unittests/test_handler/test_handler_debug.py78
-rw-r--r--tests/unittests/test_handler/test_handler_set_hostname.py9
-rw-r--r--tests/unittests/test_runs/test_merge_run.py2
-rw-r--r--tests/unittests/test_sshutil.py73
11 files changed, 461 insertions, 24 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 9700a4ca..52305397 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -35,6 +35,11 @@ else:
if PY26:
# For now add these on, taken from python 2.7 + slightly adjusted
class TestCase(unittest.TestCase):
+ def assertIs(self, expr1, expr2, msg=None):
+ if expr1 is not expr2:
+ standardMsg = '%r is not %r' % (expr1, expr2)
+ self.fail(self._formatMessage(msg, standardMsg))
+
def assertIn(self, member, container, msg=None):
if member not in container:
standardMsg = '%r not found in %r' % (member, container)
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py
index 41d0dc29..fd6bd8a1 100644
--- a/tests/unittests/test_data.py
+++ b/tests/unittests/test_data.py
@@ -106,7 +106,7 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
initer.read_cfg()
initer.initialize()
initer.fetch()
- _iid = initer.instancify()
+ initer.instancify()
initer.update()
initer.cloudify().run('consume_data',
initer.consume_data,
@@ -145,7 +145,7 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
initer.read_cfg()
initer.initialize()
initer.fetch()
- _iid = initer.instancify()
+ initer.instancify()
initer.update()
initer.cloudify().run('consume_data',
initer.consume_data,
@@ -221,7 +221,7 @@ run:
initer.read_cfg()
initer.initialize()
initer.fetch()
- _iid = initer.instancify()
+ initer.instancify()
initer.update()
initer.cloudify().run('consume_data',
initer.consume_data,
@@ -256,7 +256,7 @@ vendor_data:
initer.read_cfg()
initer.initialize()
initer.fetch()
- _iid = initer.instancify()
+ initer.instancify()
initer.update()
initer.cloudify().run('consume_data',
initer.consume_data,
@@ -264,7 +264,6 @@ vendor_data:
freq=PER_INSTANCE)
mods = stages.Modules(initer)
(_which_ran, _failures) = mods.run_section('cloud_init_modules')
- _cfg = mods.cfg
vendor_script = initer.paths.get_ipath_cur('vendor_scripts')
vendor_script_fns = "%s%s/part-001" % (new_root, vendor_script)
self.assertTrue(os.path.exists(vendor_script_fns))
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
new file mode 100644
index 00000000..04bee340
--- /dev/null
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -0,0 +1,126 @@
+#
+# Copyright (C) 2014 Neal Shrader
+#
+# Author: Neal Shrader <neal@digitalocean.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import httpretty
+import re
+
+from types import ListType
+from urlparse import urlparse
+
+from cloudinit import settings
+from cloudinit import helpers
+from cloudinit.sources import DataSourceDigitalOcean
+
+from .. import helpers as test_helpers
+
+# Abbreviated for the test
+DO_INDEX = """id
+ hostname
+ user-data
+ vendor-data
+ public-keys
+ region"""
+
+DO_MULTIPLE_KEYS = """ssh-rsa AAAAB3NzaC1yc2EAAAA... neal@digitalocean.com
+ ssh-rsa AAAAB3NzaC1yc2EAAAA... neal2@digitalocean.com"""
+DO_SINGLE_KEY = "ssh-rsa AAAAB3NzaC1yc2EAAAA... neal@digitalocean.com"
+
+DO_META = {
+ '': DO_INDEX,
+ 'user-data': '#!/bin/bash\necho "user-data"',
+ 'vendor-data': '#!/bin/bash\necho "vendor-data"',
+ 'public-keys': DO_SINGLE_KEY,
+ 'region': 'nyc3',
+ 'id': '2000000',
+ 'hostname': 'cloudinit-test',
+}
+
+MD_URL_RE = re.compile(r'http://169.254.169.254/metadata/v1/.*')
+
+def _request_callback(method, uri, headers):
+ url_path = urlparse(uri).path
+ if url_path.startswith('/metadata/v1/'):
+ path = url_path.split('/metadata/v1/')[1:][0]
+ else:
+ path = None
+ if path in DO_META:
+ return (200, headers, DO_META.get(path))
+ else:
+ return (404, headers, '')
+
+
+class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
+
+ def setUp(self):
+ self.ds = DataSourceDigitalOcean.DataSourceDigitalOcean(
+ settings.CFG_BUILTIN, None,
+ helpers.Paths({}))
+ super(TestDataSourceDigitalOcean, self).setUp()
+
+ @httpretty.activate
+ def test_connection(self):
+ httpretty.register_uri(
+ httpretty.GET, MD_URL_RE,
+ body=_request_callback)
+
+ success = self.ds.get_data()
+ self.assertTrue(success)
+
+ @httpretty.activate
+ def test_metadata(self):
+ httpretty.register_uri(
+ httpretty.GET, MD_URL_RE,
+ body=_request_callback)
+ self.ds.get_data()
+
+ self.assertEqual(DO_META.get('user-data'),
+ self.ds.get_userdata_raw())
+
+ self.assertEqual(DO_META.get('vendor-data'),
+ self.ds.get_vendordata_raw())
+
+ self.assertEqual(DO_META.get('region'),
+ self.ds.availability_zone)
+
+ self.assertEqual(DO_META.get('id'),
+ self.ds.get_instance_id())
+
+ self.assertEqual(DO_META.get('hostname'),
+ self.ds.get_hostname())
+
+ self.assertEqual('http://mirrors.digitalocean.com/',
+ self.ds.get_package_mirror_info())
+
+ # Single key
+ self.assertEqual([DO_META.get('public-keys')],
+ self.ds.get_public_ssh_keys())
+
+ self.assertIs(type(self.ds.get_public_ssh_keys()), ListType)
+
+ @httpretty.activate
+ def test_multiple_ssh_keys(self):
+ DO_META['public_keys'] = DO_MULTIPLE_KEYS
+ httpretty.register_uri(
+ httpretty.GET, MD_URL_RE,
+ body=_request_callback)
+ self.ds.get_data()
+
+ # Multiple keys
+ self.assertEqual(DO_META.get('public-keys').splitlines(),
+ self.ds.get_public_ssh_keys())
+
+ self.assertIs(type(self.ds.get_public_ssh_keys()), ListType)
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index 8bcc026c..e9235951 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -57,7 +57,6 @@ class TestNoCloudDataSource(MockerTestCase):
pass
def my_find_devs_with(*args, **kwargs):
- _f = (args, kwargs)
raise PsuedoException
self.apply_patches([(util, 'find_devs_with', my_find_devs_with)])
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index 530fba20..49894e51 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -142,7 +142,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
@hp.activate
def test_successful(self):
_register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES)
- f = ds.read_metadata_service(BASE_URL, version=self.VERSION)
+ f = ds.read_metadata_service(BASE_URL)
self.assertEquals(VENDOR_DATA, f.get('vendordata'))
self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg'])
self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
@@ -164,7 +164,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
@hp.activate
def test_no_ec2(self):
_register_uris(self.VERSION, {}, {}, OS_FILES)
- f = ds.read_metadata_service(BASE_URL, version=self.VERSION)
+ f = ds.read_metadata_service(BASE_URL)
self.assertEquals(VENDOR_DATA, f.get('vendordata'))
self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg'])
self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
@@ -180,7 +180,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
os_files.pop(k, None)
_register_uris(self.VERSION, {}, {}, os_files)
self.assertRaises(openstack.NonReadable, ds.read_metadata_service,
- BASE_URL, version=self.VERSION)
+ BASE_URL)
@hp.activate
def test_bad_uuid(self):
@@ -192,7 +192,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
os_files[k] = json.dumps(os_meta)
_register_uris(self.VERSION, {}, {}, os_files)
self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service,
- BASE_URL, version=self.VERSION)
+ BASE_URL)
@hp.activate
def test_userdata_empty(self):
@@ -201,7 +201,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('user_data'):
os_files.pop(k, None)
_register_uris(self.VERSION, {}, {}, os_files)
- f = ds.read_metadata_service(BASE_URL, version=self.VERSION)
+ f = ds.read_metadata_service(BASE_URL)
self.assertEquals(VENDOR_DATA, f.get('vendordata'))
self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg'])
self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
@@ -214,7 +214,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
if k.endswith('vendor_data.json'):
os_files.pop(k, None)
_register_uris(self.VERSION, {}, {}, os_files)
- f = ds.read_metadata_service(BASE_URL, version=self.VERSION)
+ f = ds.read_metadata_service(BASE_URL)
self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg'])
self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg'])
self.assertFalse(f.get('vendordata'))
@@ -227,7 +227,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
os_files[k] = '{' # some invalid json
_register_uris(self.VERSION, {}, {}, os_files)
self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service,
- BASE_URL, version=self.VERSION)
+ BASE_URL)
@hp.activate
def test_metadata_invalid(self):
@@ -237,7 +237,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
os_files[k] = '{' # some invalid json
_register_uris(self.VERSION, {}, {}, os_files)
self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service,
- BASE_URL, version=self.VERSION)
+ BASE_URL)
@hp.activate
def test_datasource(self):
@@ -256,7 +256,8 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
self.assertEquals(EC2_META, ds_os.ec2_metadata)
self.assertEquals(USER_DATA, ds_os.userdata_raw)
self.assertEquals(2, len(ds_os.files))
- self.assertEquals(VENDOR_DATA, ds_os.vendordata_raw)
+ self.assertEquals(VENDOR_DATA, ds_os.vendordata_pure)
+ self.assertEquals(ds_os.vendordata_raw, None)
@hp.activate
def test_bad_datasource_meta(self):
@@ -314,3 +315,34 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
found = ds_os.get_data()
self.assertFalse(found)
self.assertIsNone(ds_os.version)
+
+
+class TestVendorDataLoading(test_helpers.TestCase):
+ def cvj(self, data):
+ return openstack.convert_vendordata_json(data)
+
+ def test_vd_load_none(self):
+ # non-existant vendor-data should return none
+ self.assertIsNone(self.cvj(None))
+
+ def test_vd_load_string(self):
+ self.assertEqual(self.cvj("foobar"), "foobar")
+
+ def test_vd_load_list(self):
+ data = [{'foo': 'bar'}, 'mystring', list(['another', 'list'])]
+ self.assertEqual(self.cvj(data), data)
+
+ def test_vd_load_dict_no_ci(self):
+ self.assertEqual(self.cvj({'foo': 'bar'}), None)
+
+ def test_vd_load_dict_ci_dict(self):
+ self.assertRaises(ValueError, self.cvj,
+ {'foo': 'bar', 'cloud-init': {'x': 1}})
+
+ def test_vd_load_dict_ci_string(self):
+ data = {'foo': 'bar', 'cloud-init': 'VENDOR_DATA'}
+ self.assertEqual(self.cvj(data), data['cloud-init'])
+
+ def test_vd_load_dict_ci_list(self):
+ data = {'foo': 'bar', 'cloud-init': ['VD_1', 'VD_2']}
+ self.assertEqual(self.cvj(data), data['cloud-init'])
diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py
index ed997a1d..35cc1f43 100644
--- a/tests/unittests/test_distros/test_netconfig.py
+++ b/tests/unittests/test_distros/test_netconfig.py
@@ -182,6 +182,12 @@ NETWORKING=yes
spec=False, passthrough=False)
load_mock = self.mocker.replace(util.load_file,
spec=False, passthrough=False)
+ subp_mock = self.mocker.replace(util.subp,
+ spec=False, passthrough=False)
+
+ subp_mock(['ifconfig', '-a'])
+ self.mocker.count(0, None)
+ self.mocker.result(('vtnet0', ''))
exists_mock(mocker.ARGS)
self.mocker.count(0, None)
@@ -190,6 +196,7 @@ NETWORKING=yes
write_bufs = {}
read_bufs = {
'/etc/rc.conf': '',
+ '/etc/resolv.conf': '',
}
def replace_write(filename, content, mode=0644, omode="wb"):
@@ -223,8 +230,8 @@ NETWORKING=yes
self.assertIn('/etc/rc.conf', write_bufs)
write_buf = write_bufs['/etc/rc.conf']
expected_buf = '''
-ifconfig_eth0="192.168.1.5 netmask 255.255.255.0"
-ifconfig_eth1="DHCP"
+ifconfig_vtnet0="192.168.1.5 netmask 255.255.255.0"
+ifconfig_vtnet1="DHCP"
defaultrouter="192.168.1.254"
'''
self.assertCfgEquals(expected_buf, str(write_buf))
diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py
new file mode 100644
index 00000000..ef1aa208
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_chef.py
@@ -0,0 +1,121 @@
+import json
+import os
+
+from cloudinit.config import cc_chef
+
+from cloudinit import cloud
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import util
+from cloudinit.sources import DataSourceNone
+
+from .. import helpers as t_help
+
+import logging
+
+LOG = logging.getLogger(__name__)
+
+
+class TestChef(t_help.FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestChef, self).setUp()
+ self.tmp = self.makeDir(prefix="unittest_")
+
+ def fetch_cloud(self, distro_kind):
+ cls = distros.fetch(distro_kind)
+ paths = helpers.Paths({})
+ distro = cls(distro_kind, {}, paths)
+ ds = DataSourceNone.DataSourceNone({}, distro, paths, None)
+ return cloud.Cloud(ds, paths, {}, distro, None)
+
+ def test_no_config(self):
+ self.patchUtils(self.tmp)
+ self.patchOS(self.tmp)
+
+ cfg = {}
+ cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, [])
+ for d in cc_chef.CHEF_DIRS:
+ self.assertFalse(os.path.isdir(d))
+
+ def test_basic_config(self):
+ # This should create a file of the format...
+ """
+ # Created by cloud-init v. 0.7.6 on Sat, 11 Oct 2014 23:57:21 +0000
+ log_level :info
+ ssl_verify_mode :verify_none
+ log_location "/var/log/chef/client.log"
+ validation_client_name "bob"
+ validation_key "/etc/chef/validation.pem"
+ client_key "/etc/chef/client.pem"
+ chef_server_url "localhost"
+ environment "_default"
+ node_name "iid-datasource-none"
+ json_attribs "/etc/chef/firstboot.json"
+ file_cache_path "/var/cache/chef"
+ file_backup_path "/var/backups/chef"
+ pid_file "/var/run/chef/client.pid"
+ Chef::Log::Formatter.show_time = true
+ """
+ tpl_file = util.load_file('templates/chef_client.rb.tmpl')
+ self.patchUtils(self.tmp)
+ self.patchOS(self.tmp)
+
+ util.write_file('/etc/cloud/templates/chef_client.rb.tmpl', tpl_file)
+ cfg = {
+ 'chef': {
+ 'server_url': 'localhost',
+ 'validation_name': 'bob',
+ },
+ }
+ cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, [])
+ for d in cc_chef.CHEF_DIRS:
+ self.assertTrue(os.path.isdir(d))
+ c = util.load_file(cc_chef.CHEF_RB_PATH)
+ for k, v in cfg['chef'].items():
+ self.assertIn(v, c)
+ for k, v in cc_chef.CHEF_RB_TPL_DEFAULTS.items():
+ if isinstance(v, basestring):
+ self.assertIn(v, c)
+ c = util.load_file(cc_chef.CHEF_FB_PATH)
+ self.assertEqual({}, json.loads(c))
+
+ def test_firstboot_json(self):
+ self.patchUtils(self.tmp)
+ self.patchOS(self.tmp)
+
+ cfg = {
+ 'chef': {
+ 'server_url': 'localhost',
+ 'validation_name': 'bob',
+ 'run_list': ['a', 'b', 'c'],
+ 'initial_attributes': {
+ 'c': 'd',
+ }
+ },
+ }
+ cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, [])
+ c = util.load_file(cc_chef.CHEF_FB_PATH)
+ self.assertEqual(
+ {
+ 'run_list': ['a', 'b', 'c'],
+ 'c': 'd',
+ }, json.loads(c))
+
+ def test_template_deletes(self):
+ tpl_file = util.load_file('templates/chef_client.rb.tmpl')
+ self.patchUtils(self.tmp)
+ self.patchOS(self.tmp)
+
+ util.write_file('/etc/cloud/templates/chef_client.rb.tmpl', tpl_file)
+ cfg = {
+ 'chef': {
+ 'server_url': 'localhost',
+ 'validation_name': 'bob',
+ 'json_attribs': None,
+ 'show_time': None,
+ },
+ }
+ cc_chef.handle('chef', cfg, self.fetch_cloud('ubuntu'), LOG, [])
+ c = util.load_file(cc_chef.CHEF_RB_PATH)
+ self.assertNotIn('json_attribs', c)
+ self.assertNotIn('Formatter.show_time', c)
diff --git a/tests/unittests/test_handler/test_handler_debug.py b/tests/unittests/test_handler/test_handler_debug.py
new file mode 100644
index 00000000..bd9e29d8
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_debug.py
@@ -0,0 +1,78 @@
+# vi: ts=4 expandtab
+#
+# Copyright (C) 2014 Yahoo! Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from cloudinit.config import cc_debug
+
+from cloudinit import cloud
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import util
+
+from cloudinit.sources import DataSourceNone
+
+from .. import helpers as t_help
+
+import logging
+
+LOG = logging.getLogger(__name__)
+
+
+class TestDebug(t_help.FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestDebug, self).setUp()
+ self.new_root = self.makeDir(prefix="unittest_")
+
+ def _get_cloud(self, distro, metadata=None):
+ self.patchUtils(self.new_root)
+ paths = helpers.Paths({})
+ cls = distros.fetch(distro)
+ d = cls(distro, {}, paths)
+ ds = DataSourceNone.DataSourceNone({}, d, paths)
+ if metadata:
+ ds.metadata.update(metadata)
+ return cloud.Cloud(ds, paths, {}, d, None)
+
+ def test_debug_write(self):
+ cfg = {
+ 'abc': '123',
+ 'c': u'\u20a0',
+ 'debug': {
+ 'verbose': True,
+ # Does not actually write here due to mocking...
+ 'output': '/var/log/cloud-init-debug.log',
+ },
+ }
+ cc = self._get_cloud('ubuntu')
+ cc_debug.handle('cc_debug', cfg, cc, LOG, [])
+ contents = util.load_file('/var/log/cloud-init-debug.log')
+ # Some basic sanity tests...
+ self.assertGreater(len(contents), 0)
+ for k in cfg.keys():
+ self.assertIn(k, contents)
+
+ def test_debug_no_write(self):
+ cfg = {
+ 'abc': '123',
+ 'debug': {
+ 'verbose': False,
+ # Does not actually write here due to mocking...
+ 'output': '/var/log/cloud-init-debug.log',
+ },
+ }
+ cc = self._get_cloud('ubuntu')
+ cc_debug.handle('cc_debug', cfg, cc, LOG, [])
+ self.assertRaises(IOError,
+ util.load_file, '/var/log/cloud-init-debug.log')
diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py
index 03004ab9..e1530e30 100644
--- a/tests/unittests/test_handler/test_handler_set_hostname.py
+++ b/tests/unittests/test_handler/test_handler_set_hostname.py
@@ -37,10 +37,11 @@ class TestHostname(t_help.FilesystemMockingTestCase):
self.patchUtils(self.tmp)
cc_set_hostname.handle('cc_set_hostname',
cfg, cc, LOG, [])
- contents = util.load_file("/etc/sysconfig/network")
- n_cfg = ConfigObj(StringIO(contents))
- self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'},
- dict(n_cfg))
+ if not distro.uses_systemd():
+ contents = util.load_file("/etc/sysconfig/network")
+ n_cfg = ConfigObj(StringIO(contents))
+ self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'},
+ dict(n_cfg))
def test_write_hostname_debian(self):
cfg = {
diff --git a/tests/unittests/test_runs/test_merge_run.py b/tests/unittests/test_runs/test_merge_run.py
index 32b41925..977adb34 100644
--- a/tests/unittests/test_runs/test_merge_run.py
+++ b/tests/unittests/test_runs/test_merge_run.py
@@ -33,7 +33,7 @@ class TestMergeRun(helpers.FilesystemMockingTestCase):
initer.initialize()
initer.fetch()
initer.datasource.userdata_raw = ud
- _iid = initer.instancify()
+ initer.instancify()
initer.update()
initer.cloudify().run('consume_data',
initer.consume_data,
diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py
index d8662cac..3b317121 100644
--- a/tests/unittests/test_sshutil.py
+++ b/tests/unittests/test_sshutil.py
@@ -1,5 +1,7 @@
+from mock import patch
+
+from . import helpers as test_helpers
from cloudinit import ssh_util
-from unittest import TestCase
VALID_CONTENT = {
@@ -35,7 +37,7 @@ TEST_OPTIONS = ("no-port-forwarding,no-agent-forwarding,no-X11-forwarding,"
'user \"root\".\';echo;sleep 10"')
-class TestAuthKeyLineParser(TestCase):
+class TestAuthKeyLineParser(test_helpers.TestCase):
def test_simple_parse(self):
# test key line with common 3 fields (keytype, base64, comment)
parser = ssh_util.AuthKeyLineParser()
@@ -98,4 +100,71 @@ class TestAuthKeyLineParser(TestCase):
self.assertFalse(key.valid())
+class TestParseSSHConfig(test_helpers.TestCase):
+
+ def setUp(self):
+ self.load_file_patch = patch('cloudinit.ssh_util.util.load_file')
+ self.load_file = self.load_file_patch.start()
+ self.isfile_patch = patch('cloudinit.ssh_util.os.path.isfile')
+ self.isfile = self.isfile_patch.start()
+ self.isfile.return_value = True
+
+ def tearDown(self):
+ self.load_file_patch.stop()
+ self.isfile_patch.stop()
+
+ def test_not_a_file(self):
+ self.isfile.return_value = False
+ self.load_file.side_effect = IOError
+ ret = ssh_util.parse_ssh_config('not a real file')
+ self.assertEqual([], ret)
+
+ def test_empty_file(self):
+ self.load_file.return_value = ''
+ ret = ssh_util.parse_ssh_config('some real file')
+ self.assertEqual([], ret)
+
+ def test_comment_line(self):
+ comment_line = '# This is a comment'
+ self.load_file.return_value = comment_line
+ ret = ssh_util.parse_ssh_config('some real file')
+ self.assertEqual(1, len(ret))
+ self.assertEqual(comment_line, ret[0].line)
+
+ def test_blank_lines(self):
+ lines = ['', '\t', ' ']
+ self.load_file.return_value = '\n'.join(lines)
+ ret = ssh_util.parse_ssh_config('some real file')
+ self.assertEqual(len(lines), len(ret))
+ for line in ret:
+ self.assertEqual('', line.line)
+
+ def test_lower_case_config(self):
+ self.load_file.return_value = 'foo bar'
+ ret = ssh_util.parse_ssh_config('some real file')
+ self.assertEqual(1, len(ret))
+ self.assertEqual('foo', ret[0].key)
+ self.assertEqual('bar', ret[0].value)
+
+ def test_upper_case_config(self):
+ self.load_file.return_value = 'Foo Bar'
+ ret = ssh_util.parse_ssh_config('some real file')
+ self.assertEqual(1, len(ret))
+ self.assertEqual('foo', ret[0].key)
+ self.assertEqual('Bar', ret[0].value)
+
+ def test_lower_case_with_equals(self):
+ self.load_file.return_value = 'foo=bar'
+ ret = ssh_util.parse_ssh_config('some real file')
+ self.assertEqual(1, len(ret))
+ self.assertEqual('foo', ret[0].key)
+ self.assertEqual('bar', ret[0].value)
+
+ def test_upper_case_with_equals(self):
+ self.load_file.return_value = 'Foo=bar'
+ ret = ssh_util.parse_ssh_config('some real file')
+ self.assertEqual(1, len(ret))
+ self.assertEqual('foo', ret[0].key)
+ self.assertEqual('bar', ret[0].value)
+
# vi: ts=4 expandtab