diff options
Diffstat (limited to 'cloudinit/tests/helpers.py')
-rw-r--r-- | cloudinit/tests/helpers.py | 504 |
1 files changed, 0 insertions, 504 deletions
diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py deleted file mode 100644 index 58f63b69..00000000 --- a/cloudinit/tests/helpers.py +++ /dev/null @@ -1,504 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -import functools -import httpretty -import io -import logging -import os -import random -import shutil -import string -import sys -import tempfile -import time -import unittest -from contextlib import ExitStack, contextmanager -from unittest import mock -from unittest.util import strclass - -from cloudinit.config.schema import ( - SchemaValidationError, validate_cloudconfig_schema) -from cloudinit import cloud -from cloudinit import distros -from cloudinit import helpers as ch -from cloudinit.sources import DataSourceNone -from cloudinit.templater import JINJA_AVAILABLE -from cloudinit import subp -from cloudinit import util - -_real_subp = subp.subp - -# Used for skipping tests -SkipTest = unittest.SkipTest -skipIf = unittest.skipIf - - -# Makes the old path start -# with new base instead of whatever -# it previously had -def rebase_path(old_path, new_base): - if old_path.startswith(new_base): - # Already handled... - return old_path - # Retarget the base of that path - # to the new base instead of the - # old one... - path = os.path.join(new_base, old_path.lstrip("/")) - path = os.path.abspath(path) - return path - - -# Can work on anything that takes a path as arguments -def retarget_many_wrapper(new_base, am, old_func): - def wrapper(*args, **kwds): - n_args = list(args) - nam = am - if am == -1: - nam = len(n_args) - for i in range(0, nam): - path = args[i] - # patchOS() wraps various os and os.path functions, however in - # Python 3 some of these now accept file-descriptors (integers). - # That breaks rebase_path() so in lieu of a better solution, just - # don't rebase if we get a fd. - if isinstance(path, str): - n_args[i] = rebase_path(path, new_base) - return old_func(*n_args, **kwds) - return wrapper - - -class TestCase(unittest.TestCase): - - def reset_global_state(self): - """Reset any global state to its original settings. - - cloudinit caches some values in cloudinit.util. Unit tests that - involved those cached paths were then subject to failure if the order - of invocation changed (LP: #1703697). - - This function resets any of these global state variables to their - initial state. - - In the future this should really be done with some registry that - can then be cleaned in a more obvious way. - """ - util.PROC_CMDLINE = None - util._DNS_REDIRECT_IP = None - util._LSB_RELEASE = {} - - def setUp(self): - super(TestCase, self).setUp() - self.reset_global_state() - - def shortDescription(self): - return strclass(self.__class__) + '.' + self._testMethodName - - def add_patch(self, target, attr, *args, **kwargs): - """Patches specified target object and sets it as attr on test - instance also schedules cleanup""" - if 'autospec' not in kwargs: - kwargs['autospec'] = True - m = mock.patch(target, *args, **kwargs) - p = m.start() - self.addCleanup(m.stop) - setattr(self, attr, p) - - -class CiTestCase(TestCase): - """This is the preferred test case base class unless user - needs other test case classes below.""" - - # Subclass overrides for specific test behavior - # Whether or not a unit test needs logfile setup - with_logs = False - allowed_subp = False - SUBP_SHELL_TRUE = "shell=true" - - @contextmanager - def allow_subp(self, allowed_subp): - orig = self.allowed_subp - try: - self.allowed_subp = allowed_subp - yield - finally: - self.allowed_subp = orig - - def setUp(self): - super(CiTestCase, self).setUp() - if self.with_logs: - # Create a log handler so unit tests can search expected logs. - self.logger = logging.getLogger() - self.logs = io.StringIO() - formatter = logging.Formatter('%(levelname)s: %(message)s') - handler = logging.StreamHandler(self.logs) - handler.setFormatter(formatter) - self.old_handlers = self.logger.handlers - self.logger.handlers = [handler] - if self.allowed_subp is True: - subp.subp = _real_subp - else: - subp.subp = self._fake_subp - - def _fake_subp(self, *args, **kwargs): - if 'args' in kwargs: - cmd = kwargs['args'] - else: - if not args: - raise TypeError( - "subp() missing 1 required positional argument: 'args'") - cmd = args[0] - - if not isinstance(cmd, str): - cmd = cmd[0] - pass_through = False - if not isinstance(self.allowed_subp, (list, bool)): - raise TypeError("self.allowed_subp supports list or bool.") - if isinstance(self.allowed_subp, bool): - pass_through = self.allowed_subp - else: - pass_through = ( - (cmd in self.allowed_subp) or - (self.SUBP_SHELL_TRUE in self.allowed_subp and - kwargs.get('shell'))) - if pass_through: - return _real_subp(*args, **kwargs) - raise Exception( - "called subp. set self.allowed_subp=True to allow\n subp(%s)" % - ', '.join([str(repr(a)) for a in args] + - ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()])) - - def tearDown(self): - if self.with_logs: - # Remove the handler we setup - logging.getLogger().handlers = self.old_handlers - logging.getLogger().level = None - subp.subp = _real_subp - super(CiTestCase, self).tearDown() - - def tmp_dir(self, dir=None, cleanup=True): - # return a full path to a temporary directory that will be cleaned up. - if dir is None: - tmpd = tempfile.mkdtemp( - prefix="ci-%s." % self.__class__.__name__) - else: - tmpd = tempfile.mkdtemp(dir=dir) - self.addCleanup( - functools.partial(shutil.rmtree, tmpd, ignore_errors=True)) - return tmpd - - def tmp_path(self, path, dir=None): - # return an absolute path to 'path' under dir. - # if dir is None, one will be created with tmp_dir() - # the file is not created or modified. - if dir is None: - dir = self.tmp_dir() - return os.path.normpath(os.path.abspath(os.path.join(dir, path))) - - def tmp_cloud(self, distro, sys_cfg=None, metadata=None): - """Create a cloud with tmp working directory paths. - - @param distro: Name of the distro to attach to the cloud. - @param metadata: Optional metadata to set on the datasource. - - @return: The built cloud instance. - """ - self.new_root = self.tmp_dir() - if not sys_cfg: - sys_cfg = {} - tmp_paths = {} - for var in ['templates_dir', 'run_dir', 'cloud_dir']: - tmp_paths[var] = self.tmp_path(var, dir=self.new_root) - util.ensure_dir(tmp_paths[var]) - self.paths = ch.Paths(tmp_paths) - cls = distros.fetch(distro) - mydist = cls(distro, sys_cfg, self.paths) - myds = DataSourceNone.DataSourceNone(sys_cfg, mydist, self.paths) - if metadata: - myds.metadata.update(metadata) - return cloud.Cloud(myds, self.paths, sys_cfg, mydist, None) - - @classmethod - def random_string(cls, length=8): - """ return a random lowercase string with default length of 8""" - return ''.join( - random.choice(string.ascii_lowercase) for _ in range(length)) - - -class ResourceUsingTestCase(CiTestCase): - - def setUp(self): - super(ResourceUsingTestCase, self).setUp() - self.resource_path = None - - def getCloudPaths(self, ds=None): - tmpdir = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, tmpdir) - cp = ch.Paths({'cloud_dir': tmpdir, - 'templates_dir': resourceLocation()}, - ds=ds) - return cp - - -class FilesystemMockingTestCase(ResourceUsingTestCase): - - def setUp(self): - super(FilesystemMockingTestCase, self).setUp() - self.patched_funcs = ExitStack() - - def tearDown(self): - self.patched_funcs.close() - ResourceUsingTestCase.tearDown(self) - - def replicateTestRoot(self, example_root, target_root): - real_root = resourceLocation() - real_root = os.path.join(real_root, 'roots', example_root) - for (dir_path, _dirnames, filenames) in os.walk(real_root): - real_path = dir_path - make_path = rebase_path(real_path[len(real_root):], target_root) - util.ensure_dir(make_path) - for f in filenames: - real_path = util.abs_join(real_path, f) - make_path = util.abs_join(make_path, f) - shutil.copy(real_path, make_path) - - def patchUtils(self, new_root): - patch_funcs = { - util: [('write_file', 1), - ('append_file', 1), - ('load_file', 1), - ('ensure_dir', 1), - ('chmod', 1), - ('delete_dir_contents', 1), - ('del_file', 1), - ('sym_link', -1), - ('copy', -1)], - } - for (mod, funcs) in patch_funcs.items(): - for (f, am) in funcs: - func = getattr(mod, f) - trap_func = retarget_many_wrapper(new_root, am, func) - self.patched_funcs.enter_context( - mock.patch.object(mod, f, trap_func)) - - # Handle subprocess calls - func = getattr(subp, 'subp') - - def nsubp(*_args, **_kwargs): - return ('', '') - - self.patched_funcs.enter_context( - mock.patch.object(subp, 'subp', nsubp)) - - def null_func(*_args, **_kwargs): - return None - - for f in ['chownbyid', 'chownbyname']: - self.patched_funcs.enter_context( - mock.patch.object(util, f, null_func)) - - def patchOS(self, new_root): - patch_funcs = { - os.path: [('isfile', 1), ('exists', 1), - ('islink', 1), ('isdir', 1), ('lexists', 1)], - os: [('listdir', 1), ('mkdir', 1), - ('lstat', 1), ('symlink', 2), - ('stat', 1)] - } - - if hasattr(os, 'scandir'): - # py27 does not have scandir - patch_funcs[os].append(('scandir', 1)) - - for (mod, funcs) in patch_funcs.items(): - for f, nargs in funcs: - func = getattr(mod, f) - trap_func = retarget_many_wrapper(new_root, nargs, func) - self.patched_funcs.enter_context( - mock.patch.object(mod, f, trap_func)) - - def patchOpen(self, new_root): - trap_func = retarget_many_wrapper(new_root, 1, open) - self.patched_funcs.enter_context( - mock.patch('builtins.open', trap_func) - ) - - def patchStdoutAndStderr(self, stdout=None, stderr=None): - if stdout is not None: - self.patched_funcs.enter_context( - mock.patch.object(sys, 'stdout', stdout)) - if stderr is not None: - self.patched_funcs.enter_context( - mock.patch.object(sys, 'stderr', stderr)) - - def reRoot(self, root=None): - if root is None: - root = self.tmp_dir() - self.patchUtils(root) - self.patchOS(root) - self.patchOpen(root) - return root - - @contextmanager - def reRooted(self, root=None): - try: - yield self.reRoot(root) - finally: - self.patched_funcs.close() - - -class HttprettyTestCase(CiTestCase): - # necessary as http_proxy gets in the way of httpretty - # https://github.com/gabrielfalcao/HTTPretty/issues/122 - # Also make sure that allow_net_connect is set to False. - # And make sure reset and enable/disable are done. - - def setUp(self): - self.restore_proxy = os.environ.get('http_proxy') - if self.restore_proxy is not None: - del os.environ['http_proxy'] - super(HttprettyTestCase, self).setUp() - httpretty.HTTPretty.allow_net_connect = False - httpretty.reset() - httpretty.enable() - - def tearDown(self): - httpretty.disable() - httpretty.reset() - if self.restore_proxy: - os.environ['http_proxy'] = self.restore_proxy - super(HttprettyTestCase, self).tearDown() - - -class SchemaTestCaseMixin(unittest.TestCase): - - def assertSchemaValid(self, cfg, msg="Valid Schema failed validation."): - """Assert the config is valid per self.schema. - - If there is only one top level key in the schema properties, then - the cfg will be put under that key.""" - props = list(self.schema.get('properties')) - # put cfg under top level key if there is only one in the schema - if len(props) == 1: - cfg = {props[0]: cfg} - try: - validate_cloudconfig_schema(cfg, self.schema, strict=True) - except SchemaValidationError: - self.fail(msg) - - -def populate_dir(path, files): - if not os.path.exists(path): - os.makedirs(path) - ret = [] - for (name, content) in files.items(): - p = os.path.sep.join([path, name]) - util.ensure_dir(os.path.dirname(p)) - with open(p, "wb") as fp: - if isinstance(content, bytes): - fp.write(content) - else: - fp.write(content.encode('utf-8')) - fp.close() - ret.append(p) - - return ret - - -def populate_dir_with_ts(path, data): - """data is {'file': ('contents', mtime)}. mtime relative to now.""" - populate_dir(path, dict((k, v[0]) for k, v in data.items())) - btime = time.time() - for fpath, (_contents, mtime) in data.items(): - ts = btime + mtime if mtime else btime - os.utime(os.path.sep.join((path, fpath)), (ts, ts)) - - -def dir2dict(startdir, prefix=None): - flist = {} - if prefix is None: - prefix = startdir - for root, _dirs, files in os.walk(startdir): - for fname in files: - fpath = os.path.join(root, fname) - key = fpath[len(prefix):] - flist[key] = util.load_file(fpath) - return flist - - -def wrap_and_call(prefix, mocks, func, *args, **kwargs): - """ - call func(args, **kwargs) with mocks applied, then unapplies mocks - nicer to read than repeating dectorators on each function - - prefix: prefix for mock names (e.g. 'cloudinit.stages.util') or None - mocks: dictionary of names (under 'prefix') to mock and either - a return value or a dictionary to pass to the mock.patch call - func: function to call with mocks applied - *args,**kwargs: arguments for 'func' - - return_value: return from 'func' - """ - delim = '.' - if prefix is None: - prefix = '' - prefix = prefix.rstrip(delim) - unwraps = [] - for fname, kw in mocks.items(): - if prefix: - fname = delim.join((prefix, fname)) - if not isinstance(kw, dict): - kw = {'return_value': kw} - p = mock.patch(fname, **kw) - p.start() - unwraps.append(p) - try: - return func(*args, **kwargs) - finally: - for p in unwraps: - p.stop() - - -def resourceLocation(subname=None): - path = os.path.join('tests', 'data') - if not subname: - return path - return os.path.join(path, subname) - - -def readResource(name, mode='r'): - with open(resourceLocation(name), mode) as fh: - return fh.read() - - -try: - import jsonschema - assert jsonschema # avoid pyflakes error F401: import unused - _missing_jsonschema_dep = False -except ImportError: - _missing_jsonschema_dep = True - - -def skipUnlessJsonSchema(): - return skipIf( - _missing_jsonschema_dep, "No python-jsonschema dependency present.") - - -def skipUnlessJinja(): - return skipIf(not JINJA_AVAILABLE, "No jinja dependency present.") - - -def skipIfJinja(): - return skipIf(JINJA_AVAILABLE, "Jinja dependency present.") - - -# older versions of mock do not have the useful 'assert_not_called' -if not hasattr(mock.Mock, 'assert_not_called'): - def __mock_assert_not_called(mmock): - if mmock.call_count != 0: - msg = ("[citest] Expected '%s' to not have been called. " - "Called %s times." % - (mmock._mock_name or 'mock', mmock.call_count)) - raise AssertionError(msg) - mock.Mock.assert_not_called = __mock_assert_not_called - -# vi: ts=4 expandtab |