summaryrefslogtreecommitdiff
path: root/tests/cloud_tests/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/cloud_tests/util.py')
-rw-r--r--tests/cloud_tests/util.py532
1 files changed, 0 insertions, 532 deletions
diff --git a/tests/cloud_tests/util.py b/tests/cloud_tests/util.py
deleted file mode 100644
index 49baadb0..00000000
--- a/tests/cloud_tests/util.py
+++ /dev/null
@@ -1,532 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Utilities for re-use across integration tests."""
-
-import base64
-import copy
-import glob
-import multiprocessing
-import os
-import random
-import shlex
-import shutil
-import string
-import subprocess
-import tempfile
-import time
-import yaml
-from contextlib import contextmanager
-
-from cloudinit import subp
-from cloudinit import util as c_util
-from tests.cloud_tests import LOG
-
-OS_FAMILY_MAPPING = {
- 'debian': ['debian', 'ubuntu'],
- 'redhat': ['centos', 'photon', 'rhel', 'fedora'],
- 'gentoo': ['gentoo'],
- 'freebsd': ['freebsd'],
- 'suse': ['sles'],
- 'arch': ['arch'],
-}
-
-
-def list_test_data(data_dir):
- """Find all tests with test data available in data_dir.
-
- @param data_dir: should contain <platforms>/<os_name>/<testnames>/<data>
- @return_value: {<platform>: {<os_name>: [<testname>]}}
- """
- if not os.path.isdir(data_dir):
- raise ValueError("bad data dir")
-
- res = {}
- for platform in os.listdir(data_dir):
- if not os.path.isdir(os.path.join(data_dir, platform)):
- continue
-
- res[platform] = {}
- for os_name in os.listdir(os.path.join(data_dir, platform)):
- res[platform][os_name] = [
- os.path.sep.join(f.split(os.path.sep)[-2:]) for f in
- glob.glob(os.sep.join((data_dir, platform, os_name, '*/*')))]
-
- LOG.debug('found test data: %s\n', res)
- return res
-
-
-def gen_instance_name(prefix='cloud-test', image_desc=None, use_desc=None,
- max_len=63, delim='-', max_tries=16, used_list=None,
- valid=string.ascii_lowercase + string.digits):
- """Generate an unique name for a test instance.
-
- @param prefix: name prefix, defaults to cloud-test, default should be left
- @param image_desc: short string (len <= 16) with image desc
- @param use_desc: short string (len <= 30) with usage desc
- @param max_len: maximum name length, defaults to 64 chars
- @param delim: delimiter to use between tokens
- @param max_tries: maximum tries to find a unique name before giving up
- @param used_list: already used names, or none to not check
- @param valid: string of valid characters for name
- @return_value: valid, unused name, may raise StopIteration
- """
- unknown = 'unknown'
-
- def join(*args):
- """Join args with delim."""
- return delim.join(args)
-
- def fill(*args):
- """Join name elems and fill rest with random data."""
- name = join(*args)
- num = max_len - len(name) - len(delim)
- return join(name, ''.join(random.choice(valid) for _ in range(num)))
-
- def clean(elem, max_len):
- """Filter bad characters out of elem and trim to length."""
- elem = elem.lower()[:max_len] if elem else unknown
- return ''.join(c if c in valid else delim for c in elem)
-
- return next(name for name in
- (fill(prefix, clean(image_desc, 16), clean(use_desc, 30))
- for _ in range(max_tries))
- if not used_list or name not in used_list)
-
-
-def sorted_unique(iterable, key=None, reverse=False):
- """Create unique sorted list.
-
- @param iterable: the data structure to sort
- @param key: if you have a specific key
- @param reverse: to reverse or not
- @return_value: a sorted list of unique items in iterable
- """
- return sorted(set(iterable), key=key, reverse=reverse)
-
-
-def get_os_family(os_name):
- """Get os family type for os_name.
-
- @param os_name: name of os
- @return_value: family name for os_name
- """
- return next((k for k, v in OS_FAMILY_MAPPING.items()
- if os_name.lower() in v), None)
-
-
-def current_verbosity():
- """Get verbosity currently in effect from log level.
-
- @return_value: verbosity, 0-2, 2=verbose, 0=quiet
- """
- return max(min(3 - int(LOG.level / 10), 2), 0)
-
-
-@contextmanager
-def emit_dots_on_travis():
- """
- A context manager that emits a dot every 10 seconds if running on Travis.
-
- Travis will kill jobs that don't emit output for a certain amount of time.
- This context manager spins up a background process which will emit a dot to
- stdout every 10 seconds to avoid being killed.
-
- It should be wrapped selectively around operations that are known to take a
- long time.
- """
- if os.environ.get('TRAVIS') != "true":
- # If we aren't on Travis, don't do anything.
- yield
- return
-
- def emit_dots():
- while True:
- print(".")
- time.sleep(10)
-
- dot_process = multiprocessing.Process(target=emit_dots)
- dot_process.start()
- try:
- yield
- finally:
- dot_process.terminate()
-
-
-def is_writable_dir(path):
- """Make sure dir is writable.
-
- @param path: path to determine if writable
- @return_value: boolean with result
- """
- try:
- c_util.ensure_dir(path)
- os.remove(tempfile.mkstemp(dir=os.path.abspath(path))[1])
- except (IOError, OSError):
- return False
- return True
-
-
-def is_clean_writable_dir(path):
- """Make sure dir is empty and writable, creating it if it does not exist.
-
- @param path: path to check
- @return_value: True/False if successful
- """
- path = os.path.abspath(path)
- if not (is_writable_dir(path) and len(os.listdir(path)) == 0):
- return False
- return True
-
-
-def configure_yaml():
- """Clean yaml."""
- yaml.add_representer(str, (lambda dumper, data: dumper.represent_scalar(
- 'tag:yaml.org,2002:str', data, style='|' if '\n' in data else '')))
-
-
-def yaml_format(data, content_type=None):
- """Format data as yaml.
-
- @param data: data to dump
- @param header: if specified, add a header to the dumped data
- @return_value: yaml string
- """
- configure_yaml()
- content_type = (
- '#{}\n'.format(content_type.strip('#\n')) if content_type else '')
- return content_type + yaml.dump(data, indent=2, default_flow_style=False)
-
-
-def yaml_dump(data, path):
- """Dump data to path in yaml format."""
- c_util.write_file(os.path.abspath(path), yaml_format(data), omode='w')
-
-
-def merge_results(data, path):
- """Handle merging results from collect phase and verify phase."""
- current = {}
- if os.path.exists(path):
- with open(path, 'r') as fp:
- current = c_util.load_yaml(fp.read())
- current.update(data)
- yaml_dump(current, path)
-
-
-def rel_files(basedir):
- """List of files under directory by relative path, not including dirs.
-
- @param basedir: directory to search
- @return_value: list or relative paths
- """
- basedir = os.path.normpath(basedir)
- return [path[len(basedir) + 1:] for path in
- glob.glob(os.path.join(basedir, '**'), recursive=True)
- if not os.path.isdir(path)]
-
-
-def flat_tar(output, basedir, owner='root', group='root'):
- """Create a flat tar archive (no leading ./) from basedir.
-
- @param output: output tar file to write
- @param basedir: base directory for archive
- @param owner: owner of archive files
- @param group: group archive files belong to
- @return_value: none
- """
- subp.subp(['tar', 'cf', output, '--owner', owner, '--group', group,
- '-C', basedir] + rel_files(basedir), capture=True)
-
-
-def parse_conf_list(entries, valid=None, boolean=False):
- """Parse config in a list of strings in key=value format.
-
- @param entries: list of key=value strings
- @param valid: list of valid keys in result, return None if invalid input
- @param boolean: if true, then interpret all values as booleans
- @return_value: dict of configuration or None if invalid
- """
- res = {key: value.lower() == 'true' if boolean else value
- for key, value in (i.split('=') for i in entries)}
- return res if not valid or all(k in valid for k in res.keys()) else None
-
-
-def update_args(args, updates, preserve_old=True):
- """Update cmdline arguments from a dictionary.
-
- @param args: cmdline arguments
- @param updates: dictionary of {arg_name: new_value} mappings
- @param preserve_old: if true, create a deep copy of args before updating
- @return_value: updated cmdline arguments
- """
- args = copy.deepcopy(args) if preserve_old else args
- if updates:
- vars(args).update(updates)
- return args
-
-
-def update_user_data(user_data, updates, dump_to_yaml=True):
- """Update user_data from dictionary.
-
- @param user_data: user data as yaml string or dict
- @param updates: dictionary to merge with user data
- @param dump_to_yaml: return as yaml dumped string if true
- @return_value: updated user data, as yaml string if dump_to_yaml is true
- """
- user_data = (c_util.load_yaml(user_data)
- if isinstance(user_data, str) else copy.deepcopy(user_data))
- user_data.update(updates)
- return (yaml_format(user_data, content_type='cloud-config')
- if dump_to_yaml else user_data)
-
-
-def shell_safe(cmd):
- """Produce string safe shell string.
-
- Create a string that can be passed to:
- set -- <string>
- to produce the same array that cmd represents.
-
- Internally we utilize 'getopt's ability/knowledge on how to quote
- strings to be safe for shell. This implementation could be changed
- to be pure python. It is just a matter of correctly escaping
- or quoting characters like: ' " ^ & $ ; ( ) ...
-
- @param cmd: command as a list
- """
- out = subprocess.check_output(
- ["getopt", "--shell", "sh", "--options", "", "--", "--"] + list(cmd))
- # out contains ' -- <data>\n'. drop the ' -- ' and the '\n'
- return out.decode()[4:-1]
-
-
-def shell_pack(cmd):
- """Return a string that can shuffled through 'sh' and execute cmd.
-
- In Python subprocess terms:
- check_output(cmd) == check_output(shell_pack(cmd), shell=True)
-
- @param cmd: list or string of command to pack up
- """
-
- if isinstance(cmd, str):
- cmd = [cmd]
- else:
- cmd = list(cmd)
-
- stuffed = shell_safe(cmd)
- # for whatever reason b64encode returns bytes when it is clearly
- # representable as a string by nature of being base64 encoded.
- b64 = base64.b64encode(stuffed.encode()).decode()
- return 'eval set -- "$(echo %s | base64 --decode)" && exec "$@"' % b64
-
-
-def shell_quote(cmd):
- if isinstance(cmd, (tuple, list)):
- return ' '.join([shlex.quote(x) for x in cmd])
- return shlex.quote(cmd)
-
-
-class TargetBase(object):
- _tmp_count = 0
-
- def execute(self, command, stdin=None, env=None,
- rcs=None, description=None):
- """Execute command in instance, recording output, error and exit code.
-
- Assumes functional networking and execution as root with the
- target filesystem being available at /.
-
- @param command: the command to execute as root inside the image
- if command is a string, then it will be executed as:
- ['sh', '-c', command]
- @param stdin: bytes content for standard in
- @param env: environment variables
- @param rcs: return codes.
- None (default): non-zero exit code will raise exception.
- False: any is allowed (No execption raised).
- list of int: any rc not in the list will raise exception.
- @param description: purpose of command
- @return_value: tuple containing stdout data, stderr data, exit code
- """
- if isinstance(command, str):
- command = ['sh', '-c', command]
-
- if rcs is None:
- rcs = (0,)
-
- if description:
- LOG.debug('executing "%s"', description)
- else:
- LOG.debug("executing command: %s", shell_quote(command))
-
- out, err, rc = self._execute(command=command, stdin=stdin, env=env)
-
- # False means accept anything.
- if (rcs is False or rc in rcs):
- return out, err, rc
-
- raise InTargetExecuteError(out, err, rc, command, description)
-
- def _execute(self, command, stdin=None, env=None):
- """Execute command in inside, return stdout, stderr and exit code.
-
- Assumes functional networking and execution as root with the
- target filesystem being available at /.
-
- @param stdin: bytes content for standard in
- @param env: environment variables
- @return_value: tuple containing stdout data, stderr data, exit code
-
- This is intended to be implemented by the Image or Instance.
- Many callers will use the higher level 'execute'."""
- raise NotImplementedError("_execute must be implemented by subclass.")
-
- def read_data(self, remote_path, decode=False):
- """Read data from instance filesystem.
-
- @param remote_path: path in instance
- @param decode: decode data before returning.
- @return_value: content of remote_path as bytes if 'decode' is False,
- and as string if 'decode' is True.
- """
- # when sh is invoked with '-c', then the first argument is "$0"
- # which is commonly understood as the "program name".
- # 'read_data' is the program name, and 'remote_path' is '$1'
- stdout, _stderr, rc = self._execute(
- ["sh", "-c", 'exec cat "$1"', 'read_data', remote_path])
- if rc != 0:
- raise RuntimeError("Failed to read file '%s'" % remote_path)
-
- if decode:
- return stdout.decode()
- return stdout
-
- def write_data(self, remote_path, data):
- """Write data to instance filesystem.
-
- @param remote_path: path in instance
- @param data: data to write in bytes
- """
- # when sh is invoked with '-c', then the first argument is "$0"
- # which is commonly understood as the "program name".
- # 'write_data' is the program name, and 'remote_path' is '$1'
- _, _, rc = self._execute(
- ["sh", "-c", 'exec cat >"$1"', 'write_data', remote_path],
- stdin=data)
-
- if rc != 0:
- raise RuntimeError("Failed to write to '%s'" % remote_path)
- return
-
- def pull_file(self, remote_path, local_path):
- """Copy file at 'remote_path', from instance to 'local_path'.
-
- @param remote_path: path on remote instance
- @param local_path: path on local instance
- """
- with open(local_path, 'wb') as fp:
- fp.write(self.read_data(remote_path))
-
- def push_file(self, local_path, remote_path):
- """Copy file at 'local_path' to instance at 'remote_path'.
-
- @param local_path: path on local instance
- @param remote_path: path on remote instance"""
- with open(local_path, "rb") as fp:
- self.write_data(remote_path, data=fp.read())
-
- def run_script(self, script, rcs=None, description=None):
- """Run script in target and return stdout.
-
- @param script: script contents
- @param rcs: allowed return codes from script
- @param description: purpose of script
- @return_value: stdout from script
- """
- # Just write to a file, add execute, run it, then remove it.
- shblob = '; '.join((
- 'set -e',
- 's="$1"',
- 'shift',
- 'cat > "$s"',
- 'trap "rm -f $s" EXIT',
- 'chmod +x "$s"',
- '"$s" "$@"'))
- return self.execute(
- ['sh', '-c', shblob, 'runscript', self.tmpfile()],
- stdin=script, description=description, rcs=rcs)
-
- def tmpfile(self):
- """Get a tmp file in the target.
-
- @return_value: path to new file in target
- """
- path = "/tmp/%s-%04d" % (type(self).__name__, self._tmp_count)
- self._tmp_count += 1
- return path
-
-
-class InTargetExecuteError(subp.ProcessExecutionError):
- """Error type for in target commands that fail."""
-
- default_desc = 'Unexpected error while running command.'
-
- def __init__(self, stdout, stderr, exit_code, cmd, description=None,
- reason=None):
- """Init error and parent error class."""
- super(InTargetExecuteError, self).__init__(
- stdout=stdout, stderr=stderr, exit_code=exit_code,
- cmd=shell_quote(cmd),
- description=description if description else self.default_desc,
- reason=reason)
-
-
-class PlatformError(IOError):
- """Error type for platform errors."""
-
- default_desc = 'unexpected error in platform.'
-
- def __init__(self, operation, description=None):
- """Init error and parent error class."""
- description = description if description else self.default_desc
-
- message = '%s: %s' % (operation, description)
- IOError.__init__(self, message)
-
-
-def mkdtemp(prefix='cloud_test_data'):
- return tempfile.mkdtemp(prefix=prefix)
-
-
-class TempDir(object):
- """Configurable temporary directory like tempfile.TemporaryDirectory."""
-
- def __init__(self, tmpdir=None, preserve=False, prefix='cloud_test_data_'):
- """Initialize.
-
- @param tmpdir: directory to use as tempdir
- @param preserve: if true, always preserve data on exit
- @param prefix: prefix to use for tempfile name
- """
- self.tmpdir = tmpdir
- self.preserve = preserve
- self.prefix = prefix
-
- def __enter__(self):
- """Create tempdir.
-
- @return_value: tempdir path
- """
- if not self.tmpdir:
- self.tmpdir = mkdtemp(prefix=self.prefix)
- LOG.debug('using tmpdir: %s', self.tmpdir)
- return self.tmpdir
-
- def __exit__(self, etype, value, trace):
- """Destroy tempdir if no errors occurred."""
- if etype or self.preserve:
- LOG.info('leaving data in %s', self.tmpdir)
- else:
- shutil.rmtree(self.tmpdir)
-
-# vi: ts=4 expandtab