summaryrefslogtreecommitdiff
path: root/tests/unittests/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/helpers.py')
-rw-r--r--tests/unittests/helpers.py554
1 files changed, 554 insertions, 0 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
new file mode 100644
index 00000000..67fed8c9
--- /dev/null
+++ b/tests/unittests/helpers.py
@@ -0,0 +1,554 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import functools
+import io
+import logging
+import os
+import random
+import shutil
+import string
+import sys
+import tempfile
+import time
+import unittest
+from contextlib import ExitStack, contextmanager
+from pathlib import Path
+from unittest import mock
+from unittest.util import strclass
+
+import httpretty
+
+import cloudinit
+from cloudinit import cloud, distros
+from cloudinit import helpers as ch
+from cloudinit import subp, util
+from cloudinit.config.schema import (
+ SchemaValidationError,
+ validate_cloudconfig_schema,
+)
+from cloudinit.sources import DataSourceNone
+from cloudinit.templater import JINJA_AVAILABLE
+
+_real_subp = subp.subp
+
+# Used for skipping tests
+SkipTest = unittest.SkipTest
+skipIf = unittest.skipIf
+
+
+# Makes the old path start
+# with new base instead of whatever
+# it previously had
+def rebase_path(old_path, new_base):
+ if old_path.startswith(new_base):
+ # Already handled...
+ return old_path
+ # Retarget the base of that path
+ # to the new base instead of the
+ # old one...
+ path = os.path.join(new_base, old_path.lstrip("/"))
+ path = os.path.abspath(path)
+ return path
+
+
+# Can work on anything that takes a path as arguments
+def retarget_many_wrapper(new_base, am, old_func):
+ def wrapper(*args, **kwds):
+ n_args = list(args)
+ nam = am
+ if am == -1:
+ nam = len(n_args)
+ for i in range(0, nam):
+ path = args[i]
+ # patchOS() wraps various os and os.path functions, however in
+ # Python 3 some of these now accept file-descriptors (integers).
+ # That breaks rebase_path() so in lieu of a better solution, just
+ # don't rebase if we get a fd.
+ if isinstance(path, str):
+ n_args[i] = rebase_path(path, new_base)
+ return old_func(*n_args, **kwds)
+
+ return wrapper
+
+
+class TestCase(unittest.TestCase):
+ def reset_global_state(self):
+ """Reset any global state to its original settings.
+
+ cloudinit caches some values in cloudinit.util. Unit tests that
+ involved those cached paths were then subject to failure if the order
+ of invocation changed (LP: #1703697).
+
+ This function resets any of these global state variables to their
+ initial state.
+
+ In the future this should really be done with some registry that
+ can then be cleaned in a more obvious way.
+ """
+ util.PROC_CMDLINE = None
+ util._DNS_REDIRECT_IP = None
+ util._LSB_RELEASE = {}
+
+ def setUp(self):
+ super(TestCase, self).setUp()
+ self.reset_global_state()
+
+ def shortDescription(self):
+ return strclass(self.__class__) + "." + self._testMethodName
+
+ def add_patch(self, target, attr, *args, **kwargs):
+ """Patches specified target object and sets it as attr on test
+ instance also schedules cleanup"""
+ if "autospec" not in kwargs:
+ kwargs["autospec"] = True
+ m = mock.patch(target, *args, **kwargs)
+ p = m.start()
+ self.addCleanup(m.stop)
+ setattr(self, attr, p)
+
+
+class CiTestCase(TestCase):
+ """This is the preferred test case base class unless user
+ needs other test case classes below."""
+
+ # Subclass overrides for specific test behavior
+ # Whether or not a unit test needs logfile setup
+ with_logs = False
+ allowed_subp = False
+ SUBP_SHELL_TRUE = "shell=true"
+
+ @contextmanager
+ def allow_subp(self, allowed_subp):
+ orig = self.allowed_subp
+ try:
+ self.allowed_subp = allowed_subp
+ yield
+ finally:
+ self.allowed_subp = orig
+
+ def setUp(self):
+ super(CiTestCase, self).setUp()
+ if self.with_logs:
+ # Create a log handler so unit tests can search expected logs.
+ self.logger = logging.getLogger()
+ self.logs = io.StringIO()
+ formatter = logging.Formatter("%(levelname)s: %(message)s")
+ handler = logging.StreamHandler(self.logs)
+ handler.setFormatter(formatter)
+ self.old_handlers = self.logger.handlers
+ self.logger.handlers = [handler]
+ if self.allowed_subp is True:
+ subp.subp = _real_subp
+ else:
+ subp.subp = self._fake_subp
+
+ def _fake_subp(self, *args, **kwargs):
+ if "args" in kwargs:
+ cmd = kwargs["args"]
+ else:
+ if not args:
+ raise TypeError(
+ "subp() missing 1 required positional argument: 'args'"
+ )
+ cmd = args[0]
+
+ if not isinstance(cmd, str):
+ cmd = cmd[0]
+ pass_through = False
+ if not isinstance(self.allowed_subp, (list, bool)):
+ raise TypeError("self.allowed_subp supports list or bool.")
+ if isinstance(self.allowed_subp, bool):
+ pass_through = self.allowed_subp
+ else:
+ pass_through = (cmd in self.allowed_subp) or (
+ self.SUBP_SHELL_TRUE in self.allowed_subp
+ and kwargs.get("shell")
+ )
+ if pass_through:
+ return _real_subp(*args, **kwargs)
+ raise Exception(
+ "called subp. set self.allowed_subp=True to allow\n subp(%s)"
+ % ", ".join(
+ [str(repr(a)) for a in args]
+ + ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()]
+ )
+ )
+
+ def tearDown(self):
+ if self.with_logs:
+ # Remove the handler we setup
+ logging.getLogger().handlers = self.old_handlers
+ logging.getLogger().setLevel(logging.NOTSET)
+ subp.subp = _real_subp
+ super(CiTestCase, self).tearDown()
+
+ def tmp_dir(self, dir=None, cleanup=True):
+ # return a full path to a temporary directory that will be cleaned up.
+ if dir is None:
+ tmpd = tempfile.mkdtemp(prefix="ci-%s." % self.__class__.__name__)
+ else:
+ tmpd = tempfile.mkdtemp(dir=dir)
+ self.addCleanup(
+ functools.partial(shutil.rmtree, tmpd, ignore_errors=True)
+ )
+ return tmpd
+
+ def tmp_path(self, path, dir=None):
+ # return an absolute path to 'path' under dir.
+ # if dir is None, one will be created with tmp_dir()
+ # the file is not created or modified.
+ if dir is None:
+ dir = self.tmp_dir()
+ return os.path.normpath(os.path.abspath(os.path.join(dir, path)))
+
+ def tmp_cloud(self, distro, sys_cfg=None, metadata=None):
+ """Create a cloud with tmp working directory paths.
+
+ @param distro: Name of the distro to attach to the cloud.
+ @param metadata: Optional metadata to set on the datasource.
+
+ @return: The built cloud instance.
+ """
+ self.new_root = self.tmp_dir()
+ if not sys_cfg:
+ sys_cfg = {}
+ tmp_paths = {}
+ for var in ["templates_dir", "run_dir", "cloud_dir"]:
+ tmp_paths[var] = self.tmp_path(var, dir=self.new_root)
+ util.ensure_dir(tmp_paths[var])
+ self.paths = ch.Paths(tmp_paths)
+ cls = distros.fetch(distro)
+ mydist = cls(distro, sys_cfg, self.paths)
+ myds = DataSourceNone.DataSourceNone(sys_cfg, mydist, self.paths)
+ if metadata:
+ myds.metadata.update(metadata)
+ return cloud.Cloud(myds, self.paths, sys_cfg, mydist, None)
+
+ @classmethod
+ def random_string(cls, length=8):
+ """return a random lowercase string with default length of 8"""
+ return "".join(
+ random.choice(string.ascii_lowercase) for _ in range(length)
+ )
+
+
+class ResourceUsingTestCase(CiTestCase):
+ def setUp(self):
+ super(ResourceUsingTestCase, self).setUp()
+ self.resource_path = None
+
+ def getCloudPaths(self, ds=None):
+ tmpdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmpdir)
+ cp = ch.Paths(
+ {"cloud_dir": tmpdir, "templates_dir": resourceLocation()}, ds=ds
+ )
+ return cp
+
+
+class FilesystemMockingTestCase(ResourceUsingTestCase):
+ def setUp(self):
+ super(FilesystemMockingTestCase, self).setUp()
+ self.patched_funcs = ExitStack()
+
+ def tearDown(self):
+ self.patched_funcs.close()
+ ResourceUsingTestCase.tearDown(self)
+
+ def replicateTestRoot(self, example_root, target_root):
+ real_root = resourceLocation()
+ real_root = os.path.join(real_root, "roots", example_root)
+ for (dir_path, _dirnames, filenames) in os.walk(real_root):
+ real_path = dir_path
+ make_path = rebase_path(real_path[len(real_root) :], target_root)
+ util.ensure_dir(make_path)
+ for f in filenames:
+ real_path = util.abs_join(real_path, f)
+ make_path = util.abs_join(make_path, f)
+ shutil.copy(real_path, make_path)
+
+ def patchUtils(self, new_root):
+ patch_funcs = {
+ util: [
+ ("write_file", 1),
+ ("append_file", 1),
+ ("load_file", 1),
+ ("ensure_dir", 1),
+ ("chmod", 1),
+ ("delete_dir_contents", 1),
+ ("del_file", 1),
+ ("sym_link", -1),
+ ("copy", -1),
+ ],
+ }
+ for (mod, funcs) in patch_funcs.items():
+ for (f, am) in funcs:
+ func = getattr(mod, f)
+ trap_func = retarget_many_wrapper(new_root, am, func)
+ self.patched_funcs.enter_context(
+ mock.patch.object(mod, f, trap_func)
+ )
+
+ # Handle subprocess calls
+ func = getattr(subp, "subp")
+
+ def nsubp(*_args, **_kwargs):
+ return ("", "")
+
+ self.patched_funcs.enter_context(
+ mock.patch.object(subp, "subp", nsubp)
+ )
+
+ def null_func(*_args, **_kwargs):
+ return None
+
+ for f in ["chownbyid", "chownbyname"]:
+ self.patched_funcs.enter_context(
+ mock.patch.object(util, f, null_func)
+ )
+
+ def patchOS(self, new_root):
+ patch_funcs = {
+ os.path: [
+ ("isfile", 1),
+ ("exists", 1),
+ ("islink", 1),
+ ("isdir", 1),
+ ("lexists", 1),
+ ],
+ os: [
+ ("listdir", 1),
+ ("mkdir", 1),
+ ("lstat", 1),
+ ("symlink", 2),
+ ("stat", 1),
+ ],
+ }
+
+ if hasattr(os, "scandir"):
+ # py27 does not have scandir
+ patch_funcs[os].append(("scandir", 1))
+
+ for (mod, funcs) in patch_funcs.items():
+ for f, nargs in funcs:
+ func = getattr(mod, f)
+ trap_func = retarget_many_wrapper(new_root, nargs, func)
+ self.patched_funcs.enter_context(
+ mock.patch.object(mod, f, trap_func)
+ )
+
+ def patchOpen(self, new_root):
+ trap_func = retarget_many_wrapper(new_root, 1, open)
+ self.patched_funcs.enter_context(
+ mock.patch("builtins.open", trap_func)
+ )
+
+ def patchStdoutAndStderr(self, stdout=None, stderr=None):
+ if stdout is not None:
+ self.patched_funcs.enter_context(
+ mock.patch.object(sys, "stdout", stdout)
+ )
+ if stderr is not None:
+ self.patched_funcs.enter_context(
+ mock.patch.object(sys, "stderr", stderr)
+ )
+
+ def reRoot(self, root=None):
+ if root is None:
+ root = self.tmp_dir()
+ self.patchUtils(root)
+ self.patchOS(root)
+ self.patchOpen(root)
+ return root
+
+ @contextmanager
+ def reRooted(self, root=None):
+ try:
+ yield self.reRoot(root)
+ finally:
+ self.patched_funcs.close()
+
+
+class HttprettyTestCase(CiTestCase):
+ # necessary as http_proxy gets in the way of httpretty
+ # https://github.com/gabrielfalcao/HTTPretty/issues/122
+ # Also make sure that allow_net_connect is set to False.
+ # And make sure reset and enable/disable are done.
+
+ def setUp(self):
+ self.restore_proxy = os.environ.get("http_proxy")
+ if self.restore_proxy is not None:
+ del os.environ["http_proxy"]
+ super(HttprettyTestCase, self).setUp()
+ httpretty.HTTPretty.allow_net_connect = False
+ httpretty.reset()
+ httpretty.enable()
+ # Stop the logging from HttpPretty so our logs don't get mixed
+ # up with its logs
+ logging.getLogger("httpretty.core").setLevel(logging.CRITICAL)
+
+ def tearDown(self):
+ httpretty.disable()
+ httpretty.reset()
+ if self.restore_proxy:
+ os.environ["http_proxy"] = self.restore_proxy
+ super(HttprettyTestCase, self).tearDown()
+
+
+class SchemaTestCaseMixin(unittest.TestCase):
+ def assertSchemaValid(self, cfg, msg="Valid Schema failed validation."):
+ """Assert the config is valid per self.schema.
+
+ If there is only one top level key in the schema properties, then
+ the cfg will be put under that key."""
+ props = list(self.schema.get("properties"))
+ # put cfg under top level key if there is only one in the schema
+ if len(props) == 1:
+ cfg = {props[0]: cfg}
+ try:
+ validate_cloudconfig_schema(cfg, self.schema, strict=True)
+ except SchemaValidationError:
+ self.fail(msg)
+
+
+def populate_dir(path, files):
+ if not os.path.exists(path):
+ os.makedirs(path)
+ ret = []
+ for (name, content) in files.items():
+ p = os.path.sep.join([path, name])
+ util.ensure_dir(os.path.dirname(p))
+ with open(p, "wb") as fp:
+ if isinstance(content, bytes):
+ fp.write(content)
+ else:
+ fp.write(content.encode("utf-8"))
+ fp.close()
+ ret.append(p)
+
+ return ret
+
+
+def populate_dir_with_ts(path, data):
+ """data is {'file': ('contents', mtime)}. mtime relative to now."""
+ populate_dir(path, dict((k, v[0]) for k, v in data.items()))
+ btime = time.time()
+ for fpath, (_contents, mtime) in data.items():
+ ts = btime + mtime if mtime else btime
+ os.utime(os.path.sep.join((path, fpath)), (ts, ts))
+
+
+def dir2dict(startdir, prefix=None):
+ flist = {}
+ if prefix is None:
+ prefix = startdir
+ for root, _dirs, files in os.walk(startdir):
+ for fname in files:
+ fpath = os.path.join(root, fname)
+ key = fpath[len(prefix) :]
+ flist[key] = util.load_file(fpath)
+ return flist
+
+
+def wrap_and_call(prefix, mocks, func, *args, **kwargs):
+ """
+ call func(args, **kwargs) with mocks applied, then unapplies mocks
+ nicer to read than repeating dectorators on each function
+
+ prefix: prefix for mock names (e.g. 'cloudinit.stages.util') or None
+ mocks: dictionary of names (under 'prefix') to mock and either
+ a return value or a dictionary to pass to the mock.patch call
+ func: function to call with mocks applied
+ *args,**kwargs: arguments for 'func'
+
+ return_value: return from 'func'
+ """
+ delim = "."
+ if prefix is None:
+ prefix = ""
+ prefix = prefix.rstrip(delim)
+ unwraps = []
+ for fname, kw in mocks.items():
+ if prefix:
+ fname = delim.join((prefix, fname))
+ if not isinstance(kw, dict):
+ kw = {"return_value": kw}
+ p = mock.patch(fname, **kw)
+ p.start()
+ unwraps.append(p)
+ try:
+ return func(*args, **kwargs)
+ finally:
+ for p in unwraps:
+ p.stop()
+
+
+def resourceLocation(subname=None):
+ path = cloud_init_project_dir("tests/data")
+ if not subname:
+ return path
+ return os.path.join(path, subname)
+
+
+def readResource(name, mode="r"):
+ with open(resourceLocation(name), mode) as fh:
+ return fh.read()
+
+
+try:
+ import jsonschema
+
+ assert jsonschema # avoid pyflakes error F401: import unused
+ _missing_jsonschema_dep = False
+except ImportError:
+ _missing_jsonschema_dep = True
+
+
+def skipUnlessJsonSchema():
+ return skipIf(
+ _missing_jsonschema_dep, "No python-jsonschema dependency present."
+ )
+
+
+def skipUnlessJinja():
+ return skipIf(not JINJA_AVAILABLE, "No jinja dependency present.")
+
+
+def skipIfJinja():
+ return skipIf(JINJA_AVAILABLE, "Jinja dependency present.")
+
+
+# older versions of mock do not have the useful 'assert_not_called'
+if not hasattr(mock.Mock, "assert_not_called"):
+
+ def __mock_assert_not_called(mmock):
+ if mmock.call_count != 0:
+ msg = (
+ "[citest] Expected '%s' to not have been called. "
+ "Called %s times."
+ % (mmock._mock_name or "mock", mmock.call_count)
+ )
+ raise AssertionError(msg)
+
+ mock.Mock.assert_not_called = __mock_assert_not_called
+
+
+def get_top_level_dir() -> Path:
+ """Return the absolute path to the top cloudinit project directory
+
+ @return Path('<top-cloudinit-dir>')
+ """
+ return Path(cloudinit.__file__).parent.parent.resolve()
+
+
+def cloud_init_project_dir(sub_path: str) -> str:
+ """Get a path within the cloudinit project directory
+
+ @return str of the combined path
+
+ Example: cloud_init_project_dir("my/path") -> "/path/to/cloud-init/my/path"
+ """
+ return str(get_top_level_dir() / sub_path)
+
+
+# vi: ts=4 expandtab