diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/data/filter_cloud_multipart.yaml | 30 | ||||
-rw-r--r-- | tests/data/filter_cloud_multipart_1.email | 11 | ||||
-rw-r--r-- | tests/data/filter_cloud_multipart_2.email | 39 | ||||
-rw-r--r-- | tests/data/filter_cloud_multipart_header.email | 11 | ||||
-rw-r--r-- | tests/unittests/helpers.py | 42 | ||||
-rw-r--r-- | tests/unittests/test__init__.py | 14 | ||||
-rw-r--r-- | tests/unittests/test_builtin_handlers.py | 4 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_altcloud.py | 445 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_configdrive.py | 177 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_maas.py | 22 | ||||
-rw-r--r-- | tests/unittests/test_distros/test_generic.py | 121 | ||||
-rw-r--r-- | tests/unittests/test_filters/test_launch_index.py | 134 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_ca_certs.py | 18 | ||||
-rw-r--r-- | tests/unittests/test_userdata.py | 13 | ||||
-rw-r--r-- | tests/unittests/test_util.py | 4 |
15 files changed, 1047 insertions, 38 deletions
diff --git a/tests/data/filter_cloud_multipart.yaml b/tests/data/filter_cloud_multipart.yaml new file mode 100644 index 00000000..7acc2b9d --- /dev/null +++ b/tests/data/filter_cloud_multipart.yaml @@ -0,0 +1,30 @@ +#cloud-config-archive +--- +- content: "\n blah: true\n launch-index: 3\n" + type: text/cloud-config +- content: "\n blah: true\n launch-index: 4\n" + type: text/cloud-config +- content: The quick brown fox jumps over the lazy dog + filename: b0.txt + launch-index: 0 + type: plain/text +- content: The quick brown fox jumps over the lazy dog + filename: b3.txt + launch-index: 3 + type: plain/text +- content: The quick brown fox jumps over the lazy dog + filename: b2.txt + launch-index: 2 + type: plain/text +- content: '#!/bin/bash \n echo "stuff"' + filename: b2.txt + launch-index: 2 +- content: '#!/bin/bash \n echo "stuff"' + filename: b2.txt + launch-index: 1 +- content: '#!/bin/bash \n echo "stuff"' + filename: b2.txt + # Use a string to see if conversion works + launch-index: "1" +... + diff --git a/tests/data/filter_cloud_multipart_1.email b/tests/data/filter_cloud_multipart_1.email new file mode 100644 index 00000000..6d93b1f1 --- /dev/null +++ b/tests/data/filter_cloud_multipart_1.email @@ -0,0 +1,11 @@ +From nobody Fri Aug 31 17:17:00 2012 +Content-Type: text/plain; charset="us-ascii" +MIME-Version: 1.0 +Content-Transfer-Encoding: 7bit + + +#cloud-config +b: c +launch-index: 2 + + diff --git a/tests/data/filter_cloud_multipart_2.email b/tests/data/filter_cloud_multipart_2.email new file mode 100644 index 00000000..b04068c5 --- /dev/null +++ b/tests/data/filter_cloud_multipart_2.email @@ -0,0 +1,39 @@ +From nobody Fri Aug 31 17:43:04 2012 +Content-Type: multipart/mixed; boundary="===============1668325974==" +MIME-Version: 1.0 + +--===============1668325974== +Content-Type: text/cloud-config; charset="us-ascii" +MIME-Version: 1.0 +Content-Transfer-Encoding: 7bit + + +#cloud-config +b: c +launch-index: 2 + + +--===============1668325974== +Content-Type: text/plain; charset="us-ascii" +MIME-Version: 1.0 +Content-Transfer-Encoding: 7bit + + +#cloud-config-archive +- content: The quick brown fox jumps over the lazy dog + filename: b3.txt + launch-index: 3 + type: plain/text + +--===============1668325974== +Content-Type: text/plain; charset="us-ascii" +MIME-Version: 1.0 +Content-Transfer-Encoding: 7bit + + +#cloud-config +b: c +launch-index: 2 + + +--===============1668325974==-- diff --git a/tests/data/filter_cloud_multipart_header.email b/tests/data/filter_cloud_multipart_header.email new file mode 100644 index 00000000..770f7ef1 --- /dev/null +++ b/tests/data/filter_cloud_multipart_header.email @@ -0,0 +1,11 @@ +From nobody Fri Aug 31 17:17:00 2012 +Content-Type: text/plain; charset="us-ascii" +MIME-Version: 1.0 +Launch-Index: 5 +Content-Transfer-Encoding: 7bit + + +#cloud-config +b: c + + diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py new file mode 100644 index 00000000..d0f09e70 --- /dev/null +++ b/tests/unittests/helpers.py @@ -0,0 +1,42 @@ +import os + +from mocker import MockerTestCase + +from cloudinit import helpers as ch + + +class ResourceUsingTestCase(MockerTestCase): + def __init__(self, methodName="runTest"): + MockerTestCase.__init__(self, methodName) + self.resource_path = None + + def resourceLocation(self, subname=None): + if self.resource_path is None: + paths = [ + os.path.join('tests', 'data'), + os.path.join('data'), + os.path.join(os.pardir, 'tests', 'data'), + os.path.join(os.pardir, 'data'), + ] + for p in paths: + if os.path.isdir(p): + self.resource_path = p + break + self.assertTrue((self.resource_path and + os.path.isdir(self.resource_path)), + msg="Unable to locate test resource data path!") + if not subname: + return self.resource_path + return os.path.join(self.resource_path, subname) + + def readResource(self, name): + where = self.resourceLocation(name) + with open(where, 'r') as fh: + return fh.read() + + def getCloudPaths(self): + cp = ch.Paths({ + 'cloud_dir': self.makeDir(), + 'templates_dir': self.resourceLocation(), + }) + return cp diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py index 464c8c2f..ac082076 100644 --- a/tests/unittests/test__init__.py +++ b/tests/unittests/test__init__.py @@ -1,6 +1,6 @@ -import StringIO import logging import os +import StringIO import sys from mocker import MockerTestCase, ANY, ARGS, KWARGS @@ -61,14 +61,14 @@ class TestWalkerHandleHandler(MockerTestCase): import_mock(self.expected_module_name) self.mocker.result(self.module_fake) self.mocker.replay() - + handlers.walker_handle_handler(self.data, self.ctype, self.filename, self.payload) - + self.assertEqual(1, self.data["handlercount"]) - + def test_import_error(self): - """Module import errors are logged. No handler added to C{pdata}""" + """Module import errors are logged. No handler added to C{pdata}.""" import_mock = self.mocker.replace(importer.import_module, passthrough=False) import_mock(self.expected_module_name) @@ -81,7 +81,7 @@ class TestWalkerHandleHandler(MockerTestCase): self.assertEqual(0, self.data["handlercount"]) def test_attribute_error(self): - """Attribute errors are logged. No handler added to C{pdata}""" + """Attribute errors are logged. No handler added to C{pdata}.""" import_mock = self.mocker.replace(importer.import_module, passthrough=False) import_mock(self.expected_module_name) @@ -156,7 +156,7 @@ class TestHandlerHandlePart(MockerTestCase): self.payload, self.frequency) def test_no_handle_when_modfreq_once(self): - """C{handle_part} is not called if frequency is once""" + """C{handle_part} is not called if frequency is once.""" self.frequency = "once" mod_mock = self.mocker.mock() getattr(mod_mock, "frequency") diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py index 5bba8bc9..ebc0bd51 100644 --- a/tests/unittests/test_builtin_handlers.py +++ b/tests/unittests/test_builtin_handlers.py @@ -1,4 +1,4 @@ -"""Tests of the built-in user data handlers""" +"""Tests of the built-in user data handlers.""" import os @@ -33,7 +33,7 @@ class TestBuiltins(MockerTestCase): None, None, None) self.assertEquals(0, len(os.listdir(up_root))) - def test_upstart_frequency_single(self): + def test_upstart_frequency_single(self): c_root = self.makeDir() up_root = self.makeDir() paths = helpers.Paths({ diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py new file mode 100644 index 00000000..bda61c7e --- /dev/null +++ b/tests/unittests/test_datasource/test_altcloud.py @@ -0,0 +1,445 @@ +# vi: ts=4 expandtab +# +# Copyright (C) 2009-2010 Canonical Ltd. +# Copyright (C) 2012 Hewlett-Packard Development Company, L.P. +# Copyright (C) 2012 Yahoo! Inc. +# +# Author: Joe VLcek <JVLcek@RedHat.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +''' +This test file exercises the code in sources DataSourceAltCloud.py +''' + +import os +import shutil +import tempfile + +from cloudinit import helpers +from unittest import TestCase + +# Get the cloudinit.sources.DataSourceAltCloud import items needed. +import cloudinit.sources.DataSourceAltCloud +from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud +from cloudinit.sources.DataSourceAltCloud import read_user_data_callback + + +def _write_cloud_info_file(value): + ''' + Populate the CLOUD_INFO_FILE which would be populated + with a cloud backend identifier ImageFactory when building + an image with ImageFactory. + ''' + cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w') + cifile.write(value) + cifile.close() + os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664) + + +def _remove_cloud_info_file(): + ''' + Remove the test CLOUD_INFO_FILE + ''' + os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE) + + +def _write_user_data_files(mount_dir, value): + ''' + Populate the deltacloud_user_data_file the user_data_file + which would be populated with user data. + ''' + deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' + user_data_file = mount_dir + '/user-data.txt' + + udfile = open(deltacloud_user_data_file, 'w') + udfile.write(value) + udfile.close() + os.chmod(deltacloud_user_data_file, 0664) + + udfile = open(user_data_file, 'w') + udfile.write(value) + udfile.close() + os.chmod(user_data_file, 0664) + + +def _remove_user_data_files(mount_dir, + dc_file=True, + non_dc_file=True): + ''' + Remove the test files: deltacloud_user_data_file and + user_data_file + ''' + deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' + user_data_file = mount_dir + '/user-data.txt' + + # Ignore any failures removeing files that are already gone. + if dc_file: + try: + os.remove(deltacloud_user_data_file) + except OSError: + pass + + if non_dc_file: + try: + os.remove(user_data_file) + except OSError: + pass + + +class TestGetCloudType(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_cloud_type() + ''' + + def setUp(self): + '''Set up.''' + self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + + def tearDown(self): + # Reset + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['dmidecode', '--string', 'system-product-name'] + + def test_rhev(self): + ''' + Test method get_cloud_type() for RHEVm systems. + Forcing dmidecode return to match a RHEVm system: RHEV Hypervisor + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'RHEV Hypervisor'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('RHEV', \ + dsrc.get_cloud_type()) + + def test_vsphere(self): + ''' + Test method get_cloud_type() for vSphere systems. + Forcing dmidecode return to match a vSphere system: RHEV Hypervisor + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'VMware Virtual Platform'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('VSPHERE', \ + dsrc.get_cloud_type()) + + def test_unknown(self): + ''' + Test method get_cloud_type() for unknown systems. + Forcing dmidecode return to match an unrecognized return. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'Unrecognized Platform'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + dsrc.get_cloud_type()) + + def test_exception1(self): + ''' + Test method get_cloud_type() where command dmidecode fails. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['ls', 'bad command'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + dsrc.get_cloud_type()) + + def test_exception2(self): + ''' + Test method get_cloud_type() where command dmidecode is not available. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['bad command'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + dsrc.get_cloud_type()) + + +class TestGetDataCloudInfoFile(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_data() + With a contrived CLOUD_INFO_FILE + ''' + def setUp(self): + '''Set up.''' + self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + self.cloud_info_file = tempfile.mkstemp()[1] + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + self.cloud_info_file + + def tearDown(self): + # Reset + + # Attempt to remove the temp file ignoring errors + try: + os.remove(self.cloud_info_file) + except OSError: + pass + + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/etc/sysconfig/cloud-info' + + def test_rhev(self): + '''Success Test module get_data() forcing RHEV.''' + + _write_cloud_info_file('RHEV') + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda: True + self.assertEquals(True, dsrc.get_data()) + + def test_vsphere(self): + '''Success Test module get_data() forcing VSPHERE.''' + + _write_cloud_info_file('VSPHERE') + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda: True + self.assertEquals(True, dsrc.get_data()) + + def test_fail_rhev(self): + '''Failure Test module get_data() forcing RHEV.''' + + _write_cloud_info_file('RHEV') + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda: False + self.assertEquals(False, dsrc.get_data()) + + def test_fail_vsphere(self): + '''Failure Test module get_data() forcing VSPHERE.''' + + _write_cloud_info_file('VSPHERE') + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda: False + self.assertEquals(False, dsrc.get_data()) + + def test_unrecognized(self): + '''Failure Test module get_data() forcing unrecognized.''' + + _write_cloud_info_file('unrecognized') + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals(False, dsrc.get_data()) + + +class TestGetDataNoCloudInfoFile(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_data() + Without a CLOUD_INFO_FILE + ''' + def setUp(self): + '''Set up.''' + self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + 'no such file' + + def tearDown(self): + # Reset + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/etc/sysconfig/cloud-info' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['dmidecode', '--string', 'system-product-name'] + + def test_rhev_no_cloud_file(self): + '''Test No cloud info file module get_data() forcing RHEV.''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'RHEV Hypervisor'] + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda: True + self.assertEquals(True, dsrc.get_data()) + + def test_vsphere_no_cloud_file(self): + '''Test No cloud info file module get_data() forcing VSPHERE.''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'VMware Virtual Platform'] + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda: True + self.assertEquals(True, dsrc.get_data()) + + def test_failure_no_cloud_file(self): + '''Test No cloud info file module get_data() forcing unrecognized.''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'Unrecognized Platform'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals(False, dsrc.get_data()) + + +class TestUserDataRhevm(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.user_data_rhevm() + ''' + def setUp(self): + '''Set up.''' + self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + self.mount_dir = tempfile.mkdtemp() + + _write_user_data_files(self.mount_dir, 'test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files(self.mount_dir) + + # Attempt to remove the temp dir ignoring errors + try: + shutil.rmtree(self.mount_dir) + except OSError: + pass + + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/etc/sysconfig/cloud-info' + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['/sbin/modprobe', 'floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ + ['/sbin/udevadm', 'settle', '--quiet', '--timeout=5'] + + def test_mount_cb_fails(self): + '''Test user_data_rhevm() where mount_cb fails.''' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['echo', 'modprobe floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + + def test_modprobe_fails(self): + '''Test user_data_rhevm() where modprobe fails.''' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['ls', 'modprobe floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + + def test_no_modprobe_cmd(self): + '''Test user_data_rhevm() with no modprobe command.''' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['bad command', 'modprobe floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + + def test_udevadm_fails(self): + '''Test user_data_rhevm() where udevadm fails.''' + + cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ + ['ls', 'udevadm floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + + def test_no_udevadm_cmd(self): + '''Test user_data_rhevm() with no udevadm command.''' + + cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ + ['bad command', 'udevadm floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + + +class TestUserDataVsphere(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.user_data_vsphere() + ''' + def setUp(self): + '''Set up.''' + self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + self.mount_dir = tempfile.mkdtemp() + + _write_user_data_files(self.mount_dir, 'test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files(self.mount_dir) + + # Attempt to remove the temp dir ignoring errors + try: + shutil.rmtree(self.mount_dir) + except OSError: + pass + + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/etc/sysconfig/cloud-info' + + def test_user_data_vsphere(self): + '''Test user_data_vsphere() where mount_cb fails.''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = self.mount_dir + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_vsphere()) + + +class TestReadUserDataCallback(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.read_user_data_callback() + ''' + def setUp(self): + '''Set up.''' + self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + self.mount_dir = tempfile.mkdtemp() + + _write_user_data_files(self.mount_dir, 'test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files(self.mount_dir) + + # Attempt to remove the temp dir ignoring errors + try: + shutil.rmtree(self.mount_dir) + except OSError: + pass + + def test_callback_both(self): + '''Test read_user_data_callback() with both files.''' + + self.assertEquals('test user data', + read_user_data_callback(self.mount_dir)) + + def test_callback_dc(self): + '''Test read_user_data_callback() with only DC file.''' + + _remove_user_data_files(self.mount_dir, + dc_file=False, + non_dc_file=True) + + self.assertEquals('test user data', + read_user_data_callback(self.mount_dir)) + + def test_callback_non_dc(self): + '''Test read_user_data_callback() with only non-DC file.''' + + _remove_user_data_files(self.mount_dir, + dc_file=True, + non_dc_file=False) + + self.assertEquals('test user data', + read_user_data_callback(self.mount_dir)) + + def test_callback_none(self): + '''Test read_user_data_callback() no files are found.''' + + _remove_user_data_files(self.mount_dir) + self.assertEquals(None, read_user_data_callback(self.mount_dir)) + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py new file mode 100644 index 00000000..55573114 --- /dev/null +++ b/tests/unittests/test_datasource/test_configdrive.py @@ -0,0 +1,177 @@ +from copy import copy +import json +import os +import os.path +import shutil +import tempfile +from unittest import TestCase + +from cloudinit.sources import DataSourceConfigDrive as ds +from cloudinit import util + + +PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n' +EC2_META = { + 'ami-id': 'ami-00000001', + 'ami-launch-index': 0, + 'ami-manifest-path': 'FIXME', + 'block-device-mapping': { + 'ami': 'sda1', + 'ephemeral0': 'sda2', + 'root': '/dev/sda1', + 'swap': 'sda3'}, + 'hostname': 'sm-foo-test.novalocal', + 'instance-action': 'none', + 'instance-id': 'i-00000001', + 'instance-type': 'm1.tiny', + 'local-hostname': 'sm-foo-test.novalocal', + 'local-ipv4': None, + 'placement': {'availability-zone': 'nova'}, + 'public-hostname': 'sm-foo-test.novalocal', + 'public-ipv4': '', + 'public-keys': {'0': {'openssh-key': PUBKEY}}, + 'reservation-id': 'r-iru5qm4m', + 'security-groups': ['default'] +} +USER_DATA = '#!/bin/sh\necho This is user data\n' +OSTACK_META = { + 'availability_zone': 'nova', + 'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'}, + {'content_path': '/content/0001', 'path': '/etc/bar/bar.cfg'}], + 'hostname': 'sm-foo-test.novalocal', + 'meta': {'dsmode': 'local', 'my-meta': 'my-value'}, + 'name': 'sm-foo-test', + 'public_keys': {'mykey': PUBKEY}, + 'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'} + +CONTENT_0 = 'This is contents of /etc/foo.cfg\n' +CONTENT_1 = '# this is /etc/bar/bar.cfg\n' + +CFG_DRIVE_FILES_V2 = { + 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META), + 'ec2/2009-04-04/user-data': USER_DATA, + 'ec2/latest/meta-data.json': json.dumps(EC2_META), + 'ec2/latest/user-data': USER_DATA, + 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META), + 'openstack/2012-08-10/user_data': USER_DATA, + 'openstack/content/0000': CONTENT_0, + 'openstack/content/0001': CONTENT_1, + 'openstack/latest/meta_data.json': json.dumps(OSTACK_META), + 'openstack/latest/user_data': USER_DATA} + + +class TestConfigDriveDataSource(TestCase): + + def setUp(self): + super(TestConfigDriveDataSource, self).setUp() + self.tmp = tempfile.mkdtemp() + + def tearDown(self): + try: + shutil.rmtree(self.tmp) + except OSError: + pass + + def test_dir_valid(self): + """Verify a dir is read as such.""" + + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + + found = ds.read_config_drive_dir(self.tmp) + + expected_md = copy(OSTACK_META) + expected_md['instance-id'] = expected_md['uuid'] + + self.assertEqual(USER_DATA, found['userdata']) + self.assertEqual(expected_md, found['metadata']) + self.assertEqual(found['files']['/etc/foo.cfg'], CONTENT_0) + self.assertEqual(found['files']['/etc/bar/bar.cfg'], CONTENT_1) + + def test_seed_dir_valid_extra(self): + """Verify extra files do not affect datasource validity.""" + + data = copy(CFG_DRIVE_FILES_V2) + data["myfoofile.txt"] = "myfoocontent" + data["openstack/latest/random-file.txt"] = "random-content" + + populate_dir(self.tmp, data) + + found = ds.read_config_drive_dir(self.tmp) + + expected_md = copy(OSTACK_META) + expected_md['instance-id'] = expected_md['uuid'] + + self.assertEqual(expected_md, found['metadata']) + + def test_seed_dir_bad_json_metadata(self): + """Verify that bad json in metadata raises BrokenConfigDriveDir.""" + data = copy(CFG_DRIVE_FILES_V2) + + data["openstack/2012-08-10/meta_data.json"] = "non-json garbage {}" + data["openstack/latest/meta_data.json"] = "non-json garbage {}" + + populate_dir(self.tmp, data) + + self.assertRaises(ds.BrokenConfigDriveDir, + ds.read_config_drive_dir, self.tmp) + + def test_seed_dir_no_configdrive(self): + """Verify that no metadata raises NonConfigDriveDir.""" + + my_d = os.path.join(self.tmp, "non-configdrive") + data = copy(CFG_DRIVE_FILES_V2) + data["myfoofile.txt"] = "myfoocontent" + data["openstack/latest/random-file.txt"] = "random-content" + data["content/foo"] = "foocontent" + + self.assertRaises(ds.NonConfigDriveDir, + ds.read_config_drive_dir, my_d) + + def test_seed_dir_missing(self): + """Verify that missing seed_dir raises NonConfigDriveDir.""" + my_d = os.path.join(self.tmp, "nonexistantdirectory") + self.assertRaises(ds.NonConfigDriveDir, + ds.read_config_drive_dir, my_d) + + def test_find_candidates(self): + devs_with_answers = { + "TYPE=vfat": [], + "TYPE=iso9660": ["/dev/vdb"], + "LABEL=config-2": ["/dev/vdb"], + } + + def my_devs_with(criteria): + return devs_with_answers[criteria] + + try: + orig_find_devs_with = util.find_devs_with + util.find_devs_with = my_devs_with + + self.assertEqual(["/dev/vdb"], ds.find_candidate_devs()) + + # add a vfat item + # zdd reverse sorts after vdb, but config-2 label is preferred + devs_with_answers['TYPE=vfat'] = ["/dev/zdd"] + self.assertEqual(["/dev/vdb", "/dev/zdd"], + ds.find_candidate_devs()) + + # verify that partitions are not considered + devs_with_answers = {"TYPE=vfat": ["/dev/sda1"], + "TYPE=iso9660": [], "LABEL=config-2": ["/dev/vdb3"]} + self.assertEqual([], ds.find_candidate_devs()) + + finally: + util.find_devs_with = orig_find_devs_with + + +def populate_dir(seed_dir, files): + for (name, content) in files.iteritems(): + path = os.path.join(seed_dir, name) + dirname = os.path.dirname(path) + if not os.path.isdir(dirname): + os.makedirs(dirname) + with open(path, "w") as fp: + fp.write(content) + fp.close() + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py index 8a155f39..85e6add0 100644 --- a/tests/unittests/test_datasource/test_maas.py +++ b/tests/unittests/test_datasource/test_maas.py @@ -1,8 +1,8 @@ -import os from copy import copy +import os -from cloudinit import url_helper from cloudinit.sources import DataSourceMAAS +from cloudinit import url_helper from mocker import MockerTestCase @@ -15,7 +15,7 @@ class TestMAASDataSource(MockerTestCase): self.tmp = self.makeDir() def test_seed_dir_valid(self): - """Verify a valid seeddir is read as such""" + """Verify a valid seeddir is read as such.""" data = {'instance-id': 'i-valid01', 'local-hostname': 'valid01-hostname', @@ -35,7 +35,7 @@ class TestMAASDataSource(MockerTestCase): self.assertFalse(('user-data' in metadata)) def test_seed_dir_valid_extra(self): - """Verify extra files do not affect seed_dir validity """ + """Verify extra files do not affect seed_dir validity.""" data = {'instance-id': 'i-valid-extra', 'local-hostname': 'valid-extra-hostname', @@ -54,7 +54,7 @@ class TestMAASDataSource(MockerTestCase): self.assertFalse(('foo' in metadata)) def test_seed_dir_invalid(self): - """Verify that invalid seed_dir raises MAASSeedDirMalformed""" + """Verify that invalid seed_dir raises MAASSeedDirMalformed.""" valid = {'instance-id': 'i-instanceid', 'local-hostname': 'test-hostname', 'user-data': ''} @@ -78,20 +78,20 @@ class TestMAASDataSource(MockerTestCase): DataSourceMAAS.read_maas_seed_dir, my_d) def test_seed_dir_none(self): - """Verify that empty seed_dir raises MAASSeedDirNone""" + """Verify that empty seed_dir raises MAASSeedDirNone.""" my_d = os.path.join(self.tmp, "valid_empty") self.assertRaises(DataSourceMAAS.MAASSeedDirNone, DataSourceMAAS.read_maas_seed_dir, my_d) def test_seed_dir_missing(self): - """Verify that missing seed_dir raises MAASSeedDirNone""" - self.assertRaises(DataSourceMAAS.MAASSeedDirNone, + """Verify that missing seed_dir raises MAASSeedDirNone.""" + self.assertRaises(DataSourceMAAS.MAASSeedDirNone, DataSourceMAAS.read_maas_seed_dir, os.path.join(self.tmp, "nonexistantdirectory")) def test_seed_url_valid(self): - """Verify that valid seed_url is read as such""" + """Verify that valid seed_url is read as such.""" valid = {'meta-data/instance-id': 'i-instanceid', 'meta-data/local-hostname': 'test-hostname', 'meta-data/public-keys': 'test-hostname', @@ -129,11 +129,11 @@ class TestMAASDataSource(MockerTestCase): valid['meta-data/local-hostname']) def test_seed_url_invalid(self): - """Verify that invalid seed_url raises MAASSeedDirMalformed""" + """Verify that invalid seed_url raises MAASSeedDirMalformed.""" pass def test_seed_url_missing(self): - """Verify seed_url with no found entries raises MAASSeedDirNone""" + """Verify seed_url with no found entries raises MAASSeedDirNone.""" pass diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py new file mode 100644 index 00000000..2df4c2f0 --- /dev/null +++ b/tests/unittests/test_distros/test_generic.py @@ -0,0 +1,121 @@ +from mocker import MockerTestCase + +from cloudinit import distros + +unknown_arch_info = { + 'arches': ['default'], + 'failsafe': {'primary': 'http://fs-primary-default', + 'security': 'http://fs-security-default'} +} + +package_mirrors = [ + {'arches': ['i386', 'amd64'], + 'failsafe': {'primary': 'http://fs-primary-intel', + 'security': 'http://fs-security-intel'}, + 'search': { + 'primary': ['http://%(ec2_region)s.ec2/', + 'http://%(availability_zone)s.clouds/'], + 'security': ['http://security-mirror1-intel', + 'http://security-mirror2-intel']}}, + {'arches': ['armhf', 'armel'], + 'failsafe': {'primary': 'http://fs-primary-arm', + 'security': 'http://fs-security-arm'}}, + unknown_arch_info +] + +gpmi = distros._get_package_mirror_info # pylint: disable=W0212 +gapmi = distros._get_arch_package_mirror_info # pylint: disable=W0212 + + +class TestGenericDistro(MockerTestCase): + + def return_first(self, mlist): + if not mlist: + return None + return mlist[0] + + def return_second(self, mlist): + if not mlist: + return None + return mlist[1] + + def return_none(self, _mlist): + return None + + def return_last(self, mlist): + if not mlist: + return None + return(mlist[-1]) + + def setUp(self): + super(TestGenericDistro, self).setUp() + # Make a temp directoy for tests to use. + self.tmp = self.makeDir() + + def test_arch_package_mirror_info_unknown(self): + """for an unknown arch, we should get back that with arch 'default'.""" + arch_mirrors = gapmi(package_mirrors, arch="unknown") + self.assertEqual(unknown_arch_info, arch_mirrors) + + def test_arch_package_mirror_info_known(self): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + self.assertEqual(package_mirrors[0], arch_mirrors) + + def test_get_package_mirror_info_az_ec2(self): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + + results = gpmi(arch_mirrors, availability_zone="us-east-1a", + mirror_filter=self.return_first) + self.assertEqual(results, + {'primary': 'http://us-east-1.ec2/', + 'security': 'http://security-mirror1-intel'}) + + results = gpmi(arch_mirrors, availability_zone="us-east-1a", + mirror_filter=self.return_second) + self.assertEqual(results, + {'primary': 'http://us-east-1a.clouds/', + 'security': 'http://security-mirror2-intel'}) + + results = gpmi(arch_mirrors, availability_zone="us-east-1a", + mirror_filter=self.return_none) + self.assertEqual(results, package_mirrors[0]['failsafe']) + + def test_get_package_mirror_info_az_non_ec2(self): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + + results = gpmi(arch_mirrors, availability_zone="nova.cloudvendor", + mirror_filter=self.return_first) + self.assertEqual(results, + {'primary': 'http://nova.cloudvendor.clouds/', + 'security': 'http://security-mirror1-intel'}) + + results = gpmi(arch_mirrors, availability_zone="nova.cloudvendor", + mirror_filter=self.return_last) + self.assertEqual(results, + {'primary': 'http://nova.cloudvendor.clouds/', + 'security': 'http://security-mirror2-intel'}) + + def test_get_package_mirror_info_none(self): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + + # because both search entries here replacement based on + # availability-zone, the filter will be called with an empty list and + # failsafe should be taken. + results = gpmi(arch_mirrors, availability_zone=None, + mirror_filter=self.return_first) + self.assertEqual(results, + {'primary': 'http://fs-primary-intel', + 'security': 'http://security-mirror1-intel'}) + + results = gpmi(arch_mirrors, availability_zone=None, + mirror_filter=self.return_last) + self.assertEqual(results, + {'primary': 'http://fs-primary-intel', + 'security': 'http://security-mirror2-intel'}) + + +#def _get_package_mirror_info(mirror_info, availability_zone=None, +# mirror_filter=util.search_for_mirror): + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_filters/test_launch_index.py b/tests/unittests/test_filters/test_launch_index.py new file mode 100644 index 00000000..7ca7cbb6 --- /dev/null +++ b/tests/unittests/test_filters/test_launch_index.py @@ -0,0 +1,134 @@ +import copy + +import helpers as th + +import itertools + +from cloudinit.filters import launch_index +from cloudinit import user_data as ud +from cloudinit import util + + +def count_messages(root): + am = 0 + for m in root.walk(): + if ud.is_skippable(m): + continue + am += 1 + return am + + +class TestLaunchFilter(th.ResourceUsingTestCase): + + def assertCounts(self, message, expected_counts): + orig_message = copy.deepcopy(message) + for (index, count) in expected_counts.items(): + index = util.safe_int(index) + filtered_message = launch_index.Filter(index).apply(message) + self.assertEquals(count_messages(filtered_message), count) + # Ensure original message still ok/not modified + self.assertTrue(self.equivalentMessage(message, orig_message)) + + def equivalentMessage(self, msg1, msg2): + msg1_count = count_messages(msg1) + msg2_count = count_messages(msg2) + if msg1_count != msg2_count: + return False + # Do some basic payload checking + msg1_msgs = [m for m in msg1.walk()] + msg1_msgs = [m for m in + itertools.ifilterfalse(ud.is_skippable, msg1_msgs)] + msg2_msgs = [m for m in msg2.walk()] + msg2_msgs = [m for m in + itertools.ifilterfalse(ud.is_skippable, msg2_msgs)] + for i in range(0, len(msg2_msgs)): + m1_msg = msg1_msgs[i] + m2_msg = msg2_msgs[i] + if m1_msg.get_charset() != m2_msg.get_charset(): + return False + if m1_msg.is_multipart() != m2_msg.is_multipart(): + return False + m1_py = m1_msg.get_payload(decode=True) + m2_py = m2_msg.get_payload(decode=True) + if m1_py != m2_py: + return False + return True + + def testMultiEmailIndex(self): + test_data = self.readResource('filter_cloud_multipart_2.email') + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(test_data) + self.assertTrue(count_messages(message) > 0) + # This file should have the following + # indexes -> amount mapping in it + expected_counts = { + 3: 1, + 2: 2, + None: 3, + -1: 0, + } + self.assertCounts(message, expected_counts) + + def testHeaderEmailIndex(self): + test_data = self.readResource('filter_cloud_multipart_header.email') + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(test_data) + self.assertTrue(count_messages(message) > 0) + # This file should have the following + # indexes -> amount mapping in it + expected_counts = { + 5: 1, + -1: 0, + 'c': 1, + None: 1, + } + self.assertCounts(message, expected_counts) + + def testConfigEmailIndex(self): + test_data = self.readResource('filter_cloud_multipart_1.email') + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(test_data) + self.assertTrue(count_messages(message) > 0) + # This file should have the following + # indexes -> amount mapping in it + expected_counts = { + 2: 1, + -1: 0, + None: 1, + } + self.assertCounts(message, expected_counts) + + def testNoneIndex(self): + test_data = self.readResource('filter_cloud_multipart.yaml') + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(test_data) + start_count = count_messages(message) + self.assertTrue(start_count > 0) + filtered_message = launch_index.Filter(None).apply(message) + self.assertTrue(self.equivalentMessage(message, filtered_message)) + + def testIndexes(self): + test_data = self.readResource('filter_cloud_multipart.yaml') + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(test_data) + start_count = count_messages(message) + self.assertTrue(start_count > 0) + # This file should have the following + # indexes -> amount mapping in it + expected_counts = { + 2: 2, + 3: 2, + 1: 2, + 0: 1, + 4: 1, + 7: 0, + -1: 0, + 100: 0, + # None should just give all back + None: start_count, + # Non ints should be ignored + 'c': start_count, + # Strings should be converted + '1': 2, + } + self.assertCounts(message, expected_counts) diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index 948de4c4..d3df5c50 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -1,8 +1,8 @@ from mocker import MockerTestCase -from cloudinit import util from cloudinit import cloud from cloudinit import helpers +from cloudinit import util from cloudinit.config import cc_ca_certs @@ -64,7 +64,7 @@ class TestConfig(MockerTestCase): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_empty_trusted_list(self): - """Test that no certificate are written if 'trusted' list is empty""" + """Test that no certificate are written if 'trusted' list is empty.""" config = {"ca-certs": {"trusted": []}} # No functions should be called @@ -74,7 +74,7 @@ class TestConfig(MockerTestCase): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_single_trusted(self): - """Test that a single cert gets passed to add_ca_certs""" + """Test that a single cert gets passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1"]}} self.mock_add(self.paths, ["CERT1"]) @@ -84,7 +84,7 @@ class TestConfig(MockerTestCase): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_multiple_trusted(self): - """Test that multiple certs get passed to add_ca_certs""" + """Test that multiple certs get passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}} self.mock_add(self.paths, ["CERT1", "CERT2"]) @@ -94,7 +94,7 @@ class TestConfig(MockerTestCase): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_remove_default_ca_certs(self): - """Test remove_defaults works as expected""" + """Test remove_defaults works as expected.""" config = {"ca-certs": {"remove-defaults": True}} self.mock_remove(self.paths) @@ -104,7 +104,7 @@ class TestConfig(MockerTestCase): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_no_remove_defaults_if_false(self): - """Test remove_defaults is not called when config value is False""" + """Test remove_defaults is not called when config value is False.""" config = {"ca-certs": {"remove-defaults": False}} self.mock_update() @@ -113,7 +113,7 @@ class TestConfig(MockerTestCase): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_correct_order_for_remove_then_add(self): - """Test remove_defaults is not called when config value is False""" + """Test remove_defaults is not called when config value is False.""" config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}} self.mock_remove(self.paths) @@ -139,7 +139,7 @@ class TestAddCaCerts(MockerTestCase): cc_ca_certs.add_ca_certs(self.paths, []) def test_single_cert(self): - """Test adding a single certificate to the trusted CAs""" + """Test adding a single certificate to the trusted CAs.""" cert = "CERT1\nLINE2\nLINE3" mock_write = self.mocker.replace(util.write_file, passthrough=False) @@ -152,7 +152,7 @@ class TestAddCaCerts(MockerTestCase): cc_ca_certs.add_ca_certs(self.paths, [cert]) def test_multiple_certs(self): - """Test adding multiple certificates to the trusted CAs""" + """Test adding multiple certificates to the trusted CAs.""" certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"] expected_cert_file = "\n".join(certs) diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py index fbbf07f2..82a4c555 100644 --- a/tests/unittests/test_userdata.py +++ b/tests/unittests/test_userdata.py @@ -1,4 +1,4 @@ -"""Tests for handling of userdata within cloud init""" +"""Tests for handling of userdata within cloud init.""" import StringIO @@ -54,7 +54,7 @@ class TestConsumeUserData(MockerTestCase): return log_file def test_unhandled_type_warning(self): - """Raw text without magic is ignored but shows warning""" + """Raw text without magic is ignored but shows warning.""" ci = stages.Init() data = "arbitrary text\n" ci.datasource = FakeDataSource(data) @@ -70,7 +70,7 @@ class TestConsumeUserData(MockerTestCase): log_file.getvalue()) def test_mime_text_plain(self): - """Mime message of type text/plain is ignored but shows warning""" + """Mime message of type text/plain is ignored but shows warning.""" ci = stages.Init() message = MIMEBase("text", "plain") message.set_payload("Just text") @@ -86,9 +86,8 @@ class TestConsumeUserData(MockerTestCase): "Unhandled unknown content-type (text/plain)", log_file.getvalue()) - def test_shellscript(self): - """Raw text starting #!/bin/sh is treated as script""" + """Raw text starting #!/bin/sh is treated as script.""" ci = stages.Init() script = "#!/bin/sh\necho hello\n" ci.datasource = FakeDataSource(script) @@ -104,7 +103,7 @@ class TestConsumeUserData(MockerTestCase): self.assertEqual("", log_file.getvalue()) def test_mime_text_x_shellscript(self): - """Mime message of type text/x-shellscript is treated as script""" + """Mime message of type text/x-shellscript is treated as script.""" ci = stages.Init() script = "#!/bin/sh\necho hello\n" message = MIMEBase("text", "x-shellscript") @@ -122,7 +121,7 @@ class TestConsumeUserData(MockerTestCase): self.assertEqual("", log_file.getvalue()) def test_mime_text_plain_shell(self): - """Mime type text/plain starting #!/bin/sh is treated as script""" + """Mime type text/plain starting #!/bin/sh is treated as script.""" ci = stages.Init() script = "#!/bin/sh\necho hello\n" message = MIMEBase("text", "plain") diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 19f66cc4..15fcbd26 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -1,11 +1,11 @@ import os import stat -from unittest import TestCase from mocker import MockerTestCase +from unittest import TestCase -from cloudinit import util from cloudinit import importer +from cloudinit import util class FakeSelinux(object): |