summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/unittests/helpers.py84
-rw-r--r--tests/unittests/test__init__.py236
-rw-r--r--tests/unittests/test_builtin_handlers.py39
-rw-r--r--tests/unittests/test_cs_util.py25
-rw-r--r--tests/unittests/test_data.py143
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py6
-rw-r--r--tests/unittests/test_datasource/test_azure.py97
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py106
-rw-r--r--tests/unittests/test_datasource/test_digitalocean.py7
-rw-r--r--tests/unittests/test_datasource/test_gce.py4
-rw-r--r--tests/unittests/test_datasource/test_maas.py75
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py54
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py26
-rw-r--r--tests/unittests/test_datasource/test_openstack.py7
-rw-r--r--tests/unittests/test_datasource/test_smartos.py26
-rw-r--r--tests/unittests/test_distros/test_generic.py6
-rw-r--r--tests/unittests/test_distros/test_hostname.py4
-rw-r--r--tests/unittests/test_distros/test_hosts.py4
-rw-r--r--tests/unittests/test_distros/test_netconfig.py273
-rw-r--r--tests/unittests/test_distros/test_resolv.py5
-rw-r--r--tests/unittests/test_distros/test_sysconfig.py5
-rw-r--r--tests/unittests/test_distros/test_user_data_normalize.py4
-rw-r--r--tests/unittests/test_filters/test_launch_index.py8
-rw-r--r--tests/unittests/test_handler/test_handler_apt_configure.py22
-rw-r--r--tests/unittests/test_handler/test_handler_ca_certs.py253
-rw-r--r--tests/unittests/test_handler/test_handler_chef.py8
-rw-r--r--tests/unittests/test_handler/test_handler_debug.py5
-rw-r--r--tests/unittests/test_handler/test_handler_growpart.py109
-rw-r--r--tests/unittests/test_handler/test_handler_locale.py11
-rw-r--r--tests/unittests/test_handler/test_handler_seed_random.py21
-rw-r--r--tests/unittests/test_handler/test_handler_set_hostname.py11
-rw-r--r--tests/unittests/test_handler/test_handler_timezone.py11
-rw-r--r--tests/unittests/test_handler/test_handler_yum_add_repo.py12
-rw-r--r--tests/unittests/test_merging.py16
-rw-r--r--tests/unittests/test_pathprefix2dict.py10
-rw-r--r--tests/unittests/test_runs/test_merge_run.py8
-rw-r--r--tests/unittests/test_runs/test_simple_run.py11
-rw-r--r--tests/unittests/test_templating.py4
-rw-r--r--tests/unittests/test_util.py62
39 files changed, 969 insertions, 849 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 52305397..828579e8 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -1,17 +1,23 @@
import os
import sys
+import shutil
+import tempfile
import unittest
-from contextlib import contextmanager
+import six
-from mocker import Mocker
-from mocker import MockerTestCase
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
from cloudinit import helpers as ch
from cloudinit import util
-import shutil
-
# Used for detecting different python versions
PY2 = False
PY26 = False
@@ -60,7 +66,7 @@ if PY26:
def assertDictContainsSubset(self, expected, actual, msg=None):
missing = []
mismatched = []
- for k, v in expected.iteritems():
+ for k, v in expected.items():
if k not in actual:
missing.append(k)
elif actual[k] != v:
@@ -86,17 +92,6 @@ else:
pass
-@contextmanager
-def mocker(verify_calls=True):
- m = Mocker()
- try:
- yield m
- finally:
- m.restore()
- if verify_calls:
- m.verify()
-
-
# Makes the old path start
# with new base instead of whatever
# it previously had
@@ -121,14 +116,19 @@ def retarget_many_wrapper(new_base, am, old_func):
nam = len(n_args)
for i in range(0, nam):
path = args[i]
- n_args[i] = rebase_path(path, new_base)
+ # patchOS() wraps various os and os.path functions, however in
+ # Python 3 some of these now accept file-descriptors (integers).
+ # That breaks rebase_path() so in lieu of a better solution, just
+ # don't rebase if we get a fd.
+ if isinstance(path, six.string_types):
+ n_args[i] = rebase_path(path, new_base)
return old_func(*n_args, **kwds)
return wrapper
-class ResourceUsingTestCase(MockerTestCase):
- def __init__(self, methodName="runTest"):
- MockerTestCase.__init__(self, methodName)
+class ResourceUsingTestCase(unittest.TestCase):
+ def setUp(self):
+ unittest.TestCase.setUp(self)
self.resource_path = None
def resourceLocation(self, subname=None):
@@ -156,17 +156,23 @@ class ResourceUsingTestCase(MockerTestCase):
return fh.read()
def getCloudPaths(self):
+ tmpdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmpdir)
cp = ch.Paths({
- 'cloud_dir': self.makeDir(),
+ 'cloud_dir': tmpdir,
'templates_dir': self.resourceLocation(),
})
return cp
class FilesystemMockingTestCase(ResourceUsingTestCase):
- def __init__(self, methodName="runTest"):
- ResourceUsingTestCase.__init__(self, methodName)
- self.patched_funcs = []
+ def setUp(self):
+ ResourceUsingTestCase.setUp(self)
+ self.patched_funcs = ExitStack()
+
+ def tearDown(self):
+ self.patched_funcs.close()
+ ResourceUsingTestCase.tearDown(self)
def replicateTestRoot(self, example_root, target_root):
real_root = self.resourceLocation()
@@ -180,15 +186,6 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
make_path = util.abs_join(make_path, f)
shutil.copy(real_path, make_path)
- def tearDown(self):
- self.restore()
- ResourceUsingTestCase.tearDown(self)
-
- def restore(self):
- for (mod, f, func) in self.patched_funcs:
- setattr(mod, f, func)
- self.patched_funcs = []
-
def patchUtils(self, new_root):
patch_funcs = {
util: [('write_file', 1),
@@ -205,8 +202,8 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
for (f, am) in funcs:
func = getattr(mod, f)
trap_func = retarget_many_wrapper(new_root, am, func)
- setattr(mod, f, trap_func)
- self.patched_funcs.append((mod, f, func))
+ self.patched_funcs.enter_context(
+ mock.patch.object(mod, f, trap_func))
# Handle subprocess calls
func = getattr(util, 'subp')
@@ -214,16 +211,15 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
def nsubp(*_args, **_kwargs):
return ('', '')
- setattr(util, 'subp', nsubp)
- self.patched_funcs.append((util, 'subp', func))
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, 'subp', nsubp))
def null_func(*_args, **_kwargs):
return None
for f in ['chownbyid', 'chownbyname']:
- func = getattr(util, f)
- setattr(util, f, null_func)
- self.patched_funcs.append((util, f, func))
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, f, null_func))
def patchOS(self, new_root):
patch_funcs = {
@@ -234,8 +230,8 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
for f in funcs:
func = getattr(mod, f)
trap_func = retarget_many_wrapper(new_root, 1, func)
- setattr(mod, f, trap_func)
- self.patched_funcs.append((mod, f, func))
+ self.patched_funcs.enter_context(
+ mock.patch.object(mod, f, trap_func))
class HttprettyTestCase(TestCase):
@@ -256,7 +252,7 @@ class HttprettyTestCase(TestCase):
def populate_dir(path, files):
if not os.path.exists(path):
os.makedirs(path)
- for (name, content) in files.iteritems():
+ for (name, content) in files.items():
with open(os.path.join(path, name), "w") as fp:
fp.write(content)
fp.close()
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py
index 17965488..f5dc3435 100644
--- a/tests/unittests/test__init__.py
+++ b/tests/unittests/test__init__.py
@@ -1,10 +1,19 @@
import os
-
-from mocker import MockerTestCase, ARGS, KWARGS
+import shutil
+import tempfile
+import unittest
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
from cloudinit import handlers
from cloudinit import helpers
-from cloudinit import importer
from cloudinit import settings
from cloudinit import url_helper
from cloudinit import util
@@ -22,76 +31,73 @@ class FakeModule(handlers.Handler):
pass
-class TestWalkerHandleHandler(MockerTestCase):
+class TestWalkerHandleHandler(unittest.TestCase):
def setUp(self):
-
- MockerTestCase.setUp(self)
+ unittest.TestCase.setUp(self)
+ tmpdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmpdir)
self.data = {
"handlercount": 0,
"frequency": "",
- "handlerdir": self.makeDir(),
+ "handlerdir": tmpdir,
"handlers": helpers.ContentHandlers(),
"data": None}
self.expected_module_name = "part-handler-%03d" % (
self.data["handlercount"],)
expected_file_name = "%s.py" % self.expected_module_name
- expected_file_fullname = os.path.join(self.data["handlerdir"],
- expected_file_name)
+ self.expected_file_fullname = os.path.join(
+ self.data["handlerdir"], expected_file_name)
self.module_fake = FakeModule()
self.ctype = None
self.filename = None
self.payload = "dummy payload"
- # Mock the write_file function
- write_file_mock = self.mocker.replace(util.write_file,
- passthrough=False)
- write_file_mock(expected_file_fullname, self.payload, 0600)
+ # Mock the write_file() function. We'll assert that it got called as
+ # expected in each of the individual tests.
+ resources = ExitStack()
+ self.addCleanup(resources.close)
+ self.write_file_mock = resources.enter_context(
+ mock.patch('cloudinit.util.write_file'))
def test_no_errors(self):
"""Payload gets written to file and added to C{pdata}."""
- import_mock = self.mocker.replace(importer.import_module,
- passthrough=False)
- import_mock(self.expected_module_name)
- self.mocker.result(self.module_fake)
- self.mocker.replay()
-
- handlers.walker_handle_handler(self.data, self.ctype, self.filename,
- self.payload)
-
- self.assertEqual(1, self.data["handlercount"])
+ with mock.patch('cloudinit.importer.import_module',
+ return_value=self.module_fake) as mockobj:
+ handlers.walker_handle_handler(self.data, self.ctype,
+ self.filename, self.payload)
+ mockobj.assert_called_with_once(self.expected_module_name)
+ self.write_file_mock.assert_called_with_once(
+ self.expected_file_fullname, self.payload, 0o600)
+ self.assertEqual(self.data['handlercount'], 1)
def test_import_error(self):
"""Module import errors are logged. No handler added to C{pdata}."""
- import_mock = self.mocker.replace(importer.import_module,
- passthrough=False)
- import_mock(self.expected_module_name)
- self.mocker.throw(ImportError())
- self.mocker.replay()
-
- handlers.walker_handle_handler(self.data, self.ctype, self.filename,
- self.payload)
-
- self.assertEqual(0, self.data["handlercount"])
+ with mock.patch('cloudinit.importer.import_module',
+ side_effect=ImportError) as mockobj:
+ handlers.walker_handle_handler(self.data, self.ctype,
+ self.filename, self.payload)
+ mockobj.assert_called_with_once(self.expected_module_name)
+ self.write_file_mock.assert_called_with_once(
+ self.expected_file_fullname, self.payload, 0o600)
+ self.assertEqual(self.data['handlercount'], 0)
def test_attribute_error(self):
"""Attribute errors are logged. No handler added to C{pdata}."""
- import_mock = self.mocker.replace(importer.import_module,
- passthrough=False)
- import_mock(self.expected_module_name)
- self.mocker.result(self.module_fake)
- self.mocker.throw(AttributeError())
- self.mocker.replay()
-
- handlers.walker_handle_handler(self.data, self.ctype, self.filename,
- self.payload)
+ with mock.patch('cloudinit.importer.import_module',
+ side_effect=AttributeError,
+ return_value=self.module_fake) as mockobj:
+ handlers.walker_handle_handler(self.data, self.ctype,
+ self.filename, self.payload)
+ mockobj.assert_called_with_once(self.expected_module_name)
+ self.write_file_mock.assert_called_with_once(
+ self.expected_file_fullname, self.payload, 0o600)
+ self.assertEqual(self.data['handlercount'], 0)
- self.assertEqual(0, self.data["handlercount"])
-
-class TestHandlerHandlePart(MockerTestCase):
+class TestHandlerHandlePart(unittest.TestCase):
def setUp(self):
self.data = "fake data"
@@ -108,95 +114,80 @@ class TestHandlerHandlePart(MockerTestCase):
C{handle_part} is called without C{frequency} for
C{handler_version} == 1.
"""
- mod_mock = self.mocker.mock()
- getattr(mod_mock, "frequency")
- self.mocker.result(settings.PER_INSTANCE)
- getattr(mod_mock, "handler_version")
- self.mocker.result(1)
- mod_mock.handle_part(self.data, self.ctype, self.filename,
- self.payload)
- self.mocker.replay()
-
- handlers.run_part(mod_mock, self.data, self.filename,
- self.payload, self.frequency, self.headers)
+ mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
+ handler_version=1)
+ handlers.run_part(mod_mock, self.data, self.filename, self.payload,
+ self.frequency, self.headers)
+ # Assert that the handle_part() method of the mock object got
+ # called with the expected arguments.
+ mod_mock.handle_part.assert_called_with_once(
+ self.data, self.ctype, self.filename, self.payload)
def test_normal_version_2(self):
"""
C{handle_part} is called with C{frequency} for
C{handler_version} == 2.
"""
- mod_mock = self.mocker.mock()
- getattr(mod_mock, "frequency")
- self.mocker.result(settings.PER_INSTANCE)
- getattr(mod_mock, "handler_version")
- self.mocker.result(2)
- mod_mock.handle_part(self.data, self.ctype, self.filename,
- self.payload, self.frequency)
- self.mocker.replay()
-
- handlers.run_part(mod_mock, self.data, self.filename,
- self.payload, self.frequency, self.headers)
+ mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
+ handler_version=2)
+ handlers.run_part(mod_mock, self.data, self.filename, self.payload,
+ self.frequency, self.headers)
+ # Assert that the handle_part() method of the mock object got
+ # called with the expected arguments.
+ mod_mock.handle_part.assert_called_with_once(
+ self.data, self.ctype, self.filename, self.payload)
def test_modfreq_per_always(self):
"""
C{handle_part} is called regardless of frequency if nofreq is always.
"""
self.frequency = "once"
- mod_mock = self.mocker.mock()
- getattr(mod_mock, "frequency")
- self.mocker.result(settings.PER_ALWAYS)
- getattr(mod_mock, "handler_version")
- self.mocker.result(1)
- mod_mock.handle_part(self.data, self.ctype, self.filename,
- self.payload)
- self.mocker.replay()
-
- handlers.run_part(mod_mock, self.data, self.filename,
- self.payload, self.frequency, self.headers)
+ mod_mock = mock.Mock(frequency=settings.PER_ALWAYS,
+ handler_version=1)
+ handlers.run_part(mod_mock, self.data, self.filename, self.payload,
+ self.frequency, self.headers)
+ # Assert that the handle_part() method of the mock object got
+ # called with the expected arguments.
+ mod_mock.handle_part.assert_called_with_once(
+ self.data, self.ctype, self.filename, self.payload)
def test_no_handle_when_modfreq_once(self):
"""C{handle_part} is not called if frequency is once."""
self.frequency = "once"
- mod_mock = self.mocker.mock()
- getattr(mod_mock, "frequency")
- self.mocker.result(settings.PER_ONCE)
- self.mocker.replay()
-
- handlers.run_part(mod_mock, self.data, self.filename,
- self.payload, self.frequency, self.headers)
+ mod_mock = mock.Mock(frequency=settings.PER_ONCE)
+ handlers.run_part(mod_mock, self.data, self.filename, self.payload,
+ self.frequency, self.headers)
+ # Assert that the handle_part() method of the mock object got
+ # called with the expected arguments.
+ mod_mock.handle_part.assert_called_with_once(
+ self.data, self.ctype, self.filename, self.payload)
def test_exception_is_caught(self):
"""Exceptions within C{handle_part} are caught and logged."""
- mod_mock = self.mocker.mock()
- getattr(mod_mock, "frequency")
- self.mocker.result(settings.PER_INSTANCE)
- getattr(mod_mock, "handler_version")
- self.mocker.result(1)
- mod_mock.handle_part(self.data, self.ctype, self.filename,
- self.payload)
- self.mocker.throw(Exception())
- self.mocker.replay()
-
- handlers.run_part(mod_mock, self.data, self.filename,
- self.payload, self.frequency, self.headers)
-
-
-class TestCmdlineUrl(MockerTestCase):
+ mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
+ handler_version=1)
+ handlers.run_part(mod_mock, self.data, self.filename, self.payload,
+ self.frequency, self.headers)
+ mod_mock.handle_part.side_effect = Exception
+ handlers.run_part(mod_mock, self.data, self.filename, self.payload,
+ self.frequency, self.headers)
+ mod_mock.handle_part.assert_called_with_once(
+ self.data, self.ctype, self.filename, self.payload)
+
+
+class TestCmdlineUrl(unittest.TestCase):
def test_invalid_content(self):
url = "http://example.com/foo"
key = "mykey"
payload = "0"
cmdline = "ro %s=%s bar=1" % (key, url)
- mock_readurl = self.mocker.replace(url_helper.readurl,
- passthrough=False)
- mock_readurl(url, ARGS, KWARGS)
- self.mocker.result(url_helper.StringResponse(payload))
- self.mocker.replay()
-
- self.assertEqual((key, url, None),
- util.get_cmdline_url(names=[key], starts="xxxxxx",
- cmdline=cmdline))
+ with mock.patch('cloudinit.url_helper.readurl',
+ return_value=url_helper.StringResponse(payload)):
+ self.assertEqual(
+ util.get_cmdline_url(names=[key], starts="xxxxxx",
+ cmdline=cmdline),
+ (key, url, None))
def test_valid_content(self):
url = "http://example.com/foo"
@@ -204,27 +195,24 @@ class TestCmdlineUrl(MockerTestCase):
payload = "xcloud-config\nmydata: foo\nbar: wark\n"
cmdline = "ro %s=%s bar=1" % (key, url)
- mock_readurl = self.mocker.replace(url_helper.readurl,
- passthrough=False)
- mock_readurl(url, ARGS, KWARGS)
- self.mocker.result(url_helper.StringResponse(payload))
- self.mocker.replay()
-
- self.assertEqual((key, url, payload),
- util.get_cmdline_url(names=[key], starts="xcloud-config",
- cmdline=cmdline))
+ with mock.patch('cloudinit.url_helper.readurl',
+ return_value=url_helper.StringResponse(payload)):
+ self.assertEqual(
+ util.get_cmdline_url(names=[key], starts="xcloud-config",
+ cmdline=cmdline),
+ (key, url, payload))
def test_no_key_found(self):
url = "http://example.com/foo"
key = "mykey"
cmdline = "ro %s=%s bar=1" % (key, url)
- self.mocker.replace(url_helper.readurl, passthrough=False)
- self.mocker.result(url_helper.StringResponse(""))
- self.mocker.replay()
+ with mock.patch('cloudinit.url_helper.readurl',
+ return_value=url_helper.StringResponse('')):
+ self.assertEqual(
+ util.get_cmdline_url(names=["does-not-appear"],
+ starts="#cloud-config", cmdline=cmdline),
+ (None, None, None))
- self.assertEqual((None, None, None),
- util.get_cmdline_url(names=["does-not-appear"],
- starts="#cloud-config", cmdline=cmdline))
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py
index af7f442e..ad32d0b2 100644
--- a/tests/unittests/test_builtin_handlers.py
+++ b/tests/unittests/test_builtin_handlers.py
@@ -1,6 +1,13 @@
"""Tests of the built-in user data handlers."""
import os
+import shutil
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
from . import helpers as test_helpers
@@ -14,10 +21,11 @@ from cloudinit.settings import (PER_ALWAYS, PER_INSTANCE)
class TestBuiltins(test_helpers.FilesystemMockingTestCase):
-
def test_upstart_frequency_no_out(self):
- c_root = self.makeDir()
- up_root = self.makeDir()
+ c_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, c_root)
+ up_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, up_root)
paths = helpers.Paths({
'cloud_dir': c_root,
'upstart_dir': up_root,
@@ -36,7 +44,8 @@ class TestBuiltins(test_helpers.FilesystemMockingTestCase):
def test_upstart_frequency_single(self):
# files should be written out when frequency is ! per-instance
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
freq = PER_INSTANCE
self.patchOS(new_root)
@@ -49,16 +58,16 @@ class TestBuiltins(test_helpers.FilesystemMockingTestCase):
util.ensure_dir("/run")
util.ensure_dir("/etc/upstart")
- mock_subp = self.mocker.replace(util.subp, passthrough=False)
- mock_subp(["initctl", "reload-configuration"], capture=False)
- self.mocker.replay()
+ with mock.patch.object(util, 'subp') as mockobj:
+ h = upstart_job.UpstartJobPartHandler(paths)
+ h.handle_part('', handlers.CONTENT_START,
+ None, None, None)
+ h.handle_part('blah', 'text/upstart-job',
+ 'test.conf', 'blah', freq)
+ h.handle_part('', handlers.CONTENT_END,
+ None, None, None)
- h = upstart_job.UpstartJobPartHandler(paths)
- h.handle_part('', handlers.CONTENT_START,
- None, None, None)
- h.handle_part('blah', 'text/upstart-job',
- 'test.conf', 'blah', freq)
- h.handle_part('', handlers.CONTENT_END,
- None, None, None)
+ self.assertEquals(len(os.listdir('/etc/upstart')), 1)
- self.assertEquals(1, len(os.listdir('/etc/upstart')))
+ mockobj.assert_called_once_with(
+ ['initctl', 'reload-configuration'], capture=False)
diff --git a/tests/unittests/test_cs_util.py b/tests/unittests/test_cs_util.py
index 7d59222b..99fac84d 100644
--- a/tests/unittests/test_cs_util.py
+++ b/tests/unittests/test_cs_util.py
@@ -1,4 +1,4 @@
-from mocker import MockerTestCase
+import unittest
from cloudinit.cs_utils import Cepko
@@ -26,16 +26,21 @@ class CepkoMock(Cepko):
return SERVER_CONTEXT['tags']
-class CepkoResultTests(MockerTestCase):
+# 2015-01-22 BAW: This test is completely useless because it only ever tests
+# the CepkoMock object. Even in its original form, I don't think it ever
+# touched the underlying Cepko class methods.
+@unittest.skip('This test is completely useless')
+class CepkoResultTests(unittest.TestCase):
def setUp(self):
- self.mocked = self.mocker.replace("cloudinit.cs_utils.Cepko",
- spec=CepkoMock,
- count=False,
- passthrough=False)
- self.mocked()
- self.mocker.result(CepkoMock())
- self.mocker.replay()
- self.c = Cepko()
+ pass
+ ## self.mocked = self.mocker.replace("cloudinit.cs_utils.Cepko",
+ ## spec=CepkoMock,
+ ## count=False,
+ ## passthrough=False)
+ ## self.mocked()
+ ## self.mocker.result(CepkoMock())
+ ## self.mocker.replay()
+ ## self.c = Cepko()
def test_getitem(self):
result = self.c.all()
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py
index fd6bd8a1..7598837e 100644
--- a/tests/unittests/test_data.py
+++ b/tests/unittests/test_data.py
@@ -1,10 +1,17 @@
"""Tests for handling of userdata within cloud init."""
-import StringIO
-
import gzip
import logging
import os
+import shutil
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+from six import BytesIO, StringIO
from email.mime.application import MIMEApplication
from email.mime.base import MIMEBase
@@ -43,17 +50,16 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
self._log_handler = None
def tearDown(self):
- helpers.FilesystemMockingTestCase.tearDown(self)
if self._log_handler and self._log:
self._log.removeHandler(self._log_handler)
+ helpers.FilesystemMockingTestCase.tearDown(self)
def _patchIn(self, root):
- self.restore()
self.patchOS(root)
self.patchUtils(root)
def capture_log(self, lvl=logging.DEBUG):
- log_file = StringIO.StringIO()
+ log_file = StringIO()
self._log_handler = logging.StreamHandler(log_file)
self._log_handler.setLevel(lvl)
self._log = log.getLogger()
@@ -71,7 +77,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
ci = stages.Init()
ci.datasource = FakeDataSource(blob)
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
ci.fetch()
@@ -99,7 +106,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
{ "op": "add", "path": "/foo", "value": "quxC" }
]
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -138,7 +146,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
{ "op": "add", "path": "/foo", "value": "quxC" }
]
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -184,7 +193,8 @@ c: d
ci = stages.Init()
ci.datasource = FakeDataSource(str(message))
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
ci.fetch()
@@ -214,7 +224,8 @@ name: user
run:
- z
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -249,7 +260,8 @@ vendor_data:
enabled: True
prefix: /bin/true
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -309,7 +321,8 @@ p: 1
paths = c_helpers.Paths({}, ds=FakeDataSource(''))
cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None,
@@ -335,25 +348,25 @@ p: 1
data = "arbitrary text\n"
ci.datasource = FakeDataSource(data)
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mocker.replay()
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertIn(
+ "Unhandled non-multipart (text/x-not-multipart) userdata:",
+ log_file.getvalue())
+
+ mockobj.assert_called_once_with(
+ ci.paths.get_ipath("cloud_config"), "", 0o600)
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertIn(
- "Unhandled non-multipart (text/x-not-multipart) userdata:",
- log_file.getvalue())
def test_mime_gzip_compressed(self):
"""Tests that individual message gzip encoding works."""
def gzip_part(text):
- contents = StringIO.StringIO()
- f = gzip.GzipFile(fileobj=contents, mode='w')
- f.write(str(text))
+ contents = BytesIO()
+ f = gzip.GzipFile(fileobj=contents, mode='wb')
+ f.write(util.encode_text(text))
f.flush()
f.close()
return MIMEApplication(contents.getvalue(), 'gzip')
@@ -374,7 +387,8 @@ c: 4
message.attach(gzip_part(base_content2))
ci = stages.Init()
ci.datasource = FakeDataSource(str(message))
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
ci.fetch()
@@ -394,17 +408,15 @@ c: 4
message.set_payload("Just text")
ci.datasource = FakeDataSource(message.as_string())
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mocker.replay()
-
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertIn(
- "Unhandled unknown content-type (text/plain)",
- log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertIn(
+ "Unhandled unknown content-type (text/plain)",
+ log_file.getvalue())
+ mockobj.assert_called_once_with(
+ ci.paths.get_ipath("cloud_config"), "", 0o600)
def test_shellscript(self):
"""Raw text starting #!/bin/sh is treated as script."""
@@ -413,16 +425,17 @@ c: 4
ci.datasource = FakeDataSource(script)
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- mock_write(outpath, script, 0700)
- self.mocker.replay()
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertEqual("", log_file.getvalue())
+
+ mockobj.assert_has_calls([
+ mock.call(outpath, script, 0o700),
+ mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600),
+ ])
def test_mime_text_x_shellscript(self):
"""Mime message of type text/x-shellscript is treated as script."""
@@ -433,16 +446,17 @@ c: 4
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- mock_write(outpath, script, 0700)
- self.mocker.replay()
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertEqual("", log_file.getvalue())
+
+ mockobj.assert_has_calls([
+ mock.call(outpath, script, 0o700),
+ mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600),
+ ])
def test_mime_text_plain_shell(self):
"""Mime type text/plain starting #!/bin/sh is treated as script."""
@@ -453,13 +467,14 @@ c: 4
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(outpath, script, 0700)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mocker.replay()
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertEqual("", log_file.getvalue())
+
+ mockobj.assert_has_calls([
+ mock.call(outpath, script, 0o700),
+ mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600),
+ ])
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index 1a48ee5f..e9cd2fa5 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -46,7 +46,7 @@ def _write_cloud_info_file(value):
cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w')
cifile.write(value)
cifile.close()
- os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664)
+ os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0o664)
def _remove_cloud_info_file():
@@ -67,12 +67,12 @@ def _write_user_data_files(mount_dir, value):
udfile = open(deltacloud_user_data_file, 'w')
udfile.write(value)
udfile.close()
- os.chmod(deltacloud_user_data_file, 0664)
+ os.chmod(deltacloud_user_data_file, 0o664)
udfile = open(user_data_file, 'w')
udfile.write(value)
udfile.close()
- os.chmod(user_data_file, 0664)
+ os.chmod(user_data_file, 0o664)
def _remove_user_data_files(mount_dir,
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index e992a006..1f0330b3 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -3,12 +3,30 @@ from cloudinit.util import load_file
from cloudinit.sources import DataSourceAzure
from ..helpers import populate_dir
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
+
import base64
import crypt
-from mocker import MockerTestCase
import os
import stat
import yaml
+import shutil
+import tempfile
+import unittest
+
+
+def b64(source):
+ # In Python 3, b64encode only accepts bytes and returns bytes.
+ if not isinstance(source, bytes):
+ source = source.encode('utf-8')
+ return base64.b64encode(source).decode('us-ascii')
def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
@@ -40,7 +58,7 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
content += "<%s%s>%s</%s>\n" % (key, attrs, val, key)
if userdata:
- content += "<UserData>%s</UserData>\n" % (base64.b64encode(userdata))
+ content += "<UserData>%s</UserData>\n" % (b64(userdata))
if pubkeys:
content += "<SSH><PublicKeys>\n"
@@ -66,26 +84,24 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
return content
-class TestAzureDataSource(MockerTestCase):
+class TestAzureDataSource(unittest.TestCase):
def setUp(self):
- # makeDir comes from MockerTestCase
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
# patch cloud_dir, so our 'seed_dir' is guaranteed empty
self.paths = helpers.Paths({'cloud_dir': self.tmp})
self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
- self.unapply = []
- super(TestAzureDataSource, self).setUp()
+ self.patches = ExitStack()
+ self.addCleanup(self.patches.close)
- def tearDown(self):
- apply_patches([i for i in reversed(self.unapply)])
- super(TestAzureDataSource, self).tearDown()
+ super(TestAzureDataSource, self).setUp()
def apply_patches(self, patches):
- ret = apply_patches(patches)
- self.unapply += ret
+ for module, name, new in patches:
+ self.patches.enter_context(mock.patch.object(module, name, new))
def _get_ds(self, data):
@@ -117,16 +133,14 @@ class TestAzureDataSource(MockerTestCase):
mod = DataSourceAzure
mod.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
- self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)])
-
- self.apply_patches([(mod, 'invoke_agent', _invoke_agent),
- (mod, 'wait_for_files', _wait_for_files),
- (mod, 'pubkeys_from_crt_files',
- _pubkeys_from_crt_files),
- (mod, 'iid_from_shared_config',
- _iid_from_shared_config),
- (mod, 'apply_hostname_bounce',
- _apply_hostname_bounce), ])
+ self.apply_patches([
+ (mod, 'list_possible_azure_ds_devs', dsdevs),
+ (mod, 'invoke_agent', _invoke_agent),
+ (mod, 'wait_for_files', _wait_for_files),
+ (mod, 'pubkeys_from_crt_files', _pubkeys_from_crt_files),
+ (mod, 'iid_from_shared_config', _iid_from_shared_config),
+ (mod, 'apply_hostname_bounce', _apply_hostname_bounce),
+ ])
dsrc = mod.DataSourceAzureNet(
data.get('sys_cfg', {}), distro=None, paths=self.paths)
@@ -153,7 +167,7 @@ class TestAzureDataSource(MockerTestCase):
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertTrue(os.path.isdir(self.waagent_d))
- self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0700)
+ self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0o700)
def test_user_cfg_set_agent_command_plain(self):
# set dscfg in via plaintext
@@ -174,7 +188,7 @@ class TestAzureDataSource(MockerTestCase):
# set dscfg in via base64 encoded yaml
cfg = {'agent_command': "my_command"}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'dscfg': {'text': b64(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
@@ -226,13 +240,13 @@ class TestAzureDataSource(MockerTestCase):
def test_userdata_found(self):
mydata = "FOOBAR"
- odata = {'UserData': base64.b64encode(mydata)}
+ odata = {'UserData': b64(mydata)}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(dsrc.userdata_raw, mydata)
+ self.assertEqual(dsrc.userdata_raw, mydata.encode('utf-8'))
def test_no_datasource_expected(self):
# no source should be found if no seed_dir and no devs
@@ -274,7 +288,7 @@ class TestAzureDataSource(MockerTestCase):
'command': 'my-bounce-command',
'hostname_command': 'my-hostname-command'}}
odata = {'HostName': "xhost",
- 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'dscfg': {'text': b64(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
self._get_ds(data).get_data()
@@ -289,7 +303,7 @@ class TestAzureDataSource(MockerTestCase):
# config specifying set_hostname off should not bounce
cfg = {'set_hostname': False}
odata = {'HostName': "xhost",
- 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'dscfg': {'text': b64(yaml.dump(cfg)),
'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
self._get_ds(data).get_data()
@@ -318,7 +332,7 @@ class TestAzureDataSource(MockerTestCase):
# Make sure that user can affect disk aliases
dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': base64.b64encode(yaml.dump(dscfg)),
+ 'dscfg': {'text': b64(yaml.dump(dscfg)),
'encoding': 'base64'}}
usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'},
'ephemeral0': False}}
@@ -340,7 +354,7 @@ class TestAzureDataSource(MockerTestCase):
dsrc = self._get_ds(data)
dsrc.get_data()
- self.assertEqual(userdata, dsrc.userdata_raw)
+ self.assertEqual(userdata.encode('us-ascii'), dsrc.userdata_raw)
def test_ovf_env_arrives_in_waagent_dir(self):
xml = construct_valid_ovf_env(data={}, userdata="FOODATA")
@@ -355,7 +369,7 @@ class TestAzureDataSource(MockerTestCase):
def test_existing_ovf_same(self):
# waagent/SharedConfig left alone if found ovf-env.xml same as cached
- odata = {'UserData': base64.b64encode("SOMEUSERDATA")}
+ odata = {'UserData': b64("SOMEUSERDATA")}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
populate_dir(self.waagent_d,
@@ -379,9 +393,9 @@ class TestAzureDataSource(MockerTestCase):
# 'get_data' should remove SharedConfig.xml in /var/lib/waagent
# if ovf-env.xml differs.
cached_ovfenv = construct_valid_ovf_env(
- {'userdata': base64.b64encode("FOO_USERDATA")})
+ {'userdata': b64("FOO_USERDATA")})
new_ovfenv = construct_valid_ovf_env(
- {'userdata': base64.b64encode("NEW_USERDATA")})
+ {'userdata': b64("NEW_USERDATA")})
populate_dir(self.waagent_d,
{'ovf-env.xml': cached_ovfenv,
@@ -391,7 +405,7 @@ class TestAzureDataSource(MockerTestCase):
dsrc = self._get_ds({'ovfcontent': new_ovfenv})
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(dsrc.userdata_raw, "NEW_USERDATA")
+ self.assertEqual(dsrc.userdata_raw, b"NEW_USERDATA")
self.assertTrue(os.path.exists(
os.path.join(self.waagent_d, 'otherfile')))
self.assertFalse(
@@ -402,7 +416,7 @@ class TestAzureDataSource(MockerTestCase):
load_file(os.path.join(self.waagent_d, 'ovf-env.xml')))
-class TestReadAzureOvf(MockerTestCase):
+class TestReadAzureOvf(unittest.TestCase):
def test_invalid_xml_raises_non_azure_ds(self):
invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
self.assertRaises(DataSourceAzure.BrokenAzureDataSource,
@@ -417,7 +431,7 @@ class TestReadAzureOvf(MockerTestCase):
self.assertIn(mypk, cfg['_pubkeys'])
-class TestReadAzureSharedConfig(MockerTestCase):
+class TestReadAzureSharedConfig(unittest.TestCase):
def test_valid_content(self):
xml = """<?xml version="1.0" encoding="utf-8"?>
<SharedConfig>
@@ -429,14 +443,3 @@ class TestReadAzureSharedConfig(MockerTestCase):
</SharedConfig>"""
ret = DataSourceAzure.iid_from_shared_config_content(xml)
self.assertEqual("MY_INSTANCE_ID", ret)
-
-
-def apply_patches(patches):
- ret = []
- for (ref, name, replace) in patches:
- if replace is None:
- continue
- orig = getattr(ref, name)
- setattr(ref, name, replace)
- ret.append((ref, name, orig))
- return ret
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index d88066e5..258c68e2 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -1,10 +1,18 @@
from copy import copy
import json
import os
-import os.path
-
-import mocker
-from mocker import MockerTestCase
+import shutil
+import tempfile
+import unittest
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
from cloudinit import helpers
from cloudinit import settings
@@ -12,8 +20,6 @@ from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
-from .. import helpers as unit_helpers
-
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
EC2_META = {
'ami-id': 'ami-00000001',
@@ -64,11 +70,12 @@ CFG_DRIVE_FILES_V2 = {
'openstack/latest/user_data': USER_DATA}
-class TestConfigDriveDataSource(MockerTestCase):
+class TestConfigDriveDataSource(unittest.TestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_ec2_metadata(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -91,23 +98,28 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker() as my_mock:
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
+ with ExitStack() as mocks:
provided_name = dev_name[len('/dev/'):]
provided_name = "s" + provided_name[1:]
- find_mock(mocker.ARGS)
- my_mock.result([provided_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ find_mock = mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ return_value=[provided_name]))
+ # We want os.path.exists() to return False on its first call,
+ # and True on its second call. We use a handy generator as
+ # the mock side effect for this. The mocked function returns
+ # what the side effect returns.
+ def exists_side_effect():
+ yield False
+ yield True
+ exists_mock = mocks.enter_context(
+ mock.patch.object(os.path, 'exists',
+ side_effect=exists_side_effect()))
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ find_mock.assert_called_once_with(mock.ANY)
+ self.assertEqual(exists_mock.call_count, 2)
+
def test_dev_os_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
@@ -123,19 +135,19 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker() as my_mock:
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
- find_mock(mocker.ARGS)
- my_mock.result([dev_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ with ExitStack() as mocks:
+ find_mock = mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ return_value=[dev_name]))
+ exists_mock = mocks.enter_context(
+ mock.patch.object(os.path, 'exists',
+ return_value=True))
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ find_mock.assert_called_once_with(mock.ANY)
+ exists_mock.assert_called_once_with(mock.ANY)
+
def test_dev_ec2_remap(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
@@ -156,16 +168,21 @@ class TestConfigDriveDataSource(MockerTestCase):
'root2k': None,
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker(verify_calls=False) as my_mock:
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ # We want os.path.exists() to return False on its first call,
+ # and True on its second call. We use a handy generator as
+ # the mock side effect for this. The mocked function returns
+ # what the side effect returns.
+ def exists_side_effect():
+ yield False
+ yield True
+ with mock.patch.object(os.path, 'exists',
+ side_effect=exists_side_effect()):
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ # We don't assert the call count for os.path.exists() because
+ # not all of the entries in name_tests results in two calls to
+ # that function. Specifically, 'root2k' doesn't seem to call
+ # it at all.
def test_dev_ec2_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -173,12 +190,6 @@ class TestConfigDriveDataSource(MockerTestCase):
None,
helpers.Paths({}))
found = ds.read_config_drive(self.tmp)
- exists_mock = self.mocker.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result(True)
- self.mocker.replay()
ec2_md = found['ec2-metadata']
os_md = found['metadata']
cfg_ds.ec2_metadata = ec2_md
@@ -193,8 +204,9 @@ class TestConfigDriveDataSource(MockerTestCase):
'root2k': None,
}
for name, dev_name in name_tests.items():
- device = cfg_ds.device_name_to_device(name)
- self.assertEquals(dev_name, device)
+ with mock.patch.object(os.path, 'exists', return_value=True):
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dir_valid(self):
"""Verify a dir is read as such."""
@@ -326,7 +338,7 @@ def populate_ds_from_read_config(cfg_ds, source, results):
def populate_dir(seed_dir, files):
- for (name, content) in files.iteritems():
+ for (name, content) in files.items():
path = os.path.join(seed_dir, name)
dirname = os.path.dirname(path)
if not os.path.isdir(dirname):
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
index d1270fc2..98f9cfac 100644
--- a/tests/unittests/test_datasource/test_digitalocean.py
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -18,8 +18,7 @@
import httpretty
import re
-from types import ListType
-from urlparse import urlparse
+from six.moves.urllib_parse import urlparse
from cloudinit import settings
from cloudinit import helpers
@@ -110,7 +109,7 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
self.assertEqual([DO_META.get('public-keys')],
self.ds.get_public_ssh_keys())
- self.assertIs(type(self.ds.get_public_ssh_keys()), ListType)
+ self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
@httpretty.activate
def test_multiple_ssh_keys(self):
@@ -124,4 +123,4 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
self.assertEqual(DO_META.get('public-keys').splitlines(),
self.ds.get_public_ssh_keys())
- self.assertIs(type(self.ds.get_public_ssh_keys()), ListType)
+ self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index 06050bb1..6dd4b5ed 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -19,7 +19,7 @@ import httpretty
import re
from base64 import b64encode, b64decode
-from urlparse import urlparse
+from six.moves.urllib_parse import urlparse
from cloudinit import settings
from cloudinit import helpers
@@ -45,7 +45,7 @@ GCE_META_ENCODING = {
'instance/id': '12345',
'instance/hostname': 'server.project-baz.local',
'instance/zone': 'baz/bang',
- 'instance/attributes/user-data': b64encode('/bin/echo baz\n'),
+ 'instance/attributes/user-data': b64encode(b'/bin/echo baz\n'),
'instance/attributes/user-data-encoding': 'base64',
}
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index c157beb8..6af0cd82 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -1,19 +1,26 @@
from copy import copy
import os
+import shutil
+import tempfile
+import unittest
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
from ..helpers import populate_dir
-import mocker
+try:
+ from unittest import mock
+except ImportError:
+ import mock
-class TestMAASDataSource(mocker.MockerTestCase):
+class TestMAASDataSource(unittest.TestCase):
def setUp(self):
super(TestMAASDataSource, self).setUp()
# Make a temp directoy for tests to use.
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_seed_dir_valid(self):
"""Verify a valid seeddir is read as such."""
@@ -93,16 +100,18 @@ class TestMAASDataSource(mocker.MockerTestCase):
def test_seed_url_valid(self):
"""Verify that valid seed_url is read as such."""
- valid = {'meta-data/instance-id': 'i-instanceid',
+ valid = {
+ 'meta-data/instance-id': 'i-instanceid',
'meta-data/local-hostname': 'test-hostname',
'meta-data/public-keys': 'test-hostname',
- 'user-data': 'foodata'}
+ 'user-data': 'foodata',
+ }
valid_order = [
'meta-data/local-hostname',
'meta-data/instance-id',
'meta-data/public-keys',
'user-data',
- ]
+ ]
my_seed = "http://example.com/xmeta"
my_ver = "1999-99-99"
my_headers = {'header1': 'value1', 'header2': 'value2'}
@@ -110,28 +119,38 @@ class TestMAASDataSource(mocker.MockerTestCase):
def my_headers_cb(url):
return my_headers
- mock_request = self.mocker.replace(url_helper.readurl,
- passthrough=False)
-
- for key in valid_order:
- url = "%s/%s/%s" % (my_seed, my_ver, key)
- mock_request(url, headers=None, timeout=mocker.ANY,
- data=mocker.ANY, sec_between=mocker.ANY,
- ssl_details=mocker.ANY, retries=mocker.ANY,
- headers_cb=my_headers_cb,
- exception_cb=mocker.ANY)
- resp = valid.get(key)
- self.mocker.result(url_helper.StringResponse(resp))
- self.mocker.replay()
-
- (userdata, metadata) = DataSourceMAAS.read_maas_seed_url(my_seed,
- header_cb=my_headers_cb, version=my_ver)
-
- self.assertEqual("foodata", userdata)
- self.assertEqual(metadata['instance-id'],
- valid['meta-data/instance-id'])
- self.assertEqual(metadata['local-hostname'],
- valid['meta-data/local-hostname'])
+ # Each time url_helper.readurl() is called, something different is
+ # returned based on the canned data above. We need to build up a list
+ # of side effect return values, which the mock will return. At the
+ # same time, we'll build up a list of expected call arguments for
+ # asserting after the code under test is run.
+ calls = []
+
+ def side_effect():
+ for key in valid_order:
+ resp = valid.get(key)
+ url = "%s/%s/%s" % (my_seed, my_ver, key)
+ calls.append(
+ mock.call(url, headers=None, timeout=mock.ANY,
+ data=mock.ANY, sec_between=mock.ANY,
+ ssl_details=mock.ANY, retries=mock.ANY,
+ headers_cb=my_headers_cb,
+ exception_cb=mock.ANY))
+ yield url_helper.StringResponse(resp)
+
+ # Now do the actual call of the code under test.
+ with mock.patch.object(url_helper, 'readurl',
+ side_effect=side_effect()) as mockobj:
+ userdata, metadata = DataSourceMAAS.read_maas_seed_url(
+ my_seed, header_cb=my_headers_cb, version=my_ver)
+
+ self.assertEqual("foodata", userdata)
+ self.assertEqual(metadata['instance-id'],
+ valid['meta-data/instance-id'])
+ self.assertEqual(metadata['local-hostname'],
+ valid['meta-data/local-hostname'])
+
+ mockobj.has_calls(calls)
def test_seed_url_invalid(self):
"""Verify that invalid seed_url raises MAASSeedDirMalformed."""
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index e9235951..480a4012 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -3,33 +3,38 @@ from cloudinit.sources import DataSourceNoCloud
from cloudinit import util
from ..helpers import populate_dir
-from mocker import MockerTestCase
import os
import yaml
+import shutil
+import tempfile
+import unittest
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
-class TestNoCloudDataSource(MockerTestCase):
+
+class TestNoCloudDataSource(unittest.TestCase):
def setUp(self):
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
self.paths = helpers.Paths({'cloud_dir': self.tmp})
self.cmdline = "root=TESTCMDLINE"
- self.unapply = []
- self.apply_patches([(util, 'get_cmdline', self._getcmdline)])
- super(TestNoCloudDataSource, self).setUp()
-
- def tearDown(self):
- apply_patches([i for i in reversed(self.unapply)])
- super(TestNoCloudDataSource, self).tearDown()
+ self.mocks = ExitStack()
+ self.addCleanup(self.mocks.close)
- def apply_patches(self, patches):
- ret = apply_patches(patches)
- self.unapply += ret
+ self.mocks.enter_context(
+ mock.patch.object(util, 'get_cmdline', return_value=self.cmdline))
- def _getcmdline(self):
- return self.cmdline
+ super(TestNoCloudDataSource, self).setUp()
def test_nocloud_seed_dir(self):
md = {'instance-id': 'IID', 'dsmode': 'local'}
@@ -59,7 +64,9 @@ class TestNoCloudDataSource(MockerTestCase):
def my_find_devs_with(*args, **kwargs):
raise PsuedoException
- self.apply_patches([(util, 'find_devs_with', my_find_devs_with)])
+ self.mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ side_effect=PsuedoException))
# by default, NoCloud should search for filesystems by label
sys_cfg = {'datasource': {'NoCloud': {}}}
@@ -85,7 +92,7 @@ class TestNoCloudDataSource(MockerTestCase):
data = {
'fs_label': None,
- 'meta-data': {'instance-id': 'IID'},
+ 'meta-data': yaml.safe_dump({'instance-id': 'IID'}),
'user-data': "USER_DATA_RAW",
}
@@ -133,7 +140,7 @@ class TestNoCloudDataSource(MockerTestCase):
self.assertTrue(ret)
-class TestParseCommandLineData(MockerTestCase):
+class TestParseCommandLineData(unittest.TestCase):
def test_parse_cmdline_data_valid(self):
ds_id = "ds=nocloud"
@@ -178,15 +185,4 @@ class TestParseCommandLineData(MockerTestCase):
self.assertFalse(ret)
-def apply_patches(patches):
- ret = []
- for (ref, name, replace) in patches:
- if replace is None:
- continue
- orig = getattr(ref, name)
- setattr(ref, name, replace)
- ret.append((ref, name, orig))
- return ret
-
-
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index b4fd1f4d..ef534bab 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -1,12 +1,21 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceOpenNebula as ds
from cloudinit import util
-from mocker import MockerTestCase
from ..helpers import populate_dir
from base64 import b64encode
import os
import pwd
+import shutil
+import tempfile
+import unittest
+
+def b64(source):
+ # In Python 3, b64encode only accepts bytes and returns bytes.
+ if not isinstance(source, bytes):
+ source = source.encode('utf-8')
+ return b64encode(source).decode('us-ascii')
+
TEST_VARS = {
'VAR1': 'single',
@@ -37,12 +46,13 @@ CMD_IP_OUT = '''\
'''
-class TestOpenNebulaDataSource(MockerTestCase):
+class TestOpenNebulaDataSource(unittest.TestCase):
parsed_user = None
def setUp(self):
super(TestOpenNebulaDataSource, self).setUp()
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
self.paths = helpers.Paths({'cloud_dir': self.tmp})
# defaults for few tests
@@ -176,7 +186,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
self.assertEqual(USER_DATA, results['userdata'])
def test_user_data_encoding_required_for_decode(self):
- b64userdata = b64encode(USER_DATA)
+ b64userdata = b64(USER_DATA)
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
populate_context_dir(my_d, {k: b64userdata})
@@ -188,7 +198,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
def test_user_data_base64_encoding(self):
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: b64encode(USER_DATA),
+ populate_context_dir(my_d, {k: b64(USER_DATA),
'USERDATA_ENCODING': 'base64'})
results = ds.read_context_disk_dir(my_d)
@@ -228,7 +238,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
util.find_devs_with = orig_find_devs_with
-class TestOpenNebulaNetwork(MockerTestCase):
+class TestOpenNebulaNetwork(unittest.TestCase):
def setUp(self):
super(TestOpenNebulaNetwork, self).setUp()
@@ -280,7 +290,7 @@ iface eth0 inet static
''')
-class TestParseShellConfig(MockerTestCase):
+class TestParseShellConfig(unittest.TestCase):
def test_no_seconds(self):
cfg = '\n'.join(["foo=bar", "SECONDS=2", "xx=foo"])
# we could test 'sleep 2', but that would make the test run slower.
@@ -290,7 +300,7 @@ class TestParseShellConfig(MockerTestCase):
def populate_context_dir(path, variables):
data = "# Context variables generated by OpenNebula\n"
- for (k, v) in variables.iteritems():
+ for k, v in variables.items():
data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''")))
populate_dir(path, {'context.sh': data})
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index 49894e51..81ef1546 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -20,12 +20,11 @@ import copy
import json
import re
-from StringIO import StringIO
-
-from urlparse import urlparse
-
from .. import helpers as test_helpers
+from six import StringIO
+from six.moves.urllib.parse import urlparse
+
from cloudinit import helpers
from cloudinit import settings
from cloudinit.sources import DataSourceOpenStack as ds
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 65675106..2fb9e1b6 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -22,6 +22,8 @@
# return responses.
#
+from __future__ import print_function
+
import base64
from cloudinit import helpers as c_helpers
from cloudinit.sources import DataSourceSmartOS
@@ -29,9 +31,18 @@ from .. import helpers
import os
import os.path
import re
+import shutil
+import tempfile
import stat
import uuid
+def b64(source):
+ # In Python 3, b64encode only accepts bytes and returns bytes.
+ if not isinstance(source, bytes):
+ source = source.encode('utf-8')
+ return base64.b64encode(source).decode('us-ascii')
+
+
MOCK_RETURNS = {
'hostname': 'test-host',
'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname',
@@ -109,9 +120,10 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
def setUp(self):
helpers.FilesystemMockingTestCase.setUp(self)
- # makeDir comes from MockerTestCase
- self.tmp = self.makeDir()
- self.legacy_user_d = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+ self.legacy_user_d = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.legacy_user_d)
# If you should want to watch the logs...
self._log = None
@@ -227,7 +239,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns = MOCK_RETURNS.copy()
my_returns['base64_all'] = "true"
for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = base64.b64encode(my_returns[k])
+ my_returns[k] = b64(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -248,7 +260,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns['b64-cloud-init:user-data'] = "true"
my_returns['b64-hostname'] = "true"
for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = base64.b64encode(my_returns[k])
+ my_returns[k] = b64(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -264,7 +276,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns = MOCK_RETURNS.copy()
my_returns['base64_keys'] = 'hostname,ignored'
for k in ('hostname',):
- my_returns[k] = base64.b64encode(my_returns[k])
+ my_returns[k] = b64(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -365,7 +377,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:]
if re.match(r'.*\/mdata-user-data$', name_f):
found_new = True
- print name_f
+ print(name_f)
self.assertEquals(permissions, '400')
self.assertFalse(found_new)
diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py
index db6aa0e8..35153f0d 100644
--- a/tests/unittests/test_distros/test_generic.py
+++ b/tests/unittests/test_distros/test_generic.py
@@ -4,6 +4,8 @@ from cloudinit import util
from .. import helpers
import os
+import shutil
+import tempfile
unknown_arch_info = {
'arches': ['default'],
@@ -53,7 +55,8 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase):
def setUp(self):
super(TestGenericDistro, self).setUp()
# Make a temp directoy for tests to use.
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def _write_load_sudoers(self, _user, rules):
cls = distros.fetch("ubuntu")
@@ -64,7 +67,6 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase):
self.patchUtils(self.tmp)
d.write_sudo_rules("harlowja", rules)
contents = util.load_file(d.ci_sudoers_fn)
- self.restore()
return contents
def _count_in(self, lines_look_for, text_content):
diff --git a/tests/unittests/test_distros/test_hostname.py b/tests/unittests/test_distros/test_hostname.py
index 8e644f4d..143e29a9 100644
--- a/tests/unittests/test_distros/test_hostname.py
+++ b/tests/unittests/test_distros/test_hostname.py
@@ -1,4 +1,4 @@
-from mocker import MockerTestCase
+import unittest
from cloudinit.distros.parsers import hostname
@@ -12,7 +12,7 @@ blahblah
BASE_HOSTNAME = BASE_HOSTNAME.strip()
-class TestHostnameHelper(MockerTestCase):
+class TestHostnameHelper(unittest.TestCase):
def test_parse_same(self):
hn = hostname.HostnameConf(BASE_HOSTNAME)
self.assertEquals(str(hn).strip(), BASE_HOSTNAME)
diff --git a/tests/unittests/test_distros/test_hosts.py b/tests/unittests/test_distros/test_hosts.py
index 687a0dab..fc701eaa 100644
--- a/tests/unittests/test_distros/test_hosts.py
+++ b/tests/unittests/test_distros/test_hosts.py
@@ -1,4 +1,4 @@
-from mocker import MockerTestCase
+import unittest
from cloudinit.distros.parsers import hosts
@@ -14,7 +14,7 @@ BASE_ETC = '''
BASE_ETC = BASE_ETC.strip()
-class TestHostsHelper(MockerTestCase):
+class TestHostsHelper(unittest.TestCase):
def test_parse(self):
eh = hosts.HostsConf(BASE_ETC)
self.assertEquals(eh.get_entry('127.0.0.1'), [['localhost']])
diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py
index 193338e8..91e630ae 100644
--- a/tests/unittests/test_distros/test_netconfig.py
+++ b/tests/unittests/test_distros/test_netconfig.py
@@ -1,8 +1,16 @@
-from mocker import MockerTestCase
+import os
+import unittest
-import mocker
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
-import os
+from six import StringIO
from cloudinit import distros
from cloudinit import helpers
@@ -11,8 +19,6 @@ from cloudinit import util
from cloudinit.distros.parsers.sys_conf import SysConf
-from StringIO import StringIO
-
BASE_NET_CFG = '''
auto lo
@@ -74,7 +80,7 @@ class WriteBuffer(object):
return self.buffer.getvalue()
-class TestNetCfgDistro(MockerTestCase):
+class TestNetCfgDistro(unittest.TestCase):
def _get_distro(self, dname):
cls = distros.fetch(dname)
@@ -85,34 +91,28 @@ class TestNetCfgDistro(MockerTestCase):
def test_simple_write_ub(self):
ub_distro = self._get_distro('ubuntu')
- util_mock = self.mocker.replace(util.write_file,
- spec=False, passthrough=False)
- exists_mock = self.mocker.replace(os.path.isfile,
- spec=False, passthrough=False)
+ with ExitStack() as mocks:
+ write_bufs = {}
- exists_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result(False)
+ def replace_write(filename, content, mode=0o644, omode="wb"):
+ buf = WriteBuffer()
+ buf.mode = mode
+ buf.omode = omode
+ buf.write(content)
+ write_bufs[filename] = buf
- write_bufs = {}
-
- def replace_write(filename, content, mode=0644, omode="wb"):
- buf = WriteBuffer()
- buf.mode = mode
- buf.omode = omode
- buf.write(content)
- write_bufs[filename] = buf
+ mocks.enter_context(
+ mock.patch.object(util, 'write_file', replace_write))
+ mocks.enter_context(
+ mock.patch.object(os.path, 'isfile', return_value=False))
- util_mock(mocker.ARGS)
- self.mocker.call(replace_write)
- self.mocker.replay()
- ub_distro.apply_network(BASE_NET_CFG, False)
+ ub_distro.apply_network(BASE_NET_CFG, False)
- self.assertEquals(len(write_bufs), 1)
- self.assertIn('/etc/network/interfaces', write_bufs)
- write_buf = write_bufs['/etc/network/interfaces']
- self.assertEquals(str(write_buf).strip(), BASE_NET_CFG.strip())
- self.assertEquals(write_buf.mode, 0644)
+ self.assertEquals(len(write_bufs), 1)
+ self.assertIn('/etc/network/interfaces', write_bufs)
+ write_buf = write_bufs['/etc/network/interfaces']
+ self.assertEquals(str(write_buf).strip(), BASE_NET_CFG.strip())
+ self.assertEquals(write_buf.mode, 0o644)
def assertCfgEquals(self, blob1, blob2):
b1 = dict(SysConf(blob1.strip().splitlines()))
@@ -127,53 +127,41 @@ class TestNetCfgDistro(MockerTestCase):
def test_simple_write_rh(self):
rh_distro = self._get_distro('rhel')
- write_mock = self.mocker.replace(util.write_file,
- spec=False, passthrough=False)
- load_mock = self.mocker.replace(util.load_file,
- spec=False, passthrough=False)
- exists_mock = self.mocker.replace(os.path.isfile,
- spec=False, passthrough=False)
write_bufs = {}
- def replace_write(filename, content, mode=0644, omode="wb"):
+ def replace_write(filename, content, mode=0o644, omode="wb"):
buf = WriteBuffer()
buf.mode = mode
buf.omode = omode
buf.write(content)
write_bufs[filename] = buf
- exists_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result(False)
-
- load_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result('')
-
- for _i in range(0, 3):
- write_mock(mocker.ARGS)
- self.mocker.call(replace_write)
-
- write_mock(mocker.ARGS)
- self.mocker.call(replace_write)
-
- self.mocker.replay()
- rh_distro.apply_network(BASE_NET_CFG, False)
-
- self.assertEquals(len(write_bufs), 4)
- self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo']
- expected_buf = '''
+ with ExitStack() as mocks:
+ mocks.enter_context(
+ mock.patch.object(util, 'write_file', replace_write))
+ mocks.enter_context(
+ mock.patch.object(util, 'load_file', return_value=''))
+ mocks.enter_context(
+ mock.patch.object(os.path, 'isfile', return_value=False))
+
+ rh_distro.apply_network(BASE_NET_CFG, False)
+
+ self.assertEquals(len(write_bufs), 4)
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo',
+ write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo']
+ expected_buf = '''
DEVICE="lo"
ONBOOT=yes
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
- self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0']
- expected_buf = '''
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0',
+ write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0']
+ expected_buf = '''
DEVICE="eth0"
BOOTPROTO="static"
NETMASK="255.255.255.0"
@@ -182,77 +170,66 @@ ONBOOT=yes
GATEWAY="192.168.1.254"
BROADCAST="192.168.1.0"
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
- self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1']
- expected_buf = '''
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1',
+ write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1']
+ expected_buf = '''
DEVICE="eth1"
BOOTPROTO="dhcp"
ONBOOT=yes
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
- self.assertIn('/etc/sysconfig/network', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network']
- expected_buf = '''
+ self.assertIn('/etc/sysconfig/network', write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network']
+ expected_buf = '''
# Created by cloud-init v. 0.7
NETWORKING=yes
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
def test_write_ipv6_rhel(self):
rh_distro = self._get_distro('rhel')
- write_mock = self.mocker.replace(util.write_file,
- spec=False, passthrough=False)
- load_mock = self.mocker.replace(util.load_file,
- spec=False, passthrough=False)
- exists_mock = self.mocker.replace(os.path.isfile,
- spec=False, passthrough=False)
write_bufs = {}
- def replace_write(filename, content, mode=0644, omode="wb"):
+ def replace_write(filename, content, mode=0o644, omode="wb"):
buf = WriteBuffer()
buf.mode = mode
buf.omode = omode
buf.write(content)
write_bufs[filename] = buf
- exists_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result(False)
-
- load_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result('')
-
- for _i in range(0, 3):
- write_mock(mocker.ARGS)
- self.mocker.call(replace_write)
-
- write_mock(mocker.ARGS)
- self.mocker.call(replace_write)
-
- self.mocker.replay()
- rh_distro.apply_network(BASE_NET_CFG_IPV6, False)
-
- self.assertEquals(len(write_bufs), 4)
- self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo']
- expected_buf = '''
+ with ExitStack() as mocks:
+ mocks.enter_context(
+ mock.patch.object(util, 'write_file', replace_write))
+ mocks.enter_context(
+ mock.patch.object(util, 'load_file', return_value=''))
+ mocks.enter_context(
+ mock.patch.object(os.path, 'isfile', return_value=False))
+
+ rh_distro.apply_network(BASE_NET_CFG_IPV6, False)
+
+ self.assertEquals(len(write_bufs), 4)
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo',
+ write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo']
+ expected_buf = '''
DEVICE="lo"
ONBOOT=yes
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
- self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0']
- expected_buf = '''
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0',
+ write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0']
+ expected_buf = '''
DEVICE="eth0"
BOOTPROTO="static"
NETMASK="255.255.255.0"
@@ -264,11 +241,12 @@ IPV6INIT=yes
IPV6ADDR="2607:f0d0:1002:0011::2"
IPV6_DEFAULTGW="2607:f0d0:1002:0011::1"
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
- self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1']
- expected_buf = '''
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
+ self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1',
+ write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1']
+ expected_buf = '''
DEVICE="eth1"
BOOTPROTO="static"
NETMASK="255.255.255.0"
@@ -280,38 +258,22 @@ IPV6INIT=yes
IPV6ADDR="2607:f0d0:1002:0011::3"
IPV6_DEFAULTGW="2607:f0d0:1002:0011::1"
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
- self.assertIn('/etc/sysconfig/network', write_bufs)
- write_buf = write_bufs['/etc/sysconfig/network']
- expected_buf = '''
+ self.assertIn('/etc/sysconfig/network', write_bufs)
+ write_buf = write_bufs['/etc/sysconfig/network']
+ expected_buf = '''
# Created by cloud-init v. 0.7
NETWORKING=yes
NETWORKING_IPV6=yes
IPV6_AUTOCONF=no
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
def test_simple_write_freebsd(self):
fbsd_distro = self._get_distro('freebsd')
- util_mock = self.mocker.replace(util.write_file,
- spec=False, passthrough=False)
- exists_mock = self.mocker.replace(os.path.isfile,
- spec=False, passthrough=False)
- load_mock = self.mocker.replace(util.load_file,
- spec=False, passthrough=False)
- subp_mock = self.mocker.replace(util.subp,
- spec=False, passthrough=False)
-
- subp_mock(['ifconfig', '-a'])
- self.mocker.count(0, None)
- self.mocker.result(('vtnet0', ''))
-
- exists_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result(False)
write_bufs = {}
read_bufs = {
@@ -319,7 +281,7 @@ IPV6_AUTOCONF=no
'/etc/resolv.conf': '',
}
- def replace_write(filename, content, mode=0644, omode="wb"):
+ def replace_write(filename, content, mode=0o644, omode="wb"):
buf = WriteBuffer()
buf.mode = mode
buf.omode = omode
@@ -336,23 +298,24 @@ IPV6_AUTOCONF=no
return str(write_bufs[fname])
return read_bufs[fname]
- util_mock(mocker.ARGS)
- self.mocker.call(replace_write)
- self.mocker.count(0, None)
-
- load_mock(mocker.ARGS)
- self.mocker.call(replace_read)
- self.mocker.count(0, None)
-
- self.mocker.replay()
- fbsd_distro.apply_network(BASE_NET_CFG, False)
-
- self.assertIn('/etc/rc.conf', write_bufs)
- write_buf = write_bufs['/etc/rc.conf']
- expected_buf = '''
+ with ExitStack() as mocks:
+ mocks.enter_context(
+ mock.patch.object(util, 'subp', return_value=('vtnet0', '')))
+ mocks.enter_context(
+ mock.patch.object(os.path, 'exists', return_value=False))
+ mocks.enter_context(
+ mock.patch.object(util, 'write_file', replace_write))
+ mocks.enter_context(
+ mock.patch.object(util, 'load_file', replace_read))
+
+ fbsd_distro.apply_network(BASE_NET_CFG, False)
+
+ self.assertIn('/etc/rc.conf', write_bufs)
+ write_buf = write_bufs['/etc/rc.conf']
+ expected_buf = '''
ifconfig_vtnet0="192.168.1.5 netmask 255.255.255.0"
ifconfig_vtnet1="DHCP"
defaultrouter="192.168.1.254"
'''
- self.assertCfgEquals(expected_buf, str(write_buf))
- self.assertEquals(write_buf.mode, 0644)
+ self.assertCfgEquals(expected_buf, str(write_buf))
+ self.assertEquals(write_buf.mode, 0o644)
diff --git a/tests/unittests/test_distros/test_resolv.py b/tests/unittests/test_distros/test_resolv.py
index 6b6ff6aa..779b83e3 100644
--- a/tests/unittests/test_distros/test_resolv.py
+++ b/tests/unittests/test_distros/test_resolv.py
@@ -1,8 +1,7 @@
-from mocker import MockerTestCase
-
from cloudinit.distros.parsers import resolv_conf
import re
+import unittest
BASE_RESOLVE = '''
@@ -14,7 +13,7 @@ nameserver 10.15.30.92
BASE_RESOLVE = BASE_RESOLVE.strip()
-class TestResolvHelper(MockerTestCase):
+class TestResolvHelper(unittest.TestCase):
def test_parse_same(self):
rp = resolv_conf.ResolvConf(BASE_RESOLVE)
rp_r = str(rp).strip()
diff --git a/tests/unittests/test_distros/test_sysconfig.py b/tests/unittests/test_distros/test_sysconfig.py
index 0c651407..f66201b3 100644
--- a/tests/unittests/test_distros/test_sysconfig.py
+++ b/tests/unittests/test_distros/test_sysconfig.py
@@ -1,6 +1,5 @@
-from mocker import MockerTestCase
-
import re
+import unittest
from cloudinit.distros.parsers.sys_conf import SysConf
@@ -8,7 +7,7 @@ from cloudinit.distros.parsers.sys_conf import SysConf
# Lots of good examples @
# http://content.hccfl.edu/pollock/AUnix1/SysconfigFilesDesc.txt
-class TestSysConfHelper(MockerTestCase):
+class TestSysConfHelper(unittest.TestCase):
# This function was added in 2.7, make it work for 2.6
def assertRegMatches(self, text, regexp):
regexp = re.compile(regexp)
diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py
index 50398c74..b90d6185 100644
--- a/tests/unittests/test_distros/test_user_data_normalize.py
+++ b/tests/unittests/test_distros/test_user_data_normalize.py
@@ -1,4 +1,4 @@
-from mocker import MockerTestCase
+import unittest
from cloudinit import distros
from cloudinit import helpers
@@ -15,7 +15,7 @@ bcfg = {
}
-class TestUGNormalize(MockerTestCase):
+class TestUGNormalize(unittest.TestCase):
def _make_distro(self, dtype, def_user=None):
cfg = dict(settings.CFG_BUILTIN)
diff --git a/tests/unittests/test_filters/test_launch_index.py b/tests/unittests/test_filters/test_launch_index.py
index 2f4c2fda..95d24b9b 100644
--- a/tests/unittests/test_filters/test_launch_index.py
+++ b/tests/unittests/test_filters/test_launch_index.py
@@ -2,7 +2,7 @@ import copy
from .. import helpers
-import itertools
+from six.moves import filterfalse
from cloudinit.filters import launch_index
from cloudinit import user_data as ud
@@ -36,11 +36,9 @@ class TestLaunchFilter(helpers.ResourceUsingTestCase):
return False
# Do some basic payload checking
msg1_msgs = [m for m in msg1.walk()]
- msg1_msgs = [m for m in
- itertools.ifilterfalse(ud.is_skippable, msg1_msgs)]
+ msg1_msgs = [m for m in filterfalse(ud.is_skippable, msg1_msgs)]
msg2_msgs = [m for m in msg2.walk()]
- msg2_msgs = [m for m in
- itertools.ifilterfalse(ud.is_skippable, msg2_msgs)]
+ msg2_msgs = [m for m in filterfalse(ud.is_skippable, msg2_msgs)]
for i in range(0, len(msg2_msgs)):
m1_msg = msg1_msgs[i]
m2_msg = msg2_msgs[i]
diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_configure.py
index 203dd2aa..d72fa8c7 100644
--- a/tests/unittests/test_handler/test_handler_apt_configure.py
+++ b/tests/unittests/test_handler/test_handler_apt_configure.py
@@ -1,27 +1,29 @@
-from mocker import MockerTestCase
-
from cloudinit import util
from cloudinit.config import cc_apt_configure
import os
import re
+import shutil
+import tempfile
+import unittest
-class TestAptProxyConfig(MockerTestCase):
+class TestAptProxyConfig(unittest.TestCase):
def setUp(self):
super(TestAptProxyConfig, self).setUp()
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
self.pfile = os.path.join(self.tmp, "proxy.cfg")
self.cfile = os.path.join(self.tmp, "config.cfg")
def _search_apt_config(self, contents, ptype, value):
- print(
- r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value),
- contents, "flags=re.IGNORECASE")
- return(re.search(
+ ## print(
+ ## r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value),
+ ## contents, "flags=re.IGNORECASE")
+ return re.search(
r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value),
- contents, flags=re.IGNORECASE))
+ contents, flags=re.IGNORECASE)
def test_apt_proxy_written(self):
cfg = {'apt_proxy': 'myproxy'}
@@ -60,7 +62,7 @@ class TestAptProxyConfig(MockerTestCase):
contents = str(util.read_file_or_url(self.pfile))
- for ptype, pval in values.iteritems():
+ for ptype, pval in values.items():
self.assertTrue(self._search_apt_config(contents, ptype, pval))
def test_proxy_deleted(self):
diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py
index 0558023a..97213a0c 100644
--- a/tests/unittests/test_handler/test_handler_ca_certs.py
+++ b/tests/unittests/test_handler/test_handler_ca_certs.py
@@ -1,5 +1,3 @@
-from mocker import MockerTestCase
-
from cloudinit import cloud
from cloudinit import helpers
from cloudinit import util
@@ -7,9 +5,21 @@ from cloudinit import util
from cloudinit.config import cc_ca_certs
import logging
+import shutil
+import tempfile
+import unittest
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
-class TestNoConfig(MockerTestCase):
+class TestNoConfig(unittest.TestCase):
def setUp(self):
super(TestNoConfig, self).setUp()
self.name = "ca-certs"
@@ -22,15 +32,20 @@ class TestNoConfig(MockerTestCase):
Test that nothing is done if no ca-certs configuration is provided.
"""
config = util.get_builtin_cfg()
- self.mocker.replace(util.write_file, passthrough=False)
- self.mocker.replace(cc_ca_certs.update_ca_certs, passthrough=False)
- self.mocker.replay()
+ with ExitStack() as mocks:
+ util_mock = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ certs_mock = mocks.enter_context(
+ mock.patch.object(cc_ca_certs, 'update_ca_certs'))
- cc_ca_certs.handle(self.name, config, self.cloud_init, self.log,
- self.args)
+ cc_ca_certs.handle(self.name, config, self.cloud_init, self.log,
+ self.args)
+ self.assertEqual(util_mock.call_count, 0)
+ self.assertEqual(certs_mock.call_count, 0)
-class TestConfig(MockerTestCase):
+
+class TestConfig(unittest.TestCase):
def setUp(self):
super(TestConfig, self).setUp()
self.name = "ca-certs"
@@ -39,16 +54,16 @@ class TestConfig(MockerTestCase):
self.log = logging.getLogger("TestNoConfig")
self.args = []
- # Mock out the functions that actually modify the system
- self.mock_add = self.mocker.replace(cc_ca_certs.add_ca_certs,
- passthrough=False)
- self.mock_update = self.mocker.replace(cc_ca_certs.update_ca_certs,
- passthrough=False)
- self.mock_remove = self.mocker.replace(
- cc_ca_certs.remove_default_ca_certs, passthrough=False)
+ self.mocks = ExitStack()
+ self.addCleanup(self.mocks.close)
- # Order must be correct
- self.mocker.order()
+ # Mock out the functions that actually modify the system
+ self.mock_add = self.mocks.enter_context(
+ mock.patch.object(cc_ca_certs, 'add_ca_certs'))
+ self.mock_update = self.mocks.enter_context(
+ mock.patch.object(cc_ca_certs, 'update_ca_certs'))
+ self.mock_remove = self.mocks.enter_context(
+ mock.patch.object(cc_ca_certs, 'remove_default_ca_certs'))
def test_no_trusted_list(self):
"""
@@ -57,86 +72,88 @@ class TestConfig(MockerTestCase):
"""
config = {"ca-certs": {}}
- # No functions should be called
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_empty_trusted_list(self):
"""Test that no certificate are written if 'trusted' list is empty."""
config = {"ca-certs": {"trusted": []}}
- # No functions should be called
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_single_trusted(self):
"""Test that a single cert gets passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1"]}}
- self.mock_add(["CERT1"])
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.mock_add.assert_called_once_with(['CERT1'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_multiple_trusted(self):
"""Test that multiple certs get passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
- self.mock_add(["CERT1", "CERT2"])
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.mock_add.assert_called_once_with(['CERT1', 'CERT2'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_remove_default_ca_certs(self):
"""Test remove_defaults works as expected."""
config = {"ca-certs": {"remove-defaults": True}}
- self.mock_remove()
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 1)
+
def test_no_remove_defaults_if_false(self):
"""Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": False}}
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_correct_order_for_remove_then_add(self):
"""Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
- self.mock_remove()
- self.mock_add(["CERT1"])
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.mock_add.assert_called_once_with(['CERT1'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 1)
-class TestAddCaCerts(MockerTestCase):
+
+class TestAddCaCerts(unittest.TestCase):
def setUp(self):
super(TestAddCaCerts, self).setUp()
+ tmpdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmpdir)
self.paths = helpers.Paths({
- 'cloud_dir': self.makeDir()
+ 'cloud_dir': tmpdir,
})
def test_no_certs_in_list(self):
"""Test that no certificate are written if not provided."""
- self.mocker.replace(util.write_file, passthrough=False)
- self.mocker.replay()
- cc_ca_certs.add_ca_certs([])
+ with mock.patch.object(util, 'write_file') as mockobj:
+ cc_ca_certs.add_ca_certs([])
+ self.assertEqual(mockobj.call_count, 0)
def test_single_cert_trailing_cr(self):
"""Test adding a single certificate to the trusted CAs
@@ -146,19 +163,21 @@ class TestAddCaCerts(MockerTestCase):
ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n"
expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n"
- mock_write = self.mocker.replace(util.write_file, passthrough=False)
- mock_load = self.mocker.replace(util.load_file, passthrough=False)
-
- mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- cert, mode=0644)
-
- mock_load("/etc/ca-certificates.conf")
- self.mocker.result(ca_certs_content)
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
- mock_write("/etc/ca-certificates.conf", expected, omode="wb")
- self.mocker.replay()
+ cc_ca_certs.add_ca_certs([cert])
- cc_ca_certs.add_ca_certs([cert])
+ mock_write.assert_has_calls([
+ mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
+ cert, mode=0o644),
+ mock.call("/etc/ca-certificates.conf", expected, omode="wb"),
+ ])
+ mock_load.assert_called_once_with("/etc/ca-certificates.conf")
def test_single_cert_no_trailing_cr(self):
"""Test adding a single certificate to the trusted CAs
@@ -167,75 +186,89 @@ class TestAddCaCerts(MockerTestCase):
ca_certs_content = "line1\nline2\nline3"
- mock_write = self.mocker.replace(util.write_file, passthrough=False)
- mock_load = self.mocker.replace(util.load_file, passthrough=False)
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
- mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- cert, mode=0644)
+ cc_ca_certs.add_ca_certs([cert])
- mock_load("/etc/ca-certificates.conf")
- self.mocker.result(ca_certs_content)
+ mock_write.assert_has_calls([
+ mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
+ cert, mode=0o644),
+ mock.call("/etc/ca-certificates.conf",
+ "%s\n%s\n" % (ca_certs_content,
+ "cloud-init-ca-certs.crt"),
+ omode="wb"),
+ ])
- mock_write("/etc/ca-certificates.conf",
- "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt"),
- omode="wb")
- self.mocker.replay()
-
- cc_ca_certs.add_ca_certs([cert])
+ mock_load.assert_called_once_with("/etc/ca-certificates.conf")
def test_multiple_certs(self):
"""Test adding multiple certificates to the trusted CAs."""
certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"]
expected_cert_file = "\n".join(certs)
-
- mock_write = self.mocker.replace(util.write_file, passthrough=False)
- mock_load = self.mocker.replace(util.load_file, passthrough=False)
-
- mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- expected_cert_file, mode=0644)
-
ca_certs_content = "line1\nline2\nline3"
- mock_load("/etc/ca-certificates.conf")
- self.mocker.result(ca_certs_content)
- out = "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt")
- mock_write("/etc/ca-certificates.conf", out, omode="wb")
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
- self.mocker.replay()
+ cc_ca_certs.add_ca_certs(certs)
- cc_ca_certs.add_ca_certs(certs)
+ mock_write.assert_has_calls([
+ mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
+ expected_cert_file, mode=0o644),
+ mock.call("/etc/ca-certificates.conf",
+ "%s\n%s\n" % (ca_certs_content,
+ "cloud-init-ca-certs.crt"),
+ omode='wb'),
+ ])
+ mock_load.assert_called_once_with("/etc/ca-certificates.conf")
-class TestUpdateCaCerts(MockerTestCase):
- def test_commands(self):
- mock_check_call = self.mocker.replace(util.subp,
- passthrough=False)
- mock_check_call(["update-ca-certificates"], capture=False)
- self.mocker.replay()
- cc_ca_certs.update_ca_certs()
+class TestUpdateCaCerts(unittest.TestCase):
+ def test_commands(self):
+ with mock.patch.object(util, 'subp') as mockobj:
+ cc_ca_certs.update_ca_certs()
+ mockobj.assert_called_once_with(
+ ["update-ca-certificates"], capture=False)
-class TestRemoveDefaultCaCerts(MockerTestCase):
+class TestRemoveDefaultCaCerts(unittest.TestCase):
def setUp(self):
super(TestRemoveDefaultCaCerts, self).setUp()
+ tmpdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmpdir)
self.paths = helpers.Paths({
- 'cloud_dir': self.makeDir()
+ 'cloud_dir': tmpdir,
})
def test_commands(self):
- mock_delete_dir_contents = self.mocker.replace(
- util.delete_dir_contents, passthrough=False)
- mock_write = self.mocker.replace(util.write_file, passthrough=False)
- mock_subp = self.mocker.replace(util.subp,
- passthrough=False)
-
- mock_delete_dir_contents("/usr/share/ca-certificates/")
- mock_delete_dir_contents("/etc/ssl/certs/")
- mock_write("/etc/ca-certificates.conf", "", mode=0644)
- mock_subp(('debconf-set-selections', '-'),
- "ca-certificates ca-certificates/trust_new_crts select no")
- self.mocker.replay()
-
- cc_ca_certs.remove_default_ca_certs()
+ with ExitStack() as mocks:
+ mock_delete = mocks.enter_context(
+ mock.patch.object(util, 'delete_dir_contents'))
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_subp = mocks.enter_context(mock.patch.object(util, 'subp'))
+
+ cc_ca_certs.remove_default_ca_certs()
+
+ mock_delete.assert_has_calls([
+ mock.call("/usr/share/ca-certificates/"),
+ mock.call("/etc/ssl/certs/"),
+ ])
+
+ mock_write.assert_called_once_with(
+ "/etc/ca-certificates.conf", "", mode=0o644)
+
+ mock_subp.assert_called_once_with(
+ ('debconf-set-selections', '-'),
+ "ca-certificates ca-certificates/trust_new_crts select no")
diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py
index ef1aa208..8ab27911 100644
--- a/tests/unittests/test_handler/test_handler_chef.py
+++ b/tests/unittests/test_handler/test_handler_chef.py
@@ -11,7 +11,10 @@ from cloudinit.sources import DataSourceNone
from .. import helpers as t_help
+import six
import logging
+import shutil
+import tempfile
LOG = logging.getLogger(__name__)
@@ -19,7 +22,8 @@ LOG = logging.getLogger(__name__)
class TestChef(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestChef, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def fetch_cloud(self, distro_kind):
cls = distros.fetch(distro_kind)
@@ -74,7 +78,7 @@ class TestChef(t_help.FilesystemMockingTestCase):
for k, v in cfg['chef'].items():
self.assertIn(v, c)
for k, v in cc_chef.CHEF_RB_TPL_DEFAULTS.items():
- if isinstance(v, basestring):
+ if isinstance(v, six.string_types):
self.assertIn(v, c)
c = util.load_file(cc_chef.CHEF_FB_PATH)
self.assertEqual({}, json.loads(c))
diff --git a/tests/unittests/test_handler/test_handler_debug.py b/tests/unittests/test_handler/test_handler_debug.py
index 8891ca04..80708d7b 100644
--- a/tests/unittests/test_handler/test_handler_debug.py
+++ b/tests/unittests/test_handler/test_handler_debug.py
@@ -26,6 +26,8 @@ from cloudinit.sources import DataSourceNone
from .. import helpers as t_help
import logging
+import shutil
+import tempfile
LOG = logging.getLogger(__name__)
@@ -33,7 +35,8 @@ LOG = logging.getLogger(__name__)
class TestDebug(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestDebug, self).setUp()
- self.new_root = self.makeDir(prefix="unittest_")
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
def _get_cloud(self, distro, metadata=None):
self.patchUtils(self.new_root)
diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py
index 5d0636d1..89727863 100644
--- a/tests/unittests/test_handler/test_handler_growpart.py
+++ b/tests/unittests/test_handler/test_handler_growpart.py
@@ -1,5 +1,3 @@
-from mocker import MockerTestCase
-
from cloudinit import cloud
from cloudinit import util
@@ -9,6 +7,16 @@ import errno
import logging
import os
import re
+import unittest
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
# growpart:
# mode: auto # off, on, auto, 'growpart'
@@ -42,7 +50,7 @@ growpart disk partition
"""
-class TestDisabled(MockerTestCase):
+class TestDisabled(unittest.TestCase):
def setUp(self):
super(TestDisabled, self).setUp()
self.name = "growpart"
@@ -57,14 +65,14 @@ class TestDisabled(MockerTestCase):
# this really only verifies that resizer_factory isn't called
config = {'growpart': {'mode': 'off'}}
- self.mocker.replace(cc_growpart.resizer_factory,
- passthrough=False)
- self.mocker.replay()
- self.handle(self.name, config, self.cloud_init, self.log, self.args)
+ with mock.patch.object(cc_growpart, 'resizer_factory') as mockobj:
+ self.handle(self.name, config, self.cloud_init, self.log,
+ self.args)
+ self.assertEqual(mockobj.call_count, 0)
-class TestConfig(MockerTestCase):
+class TestConfig(unittest.TestCase):
def setUp(self):
super(TestConfig, self).setUp()
self.name = "growpart"
@@ -77,75 +85,76 @@ class TestConfig(MockerTestCase):
self.cloud_init = None
self.handle = cc_growpart.handle
- # Order must be correct
- self.mocker.order()
-
def test_no_resizers_auto_is_fine(self):
- subp = self.mocker.replace(util.subp, passthrough=False)
- subp(['growpart', '--help'], env={'LANG': 'C'})
- self.mocker.result((HELP_GROWPART_NO_RESIZE, ""))
- self.mocker.replay()
+ with mock.patch.object(
+ util, 'subp',
+ return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj:
- config = {'growpart': {'mode': 'auto'}}
- self.handle(self.name, config, self.cloud_init, self.log, self.args)
+ config = {'growpart': {'mode': 'auto'}}
+ self.handle(self.name, config, self.cloud_init, self.log,
+ self.args)
+
+ mockobj.assert_called_once_with(
+ ['growpart', '--help'], env={'LANG': 'C'})
def test_no_resizers_mode_growpart_is_exception(self):
- subp = self.mocker.replace(util.subp, passthrough=False)
- subp(['growpart', '--help'], env={'LANG': 'C'})
- self.mocker.result((HELP_GROWPART_NO_RESIZE, ""))
- self.mocker.replay()
+ with mock.patch.object(
+ util, 'subp',
+ return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj:
+ config = {'growpart': {'mode': "growpart"}}
+ self.assertRaises(
+ ValueError, self.handle, self.name, config,
+ self.cloud_init, self.log, self.args)
- config = {'growpart': {'mode': "growpart"}}
- self.assertRaises(ValueError, self.handle, self.name, config,
- self.cloud_init, self.log, self.args)
+ mockobj.assert_called_once_with(
+ ['growpart', '--help'], env={'LANG': 'C'})
def test_mode_auto_prefers_growpart(self):
- subp = self.mocker.replace(util.subp, passthrough=False)
- subp(['growpart', '--help'], env={'LANG': 'C'})
- self.mocker.result((HELP_GROWPART_RESIZE, ""))
- self.mocker.replay()
+ with mock.patch.object(
+ util, 'subp',
+ return_value=(HELP_GROWPART_RESIZE, "")) as mockobj:
+ ret = cc_growpart.resizer_factory(mode="auto")
+ self.assertIsInstance(ret, cc_growpart.ResizeGrowPart)
- ret = cc_growpart.resizer_factory(mode="auto")
- self.assertTrue(isinstance(ret, cc_growpart.ResizeGrowPart))
+ mockobj.assert_called_once_with(
+ ['growpart', '--help'], env={'LANG': 'C'})
def test_handle_with_no_growpart_entry(self):
# if no 'growpart' entry in config, then mode=auto should be used
myresizer = object()
+ retval = (("/", cc_growpart.RESIZE.CHANGED, "my-message",),)
+
+ with ExitStack() as mocks:
+ factory = mocks.enter_context(
+ mock.patch.object(cc_growpart, 'resizer_factory',
+ return_value=myresizer))
+ rsdevs = mocks.enter_context(
+ mock.patch.object(cc_growpart, 'resize_devices',
+ return_value=retval))
+ mocks.enter_context(
+ mock.patch.object(cc_growpart, 'RESIZERS',
+ (('mysizer', object),)
+ ))
- factory = self.mocker.replace(cc_growpart.resizer_factory,
- passthrough=False)
- rsdevs = self.mocker.replace(cc_growpart.resize_devices,
- passthrough=False)
- factory("auto")
- self.mocker.result(myresizer)
- rsdevs(myresizer, ["/"])
- self.mocker.result((("/", cc_growpart.RESIZE.CHANGED, "my-message",),))
- self.mocker.replay()
-
- try:
- orig_resizers = cc_growpart.RESIZERS
- cc_growpart.RESIZERS = (('mysizer', object),)
self.handle(self.name, {}, self.cloud_init, self.log, self.args)
- finally:
- cc_growpart.RESIZERS = orig_resizers
+ factory.assert_called_once_with('auto')
+ rsdevs.assert_called_once_with(myresizer, ['/'])
-class TestResize(MockerTestCase):
+
+class TestResize(unittest.TestCase):
def setUp(self):
super(TestResize, self).setUp()
self.name = "growpart"
self.log = logging.getLogger("TestResize")
- # Order must be correct
- self.mocker.order()
-
def test_simple_devices(self):
# test simple device list
# this patches out devent2dev, os.stat, and device_part_info
# so in the end, doesn't test a lot
devs = ["/dev/XXda1", "/dev/YYda2"]
- devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5L,
+ devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5,
st_nlink=1, st_uid=0, st_gid=6, st_size=0,
st_atime=0, st_mtime=0, st_ctime=0)
enoent = ["/dev/NOENT"]
diff --git a/tests/unittests/test_handler/test_handler_locale.py b/tests/unittests/test_handler/test_handler_locale.py
index eb251636..de85eff6 100644
--- a/tests/unittests/test_handler/test_handler_locale.py
+++ b/tests/unittests/test_handler/test_handler_locale.py
@@ -29,9 +29,11 @@ from .. import helpers as t_help
from configobj import ConfigObj
-from StringIO import StringIO
+from six import BytesIO
import logging
+import shutil
+import tempfile
LOG = logging.getLogger(__name__)
@@ -39,7 +41,8 @@ LOG = logging.getLogger(__name__)
class TestLocale(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestLocale, self).setUp()
- self.new_root = self.makeDir(prefix="unittest_")
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
def _get_cloud(self, distro):
self.patchUtils(self.new_root)
@@ -59,6 +62,6 @@ class TestLocale(t_help.FilesystemMockingTestCase):
cc = self._get_cloud('sles')
cc_locale.handle('cc_locale', cfg, cc, LOG, [])
- contents = util.load_file('/etc/sysconfig/language')
- n_cfg = ConfigObj(StringIO(contents))
+ contents = util.load_file('/etc/sysconfig/language', decode=False)
+ n_cfg = ConfigObj(BytesIO(contents))
self.assertEquals({'RC_LANG': cfg['locale']}, dict(n_cfg))
diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py
index 40481f16..d3f18fa0 100644
--- a/tests/unittests/test_handler/test_handler_seed_random.py
+++ b/tests/unittests/test_handler/test_handler_seed_random.py
@@ -22,7 +22,7 @@ import base64
import gzip
import tempfile
-from StringIO import StringIO
+from six import BytesIO
from cloudinit import cloud
from cloudinit import distros
@@ -38,6 +38,13 @@ import logging
LOG = logging.getLogger(__name__)
+def b64(source):
+ # In Python 3, b64encode only accepts bytes and returns bytes.
+ if not isinstance(source, bytes):
+ source = source.encode('utf-8')
+ return base64.b64encode(source).decode('us-ascii')
+
+
class TestRandomSeed(t_help.TestCase):
def setUp(self):
super(TestRandomSeed, self).setUp()
@@ -69,7 +76,7 @@ class TestRandomSeed(t_help.TestCase):
return
def _compress(self, text):
- contents = StringIO()
+ contents = BytesIO()
gz_fh = gzip.GzipFile(mode='wb', fileobj=contents)
gz_fh.write(text)
gz_fh.close()
@@ -96,7 +103,7 @@ class TestRandomSeed(t_help.TestCase):
self.assertEquals("tiny-tim-was-here", contents)
def test_append_random_unknown_encoding(self):
- data = self._compress("tiny-toe")
+ data = self._compress(b"tiny-toe")
cfg = {
'random_seed': {
'file': self._seed_file,
@@ -108,7 +115,7 @@ class TestRandomSeed(t_help.TestCase):
self._get_cloud('ubuntu'), LOG, [])
def test_append_random_gzip(self):
- data = self._compress("tiny-toe")
+ data = self._compress(b"tiny-toe")
cfg = {
'random_seed': {
'file': self._seed_file,
@@ -121,7 +128,7 @@ class TestRandomSeed(t_help.TestCase):
self.assertEquals("tiny-toe", contents)
def test_append_random_gz(self):
- data = self._compress("big-toe")
+ data = self._compress(b"big-toe")
cfg = {
'random_seed': {
'file': self._seed_file,
@@ -134,7 +141,7 @@ class TestRandomSeed(t_help.TestCase):
self.assertEquals("big-toe", contents)
def test_append_random_base64(self):
- data = base64.b64encode('bubbles')
+ data = b64('bubbles')
cfg = {
'random_seed': {
'file': self._seed_file,
@@ -147,7 +154,7 @@ class TestRandomSeed(t_help.TestCase):
self.assertEquals("bubbles", contents)
def test_append_random_b64(self):
- data = base64.b64encode('kit-kat')
+ data = b64('kit-kat')
cfg = {
'random_seed': {
'file': self._seed_file,
diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py
index e1530e30..d358b069 100644
--- a/tests/unittests/test_handler/test_handler_set_hostname.py
+++ b/tests/unittests/test_handler/test_handler_set_hostname.py
@@ -7,9 +7,11 @@ from cloudinit import util
from .. import helpers as t_help
+import shutil
+import tempfile
import logging
-from StringIO import StringIO
+from six import BytesIO
from configobj import ConfigObj
@@ -19,7 +21,8 @@ LOG = logging.getLogger(__name__)
class TestHostname(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestHostname, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def _fetch_distro(self, kind):
cls = distros.fetch(kind)
@@ -38,8 +41,8 @@ class TestHostname(t_help.FilesystemMockingTestCase):
cc_set_hostname.handle('cc_set_hostname',
cfg, cc, LOG, [])
if not distro.uses_systemd():
- contents = util.load_file("/etc/sysconfig/network")
- n_cfg = ConfigObj(StringIO(contents))
+ contents = util.load_file("/etc/sysconfig/network", decode=False)
+ n_cfg = ConfigObj(BytesIO(contents))
self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'},
dict(n_cfg))
diff --git a/tests/unittests/test_handler/test_handler_timezone.py b/tests/unittests/test_handler/test_handler_timezone.py
index 874db340..e3df8759 100644
--- a/tests/unittests/test_handler/test_handler_timezone.py
+++ b/tests/unittests/test_handler/test_handler_timezone.py
@@ -29,8 +29,10 @@ from .. import helpers as t_help
from configobj import ConfigObj
-from StringIO import StringIO
+from six import BytesIO
+import shutil
+import tempfile
import logging
LOG = logging.getLogger(__name__)
@@ -39,7 +41,8 @@ LOG = logging.getLogger(__name__)
class TestTimezone(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestTimezone, self).setUp()
- self.new_root = self.makeDir(prefix="unittest_")
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
def _get_cloud(self, distro):
self.patchUtils(self.new_root)
@@ -67,8 +70,8 @@ class TestTimezone(t_help.FilesystemMockingTestCase):
cc_timezone.handle('cc_timezone', cfg, cc, LOG, [])
- contents = util.load_file('/etc/sysconfig/clock')
- n_cfg = ConfigObj(StringIO(contents))
+ contents = util.load_file('/etc/sysconfig/clock', decode=False)
+ n_cfg = ConfigObj(BytesIO(contents))
self.assertEquals({'TIMEZONE': cfg['timezone']}, dict(n_cfg))
contents = util.load_file('/etc/localtime')
diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py
index 435c9787..3a8aa7c1 100644
--- a/tests/unittests/test_handler/test_handler_yum_add_repo.py
+++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py
@@ -4,9 +4,11 @@ from cloudinit.config import cc_yum_add_repo
from .. import helpers
+import shutil
+import tempfile
import logging
-from StringIO import StringIO
+from six import BytesIO
import configobj
@@ -16,7 +18,8 @@ LOG = logging.getLogger(__name__)
class TestConfig(helpers.FilesystemMockingTestCase):
def setUp(self):
super(TestConfig, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_bad_config(self):
cfg = {
@@ -52,8 +55,9 @@ class TestConfig(helpers.FilesystemMockingTestCase):
}
self.patchUtils(self.tmp)
cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, [])
- contents = util.load_file("/etc/yum.repos.d/epel_testing.repo")
- contents = configobj.ConfigObj(StringIO(contents))
+ contents = util.load_file("/etc/yum.repos.d/epel_testing.repo",
+ decode=False)
+ contents = configobj.ConfigObj(BytesIO(contents))
expected = {
'epel_testing': {
'name': 'Extra Packages for Enterprise Linux 5 - Testing',
diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py
index 07b610f7..976d8283 100644
--- a/tests/unittests/test_merging.py
+++ b/tests/unittests/test_merging.py
@@ -11,11 +11,13 @@ import glob
import os
import random
import re
+import six
import string
SOURCE_PAT = "source*.*yaml"
EXPECTED_PAT = "expected%s.yaml"
-TYPES = [long, int, dict, str, list, tuple, None]
+TYPES = [dict, str, list, tuple, None]
+TYPES.extend(six.integer_types)
def _old_mergedict(src, cand):
@@ -25,7 +27,7 @@ def _old_mergedict(src, cand):
Nested dictionaries are merged recursively.
"""
if isinstance(src, dict) and isinstance(cand, dict):
- for (k, v) in cand.iteritems():
+ for (k, v) in cand.items():
if k not in src:
src[k] = v
else:
@@ -42,8 +44,8 @@ def _old_mergemanydict(*args):
def _random_str(rand):
base = ''
- for _i in xrange(rand.randint(1, 2 ** 8)):
- base += rand.choice(string.letters + string.digits)
+ for _i in range(rand.randint(1, 2 ** 8)):
+ base += rand.choice(string.ascii_letters + string.digits)
return base
@@ -64,7 +66,7 @@ def _make_dict(current_depth, max_depth, rand):
if t in [dict, list, tuple]:
if t in [dict]:
amount = rand.randint(0, 5)
- keys = [_random_str(rand) for _i in xrange(0, amount)]
+ keys = [_random_str(rand) for _i in range(0, amount)]
base = {}
for k in keys:
try:
@@ -74,14 +76,14 @@ def _make_dict(current_depth, max_depth, rand):
elif t in [list, tuple]:
base = []
amount = rand.randint(0, 5)
- for _i in xrange(0, amount):
+ for _i in range(0, amount):
try:
base.append(_make_dict(current_depth + 1, max_depth, rand))
except _NoMoreException:
pass
if t in [tuple]:
base = tuple(base)
- elif t in [long, int]:
+ elif t in six.integer_types:
base = rand.randint(0, 2 ** 8)
elif t in [str]:
base = _random_str(rand)
diff --git a/tests/unittests/test_pathprefix2dict.py b/tests/unittests/test_pathprefix2dict.py
index 590c4b82..38a56dc2 100644
--- a/tests/unittests/test_pathprefix2dict.py
+++ b/tests/unittests/test_pathprefix2dict.py
@@ -1,13 +1,17 @@
from cloudinit import util
-from mocker import MockerTestCase
from .helpers import populate_dir
+import shutil
+import tempfile
+import unittest
-class TestPathPrefix2Dict(MockerTestCase):
+
+class TestPathPrefix2Dict(unittest.TestCase):
def setUp(self):
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_required_only(self):
dirdata = {'f1': 'f1content', 'f2': 'f2content'}
diff --git a/tests/unittests/test_runs/test_merge_run.py b/tests/unittests/test_runs/test_merge_run.py
index 977adb34..d0ec36a9 100644
--- a/tests/unittests/test_runs/test_merge_run.py
+++ b/tests/unittests/test_runs/test_merge_run.py
@@ -1,20 +1,22 @@
import os
+import shutil
+import tempfile
from .. import helpers
-from cloudinit.settings import (PER_INSTANCE)
+from cloudinit.settings import PER_INSTANCE
from cloudinit import stages
from cloudinit import util
class TestMergeRun(helpers.FilesystemMockingTestCase):
def _patchIn(self, root):
- self.restore()
self.patchOS(root)
self.patchUtils(root)
def test_none_ds(self):
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.replicateTestRoot('simple_ubuntu', new_root)
cfg = {
'datasource_list': ['None'],
diff --git a/tests/unittests/test_runs/test_simple_run.py b/tests/unittests/test_runs/test_simple_run.py
index c9ba949e..e19e65cd 100644
--- a/tests/unittests/test_runs/test_simple_run.py
+++ b/tests/unittests/test_runs/test_simple_run.py
@@ -1,20 +1,20 @@
import os
+import shutil
+import tempfile
from .. import helpers
-from cloudinit.settings import (PER_INSTANCE)
+from cloudinit.settings import PER_INSTANCE
from cloudinit import stages
from cloudinit import util
class TestSimpleRun(helpers.FilesystemMockingTestCase):
def _patchIn(self, root):
- self.restore()
self.patchOS(root)
self.patchUtils(root)
def _pp_root(self, root, repatch=True):
- self.restore()
for (dirpath, dirnames, filenames) in os.walk(root):
print(dirpath)
for f in filenames:
@@ -33,7 +33,8 @@ class TestSimpleRun(helpers.FilesystemMockingTestCase):
self._patchIn(root)
def test_none_ds(self):
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.replicateTestRoot('simple_ubuntu', new_root)
cfg = {
'datasource_list': ['None'],
@@ -41,7 +42,7 @@ class TestSimpleRun(helpers.FilesystemMockingTestCase):
{
'path': '/etc/blah.ini',
'content': 'blah',
- 'permissions': 0755,
+ 'permissions': 0o755,
},
],
'cloud_init_modules': ['write-files'],
diff --git a/tests/unittests/test_templating.py b/tests/unittests/test_templating.py
index 3ba4ed8a..957467f6 100644
--- a/tests/unittests/test_templating.py
+++ b/tests/unittests/test_templating.py
@@ -16,6 +16,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import six
+import unittest
+
from . import helpers as test_helpers
import textwrap
@@ -38,6 +41,7 @@ class TestTemplates(test_helpers.TestCase):
out_data = templater.basic_render(in_data, {'b': 2})
self.assertEqual(expected_data.strip(), out_data)
+ @unittest.skipIf(six.PY3, 'Cheetah is not compatible with Python 3')
def test_detection(self):
blob = "## template:cheetah"
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 3e079131..7a224230 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -1,10 +1,19 @@
+from __future__ import print_function
+
import os
import stat
import yaml
+import shutil
+import tempfile
-from mocker import MockerTestCase
from . import helpers
import unittest
+import six
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
from cloudinit import importer
from cloudinit import util
@@ -61,10 +70,11 @@ class TestGetCfgOptionListOrStr(unittest.TestCase):
self.assertEqual([], result)
-class TestWriteFile(MockerTestCase):
+class TestWriteFile(unittest.TestCase):
def setUp(self):
super(TestWriteFile, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_basic_usage(self):
"""Verify basic usage with default args."""
@@ -79,7 +89,7 @@ class TestWriteFile(MockerTestCase):
create_contents = f.read()
self.assertEqual(contents, create_contents)
file_stat = os.stat(path)
- self.assertEqual(0644, stat.S_IMODE(file_stat.st_mode))
+ self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
def test_dir_is_created_if_required(self):
"""Verifiy that directories are created is required."""
@@ -97,12 +107,12 @@ class TestWriteFile(MockerTestCase):
path = os.path.join(self.tmp, "NewFile.txt")
contents = "Hey there"
- util.write_file(path, contents, mode=0666)
+ util.write_file(path, contents, mode=0o666)
self.assertTrue(os.path.exists(path))
self.assertTrue(os.path.isfile(path))
file_stat = os.stat(path)
- self.assertEqual(0666, stat.S_IMODE(file_stat.st_mode))
+ self.assertEqual(0o666, stat.S_IMODE(file_stat.st_mode))
def test_custom_omode(self):
"""Verify custom omode works properly."""
@@ -111,7 +121,7 @@ class TestWriteFile(MockerTestCase):
# Create file first with basic content
with open(path, "wb") as f:
- f.write("LINE1\n")
+ f.write(b"LINE1\n")
util.write_file(path, contents, omode="a")
self.assertTrue(os.path.exists(path))
@@ -126,23 +136,24 @@ class TestWriteFile(MockerTestCase):
with open(my_file, "w") as fp:
fp.write("My Content")
- import_mock = self.mocker.replace(importer.import_module,
- passthrough=False)
- import_mock('selinux')
-
fake_se = FakeSelinux(my_file)
- self.mocker.result(fake_se)
- self.mocker.replay()
- with util.SeLinuxGuard(my_file) as is_on:
- self.assertTrue(is_on)
+
+ with mock.patch.object(importer, 'import_module',
+ return_value=fake_se) as mockobj:
+ with util.SeLinuxGuard(my_file) as is_on:
+ self.assertTrue(is_on)
+
self.assertEqual(1, len(fake_se.restored))
self.assertEqual(my_file, fake_se.restored[0])
+ mockobj.assert_called_once_with('selinux')
+
-class TestDeleteDirContents(MockerTestCase):
+class TestDeleteDirContents(unittest.TestCase):
def setUp(self):
super(TestDeleteDirContents, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def assertDirEmpty(self, dirname):
self.assertEqual([], os.listdir(dirname))
@@ -157,7 +168,7 @@ class TestDeleteDirContents(MockerTestCase):
def test_deletes_files(self):
"""Single file should be deleted."""
with open(os.path.join(self.tmp, "new_file.txt"), "wb") as f:
- f.write("DELETE ME")
+ f.write(b"DELETE ME")
util.delete_dir_contents(self.tmp)
@@ -185,7 +196,7 @@ class TestDeleteDirContents(MockerTestCase):
os.mkdir(os.path.join(self.tmp, "new_dir"))
f_name = os.path.join(self.tmp, "new_dir", "new_file.txt")
with open(f_name, "wb") as f:
- f.write("DELETE ME")
+ f.write(b"DELETE ME")
util.delete_dir_contents(self.tmp)
@@ -196,7 +207,7 @@ class TestDeleteDirContents(MockerTestCase):
file_name = os.path.join(self.tmp, "new_file.txt")
link_name = os.path.join(self.tmp, "new_file_link.txt")
with open(file_name, "wb") as f:
- f.write("DELETE ME")
+ f.write(b"DELETE ME")
os.symlink(file_name, link_name)
util.delete_dir_contents(self.tmp)
@@ -246,8 +257,8 @@ class TestLoadYaml(unittest.TestCase):
self.mydefault)
def test_python_unicode(self):
- # complex type of python/unicde is explicitly allowed
- myobj = {'1': unicode("FOOBAR")}
+ # complex type of python/unicode is explicitly allowed
+ myobj = {'1': six.text_type("FOOBAR")}
safe_yaml = yaml.dump(myobj)
self.assertEqual(util.load_yaml(blob=safe_yaml,
default=self.mydefault),
@@ -314,13 +325,13 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
class TestReadDMIData(helpers.FilesystemMockingTestCase):
def _patchIn(self, root):
- self.restore()
self.patchOS(root)
self.patchUtils(root)
def _write_key(self, key, content):
"""Mocks the sys path found on Linux systems."""
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
util.ensure_dir(os.path.join('sys', 'class', 'dmi', 'id'))
@@ -332,7 +343,8 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):
In order to test a missing sys path and call outs to dmidecode, this
function fakes the results of dmidecode to test the results.
"""
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
self.real_which = util.which
self.real_subp = util.subp