summaryrefslogtreecommitdiff
path: root/cloudinit/tests/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/tests/helpers.py')
-rw-r--r--cloudinit/tests/helpers.py43
1 files changed, 43 insertions, 0 deletions
diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py
index 9a21426e..e7e4182f 100644
--- a/cloudinit/tests/helpers.py
+++ b/cloudinit/tests/helpers.py
@@ -33,6 +33,8 @@ from cloudinit import helpers as ch
from cloudinit.sources import DataSourceNone
from cloudinit import util
+_real_subp = util.subp
+
# Used for skipping tests
SkipTest = unittest2.SkipTest
skipIf = unittest2.skipIf
@@ -143,6 +145,17 @@ class CiTestCase(TestCase):
# Subclass overrides for specific test behavior
# Whether or not a unit test needs logfile setup
with_logs = False
+ allowed_subp = False
+ SUBP_SHELL_TRUE = "shell=true"
+
+ @contextmanager
+ def allow_subp(self, allowed_subp):
+ orig = self.allowed_subp
+ try:
+ self.allowed_subp = allowed_subp
+ yield
+ finally:
+ self.allowed_subp = orig
def setUp(self):
super(CiTestCase, self).setUp()
@@ -155,11 +168,41 @@ class CiTestCase(TestCase):
handler.setFormatter(formatter)
self.old_handlers = self.logger.handlers
self.logger.handlers = [handler]
+ if self.allowed_subp is True:
+ util.subp = _real_subp
+ else:
+ util.subp = self._fake_subp
+
+ def _fake_subp(self, *args, **kwargs):
+ if 'args' in kwargs:
+ cmd = kwargs['args']
+ else:
+ cmd = args[0]
+
+ if not isinstance(cmd, six.string_types):
+ cmd = cmd[0]
+ pass_through = False
+ if not isinstance(self.allowed_subp, (list, bool)):
+ raise TypeError("self.allowed_subp supports list or bool.")
+ if isinstance(self.allowed_subp, bool):
+ pass_through = self.allowed_subp
+ else:
+ pass_through = (
+ (cmd in self.allowed_subp) or
+ (self.SUBP_SHELL_TRUE in self.allowed_subp and
+ kwargs.get('shell')))
+ if pass_through:
+ return _real_subp(*args, **kwargs)
+ raise Exception(
+ "called subp. set self.allowed_subp=True to allow\n subp(%s)" %
+ ', '.join([str(repr(a)) for a in args] +
+ ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()]))
def tearDown(self):
if self.with_logs:
# Remove the handler we setup
logging.getLogger().handlers = self.old_handlers
+ util.subp = _real_subp
super(CiTestCase, self).tearDown()
def tmp_dir(self, dir=None, cleanup=True):