summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/data/filter_cloud_multipart.yaml30
-rw-r--r--tests/data/filter_cloud_multipart_1.email11
-rw-r--r--tests/data/filter_cloud_multipart_2.email39
-rw-r--r--tests/data/filter_cloud_multipart_header.email11
-rw-r--r--tests/unittests/helpers.py42
-rw-r--r--tests/unittests/test__init__.py14
-rw-r--r--tests/unittests/test_builtin_handlers.py4
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py445
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py177
-rw-r--r--tests/unittests/test_datasource/test_maas.py22
-rw-r--r--tests/unittests/test_distros/test_generic.py121
-rw-r--r--tests/unittests/test_filters/test_launch_index.py134
-rw-r--r--tests/unittests/test_handler/test_handler_ca_certs.py18
-rw-r--r--tests/unittests/test_userdata.py13
-rw-r--r--tests/unittests/test_util.py4
15 files changed, 1047 insertions, 38 deletions
diff --git a/tests/data/filter_cloud_multipart.yaml b/tests/data/filter_cloud_multipart.yaml
new file mode 100644
index 00000000..7acc2b9d
--- /dev/null
+++ b/tests/data/filter_cloud_multipart.yaml
@@ -0,0 +1,30 @@
+#cloud-config-archive
+---
+- content: "\n blah: true\n launch-index: 3\n"
+ type: text/cloud-config
+- content: "\n blah: true\n launch-index: 4\n"
+ type: text/cloud-config
+- content: The quick brown fox jumps over the lazy dog
+ filename: b0.txt
+ launch-index: 0
+ type: plain/text
+- content: The quick brown fox jumps over the lazy dog
+ filename: b3.txt
+ launch-index: 3
+ type: plain/text
+- content: The quick brown fox jumps over the lazy dog
+ filename: b2.txt
+ launch-index: 2
+ type: plain/text
+- content: '#!/bin/bash \n echo "stuff"'
+ filename: b2.txt
+ launch-index: 2
+- content: '#!/bin/bash \n echo "stuff"'
+ filename: b2.txt
+ launch-index: 1
+- content: '#!/bin/bash \n echo "stuff"'
+ filename: b2.txt
+ # Use a string to see if conversion works
+ launch-index: "1"
+...
+
diff --git a/tests/data/filter_cloud_multipart_1.email b/tests/data/filter_cloud_multipart_1.email
new file mode 100644
index 00000000..6d93b1f1
--- /dev/null
+++ b/tests/data/filter_cloud_multipart_1.email
@@ -0,0 +1,11 @@
+From nobody Fri Aug 31 17:17:00 2012
+Content-Type: text/plain; charset="us-ascii"
+MIME-Version: 1.0
+Content-Transfer-Encoding: 7bit
+
+
+#cloud-config
+b: c
+launch-index: 2
+
+
diff --git a/tests/data/filter_cloud_multipart_2.email b/tests/data/filter_cloud_multipart_2.email
new file mode 100644
index 00000000..b04068c5
--- /dev/null
+++ b/tests/data/filter_cloud_multipart_2.email
@@ -0,0 +1,39 @@
+From nobody Fri Aug 31 17:43:04 2012
+Content-Type: multipart/mixed; boundary="===============1668325974=="
+MIME-Version: 1.0
+
+--===============1668325974==
+Content-Type: text/cloud-config; charset="us-ascii"
+MIME-Version: 1.0
+Content-Transfer-Encoding: 7bit
+
+
+#cloud-config
+b: c
+launch-index: 2
+
+
+--===============1668325974==
+Content-Type: text/plain; charset="us-ascii"
+MIME-Version: 1.0
+Content-Transfer-Encoding: 7bit
+
+
+#cloud-config-archive
+- content: The quick brown fox jumps over the lazy dog
+ filename: b3.txt
+ launch-index: 3
+ type: plain/text
+
+--===============1668325974==
+Content-Type: text/plain; charset="us-ascii"
+MIME-Version: 1.0
+Content-Transfer-Encoding: 7bit
+
+
+#cloud-config
+b: c
+launch-index: 2
+
+
+--===============1668325974==--
diff --git a/tests/data/filter_cloud_multipart_header.email b/tests/data/filter_cloud_multipart_header.email
new file mode 100644
index 00000000..770f7ef1
--- /dev/null
+++ b/tests/data/filter_cloud_multipart_header.email
@@ -0,0 +1,11 @@
+From nobody Fri Aug 31 17:17:00 2012
+Content-Type: text/plain; charset="us-ascii"
+MIME-Version: 1.0
+Launch-Index: 5
+Content-Transfer-Encoding: 7bit
+
+
+#cloud-config
+b: c
+
+
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
new file mode 100644
index 00000000..d0f09e70
--- /dev/null
+++ b/tests/unittests/helpers.py
@@ -0,0 +1,42 @@
+import os
+
+from mocker import MockerTestCase
+
+from cloudinit import helpers as ch
+
+
+class ResourceUsingTestCase(MockerTestCase):
+ def __init__(self, methodName="runTest"):
+ MockerTestCase.__init__(self, methodName)
+ self.resource_path = None
+
+ def resourceLocation(self, subname=None):
+ if self.resource_path is None:
+ paths = [
+ os.path.join('tests', 'data'),
+ os.path.join('data'),
+ os.path.join(os.pardir, 'tests', 'data'),
+ os.path.join(os.pardir, 'data'),
+ ]
+ for p in paths:
+ if os.path.isdir(p):
+ self.resource_path = p
+ break
+ self.assertTrue((self.resource_path and
+ os.path.isdir(self.resource_path)),
+ msg="Unable to locate test resource data path!")
+ if not subname:
+ return self.resource_path
+ return os.path.join(self.resource_path, subname)
+
+ def readResource(self, name):
+ where = self.resourceLocation(name)
+ with open(where, 'r') as fh:
+ return fh.read()
+
+ def getCloudPaths(self):
+ cp = ch.Paths({
+ 'cloud_dir': self.makeDir(),
+ 'templates_dir': self.resourceLocation(),
+ })
+ return cp
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py
index 464c8c2f..ac082076 100644
--- a/tests/unittests/test__init__.py
+++ b/tests/unittests/test__init__.py
@@ -1,6 +1,6 @@
-import StringIO
import logging
import os
+import StringIO
import sys
from mocker import MockerTestCase, ANY, ARGS, KWARGS
@@ -61,14 +61,14 @@ class TestWalkerHandleHandler(MockerTestCase):
import_mock(self.expected_module_name)
self.mocker.result(self.module_fake)
self.mocker.replay()
-
+
handlers.walker_handle_handler(self.data, self.ctype, self.filename,
self.payload)
-
+
self.assertEqual(1, self.data["handlercount"])
-
+
def test_import_error(self):
- """Module import errors are logged. No handler added to C{pdata}"""
+ """Module import errors are logged. No handler added to C{pdata}."""
import_mock = self.mocker.replace(importer.import_module,
passthrough=False)
import_mock(self.expected_module_name)
@@ -81,7 +81,7 @@ class TestWalkerHandleHandler(MockerTestCase):
self.assertEqual(0, self.data["handlercount"])
def test_attribute_error(self):
- """Attribute errors are logged. No handler added to C{pdata}"""
+ """Attribute errors are logged. No handler added to C{pdata}."""
import_mock = self.mocker.replace(importer.import_module,
passthrough=False)
import_mock(self.expected_module_name)
@@ -156,7 +156,7 @@ class TestHandlerHandlePart(MockerTestCase):
self.payload, self.frequency)
def test_no_handle_when_modfreq_once(self):
- """C{handle_part} is not called if frequency is once"""
+ """C{handle_part} is not called if frequency is once."""
self.frequency = "once"
mod_mock = self.mocker.mock()
getattr(mod_mock, "frequency")
diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py
index 5bba8bc9..ebc0bd51 100644
--- a/tests/unittests/test_builtin_handlers.py
+++ b/tests/unittests/test_builtin_handlers.py
@@ -1,4 +1,4 @@
-"""Tests of the built-in user data handlers"""
+"""Tests of the built-in user data handlers."""
import os
@@ -33,7 +33,7 @@ class TestBuiltins(MockerTestCase):
None, None, None)
self.assertEquals(0, len(os.listdir(up_root)))
- def test_upstart_frequency_single(self):
+ def test_upstart_frequency_single(self):
c_root = self.makeDir()
up_root = self.makeDir()
paths = helpers.Paths({
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
new file mode 100644
index 00000000..bda61c7e
--- /dev/null
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -0,0 +1,445 @@
+# vi: ts=4 expandtab
+#
+# Copyright (C) 2009-2010 Canonical Ltd.
+# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
+# Copyright (C) 2012 Yahoo! Inc.
+#
+# Author: Joe VLcek <JVLcek@RedHat.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+'''
+This test file exercises the code in sources DataSourceAltCloud.py
+'''
+
+import os
+import shutil
+import tempfile
+
+from cloudinit import helpers
+from unittest import TestCase
+
+# Get the cloudinit.sources.DataSourceAltCloud import items needed.
+import cloudinit.sources.DataSourceAltCloud
+from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud
+from cloudinit.sources.DataSourceAltCloud import read_user_data_callback
+
+
+def _write_cloud_info_file(value):
+ '''
+ Populate the CLOUD_INFO_FILE which would be populated
+ with a cloud backend identifier ImageFactory when building
+ an image with ImageFactory.
+ '''
+ cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w')
+ cifile.write(value)
+ cifile.close()
+ os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664)
+
+
+def _remove_cloud_info_file():
+ '''
+ Remove the test CLOUD_INFO_FILE
+ '''
+ os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE)
+
+
+def _write_user_data_files(mount_dir, value):
+ '''
+ Populate the deltacloud_user_data_file the user_data_file
+ which would be populated with user data.
+ '''
+ deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt'
+ user_data_file = mount_dir + '/user-data.txt'
+
+ udfile = open(deltacloud_user_data_file, 'w')
+ udfile.write(value)
+ udfile.close()
+ os.chmod(deltacloud_user_data_file, 0664)
+
+ udfile = open(user_data_file, 'w')
+ udfile.write(value)
+ udfile.close()
+ os.chmod(user_data_file, 0664)
+
+
+def _remove_user_data_files(mount_dir,
+ dc_file=True,
+ non_dc_file=True):
+ '''
+ Remove the test files: deltacloud_user_data_file and
+ user_data_file
+ '''
+ deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt'
+ user_data_file = mount_dir + '/user-data.txt'
+
+ # Ignore any failures removeing files that are already gone.
+ if dc_file:
+ try:
+ os.remove(deltacloud_user_data_file)
+ except OSError:
+ pass
+
+ if non_dc_file:
+ try:
+ os.remove(user_data_file)
+ except OSError:
+ pass
+
+
+class TestGetCloudType(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.get_cloud_type()
+ '''
+
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+
+ def tearDown(self):
+ # Reset
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['dmidecode', '--string', 'system-product-name']
+
+ def test_rhev(self):
+ '''
+ Test method get_cloud_type() for RHEVm systems.
+ Forcing dmidecode return to match a RHEVm system: RHEV Hypervisor
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'RHEV Hypervisor']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('RHEV', \
+ dsrc.get_cloud_type())
+
+ def test_vsphere(self):
+ '''
+ Test method get_cloud_type() for vSphere systems.
+ Forcing dmidecode return to match a vSphere system: RHEV Hypervisor
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'VMware Virtual Platform']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('VSPHERE', \
+ dsrc.get_cloud_type())
+
+ def test_unknown(self):
+ '''
+ Test method get_cloud_type() for unknown systems.
+ Forcing dmidecode return to match an unrecognized return.
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'Unrecognized Platform']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('UNKNOWN', \
+ dsrc.get_cloud_type())
+
+ def test_exception1(self):
+ '''
+ Test method get_cloud_type() where command dmidecode fails.
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['ls', 'bad command']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('UNKNOWN', \
+ dsrc.get_cloud_type())
+
+ def test_exception2(self):
+ '''
+ Test method get_cloud_type() where command dmidecode is not available.
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['bad command']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('UNKNOWN', \
+ dsrc.get_cloud_type())
+
+
+class TestGetDataCloudInfoFile(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.get_data()
+ With a contrived CLOUD_INFO_FILE
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.cloud_info_file = tempfile.mkstemp()[1]
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ self.cloud_info_file
+
+ def tearDown(self):
+ # Reset
+
+ # Attempt to remove the temp file ignoring errors
+ try:
+ os.remove(self.cloud_info_file)
+ except OSError:
+ pass
+
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/etc/sysconfig/cloud-info'
+
+ def test_rhev(self):
+ '''Success Test module get_data() forcing RHEV.'''
+
+ _write_cloud_info_file('RHEV')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_rhevm = lambda: True
+ self.assertEquals(True, dsrc.get_data())
+
+ def test_vsphere(self):
+ '''Success Test module get_data() forcing VSPHERE.'''
+
+ _write_cloud_info_file('VSPHERE')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_vsphere = lambda: True
+ self.assertEquals(True, dsrc.get_data())
+
+ def test_fail_rhev(self):
+ '''Failure Test module get_data() forcing RHEV.'''
+
+ _write_cloud_info_file('RHEV')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_rhevm = lambda: False
+ self.assertEquals(False, dsrc.get_data())
+
+ def test_fail_vsphere(self):
+ '''Failure Test module get_data() forcing VSPHERE.'''
+
+ _write_cloud_info_file('VSPHERE')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_vsphere = lambda: False
+ self.assertEquals(False, dsrc.get_data())
+
+ def test_unrecognized(self):
+ '''Failure Test module get_data() forcing unrecognized.'''
+
+ _write_cloud_info_file('unrecognized')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals(False, dsrc.get_data())
+
+
+class TestGetDataNoCloudInfoFile(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.get_data()
+ Without a CLOUD_INFO_FILE
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ 'no such file'
+
+ def tearDown(self):
+ # Reset
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/etc/sysconfig/cloud-info'
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['dmidecode', '--string', 'system-product-name']
+
+ def test_rhev_no_cloud_file(self):
+ '''Test No cloud info file module get_data() forcing RHEV.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'RHEV Hypervisor']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_rhevm = lambda: True
+ self.assertEquals(True, dsrc.get_data())
+
+ def test_vsphere_no_cloud_file(self):
+ '''Test No cloud info file module get_data() forcing VSPHERE.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'VMware Virtual Platform']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_vsphere = lambda: True
+ self.assertEquals(True, dsrc.get_data())
+
+ def test_failure_no_cloud_file(self):
+ '''Test No cloud info file module get_data() forcing unrecognized.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'Unrecognized Platform']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals(False, dsrc.get_data())
+
+
+class TestUserDataRhevm(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.user_data_rhevm()
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.mount_dir = tempfile.mkdtemp()
+
+ _write_user_data_files(self.mount_dir, 'test user data')
+
+ def tearDown(self):
+ # Reset
+
+ _remove_user_data_files(self.mount_dir)
+
+ # Attempt to remove the temp dir ignoring errors
+ try:
+ shutil.rmtree(self.mount_dir)
+ except OSError:
+ pass
+
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/etc/sysconfig/cloud-info'
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['/sbin/modprobe', 'floppy']
+ cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
+ ['/sbin/udevadm', 'settle', '--quiet', '--timeout=5']
+
+ def test_mount_cb_fails(self):
+ '''Test user_data_rhevm() where mount_cb fails.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['echo', 'modprobe floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+ def test_modprobe_fails(self):
+ '''Test user_data_rhevm() where modprobe fails.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['ls', 'modprobe floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+ def test_no_modprobe_cmd(self):
+ '''Test user_data_rhevm() with no modprobe command.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['bad command', 'modprobe floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+ def test_udevadm_fails(self):
+ '''Test user_data_rhevm() where udevadm fails.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
+ ['ls', 'udevadm floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+ def test_no_udevadm_cmd(self):
+ '''Test user_data_rhevm() with no udevadm command.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
+ ['bad command', 'udevadm floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+
+class TestUserDataVsphere(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.user_data_vsphere()
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.mount_dir = tempfile.mkdtemp()
+
+ _write_user_data_files(self.mount_dir, 'test user data')
+
+ def tearDown(self):
+ # Reset
+
+ _remove_user_data_files(self.mount_dir)
+
+ # Attempt to remove the temp dir ignoring errors
+ try:
+ shutil.rmtree(self.mount_dir)
+ except OSError:
+ pass
+
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/etc/sysconfig/cloud-info'
+
+ def test_user_data_vsphere(self):
+ '''Test user_data_vsphere() where mount_cb fails.'''
+
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = self.mount_dir
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_vsphere())
+
+
+class TestReadUserDataCallback(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.read_user_data_callback()
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.mount_dir = tempfile.mkdtemp()
+
+ _write_user_data_files(self.mount_dir, 'test user data')
+
+ def tearDown(self):
+ # Reset
+
+ _remove_user_data_files(self.mount_dir)
+
+ # Attempt to remove the temp dir ignoring errors
+ try:
+ shutil.rmtree(self.mount_dir)
+ except OSError:
+ pass
+
+ def test_callback_both(self):
+ '''Test read_user_data_callback() with both files.'''
+
+ self.assertEquals('test user data',
+ read_user_data_callback(self.mount_dir))
+
+ def test_callback_dc(self):
+ '''Test read_user_data_callback() with only DC file.'''
+
+ _remove_user_data_files(self.mount_dir,
+ dc_file=False,
+ non_dc_file=True)
+
+ self.assertEquals('test user data',
+ read_user_data_callback(self.mount_dir))
+
+ def test_callback_non_dc(self):
+ '''Test read_user_data_callback() with only non-DC file.'''
+
+ _remove_user_data_files(self.mount_dir,
+ dc_file=True,
+ non_dc_file=False)
+
+ self.assertEquals('test user data',
+ read_user_data_callback(self.mount_dir))
+
+ def test_callback_none(self):
+ '''Test read_user_data_callback() no files are found.'''
+
+ _remove_user_data_files(self.mount_dir)
+ self.assertEquals(None, read_user_data_callback(self.mount_dir))
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
new file mode 100644
index 00000000..55573114
--- /dev/null
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -0,0 +1,177 @@
+from copy import copy
+import json
+import os
+import os.path
+import shutil
+import tempfile
+from unittest import TestCase
+
+from cloudinit.sources import DataSourceConfigDrive as ds
+from cloudinit import util
+
+
+PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
+EC2_META = {
+ 'ami-id': 'ami-00000001',
+ 'ami-launch-index': 0,
+ 'ami-manifest-path': 'FIXME',
+ 'block-device-mapping': {
+ 'ami': 'sda1',
+ 'ephemeral0': 'sda2',
+ 'root': '/dev/sda1',
+ 'swap': 'sda3'},
+ 'hostname': 'sm-foo-test.novalocal',
+ 'instance-action': 'none',
+ 'instance-id': 'i-00000001',
+ 'instance-type': 'm1.tiny',
+ 'local-hostname': 'sm-foo-test.novalocal',
+ 'local-ipv4': None,
+ 'placement': {'availability-zone': 'nova'},
+ 'public-hostname': 'sm-foo-test.novalocal',
+ 'public-ipv4': '',
+ 'public-keys': {'0': {'openssh-key': PUBKEY}},
+ 'reservation-id': 'r-iru5qm4m',
+ 'security-groups': ['default']
+}
+USER_DATA = '#!/bin/sh\necho This is user data\n'
+OSTACK_META = {
+ 'availability_zone': 'nova',
+ 'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'},
+ {'content_path': '/content/0001', 'path': '/etc/bar/bar.cfg'}],
+ 'hostname': 'sm-foo-test.novalocal',
+ 'meta': {'dsmode': 'local', 'my-meta': 'my-value'},
+ 'name': 'sm-foo-test',
+ 'public_keys': {'mykey': PUBKEY},
+ 'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'}
+
+CONTENT_0 = 'This is contents of /etc/foo.cfg\n'
+CONTENT_1 = '# this is /etc/bar/bar.cfg\n'
+
+CFG_DRIVE_FILES_V2 = {
+ 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
+ 'ec2/2009-04-04/user-data': USER_DATA,
+ 'ec2/latest/meta-data.json': json.dumps(EC2_META),
+ 'ec2/latest/user-data': USER_DATA,
+ 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META),
+ 'openstack/2012-08-10/user_data': USER_DATA,
+ 'openstack/content/0000': CONTENT_0,
+ 'openstack/content/0001': CONTENT_1,
+ 'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
+ 'openstack/latest/user_data': USER_DATA}
+
+
+class TestConfigDriveDataSource(TestCase):
+
+ def setUp(self):
+ super(TestConfigDriveDataSource, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+
+ def tearDown(self):
+ try:
+ shutil.rmtree(self.tmp)
+ except OSError:
+ pass
+
+ def test_dir_valid(self):
+ """Verify a dir is read as such."""
+
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+
+ found = ds.read_config_drive_dir(self.tmp)
+
+ expected_md = copy(OSTACK_META)
+ expected_md['instance-id'] = expected_md['uuid']
+
+ self.assertEqual(USER_DATA, found['userdata'])
+ self.assertEqual(expected_md, found['metadata'])
+ self.assertEqual(found['files']['/etc/foo.cfg'], CONTENT_0)
+ self.assertEqual(found['files']['/etc/bar/bar.cfg'], CONTENT_1)
+
+ def test_seed_dir_valid_extra(self):
+ """Verify extra files do not affect datasource validity."""
+
+ data = copy(CFG_DRIVE_FILES_V2)
+ data["myfoofile.txt"] = "myfoocontent"
+ data["openstack/latest/random-file.txt"] = "random-content"
+
+ populate_dir(self.tmp, data)
+
+ found = ds.read_config_drive_dir(self.tmp)
+
+ expected_md = copy(OSTACK_META)
+ expected_md['instance-id'] = expected_md['uuid']
+
+ self.assertEqual(expected_md, found['metadata'])
+
+ def test_seed_dir_bad_json_metadata(self):
+ """Verify that bad json in metadata raises BrokenConfigDriveDir."""
+ data = copy(CFG_DRIVE_FILES_V2)
+
+ data["openstack/2012-08-10/meta_data.json"] = "non-json garbage {}"
+ data["openstack/latest/meta_data.json"] = "non-json garbage {}"
+
+ populate_dir(self.tmp, data)
+
+ self.assertRaises(ds.BrokenConfigDriveDir,
+ ds.read_config_drive_dir, self.tmp)
+
+ def test_seed_dir_no_configdrive(self):
+ """Verify that no metadata raises NonConfigDriveDir."""
+
+ my_d = os.path.join(self.tmp, "non-configdrive")
+ data = copy(CFG_DRIVE_FILES_V2)
+ data["myfoofile.txt"] = "myfoocontent"
+ data["openstack/latest/random-file.txt"] = "random-content"
+ data["content/foo"] = "foocontent"
+
+ self.assertRaises(ds.NonConfigDriveDir,
+ ds.read_config_drive_dir, my_d)
+
+ def test_seed_dir_missing(self):
+ """Verify that missing seed_dir raises NonConfigDriveDir."""
+ my_d = os.path.join(self.tmp, "nonexistantdirectory")
+ self.assertRaises(ds.NonConfigDriveDir,
+ ds.read_config_drive_dir, my_d)
+
+ def test_find_candidates(self):
+ devs_with_answers = {
+ "TYPE=vfat": [],
+ "TYPE=iso9660": ["/dev/vdb"],
+ "LABEL=config-2": ["/dev/vdb"],
+ }
+
+ def my_devs_with(criteria):
+ return devs_with_answers[criteria]
+
+ try:
+ orig_find_devs_with = util.find_devs_with
+ util.find_devs_with = my_devs_with
+
+ self.assertEqual(["/dev/vdb"], ds.find_candidate_devs())
+
+ # add a vfat item
+ # zdd reverse sorts after vdb, but config-2 label is preferred
+ devs_with_answers['TYPE=vfat'] = ["/dev/zdd"]
+ self.assertEqual(["/dev/vdb", "/dev/zdd"],
+ ds.find_candidate_devs())
+
+ # verify that partitions are not considered
+ devs_with_answers = {"TYPE=vfat": ["/dev/sda1"],
+ "TYPE=iso9660": [], "LABEL=config-2": ["/dev/vdb3"]}
+ self.assertEqual([], ds.find_candidate_devs())
+
+ finally:
+ util.find_devs_with = orig_find_devs_with
+
+
+def populate_dir(seed_dir, files):
+ for (name, content) in files.iteritems():
+ path = os.path.join(seed_dir, name)
+ dirname = os.path.dirname(path)
+ if not os.path.isdir(dirname):
+ os.makedirs(dirname)
+ with open(path, "w") as fp:
+ fp.write(content)
+ fp.close()
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index 8a155f39..85e6add0 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -1,8 +1,8 @@
-import os
from copy import copy
+import os
-from cloudinit import url_helper
from cloudinit.sources import DataSourceMAAS
+from cloudinit import url_helper
from mocker import MockerTestCase
@@ -15,7 +15,7 @@ class TestMAASDataSource(MockerTestCase):
self.tmp = self.makeDir()
def test_seed_dir_valid(self):
- """Verify a valid seeddir is read as such"""
+ """Verify a valid seeddir is read as such."""
data = {'instance-id': 'i-valid01',
'local-hostname': 'valid01-hostname',
@@ -35,7 +35,7 @@ class TestMAASDataSource(MockerTestCase):
self.assertFalse(('user-data' in metadata))
def test_seed_dir_valid_extra(self):
- """Verify extra files do not affect seed_dir validity """
+ """Verify extra files do not affect seed_dir validity."""
data = {'instance-id': 'i-valid-extra',
'local-hostname': 'valid-extra-hostname',
@@ -54,7 +54,7 @@ class TestMAASDataSource(MockerTestCase):
self.assertFalse(('foo' in metadata))
def test_seed_dir_invalid(self):
- """Verify that invalid seed_dir raises MAASSeedDirMalformed"""
+ """Verify that invalid seed_dir raises MAASSeedDirMalformed."""
valid = {'instance-id': 'i-instanceid',
'local-hostname': 'test-hostname', 'user-data': ''}
@@ -78,20 +78,20 @@ class TestMAASDataSource(MockerTestCase):
DataSourceMAAS.read_maas_seed_dir, my_d)
def test_seed_dir_none(self):
- """Verify that empty seed_dir raises MAASSeedDirNone"""
+ """Verify that empty seed_dir raises MAASSeedDirNone."""
my_d = os.path.join(self.tmp, "valid_empty")
self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
DataSourceMAAS.read_maas_seed_dir, my_d)
def test_seed_dir_missing(self):
- """Verify that missing seed_dir raises MAASSeedDirNone"""
- self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
+ """Verify that missing seed_dir raises MAASSeedDirNone."""
+ self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
DataSourceMAAS.read_maas_seed_dir,
os.path.join(self.tmp, "nonexistantdirectory"))
def test_seed_url_valid(self):
- """Verify that valid seed_url is read as such"""
+ """Verify that valid seed_url is read as such."""
valid = {'meta-data/instance-id': 'i-instanceid',
'meta-data/local-hostname': 'test-hostname',
'meta-data/public-keys': 'test-hostname',
@@ -129,11 +129,11 @@ class TestMAASDataSource(MockerTestCase):
valid['meta-data/local-hostname'])
def test_seed_url_invalid(self):
- """Verify that invalid seed_url raises MAASSeedDirMalformed"""
+ """Verify that invalid seed_url raises MAASSeedDirMalformed."""
pass
def test_seed_url_missing(self):
- """Verify seed_url with no found entries raises MAASSeedDirNone"""
+ """Verify seed_url with no found entries raises MAASSeedDirNone."""
pass
diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py
new file mode 100644
index 00000000..2df4c2f0
--- /dev/null
+++ b/tests/unittests/test_distros/test_generic.py
@@ -0,0 +1,121 @@
+from mocker import MockerTestCase
+
+from cloudinit import distros
+
+unknown_arch_info = {
+ 'arches': ['default'],
+ 'failsafe': {'primary': 'http://fs-primary-default',
+ 'security': 'http://fs-security-default'}
+}
+
+package_mirrors = [
+ {'arches': ['i386', 'amd64'],
+ 'failsafe': {'primary': 'http://fs-primary-intel',
+ 'security': 'http://fs-security-intel'},
+ 'search': {
+ 'primary': ['http://%(ec2_region)s.ec2/',
+ 'http://%(availability_zone)s.clouds/'],
+ 'security': ['http://security-mirror1-intel',
+ 'http://security-mirror2-intel']}},
+ {'arches': ['armhf', 'armel'],
+ 'failsafe': {'primary': 'http://fs-primary-arm',
+ 'security': 'http://fs-security-arm'}},
+ unknown_arch_info
+]
+
+gpmi = distros._get_package_mirror_info # pylint: disable=W0212
+gapmi = distros._get_arch_package_mirror_info # pylint: disable=W0212
+
+
+class TestGenericDistro(MockerTestCase):
+
+ def return_first(self, mlist):
+ if not mlist:
+ return None
+ return mlist[0]
+
+ def return_second(self, mlist):
+ if not mlist:
+ return None
+ return mlist[1]
+
+ def return_none(self, _mlist):
+ return None
+
+ def return_last(self, mlist):
+ if not mlist:
+ return None
+ return(mlist[-1])
+
+ def setUp(self):
+ super(TestGenericDistro, self).setUp()
+ # Make a temp directoy for tests to use.
+ self.tmp = self.makeDir()
+
+ def test_arch_package_mirror_info_unknown(self):
+ """for an unknown arch, we should get back that with arch 'default'."""
+ arch_mirrors = gapmi(package_mirrors, arch="unknown")
+ self.assertEqual(unknown_arch_info, arch_mirrors)
+
+ def test_arch_package_mirror_info_known(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+ self.assertEqual(package_mirrors[0], arch_mirrors)
+
+ def test_get_package_mirror_info_az_ec2(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+
+ results = gpmi(arch_mirrors, availability_zone="us-east-1a",
+ mirror_filter=self.return_first)
+ self.assertEqual(results,
+ {'primary': 'http://us-east-1.ec2/',
+ 'security': 'http://security-mirror1-intel'})
+
+ results = gpmi(arch_mirrors, availability_zone="us-east-1a",
+ mirror_filter=self.return_second)
+ self.assertEqual(results,
+ {'primary': 'http://us-east-1a.clouds/',
+ 'security': 'http://security-mirror2-intel'})
+
+ results = gpmi(arch_mirrors, availability_zone="us-east-1a",
+ mirror_filter=self.return_none)
+ self.assertEqual(results, package_mirrors[0]['failsafe'])
+
+ def test_get_package_mirror_info_az_non_ec2(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+
+ results = gpmi(arch_mirrors, availability_zone="nova.cloudvendor",
+ mirror_filter=self.return_first)
+ self.assertEqual(results,
+ {'primary': 'http://nova.cloudvendor.clouds/',
+ 'security': 'http://security-mirror1-intel'})
+
+ results = gpmi(arch_mirrors, availability_zone="nova.cloudvendor",
+ mirror_filter=self.return_last)
+ self.assertEqual(results,
+ {'primary': 'http://nova.cloudvendor.clouds/',
+ 'security': 'http://security-mirror2-intel'})
+
+ def test_get_package_mirror_info_none(self):
+ arch_mirrors = gapmi(package_mirrors, arch="amd64")
+
+ # because both search entries here replacement based on
+ # availability-zone, the filter will be called with an empty list and
+ # failsafe should be taken.
+ results = gpmi(arch_mirrors, availability_zone=None,
+ mirror_filter=self.return_first)
+ self.assertEqual(results,
+ {'primary': 'http://fs-primary-intel',
+ 'security': 'http://security-mirror1-intel'})
+
+ results = gpmi(arch_mirrors, availability_zone=None,
+ mirror_filter=self.return_last)
+ self.assertEqual(results,
+ {'primary': 'http://fs-primary-intel',
+ 'security': 'http://security-mirror2-intel'})
+
+
+#def _get_package_mirror_info(mirror_info, availability_zone=None,
+# mirror_filter=util.search_for_mirror):
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_filters/test_launch_index.py b/tests/unittests/test_filters/test_launch_index.py
new file mode 100644
index 00000000..7ca7cbb6
--- /dev/null
+++ b/tests/unittests/test_filters/test_launch_index.py
@@ -0,0 +1,134 @@
+import copy
+
+import helpers as th
+
+import itertools
+
+from cloudinit.filters import launch_index
+from cloudinit import user_data as ud
+from cloudinit import util
+
+
+def count_messages(root):
+ am = 0
+ for m in root.walk():
+ if ud.is_skippable(m):
+ continue
+ am += 1
+ return am
+
+
+class TestLaunchFilter(th.ResourceUsingTestCase):
+
+ def assertCounts(self, message, expected_counts):
+ orig_message = copy.deepcopy(message)
+ for (index, count) in expected_counts.items():
+ index = util.safe_int(index)
+ filtered_message = launch_index.Filter(index).apply(message)
+ self.assertEquals(count_messages(filtered_message), count)
+ # Ensure original message still ok/not modified
+ self.assertTrue(self.equivalentMessage(message, orig_message))
+
+ def equivalentMessage(self, msg1, msg2):
+ msg1_count = count_messages(msg1)
+ msg2_count = count_messages(msg2)
+ if msg1_count != msg2_count:
+ return False
+ # Do some basic payload checking
+ msg1_msgs = [m for m in msg1.walk()]
+ msg1_msgs = [m for m in
+ itertools.ifilterfalse(ud.is_skippable, msg1_msgs)]
+ msg2_msgs = [m for m in msg2.walk()]
+ msg2_msgs = [m for m in
+ itertools.ifilterfalse(ud.is_skippable, msg2_msgs)]
+ for i in range(0, len(msg2_msgs)):
+ m1_msg = msg1_msgs[i]
+ m2_msg = msg2_msgs[i]
+ if m1_msg.get_charset() != m2_msg.get_charset():
+ return False
+ if m1_msg.is_multipart() != m2_msg.is_multipart():
+ return False
+ m1_py = m1_msg.get_payload(decode=True)
+ m2_py = m2_msg.get_payload(decode=True)
+ if m1_py != m2_py:
+ return False
+ return True
+
+ def testMultiEmailIndex(self):
+ test_data = self.readResource('filter_cloud_multipart_2.email')
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(test_data)
+ self.assertTrue(count_messages(message) > 0)
+ # This file should have the following
+ # indexes -> amount mapping in it
+ expected_counts = {
+ 3: 1,
+ 2: 2,
+ None: 3,
+ -1: 0,
+ }
+ self.assertCounts(message, expected_counts)
+
+ def testHeaderEmailIndex(self):
+ test_data = self.readResource('filter_cloud_multipart_header.email')
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(test_data)
+ self.assertTrue(count_messages(message) > 0)
+ # This file should have the following
+ # indexes -> amount mapping in it
+ expected_counts = {
+ 5: 1,
+ -1: 0,
+ 'c': 1,
+ None: 1,
+ }
+ self.assertCounts(message, expected_counts)
+
+ def testConfigEmailIndex(self):
+ test_data = self.readResource('filter_cloud_multipart_1.email')
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(test_data)
+ self.assertTrue(count_messages(message) > 0)
+ # This file should have the following
+ # indexes -> amount mapping in it
+ expected_counts = {
+ 2: 1,
+ -1: 0,
+ None: 1,
+ }
+ self.assertCounts(message, expected_counts)
+
+ def testNoneIndex(self):
+ test_data = self.readResource('filter_cloud_multipart.yaml')
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(test_data)
+ start_count = count_messages(message)
+ self.assertTrue(start_count > 0)
+ filtered_message = launch_index.Filter(None).apply(message)
+ self.assertTrue(self.equivalentMessage(message, filtered_message))
+
+ def testIndexes(self):
+ test_data = self.readResource('filter_cloud_multipart.yaml')
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(test_data)
+ start_count = count_messages(message)
+ self.assertTrue(start_count > 0)
+ # This file should have the following
+ # indexes -> amount mapping in it
+ expected_counts = {
+ 2: 2,
+ 3: 2,
+ 1: 2,
+ 0: 1,
+ 4: 1,
+ 7: 0,
+ -1: 0,
+ 100: 0,
+ # None should just give all back
+ None: start_count,
+ # Non ints should be ignored
+ 'c': start_count,
+ # Strings should be converted
+ '1': 2,
+ }
+ self.assertCounts(message, expected_counts)
diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py
index 948de4c4..d3df5c50 100644
--- a/tests/unittests/test_handler/test_handler_ca_certs.py
+++ b/tests/unittests/test_handler/test_handler_ca_certs.py
@@ -1,8 +1,8 @@
from mocker import MockerTestCase
-from cloudinit import util
from cloudinit import cloud
from cloudinit import helpers
+from cloudinit import util
from cloudinit.config import cc_ca_certs
@@ -64,7 +64,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_empty_trusted_list(self):
- """Test that no certificate are written if 'trusted' list is empty"""
+ """Test that no certificate are written if 'trusted' list is empty."""
config = {"ca-certs": {"trusted": []}}
# No functions should be called
@@ -74,7 +74,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_single_trusted(self):
- """Test that a single cert gets passed to add_ca_certs"""
+ """Test that a single cert gets passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1"]}}
self.mock_add(self.paths, ["CERT1"])
@@ -84,7 +84,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_multiple_trusted(self):
- """Test that multiple certs get passed to add_ca_certs"""
+ """Test that multiple certs get passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
self.mock_add(self.paths, ["CERT1", "CERT2"])
@@ -94,7 +94,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_remove_default_ca_certs(self):
- """Test remove_defaults works as expected"""
+ """Test remove_defaults works as expected."""
config = {"ca-certs": {"remove-defaults": True}}
self.mock_remove(self.paths)
@@ -104,7 +104,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_no_remove_defaults_if_false(self):
- """Test remove_defaults is not called when config value is False"""
+ """Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": False}}
self.mock_update()
@@ -113,7 +113,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_correct_order_for_remove_then_add(self):
- """Test remove_defaults is not called when config value is False"""
+ """Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
self.mock_remove(self.paths)
@@ -139,7 +139,7 @@ class TestAddCaCerts(MockerTestCase):
cc_ca_certs.add_ca_certs(self.paths, [])
def test_single_cert(self):
- """Test adding a single certificate to the trusted CAs"""
+ """Test adding a single certificate to the trusted CAs."""
cert = "CERT1\nLINE2\nLINE3"
mock_write = self.mocker.replace(util.write_file, passthrough=False)
@@ -152,7 +152,7 @@ class TestAddCaCerts(MockerTestCase):
cc_ca_certs.add_ca_certs(self.paths, [cert])
def test_multiple_certs(self):
- """Test adding multiple certificates to the trusted CAs"""
+ """Test adding multiple certificates to the trusted CAs."""
certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"]
expected_cert_file = "\n".join(certs)
diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py
index fbbf07f2..82a4c555 100644
--- a/tests/unittests/test_userdata.py
+++ b/tests/unittests/test_userdata.py
@@ -1,4 +1,4 @@
-"""Tests for handling of userdata within cloud init"""
+"""Tests for handling of userdata within cloud init."""
import StringIO
@@ -54,7 +54,7 @@ class TestConsumeUserData(MockerTestCase):
return log_file
def test_unhandled_type_warning(self):
- """Raw text without magic is ignored but shows warning"""
+ """Raw text without magic is ignored but shows warning."""
ci = stages.Init()
data = "arbitrary text\n"
ci.datasource = FakeDataSource(data)
@@ -70,7 +70,7 @@ class TestConsumeUserData(MockerTestCase):
log_file.getvalue())
def test_mime_text_plain(self):
- """Mime message of type text/plain is ignored but shows warning"""
+ """Mime message of type text/plain is ignored but shows warning."""
ci = stages.Init()
message = MIMEBase("text", "plain")
message.set_payload("Just text")
@@ -86,9 +86,8 @@ class TestConsumeUserData(MockerTestCase):
"Unhandled unknown content-type (text/plain)",
log_file.getvalue())
-
def test_shellscript(self):
- """Raw text starting #!/bin/sh is treated as script"""
+ """Raw text starting #!/bin/sh is treated as script."""
ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
ci.datasource = FakeDataSource(script)
@@ -104,7 +103,7 @@ class TestConsumeUserData(MockerTestCase):
self.assertEqual("", log_file.getvalue())
def test_mime_text_x_shellscript(self):
- """Mime message of type text/x-shellscript is treated as script"""
+ """Mime message of type text/x-shellscript is treated as script."""
ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
message = MIMEBase("text", "x-shellscript")
@@ -122,7 +121,7 @@ class TestConsumeUserData(MockerTestCase):
self.assertEqual("", log_file.getvalue())
def test_mime_text_plain_shell(self):
- """Mime type text/plain starting #!/bin/sh is treated as script"""
+ """Mime type text/plain starting #!/bin/sh is treated as script."""
ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
message = MIMEBase("text", "plain")
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 19f66cc4..15fcbd26 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -1,11 +1,11 @@
import os
import stat
-from unittest import TestCase
from mocker import MockerTestCase
+from unittest import TestCase
-from cloudinit import util
from cloudinit import importer
+from cloudinit import util
class FakeSelinux(object):