1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
import functools
import logging
import multiprocessing
import os
import time
from collections import namedtuple
from contextlib import contextmanager
from pathlib import Path
# Logger used by the helpers in this module.
log = logging.getLogger("integration_testing")

# Record holding the contents of a public key and its matching private key.
key_pair = namedtuple("key_pair", ["public_key", "private_key"])

# Relative path of the integration-test assets directory, and of the
# directory inside it that holds the test SSH keys.
ASSETS_DIR = Path("tests", "integration_tests", "assets")
KEY_PATH = ASSETS_DIR.joinpath("keys")
def verify_ordered_items_in_text(to_verify: list, text: str):
    """Assert all items in list appear in order in text.

    Each item is searched for starting at the position where the previous
    item was found. The search position is not advanced past the previous
    match, so consecutive items may overlap or match at the same position.

    Args:
        to_verify: substrings expected to occur in ``text``, in order.
        text: the text to search.

    Raises:
        AssertionError: if an item is not found at or after the position of
            the previous item's match.

    Examples:
      verify_ordered_items_in_text(['a', '1'], 'ab1')  # passes
      verify_ordered_items_in_text(['1', 'a'], 'ab1')  # raises AssertionError
    """
    position = 0
    for item in to_verify:
        # Pass the start offset to str.find instead of slicing: the result is
        # then an absolute index into text. Searching a slice yields an index
        # relative to the slice, which would restart later searches too early
        # and let out-of-order items pass.
        position = text.find(item, position)
        assert position > -1, "Expected item not found: '{}'".format(item)
def verify_clean_log(log):
    """Assert no unexpected tracebacks or warnings in logs.

    The number of "WARN" occurrences in ``log`` must equal the total number
    of occurrences of the known warning texts, and likewise the number of
    "Traceback" occurrences must equal the total for the known traceback
    texts. Extra texts are tolerated when the log contains "oracle".

    Args:
        log: log contents as a single string.

    Raises:
        AssertionError: if either count does not match.
    """
    known_warnings = [
        # Consistently on all Azure launches:
        # azure.py[WARNING]: No lease found; using default endpoint
        "No lease found; using default endpoint"
    ]
    known_tracebacks = []

    if "oracle" in log:
        # LP: #1842752
        lease_exists_text = "Stderr: RTNETLINK answers: File exists"
        # LP: #1833446
        fetch_error_text = (
            "UrlError: 404 Client Error: Not Found for url: "
            "http://169.254.169.254/latest/meta-data/"
        )
        # Both of these are counted as a warning and as a traceback.
        for known_text in (lease_exists_text, fetch_error_text):
            known_warnings.append(known_text)
            known_tracebacks.append(known_text)
        # Oracle has a file in /etc/cloud/cloud.cfg.d that contains
        # users:
        # - default
        # - name: opc
        #   ssh_redirect_user: true
        # This can trigger a warning about opc having no public key
        known_warnings.append(
            "Unable to disable SSH logins for opc given ssh_redirect_user"
        )

    expected_warnings = sum(log.count(known) for known in known_warnings)
    expected_tracebacks = sum(log.count(known) for known in known_tracebacks)

    assert log.count("WARN") == expected_warnings
    assert log.count("Traceback") == expected_tracebacks
@contextmanager
def emit_dots_on_travis():
    """Log a dot every 60 seconds if running on Travis.

    Travis will kill jobs that don't emit output for a certain amount of time.
    This context manager spins up a background process which logs a "." at
    info level every 60 seconds for as long as the managed block runs.
    NOTE(review): the dot is emitted through the module logger, so whether it
    reaches stdout depends on the logging configuration -- confirm.

    It should be wrapped selectively around operations that are known to take a
    long time.

    When the TRAVIS environment variable is not "true" this is a no-op.
    """
    if os.environ.get("TRAVIS") != "true":
        # If we aren't on Travis, don't do anything.
        yield
        return

    def emit_dots():
        while True:
            log.info(".")
            time.sleep(60)

    dot_process = multiprocessing.Process(target=emit_dots)
    dot_process.start()
    try:
        yield
    finally:
        dot_process.terminate()
        # Reap the terminated child so it does not linger as a zombie. The
        # timeout keeps teardown from hanging if the child is slow to exit.
        dot_process.join(timeout=5)
def get_test_rsa_keypair(key_name: str = "test1") -> key_pair:
    """Load a test RSA key pair from the key assets directory.

    Reads ``id_rsa.<key_name>.pub`` and ``id_rsa.<key_name>`` from KEY_PATH.

    Args:
        key_name: suffix identifying which key files to read.

    Returns:
        A key_pair holding the public and private key file contents as text.
    """
    private_name = "id_rsa.{}".format(key_name)
    public_name = "id_rsa.{}.pub".format(key_name)
    # Read the public key first, then the private key.
    public_text = (KEY_PATH / public_name).read_text()
    private_text = (KEY_PATH / private_name).read_text()
    return key_pair(public_key=public_text, private_key=private_text)
def retry(*, tries: int = 30, delay: int = 1):
    """Decorator for retries.

    Retry a function until code no longer raises an exception or
    max tries is reached. The wrapped function's return value is passed
    back to the caller.

    Args:
        tries: maximum number of attempts. If this is zero or negative the
            wrapped function is never called and the wrapper returns None.
        delay: seconds to sleep between consecutive attempts.

    Raises:
        Exception: the error raised by the final attempt, if every attempt
            failed.

    Example:
      @retry(tries=5, delay=1)
      def try_something_that_may_not_be_ready():
          ...
    """

    def _retry(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(tries):
                if attempt:
                    # Sleep only between attempts: not before the first one
                    # and not after the last one has already failed.
                    time.sleep(delay)
                try:
                    # Hand the result back instead of discarding it.
                    return func(*args, **kwargs)
                except Exception as e:
                    last_error = e
            if last_error is not None:
                raise last_error
            return None

        return wrapper

    return _retry
|