diff options
Diffstat (limited to 'tests/unittests/helpers.py')
-rw-r--r-- | tests/unittests/helpers.py | 191 |
1 files changed, 109 insertions, 82 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py index e9afbd36..67fed8c9 100644 --- a/tests/unittests/helpers.py +++ b/tests/unittests/helpers.py @@ -1,7 +1,6 @@ # This file is part of cloud-init. See LICENSE file for license information. import functools -import httpretty import io import logging import os @@ -12,21 +11,23 @@ import sys import tempfile import time import unittest -from pathlib import Path from contextlib import ExitStack, contextmanager +from pathlib import Path from unittest import mock from unittest.util import strclass +import httpretty + import cloudinit -from cloudinit.config.schema import ( - SchemaValidationError, validate_cloudconfig_schema) -from cloudinit import cloud -from cloudinit import distros +from cloudinit import cloud, distros from cloudinit import helpers as ch +from cloudinit import subp, util +from cloudinit.config.schema import ( + SchemaValidationError, + validate_cloudconfig_schema, +) from cloudinit.sources import DataSourceNone from cloudinit.templater import JINJA_AVAILABLE -from cloudinit import subp -from cloudinit import util _real_subp = subp.subp @@ -66,11 +67,11 @@ def retarget_many_wrapper(new_base, am, old_func): if isinstance(path, str): n_args[i] = rebase_path(path, new_base) return old_func(*n_args, **kwds) + return wrapper class TestCase(unittest.TestCase): - def reset_global_state(self): """Reset any global state to its original settings. @@ -93,13 +94,13 @@ class TestCase(unittest.TestCase): self.reset_global_state() def shortDescription(self): - return strclass(self.__class__) + '.' + self._testMethodName + return strclass(self.__class__) + "." + self._testMethodName def add_patch(self, target, attr, *args, **kwargs): """Patches specified target object and sets it as attr on test instance also schedules cleanup""" - if 'autospec' not in kwargs: - kwargs['autospec'] = True + if "autospec" not in kwargs: + kwargs["autospec"] = True m = mock.patch(target, *args, **kwargs) p = m.start() self.addCleanup(m.stop) @@ -108,7 +109,7 @@ class TestCase(unittest.TestCase): class CiTestCase(TestCase): """This is the preferred test case base class unless user - needs other test case classes below.""" + needs other test case classes below.""" # Subclass overrides for specific test behavior # Whether or not a unit test needs logfile setup @@ -131,7 +132,7 @@ class CiTestCase(TestCase): # Create a log handler so unit tests can search expected logs. self.logger = logging.getLogger() self.logs = io.StringIO() - formatter = logging.Formatter('%(levelname)s: %(message)s') + formatter = logging.Formatter("%(levelname)s: %(message)s") handler = logging.StreamHandler(self.logs) handler.setFormatter(formatter) self.old_handlers = self.logger.handlers @@ -142,12 +143,13 @@ class CiTestCase(TestCase): subp.subp = self._fake_subp def _fake_subp(self, *args, **kwargs): - if 'args' in kwargs: - cmd = kwargs['args'] + if "args" in kwargs: + cmd = kwargs["args"] else: if not args: raise TypeError( - "subp() missing 1 required positional argument: 'args'") + "subp() missing 1 required positional argument: 'args'" + ) cmd = args[0] if not isinstance(cmd, str): @@ -158,16 +160,19 @@ class CiTestCase(TestCase): if isinstance(self.allowed_subp, bool): pass_through = self.allowed_subp else: - pass_through = ( - (cmd in self.allowed_subp) or - (self.SUBP_SHELL_TRUE in self.allowed_subp and - kwargs.get('shell'))) + pass_through = (cmd in self.allowed_subp) or ( + self.SUBP_SHELL_TRUE in self.allowed_subp + and kwargs.get("shell") + ) if pass_through: return _real_subp(*args, **kwargs) raise Exception( - "called subp. set self.allowed_subp=True to allow\n subp(%s)" % - ', '.join([str(repr(a)) for a in args] + - ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()])) + "called subp. set self.allowed_subp=True to allow\n subp(%s)" + % ", ".join( + [str(repr(a)) for a in args] + + ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()] + ) + ) def tearDown(self): if self.with_logs: @@ -180,12 +185,12 @@ class CiTestCase(TestCase): def tmp_dir(self, dir=None, cleanup=True): # return a full path to a temporary directory that will be cleaned up. if dir is None: - tmpd = tempfile.mkdtemp( - prefix="ci-%s." % self.__class__.__name__) + tmpd = tempfile.mkdtemp(prefix="ci-%s." % self.__class__.__name__) else: tmpd = tempfile.mkdtemp(dir=dir) self.addCleanup( - functools.partial(shutil.rmtree, tmpd, ignore_errors=True)) + functools.partial(shutil.rmtree, tmpd, ignore_errors=True) + ) return tmpd def tmp_path(self, path, dir=None): @@ -208,7 +213,7 @@ class CiTestCase(TestCase): if not sys_cfg: sys_cfg = {} tmp_paths = {} - for var in ['templates_dir', 'run_dir', 'cloud_dir']: + for var in ["templates_dir", "run_dir", "cloud_dir"]: tmp_paths[var] = self.tmp_path(var, dir=self.new_root) util.ensure_dir(tmp_paths[var]) self.paths = ch.Paths(tmp_paths) @@ -221,13 +226,13 @@ class CiTestCase(TestCase): @classmethod def random_string(cls, length=8): - """ return a random lowercase string with default length of 8""" - return ''.join( - random.choice(string.ascii_lowercase) for _ in range(length)) + """return a random lowercase string with default length of 8""" + return "".join( + random.choice(string.ascii_lowercase) for _ in range(length) + ) class ResourceUsingTestCase(CiTestCase): - def setUp(self): super(ResourceUsingTestCase, self).setUp() self.resource_path = None @@ -235,14 +240,13 @@ class ResourceUsingTestCase(CiTestCase): def getCloudPaths(self, ds=None): tmpdir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, tmpdir) - cp = ch.Paths({'cloud_dir': tmpdir, - 'templates_dir': resourceLocation()}, - ds=ds) + cp = ch.Paths( + {"cloud_dir": tmpdir, "templates_dir": resourceLocation()}, ds=ds + ) return cp class FilesystemMockingTestCase(ResourceUsingTestCase): - def setUp(self): super(FilesystemMockingTestCase, self).setUp() self.patched_funcs = ExitStack() @@ -253,10 +257,10 @@ class FilesystemMockingTestCase(ResourceUsingTestCase): def replicateTestRoot(self, example_root, target_root): real_root = resourceLocation() - real_root = os.path.join(real_root, 'roots', example_root) + real_root = os.path.join(real_root, "roots", example_root) for (dir_path, _dirnames, filenames) in os.walk(real_root): real_path = dir_path - make_path = rebase_path(real_path[len(real_root):], target_root) + make_path = rebase_path(real_path[len(real_root) :], target_root) util.ensure_dir(make_path) for f in filenames: real_path = util.abs_join(real_path, f) @@ -265,72 +269,89 @@ class FilesystemMockingTestCase(ResourceUsingTestCase): def patchUtils(self, new_root): patch_funcs = { - util: [('write_file', 1), - ('append_file', 1), - ('load_file', 1), - ('ensure_dir', 1), - ('chmod', 1), - ('delete_dir_contents', 1), - ('del_file', 1), - ('sym_link', -1), - ('copy', -1)], + util: [ + ("write_file", 1), + ("append_file", 1), + ("load_file", 1), + ("ensure_dir", 1), + ("chmod", 1), + ("delete_dir_contents", 1), + ("del_file", 1), + ("sym_link", -1), + ("copy", -1), + ], } for (mod, funcs) in patch_funcs.items(): for (f, am) in funcs: func = getattr(mod, f) trap_func = retarget_many_wrapper(new_root, am, func) self.patched_funcs.enter_context( - mock.patch.object(mod, f, trap_func)) + mock.patch.object(mod, f, trap_func) + ) # Handle subprocess calls - func = getattr(subp, 'subp') + func = getattr(subp, "subp") def nsubp(*_args, **_kwargs): - return ('', '') + return ("", "") self.patched_funcs.enter_context( - mock.patch.object(subp, 'subp', nsubp)) + mock.patch.object(subp, "subp", nsubp) + ) def null_func(*_args, **_kwargs): return None - for f in ['chownbyid', 'chownbyname']: + for f in ["chownbyid", "chownbyname"]: self.patched_funcs.enter_context( - mock.patch.object(util, f, null_func)) + mock.patch.object(util, f, null_func) + ) def patchOS(self, new_root): patch_funcs = { - os.path: [('isfile', 1), ('exists', 1), - ('islink', 1), ('isdir', 1), ('lexists', 1)], - os: [('listdir', 1), ('mkdir', 1), - ('lstat', 1), ('symlink', 2), - ('stat', 1)] + os.path: [ + ("isfile", 1), + ("exists", 1), + ("islink", 1), + ("isdir", 1), + ("lexists", 1), + ], + os: [ + ("listdir", 1), + ("mkdir", 1), + ("lstat", 1), + ("symlink", 2), + ("stat", 1), + ], } - if hasattr(os, 'scandir'): + if hasattr(os, "scandir"): # py27 does not have scandir - patch_funcs[os].append(('scandir', 1)) + patch_funcs[os].append(("scandir", 1)) for (mod, funcs) in patch_funcs.items(): for f, nargs in funcs: func = getattr(mod, f) trap_func = retarget_many_wrapper(new_root, nargs, func) self.patched_funcs.enter_context( - mock.patch.object(mod, f, trap_func)) + mock.patch.object(mod, f, trap_func) + ) def patchOpen(self, new_root): trap_func = retarget_many_wrapper(new_root, 1, open) self.patched_funcs.enter_context( - mock.patch('builtins.open', trap_func) + mock.patch("builtins.open", trap_func) ) def patchStdoutAndStderr(self, stdout=None, stderr=None): if stdout is not None: self.patched_funcs.enter_context( - mock.patch.object(sys, 'stdout', stdout)) + mock.patch.object(sys, "stdout", stdout) + ) if stderr is not None: self.patched_funcs.enter_context( - mock.patch.object(sys, 'stderr', stderr)) + mock.patch.object(sys, "stderr", stderr) + ) def reRoot(self, root=None): if root is None: @@ -355,33 +376,32 @@ class HttprettyTestCase(CiTestCase): # And make sure reset and enable/disable are done. def setUp(self): - self.restore_proxy = os.environ.get('http_proxy') + self.restore_proxy = os.environ.get("http_proxy") if self.restore_proxy is not None: - del os.environ['http_proxy'] + del os.environ["http_proxy"] super(HttprettyTestCase, self).setUp() httpretty.HTTPretty.allow_net_connect = False httpretty.reset() httpretty.enable() # Stop the logging from HttpPretty so our logs don't get mixed # up with its logs - logging.getLogger('httpretty.core').setLevel(logging.CRITICAL) + logging.getLogger("httpretty.core").setLevel(logging.CRITICAL) def tearDown(self): httpretty.disable() httpretty.reset() if self.restore_proxy: - os.environ['http_proxy'] = self.restore_proxy + os.environ["http_proxy"] = self.restore_proxy super(HttprettyTestCase, self).tearDown() class SchemaTestCaseMixin(unittest.TestCase): - def assertSchemaValid(self, cfg, msg="Valid Schema failed validation."): """Assert the config is valid per self.schema. If there is only one top level key in the schema properties, then the cfg will be put under that key.""" - props = list(self.schema.get('properties')) + props = list(self.schema.get("properties")) # put cfg under top level key if there is only one in the schema if len(props) == 1: cfg = {props[0]: cfg} @@ -402,7 +422,7 @@ def populate_dir(path, files): if isinstance(content, bytes): fp.write(content) else: - fp.write(content.encode('utf-8')) + fp.write(content.encode("utf-8")) fp.close() ret.append(p) @@ -425,7 +445,7 @@ def dir2dict(startdir, prefix=None): for root, _dirs, files in os.walk(startdir): for fname in files: fpath = os.path.join(root, fname) - key = fpath[len(prefix):] + key = fpath[len(prefix) :] flist[key] = util.load_file(fpath) return flist @@ -443,16 +463,16 @@ def wrap_and_call(prefix, mocks, func, *args, **kwargs): return_value: return from 'func' """ - delim = '.' + delim = "." if prefix is None: - prefix = '' + prefix = "" prefix = prefix.rstrip(delim) unwraps = [] for fname, kw in mocks.items(): if prefix: fname = delim.join((prefix, fname)) if not isinstance(kw, dict): - kw = {'return_value': kw} + kw = {"return_value": kw} p = mock.patch(fname, **kw) p.start() unwraps.append(p) @@ -464,19 +484,20 @@ def wrap_and_call(prefix, mocks, func, *args, **kwargs): def resourceLocation(subname=None): - path = cloud_init_project_dir('tests/data') + path = cloud_init_project_dir("tests/data") if not subname: return path return os.path.join(path, subname) -def readResource(name, mode='r'): +def readResource(name, mode="r"): with open(resourceLocation(name), mode) as fh: return fh.read() try: import jsonschema + assert jsonschema # avoid pyflakes error F401: import unused _missing_jsonschema_dep = False except ImportError: @@ -485,7 +506,8 @@ except ImportError: def skipUnlessJsonSchema(): return skipIf( - _missing_jsonschema_dep, "No python-jsonschema dependency present.") + _missing_jsonschema_dep, "No python-jsonschema dependency present." + ) def skipUnlessJinja(): @@ -497,13 +519,17 @@ def skipIfJinja(): # older versions of mock do not have the useful 'assert_not_called' -if not hasattr(mock.Mock, 'assert_not_called'): +if not hasattr(mock.Mock, "assert_not_called"): + def __mock_assert_not_called(mmock): if mmock.call_count != 0: - msg = ("[citest] Expected '%s' to not have been called. " - "Called %s times." % - (mmock._mock_name or 'mock', mmock.call_count)) + msg = ( + "[citest] Expected '%s' to not have been called. " + "Called %s times." + % (mmock._mock_name or "mock", mmock.call_count) + ) raise AssertionError(msg) + mock.Mock.assert_not_called = __mock_assert_not_called @@ -524,4 +550,5 @@ def cloud_init_project_dir(sub_path: str) -> str: """ return str(get_top_level_dir() / sub_path) + # vi: ts=4 expandtab |