summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/unittests/helpers.py56
-rw-r--r--tests/unittests/test_data.py110
2 files changed, 96 insertions, 70 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 4ca460f1..43dd5aa0 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -1,7 +1,17 @@
import os
import sys
+import tempfile
import unittest
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
+
from cloudinit import helpers as ch
from cloudinit import util
@@ -111,7 +121,11 @@ def retarget_many_wrapper(new_base, am, old_func):
class ResourceUsingTestCase(unittest.TestCase):
- def __init__(self, methodName="runTest"):
+ ## def __init__(self, methodName="runTest"):
+ ## self.resource_path = None
+
+ def setUp(self):
+ unittest.TestCase.setUp(self)
self.resource_path = None
def resourceLocation(self, subname=None):
@@ -139,17 +153,23 @@ class ResourceUsingTestCase(unittest.TestCase):
return fh.read()
def getCloudPaths(self):
+ tmpdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmpdir)
cp = ch.Paths({
- 'cloud_dir': self.makeDir(),
+ 'cloud_dir': tmpdir,
'templates_dir': self.resourceLocation(),
})
return cp
class FilesystemMockingTestCase(ResourceUsingTestCase):
- def __init__(self, methodName="runTest"):
- ResourceUsingTestCase.__init__(self, methodName)
- self.patched_funcs = []
+ ## def __init__(self, methodName="runTest"):
+ ## ResourceUsingTestCase.__init__(self, methodName)
+
+ def setUp(self):
+ ResourceUsingTestCase.setUp(self)
+ self.patched_funcs = ExitStack()
+ self.addCleanup(self.patched_funcs.close)
def replicateTestRoot(self, example_root, target_root):
real_root = self.resourceLocation()
@@ -163,15 +183,6 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
make_path = util.abs_join(make_path, f)
shutil.copy(real_path, make_path)
- def tearDown(self):
- self.restore()
- ResourceUsingTestCase.tearDown(self)
-
- def restore(self):
- for (mod, f, func) in self.patched_funcs:
- setattr(mod, f, func)
- self.patched_funcs = []
-
def patchUtils(self, new_root):
patch_funcs = {
util: [('write_file', 1),
@@ -188,8 +199,8 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
for (f, am) in funcs:
func = getattr(mod, f)
trap_func = retarget_many_wrapper(new_root, am, func)
- setattr(mod, f, trap_func)
- self.patched_funcs.append((mod, f, func))
+ self.patched_funcs.enter_context(
+ mock.patch.object(mod, f, trap_func))
# Handle subprocess calls
func = getattr(util, 'subp')
@@ -197,16 +208,15 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
def nsubp(*_args, **_kwargs):
return ('', '')
- setattr(util, 'subp', nsubp)
- self.patched_funcs.append((util, 'subp', func))
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, 'subp', nsubp))
def null_func(*_args, **_kwargs):
return None
for f in ['chownbyid', 'chownbyname']:
- func = getattr(util, f)
- setattr(util, f, null_func)
- self.patched_funcs.append((util, f, func))
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, f, null_func))
def patchOS(self, new_root):
patch_funcs = {
@@ -217,8 +227,8 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
for f in funcs:
func = getattr(mod, f)
trap_func = retarget_many_wrapper(new_root, 1, func)
- setattr(mod, f, trap_func)
- self.patched_funcs.append((mod, f, func))
+ self.patched_funcs.enter_context(
+ mock.patch.object(mod, f, trap_func))
class HttprettyTestCase(TestCase):
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py
index a35afc27..71e02e5d 100644
--- a/tests/unittests/test_data.py
+++ b/tests/unittests/test_data.py
@@ -3,6 +3,13 @@
import gzip
import logging
import os
+import shutil
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
from six import BytesIO, StringIO
@@ -43,9 +50,9 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
self._log_handler = None
def tearDown(self):
- helpers.FilesystemMockingTestCase.tearDown(self)
if self._log_handler and self._log:
self._log.removeHandler(self._log_handler)
+ helpers.FilesystemMockingTestCase.tearDown(self)
def _patchIn(self, root):
self.restore()
@@ -71,7 +78,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
ci = stages.Init()
ci.datasource = FakeDataSource(blob)
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
ci.fetch()
@@ -99,7 +107,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
{ "op": "add", "path": "/foo", "value": "quxC" }
]
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -138,7 +147,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):
{ "op": "add", "path": "/foo", "value": "quxC" }
]
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -184,7 +194,8 @@ c: d
ci = stages.Init()
ci.datasource = FakeDataSource(str(message))
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
ci.fetch()
@@ -214,7 +225,8 @@ name: user
run:
- z
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -249,7 +261,8 @@ vendor_data:
enabled: True
prefix: /bin/true
'''
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self._patchIn(new_root)
initer = stages.Init()
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
@@ -309,7 +322,8 @@ p: 1
paths = c_helpers.Paths({}, ds=FakeDataSource(''))
cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None,
@@ -374,7 +388,8 @@ c: 4
message.attach(gzip_part(base_content2))
ci = stages.Init()
ci.datasource = FakeDataSource(str(message))
- new_root = self.makeDir()
+ new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, new_root)
self.patchUtils(new_root)
self.patchOS(new_root)
ci.fetch()
@@ -394,17 +409,15 @@ c: 4
message.set_payload("Just text")
ci.datasource = FakeDataSource(message.as_string())
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0o600)
- self.mocker.replay()
-
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertIn(
- "Unhandled unknown content-type (text/plain)",
- log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertIn(
+ "Unhandled unknown content-type (text/plain)",
+ log_file.getvalue())
+ mockobj.assert_called_once_with(
+ ci.paths.get_ipath("cloud_config"), "", 0o600)
def test_shellscript(self):
"""Raw text starting #!/bin/sh is treated as script."""
@@ -413,16 +426,17 @@ c: 4
ci.datasource = FakeDataSource(script)
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0o600)
- mock_write(outpath, script, 0o700)
- self.mocker.replay()
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertEqual("", log_file.getvalue())
+
+ mockobj.assert_has_calls([
+ mock.call(outpath, script, 0o700),
+ mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600),
+ ])
def test_mime_text_x_shellscript(self):
"""Mime message of type text/x-shellscript is treated as script."""
@@ -433,16 +447,17 @@ c: 4
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0o600)
- mock_write(outpath, script, 0o700)
- self.mocker.replay()
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertEqual("", log_file.getvalue())
+
+ mockobj.assert_has_calls([
+ mock.call(outpath, script, 0o700),
+ mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600),
+ ])
def test_mime_text_plain_shell(self):
"""Mime type text/plain starting #!/bin/sh is treated as script."""
@@ -453,13 +468,14 @@ c: 4
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
- mock_write(outpath, script, 0o700)
- mock_write(ci.paths.get_ipath("cloud_config"), "", 0o600)
- self.mocker.replay()
- log_file = self.capture_log(logging.WARNING)
- ci.fetch()
- ci.consume_data()
- self.assertEqual("", log_file.getvalue())
+ with mock.patch('cloudinit.util.write_file') as mockobj:
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+ self.assertEqual("", log_file.getvalue())
+
+ mockobj.assert_has_calls([
+ mock.call(outpath, script, 0o700),
+ mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600),
+ ])