1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
|
# This file is part of cloud-init. See LICENSE file for license information.
from cloudinit.sources import DataSourceOracle as oracle
from cloudinit.sources import BrokenMetadata
from cloudinit import helpers
from cloudinit.tests import helpers as test_helpers
from textwrap import dedent
import argparse
import httpretty
import json
import mock
import os
import six
import uuid
DS_PATH = "cloudinit.sources.DataSourceOracle"
MD_VER = "2013-10-17"
class TestDataSourceOracle(test_helpers.CiTestCase):
"""Test datasource DataSourceOracle."""
ds_class = oracle.DataSourceOracle
my_uuid = str(uuid.uuid4())
my_md = {"uuid": "ocid1.instance.oc1.phx.abyhqlj",
"name": "ci-vm1", "availability_zone": "phx-ad-3",
"hostname": "ci-vm1hostname",
"launch_index": 0, "files": [],
"public_keys": {"0": "ssh-rsa AAAAB3N...== user@host"},
"meta": {}}
def _patch_instance(self, inst, patches):
"""Patch an instance of a class 'inst'.
for each name, kwargs in patches:
inst.name = mock.Mock(**kwargs)
returns a namespace object that has
namespace.name = mock.Mock(**kwargs)
Do not bother with cleanup as instance is assumed transient."""
mocks = argparse.Namespace()
for name, kwargs in patches.items():
imock = mock.Mock(name=name, spec=getattr(inst, name), **kwargs)
setattr(mocks, name, imock)
setattr(inst, name, imock)
return mocks
def _get_ds(self, sys_cfg=None, distro=None, paths=None, ud_proc=None,
patches=None):
if sys_cfg is None:
sys_cfg = {}
if patches is None:
patches = {}
if paths is None:
tmpd = self.tmp_dir()
dirs = {'cloud_dir': self.tmp_path('cloud_dir', tmpd),
'run_dir': self.tmp_path('run_dir')}
for d in dirs.values():
os.mkdir(d)
paths = helpers.Paths(dirs)
ds = self.ds_class(sys_cfg=sys_cfg, distro=distro,
paths=paths, ud_proc=ud_proc)
return ds, self._patch_instance(ds, patches)
def test_platform_not_viable_returns_false(self):
ds, mocks = self._get_ds(
patches={'_is_platform_viable': {'return_value': False}})
self.assertFalse(ds._get_data())
mocks._is_platform_viable.assert_called_once_with()
def test_platform_info(self):
"""Return platform-related information for Oracle Datasource."""
ds, _mocks = self._get_ds()
self.assertEqual('oracle', ds.cloud_name)
self.assertEqual('oracle', ds.platform_type)
self.assertEqual(
'metadata (http://169.254.169.254/openstack/)', ds.subplatform)
@mock.patch(DS_PATH + "._is_iscsi_root", return_value=True)
def test_without_userdata(self, m_is_iscsi_root):
"""If no user-data is provided, it should not be in return dict."""
ds, mocks = self._get_ds(patches={
'_is_platform_viable': {'return_value': True},
'crawl_metadata': {
'return_value': {
MD_VER: {'system_uuid': self.my_uuid,
'meta_data': self.my_md}}}})
self.assertTrue(ds._get_data())
mocks._is_platform_viable.assert_called_once_with()
mocks.crawl_metadata.assert_called_once_with()
self.assertEqual(self.my_uuid, ds.system_uuid)
self.assertEqual(self.my_md['availability_zone'], ds.availability_zone)
self.assertIn(self.my_md["public_keys"]["0"], ds.get_public_ssh_keys())
self.assertEqual(self.my_md['uuid'], ds.get_instance_id())
self.assertIsNone(ds.userdata_raw)
@mock.patch(DS_PATH + "._is_iscsi_root", return_value=True)
def test_with_vendordata(self, m_is_iscsi_root):
"""Test with vendor data."""
vd = {'cloud-init': '#cloud-config\nkey: value'}
ds, mocks = self._get_ds(patches={
'_is_platform_viable': {'return_value': True},
'crawl_metadata': {
'return_value': {
MD_VER: {'system_uuid': self.my_uuid,
'meta_data': self.my_md,
'vendor_data': vd}}}})
self.assertTrue(ds._get_data())
mocks._is_platform_viable.assert_called_once_with()
mocks.crawl_metadata.assert_called_once_with()
self.assertEqual(vd, ds.vendordata_pure)
self.assertEqual(vd['cloud-init'], ds.vendordata_raw)
@mock.patch(DS_PATH + "._is_iscsi_root", return_value=True)
def test_with_userdata(self, m_is_iscsi_root):
"""Ensure user-data is populated if present and is binary."""
my_userdata = b'abcdefg'
ds, mocks = self._get_ds(patches={
'_is_platform_viable': {'return_value': True},
'crawl_metadata': {
'return_value': {
MD_VER: {'system_uuid': self.my_uuid,
'meta_data': self.my_md,
'user_data': my_userdata}}}})
self.assertTrue(ds._get_data())
mocks._is_platform_viable.assert_called_once_with()
mocks.crawl_metadata.assert_called_once_with()
self.assertEqual(self.my_uuid, ds.system_uuid)
self.assertIn(self.my_md["public_keys"]["0"], ds.get_public_ssh_keys())
self.assertEqual(self.my_md['uuid'], ds.get_instance_id())
self.assertEqual(my_userdata, ds.userdata_raw)
@mock.patch(DS_PATH + ".cmdline.read_initramfs_config")
@mock.patch(DS_PATH + "._is_iscsi_root", return_value=True)
def test_network_cmdline(self, m_is_iscsi_root, m_initramfs_config):
"""network_config should read kernel cmdline."""
distro = mock.MagicMock()
ds, _ = self._get_ds(distro=distro, patches={
'_is_platform_viable': {'return_value': True},
'crawl_metadata': {
'return_value': {
MD_VER: {'system_uuid': self.my_uuid,
'meta_data': self.my_md}}}})
ncfg = {'version': 1, 'config': [{'a': 'b'}]}
m_initramfs_config.return_value = ncfg
self.assertTrue(ds._get_data())
self.assertEqual(ncfg, ds.network_config)
self.assertEqual([mock.call()], m_initramfs_config.call_args_list)
self.assertFalse(distro.generate_fallback_config.called)
@mock.patch(DS_PATH + ".cmdline.read_initramfs_config")
@mock.patch(DS_PATH + "._is_iscsi_root", return_value=True)
def test_network_fallback(self, m_is_iscsi_root, m_initramfs_config):
"""test that fallback network is generated if no kernel cmdline."""
distro = mock.MagicMock()
ds, _ = self._get_ds(distro=distro, patches={
'_is_platform_viable': {'return_value': True},
'crawl_metadata': {
'return_value': {
MD_VER: {'system_uuid': self.my_uuid,
'meta_data': self.my_md}}}})
ncfg = {'version': 1, 'config': [{'a': 'b'}]}
m_initramfs_config.return_value = None
self.assertTrue(ds._get_data())
ncfg = {'version': 1, 'config': [{'distro1': 'value'}]}
distro.generate_fallback_config.return_value = ncfg
self.assertEqual(ncfg, ds.network_config)
self.assertEqual([mock.call()], m_initramfs_config.call_args_list)
distro.generate_fallback_config.assert_called_once_with()
# test that the result got cached, and the methods not re-called.
self.assertEqual(ncfg, ds.network_config)
self.assertEqual(1, m_initramfs_config.call_count)
@mock.patch(DS_PATH + "._read_system_uuid", return_value=str(uuid.uuid4()))
class TestReadMetaData(test_helpers.HttprettyTestCase):
"""Test the read_metadata which interacts with http metadata service."""
mdurl = oracle.METADATA_ENDPOINT
my_md = {"uuid": "ocid1.instance.oc1.phx.abyhqlj",
"name": "ci-vm1", "availability_zone": "phx-ad-3",
"hostname": "ci-vm1hostname",
"launch_index": 0, "files": [],
"public_keys": {"0": "ssh-rsa AAAAB3N...== user@host"},
"meta": {}}
def populate_md(self, data):
"""call httppretty.register_url for each item dict 'data',
including valid indexes. Text values converted to bytes."""
httpretty.register_uri(
httpretty.GET, self.mdurl + MD_VER + "/",
'\n'.join(data.keys()).encode('utf-8'))
for k, v in data.items():
httpretty.register_uri(
httpretty.GET, self.mdurl + MD_VER + "/" + k,
v if not isinstance(v, six.text_type) else v.encode('utf-8'))
def test_broken_no_sys_uuid(self, m_read_system_uuid):
"""Datasource requires ability to read system_uuid and true return."""
m_read_system_uuid.return_value = None
self.assertRaises(BrokenMetadata, oracle.read_metadata)
def test_broken_no_metadata_json(self, m_read_system_uuid):
"""Datasource requires meta_data.json."""
httpretty.register_uri(
httpretty.GET, self.mdurl + MD_VER + "/",
'\n'.join(['user_data']).encode('utf-8'))
with self.assertRaises(BrokenMetadata) as cm:
oracle.read_metadata()
self.assertIn("Required field 'meta_data.json' missing",
str(cm.exception))
def test_with_userdata(self, m_read_system_uuid):
data = {'user_data': b'#!/bin/sh\necho hi world\n',
'meta_data.json': json.dumps(self.my_md)}
self.populate_md(data)
result = oracle.read_metadata()[MD_VER]
self.assertEqual(data['user_data'], result['user_data'])
self.assertEqual(self.my_md, result['meta_data'])
def test_without_userdata(self, m_read_system_uuid):
data = {'meta_data.json': json.dumps(self.my_md)}
self.populate_md(data)
result = oracle.read_metadata()[MD_VER]
self.assertNotIn('user_data', result)
self.assertEqual(self.my_md, result['meta_data'])
def test_unknown_fields_included(self, m_read_system_uuid):
"""Unknown fields listed in index should be included.
And those ending in .json should be decoded."""
some_data = {'key1': 'data1', 'subk1': {'subd1': 'subv'}}
some_vendor_data = {'cloud-init': 'foo'}
data = {'meta_data.json': json.dumps(self.my_md),
'some_data.json': json.dumps(some_data),
'vendor_data.json': json.dumps(some_vendor_data),
'other_blob': b'this is blob'}
self.populate_md(data)
result = oracle.read_metadata()[MD_VER]
self.assertNotIn('user_data', result)
self.assertEqual(self.my_md, result['meta_data'])
self.assertEqual(some_data, result['some_data'])
self.assertEqual(some_vendor_data, result['vendor_data'])
self.assertEqual(data['other_blob'], result['other_blob'])
class TestIsPlatformViable(test_helpers.CiTestCase):
@mock.patch(DS_PATH + ".util.read_dmi_data",
return_value=oracle.CHASSIS_ASSET_TAG)
def test_expected_viable(self, m_read_dmi_data):
"""System with known chassis tag is viable."""
self.assertTrue(oracle._is_platform_viable())
m_read_dmi_data.assert_has_calls([mock.call('chassis-asset-tag')])
@mock.patch(DS_PATH + ".util.read_dmi_data", return_value=None)
def test_expected_not_viable_dmi_data_none(self, m_read_dmi_data):
"""System without known chassis tag is not viable."""
self.assertFalse(oracle._is_platform_viable())
m_read_dmi_data.assert_has_calls([mock.call('chassis-asset-tag')])
@mock.patch(DS_PATH + ".util.read_dmi_data", return_value="LetsGoCubs")
def test_expected_not_viable_other(self, m_read_dmi_data):
"""System with unnown chassis tag is not viable."""
self.assertFalse(oracle._is_platform_viable())
m_read_dmi_data.assert_has_calls([mock.call('chassis-asset-tag')])
class TestLoadIndex(test_helpers.CiTestCase):
"""_load_index handles parsing of an index into a proper list.
The tests here guarantee correct parsing of html version or
a fixed version. See the function docstring for more doc."""
_known_html_api_versions = dedent("""\
<html>
<head><title>Index of /openstack/</title></head>
<body bgcolor="white">
<h1>Index of /openstack/</h1><hr><pre><a href="../">../</a>
<a href="2013-10-17/">2013-10-17/</a> 27-Jun-2018 12:22 -
<a href="latest/">latest/</a> 27-Jun-2018 12:22 -
</pre><hr></body>
</html>""")
_known_html_contents = dedent("""\
<html>
<head><title>Index of /openstack/2013-10-17/</title></head>
<body bgcolor="white">
<h1>Index of /openstack/2013-10-17/</h1><hr><pre><a href="../">../</a>
<a href="meta_data.json">meta_data.json</a> 27-Jun-2018 12:22 679
<a href="user_data">user_data</a> 27-Jun-2018 12:22 146
</pre><hr></body>
</html>""")
def test_parse_html(self):
"""Test parsing of lower case html."""
self.assertEqual(
['2013-10-17/', 'latest/'],
oracle._load_index(self._known_html_api_versions))
self.assertEqual(
['meta_data.json', 'user_data'],
oracle._load_index(self._known_html_contents))
def test_parse_html_upper(self):
"""Test parsing of upper case html, although known content is lower."""
def _toupper(data):
return data.replace("<a", "<A").replace("html>", "HTML>")
self.assertEqual(
['2013-10-17/', 'latest/'],
oracle._load_index(_toupper(self._known_html_api_versions)))
self.assertEqual(
['meta_data.json', 'user_data'],
oracle._load_index(_toupper(self._known_html_contents)))
def test_parse_newline_list_with_endl(self):
"""Test parsing of newline separated list with ending newline."""
self.assertEqual(
['2013-10-17/', 'latest/'],
oracle._load_index("\n".join(["2013-10-17/", "latest/", ""])))
self.assertEqual(
['meta_data.json', 'user_data'],
oracle._load_index("\n".join(["meta_data.json", "user_data", ""])))
def test_parse_newline_list_without_endl(self):
"""Test parsing of newline separated list with no ending newline.
Actual openstack implementation does not include trailing newline."""
self.assertEqual(
['2013-10-17/', 'latest/'],
oracle._load_index("\n".join(["2013-10-17/", "latest/"])))
self.assertEqual(
['meta_data.json', 'user_data'],
oracle._load_index("\n".join(["meta_data.json", "user_data"])))
# vi: ts=4 expandtab
|