1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
# This file is part of cloud-init. See LICENSE file for license information.
import logging
import os
import stat
from unittest.mock import patch
from cloudinit import helpers, subp, util
from cloudinit.config.cc_runcmd import handle, schema
from tests.unittests.helpers import (
CiTestCase,
FilesystemMockingTestCase,
SchemaTestCaseMixin,
skipUnlessJsonSchema,
)
from tests.unittests.util import get_cloud
LOG = logging.getLogger(__name__)
class TestRuncmd(FilesystemMockingTestCase):
with_logs = True
def setUp(self):
super(TestRuncmd, self).setUp()
self.subp = subp.subp
self.new_root = self.tmp_dir()
self.patchUtils(self.new_root)
self.paths = helpers.Paths({"scripts": self.new_root})
def test_handler_skip_if_no_runcmd(self):
"""When the provided config doesn't contain runcmd, skip it."""
cfg = {}
mycloud = get_cloud(paths=self.paths)
handle("notimportant", cfg, mycloud, LOG, None)
self.assertIn(
"Skipping module named notimportant, no 'runcmd' key",
self.logs.getvalue(),
)
@patch("cloudinit.util.shellify")
def test_runcmd_shellify_fails(self, cls):
"""When shellify fails throw exception"""
cls.side_effect = TypeError("patched shellify")
valid_config = {"runcmd": ["echo 42"]}
cc = get_cloud(paths=self.paths)
with self.assertRaises(TypeError) as cm:
with self.allow_subp(["/bin/sh"]):
handle("cc_runcmd", valid_config, cc, LOG, None)
self.assertIn("Failed to shellify", str(cm.exception))
def test_handler_invalid_command_set(self):
"""Commands which can't be converted to shell will raise errors."""
invalid_config = {"runcmd": 1}
cc = get_cloud(paths=self.paths)
with self.assertRaises(TypeError) as cm:
handle("cc_runcmd", invalid_config, cc, LOG, [])
self.assertIn(
"Failed to shellify 1 into file"
" /var/lib/cloud/instances/iid-datasource-none/scripts/runcmd",
str(cm.exception),
)
@skipUnlessJsonSchema()
def test_handler_schema_validation_warns_non_array_type(self):
"""Schema validation warns of non-array type for runcmd key.
Schema validation is not strict, so runcmd attempts to shellify the
invalid content.
"""
invalid_config = {"runcmd": 1}
cc = get_cloud(paths=self.paths)
with self.assertRaises(TypeError) as cm:
handle("cc_runcmd", invalid_config, cc, LOG, [])
self.assertIn(
"Invalid cloud-config provided:\nruncmd: 1 is not of type 'array'",
self.logs.getvalue(),
)
self.assertIn("Failed to shellify", str(cm.exception))
@skipUnlessJsonSchema()
def test_handler_schema_validation_warns_non_array_item_type(self):
"""Schema validation warns of non-array or string runcmd items.
Schema validation is not strict, so runcmd attempts to shellify the
invalid content.
"""
invalid_config = {
"runcmd": ["ls /", 20, ["wget", "http://stuff/blah"], {"a": "n"}]
}
cc = get_cloud(paths=self.paths)
with self.assertRaises(TypeError) as cm:
handle("cc_runcmd", invalid_config, cc, LOG, [])
expected_warnings = [
"runcmd.1: 20 is not valid under any of the given schemas",
"runcmd.3: {'a': 'n'} is not valid under any of the given schema",
]
logs = self.logs.getvalue()
for warning in expected_warnings:
self.assertIn(warning, logs)
self.assertIn("Failed to shellify", str(cm.exception))
def test_handler_write_valid_runcmd_schema_to_file(self):
"""Valid runcmd schema is written to a runcmd shell script."""
valid_config = {"runcmd": [["ls", "/"]]}
cc = get_cloud(paths=self.paths)
handle("cc_runcmd", valid_config, cc, LOG, [])
runcmd_file = os.path.join(
self.new_root,
"var/lib/cloud/instances/iid-datasource-none/scripts/runcmd",
)
self.assertEqual("#!/bin/sh\n'ls' '/'\n", util.load_file(runcmd_file))
file_stat = os.stat(runcmd_file)
self.assertEqual(0o700, stat.S_IMODE(file_stat.st_mode))
@skipUnlessJsonSchema()
class TestSchema(CiTestCase, SchemaTestCaseMixin):
"""Directly test schema rather than through handle."""
schema = schema
def test_duplicates_are_fine_array_array(self):
"""Duplicated commands array/array entries are allowed."""
self.assertSchemaValid(
[["echo", "bye"], ["echo", "bye"]],
"command entries can be duplicate.",
)
def test_duplicates_are_fine_array_string(self):
"""Duplicated commands array/string entries are allowed."""
self.assertSchemaValid(
["echo bye", "echo bye"], "command entries can be duplicate."
)
# vi: ts=4 expandtab
|