summaryrefslogtreecommitdiff
path: root/tests/cloud_tests/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/cloud_tests/util.py')
-rw-r--r--tests/cloud_tests/util.py235
1 files changed, 182 insertions, 53 deletions
diff --git a/tests/cloud_tests/util.py b/tests/cloud_tests/util.py
index 64a86672..2bbe21c7 100644
--- a/tests/cloud_tests/util.py
+++ b/tests/cloud_tests/util.py
@@ -1,28 +1,43 @@
# This file is part of cloud-init. See LICENSE file for license information.
+"""Utilities for re-use across integration tests."""
+
+import copy
import glob
import os
import random
+import shutil
import string
import tempfile
import yaml
-from cloudinit.distros import OSFAMILIES
from cloudinit import util as c_util
from tests.cloud_tests import LOG
+OS_FAMILY_MAPPING = {
+ 'debian': ['debian', 'ubuntu'],
+ 'redhat': ['centos', 'rhel', 'fedora'],
+ 'gentoo': ['gentoo'],
+ 'freebsd': ['freebsd'],
+ 'suse': ['sles'],
+ 'arch': ['arch'],
+}
+
def list_test_data(data_dir):
- """
- find all tests with test data available in data_dir
- data_dir should contain <platforms>/<os_name>/<testnames>/<data>
- return_value: {<platform>: {<os_name>: [<testname>]}}
+ """Find all tests with test data available in data_dir.
+
+ @param data_dir: should contain <platforms>/<os_name>/<testnames>/<data>
+ @return_value: {<platform>: {<os_name>: [<testname>]}}
"""
if not os.path.isdir(data_dir):
raise ValueError("bad data dir")
res = {}
for platform in os.listdir(data_dir):
+ if not os.path.isdir(os.path.join(data_dir, platform)):
+ continue
+
res[platform] = {}
for os_name in os.listdir(os.path.join(data_dir, platform)):
res[platform][os_name] = [
@@ -36,39 +51,33 @@ def list_test_data(data_dir):
def gen_instance_name(prefix='cloud-test', image_desc=None, use_desc=None,
max_len=63, delim='-', max_tries=16, used_list=None,
valid=string.ascii_lowercase + string.digits):
- """
- generate an unique name for a test instance
- prefix: name prefix, defaults to cloud-test, default should be left
- image_desc: short string with image desc, will be truncated to 16 chars
- use_desc: short string with usage desc, will be truncated to 30 chars
- max_len: maximum name length, defaults to 64 chars
- delim: delimiter to use between tokens
- max_tries: maximum tries to find a unique name before giving up
- used_list: already used names, or none to not check
- valid: string of valid characters for name
- return_value: valid, unused name, may raise StopIteration
+ """Generate an unique name for a test instance.
+
+ @param prefix: name prefix, defaults to cloud-test, default should be left
+ @param image_desc: short string (len <= 16) with image desc
+ @param use_desc: short string (len <= 30) with usage desc
+ @param max_len: maximum name length, defaults to 64 chars
+ @param delim: delimiter to use between tokens
+ @param max_tries: maximum tries to find a unique name before giving up
+ @param used_list: already used names, or none to not check
+ @param valid: string of valid characters for name
+ @return_value: valid, unused name, may raise StopIteration
"""
unknown = 'unknown'
def join(*args):
- """
- join args with delim
- """
+ """Join args with delim."""
return delim.join(args)
def fill(*args):
- """
- join name elems and fill rest with random data
- """
+ """Join name elems and fill rest with random data."""
name = join(*args)
num = max_len - len(name) - len(delim)
return join(name, ''.join(random.choice(valid) for _ in range(num)))
def clean(elem, max_len):
- """
- filter bad characters out of elem and trim to length
- """
- elem = elem[:max_len] if elem else unknown
+ """Filter bad characters out of elem and trim to length."""
+ elem = elem.lower()[:max_len] if elem else unknown
return ''.join(c if c in valid else delim for c in elem)
return next(name for name in
@@ -78,30 +87,39 @@ def gen_instance_name(prefix='cloud-test', image_desc=None, use_desc=None,
def sorted_unique(iterable, key=None, reverse=False):
- """
- return_value: a sorted list of unique items in iterable
+ """Create unique sorted list.
+
+ @param iterable: the data structure to sort
+ @param key: if you have a specific key
+ @param reverse: to reverse or not
+ @return_value: a sorted list of unique items in iterable
"""
return sorted(set(iterable), key=key, reverse=reverse)
def get_os_family(os_name):
+ """Get os family type for os_name.
+
+ @param os_name: name of os
+ @return_value: family name for os_name
"""
- get os family type for os_name
- """
- return next((k for k, v in OSFAMILIES.items() if os_name in v), None)
+ return next((k for k, v in OS_FAMILY_MAPPING.items()
+ if os_name.lower() in v), None)
def current_verbosity():
- """
- get verbosity currently in effect from log level
- return_value: verbosity, 0-2, 2 = verbose, 0 = quiet
+ """Get verbosity currently in effect from log level.
+
+ @return_value: verbosity, 0-2, 2=verbose, 0=quiet
"""
return max(min(3 - int(LOG.level / 10), 2), 0)
def is_writable_dir(path):
- """
- make sure dir is writable
+ """Make sure dir is writable.
+
+ @param path: path to determine if writable
+ @return_value: boolean with result
"""
try:
c_util.ensure_dir(path)
@@ -112,9 +130,10 @@ def is_writable_dir(path):
def is_clean_writable_dir(path):
- """
- make sure dir is empty and writable, creating it if it does not exist
- return_value: True/False if successful
+ """Make sure dir is empty and writable, creating it if it does not exist.
+
+ @param path: path to check
+ @return_value: True/False if successful
"""
path = os.path.abspath(path)
if not (is_writable_dir(path) and len(os.listdir(path)) == 0):
@@ -123,29 +142,31 @@ def is_clean_writable_dir(path):
def configure_yaml():
+ """Clean yaml."""
yaml.add_representer(str, (lambda dumper, data: dumper.represent_scalar(
'tag:yaml.org,2002:str', data, style='|' if '\n' in data else '')))
-def yaml_format(data):
- """
- format data as yaml
+def yaml_format(data, content_type=None):
+ """Format data as yaml.
+
+ @param data: data to dump
+ @param header: if specified, add a header to the dumped data
+ @return_value: yaml string
"""
configure_yaml()
- return yaml.dump(data, indent=2, default_flow_style=False)
+ content_type = (
+ '#{}\n'.format(content_type.strip('#\n')) if content_type else '')
+ return content_type + yaml.dump(data, indent=2, default_flow_style=False)
def yaml_dump(data, path):
- """
- dump data to path in yaml format
- """
- write_file(os.path.abspath(path), yaml_format(data), omode='w')
+ """Dump data to path in yaml format."""
+ c_util.write_file(os.path.abspath(path), yaml_format(data), omode='w')
def merge_results(data, path):
- """
- handle merging results from collect phase and verify phase
- """
+ """Handle merging results from collect phase and verify phase."""
current = {}
if os.path.exists(path):
with open(path, 'r') as fp:
@@ -154,10 +175,118 @@ def merge_results(data, path):
yaml_dump(current, path)
-def write_file(*args, **kwargs):
+def rel_files(basedir):
+ """List of files under directory by relative path, not including dirs.
+
+ @param basedir: directory to search
+ @return_value: list or relative paths
+ """
+ basedir = os.path.normpath(basedir)
+ return [path[len(basedir) + 1:] for path in
+ glob.glob(os.path.join(basedir, '**'), recursive=True)
+ if not os.path.isdir(path)]
+
+
+def flat_tar(output, basedir, owner='root', group='root'):
+ """Create a flat tar archive (no leading ./) from basedir.
+
+ @param output: output tar file to write
+ @param basedir: base directory for archive
+ @param owner: owner of archive files
+ @param group: group archive files belong to
+ @return_value: none
+ """
+ c_util.subp(['tar', 'cf', output, '--owner', owner, '--group', group,
+ '-C', basedir] + rel_files(basedir), capture=True)
+
+
+def parse_conf_list(entries, valid=None, boolean=False):
+ """Parse config in a list of strings in key=value format.
+
+ @param entries: list of key=value strings
+ @param valid: list of valid keys in result, return None if invalid input
+ @param boolean: if true, then interpret all values as booleans
+ @return_value: dict of configuration or None if invalid
"""
- write a file using cloudinit.util.write_file
+ res = {key: value.lower() == 'true' if boolean else value
+ for key, value in (i.split('=') for i in entries)}
+ return res if not valid or all(k in valid for k in res.keys()) else None
+
+
+def update_args(args, updates, preserve_old=True):
+ """Update cmdline arguments from a dictionary.
+
+ @param args: cmdline arguments
+ @param updates: dictionary of {arg_name: new_value} mappings
+ @param preserve_old: if true, create a deep copy of args before updating
+ @return_value: updated cmdline arguments
+ """
+ args = copy.deepcopy(args) if preserve_old else args
+ if updates:
+ vars(args).update(updates)
+ return args
+
+
+def update_user_data(user_data, updates, dump_to_yaml=True):
+ """Update user_data from dictionary.
+
+ @param user_data: user data as yaml string or dict
+ @param updates: dictionary to merge with user data
+ @param dump_to_yaml: return as yaml dumped string if true
+ @return_value: updated user data, as yaml string if dump_to_yaml is true
"""
- c_util.write_file(*args, **kwargs)
+ user_data = (c_util.load_yaml(user_data)
+ if isinstance(user_data, str) else copy.deepcopy(user_data))
+ user_data.update(updates)
+ return (yaml_format(user_data, content_type='cloud-config')
+ if dump_to_yaml else user_data)
+
+
+class InTargetExecuteError(c_util.ProcessExecutionError):
+ """Error type for in target commands that fail."""
+
+ default_desc = 'Unexpected error while running command in target instance'
+
+ def __init__(self, stdout, stderr, exit_code, cmd, instance,
+ description=None):
+ """Init error and parent error class."""
+ if isinstance(cmd, (tuple, list)):
+ cmd = ' '.join(cmd)
+ super(InTargetExecuteError, self).__init__(
+ stdout=stdout, stderr=stderr, exit_code=exit_code, cmd=cmd,
+ reason="Instance: {}".format(instance),
+ description=description if description else self.default_desc)
+
+
+class TempDir(object):
+ """Configurable temporary directory like tempfile.TemporaryDirectory."""
+
+ def __init__(self, tmpdir=None, preserve=False, prefix='cloud_test_data_'):
+ """Initialize.
+
+ @param tmpdir: directory to use as tempdir
+ @param preserve: if true, always preserve data on exit
+ @param prefix: prefix to use for tempfile name
+ """
+ self.tmpdir = tmpdir
+ self.preserve = preserve
+ self.prefix = prefix
+
+ def __enter__(self):
+ """Create tempdir.
+
+ @return_value: tempdir path
+ """
+ if not self.tmpdir:
+ self.tmpdir = tempfile.mkdtemp(prefix=self.prefix)
+ LOG.debug('using tmpdir: %s', self.tmpdir)
+ return self.tmpdir
+
+ def __exit__(self, etype, value, trace):
+ """Destroy tempdir if no errors occurred."""
+ if etype or self.preserve:
+ LOG.info('leaving data in %s', self.tmpdir)
+ else:
+ shutil.rmtree(self.tmpdir)
# vi: ts=4 expandtab