1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
# This file is part of cloud-init. See LICENSE file for license information.
"""Base platform class."""
import os
import shutil
from simplestreams import filters, mirrors
from simplestreams import util as s_util
from cloudinit import subp
from cloudinit import util as c_util
from tests.cloud_tests import util
class Platform(object):
"""Base class for platforms."""
platform_name = None
def __init__(self, config):
"""Set up platform."""
self.config = config
self.tmpdir = util.mkdtemp()
if 'data_dir' in config:
self.data_dir = config['data_dir']
else:
self.data_dir = os.path.join(self.tmpdir, "data_dir")
os.mkdir(self.data_dir)
self._generate_ssh_keys(self.data_dir)
def get_image(self, img_conf):
"""Get image using specified image configuration.
@param img_conf: configuration for image
@return_value: cloud_tests.images instance
"""
raise NotImplementedError
def destroy(self):
"""Clean up platform data."""
shutil.rmtree(self.tmpdir)
def _generate_ssh_keys(self, data_dir):
"""Generate SSH keys to be used with image."""
filename = os.path.join(data_dir, self.config['private_key'])
if os.path.exists(filename):
c_util.del_file(filename)
subp.subp(['ssh-keygen', '-m', 'PEM', '-t', 'rsa', '-b', '4096',
'-f', filename, '-P', '',
'-C', 'ubuntu@cloud_test'],
capture=True)
@staticmethod
def _query_streams(img_conf, img_filter):
"""Query streams for latest image given a specific filter.
@param img_conf: configuration for image
@param filters: array of filters as strings format 'key=value'
@return: dictionary with latest image information or empty
"""
def policy(content, path):
return s_util.read_signed(content, keyring=img_conf['keyring'])
(url, path) = s_util.path_from_mirror_url(img_conf['mirror_url'], None)
smirror = mirrors.UrlMirrorReader(url, policy=policy)
config = {'max_items': 1, 'filters': filters.get_filters(img_filter)}
tmirror = FilterMirror(config)
tmirror.sync(smirror, path)
try:
return tmirror.json_entries[0]
except IndexError:
raise RuntimeError('no images found with filter: %s' % img_filter)
class FilterMirror(mirrors.BasicMirrorWriter):
"""Taken from sstream-query to return query result as json array."""
def __init__(self, config=None):
super(FilterMirror, self).__init__(config=config)
if config is None:
config = {}
self.config = config
self.filters = config.get('filters', [])
self.json_entries = []
def load_products(self, path=None, content_id=None):
return {'content_id': content_id, 'products': {}}
def filter_item(self, data, src, target, pedigree):
return filters.filter_item(self.filters, data, src, pedigree)
def insert_item(self, data, src, target, pedigree, contentsource):
# src and target are top level products:1.0
# data is src['products'][ped[0]]['versions'][ped[1]]['items'][ped[2]]
# contentsource is a ContentSource if 'path' exists in data or None
data = s_util.products_exdata(src, pedigree)
if 'path' in data:
data.update({'item_url': contentsource.url})
self.json_entries.append(data)
# vi: ts=4 expandtab
|