summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py445
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py177
-rw-r--r--tests/unittests/test_datasource/test_maas.py24
3 files changed, 633 insertions, 13 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
new file mode 100644
index 00000000..bda61c7e
--- /dev/null
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -0,0 +1,445 @@
+# vi: ts=4 expandtab
+#
+# Copyright (C) 2009-2010 Canonical Ltd.
+# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
+# Copyright (C) 2012 Yahoo! Inc.
+#
+# Author: Joe VLcek <JVLcek@RedHat.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+'''
+This test file exercises the code in sources DataSourceAltCloud.py
+'''
+
+import os
+import shutil
+import tempfile
+
+from cloudinit import helpers
+from unittest import TestCase
+
+# Get the cloudinit.sources.DataSourceAltCloud import items needed.
+import cloudinit.sources.DataSourceAltCloud
+from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud
+from cloudinit.sources.DataSourceAltCloud import read_user_data_callback
+
+
+def _write_cloud_info_file(value):
+ '''
+ Populate the CLOUD_INFO_FILE which would be populated
+ with a cloud backend identifier ImageFactory when building
+ an image with ImageFactory.
+ '''
+ cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w')
+ cifile.write(value)
+ cifile.close()
+ os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664)
+
+
+def _remove_cloud_info_file():
+ '''
+ Remove the test CLOUD_INFO_FILE
+ '''
+ os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE)
+
+
+def _write_user_data_files(mount_dir, value):
+ '''
+ Populate the deltacloud_user_data_file the user_data_file
+ which would be populated with user data.
+ '''
+ deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt'
+ user_data_file = mount_dir + '/user-data.txt'
+
+ udfile = open(deltacloud_user_data_file, 'w')
+ udfile.write(value)
+ udfile.close()
+ os.chmod(deltacloud_user_data_file, 0664)
+
+ udfile = open(user_data_file, 'w')
+ udfile.write(value)
+ udfile.close()
+ os.chmod(user_data_file, 0664)
+
+
+def _remove_user_data_files(mount_dir,
+ dc_file=True,
+ non_dc_file=True):
+ '''
+ Remove the test files: deltacloud_user_data_file and
+ user_data_file
+ '''
+ deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt'
+ user_data_file = mount_dir + '/user-data.txt'
+
+ # Ignore any failures removeing files that are already gone.
+ if dc_file:
+ try:
+ os.remove(deltacloud_user_data_file)
+ except OSError:
+ pass
+
+ if non_dc_file:
+ try:
+ os.remove(user_data_file)
+ except OSError:
+ pass
+
+
+class TestGetCloudType(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.get_cloud_type()
+ '''
+
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+
+ def tearDown(self):
+ # Reset
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['dmidecode', '--string', 'system-product-name']
+
+ def test_rhev(self):
+ '''
+ Test method get_cloud_type() for RHEVm systems.
+ Forcing dmidecode return to match a RHEVm system: RHEV Hypervisor
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'RHEV Hypervisor']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('RHEV', \
+ dsrc.get_cloud_type())
+
+ def test_vsphere(self):
+ '''
+ Test method get_cloud_type() for vSphere systems.
+ Forcing dmidecode return to match a vSphere system: RHEV Hypervisor
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'VMware Virtual Platform']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('VSPHERE', \
+ dsrc.get_cloud_type())
+
+ def test_unknown(self):
+ '''
+ Test method get_cloud_type() for unknown systems.
+ Forcing dmidecode return to match an unrecognized return.
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'Unrecognized Platform']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('UNKNOWN', \
+ dsrc.get_cloud_type())
+
+ def test_exception1(self):
+ '''
+ Test method get_cloud_type() where command dmidecode fails.
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['ls', 'bad command']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('UNKNOWN', \
+ dsrc.get_cloud_type())
+
+ def test_exception2(self):
+ '''
+ Test method get_cloud_type() where command dmidecode is not available.
+ '''
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['bad command']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals('UNKNOWN', \
+ dsrc.get_cloud_type())
+
+
+class TestGetDataCloudInfoFile(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.get_data()
+ With a contrived CLOUD_INFO_FILE
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.cloud_info_file = tempfile.mkstemp()[1]
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ self.cloud_info_file
+
+ def tearDown(self):
+ # Reset
+
+ # Attempt to remove the temp file ignoring errors
+ try:
+ os.remove(self.cloud_info_file)
+ except OSError:
+ pass
+
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/etc/sysconfig/cloud-info'
+
+ def test_rhev(self):
+ '''Success Test module get_data() forcing RHEV.'''
+
+ _write_cloud_info_file('RHEV')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_rhevm = lambda: True
+ self.assertEquals(True, dsrc.get_data())
+
+ def test_vsphere(self):
+ '''Success Test module get_data() forcing VSPHERE.'''
+
+ _write_cloud_info_file('VSPHERE')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_vsphere = lambda: True
+ self.assertEquals(True, dsrc.get_data())
+
+ def test_fail_rhev(self):
+ '''Failure Test module get_data() forcing RHEV.'''
+
+ _write_cloud_info_file('RHEV')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_rhevm = lambda: False
+ self.assertEquals(False, dsrc.get_data())
+
+ def test_fail_vsphere(self):
+ '''Failure Test module get_data() forcing VSPHERE.'''
+
+ _write_cloud_info_file('VSPHERE')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_vsphere = lambda: False
+ self.assertEquals(False, dsrc.get_data())
+
+ def test_unrecognized(self):
+ '''Failure Test module get_data() forcing unrecognized.'''
+
+ _write_cloud_info_file('unrecognized')
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals(False, dsrc.get_data())
+
+
+class TestGetDataNoCloudInfoFile(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.get_data()
+ Without a CLOUD_INFO_FILE
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ 'no such file'
+
+ def tearDown(self):
+ # Reset
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/etc/sysconfig/cloud-info'
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['dmidecode', '--string', 'system-product-name']
+
+ def test_rhev_no_cloud_file(self):
+ '''Test No cloud info file module get_data() forcing RHEV.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'RHEV Hypervisor']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_rhevm = lambda: True
+ self.assertEquals(True, dsrc.get_data())
+
+ def test_vsphere_no_cloud_file(self):
+ '''Test No cloud info file module get_data() forcing VSPHERE.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'VMware Virtual Platform']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc.user_data_vsphere = lambda: True
+ self.assertEquals(True, dsrc.get_data())
+
+ def test_failure_no_cloud_file(self):
+ '''Test No cloud info file module get_data() forcing unrecognized.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
+ ['echo', 'Unrecognized Platform']
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+ self.assertEquals(False, dsrc.get_data())
+
+
+class TestUserDataRhevm(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.user_data_rhevm()
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.mount_dir = tempfile.mkdtemp()
+
+ _write_user_data_files(self.mount_dir, 'test user data')
+
+ def tearDown(self):
+ # Reset
+
+ _remove_user_data_files(self.mount_dir)
+
+ # Attempt to remove the temp dir ignoring errors
+ try:
+ shutil.rmtree(self.mount_dir)
+ except OSError:
+ pass
+
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/etc/sysconfig/cloud-info'
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['/sbin/modprobe', 'floppy']
+ cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
+ ['/sbin/udevadm', 'settle', '--quiet', '--timeout=5']
+
+ def test_mount_cb_fails(self):
+ '''Test user_data_rhevm() where mount_cb fails.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['echo', 'modprobe floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+ def test_modprobe_fails(self):
+ '''Test user_data_rhevm() where modprobe fails.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['ls', 'modprobe floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+ def test_no_modprobe_cmd(self):
+ '''Test user_data_rhevm() with no modprobe command.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
+ ['bad command', 'modprobe floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+ def test_udevadm_fails(self):
+ '''Test user_data_rhevm() where udevadm fails.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
+ ['ls', 'udevadm floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+ def test_no_udevadm_cmd(self):
+ '''Test user_data_rhevm() with no udevadm command.'''
+
+ cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
+ ['bad command', 'udevadm floppy']
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_rhevm())
+
+
+class TestUserDataVsphere(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.user_data_vsphere()
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.mount_dir = tempfile.mkdtemp()
+
+ _write_user_data_files(self.mount_dir, 'test user data')
+
+ def tearDown(self):
+ # Reset
+
+ _remove_user_data_files(self.mount_dir)
+
+ # Attempt to remove the temp dir ignoring errors
+ try:
+ shutil.rmtree(self.mount_dir)
+ except OSError:
+ pass
+
+ cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ '/etc/sysconfig/cloud-info'
+
+ def test_user_data_vsphere(self):
+ '''Test user_data_vsphere() where mount_cb fails.'''
+
+ cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = self.mount_dir
+
+ dsrc = DataSourceAltCloud({}, None, self.paths)
+
+ self.assertEquals(False, dsrc.user_data_vsphere())
+
+
+class TestReadUserDataCallback(TestCase):
+ '''
+ Test to exercise method: DataSourceAltCloud.read_user_data_callback()
+ '''
+ def setUp(self):
+ '''Set up.'''
+ self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.mount_dir = tempfile.mkdtemp()
+
+ _write_user_data_files(self.mount_dir, 'test user data')
+
+ def tearDown(self):
+ # Reset
+
+ _remove_user_data_files(self.mount_dir)
+
+ # Attempt to remove the temp dir ignoring errors
+ try:
+ shutil.rmtree(self.mount_dir)
+ except OSError:
+ pass
+
+ def test_callback_both(self):
+ '''Test read_user_data_callback() with both files.'''
+
+ self.assertEquals('test user data',
+ read_user_data_callback(self.mount_dir))
+
+ def test_callback_dc(self):
+ '''Test read_user_data_callback() with only DC file.'''
+
+ _remove_user_data_files(self.mount_dir,
+ dc_file=False,
+ non_dc_file=True)
+
+ self.assertEquals('test user data',
+ read_user_data_callback(self.mount_dir))
+
+ def test_callback_non_dc(self):
+ '''Test read_user_data_callback() with only non-DC file.'''
+
+ _remove_user_data_files(self.mount_dir,
+ dc_file=True,
+ non_dc_file=False)
+
+ self.assertEquals('test user data',
+ read_user_data_callback(self.mount_dir))
+
+ def test_callback_none(self):
+ '''Test read_user_data_callback() no files are found.'''
+
+ _remove_user_data_files(self.mount_dir)
+ self.assertEquals(None, read_user_data_callback(self.mount_dir))
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
new file mode 100644
index 00000000..55573114
--- /dev/null
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -0,0 +1,177 @@
+from copy import copy
+import json
+import os
+import os.path
+import shutil
+import tempfile
+from unittest import TestCase
+
+from cloudinit.sources import DataSourceConfigDrive as ds
+from cloudinit import util
+
+
+PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
+EC2_META = {
+ 'ami-id': 'ami-00000001',
+ 'ami-launch-index': 0,
+ 'ami-manifest-path': 'FIXME',
+ 'block-device-mapping': {
+ 'ami': 'sda1',
+ 'ephemeral0': 'sda2',
+ 'root': '/dev/sda1',
+ 'swap': 'sda3'},
+ 'hostname': 'sm-foo-test.novalocal',
+ 'instance-action': 'none',
+ 'instance-id': 'i-00000001',
+ 'instance-type': 'm1.tiny',
+ 'local-hostname': 'sm-foo-test.novalocal',
+ 'local-ipv4': None,
+ 'placement': {'availability-zone': 'nova'},
+ 'public-hostname': 'sm-foo-test.novalocal',
+ 'public-ipv4': '',
+ 'public-keys': {'0': {'openssh-key': PUBKEY}},
+ 'reservation-id': 'r-iru5qm4m',
+ 'security-groups': ['default']
+}
+USER_DATA = '#!/bin/sh\necho This is user data\n'
+OSTACK_META = {
+ 'availability_zone': 'nova',
+ 'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'},
+ {'content_path': '/content/0001', 'path': '/etc/bar/bar.cfg'}],
+ 'hostname': 'sm-foo-test.novalocal',
+ 'meta': {'dsmode': 'local', 'my-meta': 'my-value'},
+ 'name': 'sm-foo-test',
+ 'public_keys': {'mykey': PUBKEY},
+ 'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'}
+
+CONTENT_0 = 'This is contents of /etc/foo.cfg\n'
+CONTENT_1 = '# this is /etc/bar/bar.cfg\n'
+
+CFG_DRIVE_FILES_V2 = {
+ 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
+ 'ec2/2009-04-04/user-data': USER_DATA,
+ 'ec2/latest/meta-data.json': json.dumps(EC2_META),
+ 'ec2/latest/user-data': USER_DATA,
+ 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META),
+ 'openstack/2012-08-10/user_data': USER_DATA,
+ 'openstack/content/0000': CONTENT_0,
+ 'openstack/content/0001': CONTENT_1,
+ 'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
+ 'openstack/latest/user_data': USER_DATA}
+
+
+class TestConfigDriveDataSource(TestCase):
+
+ def setUp(self):
+ super(TestConfigDriveDataSource, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+
+ def tearDown(self):
+ try:
+ shutil.rmtree(self.tmp)
+ except OSError:
+ pass
+
+ def test_dir_valid(self):
+ """Verify a dir is read as such."""
+
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+
+ found = ds.read_config_drive_dir(self.tmp)
+
+ expected_md = copy(OSTACK_META)
+ expected_md['instance-id'] = expected_md['uuid']
+
+ self.assertEqual(USER_DATA, found['userdata'])
+ self.assertEqual(expected_md, found['metadata'])
+ self.assertEqual(found['files']['/etc/foo.cfg'], CONTENT_0)
+ self.assertEqual(found['files']['/etc/bar/bar.cfg'], CONTENT_1)
+
+ def test_seed_dir_valid_extra(self):
+ """Verify extra files do not affect datasource validity."""
+
+ data = copy(CFG_DRIVE_FILES_V2)
+ data["myfoofile.txt"] = "myfoocontent"
+ data["openstack/latest/random-file.txt"] = "random-content"
+
+ populate_dir(self.tmp, data)
+
+ found = ds.read_config_drive_dir(self.tmp)
+
+ expected_md = copy(OSTACK_META)
+ expected_md['instance-id'] = expected_md['uuid']
+
+ self.assertEqual(expected_md, found['metadata'])
+
+ def test_seed_dir_bad_json_metadata(self):
+ """Verify that bad json in metadata raises BrokenConfigDriveDir."""
+ data = copy(CFG_DRIVE_FILES_V2)
+
+ data["openstack/2012-08-10/meta_data.json"] = "non-json garbage {}"
+ data["openstack/latest/meta_data.json"] = "non-json garbage {}"
+
+ populate_dir(self.tmp, data)
+
+ self.assertRaises(ds.BrokenConfigDriveDir,
+ ds.read_config_drive_dir, self.tmp)
+
+ def test_seed_dir_no_configdrive(self):
+ """Verify that no metadata raises NonConfigDriveDir."""
+
+ my_d = os.path.join(self.tmp, "non-configdrive")
+ data = copy(CFG_DRIVE_FILES_V2)
+ data["myfoofile.txt"] = "myfoocontent"
+ data["openstack/latest/random-file.txt"] = "random-content"
+ data["content/foo"] = "foocontent"
+
+ self.assertRaises(ds.NonConfigDriveDir,
+ ds.read_config_drive_dir, my_d)
+
+ def test_seed_dir_missing(self):
+ """Verify that missing seed_dir raises NonConfigDriveDir."""
+ my_d = os.path.join(self.tmp, "nonexistantdirectory")
+ self.assertRaises(ds.NonConfigDriveDir,
+ ds.read_config_drive_dir, my_d)
+
+ def test_find_candidates(self):
+ devs_with_answers = {
+ "TYPE=vfat": [],
+ "TYPE=iso9660": ["/dev/vdb"],
+ "LABEL=config-2": ["/dev/vdb"],
+ }
+
+ def my_devs_with(criteria):
+ return devs_with_answers[criteria]
+
+ try:
+ orig_find_devs_with = util.find_devs_with
+ util.find_devs_with = my_devs_with
+
+ self.assertEqual(["/dev/vdb"], ds.find_candidate_devs())
+
+ # add a vfat item
+ # zdd reverse sorts after vdb, but config-2 label is preferred
+ devs_with_answers['TYPE=vfat'] = ["/dev/zdd"]
+ self.assertEqual(["/dev/vdb", "/dev/zdd"],
+ ds.find_candidate_devs())
+
+ # verify that partitions are not considered
+ devs_with_answers = {"TYPE=vfat": ["/dev/sda1"],
+ "TYPE=iso9660": [], "LABEL=config-2": ["/dev/vdb3"]}
+ self.assertEqual([], ds.find_candidate_devs())
+
+ finally:
+ util.find_devs_with = orig_find_devs_with
+
+
+def populate_dir(seed_dir, files):
+ for (name, content) in files.iteritems():
+ path = os.path.join(seed_dir, name)
+ dirname = os.path.dirname(path)
+ if not os.path.isdir(dirname):
+ os.makedirs(dirname)
+ with open(path, "w") as fp:
+ fp.write(content)
+ fp.close()
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index 261c410a..85e6add0 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -1,10 +1,8 @@
-import os
-from StringIO import StringIO
from copy import copy
+import os
-from cloudinit import util
-from cloudinit import url_helper
from cloudinit.sources import DataSourceMAAS
+from cloudinit import url_helper
from mocker import MockerTestCase
@@ -17,7 +15,7 @@ class TestMAASDataSource(MockerTestCase):
self.tmp = self.makeDir()
def test_seed_dir_valid(self):
- """Verify a valid seeddir is read as such"""
+ """Verify a valid seeddir is read as such."""
data = {'instance-id': 'i-valid01',
'local-hostname': 'valid01-hostname',
@@ -37,7 +35,7 @@ class TestMAASDataSource(MockerTestCase):
self.assertFalse(('user-data' in metadata))
def test_seed_dir_valid_extra(self):
- """Verify extra files do not affect seed_dir validity """
+ """Verify extra files do not affect seed_dir validity."""
data = {'instance-id': 'i-valid-extra',
'local-hostname': 'valid-extra-hostname',
@@ -56,7 +54,7 @@ class TestMAASDataSource(MockerTestCase):
self.assertFalse(('foo' in metadata))
def test_seed_dir_invalid(self):
- """Verify that invalid seed_dir raises MAASSeedDirMalformed"""
+ """Verify that invalid seed_dir raises MAASSeedDirMalformed."""
valid = {'instance-id': 'i-instanceid',
'local-hostname': 'test-hostname', 'user-data': ''}
@@ -80,20 +78,20 @@ class TestMAASDataSource(MockerTestCase):
DataSourceMAAS.read_maas_seed_dir, my_d)
def test_seed_dir_none(self):
- """Verify that empty seed_dir raises MAASSeedDirNone"""
+ """Verify that empty seed_dir raises MAASSeedDirNone."""
my_d = os.path.join(self.tmp, "valid_empty")
self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
DataSourceMAAS.read_maas_seed_dir, my_d)
def test_seed_dir_missing(self):
- """Verify that missing seed_dir raises MAASSeedDirNone"""
- self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
+ """Verify that missing seed_dir raises MAASSeedDirNone."""
+ self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
DataSourceMAAS.read_maas_seed_dir,
os.path.join(self.tmp, "nonexistantdirectory"))
def test_seed_url_valid(self):
- """Verify that valid seed_url is read as such"""
+ """Verify that valid seed_url is read as such."""
valid = {'meta-data/instance-id': 'i-instanceid',
'meta-data/local-hostname': 'test-hostname',
'meta-data/public-keys': 'test-hostname',
@@ -131,11 +129,11 @@ class TestMAASDataSource(MockerTestCase):
valid['meta-data/local-hostname'])
def test_seed_url_invalid(self):
- """Verify that invalid seed_url raises MAASSeedDirMalformed"""
+ """Verify that invalid seed_url raises MAASSeedDirMalformed."""
pass
def test_seed_url_missing(self):
- """Verify seed_url with no found entries raises MAASSeedDirNone"""
+ """Verify seed_url with no found entries raises MAASSeedDirNone."""
pass