summaryrefslogtreecommitdiff
path: root/tests/unittests/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/helpers.py')
-rw-r--r--tests/unittests/helpers.py191
1 files changed, 109 insertions, 82 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index e9afbd36..67fed8c9 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -1,7 +1,6 @@
# This file is part of cloud-init. See LICENSE file for license information.
import functools
-import httpretty
import io
import logging
import os
@@ -12,21 +11,23 @@ import sys
import tempfile
import time
import unittest
-from pathlib import Path
from contextlib import ExitStack, contextmanager
+from pathlib import Path
from unittest import mock
from unittest.util import strclass
+import httpretty
+
import cloudinit
-from cloudinit.config.schema import (
- SchemaValidationError, validate_cloudconfig_schema)
-from cloudinit import cloud
-from cloudinit import distros
+from cloudinit import cloud, distros
from cloudinit import helpers as ch
+from cloudinit import subp, util
+from cloudinit.config.schema import (
+ SchemaValidationError,
+ validate_cloudconfig_schema,
+)
from cloudinit.sources import DataSourceNone
from cloudinit.templater import JINJA_AVAILABLE
-from cloudinit import subp
-from cloudinit import util
_real_subp = subp.subp
@@ -66,11 +67,11 @@ def retarget_many_wrapper(new_base, am, old_func):
if isinstance(path, str):
n_args[i] = rebase_path(path, new_base)
return old_func(*n_args, **kwds)
+
return wrapper
class TestCase(unittest.TestCase):
-
def reset_global_state(self):
"""Reset any global state to its original settings.
@@ -93,13 +94,13 @@ class TestCase(unittest.TestCase):
self.reset_global_state()
def shortDescription(self):
- return strclass(self.__class__) + '.' + self._testMethodName
+ return strclass(self.__class__) + "." + self._testMethodName
def add_patch(self, target, attr, *args, **kwargs):
"""Patches specified target object and sets it as attr on test
instance also schedules cleanup"""
- if 'autospec' not in kwargs:
- kwargs['autospec'] = True
+ if "autospec" not in kwargs:
+ kwargs["autospec"] = True
m = mock.patch(target, *args, **kwargs)
p = m.start()
self.addCleanup(m.stop)
@@ -108,7 +109,7 @@ class TestCase(unittest.TestCase):
class CiTestCase(TestCase):
"""This is the preferred test case base class unless user
- needs other test case classes below."""
+ needs other test case classes below."""
# Subclass overrides for specific test behavior
# Whether or not a unit test needs logfile setup
@@ -131,7 +132,7 @@ class CiTestCase(TestCase):
# Create a log handler so unit tests can search expected logs.
self.logger = logging.getLogger()
self.logs = io.StringIO()
- formatter = logging.Formatter('%(levelname)s: %(message)s')
+ formatter = logging.Formatter("%(levelname)s: %(message)s")
handler = logging.StreamHandler(self.logs)
handler.setFormatter(formatter)
self.old_handlers = self.logger.handlers
@@ -142,12 +143,13 @@ class CiTestCase(TestCase):
subp.subp = self._fake_subp
def _fake_subp(self, *args, **kwargs):
- if 'args' in kwargs:
- cmd = kwargs['args']
+ if "args" in kwargs:
+ cmd = kwargs["args"]
else:
if not args:
raise TypeError(
- "subp() missing 1 required positional argument: 'args'")
+ "subp() missing 1 required positional argument: 'args'"
+ )
cmd = args[0]
if not isinstance(cmd, str):
@@ -158,16 +160,19 @@ class CiTestCase(TestCase):
if isinstance(self.allowed_subp, bool):
pass_through = self.allowed_subp
else:
- pass_through = (
- (cmd in self.allowed_subp) or
- (self.SUBP_SHELL_TRUE in self.allowed_subp and
- kwargs.get('shell')))
+ pass_through = (cmd in self.allowed_subp) or (
+ self.SUBP_SHELL_TRUE in self.allowed_subp
+ and kwargs.get("shell")
+ )
if pass_through:
return _real_subp(*args, **kwargs)
raise Exception(
- "called subp. set self.allowed_subp=True to allow\n subp(%s)" %
- ', '.join([str(repr(a)) for a in args] +
- ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()]))
+ "called subp. set self.allowed_subp=True to allow\n subp(%s)"
+ % ", ".join(
+ [str(repr(a)) for a in args]
+ + ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()]
+ )
+ )
def tearDown(self):
if self.with_logs:
@@ -180,12 +185,12 @@ class CiTestCase(TestCase):
def tmp_dir(self, dir=None, cleanup=True):
# return a full path to a temporary directory that will be cleaned up.
if dir is None:
- tmpd = tempfile.mkdtemp(
- prefix="ci-%s." % self.__class__.__name__)
+ tmpd = tempfile.mkdtemp(prefix="ci-%s." % self.__class__.__name__)
else:
tmpd = tempfile.mkdtemp(dir=dir)
self.addCleanup(
- functools.partial(shutil.rmtree, tmpd, ignore_errors=True))
+ functools.partial(shutil.rmtree, tmpd, ignore_errors=True)
+ )
return tmpd
def tmp_path(self, path, dir=None):
@@ -208,7 +213,7 @@ class CiTestCase(TestCase):
if not sys_cfg:
sys_cfg = {}
tmp_paths = {}
- for var in ['templates_dir', 'run_dir', 'cloud_dir']:
+ for var in ["templates_dir", "run_dir", "cloud_dir"]:
tmp_paths[var] = self.tmp_path(var, dir=self.new_root)
util.ensure_dir(tmp_paths[var])
self.paths = ch.Paths(tmp_paths)
@@ -221,13 +226,13 @@ class CiTestCase(TestCase):
@classmethod
def random_string(cls, length=8):
- """ return a random lowercase string with default length of 8"""
- return ''.join(
- random.choice(string.ascii_lowercase) for _ in range(length))
+ """return a random lowercase string with default length of 8"""
+ return "".join(
+ random.choice(string.ascii_lowercase) for _ in range(length)
+ )
class ResourceUsingTestCase(CiTestCase):
-
def setUp(self):
super(ResourceUsingTestCase, self).setUp()
self.resource_path = None
@@ -235,14 +240,13 @@ class ResourceUsingTestCase(CiTestCase):
def getCloudPaths(self, ds=None):
tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmpdir)
- cp = ch.Paths({'cloud_dir': tmpdir,
- 'templates_dir': resourceLocation()},
- ds=ds)
+ cp = ch.Paths(
+ {"cloud_dir": tmpdir, "templates_dir": resourceLocation()}, ds=ds
+ )
return cp
class FilesystemMockingTestCase(ResourceUsingTestCase):
-
def setUp(self):
super(FilesystemMockingTestCase, self).setUp()
self.patched_funcs = ExitStack()
@@ -253,10 +257,10 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
def replicateTestRoot(self, example_root, target_root):
real_root = resourceLocation()
- real_root = os.path.join(real_root, 'roots', example_root)
+ real_root = os.path.join(real_root, "roots", example_root)
for (dir_path, _dirnames, filenames) in os.walk(real_root):
real_path = dir_path
- make_path = rebase_path(real_path[len(real_root):], target_root)
+ make_path = rebase_path(real_path[len(real_root) :], target_root)
util.ensure_dir(make_path)
for f in filenames:
real_path = util.abs_join(real_path, f)
@@ -265,72 +269,89 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
def patchUtils(self, new_root):
patch_funcs = {
- util: [('write_file', 1),
- ('append_file', 1),
- ('load_file', 1),
- ('ensure_dir', 1),
- ('chmod', 1),
- ('delete_dir_contents', 1),
- ('del_file', 1),
- ('sym_link', -1),
- ('copy', -1)],
+ util: [
+ ("write_file", 1),
+ ("append_file", 1),
+ ("load_file", 1),
+ ("ensure_dir", 1),
+ ("chmod", 1),
+ ("delete_dir_contents", 1),
+ ("del_file", 1),
+ ("sym_link", -1),
+ ("copy", -1),
+ ],
}
for (mod, funcs) in patch_funcs.items():
for (f, am) in funcs:
func = getattr(mod, f)
trap_func = retarget_many_wrapper(new_root, am, func)
self.patched_funcs.enter_context(
- mock.patch.object(mod, f, trap_func))
+ mock.patch.object(mod, f, trap_func)
+ )
# Handle subprocess calls
- func = getattr(subp, 'subp')
+ func = getattr(subp, "subp")
def nsubp(*_args, **_kwargs):
- return ('', '')
+ return ("", "")
self.patched_funcs.enter_context(
- mock.patch.object(subp, 'subp', nsubp))
+ mock.patch.object(subp, "subp", nsubp)
+ )
def null_func(*_args, **_kwargs):
return None
- for f in ['chownbyid', 'chownbyname']:
+ for f in ["chownbyid", "chownbyname"]:
self.patched_funcs.enter_context(
- mock.patch.object(util, f, null_func))
+ mock.patch.object(util, f, null_func)
+ )
def patchOS(self, new_root):
patch_funcs = {
- os.path: [('isfile', 1), ('exists', 1),
- ('islink', 1), ('isdir', 1), ('lexists', 1)],
- os: [('listdir', 1), ('mkdir', 1),
- ('lstat', 1), ('symlink', 2),
- ('stat', 1)]
+ os.path: [
+ ("isfile", 1),
+ ("exists", 1),
+ ("islink", 1),
+ ("isdir", 1),
+ ("lexists", 1),
+ ],
+ os: [
+ ("listdir", 1),
+ ("mkdir", 1),
+ ("lstat", 1),
+ ("symlink", 2),
+ ("stat", 1),
+ ],
}
- if hasattr(os, 'scandir'):
+ if hasattr(os, "scandir"):
# py27 does not have scandir
- patch_funcs[os].append(('scandir', 1))
+ patch_funcs[os].append(("scandir", 1))
for (mod, funcs) in patch_funcs.items():
for f, nargs in funcs:
func = getattr(mod, f)
trap_func = retarget_many_wrapper(new_root, nargs, func)
self.patched_funcs.enter_context(
- mock.patch.object(mod, f, trap_func))
+ mock.patch.object(mod, f, trap_func)
+ )
def patchOpen(self, new_root):
trap_func = retarget_many_wrapper(new_root, 1, open)
self.patched_funcs.enter_context(
- mock.patch('builtins.open', trap_func)
+ mock.patch("builtins.open", trap_func)
)
def patchStdoutAndStderr(self, stdout=None, stderr=None):
if stdout is not None:
self.patched_funcs.enter_context(
- mock.patch.object(sys, 'stdout', stdout))
+ mock.patch.object(sys, "stdout", stdout)
+ )
if stderr is not None:
self.patched_funcs.enter_context(
- mock.patch.object(sys, 'stderr', stderr))
+ mock.patch.object(sys, "stderr", stderr)
+ )
def reRoot(self, root=None):
if root is None:
@@ -355,33 +376,32 @@ class HttprettyTestCase(CiTestCase):
# And make sure reset and enable/disable are done.
def setUp(self):
- self.restore_proxy = os.environ.get('http_proxy')
+ self.restore_proxy = os.environ.get("http_proxy")
if self.restore_proxy is not None:
- del os.environ['http_proxy']
+ del os.environ["http_proxy"]
super(HttprettyTestCase, self).setUp()
httpretty.HTTPretty.allow_net_connect = False
httpretty.reset()
httpretty.enable()
# Stop the logging from HttpPretty so our logs don't get mixed
# up with its logs
- logging.getLogger('httpretty.core').setLevel(logging.CRITICAL)
+ logging.getLogger("httpretty.core").setLevel(logging.CRITICAL)
def tearDown(self):
httpretty.disable()
httpretty.reset()
if self.restore_proxy:
- os.environ['http_proxy'] = self.restore_proxy
+ os.environ["http_proxy"] = self.restore_proxy
super(HttprettyTestCase, self).tearDown()
class SchemaTestCaseMixin(unittest.TestCase):
-
def assertSchemaValid(self, cfg, msg="Valid Schema failed validation."):
"""Assert the config is valid per self.schema.
If there is only one top level key in the schema properties, then
the cfg will be put under that key."""
- props = list(self.schema.get('properties'))
+ props = list(self.schema.get("properties"))
# put cfg under top level key if there is only one in the schema
if len(props) == 1:
cfg = {props[0]: cfg}
@@ -402,7 +422,7 @@ def populate_dir(path, files):
if isinstance(content, bytes):
fp.write(content)
else:
- fp.write(content.encode('utf-8'))
+ fp.write(content.encode("utf-8"))
fp.close()
ret.append(p)
@@ -425,7 +445,7 @@ def dir2dict(startdir, prefix=None):
for root, _dirs, files in os.walk(startdir):
for fname in files:
fpath = os.path.join(root, fname)
- key = fpath[len(prefix):]
+ key = fpath[len(prefix) :]
flist[key] = util.load_file(fpath)
return flist
@@ -443,16 +463,16 @@ def wrap_and_call(prefix, mocks, func, *args, **kwargs):
return_value: return from 'func'
"""
- delim = '.'
+ delim = "."
if prefix is None:
- prefix = ''
+ prefix = ""
prefix = prefix.rstrip(delim)
unwraps = []
for fname, kw in mocks.items():
if prefix:
fname = delim.join((prefix, fname))
if not isinstance(kw, dict):
- kw = {'return_value': kw}
+ kw = {"return_value": kw}
p = mock.patch(fname, **kw)
p.start()
unwraps.append(p)
@@ -464,19 +484,20 @@ def wrap_and_call(prefix, mocks, func, *args, **kwargs):
def resourceLocation(subname=None):
- path = cloud_init_project_dir('tests/data')
+ path = cloud_init_project_dir("tests/data")
if not subname:
return path
return os.path.join(path, subname)
-def readResource(name, mode='r'):
+def readResource(name, mode="r"):
with open(resourceLocation(name), mode) as fh:
return fh.read()
try:
import jsonschema
+
assert jsonschema # avoid pyflakes error F401: import unused
_missing_jsonschema_dep = False
except ImportError:
@@ -485,7 +506,8 @@ except ImportError:
def skipUnlessJsonSchema():
return skipIf(
- _missing_jsonschema_dep, "No python-jsonschema dependency present.")
+ _missing_jsonschema_dep, "No python-jsonschema dependency present."
+ )
def skipUnlessJinja():
@@ -497,13 +519,17 @@ def skipIfJinja():
# older versions of mock do not have the useful 'assert_not_called'
-if not hasattr(mock.Mock, 'assert_not_called'):
+if not hasattr(mock.Mock, "assert_not_called"):
+
def __mock_assert_not_called(mmock):
if mmock.call_count != 0:
- msg = ("[citest] Expected '%s' to not have been called. "
- "Called %s times." %
- (mmock._mock_name or 'mock', mmock.call_count))
+ msg = (
+ "[citest] Expected '%s' to not have been called. "
+ "Called %s times."
+ % (mmock._mock_name or "mock", mmock.call_count)
+ )
raise AssertionError(msg)
+
mock.Mock.assert_not_called = __mock_assert_not_called
@@ -524,4 +550,5 @@ def cloud_init_project_dir(sub_path: str) -> str:
"""
return str(get_top_level_dir() / sub_path)
+
# vi: ts=4 expandtab