diff options
Diffstat (limited to 'tests/cloud_tests/util.py')
-rw-r--r-- | tests/cloud_tests/util.py | 235 |
1 files changed, 182 insertions, 53 deletions
diff --git a/tests/cloud_tests/util.py b/tests/cloud_tests/util.py index 64a86672..2bbe21c7 100644 --- a/tests/cloud_tests/util.py +++ b/tests/cloud_tests/util.py @@ -1,28 +1,43 @@ # This file is part of cloud-init. See LICENSE file for license information. +"""Utilities for re-use across integration tests.""" + +import copy import glob import os import random +import shutil import string import tempfile import yaml -from cloudinit.distros import OSFAMILIES from cloudinit import util as c_util from tests.cloud_tests import LOG +OS_FAMILY_MAPPING = { + 'debian': ['debian', 'ubuntu'], + 'redhat': ['centos', 'rhel', 'fedora'], + 'gentoo': ['gentoo'], + 'freebsd': ['freebsd'], + 'suse': ['sles'], + 'arch': ['arch'], +} + def list_test_data(data_dir): - """ - find all tests with test data available in data_dir - data_dir should contain <platforms>/<os_name>/<testnames>/<data> - return_value: {<platform>: {<os_name>: [<testname>]}} + """Find all tests with test data available in data_dir. + + @param data_dir: should contain <platforms>/<os_name>/<testnames>/<data> + @return_value: {<platform>: {<os_name>: [<testname>]}} """ if not os.path.isdir(data_dir): raise ValueError("bad data dir") res = {} for platform in os.listdir(data_dir): + if not os.path.isdir(os.path.join(data_dir, platform)): + continue + res[platform] = {} for os_name in os.listdir(os.path.join(data_dir, platform)): res[platform][os_name] = [ @@ -36,39 +51,33 @@ def list_test_data(data_dir): def gen_instance_name(prefix='cloud-test', image_desc=None, use_desc=None, max_len=63, delim='-', max_tries=16, used_list=None, valid=string.ascii_lowercase + string.digits): - """ - generate an unique name for a test instance - prefix: name prefix, defaults to cloud-test, default should be left - image_desc: short string with image desc, will be truncated to 16 chars - use_desc: short string with usage desc, will be truncated to 30 chars - max_len: maximum name length, defaults to 64 chars - delim: delimiter to use between tokens - max_tries: maximum tries to find a unique name before giving up - used_list: already used names, or none to not check - valid: string of valid characters for name - return_value: valid, unused name, may raise StopIteration + """Generate an unique name for a test instance. + + @param prefix: name prefix, defaults to cloud-test, default should be left + @param image_desc: short string (len <= 16) with image desc + @param use_desc: short string (len <= 30) with usage desc + @param max_len: maximum name length, defaults to 64 chars + @param delim: delimiter to use between tokens + @param max_tries: maximum tries to find a unique name before giving up + @param used_list: already used names, or none to not check + @param valid: string of valid characters for name + @return_value: valid, unused name, may raise StopIteration """ unknown = 'unknown' def join(*args): - """ - join args with delim - """ + """Join args with delim.""" return delim.join(args) def fill(*args): - """ - join name elems and fill rest with random data - """ + """Join name elems and fill rest with random data.""" name = join(*args) num = max_len - len(name) - len(delim) return join(name, ''.join(random.choice(valid) for _ in range(num))) def clean(elem, max_len): - """ - filter bad characters out of elem and trim to length - """ - elem = elem[:max_len] if elem else unknown + """Filter bad characters out of elem and trim to length.""" + elem = elem.lower()[:max_len] if elem else unknown return ''.join(c if c in valid else delim for c in elem) return next(name for name in @@ -78,30 +87,39 @@ def gen_instance_name(prefix='cloud-test', image_desc=None, use_desc=None, def sorted_unique(iterable, key=None, reverse=False): - """ - return_value: a sorted list of unique items in iterable + """Create unique sorted list. + + @param iterable: the data structure to sort + @param key: if you have a specific key + @param reverse: to reverse or not + @return_value: a sorted list of unique items in iterable """ return sorted(set(iterable), key=key, reverse=reverse) def get_os_family(os_name): + """Get os family type for os_name. + + @param os_name: name of os + @return_value: family name for os_name """ - get os family type for os_name - """ - return next((k for k, v in OSFAMILIES.items() if os_name in v), None) + return next((k for k, v in OS_FAMILY_MAPPING.items() + if os_name.lower() in v), None) def current_verbosity(): - """ - get verbosity currently in effect from log level - return_value: verbosity, 0-2, 2 = verbose, 0 = quiet + """Get verbosity currently in effect from log level. + + @return_value: verbosity, 0-2, 2=verbose, 0=quiet """ return max(min(3 - int(LOG.level / 10), 2), 0) def is_writable_dir(path): - """ - make sure dir is writable + """Make sure dir is writable. + + @param path: path to determine if writable + @return_value: boolean with result """ try: c_util.ensure_dir(path) @@ -112,9 +130,10 @@ def is_writable_dir(path): def is_clean_writable_dir(path): - """ - make sure dir is empty and writable, creating it if it does not exist - return_value: True/False if successful + """Make sure dir is empty and writable, creating it if it does not exist. + + @param path: path to check + @return_value: True/False if successful """ path = os.path.abspath(path) if not (is_writable_dir(path) and len(os.listdir(path)) == 0): @@ -123,29 +142,31 @@ def is_clean_writable_dir(path): def configure_yaml(): + """Clean yaml.""" yaml.add_representer(str, (lambda dumper, data: dumper.represent_scalar( 'tag:yaml.org,2002:str', data, style='|' if '\n' in data else ''))) -def yaml_format(data): - """ - format data as yaml +def yaml_format(data, content_type=None): + """Format data as yaml. + + @param data: data to dump + @param header: if specified, add a header to the dumped data + @return_value: yaml string """ configure_yaml() - return yaml.dump(data, indent=2, default_flow_style=False) + content_type = ( + '#{}\n'.format(content_type.strip('#\n')) if content_type else '') + return content_type + yaml.dump(data, indent=2, default_flow_style=False) def yaml_dump(data, path): - """ - dump data to path in yaml format - """ - write_file(os.path.abspath(path), yaml_format(data), omode='w') + """Dump data to path in yaml format.""" + c_util.write_file(os.path.abspath(path), yaml_format(data), omode='w') def merge_results(data, path): - """ - handle merging results from collect phase and verify phase - """ + """Handle merging results from collect phase and verify phase.""" current = {} if os.path.exists(path): with open(path, 'r') as fp: @@ -154,10 +175,118 @@ def merge_results(data, path): yaml_dump(current, path) -def write_file(*args, **kwargs): +def rel_files(basedir): + """List of files under directory by relative path, not including dirs. + + @param basedir: directory to search + @return_value: list or relative paths + """ + basedir = os.path.normpath(basedir) + return [path[len(basedir) + 1:] for path in + glob.glob(os.path.join(basedir, '**'), recursive=True) + if not os.path.isdir(path)] + + +def flat_tar(output, basedir, owner='root', group='root'): + """Create a flat tar archive (no leading ./) from basedir. + + @param output: output tar file to write + @param basedir: base directory for archive + @param owner: owner of archive files + @param group: group archive files belong to + @return_value: none + """ + c_util.subp(['tar', 'cf', output, '--owner', owner, '--group', group, + '-C', basedir] + rel_files(basedir), capture=True) + + +def parse_conf_list(entries, valid=None, boolean=False): + """Parse config in a list of strings in key=value format. + + @param entries: list of key=value strings + @param valid: list of valid keys in result, return None if invalid input + @param boolean: if true, then interpret all values as booleans + @return_value: dict of configuration or None if invalid """ - write a file using cloudinit.util.write_file + res = {key: value.lower() == 'true' if boolean else value + for key, value in (i.split('=') for i in entries)} + return res if not valid or all(k in valid for k in res.keys()) else None + + +def update_args(args, updates, preserve_old=True): + """Update cmdline arguments from a dictionary. + + @param args: cmdline arguments + @param updates: dictionary of {arg_name: new_value} mappings + @param preserve_old: if true, create a deep copy of args before updating + @return_value: updated cmdline arguments + """ + args = copy.deepcopy(args) if preserve_old else args + if updates: + vars(args).update(updates) + return args + + +def update_user_data(user_data, updates, dump_to_yaml=True): + """Update user_data from dictionary. + + @param user_data: user data as yaml string or dict + @param updates: dictionary to merge with user data + @param dump_to_yaml: return as yaml dumped string if true + @return_value: updated user data, as yaml string if dump_to_yaml is true """ - c_util.write_file(*args, **kwargs) + user_data = (c_util.load_yaml(user_data) + if isinstance(user_data, str) else copy.deepcopy(user_data)) + user_data.update(updates) + return (yaml_format(user_data, content_type='cloud-config') + if dump_to_yaml else user_data) + + +class InTargetExecuteError(c_util.ProcessExecutionError): + """Error type for in target commands that fail.""" + + default_desc = 'Unexpected error while running command in target instance' + + def __init__(self, stdout, stderr, exit_code, cmd, instance, + description=None): + """Init error and parent error class.""" + if isinstance(cmd, (tuple, list)): + cmd = ' '.join(cmd) + super(InTargetExecuteError, self).__init__( + stdout=stdout, stderr=stderr, exit_code=exit_code, cmd=cmd, + reason="Instance: {}".format(instance), + description=description if description else self.default_desc) + + +class TempDir(object): + """Configurable temporary directory like tempfile.TemporaryDirectory.""" + + def __init__(self, tmpdir=None, preserve=False, prefix='cloud_test_data_'): + """Initialize. + + @param tmpdir: directory to use as tempdir + @param preserve: if true, always preserve data on exit + @param prefix: prefix to use for tempfile name + """ + self.tmpdir = tmpdir + self.preserve = preserve + self.prefix = prefix + + def __enter__(self): + """Create tempdir. + + @return_value: tempdir path + """ + if not self.tmpdir: + self.tmpdir = tempfile.mkdtemp(prefix=self.prefix) + LOG.debug('using tmpdir: %s', self.tmpdir) + return self.tmpdir + + def __exit__(self, etype, value, trace): + """Destroy tempdir if no errors occurred.""" + if etype or self.preserve: + LOG.info('leaving data in %s', self.tmpdir) + else: + shutil.rmtree(self.tmpdir) # vi: ts=4 expandtab |