# This file is part of cloud-init. See LICENSE file for license information.
import logging
import os
import uuid
from tempfile import NamedTemporaryFile
from pycloudlib.instance import BaseInstance
from pycloudlib.result import Result
from tests.integration_tests import integration_settings
# IntegrationCloud is only needed by static type checkers (it appears in
# string annotations), so it is imported under TYPE_CHECKING. Any
# ImportError raised while doing so is deliberately ignored.
try:
    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        from tests.integration_tests.clouds import IntegrationCloud
except ImportError:
    pass

# Module-level logger used by the helpers below.
log = logging.getLogger('integration_testing')
def _get_tmp_path():
tmp_filename = str(uuid.uuid4())
return '/var/tmp/{}.tmp'.format(tmp_filename)
class IntegrationInstance:
    """Convenience wrapper around a pycloudlib instance for integration tests.

    Bundles the instance with the cloud that launched it and the settings
    module, and adds helpers for running commands, moving files to and from
    the instance, and installing a different cloud-init before snapshotting.

    Usable as a context manager: on exit the instance is destroyed unless
    ``settings.KEEP_INSTANCE`` is truthy.
    """

    def __init__(self, cloud: 'IntegrationCloud', instance: BaseInstance,
                 settings=integration_settings):
        """Store the owning cloud, the wrapped instance and the settings.

        :param cloud: cloud object used for snapshots and image tracking.
        :param instance: the pycloudlib instance to operate on.
        :param settings: object providing ``KEEP_INSTANCE``; defaults to the
            ``integration_settings`` module.
        """
        self.cloud = cloud
        self.instance = instance
        self.settings = settings

    def destroy(self):
        """Delete the wrapped instance."""
        self.instance.delete()

    def execute(self, command, *, use_sudo=True) -> Result:
        """Run ``command`` on the instance and return its result.

        :param command: command passed through to the instance's ``execute``.
        :param use_sudo: keyword-only; whether to run with sudo (default).
        :raises Exception: if the instance user is root and ``use_sudo`` is
            exactly ``False``, since root cannot run unprivileged.
        """
        if self.instance.username == 'root' and use_sudo is False:
            raise Exception('Root user cannot run unprivileged')
        return self.instance.execute(command, use_sudo=use_sudo)

    def pull_file(self, remote_path, local_path):
        """Copy ``remote_path`` on the instance to ``local_path`` locally.

        The file is first copied to a uniquely named staging path under
        /var/tmp on the instance and pulled from there. The staging copy is
        removed afterwards, whether or not the pull succeeded.
        """
        # First copy to a temporary directory because of permissions issues
        tmp_path = _get_tmp_path()
        self.instance.execute('cp {} {}'.format(str(remote_path), tmp_path))
        try:
            self.instance.pull_file(tmp_path, str(local_path))
        finally:
            # Do not leave the staging copy behind on the instance; it would
            # otherwise accumulate and be captured by later snapshots.
            # `-f` keeps this quiet if the earlier cp did not create it.
            self.instance.execute('rm -f {}'.format(tmp_path))

    def push_file(self, local_path, remote_path):
        """Copy local ``local_path`` to ``remote_path`` on the instance.

        The file is pushed to a uniquely named staging path under /var/tmp
        and then moved into place via ``execute`` (sudo by default).
        """
        # First push to a temporary directory because of permissions issues
        tmp_path = _get_tmp_path()
        self.instance.push_file(str(local_path), tmp_path)
        self.execute('mv {} {}'.format(tmp_path, str(remote_path)))

    def read_from_file(self, remote_path) -> str:
        """Return the contents of ``remote_path`` as read by ``cat``.

        :raises IOError: if the ``cat`` command reports failure; the message
            includes the return code, stderr and stdout.
        """
        result = self.execute('cat {}'.format(remote_path))
        if result.failed:
            # TODO: Raise here whatever pycloudlib raises when it has
            # a consistent error response
            raise IOError(
                'Failed reading remote file via cat: {}\n'
                'Return code: {}\n'
                'Stderr: {}\n'
                'Stdout: {}'.format(
                    remote_path, result.return_code,
                    result.stderr, result.stdout)
            )
        return result.stdout

    def write_to_file(self, remote_path, contents: str):
        """Write ``contents`` to ``remote_path`` on the instance.

        The text is written to a local temporary file which is then pushed
        with ``push_file``; the local file is always removed afterwards.
        """
        # Writes file locally and then pushes it rather
        # than writing the file directly on the instance
        with NamedTemporaryFile('w', delete=False) as tmp_file:
            tmp_file.write(contents)

        # The file is closed (and therefore flushed) before it is pushed;
        # delete=False keeps it on disk until the explicit unlink below.
        try:
            self.push_file(tmp_file.name, remote_path)
        finally:
            os.unlink(tmp_file.name)

    def snapshot(self):
        """Ask the cloud to snapshot this instance and return its result."""
        return self.cloud.snapshot(self.instance)

    def _install_new_cloud_init(self, remote_script):
        """Run ``remote_script``, then snapshot and adopt the new image.

        After the script runs, the installed version is logged, the instance
        is cleaned, a snapshot is taken and ``self.cloud.image_id`` is set to
        the snapshot's id.
        """
        self.execute(remote_script)
        # Last whitespace-separated token of the `cloud-init -v` output.
        # NOTE(review): relies on Result supporting str methods - confirm.
        version = self.execute('cloud-init -v').split()[-1]
        log.info('Installed cloud-init version: %s', version)
        self.instance.clean()
        image_id = self.snapshot()
        log.info('Created new image: %s', image_id)
        self.cloud.image_id = image_id

    def install_proposed_image(self):
        """Install cloud-init from the release's ``-proposed`` apt pocket."""
        log.info('Installing proposed image')
        remote_script = (
            'echo deb "http://archive.ubuntu.com/ubuntu '
            '$(lsb_release -sc)-proposed main" | '
            'tee /etc/apt/sources.list.d/proposed.list\n'
            'apt-get update -q\n'
            'apt-get install -qy cloud-init'
        )
        self._install_new_cloud_init(remote_script)

    def install_ppa(self, repo):
        """Add apt repository ``repo`` and install cloud-init from it."""
        log.info('Installing PPA')
        remote_script = (
            'add-apt-repository {repo} -y && '
            'apt-get update -q && '
            'apt-get install -qy cloud-init'
        ).format(repo=repo)
        self._install_new_cloud_init(remote_script)

    def install_deb(self):
        """Push the deb at ``CLOUD_INIT_SOURCE`` and install it with dpkg.

        The package is placed under /var/tmp on the instance, keeping its
        original file name.
        """
        log.info('Installing deb package')
        deb_path = integration_settings.CLOUD_INIT_SOURCE
        deb_name = os.path.basename(deb_path)
        remote_path = '/var/tmp/{}'.format(deb_name)
        self.push_file(local_path=deb_path, remote_path=remote_path)
        remote_script = 'dpkg -i {path}'.format(path=remote_path)
        self._install_new_cloud_init(remote_script)

    def __enter__(self):
        """Return this wrapper for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Destroy the instance unless ``settings.KEEP_INSTANCE`` is truthy.

        Returns ``None``, so an exception raised in the ``with`` body
        propagates.
        """
        if not self.settings.KEEP_INSTANCE:
            self.destroy()
class IntegrationEc2Instance(IntegrationInstance):
    """IntegrationInstance subclass named for EC2; defines no overrides."""
class IntegrationGceInstance(IntegrationInstance):
    """IntegrationInstance subclass named for GCE; defines no overrides."""
class IntegrationAzureInstance(IntegrationInstance):
    """IntegrationInstance subclass named for Azure; defines no overrides."""
class IntegrationOciInstance(IntegrationInstance):
    """IntegrationInstance subclass named for OCI; defines no overrides."""
class IntegrationLxdInstance(IntegrationInstance):
    """IntegrationInstance subclass named for LXD; defines no overrides."""
|