1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# This file is part of cloud-init. See LICENSE file for license information.
import logging
import os
import stat
from unittest.mock import patch
from cloudinit.config.cc_runcmd import handle, schema
from cloudinit import (helpers, subp, util)
from cloudinit.tests.helpers import (
CiTestCase, FilesystemMockingTestCase, SchemaTestCaseMixin,
skipUnlessJsonSchema)
from tests.unittests.util import get_cloud
LOG = logging.getLogger(__name__)
class TestRuncmd(FilesystemMockingTestCase):
with_logs = True
def setUp(self):
super(TestRuncmd, self).setUp()
self.subp = subp.subp
self.new_root = self.tmp_dir()
self.patchUtils(self.new_root)
self.paths = helpers.Paths({'scripts': self.new_root})
def test_handler_skip_if_no_runcmd(self):
"""When the provided config doesn't contain runcmd, skip it."""
cfg = {}
mycloud = get_cloud(paths=self.paths)
handle('notimportant', cfg, mycloud, LOG, None)
self.assertIn(
"Skipping module named notimportant, no 'runcmd' key",
self.logs.getvalue())
@patch('cloudinit.util.shellify')
def test_runcmd_shellify_fails(self, cls):
"""When shellify fails throw exception"""
cls.side_effect = TypeError("patched shellify")
valid_config = {'runcmd': ['echo 42']}
cc = get_cloud(paths=self.paths)
with self.assertRaises(TypeError) as cm:
with self.allow_subp(['/bin/sh']):
handle('cc_runcmd', valid_config, cc, LOG, None)
self.assertIn("Failed to shellify", str(cm.exception))
def test_handler_invalid_command_set(self):
"""Commands which can't be converted to shell will raise errors."""
invalid_config = {'runcmd': 1}
cc = get_cloud(paths=self.paths)
with self.assertRaises(TypeError) as cm:
handle('cc_runcmd', invalid_config, cc, LOG, [])
self.assertIn(
'Failed to shellify 1 into file'
' /var/lib/cloud/instances/iid-datasource-none/scripts/runcmd',
str(cm.exception))
@skipUnlessJsonSchema()
def test_handler_schema_validation_warns_non_array_type(self):
"""Schema validation warns of non-array type for runcmd key.
Schema validation is not strict, so runcmd attempts to shellify the
invalid content.
"""
invalid_config = {'runcmd': 1}
cc = get_cloud(paths=self.paths)
with self.assertRaises(TypeError) as cm:
handle('cc_runcmd', invalid_config, cc, LOG, [])
self.assertIn(
'Invalid config:\nruncmd: 1 is not of type \'array\'',
self.logs.getvalue())
self.assertIn('Failed to shellify', str(cm.exception))
@skipUnlessJsonSchema()
def test_handler_schema_validation_warns_non_array_item_type(self):
"""Schema validation warns of non-array or string runcmd items.
Schema validation is not strict, so runcmd attempts to shellify the
invalid content.
"""
invalid_config = {
'runcmd': ['ls /', 20, ['wget', 'http://stuff/blah'], {'a': 'n'}]}
cc = get_cloud(paths=self.paths)
with self.assertRaises(TypeError) as cm:
handle('cc_runcmd', invalid_config, cc, LOG, [])
expected_warnings = [
'runcmd.1: 20 is not valid under any of the given schemas',
'runcmd.3: {\'a\': \'n\'} is not valid under any of the given'
' schema'
]
logs = self.logs.getvalue()
for warning in expected_warnings:
self.assertIn(warning, logs)
self.assertIn('Failed to shellify', str(cm.exception))
def test_handler_write_valid_runcmd_schema_to_file(self):
"""Valid runcmd schema is written to a runcmd shell script."""
valid_config = {'runcmd': [['ls', '/']]}
cc = get_cloud(paths=self.paths)
handle('cc_runcmd', valid_config, cc, LOG, [])
runcmd_file = os.path.join(
self.new_root,
'var/lib/cloud/instances/iid-datasource-none/scripts/runcmd')
self.assertEqual("#!/bin/sh\n'ls' '/'\n", util.load_file(runcmd_file))
file_stat = os.stat(runcmd_file)
self.assertEqual(0o700, stat.S_IMODE(file_stat.st_mode))
@skipUnlessJsonSchema()
class TestSchema(CiTestCase, SchemaTestCaseMixin):
"""Directly test schema rather than through handle."""
schema = schema
def test_duplicates_are_fine_array_array(self):
"""Duplicated commands array/array entries are allowed."""
self.assertSchemaValid(
[["echo", "bye"], ["echo", "bye"]],
"command entries can be duplicate.")
def test_duplicates_are_fine_array_string(self):
"""Duplicated commands array/string entries are allowed."""
self.assertSchemaValid(
["echo bye", "echo bye"],
"command entries can be duplicate.")
# vi: ts=4 expandtab
|