1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
|
from unittest import TestCase
from mocker import MockerTestCase
from tempfile import mkdtemp
from shutil import rmtree
import os
import stat
from cloudinit.util import mergedict, get_cfg_option_list_or_str, write_file
class TestMergeDict(TestCase):
    """Tests for ``mergedict(source, candidate)``.

    Every case calls ``mergedict`` with a ``source`` and a ``candidate`` and
    checks the returned value; the expectations below show that ``source``
    takes precedence whenever the two disagree.
    """

    def test_simple_merge(self):
        """Test simple non-conflict merge."""
        source = {"key1": "value1"}
        candidate = {"key2": "value2"}
        result = mergedict(source, candidate)
        self.assertEqual({"key1": "value1", "key2": "value2"}, result)

    def test_nested_merge(self):
        """Test that nested dicts under the same key are merged."""
        source = {"key1": {"key1.1": "value1.1"}}
        candidate = {"key1": {"key1.2": "value1.2"}}
        result = mergedict(source, candidate)
        self.assertEqual(
            {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)

    def test_merge_does_not_override(self):
        """Test that candidate doesn't override source."""
        source = {"key1": "value1", "key2": "value2"}
        # Conflict on *both* keys. The candidate literal previously listed
        # "key2" twice, so its first entry was silently discarded and "key1"
        # was never actually contested.
        candidate = {"key1": "value2", "key2": "NEW VALUE"}
        result = mergedict(source, candidate)
        self.assertEqual(source, result)

    def test_empty_candidate(self):
        """Test empty candidate doesn't change source."""
        source = {"key": "value"}
        candidate = {}
        result = mergedict(source, candidate)
        self.assertEqual(source, result)

    def test_empty_source(self):
        """Test empty source is replaced by candidate."""
        source = {}
        candidate = {"key": "value"}
        result = mergedict(source, candidate)
        self.assertEqual(candidate, result)

    def test_non_dict_candidate(self):
        """Test non-dict candidate is discarded."""
        source = {"key": "value"}
        candidate = "not a dict"
        result = mergedict(source, candidate)
        self.assertEqual(source, result)

    def test_non_dict_source(self):
        """Test non-dict source is not modified with a dict candidate."""
        source = "not a dict"
        candidate = {"key": "value"}
        result = mergedict(source, candidate)
        self.assertEqual(source, result)

    def test_neither_dict(self):
        """Test that source wins if neither candidate nor source is a dict."""
        source = "source"
        candidate = "candidate"
        result = mergedict(source, candidate)
        self.assertEqual(source, result)
class TestGetCfgOptionListOrStr(TestCase):
    """Checks on ``get_cfg_option_list_or_str`` using small config dicts."""

    def test_not_found_no_default(self):
        """A missing key without a default is expected to give None."""
        self.assertIsNone(get_cfg_option_list_or_str({}, "key"))

    def test_not_found_with_default(self):
        """A missing key is expected to give back the supplied default."""
        empty_cfg = {}
        looked_up = get_cfg_option_list_or_str(
            empty_cfg, "key", default=["DEFAULT"])
        self.assertEqual(["DEFAULT"], looked_up)

    def test_found_with_default(self):
        """A present key is expected to win over the supplied default."""
        cfg = {"key": ["value1"]}
        looked_up = get_cfg_option_list_or_str(
            cfg, "key", default=["DEFAULT"])
        self.assertEqual(["value1"], looked_up)

    def test_found_convert_to_list(self):
        """A bare string value is expected back as a one-element list."""
        cfg = {"key": "value1"}
        self.assertEqual(["value1"], get_cfg_option_list_or_str(cfg, "key"))

    def test_value_is_none(self):
        """A value of None is expected to come back as an empty list."""
        cfg = {"key": None}
        self.assertEqual([], get_cfg_option_list_or_str(cfg, "key"))
class TestWriteFile(MockerTestCase):
    """Tests for ``write_file``, run inside a per-test temporary directory."""

    def setUp(self):
        """Create the scratch directory that each test writes into."""
        super(TestWriteFile, self).setUp()
        # Make a temp directory for tests to use.
        self.tmp = mkdtemp(prefix="unittest_")

    def tearDown(self):
        """Remove the scratch directory, even if the parent teardown fails."""
        try:
            super(TestWriteFile, self).tearDown()
        finally:
            # Clean up temp directory
            rmtree(self.tmp)

    def test_basic_usage(self):
        """Verify basic usage with default args."""
        path = os.path.join(self.tmp, "NewFile.txt")
        contents = "Hey there"

        write_file(path, contents)

        self.assertTrue(os.path.exists(path))
        self.assertTrue(os.path.isfile(path))
        with open(path) as f:
            create_contents = f.read()
        self.assertEqual(contents, create_contents)
        file_stat = os.stat(path)
        # 0o644 is spelled with the "0o" prefix so the literal parses on both
        # Python 2.6+ and Python 3 (a bare leading zero is a syntax error on 3).
        self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))

    def test_dir_is_created_if_required(self):
        """Verify that directories are created if required."""
        dirname = os.path.join(self.tmp, "subdir")
        path = os.path.join(dirname, "NewFile.txt")
        contents = "Hey there"

        write_file(path, contents)

        self.assertTrue(os.path.isdir(dirname))
        self.assertTrue(os.path.isfile(path))

    def test_custom_mode(self):
        """Verify custom mode works properly."""
        path = os.path.join(self.tmp, "NewFile.txt")
        contents = "Hey there"

        write_file(path, contents, mode=0o666)

        self.assertTrue(os.path.exists(path))
        self.assertTrue(os.path.isfile(path))
        file_stat = os.stat(path)
        self.assertEqual(0o666, stat.S_IMODE(file_stat.st_mode))

    def test_custom_omode(self):
        """Verify custom omode works properly."""
        path = os.path.join(self.tmp, "NewFile.txt")
        contents = "Hey there"

        # Create file first with basic content. The file is opened in binary
        # mode, so write a bytes literal (same object type as str on Python 2).
        with open(path, "wb") as f:
            f.write(b"LINE1\n")

        write_file(path, contents, omode="a")

        self.assertTrue(os.path.exists(path))
        self.assertTrue(os.path.isfile(path))
        with open(path) as f:
            create_contents = f.read()
        self.assertEqual("LINE1\nHey there", create_contents)

    def test_restorecon_if_possible_is_called(self):
        """Make sure the restorecon_if_possible is called correctly."""
        path = os.path.join(self.tmp, "NewFile.txt")
        contents = "Hey there"

        # Mock out the restorecon_if_possible call to test if it's called.
        # NOTE(review): no explicit verify() here; presumably MockerTestCase
        # checks the recorded expectation during teardown - confirm.
        mock_restorecon = self.mocker.replace(
            "cloudinit.util.restorecon_if_possible", passthrough=False)
        mock_restorecon(path)
        self.mocker.replay()

        write_file(path, contents)
|