1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
# This file is part of cloud-init. See LICENSE file for license information.
from cloudinit.config import cc_bootcmd
from cloudinit.sources import DataSourceNone
from cloudinit import (distros, helpers, cloud, util)
from cloudinit.tests.helpers import CiTestCase, mock, skipIf
import logging
import tempfile
try:
import jsonschema
assert jsonschema # avoid pyflakes error F401: import unused
_missing_jsonschema_dep = False
except ImportError:
_missing_jsonschema_dep = True
LOG = logging.getLogger(__name__)
class FakeExtendedTempFile(object):
def __init__(self, suffix):
self.suffix = suffix
self.handle = tempfile.NamedTemporaryFile(
prefix="ci-%s." % self.__class__.__name__, delete=False)
def __enter__(self):
return self.handle
def __exit__(self, exc_type, exc_value, traceback):
self.handle.close()
class TestBootcmd(CiTestCase):
with_logs = True
_etmpfile_path = ('cloudinit.config.cc_bootcmd.temp_utils.'
'ExtendedTemporaryFile')
def setUp(self):
super(TestBootcmd, self).setUp()
self.subp = util.subp
self.new_root = self.tmp_dir()
def _get_cloud(self, distro):
paths = helpers.Paths({})
cls = distros.fetch(distro)
mydist = cls(distro, {}, paths)
myds = DataSourceNone.DataSourceNone({}, mydist, paths)
paths.datasource = myds
return cloud.Cloud(myds, paths, {}, mydist, None)
def test_handler_skip_if_no_bootcmd(self):
"""When the provided config doesn't contain bootcmd, skip it."""
cfg = {}
mycloud = self._get_cloud('ubuntu')
cc_bootcmd.handle('notimportant', cfg, mycloud, LOG, None)
self.assertIn(
"Skipping module named notimportant, no 'bootcmd' key",
self.logs.getvalue())
def test_handler_invalid_command_set(self):
"""Commands which can't be converted to shell will raise errors."""
invalid_config = {'bootcmd': 1}
cc = self._get_cloud('ubuntu')
with self.assertRaises(TypeError) as context_manager:
cc_bootcmd.handle('cc_bootcmd', invalid_config, cc, LOG, [])
self.assertIn('Failed to shellify bootcmd', self.logs.getvalue())
self.assertEqual(
"'int' object is not iterable",
str(context_manager.exception))
@skipIf(_missing_jsonschema_dep, "No python-jsonschema dependency")
def test_handler_schema_validation_warns_non_array_type(self):
"""Schema validation warns of non-array type for bootcmd key.
Schema validation is not strict, so bootcmd attempts to shellify the
invalid content.
"""
invalid_config = {'bootcmd': 1}
cc = self._get_cloud('ubuntu')
with self.assertRaises(TypeError):
cc_bootcmd.handle('cc_bootcmd', invalid_config, cc, LOG, [])
self.assertIn(
'Invalid config:\nbootcmd: 1 is not of type \'array\'',
self.logs.getvalue())
self.assertIn('Failed to shellify', self.logs.getvalue())
@skipIf(_missing_jsonschema_dep, 'No python-jsonschema dependency')
def test_handler_schema_validation_warns_non_array_item_type(self):
"""Schema validation warns of non-array or string bootcmd items.
Schema validation is not strict, so bootcmd attempts to shellify the
invalid content.
"""
invalid_config = {
'bootcmd': ['ls /', 20, ['wget', 'http://stuff/blah'], {'a': 'n'}]}
cc = self._get_cloud('ubuntu')
with self.assertRaises(RuntimeError) as context_manager:
cc_bootcmd.handle('cc_bootcmd', invalid_config, cc, LOG, [])
expected_warnings = [
'bootcmd.1: 20 is not valid under any of the given schemas',
'bootcmd.3: {\'a\': \'n\'} is not valid under any of the given'
' schema'
]
logs = self.logs.getvalue()
for warning in expected_warnings:
self.assertIn(warning, logs)
self.assertIn('Failed to shellify', logs)
self.assertEqual(
'Unable to shellify type int which is not a list or string',
str(context_manager.exception))
def test_handler_creates_and_runs_bootcmd_script_with_instance_id(self):
"""Valid schema runs a bootcmd script with INSTANCE_ID in the env."""
cc = self._get_cloud('ubuntu')
out_file = self.tmp_path('bootcmd.out', self.new_root)
my_id = "b6ea0f59-e27d-49c6-9f87-79f19765a425"
valid_config = {'bootcmd': [
'echo {0} $INSTANCE_ID > {1}'.format(my_id, out_file)]}
with mock.patch(self._etmpfile_path, FakeExtendedTempFile):
cc_bootcmd.handle('cc_bootcmd', valid_config, cc, LOG, [])
self.assertEqual(my_id + ' iid-datasource-none\n',
util.load_file(out_file))
def test_handler_runs_bootcmd_script_with_error(self):
"""When a valid script generates an error, that error is raised."""
cc = self._get_cloud('ubuntu')
valid_config = {'bootcmd': ['exit 1']} # Script with error
with mock.patch(self._etmpfile_path, FakeExtendedTempFile):
with self.assertRaises(util.ProcessExecutionError) as ctxt_manager:
cc_bootcmd.handle('does-not-matter', valid_config, cc, LOG, [])
self.assertIn(
'Unexpected error while running command.\n'
"Command: ['/bin/sh',",
str(ctxt_manager.exception))
self.assertIn(
'Failed to run bootcmd module does-not-matter',
self.logs.getvalue())
# vi: ts=4 expandtab
|