summaryrefslogtreecommitdiff
path: root/tests/cloud_tests/platforms/platforms.py
blob: d4e5c561a27a8209e8d88747ae1b0e56d4d5cdc1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# This file is part of cloud-init. See LICENSE file for license information.

"""Base platform class."""
import os

from simplestreams import filters, mirrors
from simplestreams import util as s_util

from cloudinit import util as c_util


class Platform(object):
    """Common base for the cloud test platforms.

    The base class stores the platform configuration and creates an SSH key
    pair; ``get_image`` is left for subclasses to provide.
    """

    platform_name = None

    def __init__(self, config):
        """Store the configuration and generate SSH keys.

        @param config: platform configuration; its 'data_dir' entry is the
                       directory the SSH keys are written to
        """
        self.config = config
        self._generate_ssh_keys(config['data_dir'])

    def get_image(self, img_conf):
        """Get image using specified image configuration.

        The base implementation always raises NotImplementedError.

        @param img_conf: configuration for image
        @return_value: cloud_tests.images instance
        """
        raise NotImplementedError

    def destroy(self):
        """Clean up platform data (nothing to do in the base class)."""

    def _generate_ssh_keys(self, data_dir):
        """Generate SSH keys to be used with image.

        Writes a 4096 bit RSA key with an empty passphrase to
        <data_dir>/id_rsa using ssh-keygen.

        @param data_dir: directory in which the key file is created
        """
        key_path = os.path.join(data_dir, 'id_rsa')

        # Drop any key left over from a previous run; presumably this keeps
        # ssh-keygen from stopping to ask about overwriting the file.
        if os.path.exists(key_path):
            c_util.del_file(key_path)

        keygen_cmd = [
            'ssh-keygen', '-t', 'rsa', '-b', '4096',
            '-f', key_path, '-P', '',
            '-C', 'ubuntu@cloud_test',
        ]
        c_util.subp(keygen_cmd, capture=True)

    @staticmethod
    def _query_streams(img_conf, img_filter):
        """Query streams for latest image given a specific filter.

        @param img_conf: configuration for image; the 'mirror_url' and
                         'keyring' entries are read
        @param img_filter: array of filters as strings format 'key=value'
        @return: dictionary with latest image information
        @raises RuntimeError: if no image matched the filter
        """
        # Parameter names are kept as 'content' and 'path' in case the
        # mirror reader invokes the policy with keyword arguments.
        def verify_signed(content, path):
            return s_util.read_signed(content, keyring=img_conf['keyring'])

        mirror_url, stream_path = s_util.path_from_mirror_url(
            img_conf['mirror_url'], None)
        source = mirrors.UrlMirrorReader(mirror_url, policy=verify_signed)

        # max_items of 1 asks for a single item; the first collected entry
        # is what gets returned below.
        target = FilterMirror({
            'max_items': 1,
            'filters': filters.get_filters(img_filter),
        })
        target.sync(source, stream_path)

        try:
            latest = target.json_entries[0]
        except IndexError:
            raise RuntimeError('no images found with filter: %s' % img_filter)
        return latest


class FilterMirror(mirrors.BasicMirrorWriter):
    """Mirror writer that collects matching items in memory.

    Taken from sstream-query to return query result as json array; the
    collected entries are available as ``json_entries``.
    """

    def __init__(self, config=None):
        """Set up the writer.

        @param config: optional dict; its 'filters' entry (an empty list
                       when absent) is what filter_item applies
        """
        super(FilterMirror, self).__init__(config=config)
        settings = {} if config is None else config
        self.config = settings
        self.filters = settings.get('filters', [])
        self.json_entries = []

    def load_products(self, path=None, content_id=None):
        """Return a product tree for content_id that holds no products."""
        empty_tree = {'content_id': content_id, 'products': {}}
        return empty_tree

    def filter_item(self, data, src, target, pedigree):
        """Apply the configured filters to one item.

        @return: whatever filters.filter_item returns for this item;
                 target is not consulted
        """
        active_filters = self.filters
        return filters.filter_item(active_filters, data, src, pedigree)

    def insert_item(self, data, src, target, pedigree, contentsource):
        """Append the expanded data of one item to json_entries.

        When the expanded data has a 'path' key, the url of contentsource
        is stored alongside it as 'item_url'.
        """
        # src and target are top level products:1.0
        # data is src['products'][ped[0]]['versions'][ped[1]]['items'][ped[2]]
        # contentsource is a ContentSource if 'path' exists in data or None
        entry = s_util.products_exdata(src, pedigree)
        if 'path' in entry:
            entry['item_url'] = contentsource.url
        self.json_entries.append(entry)

# vi: ts=4 expandtab