summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py6
-rw-r--r--tests/unittests/test_datasource/test_azure.py102
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py1
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py107
-rw-r--r--tests/unittests/test_datasource/test_digitalocean.py7
-rw-r--r--tests/unittests/test_datasource/test_gce.py4
-rw-r--r--tests/unittests/test_datasource/test_maas.py76
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py57
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py23
-rw-r--r--tests/unittests/test_datasource/test_openstack.py7
-rw-r--r--tests/unittests/test_datasource/test_smartos.py24
11 files changed, 226 insertions, 188 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index 1a48ee5f..e9cd2fa5 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -46,7 +46,7 @@ def _write_cloud_info_file(value):
cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w')
cifile.write(value)
cifile.close()
- os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664)
+ os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0o664)
def _remove_cloud_info_file():
@@ -67,12 +67,12 @@ def _write_user_data_files(mount_dir, value):
udfile = open(deltacloud_user_data_file, 'w')
udfile.write(value)
udfile.close()
- os.chmod(deltacloud_user_data_file, 0664)
+ os.chmod(deltacloud_user_data_file, 0o664)
udfile = open(user_data_file, 'w')
udfile.write(value)
udfile.close()
- os.chmod(user_data_file, 0664)
+ os.chmod(user_data_file, 0o664)
def _remove_user_data_files(mount_dir,
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index e992a006..38d70fcd 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -1,14 +1,24 @@
from cloudinit import helpers
-from cloudinit.util import load_file
+from cloudinit.util import b64e, load_file
from cloudinit.sources import DataSourceAzure
-from ..helpers import populate_dir
+from ..helpers import TestCase, populate_dir
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
-import base64
import crypt
-from mocker import MockerTestCase
import os
import stat
import yaml
+import shutil
+import tempfile
+import unittest
def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
@@ -40,7 +50,7 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
content += "<%s%s>%s</%s>\n" % (key, attrs, val, key)
if userdata:
- content += "<UserData>%s</UserData>\n" % (base64.b64encode(userdata))
+ content += "<UserData>%s</UserData>\n" % (b64e(userdata))
if pubkeys:
content += "<SSH><PublicKeys>\n"
@@ -66,26 +76,25 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
return content
-class TestAzureDataSource(MockerTestCase):
+class TestAzureDataSource(TestCase):
def setUp(self):
- # makeDir comes from MockerTestCase
- self.tmp = self.makeDir()
+ super(TestAzureDataSource, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
# patch cloud_dir, so our 'seed_dir' is guaranteed empty
self.paths = helpers.Paths({'cloud_dir': self.tmp})
self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
- self.unapply = []
- super(TestAzureDataSource, self).setUp()
+ self.patches = ExitStack()
+ self.addCleanup(self.patches.close)
- def tearDown(self):
- apply_patches([i for i in reversed(self.unapply)])
- super(TestAzureDataSource, self).tearDown()
+ super(TestAzureDataSource, self).setUp()
def apply_patches(self, patches):
- ret = apply_patches(patches)
- self.unapply += ret
+ for module, name, new in patches:
+ self.patches.enter_context(mock.patch.object(module, name, new))
def _get_ds(self, data):
@@ -117,16 +126,14 @@ class TestAzureDataSource(MockerTestCase):
mod = DataSourceAzure
mod.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
- self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)])
-
- self.apply_patches([(mod, 'invoke_agent', _invoke_agent),
- (mod, 'wait_for_files', _wait_for_files),
- (mod, 'pubkeys_from_crt_files',
- _pubkeys_from_crt_files),
- (mod, 'iid_from_shared_config',
- _iid_from_shared_config),
- (mod, 'apply_hostname_bounce',
- _apply_hostname_bounce), ])
+ self.apply_patches([
+ (mod, 'list_possible_azure_ds_devs', dsdevs),
+ (mod, 'invoke_agent', _invoke_agent),
+ (mod, 'wait_for_files', _wait_for_files),
+ (mod, 'pubkeys_from_crt_files', _pubkeys_from_crt_files),
+ (mod, 'iid_from_shared_config', _iid_from_shared_config),
+ (mod, 'apply_hostname_bounce', _apply_hostname_bounce),
+ ])
dsrc = mod.DataSourceAzureNet(
data.get('sys_cfg', {}), distro=None, paths=self.paths)
@@ -153,7 +160,7 @@ class TestAzureDataSource(MockerTestCase):
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertTrue(os.path.isdir(self.waagent_d))
- self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0700)
+ self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0o700)
def test_user_cfg_set_agent_command_plain(self):
# set dscfg in via plaintext
@@ -174,7 +181,7 @@ class TestAzureDataSource(MockerTestCase):
# set dscfg in via base64 encoded yaml
cfg = {'agent_command': "my_command"}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'dscfg': {'text': b64e(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
@@ -226,13 +233,13 @@ class TestAzureDataSource(MockerTestCase):
def test_userdata_found(self):
mydata = "FOOBAR"
- odata = {'UserData': base64.b64encode(mydata)}
+ odata = {'UserData': b64e(mydata)}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(dsrc.userdata_raw, mydata)
+ self.assertEqual(dsrc.userdata_raw, mydata.encode('utf-8'))
def test_no_datasource_expected(self):
# no source should be found if no seed_dir and no devs
@@ -274,7 +281,7 @@ class TestAzureDataSource(MockerTestCase):
'command': 'my-bounce-command',
'hostname_command': 'my-hostname-command'}}
odata = {'HostName': "xhost",
- 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'dscfg': {'text': b64e(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
self._get_ds(data).get_data()
@@ -289,7 +296,7 @@ class TestAzureDataSource(MockerTestCase):
# config specifying set_hostname off should not bounce
cfg = {'set_hostname': False}
odata = {'HostName': "xhost",
- 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'dscfg': {'text': b64e(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
self._get_ds(data).get_data()
@@ -318,7 +325,7 @@ class TestAzureDataSource(MockerTestCase):
# Make sure that user can affect disk aliases
dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': base64.b64encode(yaml.dump(dscfg)),
+ 'dscfg': {'text': b64e(yaml.dump(dscfg)),
'encoding': 'base64'}}
usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'},
'ephemeral0': False}}
@@ -340,7 +347,7 @@ class TestAzureDataSource(MockerTestCase):
dsrc = self._get_ds(data)
dsrc.get_data()
- self.assertEqual(userdata, dsrc.userdata_raw)
+ self.assertEqual(userdata.encode('us-ascii'), dsrc.userdata_raw)
def test_ovf_env_arrives_in_waagent_dir(self):
xml = construct_valid_ovf_env(data={}, userdata="FOODATA")
@@ -353,9 +360,15 @@ class TestAzureDataSource(MockerTestCase):
self.assertTrue(os.path.exists(ovf_env_path))
self.assertEqual(xml, load_file(ovf_env_path))
+ def test_ovf_can_include_unicode(self):
+ xml = construct_valid_ovf_env(data={})
+ xml = u'\ufeff{0}'.format(xml)
+ dsrc = self._get_ds({'ovfcontent': xml})
+ dsrc.get_data()
+
def test_existing_ovf_same(self):
# waagent/SharedConfig left alone if found ovf-env.xml same as cached
- odata = {'UserData': base64.b64encode("SOMEUSERDATA")}
+ odata = {'UserData': b64e("SOMEUSERDATA")}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
populate_dir(self.waagent_d,
@@ -379,9 +392,9 @@ class TestAzureDataSource(MockerTestCase):
# 'get_data' should remove SharedConfig.xml in /var/lib/waagent
# if ovf-env.xml differs.
cached_ovfenv = construct_valid_ovf_env(
- {'userdata': base64.b64encode("FOO_USERDATA")})
+ {'userdata': b64e("FOO_USERDATA")})
new_ovfenv = construct_valid_ovf_env(
- {'userdata': base64.b64encode("NEW_USERDATA")})
+ {'userdata': b64e("NEW_USERDATA")})
populate_dir(self.waagent_d,
{'ovf-env.xml': cached_ovfenv,
@@ -391,7 +404,7 @@ class TestAzureDataSource(MockerTestCase):
dsrc = self._get_ds({'ovfcontent': new_ovfenv})
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(dsrc.userdata_raw, "NEW_USERDATA")
+ self.assertEqual(dsrc.userdata_raw, b"NEW_USERDATA")
self.assertTrue(os.path.exists(
os.path.join(self.waagent_d, 'otherfile')))
self.assertFalse(
@@ -402,7 +415,7 @@ class TestAzureDataSource(MockerTestCase):
load_file(os.path.join(self.waagent_d, 'ovf-env.xml')))
-class TestReadAzureOvf(MockerTestCase):
+class TestReadAzureOvf(TestCase):
def test_invalid_xml_raises_non_azure_ds(self):
invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
self.assertRaises(DataSourceAzure.BrokenAzureDataSource,
@@ -417,7 +430,7 @@ class TestReadAzureOvf(MockerTestCase):
self.assertIn(mypk, cfg['_pubkeys'])
-class TestReadAzureSharedConfig(MockerTestCase):
+class TestReadAzureSharedConfig(unittest.TestCase):
def test_valid_content(self):
xml = """<?xml version="1.0" encoding="utf-8"?>
<SharedConfig>
@@ -429,14 +442,3 @@ class TestReadAzureSharedConfig(MockerTestCase):
</SharedConfig>"""
ret = DataSourceAzure.iid_from_shared_config_content(xml)
self.assertEqual("MY_INSTANCE_ID", ret)
-
-
-def apply_patches(patches):
- ret = []
- for (ref, name, replace) in patches:
- if replace is None:
- continue
- orig = getattr(ref, name)
- setattr(ref, name, replace)
- ret.append((ref, name, orig))
- return ret
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index 306ac7d8..772d189a 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -39,6 +39,7 @@ class CepkoMock(Cepko):
class DataSourceCloudSigmaTest(test_helpers.TestCase):
def setUp(self):
+ super(DataSourceCloudSigmaTest, self).setUp()
self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
self.datasource.is_running_in_cloudsigma = lambda: True
self.datasource.cepko = CepkoMock(SERVER_CONTEXT)
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index d88066e5..e28bdd84 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -1,10 +1,17 @@
from copy import copy
import json
import os
-import os.path
-
-import mocker
-from mocker import MockerTestCase
+import shutil
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
from cloudinit import helpers
from cloudinit import settings
@@ -12,7 +19,8 @@ from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
-from .. import helpers as unit_helpers
+from ..helpers import TestCase
+
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
EC2_META = {
@@ -64,11 +72,12 @@ CFG_DRIVE_FILES_V2 = {
'openstack/latest/user_data': USER_DATA}
-class TestConfigDriveDataSource(MockerTestCase):
+class TestConfigDriveDataSource(TestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_ec2_metadata(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -91,23 +100,29 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker() as my_mock:
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
+ with ExitStack() as mocks:
provided_name = dev_name[len('/dev/'):]
provided_name = "s" + provided_name[1:]
- find_mock(mocker.ARGS)
- my_mock.result([provided_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ find_mock = mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ return_value=[provided_name]))
+ # We want os.path.exists() to return False on its first call,
+ # and True on its second call. We use a handy generator as
+ # the mock side effect for this. The mocked function returns
+ # what the side effect returns.
+
+ def exists_side_effect():
+ yield False
+ yield True
+ exists_mock = mocks.enter_context(
+ mock.patch.object(os.path, 'exists',
+ side_effect=exists_side_effect()))
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ find_mock.assert_called_once_with(mock.ANY)
+ self.assertEqual(exists_mock.call_count, 2)
+
def test_dev_os_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
@@ -123,19 +138,19 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker() as my_mock:
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
- find_mock(mocker.ARGS)
- my_mock.result([dev_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ with ExitStack() as mocks:
+ find_mock = mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ return_value=[dev_name]))
+ exists_mock = mocks.enter_context(
+ mock.patch.object(os.path, 'exists',
+ return_value=True))
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ find_mock.assert_called_once_with(mock.ANY)
+ exists_mock.assert_called_once_with(mock.ANY)
+
def test_dev_ec2_remap(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
@@ -156,16 +171,21 @@ class TestConfigDriveDataSource(MockerTestCase):
'root2k': None,
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker(verify_calls=False) as my_mock:
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ # We want os.path.exists() to return False on its first call,
+ # and True on its second call. We use a handy generator as
+ # the mock side effect for this. The mocked function returns
+ # what the side effect returns.
+ def exists_side_effect():
+ yield False
+ yield True
+ with mock.patch.object(os.path, 'exists',
+ side_effect=exists_side_effect()):
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ # We don't assert the call count for os.path.exists() because
+ # not all of the entries in name_tests results in two calls to
+ # that function. Specifically, 'root2k' doesn't seem to call
+ # it at all.
def test_dev_ec2_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -173,12 +193,6 @@ class TestConfigDriveDataSource(MockerTestCase):
None,
helpers.Paths({}))
found = ds.read_config_drive(self.tmp)
- exists_mock = self.mocker.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result(True)
- self.mocker.replay()
ec2_md = found['ec2-metadata']
os_md = found['metadata']
cfg_ds.ec2_metadata = ec2_md
@@ -193,8 +207,9 @@ class TestConfigDriveDataSource(MockerTestCase):
'root2k': None,
}
for name, dev_name in name_tests.items():
- device = cfg_ds.device_name_to_device(name)
- self.assertEquals(dev_name, device)
+ with mock.patch.object(os.path, 'exists', return_value=True):
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dir_valid(self):
"""Verify a dir is read as such."""
@@ -326,7 +341,7 @@ def populate_ds_from_read_config(cfg_ds, source, results):
def populate_dir(seed_dir, files):
- for (name, content) in files.iteritems():
+ for (name, content) in files.items():
path = os.path.join(seed_dir, name)
dirname = os.path.dirname(path)
if not os.path.isdir(dirname):
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
index d1270fc2..98f9cfac 100644
--- a/tests/unittests/test_datasource/test_digitalocean.py
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -18,8 +18,7 @@
import httpretty
import re
-from types import ListType
-from urlparse import urlparse
+from six.moves.urllib_parse import urlparse
from cloudinit import settings
from cloudinit import helpers
@@ -110,7 +109,7 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
self.assertEqual([DO_META.get('public-keys')],
self.ds.get_public_ssh_keys())
- self.assertIs(type(self.ds.get_public_ssh_keys()), ListType)
+ self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
@httpretty.activate
def test_multiple_ssh_keys(self):
@@ -124,4 +123,4 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
self.assertEqual(DO_META.get('public-keys').splitlines(),
self.ds.get_public_ssh_keys())
- self.assertIs(type(self.ds.get_public_ssh_keys()), ListType)
+ self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index 06050bb1..6dd4b5ed 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -19,7 +19,7 @@ import httpretty
import re
from base64 import b64encode, b64decode
-from urlparse import urlparse
+from six.moves.urllib_parse import urlparse
from cloudinit import settings
from cloudinit import helpers
@@ -45,7 +45,7 @@ GCE_META_ENCODING = {
'instance/id': '12345',
'instance/hostname': 'server.project-baz.local',
'instance/zone': 'baz/bang',
- 'instance/attributes/user-data': b64encode('/bin/echo baz\n'),
+ 'instance/attributes/user-data': b64encode(b'/bin/echo baz\n'),
'instance/attributes/user-data-encoding': 'base64',
}
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index c157beb8..d25e1adc 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -1,19 +1,25 @@
from copy import copy
import os
+import shutil
+import tempfile
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
-from ..helpers import populate_dir
+from ..helpers import TestCase, populate_dir
-import mocker
+try:
+ from unittest import mock
+except ImportError:
+ import mock
-class TestMAASDataSource(mocker.MockerTestCase):
+class TestMAASDataSource(TestCase):
def setUp(self):
super(TestMAASDataSource, self).setUp()
# Make a temp directoy for tests to use.
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_seed_dir_valid(self):
"""Verify a valid seeddir is read as such."""
@@ -93,16 +99,18 @@ class TestMAASDataSource(mocker.MockerTestCase):
def test_seed_url_valid(self):
"""Verify that valid seed_url is read as such."""
- valid = {'meta-data/instance-id': 'i-instanceid',
+ valid = {
+ 'meta-data/instance-id': 'i-instanceid',
'meta-data/local-hostname': 'test-hostname',
'meta-data/public-keys': 'test-hostname',
- 'user-data': 'foodata'}
+ 'user-data': 'foodata',
+ }
valid_order = [
'meta-data/local-hostname',
'meta-data/instance-id',
'meta-data/public-keys',
'user-data',
- ]
+ ]
my_seed = "http://example.com/xmeta"
my_ver = "1999-99-99"
my_headers = {'header1': 'value1', 'header2': 'value2'}
@@ -110,28 +118,38 @@ class TestMAASDataSource(mocker.MockerTestCase):
def my_headers_cb(url):
return my_headers
- mock_request = self.mocker.replace(url_helper.readurl,
- passthrough=False)
-
- for key in valid_order:
- url = "%s/%s/%s" % (my_seed, my_ver, key)
- mock_request(url, headers=None, timeout=mocker.ANY,
- data=mocker.ANY, sec_between=mocker.ANY,
- ssl_details=mocker.ANY, retries=mocker.ANY,
- headers_cb=my_headers_cb,
- exception_cb=mocker.ANY)
- resp = valid.get(key)
- self.mocker.result(url_helper.StringResponse(resp))
- self.mocker.replay()
-
- (userdata, metadata) = DataSourceMAAS.read_maas_seed_url(my_seed,
- header_cb=my_headers_cb, version=my_ver)
-
- self.assertEqual("foodata", userdata)
- self.assertEqual(metadata['instance-id'],
- valid['meta-data/instance-id'])
- self.assertEqual(metadata['local-hostname'],
- valid['meta-data/local-hostname'])
+ # Each time url_helper.readurl() is called, something different is
+ # returned based on the canned data above. We need to build up a list
+ # of side effect return values, which the mock will return. At the
+ # same time, we'll build up a list of expected call arguments for
+ # asserting after the code under test is run.
+ calls = []
+
+ def side_effect():
+ for key in valid_order:
+ resp = valid.get(key)
+ url = "%s/%s/%s" % (my_seed, my_ver, key)
+ calls.append(
+ mock.call(url, headers=None, timeout=mock.ANY,
+ data=mock.ANY, sec_between=mock.ANY,
+ ssl_details=mock.ANY, retries=mock.ANY,
+ headers_cb=my_headers_cb,
+ exception_cb=mock.ANY))
+ yield url_helper.StringResponse(resp)
+
+ # Now do the actual call of the code under test.
+ with mock.patch.object(url_helper, 'readurl',
+ side_effect=side_effect()) as mockobj:
+ userdata, metadata = DataSourceMAAS.read_maas_seed_url(
+ my_seed, header_cb=my_headers_cb, version=my_ver)
+
+ self.assertEqual("foodata", userdata)
+ self.assertEqual(metadata['instance-id'],
+ valid['meta-data/instance-id'])
+ self.assertEqual(metadata['local-hostname'],
+ valid['meta-data/local-hostname'])
+
+ mockobj.has_calls(calls)
def test_seed_url_invalid(self):
"""Verify that invalid seed_url raises MAASSeedDirMalformed."""
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index e9235951..4f967f58 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -1,35 +1,39 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceNoCloud
from cloudinit import util
-from ..helpers import populate_dir
+from ..helpers import TestCase, populate_dir
-from mocker import MockerTestCase
import os
import yaml
+import shutil
+import tempfile
+import unittest
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
-class TestNoCloudDataSource(MockerTestCase):
+
+class TestNoCloudDataSource(TestCase):
def setUp(self):
- self.tmp = self.makeDir()
+ super(TestNoCloudDataSource, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
self.paths = helpers.Paths({'cloud_dir': self.tmp})
self.cmdline = "root=TESTCMDLINE"
- self.unapply = []
- self.apply_patches([(util, 'get_cmdline', self._getcmdline)])
- super(TestNoCloudDataSource, self).setUp()
-
- def tearDown(self):
- apply_patches([i for i in reversed(self.unapply)])
- super(TestNoCloudDataSource, self).tearDown()
+ self.mocks = ExitStack()
+ self.addCleanup(self.mocks.close)
- def apply_patches(self, patches):
- ret = apply_patches(patches)
- self.unapply += ret
-
- def _getcmdline(self):
- return self.cmdline
+ self.mocks.enter_context(
+ mock.patch.object(util, 'get_cmdline', return_value=self.cmdline))
def test_nocloud_seed_dir(self):
md = {'instance-id': 'IID', 'dsmode': 'local'}
@@ -59,7 +63,9 @@ class TestNoCloudDataSource(MockerTestCase):
def my_find_devs_with(*args, **kwargs):
raise PsuedoException
- self.apply_patches([(util, 'find_devs_with', my_find_devs_with)])
+ self.mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ side_effect=PsuedoException))
# by default, NoCloud should search for filesystems by label
sys_cfg = {'datasource': {'NoCloud': {}}}
@@ -85,7 +91,7 @@ class TestNoCloudDataSource(MockerTestCase):
data = {
'fs_label': None,
- 'meta-data': {'instance-id': 'IID'},
+ 'meta-data': yaml.safe_dump({'instance-id': 'IID'}),
'user-data': "USER_DATA_RAW",
}
@@ -133,7 +139,7 @@ class TestNoCloudDataSource(MockerTestCase):
self.assertTrue(ret)
-class TestParseCommandLineData(MockerTestCase):
+class TestParseCommandLineData(unittest.TestCase):
def test_parse_cmdline_data_valid(self):
ds_id = "ds=nocloud"
@@ -178,15 +184,4 @@ class TestParseCommandLineData(MockerTestCase):
self.assertFalse(ret)
-def apply_patches(patches):
- ret = []
- for (ref, name, replace) in patches:
- if replace is None:
- continue
- orig = getattr(ref, name)
- setattr(ref, name, replace)
- ret.append((ref, name, orig))
- return ret
-
-
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index b4fd1f4d..27adf21b 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -1,12 +1,14 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceOpenNebula as ds
from cloudinit import util
-from mocker import MockerTestCase
-from ..helpers import populate_dir
+from ..helpers import TestCase, populate_dir
-from base64 import b64encode
import os
import pwd
+import shutil
+import tempfile
+import unittest
+
TEST_VARS = {
'VAR1': 'single',
@@ -37,12 +39,13 @@ CMD_IP_OUT = '''\
'''
-class TestOpenNebulaDataSource(MockerTestCase):
+class TestOpenNebulaDataSource(TestCase):
parsed_user = None
def setUp(self):
super(TestOpenNebulaDataSource, self).setUp()
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
self.paths = helpers.Paths({'cloud_dir': self.tmp})
# defaults for few tests
@@ -176,7 +179,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
self.assertEqual(USER_DATA, results['userdata'])
def test_user_data_encoding_required_for_decode(self):
- b64userdata = b64encode(USER_DATA)
+ b64userdata = util.b64e(USER_DATA)
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
populate_context_dir(my_d, {k: b64userdata})
@@ -188,7 +191,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
def test_user_data_base64_encoding(self):
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: b64encode(USER_DATA),
+ populate_context_dir(my_d, {k: util.b64e(USER_DATA),
'USERDATA_ENCODING': 'base64'})
results = ds.read_context_disk_dir(my_d)
@@ -228,7 +231,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
util.find_devs_with = orig_find_devs_with
-class TestOpenNebulaNetwork(MockerTestCase):
+class TestOpenNebulaNetwork(unittest.TestCase):
def setUp(self):
super(TestOpenNebulaNetwork, self).setUp()
@@ -280,7 +283,7 @@ iface eth0 inet static
''')
-class TestParseShellConfig(MockerTestCase):
+class TestParseShellConfig(unittest.TestCase):
def test_no_seconds(self):
cfg = '\n'.join(["foo=bar", "SECONDS=2", "xx=foo"])
# we could test 'sleep 2', but that would make the test run slower.
@@ -290,7 +293,7 @@ class TestParseShellConfig(MockerTestCase):
def populate_context_dir(path, variables):
data = "# Context variables generated by OpenNebula\n"
- for (k, v) in variables.iteritems():
+ for k, v in variables.items():
data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''")))
populate_dir(path, {'context.sh': data})
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index 49894e51..81ef1546 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -20,12 +20,11 @@ import copy
import json
import re
-from StringIO import StringIO
-
-from urlparse import urlparse
-
from .. import helpers as test_helpers
+from six import StringIO
+from six.moves.urllib.parse import urlparse
+
from cloudinit import helpers
from cloudinit import settings
from cloudinit.sources import DataSourceOpenStack as ds
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 65675106..8b62b1b1 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -22,16 +22,21 @@
# return responses.
#
-import base64
+from __future__ import print_function
+
from cloudinit import helpers as c_helpers
from cloudinit.sources import DataSourceSmartOS
+from cloudinit.util import b64e
from .. import helpers
import os
import os.path
import re
+import shutil
+import tempfile
import stat
import uuid
+
MOCK_RETURNS = {
'hostname': 'test-host',
'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname',
@@ -107,11 +112,12 @@ class MockSerial(object):
class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
def setUp(self):
- helpers.FilesystemMockingTestCase.setUp(self)
+ super(TestSmartOSDataSource, self).setUp()
- # makeDir comes from MockerTestCase
- self.tmp = self.makeDir()
- self.legacy_user_d = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+ self.legacy_user_d = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.legacy_user_d)
# If you should want to watch the logs...
self._log = None
@@ -227,7 +233,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns = MOCK_RETURNS.copy()
my_returns['base64_all'] = "true"
for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = base64.b64encode(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -248,7 +254,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns['b64-cloud-init:user-data'] = "true"
my_returns['b64-hostname'] = "true"
for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = base64.b64encode(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -264,7 +270,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns = MOCK_RETURNS.copy()
my_returns['base64_keys'] = 'hostname,ignored'
for k in ('hostname',):
- my_returns[k] = base64.b64encode(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -365,7 +371,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:]
if re.match(r'.*\/mdata-user-data$', name_f):
found_new = True
- print name_f
+ print(name_f)
self.assertEquals(permissions, '400')
self.assertFalse(found_new)