Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r-- | tests/unittests/test_datasource/test_altcloud.py     |   6
-rw-r--r-- | tests/unittests/test_datasource/test_azure.py        |  97
-rw-r--r-- | tests/unittests/test_datasource/test_configdrive.py  | 106
-rw-r--r-- | tests/unittests/test_datasource/test_digitalocean.py |   7
-rw-r--r-- | tests/unittests/test_datasource/test_gce.py          |   4
-rw-r--r-- | tests/unittests/test_datasource/test_maas.py         |  75
-rw-r--r-- | tests/unittests/test_datasource/test_nocloud.py      |  54
-rw-r--r-- | tests/unittests/test_datasource/test_opennebula.py   |  26
-rw-r--r-- | tests/unittests/test_datasource/test_openstack.py    |   7
-rw-r--r-- | tests/unittests/test_datasource/test_smartos.py      |  26
10 files changed, 229 insertions, 179 deletions
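Note before the patch body: the diff below replaces the old mocker-based test doubles with unittest.mock (falling back to the mock and contextlib2 backports on Python 2) and adds small Python 3 fixes (0o octal literals, bytes-aware base64 handling, items() instead of iteritems()). As a rough illustration of the setUp pattern the converted test cases share — the class name below is a placeholder, not something taken from the diff — a minimal sketch:

import shutil
import tempfile
import unittest

try:
    from unittest import mock           # Python 3.3+
except ImportError:
    import mock                         # Python 2 backport
try:
    from contextlib import ExitStack    # Python 3.3+
except ImportError:
    from contextlib2 import ExitStack   # Python 2 backport


class ExampleDataSourceTest(unittest.TestCase):
    """Placeholder test case showing the shared setUp pattern."""

    def setUp(self):
        super(ExampleDataSourceTest, self).setUp()
        # tempfile.mkdtemp() plus addCleanup replaces mocker's self.makeDir().
        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)
        # One ExitStack collects every mock.patch so they are all
        # unwound together when the test finishes.
        self.patches = ExitStack()
        self.addCleanup(self.patches.close)

    def apply_patches(self, patches):
        # Each entry is (module, attribute_name, replacement_object).
        for module, name, new in patches:
            self.patches.enter_context(mock.patch.object(module, name, new))

Because every patch is entered on the ExitStack and closed via addCleanup, no tearDown override is needed, which is exactly what the conversion removes from the old MockerTestCase classes.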
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index 1a48ee5f..e9cd2fa5 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -46,7 +46,7 @@ def _write_cloud_info_file(value):
     cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w')
     cifile.write(value)
     cifile.close()
-    os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664)
+    os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0o664)
 
 
 def _remove_cloud_info_file():
@@ -67,12 +67,12 @@ def _write_user_data_files(mount_dir, value):
     udfile = open(deltacloud_user_data_file, 'w')
     udfile.write(value)
     udfile.close()
-    os.chmod(deltacloud_user_data_file, 0664)
+    os.chmod(deltacloud_user_data_file, 0o664)
 
     udfile = open(user_data_file, 'w')
     udfile.write(value)
     udfile.close()
-    os.chmod(user_data_file, 0664)
+    os.chmod(user_data_file, 0o664)
 
 
 def _remove_user_data_files(mount_dir,
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index e992a006..1f0330b3 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -3,12 +3,30 @@ from cloudinit.util import load_file
 from cloudinit.sources import DataSourceAzure
 from ..helpers import populate_dir
 
+try:
+    from unittest import mock
+except ImportError:
+    import mock
+try:
+    from contextlib import ExitStack
+except ImportError:
+    from contextlib2 import ExitStack
+
 import base64
 import crypt
-from mocker import MockerTestCase
 import os
 import stat
 import yaml
+import shutil
+import tempfile
+import unittest
+
+
+def b64(source):
+    # In Python 3, b64encode only accepts bytes and returns bytes.
+    if not isinstance(source, bytes):
+        source = source.encode('utf-8')
+    return base64.b64encode(source).decode('us-ascii')
 
 
 def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
@@ -40,7 +58,7 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
             content += "<%s%s>%s</%s>\n" % (key, attrs, val, key)
 
     if userdata:
-        content += "<UserData>%s</UserData>\n" % (base64.b64encode(userdata))
+        content += "<UserData>%s</UserData>\n" % (b64(userdata))
 
     if pubkeys:
         content += "<SSH><PublicKeys>\n"
@@ -66,26 +84,24 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
     return content
 
 
-class TestAzureDataSource(MockerTestCase):
+class TestAzureDataSource(unittest.TestCase):
 
     def setUp(self):
-        # makeDir comes from MockerTestCase
-        self.tmp = self.makeDir()
+        self.tmp = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tmp)
 
         # patch cloud_dir, so our 'seed_dir' is guaranteed empty
         self.paths = helpers.Paths({'cloud_dir': self.tmp})
         self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
 
-        self.unapply = []
-        super(TestAzureDataSource, self).setUp()
+        self.patches = ExitStack()
+        self.addCleanup(self.patches.close)
 
-    def tearDown(self):
-        apply_patches([i for i in reversed(self.unapply)])
-        super(TestAzureDataSource, self).tearDown()
+        super(TestAzureDataSource, self).setUp()
 
     def apply_patches(self, patches):
-        ret = apply_patches(patches)
-        self.unapply += ret
+        for module, name, new in patches:
+            self.patches.enter_context(mock.patch.object(module, name, new))
 
     def _get_ds(self, data):
@@ -117,16 +133,14 @@ class TestAzureDataSource(MockerTestCase):
         mod = DataSourceAzure
         mod.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
 
-        self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)])
-
-        self.apply_patches([(mod, 'invoke_agent', _invoke_agent),
-                            (mod, 'wait_for_files', _wait_for_files),
-                            (mod, 'pubkeys_from_crt_files',
-                             _pubkeys_from_crt_files),
-                            (mod, 'iid_from_shared_config',
-                             _iid_from_shared_config),
-                            (mod, 'apply_hostname_bounce',
-                             _apply_hostname_bounce), ])
+        self.apply_patches([
+            (mod, 'list_possible_azure_ds_devs', dsdevs),
+            (mod, 'invoke_agent', _invoke_agent),
+            (mod, 'wait_for_files', _wait_for_files),
+            (mod, 'pubkeys_from_crt_files', _pubkeys_from_crt_files),
+            (mod, 'iid_from_shared_config', _iid_from_shared_config),
+            (mod, 'apply_hostname_bounce', _apply_hostname_bounce),
+            ])
 
         dsrc = mod.DataSourceAzureNet(
             data.get('sys_cfg', {}), distro=None, paths=self.paths)
@@ -153,7 +167,7 @@ class TestAzureDataSource(MockerTestCase):
         ret = dsrc.get_data()
         self.assertTrue(ret)
         self.assertTrue(os.path.isdir(self.waagent_d))
-        self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0700)
+        self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0o700)
 
     def test_user_cfg_set_agent_command_plain(self):
         # set dscfg in via plaintext
@@ -174,7 +188,7 @@ class TestAzureDataSource(MockerTestCase):
         # set dscfg in via base64 encoded yaml
         cfg = {'agent_command': "my_command"}
         odata = {'HostName': "myhost", 'UserName': "myuser",
-                 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+                 'dscfg': {'text': b64(yaml.dump(cfg)),
                            'encoding': 'base64'}}
         data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
 
@@ -226,13 +240,13 @@ class TestAzureDataSource(MockerTestCase):
 
     def test_userdata_found(self):
         mydata = "FOOBAR"
-        odata = {'UserData': base64.b64encode(mydata)}
+        odata = {'UserData': b64(mydata)}
         data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
 
         dsrc = self._get_ds(data)
         ret = dsrc.get_data()
         self.assertTrue(ret)
-        self.assertEqual(dsrc.userdata_raw, mydata)
+        self.assertEqual(dsrc.userdata_raw, mydata.encode('utf-8'))
 
     def test_no_datasource_expected(self):
         # no source should be found if no seed_dir and no devs
@@ -274,7 +288,7 @@ class TestAzureDataSource(MockerTestCase):
                 'command': 'my-bounce-command',
                 'hostname_command': 'my-hostname-command'}}
         odata = {'HostName': "xhost",
-                 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+                 'dscfg': {'text': b64(yaml.dump(cfg)),
                            'encoding': 'base64'}}
         data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
         self._get_ds(data).get_data()
@@ -289,7 +303,7 @@ class TestAzureDataSource(MockerTestCase):
         # config specifying set_hostname off should not bounce
         cfg = {'set_hostname': False}
         odata = {'HostName': "xhost",
-                 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+                 'dscfg': {'text': b64(yaml.dump(cfg)),
                            'encoding': 'base64'}}
         data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
         self._get_ds(data).get_data()
@@ -318,7 +332,7 @@ class TestAzureDataSource(MockerTestCase):
         # Make sure that user can affect disk aliases
         dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}}
         odata = {'HostName': "myhost", 'UserName': "myuser",
-                 'dscfg': {'text': base64.b64encode(yaml.dump(dscfg)),
+                 'dscfg': {'text': b64(yaml.dump(dscfg)),
                            'encoding': 'base64'}}
         usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'},
                                   'ephemeral0': False}}
@@ -340,7 +354,7 @@ class TestAzureDataSource(MockerTestCase):
         dsrc = self._get_ds(data)
         dsrc.get_data()
 
-        self.assertEqual(userdata, dsrc.userdata_raw)
+        self.assertEqual(userdata.encode('us-ascii'), dsrc.userdata_raw)
 
     def test_ovf_env_arrives_in_waagent_dir(self):
         xml = construct_valid_ovf_env(data={}, userdata="FOODATA")
@@ -355,7 +369,7 @@ class TestAzureDataSource(MockerTestCase):
 
     def test_existing_ovf_same(self):
         # waagent/SharedConfig left alone if found ovf-env.xml same as cached
-        odata = {'UserData': base64.b64encode("SOMEUSERDATA")}
+        odata = {'UserData': b64("SOMEUSERDATA")}
         data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
 
         populate_dir(self.waagent_d,
@@ -379,9 +393,9 @@ class TestAzureDataSource(MockerTestCase):
         # 'get_data' should remove SharedConfig.xml in /var/lib/waagent
         # if ovf-env.xml differs.
         cached_ovfenv = construct_valid_ovf_env(
-            {'userdata': base64.b64encode("FOO_USERDATA")})
+            {'userdata': b64("FOO_USERDATA")})
         new_ovfenv = construct_valid_ovf_env(
-            {'userdata': base64.b64encode("NEW_USERDATA")})
+            {'userdata': b64("NEW_USERDATA")})
 
         populate_dir(self.waagent_d,
             {'ovf-env.xml': cached_ovfenv,
@@ -391,7 +405,7 @@ class TestAzureDataSource(MockerTestCase):
         dsrc = self._get_ds({'ovfcontent': new_ovfenv})
         ret = dsrc.get_data()
         self.assertTrue(ret)
-        self.assertEqual(dsrc.userdata_raw, "NEW_USERDATA")
+        self.assertEqual(dsrc.userdata_raw, b"NEW_USERDATA")
         self.assertTrue(os.path.exists(
             os.path.join(self.waagent_d, 'otherfile')))
         self.assertFalse(
@@ -402,7 +416,7 @@ class TestAzureDataSource(MockerTestCase):
             load_file(os.path.join(self.waagent_d, 'ovf-env.xml')))
 
 
-class TestReadAzureOvf(MockerTestCase):
+class TestReadAzureOvf(unittest.TestCase):
     def test_invalid_xml_raises_non_azure_ds(self):
         invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
         self.assertRaises(DataSourceAzure.BrokenAzureDataSource,
@@ -417,7 +431,7 @@ class TestReadAzureOvf(MockerTestCase):
             self.assertIn(mypk, cfg['_pubkeys'])
 
 
-class TestReadAzureSharedConfig(MockerTestCase):
+class TestReadAzureSharedConfig(unittest.TestCase):
     def test_valid_content(self):
         xml = """<?xml version="1.0" encoding="utf-8"?>
             <SharedConfig>
@@ -429,14 +443,3 @@ class TestReadAzureSharedConfig(MockerTestCase):
             </SharedConfig>"""
         ret = DataSourceAzure.iid_from_shared_config_content(xml)
         self.assertEqual("MY_INSTANCE_ID", ret)
-
-
-def apply_patches(patches):
-    ret = []
-    for (ref, name, replace) in patches:
-        if replace is None:
-            continue
-        orig = getattr(ref, name)
-        setattr(ref, name, replace)
-        ret.append((ref, name, orig))
-    return ret
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index d88066e5..258c68e2 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -1,10 +1,18 @@
 from copy import copy
 import json
 import os
-import os.path
-
-import mocker
-from mocker import MockerTestCase
+import shutil
+import tempfile
+import unittest
+
+try:
+    from unittest import mock
+except ImportError:
+    import mock
+try:
+    from contextlib import ExitStack
+except ImportError:
+    from contextlib2 import ExitStack
 
 from cloudinit import helpers
 from cloudinit import settings
@@ -12,8 +20,6 @@ from cloudinit.sources import DataSourceConfigDrive as ds
 from cloudinit.sources.helpers import openstack
 from cloudinit import util
 
-from .. import helpers as unit_helpers
-
 PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
 EC2_META = {
     'ami-id': 'ami-00000001',
@@ -64,11 +70,12 @@ CFG_DRIVE_FILES_V2 = {
     'openstack/latest/user_data': USER_DATA}
 
 
-class TestConfigDriveDataSource(MockerTestCase):
+class TestConfigDriveDataSource(unittest.TestCase):
 
     def setUp(self):
         super(TestConfigDriveDataSource, self).setUp()
-        self.tmp = self.makeDir()
+        self.tmp = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tmp)
 
     def test_ec2_metadata(self):
         populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -91,23 +98,28 @@ class TestConfigDriveDataSource(MockerTestCase):
             'swap': '/dev/vda3',
         }
         for name, dev_name in name_tests.items():
-            with unit_helpers.mocker() as my_mock:
-                find_mock = my_mock.replace(util.find_devs_with,
-                                            spec=False, passthrough=False)
+            with ExitStack() as mocks:
                 provided_name = dev_name[len('/dev/'):]
                 provided_name = "s" + provided_name[1:]
-                find_mock(mocker.ARGS)
-                my_mock.result([provided_name])
-                exists_mock = my_mock.replace(os.path.exists,
-                                              spec=False, passthrough=False)
-                exists_mock(mocker.ARGS)
-                my_mock.result(False)
-                exists_mock(mocker.ARGS)
-                my_mock.result(True)
-                my_mock.replay()
+                find_mock = mocks.enter_context(
+                    mock.patch.object(util, 'find_devs_with',
                                      return_value=[provided_name]))
+                # We want os.path.exists() to return False on its first call,
+                # and True on its second call.  We use a handy generator as
+                # the mock side effect for this.  The mocked function returns
+                # what the side effect returns.
+                def exists_side_effect():
+                    yield False
+                    yield True
+                exists_mock = mocks.enter_context(
+                    mock.patch.object(os.path, 'exists',
+                                      side_effect=exists_side_effect()))
                 device = cfg_ds.device_name_to_device(name)
                 self.assertEquals(dev_name, device)
 
+                find_mock.assert_called_once_with(mock.ANY)
+                self.assertEqual(exists_mock.call_count, 2)
+
     def test_dev_os_map(self):
         populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
         cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
@@ -123,19 +135,19 @@ class TestConfigDriveDataSource(MockerTestCase):
             'swap': '/dev/vda3',
         }
         for name, dev_name in name_tests.items():
-            with unit_helpers.mocker() as my_mock:
-                find_mock = my_mock.replace(util.find_devs_with,
-                                            spec=False, passthrough=False)
-                find_mock(mocker.ARGS)
-                my_mock.result([dev_name])
-                exists_mock = my_mock.replace(os.path.exists,
-                                              spec=False, passthrough=False)
-                exists_mock(mocker.ARGS)
-                my_mock.result(True)
-                my_mock.replay()
+            with ExitStack() as mocks:
+                find_mock = mocks.enter_context(
+                    mock.patch.object(util, 'find_devs_with',
+                                      return_value=[dev_name]))
+                exists_mock = mocks.enter_context(
+                    mock.patch.object(os.path, 'exists',
+                                      return_value=True))
                 device = cfg_ds.device_name_to_device(name)
                 self.assertEquals(dev_name, device)
 
+                find_mock.assert_called_once_with(mock.ANY)
+                exists_mock.assert_called_once_with(mock.ANY)
+
     def test_dev_ec2_remap(self):
         populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
         cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
@@ -156,16 +168,21 @@ class TestConfigDriveDataSource(MockerTestCase):
             'root2k': None,
         }
         for name, dev_name in name_tests.items():
-            with unit_helpers.mocker(verify_calls=False) as my_mock:
-                exists_mock = my_mock.replace(os.path.exists,
-                                              spec=False, passthrough=False)
-                exists_mock(mocker.ARGS)
-                my_mock.result(False)
-                exists_mock(mocker.ARGS)
-                my_mock.result(True)
-                my_mock.replay()
+            # We want os.path.exists() to return False on its first call,
+            # and True on its second call.  We use a handy generator as
+            # the mock side effect for this.  The mocked function returns
+            # what the side effect returns.
+            def exists_side_effect():
+                yield False
+                yield True
+            with mock.patch.object(os.path, 'exists',
+                                   side_effect=exists_side_effect()):
                 device = cfg_ds.device_name_to_device(name)
                 self.assertEquals(dev_name, device)
+            # We don't assert the call count for os.path.exists() because
+            # not all of the entries in name_tests results in two calls to
+            # that function.  Specifically, 'root2k' doesn't seem to call
+            # it at all.
 
     def test_dev_ec2_map(self):
         populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -173,12 +190,6 @@ class TestConfigDriveDataSource(MockerTestCase):
                                           None,
                                           helpers.Paths({}))
         found = ds.read_config_drive(self.tmp)
-        exists_mock = self.mocker.replace(os.path.exists,
-                                          spec=False, passthrough=False)
-        exists_mock(mocker.ARGS)
-        self.mocker.count(0, None)
-        self.mocker.result(True)
-        self.mocker.replay()
         ec2_md = found['ec2-metadata']
         os_md = found['metadata']
         cfg_ds.ec2_metadata = ec2_md
@@ -193,8 +204,9 @@ class TestConfigDriveDataSource(MockerTestCase):
             'root2k': None,
         }
         for name, dev_name in name_tests.items():
-            device = cfg_ds.device_name_to_device(name)
-            self.assertEquals(dev_name, device)
+            with mock.patch.object(os.path, 'exists', return_value=True):
+                device = cfg_ds.device_name_to_device(name)
+                self.assertEquals(dev_name, device)
 
     def test_dir_valid(self):
         """Verify a dir is read as such."""
@@ -326,7 +338,7 @@ def populate_ds_from_read_config(cfg_ds, source, results):
 
 
 def populate_dir(seed_dir, files):
-    for (name, content) in files.iteritems():
+    for (name, content) in files.items():
         path = os.path.join(seed_dir, name)
         dirname = os.path.dirname(path)
         if not os.path.isdir(dirname):
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
index d1270fc2..98f9cfac 100644
--- a/tests/unittests/test_datasource/test_digitalocean.py
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -18,8 +18,7 @@
 import httpretty
 import re
 
-from types import ListType
-from urlparse import urlparse
+from six.moves.urllib_parse import urlparse
 
 from cloudinit import settings
 from cloudinit import helpers
@@ -110,7 +109,7 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
         self.assertEqual([DO_META.get('public-keys')],
                          self.ds.get_public_ssh_keys())
 
-        self.assertIs(type(self.ds.get_public_ssh_keys()), ListType)
+        self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
 
     @httpretty.activate
     def test_multiple_ssh_keys(self):
@@ -124,4 +123,4 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
         self.assertEqual(DO_META.get('public-keys').splitlines(),
                          self.ds.get_public_ssh_keys())
 
-        self.assertIs(type(self.ds.get_public_ssh_keys()), ListType)
+        self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index 06050bb1..6dd4b5ed 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -19,7 +19,7 @@ import httpretty
 import re
 
 from base64 import b64encode, b64decode
-from urlparse import urlparse
+from six.moves.urllib_parse import urlparse
 
 from cloudinit import settings
 from cloudinit import helpers
@@ -45,7 +45,7 @@ GCE_META_ENCODING = {
     'instance/id': '12345',
     'instance/hostname': 'server.project-baz.local',
     'instance/zone': 'baz/bang',
-    'instance/attributes/user-data': b64encode('/bin/echo baz\n'),
+    'instance/attributes/user-data': b64encode(b'/bin/echo baz\n'),
     'instance/attributes/user-data-encoding': 'base64',
 }
 
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index c157beb8..6af0cd82 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -1,19 +1,26 @@
 from copy import copy
 import os
+import shutil
+import tempfile
+import unittest
 
 from cloudinit.sources import DataSourceMAAS
 from cloudinit import url_helper
 from ..helpers import populate_dir
 
-import mocker
+try:
+    from unittest import mock
+except ImportError:
+    import mock
 
 
-class TestMAASDataSource(mocker.MockerTestCase):
+class TestMAASDataSource(unittest.TestCase):
 
     def setUp(self):
         super(TestMAASDataSource, self).setUp()
         # Make a temp directoy for tests to use.
-        self.tmp = self.makeDir()
+        self.tmp = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tmp)
 
     def test_seed_dir_valid(self):
         """Verify a valid seeddir is read as such."""
@@ -93,16 +100,18 @@ class TestMAASDataSource(mocker.MockerTestCase):
 
     def test_seed_url_valid(self):
         """Verify that valid seed_url is read as such."""
-        valid = {'meta-data/instance-id': 'i-instanceid',
+        valid = {
+            'meta-data/instance-id': 'i-instanceid',
             'meta-data/local-hostname': 'test-hostname',
             'meta-data/public-keys': 'test-hostname',
-            'user-data': 'foodata'}
+            'user-data': 'foodata',
+            }
         valid_order = [
             'meta-data/local-hostname',
             'meta-data/instance-id',
            'meta-data/public-keys',
             'user-data',
-            ]
+        ]
         my_seed = "http://example.com/xmeta"
         my_ver = "1999-99-99"
         my_headers = {'header1': 'value1', 'header2': 'value2'}
@@ -110,28 +119,38 @@ class TestMAASDataSource(mocker.MockerTestCase):
         def my_headers_cb(url):
             return my_headers
 
-        mock_request = self.mocker.replace(url_helper.readurl,
-                                           passthrough=False)
-
-        for key in valid_order:
-            url = "%s/%s/%s" % (my_seed, my_ver, key)
-            mock_request(url, headers=None, timeout=mocker.ANY,
-                         data=mocker.ANY, sec_between=mocker.ANY,
-                         ssl_details=mocker.ANY, retries=mocker.ANY,
-                         headers_cb=my_headers_cb,
-                         exception_cb=mocker.ANY)
-            resp = valid.get(key)
-            self.mocker.result(url_helper.StringResponse(resp))
-        self.mocker.replay()
-
-        (userdata, metadata) = DataSourceMAAS.read_maas_seed_url(my_seed,
-            header_cb=my_headers_cb, version=my_ver)
-
-        self.assertEqual("foodata", userdata)
-        self.assertEqual(metadata['instance-id'],
-                         valid['meta-data/instance-id'])
-        self.assertEqual(metadata['local-hostname'],
-                         valid['meta-data/local-hostname'])
+        # Each time url_helper.readurl() is called, something different is
+        # returned based on the canned data above.  We need to build up a list
+        # of side effect return values, which the mock will return.  At the
+        # same time, we'll build up a list of expected call arguments for
+        # asserting after the code under test is run.
+        calls = []
+
+        def side_effect():
+            for key in valid_order:
+                resp = valid.get(key)
+                url = "%s/%s/%s" % (my_seed, my_ver, key)
+                calls.append(
+                    mock.call(url, headers=None, timeout=mock.ANY,
+                              data=mock.ANY, sec_between=mock.ANY,
+                              ssl_details=mock.ANY, retries=mock.ANY,
+                              headers_cb=my_headers_cb,
+                              exception_cb=mock.ANY))
+                yield url_helper.StringResponse(resp)
+
+        # Now do the actual call of the code under test.
+        with mock.patch.object(url_helper, 'readurl',
+                               side_effect=side_effect()) as mockobj:
+            userdata, metadata = DataSourceMAAS.read_maas_seed_url(
+                my_seed, header_cb=my_headers_cb, version=my_ver)
+
+            self.assertEqual("foodata", userdata)
+            self.assertEqual(metadata['instance-id'],
+                             valid['meta-data/instance-id'])
+            self.assertEqual(metadata['local-hostname'],
+                             valid['meta-data/local-hostname'])
+
+            mockobj.has_calls(calls)
 
     def test_seed_url_invalid(self):
         """Verify that invalid seed_url raises MAASSeedDirMalformed."""
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index e9235951..480a4012 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -3,33 +3,38 @@ from cloudinit.sources import DataSourceNoCloud
 from cloudinit import util
 from ..helpers import populate_dir
 
-from mocker import MockerTestCase
 import os
 import yaml
+import shutil
+import tempfile
+import unittest
+try:
+    from unittest import mock
+except ImportError:
+    import mock
+try:
+    from contextlib import ExitStack
+except ImportError:
+    from contextlib2 import ExitStack
 
-class TestNoCloudDataSource(MockerTestCase):
+
+class TestNoCloudDataSource(unittest.TestCase):
 
     def setUp(self):
-        self.tmp = self.makeDir()
+        self.tmp = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tmp)
         self.paths = helpers.Paths({'cloud_dir': self.tmp})
 
         self.cmdline = "root=TESTCMDLINE"
 
-        self.unapply = []
-        self.apply_patches([(util, 'get_cmdline', self._getcmdline)])
-        super(TestNoCloudDataSource, self).setUp()
-
-    def tearDown(self):
-        apply_patches([i for i in reversed(self.unapply)])
-        super(TestNoCloudDataSource, self).tearDown()
+        self.mocks = ExitStack()
+        self.addCleanup(self.mocks.close)
 
-    def apply_patches(self, patches):
-        ret = apply_patches(patches)
-        self.unapply += ret
+        self.mocks.enter_context(
+            mock.patch.object(util, 'get_cmdline', return_value=self.cmdline))
 
-    def _getcmdline(self):
-        return self.cmdline
+        super(TestNoCloudDataSource, self).setUp()
 
     def test_nocloud_seed_dir(self):
         md = {'instance-id': 'IID', 'dsmode': 'local'}
@@ -59,7 +64,9 @@ class TestNoCloudDataSource(MockerTestCase):
         def my_find_devs_with(*args, **kwargs):
             raise PsuedoException
 
-        self.apply_patches([(util, 'find_devs_with', my_find_devs_with)])
+        self.mocks.enter_context(
+            mock.patch.object(util, 'find_devs_with',
+                              side_effect=PsuedoException))
 
         # by default, NoCloud should search for filesystems by label
         sys_cfg = {'datasource': {'NoCloud': {}}}
@@ -85,7 +92,7 @@ class TestNoCloudDataSource(MockerTestCase):
 
         data = {
             'fs_label': None,
-            'meta-data': {'instance-id': 'IID'},
+            'meta-data': yaml.safe_dump({'instance-id': 'IID'}),
             'user-data': "USER_DATA_RAW",
         }
 
@@ -133,7 +140,7 @@ class TestNoCloudDataSource(MockerTestCase):
         self.assertTrue(ret)
 
 
-class TestParseCommandLineData(MockerTestCase):
+class TestParseCommandLineData(unittest.TestCase):
 
     def test_parse_cmdline_data_valid(self):
         ds_id = "ds=nocloud"
@@ -178,15 +185,4 @@ class TestParseCommandLineData(MockerTestCase):
         self.assertFalse(ret)
 
 
-def apply_patches(patches):
-    ret = []
-    for (ref, name, replace) in patches:
-        if replace is None:
-            continue
-        orig = getattr(ref, name)
-        setattr(ref, name, replace)
-        ret.append((ref, name, orig))
-    return ret
-
-
 # vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index b4fd1f4d..ef534bab 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -1,12 +1,21 @@
 from cloudinit import helpers
 from cloudinit.sources import DataSourceOpenNebula as ds
 from cloudinit import util
-from mocker import MockerTestCase
 from ..helpers import populate_dir
 
 from base64 import b64encode
 import os
 import pwd
+import shutil
+import tempfile
+import unittest
+
+
+def b64(source):
+    # In Python 3, b64encode only accepts bytes and returns bytes.
+    if not isinstance(source, bytes):
+        source = source.encode('utf-8')
+    return b64encode(source).decode('us-ascii')
+
 
 TEST_VARS = {
     'VAR1': 'single',
@@ -37,12 +46,13 @@ CMD_IP_OUT = '''\
 '''
 
 
-class TestOpenNebulaDataSource(MockerTestCase):
+class TestOpenNebulaDataSource(unittest.TestCase):
     parsed_user = None
 
     def setUp(self):
         super(TestOpenNebulaDataSource, self).setUp()
-        self.tmp = self.makeDir()
+        self.tmp = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tmp)
         self.paths = helpers.Paths({'cloud_dir': self.tmp})
 
         # defaults for few tests
@@ -176,7 +186,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
             self.assertEqual(USER_DATA, results['userdata'])
 
     def test_user_data_encoding_required_for_decode(self):
-        b64userdata = b64encode(USER_DATA)
+        b64userdata = b64(USER_DATA)
         for k in ('USER_DATA', 'USERDATA'):
             my_d = os.path.join(self.tmp, k)
             populate_context_dir(my_d, {k: b64userdata})
@@ -188,7 +198,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
     def test_user_data_base64_encoding(self):
         for k in ('USER_DATA', 'USERDATA'):
             my_d = os.path.join(self.tmp, k)
-            populate_context_dir(my_d, {k: b64encode(USER_DATA),
+            populate_context_dir(my_d, {k: b64(USER_DATA),
                                         'USERDATA_ENCODING': 'base64'})
             results = ds.read_context_disk_dir(my_d)
 
@@ -228,7 +238,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
             util.find_devs_with = orig_find_devs_with
 
 
-class TestOpenNebulaNetwork(MockerTestCase):
+class TestOpenNebulaNetwork(unittest.TestCase):
 
     def setUp(self):
         super(TestOpenNebulaNetwork, self).setUp()
@@ -280,7 +290,7 @@ iface eth0 inet static
 ''')
 
 
-class TestParseShellConfig(MockerTestCase):
+class TestParseShellConfig(unittest.TestCase):
     def test_no_seconds(self):
         cfg = '\n'.join(["foo=bar", "SECONDS=2", "xx=foo"])
         # we could test 'sleep 2', but that would make the test run slower.
@@ -290,7 +300,7 @@ class TestParseShellConfig(MockerTestCase):
 
 def populate_context_dir(path, variables):
     data = "# Context variables generated by OpenNebula\n"
-    for (k, v) in variables.iteritems():
+    for k, v in variables.items():
         data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''")))
     populate_dir(path, {'context.sh': data})
 
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index 49894e51..81ef1546 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -20,12 +20,11 @@ import copy
 import json
 import re
 
-from StringIO import StringIO
-
-from urlparse import urlparse
-
 from .. import helpers as test_helpers
 
+from six import StringIO
+from six.moves.urllib.parse import urlparse
+
 from cloudinit import helpers
 from cloudinit import settings
 from cloudinit.sources import DataSourceOpenStack as ds
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 65675106..2fb9e1b6 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -22,6 +22,8 @@
 #   return responses.
 #
 
+from __future__ import print_function
+
 import base64
 from cloudinit import helpers as c_helpers
 from cloudinit.sources import DataSourceSmartOS
@@ -29,9 +31,18 @@ from .. import helpers
 import os
 import os.path
 import re
+import shutil
+import tempfile
 import stat
 import uuid
 
+
+def b64(source):
+    # In Python 3, b64encode only accepts bytes and returns bytes.
+    if not isinstance(source, bytes):
+        source = source.encode('utf-8')
+    return base64.b64encode(source).decode('us-ascii')
+
+
 MOCK_RETURNS = {
     'hostname': 'test-host',
     'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname',
@@ -109,9 +120,10 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
 
     def setUp(self):
         helpers.FilesystemMockingTestCase.setUp(self)
-        # makeDir comes from MockerTestCase
-        self.tmp = self.makeDir()
-        self.legacy_user_d = self.makeDir()
+        self.tmp = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tmp)
+        self.legacy_user_d = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.legacy_user_d)
 
         # If you should want to watch the logs...
         self._log = None
@@ -227,7 +239,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
         my_returns = MOCK_RETURNS.copy()
         my_returns['base64_all'] = "true"
         for k in ('hostname', 'cloud-init:user-data'):
-            my_returns[k] = base64.b64encode(my_returns[k])
+            my_returns[k] = b64(my_returns[k])
 
         dsrc = self._get_ds(mockdata=my_returns)
         ret = dsrc.get_data()
@@ -248,7 +260,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
         my_returns['b64-cloud-init:user-data'] = "true"
         my_returns['b64-hostname'] = "true"
         for k in ('hostname', 'cloud-init:user-data'):
-            my_returns[k] = base64.b64encode(my_returns[k])
+            my_returns[k] = b64(my_returns[k])
 
         dsrc = self._get_ds(mockdata=my_returns)
         ret = dsrc.get_data()
@@ -264,7 +276,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
         my_returns = MOCK_RETURNS.copy()
         my_returns['base64_keys'] = 'hostname,ignored'
         for k in ('hostname',):
-            my_returns[k] = base64.b64encode(my_returns[k])
+            my_returns[k] = b64(my_returns[k])
 
         dsrc = self._get_ds(mockdata=my_returns)
         ret = dsrc.get_data()
@@ -365,7 +377,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
             permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:]
             if re.match(r'.*\/mdata-user-data$', name_f):
                 found_new = True
-                print name_f
+                print(name_f)
                 self.assertEquals(permissions, '400')
         self.assertFalse(found_new)
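Note after the patch body: several of the converted tests feed a generator to mock's side_effect so the patched function returns a different value on each call — False then True for os.path.exists(), or one canned response per URL for url_helper.readurl(). A standalone sketch of that idea, with hypothetical paths and relying only on the documented behaviour that an iterator side_effect yields one return value per call:

import os.path

try:
    from unittest import mock
except ImportError:
    import mock


def exists_side_effect():
    # A generator used as side_effect hands out one return value per call:
    # the first call to the patched function gets False, the second gets True.
    yield False
    yield True


def demo():
    with mock.patch.object(os.path, 'exists',
                           side_effect=exists_side_effect()) as exists_mock:
        first = os.path.exists('/hypothetical/device')   # False
        second = os.path.exists('/hypothetical/device')  # True
    return first, second, exists_mock.call_count


if __name__ == '__main__':
    print(demo())  # expected: (False, True, 2)

The same shape appears in the MAAS test above, where the generator also appends a mock.call(...) to a list of expected arguments before yielding each canned response.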