summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/unittests/test__init__.py35
-rw-r--r--tests/unittests/test_builtin_handlers.py5
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py445
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py177
-rw-r--r--tests/unittests/test_datasource/test_maas.py24
-rw-r--r--tests/unittests/test_distros/test_generic.py121
-rw-r--r--tests/unittests/test_handler/test_handler_ca_certs.py32
-rw-r--r--tests/unittests/test_userdata.py17
-rw-r--r--tests/unittests/test_util.py8
9 files changed, 804 insertions, 60 deletions
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py
index af18955d..ac082076 100644
--- a/tests/unittests/test__init__.py
+++ b/tests/unittests/test__init__.py
@@ -1,6 +1,6 @@
-import StringIO
import logging
import os
+import StringIO
import sys
from mocker import MockerTestCase, ANY, ARGS, KWARGS
@@ -50,24 +50,27 @@ class TestWalkerHandleHandler(MockerTestCase):
self.payload = "dummy payload"
# Mock the write_file function
- write_file_mock = self.mocker.replace(util.write_file, passthrough=False)
+ write_file_mock = self.mocker.replace(util.write_file,
+ passthrough=False)
write_file_mock(expected_file_fullname, self.payload, 0600)
def test_no_errors(self):
"""Payload gets written to file and added to C{pdata}."""
- import_mock = self.mocker.replace(importer.import_module, passthrough=False)
+ import_mock = self.mocker.replace(importer.import_module,
+ passthrough=False)
import_mock(self.expected_module_name)
self.mocker.result(self.module_fake)
self.mocker.replay()
-
+
handlers.walker_handle_handler(self.data, self.ctype, self.filename,
self.payload)
-
+
self.assertEqual(1, self.data["handlercount"])
-
+
def test_import_error(self):
- """Module import errors are logged. No handler added to C{pdata}"""
- import_mock = self.mocker.replace(importer.import_module, passthrough=False)
+ """Module import errors are logged. No handler added to C{pdata}."""
+ import_mock = self.mocker.replace(importer.import_module,
+ passthrough=False)
import_mock(self.expected_module_name)
self.mocker.throw(ImportError())
self.mocker.replay()
@@ -78,8 +81,9 @@ class TestWalkerHandleHandler(MockerTestCase):
self.assertEqual(0, self.data["handlercount"])
def test_attribute_error(self):
- """Attribute errors are logged. No handler added to C{pdata}"""
- import_mock = self.mocker.replace(importer.import_module, passthrough=False)
+ """Attribute errors are logged. No handler added to C{pdata}."""
+ import_mock = self.mocker.replace(importer.import_module,
+ passthrough=False)
import_mock(self.expected_module_name)
self.mocker.result(self.module_fake)
self.mocker.throw(AttributeError())
@@ -152,7 +156,7 @@ class TestHandlerHandlePart(MockerTestCase):
self.payload, self.frequency)
def test_no_handle_when_modfreq_once(self):
- """C{handle_part} is not called if frequency is once"""
+ """C{handle_part} is not called if frequency is once."""
self.frequency = "once"
mod_mock = self.mocker.mock()
getattr(mod_mock, "frequency")
@@ -185,13 +189,15 @@ class TestCmdlineUrl(MockerTestCase):
payload = "0"
cmdline = "ro %s=%s bar=1" % (key, url)
- mock_readurl = self.mocker.replace(url_helper.readurl, passthrough=False)
+ mock_readurl = self.mocker.replace(url_helper.readurl,
+ passthrough=False)
mock_readurl(url)
self.mocker.result(url_helper.UrlResponse(200, payload))
self.mocker.replay()
self.assertEqual((key, url, None),
- util.get_cmdline_url(names=[key], starts="xxxxxx", cmdline=cmdline))
+ util.get_cmdline_url(names=[key], starts="xxxxxx",
+ cmdline=cmdline))
def test_valid_content(self):
url = "http://example.com/foo"
@@ -199,7 +205,8 @@ class TestCmdlineUrl(MockerTestCase):
payload = "xcloud-config\nmydata: foo\nbar: wark\n"
cmdline = "ro %s=%s bar=1" % (key, url)
- mock_readurl = self.mocker.replace(url_helper.readurl, passthrough=False)
+ mock_readurl = self.mocker.replace(url_helper.readurl,
+ passthrough=False)
mock_readurl(url)
self.mocker.result(url_helper.UrlResponse(200, payload))
self.mocker.replay()
diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py
index 84d85d4d..ebc0bd51 100644
--- a/tests/unittests/test_builtin_handlers.py
+++ b/tests/unittests/test_builtin_handlers.py
@@ -1,4 +1,4 @@
-"""Tests of the built-in user data handlers"""
+"""Tests of the built-in user data handlers."""
import os
@@ -6,7 +6,6 @@ from mocker import MockerTestCase
from cloudinit import handlers
from cloudinit import helpers
-from cloudinit import util
from cloudinit.handlers import upstart_job
@@ -34,7 +33,7 @@ class TestBuiltins(MockerTestCase):
None, None, None)
self.assertEquals(0, len(os.listdir(up_root)))
- def test_upstart_frequency_single(self):
+ def test_upstart_frequency_single(self):
c_root = self.makeDir()
up_root = self.makeDir()
paths = helpers.Paths({
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
new file mode 100644
index 00000000..bda61c7e
--- /dev/null
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -0,0 +1,445 @@
+# vi: ts=4 expandtab
+#
+# Copyright (C) 2009-2010 Canonical Ltd.
+# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
+# Copyright (C) 2012 Yahoo! Inc.
+#
+# Author: Joe VLcek <JVLcek@RedHat.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+'''
+This test file exercises the code in sources DataSourceAltCloud.py
+'''
+
+import os
+import shutil
+import tempfile
+
+from cloudinit import helpers
+from unittest import TestCase
+
+# Get the cloudinit.sources.DataSourceAltCloud import items needed.
+import cloudinit.sources.DataSourceAltCloud
+from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud
+from cloudinit.sources.DataSourceAltCloud import read_user_data_callback
+
+
+def _write_cloud_info_file(value):
+ '''
+ Populate the CLOUD_INFO_FILE which would be populated
+ with a cloud backend identifier ImageFactory when building
+ an image with ImageFactory.
+ '''
+ cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w')
+ cifile.write(value)
+ cifile.close()
+ os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664)
+
+
+def _remove_cloud_info_file():
+ '''
+ Remove the test CLOUD_INFO_FILE
+ '''
+ os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE)
+
+
+def _write_user_data_files(mount_dir, value):
+ '''
+ Populate the deltacloud_user_data_file the user_data_file
+ which would be populated with user data.
+ '''
+ deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt'
+ user_data_file = mount_dir + '/user-data.txt'
+
+ udfile = open(deltacloud_user_data_file, 'w')
+ udfile.write(value)
+ udfile.close()
+ os.chmod(deltacloud_user_data_file, 0664)
+
+ udfile = open(user_data_file, 'w')
+ udfile.write(value)
+ udfile.close()
+ os.chmod(user_data_file, 0664)
+
+
+def _remove_user_data_files(mount_dir,
+ dc_file=True,
+ non_dc_file=True):
+ '''
+ Remove the test files: deltacloud_user_data_file and
+ user_data_file
+ '''
+ deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt'
+ user_data_file = mount_dir + '/user-data.txt'
+
+ # Ignore any failures removeing files that are already gone.
+ if dc_file:
+ try:
+ os.remove(deltacloud_user_data_file)
+ except OSError:
+ pass
+
+ if non_dc_file:
+ try:
+ os.remove(user_data_file)
+ except OSError:
+ pass
+
+
+class TestGetCloudType(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.get_cloud_type()
+ '''
+
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+
+ def tearDown(self):
+ # Reset
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['dmidecode', '--string', 'system-product-name']
+
+ def test_rhev(self):
+ '''
+ Test method get_cloud_type() for RHEVm systems.
+ Forcing dmidecode return to match a RHEVm system: RHEV Hypervisor
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'RHEV Hypervisor']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('RHEV', \
+ dsrc.get_cloud_type())
+
+ def test_vsphere(self):
+ '''
+ Test method get_cloud_type() for vSphere systems.
+ Forcing dmidecode return to match a vSphere system: RHEV Hypervisor
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'VMware Virtual Platform']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('VSPHERE', \
+ dsrc.get_cloud_type())
+
+ def test_unknown(self):
+ '''
+ Test method get_cloud_type() for unknown systems.
+ Forcing dmidecode return to match an unrecognized return.
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'Unrecognized Platform']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('UNKNOWN', \
+ dsrc.get_cloud_type())
+
+ def test_exception1(self):
+ '''
+ Test method get_cloud_type() where command dmidecode fails.
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['ls', 'bad command']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('UNKNOWN', \
+ dsrc.get_cloud_type())
+
+ def test_exception2(self):
+ '''
+ Test method get_cloud_type() where command dmidecode is not available.
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['bad command']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('UNKNOWN', \
+ dsrc.get_cloud_type())
+
+
+class TestGetDataCloudInfoFile(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.get_data()
+ With a contrived CLOUD_INFO_FILE
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.cloud_info_file = tempfile.mkstemp()[1]
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ self.cloud_info_file
+
+ def tearDown(self):
+ # Reset
+
+ # Attempt to remove the temp file ignoring errors
+ try:
+ os.remove(self.cloud_info_file)
+ except OSError:
+ pass
+
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/etc/sysconfig/cloud-info'
+
+ def test_rhev(self):
+ '''Success Test module get_data() forcing RHEV.'''
+
+ _write_cloud_info_file('RHEV')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_rhevm = lambda: True
+ self.assertEquals(True, dsrc.get_data())
+
+ def test_vsphere(self):
+ '''Success Test module get_data() forcing VSPHERE.'''
+
+ _write_cloud_info_file('VSPHERE')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_vsphere = lambda: True
+ self.assertEquals(True, dsrc.get_data())
+
+ def test_fail_rhev(self):
+ '''Failure Test module get_data() forcing RHEV.'''
+
+ _write_cloud_info_file('RHEV')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_rhevm = lambda: False
+ self.assertEquals(False, dsrc.get_data())
+
+ def test_fail_vsphere(self):
+ '''Failure Test module get_data() forcing VSPHERE.'''
+
+ _write_cloud_info_file('VSPHERE')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_vsphere = lambda: False
+ self.assertEquals(False, dsrc.get_data())
+
+ def test_unrecognized(self):
+ '''Failure Test module get_data() forcing unrecognized.'''
+
+ _write_cloud_info_file('unrecognized')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals(False, dsrc.get_data())
+
+
+class TestGetDataNoCloudInfoFile(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.get_data()
+ Without a CLOUD_INFO_FILE
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ 'no such file'
+
+ def tearDown(self):
+ # Reset
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/etc/sysconfig/cloud-info'
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['dmidecode', '--string', 'system-product-name']
+
+ def test_rhev_no_cloud_file(self):
+ '''Test No cloud info file module get_data() forcing RHEV.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'RHEV Hypervisor']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_rhevm = lambda: True
+ self.assertEquals(True, dsrc.get_data())
+
+ def test_vsphere_no_cloud_file(self):
+ '''Test No cloud info file module get_data() forcing VSPHERE.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'VMware Virtual Platform']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_vsphere = lambda: True
+ self.assertEquals(True, dsrc.get_data())
+
+ def test_failure_no_cloud_file(self):
+ '''Test No cloud info file module get_data() forcing unrecognized.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'Unrecognized Platform']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals(False, dsrc.get_data())
+
+
+class TestUserDataRhevm(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.user_data_rhevm()
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.mount_dir = tempfile.mkdtemp()
+
+ _write_user_data_files(self.mount_dir, 'test user data')
+
+ def tearDown(self):
+ # Reset
+
+ _remove_user_data_files(self.mount_dir)
+
+ # Attempt to remove the temp dir ignoring errors
+ try:
+ shutil.rmtree(self.mount_dir)
+ except OSError:
+ pass
+
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/etc/sysconfig/cloud-info'
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['/sbin/modprobe', 'floppy']
+ cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
+ ['/sbin/udevadm', 'settle', '--quiet', '--timeout=5']
+
+ def test_mount_cb_fails(self):
+ '''Test user_data_rhevm() where mount_cb fails.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['echo', 'modprobe floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+ def test_modprobe_fails(self):
+ '''Test user_data_rhevm() where modprobe fails.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['ls', 'modprobe floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+ def test_no_modprobe_cmd(self):
+ '''Test user_data_rhevm() with no modprobe command.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['bad command', 'modprobe floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+ def test_udevadm_fails(self):
+ '''Test user_data_rhevm() where udevadm fails.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
+ ['ls', 'udevadm floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+ def test_no_udevadm_cmd(self):
+ '''Test user_data_rhevm() with no udevadm command.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
+ ['bad command', 'udevadm floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+
+class TestUserDataVsphere(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.user_data_vsphere()
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.mount_dir = tempfile.mkdtemp()
+
+ _write_user_data_files(self.mount_dir, 'test user data')
+
+ def tearDown(self):
+ # Reset
+
+ _remove_user_data_files(self.mount_dir)
+
+ # Attempt to remove the temp dir ignoring errors
+ try:
+ shutil.rmtree(self.mount_dir)
+ except OSError:
+ pass
+
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/etc/sysconfig/cloud-info'
+
+ def test_user_data_vsphere(self):
+ '''Test user_data_vsphere() where mount_cb fails.'''
+
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = self.mount_dir
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_vsphere())
+
+
+class TestReadUserDataCallback(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.read_user_data_callback()
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.mount_dir = tempfile.mkdtemp()
+
+ _write_user_data_files(self.mount_dir, 'test user data')
+
+ def tearDown(self):
+ # Reset
+
+ _remove_user_data_files(self.mount_dir)
+
+ # Attempt to remove the temp dir ignoring errors
+ try:
+ shutil.rmtree(self.mount_dir)
+ except OSError:
+ pass
+
+ def test_callback_both(self):
+ '''Test read_user_data_callback() with both files.'''
+
+ self.assertEquals('test user data',
+ read_user_data_callback(self.mount_dir))
+
+ def test_callback_dc(self):
+ '''Test read_user_data_callback() with only DC file.'''
+
+ _remove_user_data_files(self.mount_dir,
+ dc_file=False,
+ non_dc_file=True)
+
+ self.assertEquals('test user data',
+ read_user_data_callback(self.mount_dir))
+
+ def test_callback_non_dc(self):
+ '''Test read_user_data_callback() with only non-DC file.'''
+
+ _remove_user_data_files(self.mount_dir,
+ dc_file=True,
+ non_dc_file=False)
+
+ self.assertEquals('test user data',
+ read_user_data_callback(self.mount_dir))
+
+ def test_callback_none(self):
+ '''Test read_user_data_callback() no files are found.'''
+
+ _remove_user_data_files(self.mount_dir)
+ self.assertEquals(None, read_user_data_callback(self.mount_dir))
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
new file mode 100644
index 00000000..55573114
--- /dev/null
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -0,0 +1,177 @@
+from copy import copy
+import json
+import os
+import os.path
+import shutil
+import tempfile
+from unittest import TestCase
+
+from cloudinit.sources import DataSourceConfigDrive as ds
+from cloudinit import util
+
+
+PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
+EC2_META = {
+ 'ami-id': 'ami-00000001',
+ 'ami-launch-index': 0,
+ 'ami-manifest-path': 'FIXME',
+ 'block-device-mapping': {
+ 'ami': 'sda1',
+ 'ephemeral0': 'sda2',
+ 'root': '/dev/sda1',
+ 'swap': 'sda3'},
+ 'hostname': 'sm-foo-test.novalocal',
+ 'instance-action': 'none',
+ 'instance-id': 'i-00000001',
+ 'instance-type': 'm1.tiny',
+ 'local-hostname': 'sm-foo-test.novalocal',
+ 'local-ipv4': None,
+ 'placement': {'availability-zone': 'nova'},
+ 'public-hostname': 'sm-foo-test.novalocal',
+ 'public-ipv4': '',
+ 'public-keys': {'0': {'openssh-key': PUBKEY}},
+ 'reservation-id': 'r-iru5qm4m',
+ 'security-groups': ['default']
+}
+USER_DATA = '#!/bin/sh\necho This is user data\n'
+OSTACK_META = {
+ 'availability_zone': 'nova',
+ 'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'},
+ {'content_path': '/content/0001', 'path': '/etc/bar/bar.cfg'}],
+ 'hostname': 'sm-foo-test.novalocal',
+ 'meta': {'dsmode': 'local', 'my-meta': 'my-value'},
+ 'name': 'sm-foo-test',
+ 'public_keys': {'mykey': PUBKEY},
+ 'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'}
+
+CONTENT_0 = 'This is contents of /etc/foo.cfg\n'
+CONTENT_1 = '# this is /etc/bar/bar.cfg\n'
+
+CFG_DRIVE_FILES_V2 = {
+ 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
+ 'ec2/2009-04-04/user-data': USER_DATA,
+ 'ec2/latest/meta-data.json': json.dumps(EC2_META),
+ 'ec2/latest/user-data': USER_DATA,
+ 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META),
+ 'openstack/2012-08-10/user_data': USER_DATA,
+ 'openstack/content/0000': CONTENT_0,
+ 'openstack/content/0001': CONTENT_1,
+ 'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
+ 'openstack/latest/user_data': USER_DATA}
+
+
+class TestConfigDriveDataSource(TestCase):
+
+ def setUp(self):
+ super(TestConfigDriveDataSource, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+
+ def tearDown(self):
+ try:
+ shutil.rmtree(self.tmp)
+ except OSError:
+ pass
+
+ def test_dir_valid(self):
+ """Verify a dir is read as such."""
+
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+
+ found = ds.read_config_drive_dir(self.tmp)
+
+ expected_md = copy(OSTACK_META)
+ expected_md['instance-id'] = expected_md['uuid']
+
+ self.assertEqual(USER_DATA, found['userdata'])
+ self.assertEqual(expected_md, found['metadata'])
+ self.assertEqual(found['files']['/etc/foo.cfg'], CONTENT_0)
+ self.assertEqual(found['files']['/etc/bar/bar.cfg'], CONTENT_1)
+
+ def test_seed_dir_valid_extra(self):
+ """Verify extra files do not affect datasource validity."""
+
+ data = copy(CFG_DRIVE_FILES_V2)
+ data["myfoofile.txt"] = "myfoocontent"
+ data["openstack/latest/random-file.txt"] = "random-content"
+
+ populate_dir(self.tmp, data)
+
+ found = ds.read_config_drive_dir(self.tmp)
+
+ expected_md = copy(OSTACK_META)
+ expected_md['instance-id'] = expected_md['uuid']
+
+ self.assertEqual(expected_md, found['metadata'])
+
+ def test_seed_dir_bad_json_metadata(self):
+ """Verify that bad json in metadata raises BrokenConfigDriveDir."""
+ data = copy(CFG_DRIVE_FILES_V2)
+
+ data["openstack/2012-08-10/meta_data.json"] = "non-json garbage {}"
+ data["openstack/latest/meta_data.json"] = "non-json garbage {}"
+
+ populate_dir(self.tmp, data)
+
+ self.assertRaises(ds.BrokenConfigDriveDir,
+ ds.read_config_drive_dir, self.tmp)
+
+ def test_seed_dir_no_configdrive(self):
+ """Verify that no metadata raises NonConfigDriveDir."""
+
+ my_d = os.path.join(self.tmp, "non-configdrive")
+ data = copy(CFG_DRIVE_FILES_V2)
+ data["myfoofile.txt"] = "myfoocontent"
+ data["openstack/latest/random-file.txt"] = "random-content"
+ data["content/foo"] = "foocontent"
+
+ self.assertRaises(ds.NonConfigDriveDir,
+ ds.read_config_drive_dir, my_d)
+
+ def test_seed_dir_missing(self):
+ """Verify that missing seed_dir raises NonConfigDriveDir."""
+ my_d = os.path.join(self.tmp, "nonexistantdirectory")
+ self.assertRaises(ds.NonConfigDriveDir,
+ ds.read_config_drive_dir, my_d)
+
+ def test_find_candidates(self):
+ devs_with_answers = {
+ "TYPE=vfat": [],
+ "TYPE=iso9660": ["/dev/vdb"],
+ "LABEL=config-2": ["/dev/vdb"],
+ }
+
+ def my_devs_with(criteria):
+ return devs_with_answers[criteria]
+
+ try:
+ orig_find_devs_with = util.find_devs_with
+ util.find_devs_with = my_devs_with
+
+ self.assertEqual(["/dev/vdb"], ds.find_candidate_devs())
+
+ # add a vfat item
+ # zdd reverse sorts after vdb, but config-2 label is preferred
+ devs_with_answers['TYPE=vfat'] = ["/dev/zdd"]
+ self.assertEqual(["/dev/vdb", "/dev/zdd"],
+ ds.find_candidate_devs())
+
+ # verify that partitions are not considered
+ devs_with_answers = {"TYPE=vfat": ["/dev/sda1"],
+ "TYPE=iso9660": [], "LABEL=config-2": ["/dev/vdb3"]}
+ self.assertEqual([], ds.find_candidate_devs())
+
+ finally:
+ util.find_devs_with = orig_find_devs_with
+
+
+def populate_dir(seed_dir, files):
+ for (name, content) in files.iteritems():
+ path = os.path.join(seed_dir, name)
+ dirname = os.path.dirname(path)
+ if not os.path.isdir(dirname):
+ os.makedirs(dirname)
+ with open(path, "w") as fp:
+ fp.write(content)
+ fp.close()
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index 261c410a..85e6add0 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -1,10 +1,8 @@
-import os
-from StringIO import StringIO
from copy import copy
+import os
-from cloudinit import util
-from cloudinit import url_helper
from cloudinit.sources import DataSourceMAAS
+from cloudinit import url_helper
from mocker import MockerTestCase
@@ -17,7 +15,7 @@ class TestMAASDataSource(MockerTestCase):
self.tmp = self.makeDir()
def test_seed_dir_valid(self):
- """Verify a valid seeddir is read as such"""
+ """Verify a valid seeddir is read as such."""
data = {'instance-id': 'i-valid01',
'local-hostname': 'valid01-hostname',
@@ -37,7 +35,7 @@ class TestMAASDataSource(MockerTestCase):
self.assertFalse(('user-data' in metadata))
def test_seed_dir_valid_extra(self):
- """Verify extra files do not affect seed_dir validity """
+ """Verify extra files do not affect seed_dir validity."""
data = {'instance-id': 'i-valid-extra',
'local-hostname': 'valid-extra-hostname',
@@ -56,7 +54,7 @@ class TestMAASDataSource(MockerTestCase):
self.assertFalse(('foo' in metadata))
def test_seed_dir_invalid(self):
- """Verify that invalid seed_dir raises MAASSeedDirMalformed"""
+ """Verify that invalid seed_dir raises MAASSeedDirMalformed."""
valid = {'instance-id': 'i-instanceid',
'local-hostname': 'test-hostname', 'user-data': ''}
@@ -80,20 +78,20 @@ class TestMAASDataSource(MockerTestCase):
DataSourceMAAS.read_maas_seed_dir, my_d)
def test_seed_dir_none(self):
- """Verify that empty seed_dir raises MAASSeedDirNone"""
+ """Verify that empty seed_dir raises MAASSeedDirNone."""
my_d = os.path.join(self.tmp, "valid_empty")
self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
DataSourceMAAS.read_maas_seed_dir, my_d)
def test_seed_dir_missing(self):
- """Verify that missing seed_dir raises MAASSeedDirNone"""
- self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
+ """Verify that missing seed_dir raises MAASSeedDirNone."""
+ self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
DataSourceMAAS.read_maas_seed_dir,
os.path.join(self.tmp, "nonexistantdirectory"))
def test_seed_url_valid(self):
- """Verify that valid seed_url is read as such"""
+ """Verify that valid seed_url is read as such."""
valid = {'meta-data/instance-id': 'i-instanceid',
'meta-data/local-hostname': 'test-hostname',
'meta-data/public-keys': 'test-hostname',
@@ -131,11 +129,11 @@ class TestMAASDataSource(MockerTestCase):
valid['meta-data/local-hostname'])
def test_seed_url_invalid(self):
- """Verify that invalid seed_url raises MAASSeedDirMalformed"""
+ """Verify that invalid seed_url raises MAASSeedDirMalformed."""
pass
def test_seed_url_missing(self):
- """Verify seed_url with no found entries raises MAASSeedDirNone"""
+ """Verify seed_url with no found entries raises MAASSeedDirNone."""
pass
diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py
new file mode 100644
index 00000000..2df4c2f0
--- /dev/null
+++ b/tests/unittests/test_distros/test_generic.py
@@ -0,0 +1,121 @@
+from mocker import MockerTestCase
+
+from cloudinit import distros
+
+unknown_arch_info = {
+ 'arches': ['default'],
+ 'failsafe': {'primary': 'http://fs-primary-default',
+ 'security': 'http://fs-security-default'}
+}
+
+package_mirrors = [
+ {'arches': ['i386', 'amd64'],
+ 'failsafe': {'primary': 'http://fs-primary-intel',
+ 'security': 'http://fs-security-intel'},
+ 'search': {
+ 'primary': ['http://%(ec2_region)s.ec2/',
+ 'http://%(availability_zone)s.clouds/'],
+ 'security': ['http://security-mirror1-intel',
+ 'http://security-mirror2-intel']}},
+ {'arches': ['armhf', 'armel'],
+ 'failsafe': {'primary': 'http://fs-primary-arm',
+ 'security': 'http://fs-security-arm'}},
+ unknown_arch_info
+]
+
+gpmi = distros._get_package_mirror_info # pylint: disable=W0212
+gapmi = distros._get_arch_package_mirror_info # pylint: disable=W0212
+
+
+class TestGenericDistro(MockerTestCase):
+
+ def return_first(self, mlist):
+ if not mlist:
+ return None
+ return mlist[0]
+
+ def return_second(self, mlist):
+ if not mlist:
+ return None
+ return mlist[1]
+
+ def return_none(self, _mlist):
+ return None
+
+ def return_last(self, mlist):
+ if not mlist:
+ return None
+ return(mlist[-1])
+
+ def setUp(self):
+ super(TestGenericDistro, self).setUp()
+ # Make a temp directoy for tests to use.
+ self.tmp = self.makeDir()
+
+ def test_arch_package_mirror_info_unknown(self):
+ """for an unknown arch, we should get back that with arch 'default'."""
+ arch_mirrors = gapmi(package_mirrors, arch="unknown")
+ self.assertEqual(unknown_arch_info, arch_mirrors)
+
+ def test_arch_package_mirror_info_known(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+ self.assertEqual(package_mirrors[0], arch_mirrors)
+
+ def test_get_package_mirror_info_az_ec2(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+
+ results = gpmi(arch_mirrors, availability_zone="us-east-1a",
+ mirror_filter=self.return_first)
+ self.assertEqual(results,
+ {'primary': 'http://us-east-1.ec2/',
+ 'security': 'http://security-mirror1-intel'})
+
+ results = gpmi(arch_mirrors, availability_zone="us-east-1a",
+ mirror_filter=self.return_second)
+ self.assertEqual(results,
+ {'primary': 'http://us-east-1a.clouds/',
+ 'security': 'http://security-mirror2-intel'})
+
+ results = gpmi(arch_mirrors, availability_zone="us-east-1a",
+ mirror_filter=self.return_none)
+ self.assertEqual(results, package_mirrors[0]['failsafe'])
+
+ def test_get_package_mirror_info_az_non_ec2(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+
+ results = gpmi(arch_mirrors, availability_zone="nova.cloudvendor",
+ mirror_filter=self.return_first)
+ self.assertEqual(results,
+ {'primary': 'http://nova.cloudvendor.clouds/',
+ 'security': 'http://security-mirror1-intel'})
+
+ results = gpmi(arch_mirrors, availability_zone="nova.cloudvendor",
+ mirror_filter=self.return_last)
+ self.assertEqual(results,
+ {'primary': 'http://nova.cloudvendor.clouds/',
+ 'security': 'http://security-mirror2-intel'})
+
+ def test_get_package_mirror_info_none(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+
+ # because both search entries here replacement based on
+ # availability-zone, the filter will be called with an empty list and
+ # failsafe should be taken.
+ results = gpmi(arch_mirrors, availability_zone=None,
+ mirror_filter=self.return_first)
+ self.assertEqual(results,
+ {'primary': 'http://fs-primary-intel',
+ 'security': 'http://security-mirror1-intel'})
+
+ results = gpmi(arch_mirrors, availability_zone=None,
+ mirror_filter=self.return_last)
+ self.assertEqual(results,
+ {'primary': 'http://fs-primary-intel',
+ 'security': 'http://security-mirror2-intel'})
+
+
+#def _get_package_mirror_info(mirror_info, availability_zone=None,
+# mirror_filter=util.search_for_mirror):
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py
index 1f96e992..d3df5c50 100644
--- a/tests/unittests/test_handler/test_handler_ca_certs.py
+++ b/tests/unittests/test_handler/test_handler_ca_certs.py
@@ -1,8 +1,8 @@
from mocker import MockerTestCase
-from cloudinit import util
from cloudinit import cloud
from cloudinit import helpers
+from cloudinit import util
from cloudinit.config import cc_ca_certs
@@ -26,7 +26,8 @@ class TestNoConfig(MockerTestCase):
self.mocker.replace(cc_ca_certs.update_ca_certs, passthrough=False)
self.mocker.replay()
- cc_ca_certs.handle(self.name, config, self.cloud_init, self.log, self.args)
+ cc_ca_certs.handle(self.name, config, self.cloud_init, self.log,
+ self.args)
class TestConfig(MockerTestCase):
@@ -39,11 +40,12 @@ class TestConfig(MockerTestCase):
self.args = []
# Mock out the functions that actually modify the system
- self.mock_add = self.mocker.replace(cc_ca_certs.add_ca_certs, passthrough=False)
+ self.mock_add = self.mocker.replace(cc_ca_certs.add_ca_certs,
+ passthrough=False)
self.mock_update = self.mocker.replace(cc_ca_certs.update_ca_certs,
passthrough=False)
- self.mock_remove = self.mocker.replace(cc_ca_certs.remove_default_ca_certs,
- passthrough=False)
+ self.mock_remove = self.mocker.replace(
+ cc_ca_certs.remove_default_ca_certs, passthrough=False)
# Order must be correct
self.mocker.order()
@@ -62,7 +64,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_empty_trusted_list(self):
- """Test that no certificate are written if 'trusted' list is empty"""
+ """Test that no certificate are written if 'trusted' list is empty."""
config = {"ca-certs": {"trusted": []}}
# No functions should be called
@@ -72,7 +74,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_single_trusted(self):
- """Test that a single cert gets passed to add_ca_certs"""
+ """Test that a single cert gets passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1"]}}
self.mock_add(self.paths, ["CERT1"])
@@ -82,7 +84,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_multiple_trusted(self):
- """Test that multiple certs get passed to add_ca_certs"""
+ """Test that multiple certs get passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
self.mock_add(self.paths, ["CERT1", "CERT2"])
@@ -92,7 +94,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_remove_default_ca_certs(self):
- """Test remove_defaults works as expected"""
+ """Test remove_defaults works as expected."""
config = {"ca-certs": {"remove-defaults": True}}
self.mock_remove(self.paths)
@@ -102,7 +104,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_no_remove_defaults_if_false(self):
- """Test remove_defaults is not called when config value is False"""
+ """Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": False}}
self.mock_update()
@@ -111,7 +113,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_correct_order_for_remove_then_add(self):
- """Test remove_defaults is not called when config value is False"""
+ """Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
self.mock_remove(self.paths)
@@ -137,7 +139,7 @@ class TestAddCaCerts(MockerTestCase):
cc_ca_certs.add_ca_certs(self.paths, [])
def test_single_cert(self):
- """Test adding a single certificate to the trusted CAs"""
+ """Test adding a single certificate to the trusted CAs."""
cert = "CERT1\nLINE2\nLINE3"
mock_write = self.mocker.replace(util.write_file, passthrough=False)
@@ -150,7 +152,7 @@ class TestAddCaCerts(MockerTestCase):
cc_ca_certs.add_ca_certs(self.paths, [cert])
def test_multiple_certs(self):
- """Test adding multiple certificates to the trusted CAs"""
+ """Test adding multiple certificates to the trusted CAs."""
certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"]
expected_cert_file = "\n".join(certs)
@@ -183,8 +185,8 @@ class TestRemoveDefaultCaCerts(MockerTestCase):
})
def test_commands(self):
- mock_delete_dir_contents = self.mocker.replace(util.delete_dir_contents,
- passthrough=False)
+ mock_delete_dir_contents = self.mocker.replace(
+ util.delete_dir_contents, passthrough=False)
mock_write = self.mocker.replace(util.write_file, passthrough=False)
mock_subp = self.mocker.replace(util.subp,
passthrough=False)
diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py
index 861642b6..82a4c555 100644
--- a/tests/unittests/test_userdata.py
+++ b/tests/unittests/test_userdata.py
@@ -1,21 +1,17 @@
-"""Tests for handling of userdata within cloud init"""
+"""Tests for handling of userdata within cloud init."""
import StringIO
import logging
import os
-import shutil
-import tempfile
from email.mime.base import MIMEBase
from mocker import MockerTestCase
-from cloudinit import helpers
from cloudinit import log
from cloudinit import sources
from cloudinit import stages
-from cloudinit import util
INSTANCE_ID = "i-testing"
@@ -58,7 +54,7 @@ class TestConsumeUserData(MockerTestCase):
return log_file
def test_unhandled_type_warning(self):
- """Raw text without magic is ignored but shows warning"""
+ """Raw text without magic is ignored but shows warning."""
ci = stages.Init()
data = "arbitrary text\n"
ci.datasource = FakeDataSource(data)
@@ -74,7 +70,7 @@ class TestConsumeUserData(MockerTestCase):
log_file.getvalue())
def test_mime_text_plain(self):
- """Mime message of type text/plain is ignored but shows warning"""
+ """Mime message of type text/plain is ignored but shows warning."""
ci = stages.Init()
message = MIMEBase("text", "plain")
message.set_payload("Just text")
@@ -90,9 +86,8 @@ class TestConsumeUserData(MockerTestCase):
"Unhandled unknown content-type (text/plain)",
log_file.getvalue())
-
def test_shellscript(self):
- """Raw text starting #!/bin/sh is treated as script"""
+ """Raw text starting #!/bin/sh is treated as script."""
ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
ci.datasource = FakeDataSource(script)
@@ -108,7 +103,7 @@ class TestConsumeUserData(MockerTestCase):
self.assertEqual("", log_file.getvalue())
def test_mime_text_x_shellscript(self):
- """Mime message of type text/x-shellscript is treated as script"""
+ """Mime message of type text/x-shellscript is treated as script."""
ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
message = MIMEBase("text", "x-shellscript")
@@ -126,7 +121,7 @@ class TestConsumeUserData(MockerTestCase):
self.assertEqual("", log_file.getvalue())
def test_mime_text_plain_shell(self):
- """Mime type text/plain starting #!/bin/sh is treated as script"""
+ """Mime type text/plain starting #!/bin/sh is treated as script."""
ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
message = MIMEBase("text", "plain")
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 93979f06..15fcbd26 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -1,11 +1,11 @@
import os
import stat
-from unittest import TestCase
from mocker import MockerTestCase
+from unittest import TestCase
-from cloudinit import util
from cloudinit import importer
+from cloudinit import util
class FakeSelinux(object):
@@ -14,7 +14,7 @@ class FakeSelinux(object):
self.match_what = match_what
self.restored = []
- def matchpathcon(self, path, mode):
+ def matchpathcon(self, path, mode): # pylint: disable=W0613
if path == self.match_what:
return
else:
@@ -23,7 +23,7 @@ class FakeSelinux(object):
def is_selinux_enabled(self):
return True
- def restorecon(self, path, recursive):
+ def restorecon(self, path, recursive): # pylint: disable=W0613
self.restored.append(path)