From f53fc46aa732e3b29991b3e5e39da31a722945ee Mon Sep 17 00:00:00 2001 From: Wesley Wiedenmeier Date: Thu, 22 Dec 2016 17:27:37 -0500 Subject: integration test: initial commit of integration test framework The adds in end-to-end testing of cloud-init. The framework utilizes LXD and cloud images as a backend to test user-data passed in. Arbitrary data is then captured from predefined commands specified by the user. After collection, data verification is completed by running a series of Python unit tests against the collected data. Currently only the Ubuntu Trusty, Xenial, Yakkety, and Zesty releases are supported. Test cases for 50% of the modules is complete and available. Additionally a Read the Docs file was created to guide test writing and execution. --- tests/cloud_tests/util.py | 163 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 tests/cloud_tests/util.py (limited to 'tests/cloud_tests/util.py') diff --git a/tests/cloud_tests/util.py b/tests/cloud_tests/util.py new file mode 100644 index 00000000..64a86672 --- /dev/null +++ b/tests/cloud_tests/util.py @@ -0,0 +1,163 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import glob +import os +import random +import string +import tempfile +import yaml + +from cloudinit.distros import OSFAMILIES +from cloudinit import util as c_util +from tests.cloud_tests import LOG + + +def list_test_data(data_dir): + """ + find all tests with test data available in data_dir + data_dir should contain /// + return_value: {: {: []}} + """ + if not os.path.isdir(data_dir): + raise ValueError("bad data dir") + + res = {} + for platform in os.listdir(data_dir): + res[platform] = {} + for os_name in os.listdir(os.path.join(data_dir, platform)): + res[platform][os_name] = [ + os.path.sep.join(f.split(os.path.sep)[-2:]) for f in + glob.glob(os.sep.join((data_dir, platform, os_name, '*/*')))] + + LOG.debug('found test data: %s\n', res) + return res + + +def gen_instance_name(prefix='cloud-test', image_desc=None, use_desc=None, + max_len=63, delim='-', max_tries=16, used_list=None, + valid=string.ascii_lowercase + string.digits): + """ + generate an unique name for a test instance + prefix: name prefix, defaults to cloud-test, default should be left + image_desc: short string with image desc, will be truncated to 16 chars + use_desc: short string with usage desc, will be truncated to 30 chars + max_len: maximum name length, defaults to 64 chars + delim: delimiter to use between tokens + max_tries: maximum tries to find a unique name before giving up + used_list: already used names, or none to not check + valid: string of valid characters for name + return_value: valid, unused name, may raise StopIteration + """ + unknown = 'unknown' + + def join(*args): + """ + join args with delim + """ + return delim.join(args) + + def fill(*args): + """ + join name elems and fill rest with random data + """ + name = join(*args) + num = max_len - len(name) - len(delim) + return join(name, ''.join(random.choice(valid) for _ in range(num))) + + def clean(elem, max_len): + """ + filter bad characters out of elem and trim to length + """ + elem = elem[:max_len] if elem else unknown + return ''.join(c if c in valid else delim for c in elem) + + return next(name for name in + (fill(prefix, clean(image_desc, 16), clean(use_desc, 30)) + for _ in range(max_tries)) + if not used_list or name not in used_list) + + +def sorted_unique(iterable, key=None, reverse=False): + """ + return_value: a sorted list of unique items in iterable + """ + return sorted(set(iterable), key=key, reverse=reverse) + + +def get_os_family(os_name): + """ + get os family type for os_name + """ + return next((k for k, v in OSFAMILIES.items() if os_name in v), None) + + +def current_verbosity(): + """ + get verbosity currently in effect from log level + return_value: verbosity, 0-2, 2 = verbose, 0 = quiet + """ + return max(min(3 - int(LOG.level / 10), 2), 0) + + +def is_writable_dir(path): + """ + make sure dir is writable + """ + try: + c_util.ensure_dir(path) + os.remove(tempfile.mkstemp(dir=os.path.abspath(path))[1]) + except (IOError, OSError): + return False + return True + + +def is_clean_writable_dir(path): + """ + make sure dir is empty and writable, creating it if it does not exist + return_value: True/False if successful + """ + path = os.path.abspath(path) + if not (is_writable_dir(path) and len(os.listdir(path)) == 0): + return False + return True + + +def configure_yaml(): + yaml.add_representer(str, (lambda dumper, data: dumper.represent_scalar( + 'tag:yaml.org,2002:str', data, style='|' if '\n' in data else ''))) + + +def yaml_format(data): + """ + format data as yaml + """ + configure_yaml() + return yaml.dump(data, indent=2, default_flow_style=False) + + +def yaml_dump(data, path): + """ + dump data to path in yaml format + """ + write_file(os.path.abspath(path), yaml_format(data), omode='w') + + +def merge_results(data, path): + """ + handle merging results from collect phase and verify phase + """ + current = {} + if os.path.exists(path): + with open(path, 'r') as fp: + current = c_util.load_yaml(fp.read()) + current.update(data) + yaml_dump(current, path) + + +def write_file(*args, **kwargs): + """ + write a file using cloudinit.util.write_file + """ + c_util.write_file(*args, **kwargs) + +# vi: ts=4 expandtab -- cgit v1.2.3