summaryrefslogtreecommitdiff
path: root/tests/cloud_tests/util.py
blob: 64a86672432bb2b7bdf8e3a93d8013ce45d49c48 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# This file is part of cloud-init. See LICENSE file for license information.

import glob
import os
import random
import string
import tempfile
import yaml

from cloudinit.distros import OSFAMILIES
from cloudinit import util as c_util
from tests.cloud_tests import LOG


def list_test_data(data_dir):
    """
    find all tests with test data available in data_dir
    data_dir should contain <platforms>/<os_name>/<testnames>/<data>
    return_value: {<platform>: {<os_name>: [<testname>]}}
    """
    if not os.path.isdir(data_dir):
        raise ValueError("bad data dir")

    res = {}
    for platform in os.listdir(data_dir):
        res[platform] = {}
        for os_name in os.listdir(os.path.join(data_dir, platform)):
            res[platform][os_name] = [
                os.path.sep.join(f.split(os.path.sep)[-2:]) for f in
                glob.glob(os.sep.join((data_dir, platform, os_name, '*/*')))]

    LOG.debug('found test data: %s\n', res)
    return res


def gen_instance_name(prefix='cloud-test', image_desc=None, use_desc=None,
                      max_len=63, delim='-', max_tries=16, used_list=None,
                      valid=string.ascii_lowercase + string.digits):
    """
    generate an unique name for a test instance
    prefix: name prefix, defaults to cloud-test, default should be left
    image_desc: short string with image desc, will be truncated to 16 chars
    use_desc: short string with usage desc, will be truncated to 30 chars
    max_len: maximum name length, defaults to 64 chars
    delim: delimiter to use between tokens
    max_tries: maximum tries to find a unique name before giving up
    used_list: already used names, or none to not check
    valid: string of valid characters for name
    return_value: valid, unused name, may raise StopIteration
    """
    unknown = 'unknown'

    def join(*args):
        """
        join args with delim
        """
        return delim.join(args)

    def fill(*args):
        """
        join name elems and fill rest with random data
        """
        name = join(*args)
        num = max_len - len(name) - len(delim)
        return join(name, ''.join(random.choice(valid) for _ in range(num)))

    def clean(elem, max_len):
        """
        filter bad characters out of elem and trim to length
        """
        elem = elem[:max_len] if elem else unknown
        return ''.join(c if c in valid else delim for c in elem)

    return next(name for name in
                (fill(prefix, clean(image_desc, 16), clean(use_desc, 30))
                 for _ in range(max_tries))
                if not used_list or name not in used_list)


def sorted_unique(iterable, key=None, reverse=False):
    """
    return_value: a sorted list of unique items in iterable
    """
    return sorted(set(iterable), key=key, reverse=reverse)


def get_os_family(os_name):
    """
    get os family type for os_name
    """
    return next((k for k, v in OSFAMILIES.items() if os_name in v), None)


def current_verbosity():
    """
    get verbosity currently in effect from log level
    return_value: verbosity, 0-2, 2 = verbose, 0 = quiet
    """
    return max(min(3 - int(LOG.level / 10), 2), 0)


def is_writable_dir(path):
    """
    make sure dir is writable
    """
    try:
        c_util.ensure_dir(path)
        os.remove(tempfile.mkstemp(dir=os.path.abspath(path))[1])
    except (IOError, OSError):
        return False
    return True


def is_clean_writable_dir(path):
    """
    make sure dir is empty and writable, creating it if it does not exist
    return_value: True/False if successful
    """
    path = os.path.abspath(path)
    if not (is_writable_dir(path) and len(os.listdir(path)) == 0):
        return False
    return True


def configure_yaml():
    yaml.add_representer(str, (lambda dumper, data: dumper.represent_scalar(
        'tag:yaml.org,2002:str', data, style='|' if '\n' in data else '')))


def yaml_format(data):
    """
    format data as yaml
    """
    configure_yaml()
    return yaml.dump(data, indent=2, default_flow_style=False)


def yaml_dump(data, path):
    """
    dump data to path in yaml format
    """
    write_file(os.path.abspath(path), yaml_format(data), omode='w')


def merge_results(data, path):
    """
    handle merging results from collect phase and verify phase
    """
    current = {}
    if os.path.exists(path):
        with open(path, 'r') as fp:
            current = c_util.load_yaml(fp.read())
    current.update(data)
    yaml_dump(current, path)


def write_file(*args, **kwargs):
    """
    write a file using cloudinit.util.write_file
    """
    c_util.write_file(*args, **kwargs)

# vi: ts=4 expandtab