diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unittests/test__init__.py | 35 | ||||
-rw-r--r-- | tests/unittests/test_builtin_handlers.py | 5 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_altcloud.py | 445 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_configdrive.py | 177 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_maas.py | 24 | ||||
-rw-r--r-- | tests/unittests/test_distros/test_generic.py | 121 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_ca_certs.py | 32 | ||||
-rw-r--r-- | tests/unittests/test_userdata.py | 17 | ||||
-rw-r--r-- | tests/unittests/test_util.py | 8 |
9 files changed, 804 insertions, 60 deletions
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py index af18955d..ac082076 100644 --- a/tests/unittests/test__init__.py +++ b/tests/unittests/test__init__.py @@ -1,6 +1,6 @@ -import StringIO import logging import os +import StringIO import sys from mocker import MockerTestCase, ANY, ARGS, KWARGS @@ -50,24 +50,27 @@ class TestWalkerHandleHandler(MockerTestCase): self.payload = "dummy payload" # Mock the write_file function - write_file_mock = self.mocker.replace(util.write_file, passthrough=False) + write_file_mock = self.mocker.replace(util.write_file, + passthrough=False) write_file_mock(expected_file_fullname, self.payload, 0600) def test_no_errors(self): """Payload gets written to file and added to C{pdata}.""" - import_mock = self.mocker.replace(importer.import_module, passthrough=False) + import_mock = self.mocker.replace(importer.import_module, + passthrough=False) import_mock(self.expected_module_name) self.mocker.result(self.module_fake) self.mocker.replay() - + handlers.walker_handle_handler(self.data, self.ctype, self.filename, self.payload) - + self.assertEqual(1, self.data["handlercount"]) - + def test_import_error(self): - """Module import errors are logged. No handler added to C{pdata}""" - import_mock = self.mocker.replace(importer.import_module, passthrough=False) + """Module import errors are logged. No handler added to C{pdata}.""" + import_mock = self.mocker.replace(importer.import_module, + passthrough=False) import_mock(self.expected_module_name) self.mocker.throw(ImportError()) self.mocker.replay() @@ -78,8 +81,9 @@ class TestWalkerHandleHandler(MockerTestCase): self.assertEqual(0, self.data["handlercount"]) def test_attribute_error(self): - """Attribute errors are logged. No handler added to C{pdata}""" - import_mock = self.mocker.replace(importer.import_module, passthrough=False) + """Attribute errors are logged. No handler added to C{pdata}.""" + import_mock = self.mocker.replace(importer.import_module, + passthrough=False) import_mock(self.expected_module_name) self.mocker.result(self.module_fake) self.mocker.throw(AttributeError()) @@ -152,7 +156,7 @@ class TestHandlerHandlePart(MockerTestCase): self.payload, self.frequency) def test_no_handle_when_modfreq_once(self): - """C{handle_part} is not called if frequency is once""" + """C{handle_part} is not called if frequency is once.""" self.frequency = "once" mod_mock = self.mocker.mock() getattr(mod_mock, "frequency") @@ -185,13 +189,15 @@ class TestCmdlineUrl(MockerTestCase): payload = "0" cmdline = "ro %s=%s bar=1" % (key, url) - mock_readurl = self.mocker.replace(url_helper.readurl, passthrough=False) + mock_readurl = self.mocker.replace(url_helper.readurl, + passthrough=False) mock_readurl(url) self.mocker.result(url_helper.UrlResponse(200, payload)) self.mocker.replay() self.assertEqual((key, url, None), - util.get_cmdline_url(names=[key], starts="xxxxxx", cmdline=cmdline)) + util.get_cmdline_url(names=[key], starts="xxxxxx", + cmdline=cmdline)) def test_valid_content(self): url = "http://example.com/foo" @@ -199,7 +205,8 @@ class TestCmdlineUrl(MockerTestCase): payload = "xcloud-config\nmydata: foo\nbar: wark\n" cmdline = "ro %s=%s bar=1" % (key, url) - mock_readurl = self.mocker.replace(url_helper.readurl, passthrough=False) + mock_readurl = self.mocker.replace(url_helper.readurl, + passthrough=False) mock_readurl(url) self.mocker.result(url_helper.UrlResponse(200, payload)) self.mocker.replay() diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py index 84d85d4d..ebc0bd51 100644 --- a/tests/unittests/test_builtin_handlers.py +++ b/tests/unittests/test_builtin_handlers.py @@ -1,4 +1,4 @@ -"""Tests of the built-in user data handlers""" +"""Tests of the built-in user data handlers.""" import os @@ -6,7 +6,6 @@ from mocker import MockerTestCase from cloudinit import handlers from cloudinit import helpers -from cloudinit import util from cloudinit.handlers import upstart_job @@ -34,7 +33,7 @@ class TestBuiltins(MockerTestCase): None, None, None) self.assertEquals(0, len(os.listdir(up_root))) - def test_upstart_frequency_single(self): + def test_upstart_frequency_single(self): c_root = self.makeDir() up_root = self.makeDir() paths = helpers.Paths({ diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py new file mode 100644 index 00000000..bda61c7e --- /dev/null +++ b/tests/unittests/test_datasource/test_altcloud.py @@ -0,0 +1,445 @@ +# vi: ts=4 expandtab +# +# Copyright (C) 2009-2010 Canonical Ltd. +# Copyright (C) 2012 Hewlett-Packard Development Company, L.P. +# Copyright (C) 2012 Yahoo! Inc. +# +# Author: Joe VLcek <JVLcek@RedHat.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +''' +This test file exercises the code in sources DataSourceAltCloud.py +''' + +import os +import shutil +import tempfile + +from cloudinit import helpers +from unittest import TestCase + +# Get the cloudinit.sources.DataSourceAltCloud import items needed. +import cloudinit.sources.DataSourceAltCloud +from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud +from cloudinit.sources.DataSourceAltCloud import read_user_data_callback + + +def _write_cloud_info_file(value): + ''' + Populate the CLOUD_INFO_FILE which would be populated + with a cloud backend identifier ImageFactory when building + an image with ImageFactory. + ''' + cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w') + cifile.write(value) + cifile.close() + os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664) + + +def _remove_cloud_info_file(): + ''' + Remove the test CLOUD_INFO_FILE + ''' + os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE) + + +def _write_user_data_files(mount_dir, value): + ''' + Populate the deltacloud_user_data_file the user_data_file + which would be populated with user data. + ''' + deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' + user_data_file = mount_dir + '/user-data.txt' + + udfile = open(deltacloud_user_data_file, 'w') + udfile.write(value) + udfile.close() + os.chmod(deltacloud_user_data_file, 0664) + + udfile = open(user_data_file, 'w') + udfile.write(value) + udfile.close() + os.chmod(user_data_file, 0664) + + +def _remove_user_data_files(mount_dir, + dc_file=True, + non_dc_file=True): + ''' + Remove the test files: deltacloud_user_data_file and + user_data_file + ''' + deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt' + user_data_file = mount_dir + '/user-data.txt' + + # Ignore any failures removeing files that are already gone. + if dc_file: + try: + os.remove(deltacloud_user_data_file) + except OSError: + pass + + if non_dc_file: + try: + os.remove(user_data_file) + except OSError: + pass + + +class TestGetCloudType(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_cloud_type() + ''' + + def setUp(self): + '''Set up.''' + self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + + def tearDown(self): + # Reset + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['dmidecode', '--string', 'system-product-name'] + + def test_rhev(self): + ''' + Test method get_cloud_type() for RHEVm systems. + Forcing dmidecode return to match a RHEVm system: RHEV Hypervisor + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'RHEV Hypervisor'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('RHEV', \ + dsrc.get_cloud_type()) + + def test_vsphere(self): + ''' + Test method get_cloud_type() for vSphere systems. + Forcing dmidecode return to match a vSphere system: RHEV Hypervisor + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'VMware Virtual Platform'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('VSPHERE', \ + dsrc.get_cloud_type()) + + def test_unknown(self): + ''' + Test method get_cloud_type() for unknown systems. + Forcing dmidecode return to match an unrecognized return. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'Unrecognized Platform'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + dsrc.get_cloud_type()) + + def test_exception1(self): + ''' + Test method get_cloud_type() where command dmidecode fails. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['ls', 'bad command'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + dsrc.get_cloud_type()) + + def test_exception2(self): + ''' + Test method get_cloud_type() where command dmidecode is not available. + ''' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['bad command'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals('UNKNOWN', \ + dsrc.get_cloud_type()) + + +class TestGetDataCloudInfoFile(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_data() + With a contrived CLOUD_INFO_FILE + ''' + def setUp(self): + '''Set up.''' + self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + self.cloud_info_file = tempfile.mkstemp()[1] + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + self.cloud_info_file + + def tearDown(self): + # Reset + + # Attempt to remove the temp file ignoring errors + try: + os.remove(self.cloud_info_file) + except OSError: + pass + + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/etc/sysconfig/cloud-info' + + def test_rhev(self): + '''Success Test module get_data() forcing RHEV.''' + + _write_cloud_info_file('RHEV') + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda: True + self.assertEquals(True, dsrc.get_data()) + + def test_vsphere(self): + '''Success Test module get_data() forcing VSPHERE.''' + + _write_cloud_info_file('VSPHERE') + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda: True + self.assertEquals(True, dsrc.get_data()) + + def test_fail_rhev(self): + '''Failure Test module get_data() forcing RHEV.''' + + _write_cloud_info_file('RHEV') + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda: False + self.assertEquals(False, dsrc.get_data()) + + def test_fail_vsphere(self): + '''Failure Test module get_data() forcing VSPHERE.''' + + _write_cloud_info_file('VSPHERE') + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda: False + self.assertEquals(False, dsrc.get_data()) + + def test_unrecognized(self): + '''Failure Test module get_data() forcing unrecognized.''' + + _write_cloud_info_file('unrecognized') + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals(False, dsrc.get_data()) + + +class TestGetDataNoCloudInfoFile(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.get_data() + Without a CLOUD_INFO_FILE + ''' + def setUp(self): + '''Set up.''' + self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + 'no such file' + + def tearDown(self): + # Reset + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/etc/sysconfig/cloud-info' + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['dmidecode', '--string', 'system-product-name'] + + def test_rhev_no_cloud_file(self): + '''Test No cloud info file module get_data() forcing RHEV.''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'RHEV Hypervisor'] + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_rhevm = lambda: True + self.assertEquals(True, dsrc.get_data()) + + def test_vsphere_no_cloud_file(self): + '''Test No cloud info file module get_data() forcing VSPHERE.''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'VMware Virtual Platform'] + dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc.user_data_vsphere = lambda: True + self.assertEquals(True, dsrc.get_data()) + + def test_failure_no_cloud_file(self): + '''Test No cloud info file module get_data() forcing unrecognized.''' + + cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ + ['echo', 'Unrecognized Platform'] + dsrc = DataSourceAltCloud({}, None, self.paths) + self.assertEquals(False, dsrc.get_data()) + + +class TestUserDataRhevm(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.user_data_rhevm() + ''' + def setUp(self): + '''Set up.''' + self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + self.mount_dir = tempfile.mkdtemp() + + _write_user_data_files(self.mount_dir, 'test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files(self.mount_dir) + + # Attempt to remove the temp dir ignoring errors + try: + shutil.rmtree(self.mount_dir) + except OSError: + pass + + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/etc/sysconfig/cloud-info' + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['/sbin/modprobe', 'floppy'] + cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ + ['/sbin/udevadm', 'settle', '--quiet', '--timeout=5'] + + def test_mount_cb_fails(self): + '''Test user_data_rhevm() where mount_cb fails.''' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['echo', 'modprobe floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + + def test_modprobe_fails(self): + '''Test user_data_rhevm() where modprobe fails.''' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['ls', 'modprobe floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + + def test_no_modprobe_cmd(self): + '''Test user_data_rhevm() with no modprobe command.''' + + cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ + ['bad command', 'modprobe floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + + def test_udevadm_fails(self): + '''Test user_data_rhevm() where udevadm fails.''' + + cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ + ['ls', 'udevadm floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + + def test_no_udevadm_cmd(self): + '''Test user_data_rhevm() with no udevadm command.''' + + cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ + ['bad command', 'udevadm floppy'] + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_rhevm()) + + +class TestUserDataVsphere(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.user_data_vsphere() + ''' + def setUp(self): + '''Set up.''' + self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + self.mount_dir = tempfile.mkdtemp() + + _write_user_data_files(self.mount_dir, 'test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files(self.mount_dir) + + # Attempt to remove the temp dir ignoring errors + try: + shutil.rmtree(self.mount_dir) + except OSError: + pass + + cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + '/etc/sysconfig/cloud-info' + + def test_user_data_vsphere(self): + '''Test user_data_vsphere() where mount_cb fails.''' + + cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = self.mount_dir + + dsrc = DataSourceAltCloud({}, None, self.paths) + + self.assertEquals(False, dsrc.user_data_vsphere()) + + +class TestReadUserDataCallback(TestCase): + ''' + Test to exercise method: DataSourceAltCloud.read_user_data_callback() + ''' + def setUp(self): + '''Set up.''' + self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + self.mount_dir = tempfile.mkdtemp() + + _write_user_data_files(self.mount_dir, 'test user data') + + def tearDown(self): + # Reset + + _remove_user_data_files(self.mount_dir) + + # Attempt to remove the temp dir ignoring errors + try: + shutil.rmtree(self.mount_dir) + except OSError: + pass + + def test_callback_both(self): + '''Test read_user_data_callback() with both files.''' + + self.assertEquals('test user data', + read_user_data_callback(self.mount_dir)) + + def test_callback_dc(self): + '''Test read_user_data_callback() with only DC file.''' + + _remove_user_data_files(self.mount_dir, + dc_file=False, + non_dc_file=True) + + self.assertEquals('test user data', + read_user_data_callback(self.mount_dir)) + + def test_callback_non_dc(self): + '''Test read_user_data_callback() with only non-DC file.''' + + _remove_user_data_files(self.mount_dir, + dc_file=True, + non_dc_file=False) + + self.assertEquals('test user data', + read_user_data_callback(self.mount_dir)) + + def test_callback_none(self): + '''Test read_user_data_callback() no files are found.''' + + _remove_user_data_files(self.mount_dir) + self.assertEquals(None, read_user_data_callback(self.mount_dir)) + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py new file mode 100644 index 00000000..55573114 --- /dev/null +++ b/tests/unittests/test_datasource/test_configdrive.py @@ -0,0 +1,177 @@ +from copy import copy +import json +import os +import os.path +import shutil +import tempfile +from unittest import TestCase + +from cloudinit.sources import DataSourceConfigDrive as ds +from cloudinit import util + + +PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n' +EC2_META = { + 'ami-id': 'ami-00000001', + 'ami-launch-index': 0, + 'ami-manifest-path': 'FIXME', + 'block-device-mapping': { + 'ami': 'sda1', + 'ephemeral0': 'sda2', + 'root': '/dev/sda1', + 'swap': 'sda3'}, + 'hostname': 'sm-foo-test.novalocal', + 'instance-action': 'none', + 'instance-id': 'i-00000001', + 'instance-type': 'm1.tiny', + 'local-hostname': 'sm-foo-test.novalocal', + 'local-ipv4': None, + 'placement': {'availability-zone': 'nova'}, + 'public-hostname': 'sm-foo-test.novalocal', + 'public-ipv4': '', + 'public-keys': {'0': {'openssh-key': PUBKEY}}, + 'reservation-id': 'r-iru5qm4m', + 'security-groups': ['default'] +} +USER_DATA = '#!/bin/sh\necho This is user data\n' +OSTACK_META = { + 'availability_zone': 'nova', + 'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'}, + {'content_path': '/content/0001', 'path': '/etc/bar/bar.cfg'}], + 'hostname': 'sm-foo-test.novalocal', + 'meta': {'dsmode': 'local', 'my-meta': 'my-value'}, + 'name': 'sm-foo-test', + 'public_keys': {'mykey': PUBKEY}, + 'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'} + +CONTENT_0 = 'This is contents of /etc/foo.cfg\n' +CONTENT_1 = '# this is /etc/bar/bar.cfg\n' + +CFG_DRIVE_FILES_V2 = { + 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META), + 'ec2/2009-04-04/user-data': USER_DATA, + 'ec2/latest/meta-data.json': json.dumps(EC2_META), + 'ec2/latest/user-data': USER_DATA, + 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META), + 'openstack/2012-08-10/user_data': USER_DATA, + 'openstack/content/0000': CONTENT_0, + 'openstack/content/0001': CONTENT_1, + 'openstack/latest/meta_data.json': json.dumps(OSTACK_META), + 'openstack/latest/user_data': USER_DATA} + + +class TestConfigDriveDataSource(TestCase): + + def setUp(self): + super(TestConfigDriveDataSource, self).setUp() + self.tmp = tempfile.mkdtemp() + + def tearDown(self): + try: + shutil.rmtree(self.tmp) + except OSError: + pass + + def test_dir_valid(self): + """Verify a dir is read as such.""" + + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + + found = ds.read_config_drive_dir(self.tmp) + + expected_md = copy(OSTACK_META) + expected_md['instance-id'] = expected_md['uuid'] + + self.assertEqual(USER_DATA, found['userdata']) + self.assertEqual(expected_md, found['metadata']) + self.assertEqual(found['files']['/etc/foo.cfg'], CONTENT_0) + self.assertEqual(found['files']['/etc/bar/bar.cfg'], CONTENT_1) + + def test_seed_dir_valid_extra(self): + """Verify extra files do not affect datasource validity.""" + + data = copy(CFG_DRIVE_FILES_V2) + data["myfoofile.txt"] = "myfoocontent" + data["openstack/latest/random-file.txt"] = "random-content" + + populate_dir(self.tmp, data) + + found = ds.read_config_drive_dir(self.tmp) + + expected_md = copy(OSTACK_META) + expected_md['instance-id'] = expected_md['uuid'] + + self.assertEqual(expected_md, found['metadata']) + + def test_seed_dir_bad_json_metadata(self): + """Verify that bad json in metadata raises BrokenConfigDriveDir.""" + data = copy(CFG_DRIVE_FILES_V2) + + data["openstack/2012-08-10/meta_data.json"] = "non-json garbage {}" + data["openstack/latest/meta_data.json"] = "non-json garbage {}" + + populate_dir(self.tmp, data) + + self.assertRaises(ds.BrokenConfigDriveDir, + ds.read_config_drive_dir, self.tmp) + + def test_seed_dir_no_configdrive(self): + """Verify that no metadata raises NonConfigDriveDir.""" + + my_d = os.path.join(self.tmp, "non-configdrive") + data = copy(CFG_DRIVE_FILES_V2) + data["myfoofile.txt"] = "myfoocontent" + data["openstack/latest/random-file.txt"] = "random-content" + data["content/foo"] = "foocontent" + + self.assertRaises(ds.NonConfigDriveDir, + ds.read_config_drive_dir, my_d) + + def test_seed_dir_missing(self): + """Verify that missing seed_dir raises NonConfigDriveDir.""" + my_d = os.path.join(self.tmp, "nonexistantdirectory") + self.assertRaises(ds.NonConfigDriveDir, + ds.read_config_drive_dir, my_d) + + def test_find_candidates(self): + devs_with_answers = { + "TYPE=vfat": [], + "TYPE=iso9660": ["/dev/vdb"], + "LABEL=config-2": ["/dev/vdb"], + } + + def my_devs_with(criteria): + return devs_with_answers[criteria] + + try: + orig_find_devs_with = util.find_devs_with + util.find_devs_with = my_devs_with + + self.assertEqual(["/dev/vdb"], ds.find_candidate_devs()) + + # add a vfat item + # zdd reverse sorts after vdb, but config-2 label is preferred + devs_with_answers['TYPE=vfat'] = ["/dev/zdd"] + self.assertEqual(["/dev/vdb", "/dev/zdd"], + ds.find_candidate_devs()) + + # verify that partitions are not considered + devs_with_answers = {"TYPE=vfat": ["/dev/sda1"], + "TYPE=iso9660": [], "LABEL=config-2": ["/dev/vdb3"]} + self.assertEqual([], ds.find_candidate_devs()) + + finally: + util.find_devs_with = orig_find_devs_with + + +def populate_dir(seed_dir, files): + for (name, content) in files.iteritems(): + path = os.path.join(seed_dir, name) + dirname = os.path.dirname(path) + if not os.path.isdir(dirname): + os.makedirs(dirname) + with open(path, "w") as fp: + fp.write(content) + fp.close() + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py index 261c410a..85e6add0 100644 --- a/tests/unittests/test_datasource/test_maas.py +++ b/tests/unittests/test_datasource/test_maas.py @@ -1,10 +1,8 @@ -import os -from StringIO import StringIO from copy import copy +import os -from cloudinit import util -from cloudinit import url_helper from cloudinit.sources import DataSourceMAAS +from cloudinit import url_helper from mocker import MockerTestCase @@ -17,7 +15,7 @@ class TestMAASDataSource(MockerTestCase): self.tmp = self.makeDir() def test_seed_dir_valid(self): - """Verify a valid seeddir is read as such""" + """Verify a valid seeddir is read as such.""" data = {'instance-id': 'i-valid01', 'local-hostname': 'valid01-hostname', @@ -37,7 +35,7 @@ class TestMAASDataSource(MockerTestCase): self.assertFalse(('user-data' in metadata)) def test_seed_dir_valid_extra(self): - """Verify extra files do not affect seed_dir validity """ + """Verify extra files do not affect seed_dir validity.""" data = {'instance-id': 'i-valid-extra', 'local-hostname': 'valid-extra-hostname', @@ -56,7 +54,7 @@ class TestMAASDataSource(MockerTestCase): self.assertFalse(('foo' in metadata)) def test_seed_dir_invalid(self): - """Verify that invalid seed_dir raises MAASSeedDirMalformed""" + """Verify that invalid seed_dir raises MAASSeedDirMalformed.""" valid = {'instance-id': 'i-instanceid', 'local-hostname': 'test-hostname', 'user-data': ''} @@ -80,20 +78,20 @@ class TestMAASDataSource(MockerTestCase): DataSourceMAAS.read_maas_seed_dir, my_d) def test_seed_dir_none(self): - """Verify that empty seed_dir raises MAASSeedDirNone""" + """Verify that empty seed_dir raises MAASSeedDirNone.""" my_d = os.path.join(self.tmp, "valid_empty") self.assertRaises(DataSourceMAAS.MAASSeedDirNone, DataSourceMAAS.read_maas_seed_dir, my_d) def test_seed_dir_missing(self): - """Verify that missing seed_dir raises MAASSeedDirNone""" - self.assertRaises(DataSourceMAAS.MAASSeedDirNone, + """Verify that missing seed_dir raises MAASSeedDirNone.""" + self.assertRaises(DataSourceMAAS.MAASSeedDirNone, DataSourceMAAS.read_maas_seed_dir, os.path.join(self.tmp, "nonexistantdirectory")) def test_seed_url_valid(self): - """Verify that valid seed_url is read as such""" + """Verify that valid seed_url is read as such.""" valid = {'meta-data/instance-id': 'i-instanceid', 'meta-data/local-hostname': 'test-hostname', 'meta-data/public-keys': 'test-hostname', @@ -131,11 +129,11 @@ class TestMAASDataSource(MockerTestCase): valid['meta-data/local-hostname']) def test_seed_url_invalid(self): - """Verify that invalid seed_url raises MAASSeedDirMalformed""" + """Verify that invalid seed_url raises MAASSeedDirMalformed.""" pass def test_seed_url_missing(self): - """Verify seed_url with no found entries raises MAASSeedDirNone""" + """Verify seed_url with no found entries raises MAASSeedDirNone.""" pass diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py new file mode 100644 index 00000000..2df4c2f0 --- /dev/null +++ b/tests/unittests/test_distros/test_generic.py @@ -0,0 +1,121 @@ +from mocker import MockerTestCase + +from cloudinit import distros + +unknown_arch_info = { + 'arches': ['default'], + 'failsafe': {'primary': 'http://fs-primary-default', + 'security': 'http://fs-security-default'} +} + +package_mirrors = [ + {'arches': ['i386', 'amd64'], + 'failsafe': {'primary': 'http://fs-primary-intel', + 'security': 'http://fs-security-intel'}, + 'search': { + 'primary': ['http://%(ec2_region)s.ec2/', + 'http://%(availability_zone)s.clouds/'], + 'security': ['http://security-mirror1-intel', + 'http://security-mirror2-intel']}}, + {'arches': ['armhf', 'armel'], + 'failsafe': {'primary': 'http://fs-primary-arm', + 'security': 'http://fs-security-arm'}}, + unknown_arch_info +] + +gpmi = distros._get_package_mirror_info # pylint: disable=W0212 +gapmi = distros._get_arch_package_mirror_info # pylint: disable=W0212 + + +class TestGenericDistro(MockerTestCase): + + def return_first(self, mlist): + if not mlist: + return None + return mlist[0] + + def return_second(self, mlist): + if not mlist: + return None + return mlist[1] + + def return_none(self, _mlist): + return None + + def return_last(self, mlist): + if not mlist: + return None + return(mlist[-1]) + + def setUp(self): + super(TestGenericDistro, self).setUp() + # Make a temp directoy for tests to use. + self.tmp = self.makeDir() + + def test_arch_package_mirror_info_unknown(self): + """for an unknown arch, we should get back that with arch 'default'.""" + arch_mirrors = gapmi(package_mirrors, arch="unknown") + self.assertEqual(unknown_arch_info, arch_mirrors) + + def test_arch_package_mirror_info_known(self): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + self.assertEqual(package_mirrors[0], arch_mirrors) + + def test_get_package_mirror_info_az_ec2(self): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + + results = gpmi(arch_mirrors, availability_zone="us-east-1a", + mirror_filter=self.return_first) + self.assertEqual(results, + {'primary': 'http://us-east-1.ec2/', + 'security': 'http://security-mirror1-intel'}) + + results = gpmi(arch_mirrors, availability_zone="us-east-1a", + mirror_filter=self.return_second) + self.assertEqual(results, + {'primary': 'http://us-east-1a.clouds/', + 'security': 'http://security-mirror2-intel'}) + + results = gpmi(arch_mirrors, availability_zone="us-east-1a", + mirror_filter=self.return_none) + self.assertEqual(results, package_mirrors[0]['failsafe']) + + def test_get_package_mirror_info_az_non_ec2(self): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + + results = gpmi(arch_mirrors, availability_zone="nova.cloudvendor", + mirror_filter=self.return_first) + self.assertEqual(results, + {'primary': 'http://nova.cloudvendor.clouds/', + 'security': 'http://security-mirror1-intel'}) + + results = gpmi(arch_mirrors, availability_zone="nova.cloudvendor", + mirror_filter=self.return_last) + self.assertEqual(results, + {'primary': 'http://nova.cloudvendor.clouds/', + 'security': 'http://security-mirror2-intel'}) + + def test_get_package_mirror_info_none(self): + arch_mirrors = gapmi(package_mirrors, arch="amd64") + + # because both search entries here replacement based on + # availability-zone, the filter will be called with an empty list and + # failsafe should be taken. + results = gpmi(arch_mirrors, availability_zone=None, + mirror_filter=self.return_first) + self.assertEqual(results, + {'primary': 'http://fs-primary-intel', + 'security': 'http://security-mirror1-intel'}) + + results = gpmi(arch_mirrors, availability_zone=None, + mirror_filter=self.return_last) + self.assertEqual(results, + {'primary': 'http://fs-primary-intel', + 'security': 'http://security-mirror2-intel'}) + + +#def _get_package_mirror_info(mirror_info, availability_zone=None, +# mirror_filter=util.search_for_mirror): + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index 1f96e992..d3df5c50 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -1,8 +1,8 @@ from mocker import MockerTestCase -from cloudinit import util from cloudinit import cloud from cloudinit import helpers +from cloudinit import util from cloudinit.config import cc_ca_certs @@ -26,7 +26,8 @@ class TestNoConfig(MockerTestCase): self.mocker.replace(cc_ca_certs.update_ca_certs, passthrough=False) self.mocker.replay() - cc_ca_certs.handle(self.name, config, self.cloud_init, self.log, self.args) + cc_ca_certs.handle(self.name, config, self.cloud_init, self.log, + self.args) class TestConfig(MockerTestCase): @@ -39,11 +40,12 @@ class TestConfig(MockerTestCase): self.args = [] # Mock out the functions that actually modify the system - self.mock_add = self.mocker.replace(cc_ca_certs.add_ca_certs, passthrough=False) + self.mock_add = self.mocker.replace(cc_ca_certs.add_ca_certs, + passthrough=False) self.mock_update = self.mocker.replace(cc_ca_certs.update_ca_certs, passthrough=False) - self.mock_remove = self.mocker.replace(cc_ca_certs.remove_default_ca_certs, - passthrough=False) + self.mock_remove = self.mocker.replace( + cc_ca_certs.remove_default_ca_certs, passthrough=False) # Order must be correct self.mocker.order() @@ -62,7 +64,7 @@ class TestConfig(MockerTestCase): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_empty_trusted_list(self): - """Test that no certificate are written if 'trusted' list is empty""" + """Test that no certificate are written if 'trusted' list is empty.""" config = {"ca-certs": {"trusted": []}} # No functions should be called @@ -72,7 +74,7 @@ class TestConfig(MockerTestCase): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_single_trusted(self): - """Test that a single cert gets passed to add_ca_certs""" + """Test that a single cert gets passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1"]}} self.mock_add(self.paths, ["CERT1"]) @@ -82,7 +84,7 @@ class TestConfig(MockerTestCase): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_multiple_trusted(self): - """Test that multiple certs get passed to add_ca_certs""" + """Test that multiple certs get passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}} self.mock_add(self.paths, ["CERT1", "CERT2"]) @@ -92,7 +94,7 @@ class TestConfig(MockerTestCase): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_remove_default_ca_certs(self): - """Test remove_defaults works as expected""" + """Test remove_defaults works as expected.""" config = {"ca-certs": {"remove-defaults": True}} self.mock_remove(self.paths) @@ -102,7 +104,7 @@ class TestConfig(MockerTestCase): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_no_remove_defaults_if_false(self): - """Test remove_defaults is not called when config value is False""" + """Test remove_defaults is not called when config value is False.""" config = {"ca-certs": {"remove-defaults": False}} self.mock_update() @@ -111,7 +113,7 @@ class TestConfig(MockerTestCase): cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) def test_correct_order_for_remove_then_add(self): - """Test remove_defaults is not called when config value is False""" + """Test remove_defaults is not called when config value is False.""" config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}} self.mock_remove(self.paths) @@ -137,7 +139,7 @@ class TestAddCaCerts(MockerTestCase): cc_ca_certs.add_ca_certs(self.paths, []) def test_single_cert(self): - """Test adding a single certificate to the trusted CAs""" + """Test adding a single certificate to the trusted CAs.""" cert = "CERT1\nLINE2\nLINE3" mock_write = self.mocker.replace(util.write_file, passthrough=False) @@ -150,7 +152,7 @@ class TestAddCaCerts(MockerTestCase): cc_ca_certs.add_ca_certs(self.paths, [cert]) def test_multiple_certs(self): - """Test adding multiple certificates to the trusted CAs""" + """Test adding multiple certificates to the trusted CAs.""" certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"] expected_cert_file = "\n".join(certs) @@ -183,8 +185,8 @@ class TestRemoveDefaultCaCerts(MockerTestCase): }) def test_commands(self): - mock_delete_dir_contents = self.mocker.replace(util.delete_dir_contents, - passthrough=False) + mock_delete_dir_contents = self.mocker.replace( + util.delete_dir_contents, passthrough=False) mock_write = self.mocker.replace(util.write_file, passthrough=False) mock_subp = self.mocker.replace(util.subp, passthrough=False) diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py index 861642b6..82a4c555 100644 --- a/tests/unittests/test_userdata.py +++ b/tests/unittests/test_userdata.py @@ -1,21 +1,17 @@ -"""Tests for handling of userdata within cloud init""" +"""Tests for handling of userdata within cloud init.""" import StringIO import logging import os -import shutil -import tempfile from email.mime.base import MIMEBase from mocker import MockerTestCase -from cloudinit import helpers from cloudinit import log from cloudinit import sources from cloudinit import stages -from cloudinit import util INSTANCE_ID = "i-testing" @@ -58,7 +54,7 @@ class TestConsumeUserData(MockerTestCase): return log_file def test_unhandled_type_warning(self): - """Raw text without magic is ignored but shows warning""" + """Raw text without magic is ignored but shows warning.""" ci = stages.Init() data = "arbitrary text\n" ci.datasource = FakeDataSource(data) @@ -74,7 +70,7 @@ class TestConsumeUserData(MockerTestCase): log_file.getvalue()) def test_mime_text_plain(self): - """Mime message of type text/plain is ignored but shows warning""" + """Mime message of type text/plain is ignored but shows warning.""" ci = stages.Init() message = MIMEBase("text", "plain") message.set_payload("Just text") @@ -90,9 +86,8 @@ class TestConsumeUserData(MockerTestCase): "Unhandled unknown content-type (text/plain)", log_file.getvalue()) - def test_shellscript(self): - """Raw text starting #!/bin/sh is treated as script""" + """Raw text starting #!/bin/sh is treated as script.""" ci = stages.Init() script = "#!/bin/sh\necho hello\n" ci.datasource = FakeDataSource(script) @@ -108,7 +103,7 @@ class TestConsumeUserData(MockerTestCase): self.assertEqual("", log_file.getvalue()) def test_mime_text_x_shellscript(self): - """Mime message of type text/x-shellscript is treated as script""" + """Mime message of type text/x-shellscript is treated as script.""" ci = stages.Init() script = "#!/bin/sh\necho hello\n" message = MIMEBase("text", "x-shellscript") @@ -126,7 +121,7 @@ class TestConsumeUserData(MockerTestCase): self.assertEqual("", log_file.getvalue()) def test_mime_text_plain_shell(self): - """Mime type text/plain starting #!/bin/sh is treated as script""" + """Mime type text/plain starting #!/bin/sh is treated as script.""" ci = stages.Init() script = "#!/bin/sh\necho hello\n" message = MIMEBase("text", "plain") diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 93979f06..15fcbd26 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -1,11 +1,11 @@ import os import stat -from unittest import TestCase from mocker import MockerTestCase +from unittest import TestCase -from cloudinit import util from cloudinit import importer +from cloudinit import util class FakeSelinux(object): @@ -14,7 +14,7 @@ class FakeSelinux(object): self.match_what = match_what self.restored = [] - def matchpathcon(self, path, mode): + def matchpathcon(self, path, mode): # pylint: disable=W0613 if path == self.match_what: return else: @@ -23,7 +23,7 @@ class FakeSelinux(object): def is_selinux_enabled(self): return True - def restorecon(self, path, recursive): + def restorecon(self, path, recursive): # pylint: disable=W0613 self.restored.append(path) |