diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unittests/helpers.py | 56 | ||||
-rw-r--r-- | tests/unittests/test_data.py | 110 |
2 files changed, 96 insertions, 70 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py index 4ca460f1..43dd5aa0 100644 --- a/tests/unittests/helpers.py +++ b/tests/unittests/helpers.py @@ -1,7 +1,17 @@ import os import sys +import tempfile import unittest +try: + from unittest import mock +except ImportError: + import mock +try: + from contextlib import ExitStack +except ImportError: + from contextlib2 import ExitStack + from cloudinit import helpers as ch from cloudinit import util @@ -111,7 +121,11 @@ def retarget_many_wrapper(new_base, am, old_func): class ResourceUsingTestCase(unittest.TestCase): - def __init__(self, methodName="runTest"): + ## def __init__(self, methodName="runTest"): + ## self.resource_path = None + + def setUp(self): + unittest.TestCase.setUp(self) self.resource_path = None def resourceLocation(self, subname=None): @@ -139,17 +153,23 @@ class ResourceUsingTestCase(unittest.TestCase): return fh.read() def getCloudPaths(self): + tmpdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmpdir) cp = ch.Paths({ - 'cloud_dir': self.makeDir(), + 'cloud_dir': tmpdir, 'templates_dir': self.resourceLocation(), }) return cp class FilesystemMockingTestCase(ResourceUsingTestCase): - def __init__(self, methodName="runTest"): - ResourceUsingTestCase.__init__(self, methodName) - self.patched_funcs = [] + ## def __init__(self, methodName="runTest"): + ## ResourceUsingTestCase.__init__(self, methodName) + + def setUp(self): + ResourceUsingTestCase.setUp(self) + self.patched_funcs = ExitStack() + self.addCleanup(self.patched_funcs.close) def replicateTestRoot(self, example_root, target_root): real_root = self.resourceLocation() @@ -163,15 +183,6 @@ class FilesystemMockingTestCase(ResourceUsingTestCase): make_path = util.abs_join(make_path, f) shutil.copy(real_path, make_path) - def tearDown(self): - self.restore() - ResourceUsingTestCase.tearDown(self) - - def restore(self): - for (mod, f, func) in self.patched_funcs: - setattr(mod, f, func) - self.patched_funcs = [] - def patchUtils(self, new_root): patch_funcs = { util: [('write_file', 1), @@ -188,8 +199,8 @@ class FilesystemMockingTestCase(ResourceUsingTestCase): for (f, am) in funcs: func = getattr(mod, f) trap_func = retarget_many_wrapper(new_root, am, func) - setattr(mod, f, trap_func) - self.patched_funcs.append((mod, f, func)) + self.patched_funcs.enter_context( + mock.patch.object(mod, f, trap_func)) # Handle subprocess calls func = getattr(util, 'subp') @@ -197,16 +208,15 @@ class FilesystemMockingTestCase(ResourceUsingTestCase): def nsubp(*_args, **_kwargs): return ('', '') - setattr(util, 'subp', nsubp) - self.patched_funcs.append((util, 'subp', func)) + self.patched_funcs.enter_context( + mock.patch.object(util, 'subp', nsubp)) def null_func(*_args, **_kwargs): return None for f in ['chownbyid', 'chownbyname']: - func = getattr(util, f) - setattr(util, f, null_func) - self.patched_funcs.append((util, f, func)) + self.patched_funcs.enter_context( + mock.patch.object(util, f, null_func)) def patchOS(self, new_root): patch_funcs = { @@ -217,8 +227,8 @@ class FilesystemMockingTestCase(ResourceUsingTestCase): for f in funcs: func = getattr(mod, f) trap_func = retarget_many_wrapper(new_root, 1, func) - setattr(mod, f, trap_func) - self.patched_funcs.append((mod, f, func)) + self.patched_funcs.enter_context( + mock.patch.object(mod, f, trap_func)) class HttprettyTestCase(TestCase): diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py index a35afc27..71e02e5d 100644 --- a/tests/unittests/test_data.py +++ b/tests/unittests/test_data.py @@ -3,6 +3,13 @@ import gzip import logging import os +import shutil +import tempfile + +try: + from unittest import mock +except ImportError: + import mock from six import BytesIO, StringIO @@ -43,9 +50,9 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): self._log_handler = None def tearDown(self): - helpers.FilesystemMockingTestCase.tearDown(self) if self._log_handler and self._log: self._log.removeHandler(self._log_handler) + helpers.FilesystemMockingTestCase.tearDown(self) def _patchIn(self, root): self.restore() @@ -71,7 +78,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): ci = stages.Init() ci.datasource = FakeDataSource(blob) - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self.patchUtils(new_root) self.patchOS(new_root) ci.fetch() @@ -99,7 +107,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): { "op": "add", "path": "/foo", "value": "quxC" } ] ''' - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self._patchIn(new_root) initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -138,7 +147,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): { "op": "add", "path": "/foo", "value": "quxC" } ] ''' - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self._patchIn(new_root) initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -184,7 +194,8 @@ c: d ci = stages.Init() ci.datasource = FakeDataSource(str(message)) - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self.patchUtils(new_root) self.patchOS(new_root) ci.fetch() @@ -214,7 +225,8 @@ name: user run: - z ''' - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self._patchIn(new_root) initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -249,7 +261,8 @@ vendor_data: enabled: True prefix: /bin/true ''' - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self._patchIn(new_root) initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -309,7 +322,8 @@ p: 1 paths = c_helpers.Paths({}, ds=FakeDataSource('')) cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths) - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self.patchUtils(new_root) self.patchOS(new_root) cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None, @@ -374,7 +388,8 @@ c: 4 message.attach(gzip_part(base_content2)) ci = stages.Init() ci.datasource = FakeDataSource(str(message)) - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self.patchUtils(new_root) self.patchOS(new_root) ci.fetch() @@ -394,17 +409,15 @@ c: 4 message.set_payload("Just text") ci.datasource = FakeDataSource(message.as_string()) - mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) - mock_write(ci.paths.get_ipath("cloud_config"), "", 0o600) - self.mocker.replay() - - log_file = self.capture_log(logging.WARNING) - ci.fetch() - ci.consume_data() - self.assertIn( - "Unhandled unknown content-type (text/plain)", - log_file.getvalue()) + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertIn( + "Unhandled unknown content-type (text/plain)", + log_file.getvalue()) + mockobj.assert_called_once_with( + ci.paths.get_ipath("cloud_config"), "", 0o600) def test_shellscript(self): """Raw text starting #!/bin/sh is treated as script.""" @@ -413,16 +426,17 @@ c: 4 ci.datasource = FakeDataSource(script) outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) - mock_write(ci.paths.get_ipath("cloud_config"), "", 0o600) - mock_write(outpath, script, 0o700) - self.mocker.replay() - log_file = self.capture_log(logging.WARNING) - ci.fetch() - ci.consume_data() - self.assertEqual("", log_file.getvalue()) + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertEqual("", log_file.getvalue()) + + mockobj.assert_has_calls([ + mock.call(outpath, script, 0o700), + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), + ]) def test_mime_text_x_shellscript(self): """Mime message of type text/x-shellscript is treated as script.""" @@ -433,16 +447,17 @@ c: 4 ci.datasource = FakeDataSource(message.as_string()) outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) - mock_write(ci.paths.get_ipath("cloud_config"), "", 0o600) - mock_write(outpath, script, 0o700) - self.mocker.replay() - log_file = self.capture_log(logging.WARNING) - ci.fetch() - ci.consume_data() - self.assertEqual("", log_file.getvalue()) + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertEqual("", log_file.getvalue()) + + mockobj.assert_has_calls([ + mock.call(outpath, script, 0o700), + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), + ]) def test_mime_text_plain_shell(self): """Mime type text/plain starting #!/bin/sh is treated as script.""" @@ -453,13 +468,14 @@ c: 4 ci.datasource = FakeDataSource(message.as_string()) outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) - mock_write(outpath, script, 0o700) - mock_write(ci.paths.get_ipath("cloud_config"), "", 0o600) - self.mocker.replay() - log_file = self.capture_log(logging.WARNING) - ci.fetch() - ci.consume_data() - self.assertEqual("", log_file.getvalue()) + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertEqual("", log_file.getvalue()) + + mockobj.assert_has_calls([ + mock.call(outpath, script, 0o700), + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), + ]) |