summaryrefslogtreecommitdiff
path: root/cloudinit/tests/helpers.py
diff options
context:
space:
mode:
authorChad Smith <chad.smith@canonical.com>2018-10-03 12:10:23 -0600
committerChad Smith <chad.smith@canonical.com>2018-10-03 12:10:23 -0600
commitd6347e1c439eda7f43d9620dac2b461e980e1ae9 (patch)
tree08410263488d11a2a29edcc620575ed1b028100e /cloudinit/tests/helpers.py
parent564793a76b9c9add1ee81bab4919c8dccd45a33d (diff)
parente28000457591bde9f22d6b7a538b1fc33349d780 (diff)
downloadvyos-cloud-init-d6347e1c439eda7f43d9620dac2b461e980e1ae9.tar.gz
vyos-cloud-init-d6347e1c439eda7f43d9620dac2b461e980e1ae9.zip
merge from master at 18.4
Diffstat (limited to 'cloudinit/tests/helpers.py')
-rw-r--r--cloudinit/tests/helpers.py110
1 files changed, 92 insertions, 18 deletions
diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py
index 5bfe7fa4..2eb7b0cd 100644
--- a/cloudinit/tests/helpers.py
+++ b/cloudinit/tests/helpers.py
@@ -10,16 +10,16 @@ import shutil
import sys
import tempfile
import time
-import unittest
import mock
import six
import unittest2
+from unittest2.util import strclass
try:
- from contextlib import ExitStack
+ from contextlib import ExitStack, contextmanager
except ImportError:
- from contextlib2 import ExitStack
+ from contextlib2 import ExitStack, contextmanager
try:
from configparser import ConfigParser
@@ -28,11 +28,18 @@ except ImportError:
from cloudinit.config.schema import (
SchemaValidationError, validate_cloudconfig_schema)
+from cloudinit import cloud
+from cloudinit import distros
from cloudinit import helpers as ch
+from cloudinit.sources import DataSourceNone
+from cloudinit.templater import JINJA_AVAILABLE
from cloudinit import util
+_real_subp = util.subp
+
# Used for skipping tests
SkipTest = unittest2.SkipTest
+skipIf = unittest2.skipIf
# Used for detecting different python versions
PY2 = False
@@ -112,6 +119,9 @@ class TestCase(unittest2.TestCase):
super(TestCase, self).setUp()
self.reset_global_state()
+ def shortDescription(self):
+ return strclass(self.__class__) + '.' + self._testMethodName
+
def add_patch(self, target, attr, *args, **kwargs):
"""Patches specified target object and sets it as attr on test
instance also schedules cleanup"""
@@ -140,6 +150,17 @@ class CiTestCase(TestCase):
# Subclass overrides for specific test behavior
# Whether or not a unit test needs logfile setup
with_logs = False
+ allowed_subp = False
+ SUBP_SHELL_TRUE = "shell=true"
+
+ @contextmanager
+ def allow_subp(self, allowed_subp):
+ orig = self.allowed_subp
+ try:
+ self.allowed_subp = allowed_subp
+ yield
+ finally:
+ self.allowed_subp = orig
def setUp(self):
super(CiTestCase, self).setUp()
@@ -152,11 +173,41 @@ class CiTestCase(TestCase):
handler.setFormatter(formatter)
self.old_handlers = self.logger.handlers
self.logger.handlers = [handler]
+ if self.allowed_subp is True:
+ util.subp = _real_subp
+ else:
+ util.subp = self._fake_subp
+
+ def _fake_subp(self, *args, **kwargs):
+ if 'args' in kwargs:
+ cmd = kwargs['args']
+ else:
+ cmd = args[0]
+
+ if not isinstance(cmd, six.string_types):
+ cmd = cmd[0]
+ pass_through = False
+ if not isinstance(self.allowed_subp, (list, bool)):
+ raise TypeError("self.allowed_subp supports list or bool.")
+ if isinstance(self.allowed_subp, bool):
+ pass_through = self.allowed_subp
+ else:
+ pass_through = (
+ (cmd in self.allowed_subp) or
+ (self.SUBP_SHELL_TRUE in self.allowed_subp and
+ kwargs.get('shell')))
+ if pass_through:
+ return _real_subp(*args, **kwargs)
+ raise Exception(
+ "called subp. set self.allowed_subp=True to allow\n subp(%s)" %
+ ', '.join([str(repr(a)) for a in args] +
+ ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()]))
def tearDown(self):
if self.with_logs:
# Remove the handler we setup
logging.getLogger().handlers = self.old_handlers
+ util.subp = _real_subp
super(CiTestCase, self).tearDown()
def tmp_dir(self, dir=None, cleanup=True):
@@ -187,6 +238,29 @@ class CiTestCase(TestCase):
"""
raise SystemExit(code)
+ def tmp_cloud(self, distro, sys_cfg=None, metadata=None):
+ """Create a cloud with tmp working directory paths.
+
+ @param distro: Name of the distro to attach to the cloud.
+ @param metadata: Optional metadata to set on the datasource.
+
+ @return: The built cloud instance.
+ """
+ self.new_root = self.tmp_dir()
+ if not sys_cfg:
+ sys_cfg = {}
+ tmp_paths = {}
+ for var in ['templates_dir', 'run_dir', 'cloud_dir']:
+ tmp_paths[var] = self.tmp_path(var, dir=self.new_root)
+ util.ensure_dir(tmp_paths[var])
+ self.paths = ch.Paths(tmp_paths)
+ cls = distros.fetch(distro)
+ mydist = cls(distro, sys_cfg, self.paths)
+ myds = DataSourceNone.DataSourceNone(sys_cfg, mydist, self.paths)
+ if metadata:
+ myds.metadata.update(metadata)
+ return cloud.Cloud(myds, self.paths, sys_cfg, mydist, None)
+
class ResourceUsingTestCase(CiTestCase):
@@ -300,6 +374,13 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
self.patchOS(root)
return root
+ @contextmanager
+ def reRooted(self, root=None):
+ try:
+ yield self.reRoot(root)
+ finally:
+ self.patched_funcs.close()
+
class HttprettyTestCase(CiTestCase):
# necessary as http_proxy gets in the way of httpretty
@@ -426,21 +507,6 @@ def readResource(name, mode='r'):
try:
- skipIf = unittest.skipIf
-except AttributeError:
- # Python 2.6. Doesn't have to be high fidelity.
- def skipIf(condition, reason):
- def decorator(func):
- def wrapper(*args, **kws):
- if condition:
- return func(*args, **kws)
- else:
- print(reason, file=sys.stderr)
- return wrapper
- return decorator
-
-
-try:
import jsonschema
assert jsonschema # avoid pyflakes error F401: import unused
_missing_jsonschema_dep = False
@@ -453,6 +519,14 @@ def skipUnlessJsonSchema():
_missing_jsonschema_dep, "No python-jsonschema dependency present.")
+def skipUnlessJinja():
+ return skipIf(not JINJA_AVAILABLE, "No jinja dependency present.")
+
+
+def skipIfJinja():
+ return skipIf(JINJA_AVAILABLE, "Jinja dependency present.")
+
+
# older versions of mock do not have the useful 'assert_not_called'
if not hasattr(mock.Mock, 'assert_not_called'):
def __mock_assert_not_called(mmock):