summaryrefslogtreecommitdiff
path: root/cloudinit/tests/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/tests/helpers.py')
-rw-r--r--cloudinit/tests/helpers.py86
1 files changed, 56 insertions, 30 deletions
diff --git a/cloudinit/tests/helpers.py b/cloudinit/tests/helpers.py
index 999b1d7c..5bfe7fa4 100644
--- a/cloudinit/tests/helpers.py
+++ b/cloudinit/tests/helpers.py
@@ -3,11 +3,13 @@
from __future__ import print_function
import functools
+import httpretty
import logging
import os
import shutil
import sys
import tempfile
+import time
import unittest
import mock
@@ -24,6 +26,8 @@ try:
except ImportError:
from ConfigParser import ConfigParser
+from cloudinit.config.schema import (
+ SchemaValidationError, validate_cloudconfig_schema)
from cloudinit import helpers as ch
from cloudinit import util
@@ -108,12 +112,12 @@ class TestCase(unittest2.TestCase):
super(TestCase, self).setUp()
self.reset_global_state()
- def add_patch(self, target, attr, **kwargs):
+ def add_patch(self, target, attr, *args, **kwargs):
"""Patches specified target object and sets it as attr on test
instance also schedules cleanup"""
if 'autospec' not in kwargs:
kwargs['autospec'] = True
- m = mock.patch(target, **kwargs)
+ m = mock.patch(target, *args, **kwargs)
p = m.start()
self.addCleanup(m.stop)
setattr(self, attr, p)
@@ -190,35 +194,11 @@ class ResourceUsingTestCase(CiTestCase):
super(ResourceUsingTestCase, self).setUp()
self.resource_path = None
- def resourceLocation(self, subname=None):
- if self.resource_path is None:
- paths = [
- os.path.join('tests', 'data'),
- os.path.join('data'),
- os.path.join(os.pardir, 'tests', 'data'),
- os.path.join(os.pardir, 'data'),
- ]
- for p in paths:
- if os.path.isdir(p):
- self.resource_path = p
- break
- self.assertTrue((self.resource_path and
- os.path.isdir(self.resource_path)),
- msg="Unable to locate test resource data path!")
- if not subname:
- return self.resource_path
- return os.path.join(self.resource_path, subname)
-
- def readResource(self, name):
- where = self.resourceLocation(name)
- with open(where, 'r') as fh:
- return fh.read()
-
def getCloudPaths(self, ds=None):
tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmpdir)
cp = ch.Paths({'cloud_dir': tmpdir,
- 'templates_dir': self.resourceLocation()},
+ 'templates_dir': resourceLocation()},
ds=ds)
return cp
@@ -234,7 +214,7 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
ResourceUsingTestCase.tearDown(self)
def replicateTestRoot(self, example_root, target_root):
- real_root = self.resourceLocation()
+ real_root = resourceLocation()
real_root = os.path.join(real_root, 'roots', example_root)
for (dir_path, _dirnames, filenames) in os.walk(real_root):
real_path = dir_path
@@ -285,7 +265,8 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
os.path: [('isfile', 1), ('exists', 1),
('islink', 1), ('isdir', 1), ('lexists', 1)],
os: [('listdir', 1), ('mkdir', 1),
- ('lstat', 1), ('symlink', 2)]
+ ('lstat', 1), ('symlink', 2),
+ ('stat', 1)]
}
if hasattr(os, 'scandir'):
@@ -323,19 +304,43 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
class HttprettyTestCase(CiTestCase):
# necessary as http_proxy gets in the way of httpretty
# https://github.com/gabrielfalcao/HTTPretty/issues/122
+ # Also make sure that allow_net_connect is set to False.
+ # And make sure reset and enable/disable are done.
def setUp(self):
self.restore_proxy = os.environ.get('http_proxy')
if self.restore_proxy is not None:
del os.environ['http_proxy']
super(HttprettyTestCase, self).setUp()
+ httpretty.HTTPretty.allow_net_connect = False
+ httpretty.reset()
+ httpretty.enable()
def tearDown(self):
+ httpretty.disable()
+ httpretty.reset()
if self.restore_proxy:
os.environ['http_proxy'] = self.restore_proxy
super(HttprettyTestCase, self).tearDown()
+class SchemaTestCaseMixin(unittest2.TestCase):
+
+ def assertSchemaValid(self, cfg, msg="Valid Schema failed validation."):
+ """Assert the config is valid per self.schema.
+
+ If there is only one top level key in the schema properties, then
+ the cfg will be put under that key."""
+ props = list(self.schema.get('properties'))
+ # put cfg under top level key if there is only one in the schema
+ if len(props) == 1:
+ cfg = {props[0]: cfg}
+ try:
+ validate_cloudconfig_schema(cfg, self.schema, strict=True)
+ except SchemaValidationError:
+ self.fail(msg)
+
+
def populate_dir(path, files):
if not os.path.exists(path):
os.makedirs(path)
@@ -354,11 +359,20 @@ def populate_dir(path, files):
return ret
+def populate_dir_with_ts(path, data):
+ """data is {'file': ('contents', mtime)}. mtime relative to now."""
+ populate_dir(path, dict((k, v[0]) for k, v in data.items()))
+ btime = time.time()
+ for fpath, (_contents, mtime) in data.items():
+ ts = btime + mtime if mtime else btime
+ os.utime(os.path.sep.join((path, fpath)), (ts, ts))
+
+
def dir2dict(startdir, prefix=None):
flist = {}
if prefix is None:
prefix = startdir
- for root, dirs, files in os.walk(startdir):
+ for root, _dirs, files in os.walk(startdir):
for fname in files:
fpath = os.path.join(root, fname)
key = fpath[len(prefix):]
@@ -399,6 +413,18 @@ def wrap_and_call(prefix, mocks, func, *args, **kwargs):
p.stop()
+def resourceLocation(subname=None):
+ path = os.path.join('tests', 'data')
+ if not subname:
+ return path
+ return os.path.join(path, subname)
+
+
+def readResource(name, mode='r'):
+ with open(resourceLocation(name), mode) as fh:
+ return fh.read()
+
+
try:
skipIf = unittest.skipIf
except AttributeError: