summaryrefslogtreecommitdiff
path: root/tests/unittests/test_handler/test_handler_bootcmd.py
blob: 09d4c6811d57ff2d04cefda9f0baee4082a972ab (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# This file is part of cloud-init. See LICENSE file for license information.

from cloudinit.config import cc_bootcmd
from cloudinit.sources import DataSourceNone
from cloudinit import (distros, helpers, cloud, util)
from cloudinit.tests.helpers import CiTestCase, mock, skipIf

import logging
import tempfile

try:
    import jsonschema
    assert jsonschema  # avoid pyflakes error F401: import unused
    _missing_jsonschema_dep = False
except ImportError:
    _missing_jsonschema_dep = True

LOG = logging.getLogger(__name__)


class FakeExtendedTempFile(object):
    def __init__(self, suffix):
        self.suffix = suffix
        self.handle = tempfile.NamedTemporaryFile(
            prefix="ci-%s." % self.__class__.__name__, delete=False)

    def __enter__(self):
        return self.handle

    def __exit__(self, exc_type, exc_value, traceback):
        self.handle.close()
        util.del_file(self.handle.name)


class TestBootcmd(CiTestCase):

    with_logs = True

    _etmpfile_path = ('cloudinit.config.cc_bootcmd.temp_utils.'
                      'ExtendedTemporaryFile')

    def setUp(self):
        super(TestBootcmd, self).setUp()
        self.subp = util.subp
        self.new_root = self.tmp_dir()

    def _get_cloud(self, distro):
        paths = helpers.Paths({})
        cls = distros.fetch(distro)
        mydist = cls(distro, {}, paths)
        myds = DataSourceNone.DataSourceNone({}, mydist, paths)
        paths.datasource = myds
        return cloud.Cloud(myds, paths, {}, mydist, None)

    def test_handler_skip_if_no_bootcmd(self):
        """When the provided config doesn't contain bootcmd, skip it."""
        cfg = {}
        mycloud = self._get_cloud('ubuntu')
        cc_bootcmd.handle('notimportant', cfg, mycloud, LOG, None)
        self.assertIn(
            "Skipping module named notimportant, no 'bootcmd' key",
            self.logs.getvalue())

    def test_handler_invalid_command_set(self):
        """Commands which can't be converted to shell will raise errors."""
        invalid_config = {'bootcmd': 1}
        cc = self._get_cloud('ubuntu')
        with self.assertRaises(TypeError) as context_manager:
            cc_bootcmd.handle('cc_bootcmd', invalid_config, cc, LOG, [])
        self.assertIn('Failed to shellify bootcmd', self.logs.getvalue())
        self.assertEqual(
            "Input to shellify was type 'int'. Expected list or tuple.",
            str(context_manager.exception))

    @skipIf(_missing_jsonschema_dep, "No python-jsonschema dependency")
    def test_handler_schema_validation_warns_non_array_type(self):
        """Schema validation warns of non-array type for bootcmd key.

        Schema validation is not strict, so bootcmd attempts to shellify the
        invalid content.
        """
        invalid_config = {'bootcmd': 1}
        cc = self._get_cloud('ubuntu')
        with self.assertRaises(TypeError):
            cc_bootcmd.handle('cc_bootcmd', invalid_config, cc, LOG, [])
        self.assertIn(
            'Invalid config:\nbootcmd: 1 is not of type \'array\'',
            self.logs.getvalue())
        self.assertIn('Failed to shellify', self.logs.getvalue())

    @skipIf(_missing_jsonschema_dep, 'No python-jsonschema dependency')
    def test_handler_schema_validation_warns_non_array_item_type(self):
        """Schema validation warns of non-array or string bootcmd items.

        Schema validation is not strict, so bootcmd attempts to shellify the
        invalid content.
        """
        invalid_config = {
            'bootcmd': ['ls /', 20, ['wget', 'http://stuff/blah'], {'a': 'n'}]}
        cc = self._get_cloud('ubuntu')
        with self.assertRaises(TypeError) as context_manager:
            cc_bootcmd.handle('cc_bootcmd', invalid_config, cc, LOG, [])
        expected_warnings = [
            'bootcmd.1: 20 is not valid under any of the given schemas',
            'bootcmd.3: {\'a\': \'n\'} is not valid under any of the given'
            ' schema'
        ]
        logs = self.logs.getvalue()
        for warning in expected_warnings:
            self.assertIn(warning, logs)
        self.assertIn('Failed to shellify', logs)
        self.assertEqual(
            ("Unable to shellify type 'int'. Expected list, string, tuple. "
             "Got: 20"),
            str(context_manager.exception))

    def test_handler_creates_and_runs_bootcmd_script_with_instance_id(self):
        """Valid schema runs a bootcmd script with INSTANCE_ID in the env."""
        cc = self._get_cloud('ubuntu')
        out_file = self.tmp_path('bootcmd.out', self.new_root)
        my_id = "b6ea0f59-e27d-49c6-9f87-79f19765a425"
        valid_config = {'bootcmd': [
            'echo {0} $INSTANCE_ID > {1}'.format(my_id, out_file)]}

        with mock.patch(self._etmpfile_path, FakeExtendedTempFile):
            cc_bootcmd.handle('cc_bootcmd', valid_config, cc, LOG, [])
        self.assertEqual(my_id + ' iid-datasource-none\n',
                         util.load_file(out_file))

    def test_handler_runs_bootcmd_script_with_error(self):
        """When a valid script generates an error, that error is raised."""
        cc = self._get_cloud('ubuntu')
        valid_config = {'bootcmd': ['exit 1']}  # Script with error

        with mock.patch(self._etmpfile_path, FakeExtendedTempFile):
            with self.assertRaises(util.ProcessExecutionError) as ctxt_manager:
                cc_bootcmd.handle('does-not-matter', valid_config, cc, LOG, [])
        self.assertIn(
            'Unexpected error while running command.\n'
            "Command: ['/bin/sh',",
            str(ctxt_manager.exception))
        self.assertIn(
            'Failed to run bootcmd module does-not-matter',
            self.logs.getvalue())


# vi: ts=4 expandtab