Diffstat (limited to 'tests')
-rw-r--r--  tests/data/vmware/cust-dhcp-2nic.cfg  34
-rw-r--r--  tests/data/vmware/cust-static-2nic.cfg  39
-rw-r--r--  tests/unittests/test_data.py  5
-rw-r--r--  tests/unittests/test_datasource/test_altcloud.py  23
-rw-r--r--  tests/unittests/test_datasource/test_azure.py  142
-rw-r--r--  tests/unittests/test_datasource/test_azure_helper.py  120
-rw-r--r--  tests/unittests/test_datasource/test_configdrive.py  81
-rw-r--r--  tests/unittests/test_datasource/test_maas.py  18
-rw-r--r--  tests/unittests/test_datasource/test_nocloud.py  2
-rw-r--r--  tests/unittests/test_datasource/test_opennebula.py  2
-rw-r--r--  tests/unittests/test_datasource/test_smartos.py  84
-rw-r--r--  tests/unittests/test_distros/test_netconfig.py  5
-rw-r--r--  tests/unittests/test_distros/test_user_data_normalize.py  29
-rw-r--r--  tests/unittests/test_handler/test_handler_lxd.py  75
-rw-r--r--  tests/unittests/test_handler/test_handler_power_state.py  47
-rw-r--r--  tests/unittests/test_handler/test_handler_seed_random.py  13
-rw-r--r--  tests/unittests/test_handler/test_handler_snappy.py  3
-rw-r--r--  tests/unittests/test_handler/test_handler_write_files.py  112
-rw-r--r--  tests/unittests/test_net.py  127
-rw-r--r--  tests/unittests/test_registry.py  28
-rw-r--r--  tests/unittests/test_reporting.py  369
-rw-r--r--  tests/unittests/test_sshutil.py  3
-rw-r--r--  tests/unittests/test_templating.py  3
-rw-r--r--  tests/unittests/test_util.py  10
-rw-r--r--  tests/unittests/test_vmware_config_file.py  103
25 files changed, 1228 insertions, 249 deletions
diff --git a/tests/data/vmware/cust-dhcp-2nic.cfg b/tests/data/vmware/cust-dhcp-2nic.cfg
new file mode 100644
index 00000000..f687311a
--- /dev/null
+++ b/tests/data/vmware/cust-dhcp-2nic.cfg
@@ -0,0 +1,34 @@
+[NETWORK]
+NETWORKING = yes
+BOOTPROTO = dhcp
+HOSTNAME = myhost1
+DOMAINNAME = eng.vmware.com
+
+[NIC-CONFIG]
+NICS = NIC1,NIC2
+
+[NIC1]
+MACADDR = 00:50:56:a6:8c:08
+ONBOOT = yes
+IPv4_MODE = BACKWARDS_COMPATIBLE
+BOOTPROTO = dhcp
+
+[NIC2]
+MACADDR = 00:50:56:a6:5a:de
+ONBOOT = yes
+IPv4_MODE = BACKWARDS_COMPATIBLE
+BOOTPROTO = dhcp
+
+# some random comment
+
+[PASSWORD]
+# secret
+-PASS = c2VjcmV0Cg==
+
+[DNS]
+DNSFROMDHCP=yes
+SUFFIX|1 = eng.vmware.com
+
+[DATETIME]
+TIMEZONE = Africa/Abidjan
+UTC = yes
diff --git a/tests/data/vmware/cust-static-2nic.cfg b/tests/data/vmware/cust-static-2nic.cfg
new file mode 100644
index 00000000..0d80c2c4
--- /dev/null
+++ b/tests/data/vmware/cust-static-2nic.cfg
@@ -0,0 +1,39 @@
+[NETWORK]
+NETWORKING = yes
+BOOTPROTO = dhcp
+HOSTNAME = myhost1
+DOMAINNAME = eng.vmware.com
+
+[NIC-CONFIG]
+NICS = NIC1,NIC2
+
+[NIC1]
+MACADDR = 00:50:56:a6:8c:08
+ONBOOT = yes
+IPv4_MODE = BACKWARDS_COMPATIBLE
+BOOTPROTO = static
+IPADDR = 10.20.87.154
+NETMASK = 255.255.252.0
+GATEWAY = 10.20.87.253, 10.20.87.105
+IPv6ADDR|1 = fc00:10:20:87::154
+IPv6NETMASK|1 = 64
+IPv6GATEWAY|1 = fc00:10:20:87::253
+[NIC2]
+MACADDR = 00:50:56:a6:ef:7d
+ONBOOT = yes
+IPv4_MODE = BACKWARDS_COMPATIBLE
+BOOTPROTO = static
+IPADDR = 192.168.6.102
+NETMASK = 255.255.0.0
+GATEWAY = 192.168.0.10
+
+[DNS]
+DNSFROMDHCP=no
+SUFFIX|1 = eng.vmware.com
+SUFFIX|2 = proxy.vmware.com
+NAMESERVER|1 = 10.20.145.1
+NAMESERVER|2 = 10.20.145.2
+
+[DATETIME]
+TIMEZONE = Africa/Abidjan
+UTC = yes
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py
index c603bfdb..9c1ec1d4 100644
--- a/tests/unittests/test_data.py
+++ b/tests/unittests/test_data.py
@@ -27,11 +27,12 @@ from cloudinit import stages
from cloudinit import user_data as ud
from cloudinit import util
-INSTANCE_ID = "i-testing"
-
from . import helpers
+INSTANCE_ID = "i-testing"
+
+
class FakeDataSource(sources.DataSource):
def __init__(self, userdata=None, vendordata=None):
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index e9cd2fa5..85759c68 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -134,8 +134,7 @@ class TestGetCloudType(TestCase):
'''
util.read_dmi_data = _dmi_data('RHEV')
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals('RHEV', \
- dsrc.get_cloud_type())
+ self.assertEquals('RHEV', dsrc.get_cloud_type())
def test_vsphere(self):
'''
@@ -144,8 +143,7 @@ class TestGetCloudType(TestCase):
'''
util.read_dmi_data = _dmi_data('VMware Virtual Platform')
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals('VSPHERE', \
- dsrc.get_cloud_type())
+ self.assertEquals('VSPHERE', dsrc.get_cloud_type())
def test_unknown(self):
'''
@@ -154,8 +152,7 @@ class TestGetCloudType(TestCase):
'''
util.read_dmi_data = _dmi_data('Unrecognized Platform')
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals('UNKNOWN', \
- dsrc.get_cloud_type())
+ self.assertEquals('UNKNOWN', dsrc.get_cloud_type())
class TestGetDataCloudInfoFile(TestCase):
@@ -412,27 +409,27 @@ class TestReadUserDataCallback(TestCase):
'''Test read_user_data_callback() with both files.'''
self.assertEquals('test user data',
- read_user_data_callback(self.mount_dir))
+ read_user_data_callback(self.mount_dir))
def test_callback_dc(self):
'''Test read_user_data_callback() with only DC file.'''
_remove_user_data_files(self.mount_dir,
- dc_file=False,
- non_dc_file=True)
+ dc_file=False,
+ non_dc_file=True)
self.assertEquals('test user data',
- read_user_data_callback(self.mount_dir))
+ read_user_data_callback(self.mount_dir))
def test_callback_non_dc(self):
'''Test read_user_data_callback() with only non-DC file.'''
_remove_user_data_files(self.mount_dir,
- dc_file=True,
- non_dc_file=False)
+ dc_file=True,
+ non_dc_file=False)
self.assertEquals('test user data',
- read_user_data_callback(self.mount_dir))
+ read_user_data_callback(self.mount_dir))
def test_callback_none(self):
'''Test read_user_data_callback() no files are found.'''
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 8952374f..444e2799 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -54,10 +54,13 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
if pubkeys:
content += "<SSH><PublicKeys>\n"
- for fp, path in pubkeys:
+ for fp, path, value in pubkeys:
content += " <PublicKey>"
- content += ("<Fingerprint>%s</Fingerprint><Path>%s</Path>" %
- (fp, path))
+ if fp and path:
+ content += ("<Fingerprint>%s</Fingerprint><Path>%s</Path>" %
+ (fp, path))
+ if value:
+ content += "<Value>%s</Value>" % value
content += "</PublicKey>\n"
content += "</PublicKeys></SSH>"
content += """
@@ -112,10 +115,6 @@ class TestAzureDataSource(TestCase):
data['pubkey_files'] = flist
return ["pubkey_from: %s" % f for f in flist]
- def _iid_from_shared_config(path):
- data['iid_from_shared_cfg'] = path
- return 'i-my-azure-id'
-
if data.get('ovfcontent') is not None:
populate_dir(os.path.join(self.paths.seed_dir, "azure"),
{'ovf-env.xml': data['ovfcontent']})
@@ -124,20 +123,22 @@ class TestAzureDataSource(TestCase):
mod.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
self.get_metadata_from_fabric = mock.MagicMock(return_value={
- 'instance-id': 'i-my-azure-id',
'public-keys': [],
})
+ self.instance_id = 'test-instance-id'
+
self.apply_patches([
(mod, 'list_possible_azure_ds_devs', dsdevs),
(mod, 'invoke_agent', _invoke_agent),
(mod, 'wait_for_files', _wait_for_files),
(mod, 'pubkeys_from_crt_files', _pubkeys_from_crt_files),
- (mod, 'iid_from_shared_config', _iid_from_shared_config),
(mod, 'perform_hostname_bounce', mock.MagicMock()),
(mod, 'get_hostname', mock.MagicMock()),
(mod, 'set_hostname', mock.MagicMock()),
(mod, 'get_metadata_from_fabric', self.get_metadata_from_fabric),
+ (mod.util, 'read_dmi_data', mock.MagicMock(
+ return_value=self.instance_id)),
])
dsrc = mod.DataSourceAzureNet(
@@ -190,7 +191,6 @@ class TestAzureDataSource(TestCase):
self.assertEqual(dsrc.metadata['local-hostname'], odata['HostName'])
self.assertTrue(os.path.isfile(
os.path.join(self.waagent_d, 'ovf-env.xml')))
- self.assertEqual(dsrc.metadata['instance-id'], 'i-my-azure-id')
def test_waagent_d_has_0700_perms(self):
# we expect /var/lib/waagent to be created 0700
@@ -207,7 +207,7 @@ class TestAzureDataSource(TestCase):
yaml_cfg = "{agent_command: my_command}\n"
cfg = yaml.safe_load(yaml_cfg)
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': yaml_cfg, 'encoding': 'plain'}}
+ 'dscfg': {'text': yaml_cfg, 'encoding': 'plain'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
@@ -219,8 +219,8 @@ class TestAzureDataSource(TestCase):
# set dscfg in via base64 encoded yaml
cfg = {'agent_command': "my_command"}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': b64e(yaml.dump(cfg)),
- 'encoding': 'base64'}}
+ 'dscfg': {'text': b64e(yaml.dump(cfg)),
+ 'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
@@ -267,7 +267,8 @@ class TestAzureDataSource(TestCase):
# should equal that after the '$'
pos = defuser['passwd'].rfind("$") + 1
self.assertEqual(defuser['passwd'],
- crypt.crypt(odata['UserPassword'], defuser['passwd'][0:pos]))
+ crypt.crypt(odata['UserPassword'],
+ defuser['passwd'][0:pos]))
def test_userdata_plain(self):
mydata = "FOOBAR"
@@ -297,18 +298,50 @@ class TestAzureDataSource(TestCase):
self.assertFalse(ret)
self.assertFalse('agent_invoked' in data)
- def test_cfg_has_pubkeys(self):
+ def test_cfg_has_pubkeys_fingerprint(self):
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}]
+ pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata,
+ pubkeys=pubkeys)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ for mypk in mypklist:
+ self.assertIn(mypk, dsrc.cfg['_pubkeys'])
+ self.assertIn('pubkey_from', dsrc.metadata['public-keys'][-1])
+
+ def test_cfg_has_pubkeys_value(self):
+ # make sure that provided key is used over fingerprint
odata = {'HostName': "myhost", 'UserName': "myuser"}
- mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}]
- pubkeys = [(x['fingerprint'], x['path']) for x in mypklist]
+ mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': 'value1'}]
+ pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
data = {'ovfcontent': construct_valid_ovf_env(data=odata,
pubkeys=pubkeys)}
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
+
for mypk in mypklist:
self.assertIn(mypk, dsrc.cfg['_pubkeys'])
+ self.assertIn(mypk['value'], dsrc.metadata['public-keys'])
+
+ def test_cfg_has_no_fingerprint_has_value(self):
+ # test value is used when fingerprint not provided
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ mypklist = [{'fingerprint': None, 'path': 'path1', 'value': 'value1'}]
+ pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata,
+ pubkeys=pubkeys)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+
+ for mypk in mypklist:
+ self.assertIn(mypk['value'], dsrc.metadata['public-keys'])
def test_default_ephemeral(self):
# make sure the ephemeral device works
@@ -332,8 +365,8 @@ class TestAzureDataSource(TestCase):
# Make sure that user can affect disk aliases
dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': b64e(yaml.dump(dscfg)),
- 'encoding': 'base64'}}
+ 'dscfg': {'text': b64e(yaml.dump(dscfg)),
+ 'encoding': 'base64'}}
usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'},
'ephemeral0': False}}
userdata = '#cloud-config' + yaml.dump(usercfg) + "\n"
@@ -398,54 +431,6 @@ class TestAzureDataSource(TestCase):
dsrc = self._get_ds({'ovfcontent': xml})
dsrc.get_data()
- def test_existing_ovf_same(self):
- # waagent/SharedConfig left alone if found ovf-env.xml same as cached
- odata = {'UserData': b64e("SOMEUSERDATA")}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
-
- populate_dir(self.waagent_d,
- {'ovf-env.xml': data['ovfcontent'],
- 'otherfile': 'otherfile-content',
- 'SharedConfig.xml': 'mysharedconfig'})
-
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertTrue(os.path.exists(
- os.path.join(self.waagent_d, 'ovf-env.xml')))
- self.assertTrue(os.path.exists(
- os.path.join(self.waagent_d, 'otherfile')))
- self.assertTrue(os.path.exists(
- os.path.join(self.waagent_d, 'SharedConfig.xml')))
-
- def test_existing_ovf_diff(self):
- # waagent/SharedConfig must be removed if ovfenv is found elsewhere
-
- # 'get_data' should remove SharedConfig.xml in /var/lib/waagent
- # if ovf-env.xml differs.
- cached_ovfenv = construct_valid_ovf_env(
- {'userdata': b64e("FOO_USERDATA")})
- new_ovfenv = construct_valid_ovf_env(
- {'userdata': b64e("NEW_USERDATA")})
-
- populate_dir(self.waagent_d,
- {'ovf-env.xml': cached_ovfenv,
- 'SharedConfig.xml': "mysharedconfigxml",
- 'otherfile': 'otherfilecontent'})
-
- dsrc = self._get_ds({'ovfcontent': new_ovfenv})
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertEqual(dsrc.userdata_raw, b"NEW_USERDATA")
- self.assertTrue(os.path.exists(
- os.path.join(self.waagent_d, 'otherfile')))
- self.assertFalse(os.path.exists(
- os.path.join(self.waagent_d, 'SharedConfig.xml')))
- self.assertTrue(os.path.exists(
- os.path.join(self.waagent_d, 'ovf-env.xml')))
- new_xml = load_file(os.path.join(self.waagent_d, 'ovf-env.xml'))
- self.xml_equals(new_ovfenv, new_xml)
-
def test_exception_fetching_fabric_data_doesnt_propagate(self):
ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
ds.ds_cfg['agent_command'] = '__builtin__'
@@ -460,6 +445,17 @@ class TestAzureDataSource(TestCase):
self.assertTrue(ret)
self.assertEqual('value', ds.metadata['test'])
+ def test_instance_id_from_dmidecode_used(self):
+ ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ ds.get_data()
+ self.assertEqual(self.instance_id, ds.metadata['instance-id'])
+
+ def test_instance_id_from_dmidecode_used_for_builtin(self):
+ ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ ds.ds_cfg['agent_command'] = '__builtin__'
+ ds.get_data()
+ self.assertEqual(self.instance_id, ds.metadata['instance-id'])
+
class TestAzureBounce(TestCase):
@@ -469,9 +465,6 @@ class TestAzureBounce(TestCase):
self.patches.enter_context(
mock.patch.object(DataSourceAzure, 'wait_for_files'))
self.patches.enter_context(
- mock.patch.object(DataSourceAzure, 'iid_from_shared_config',
- mock.MagicMock(return_value='i-my-azure-id')))
- self.patches.enter_context(
mock.patch.object(DataSourceAzure, 'list_possible_azure_ds_devs',
mock.MagicMock(return_value=[])))
self.patches.enter_context(
@@ -485,6 +478,9 @@ class TestAzureBounce(TestCase):
self.patches.enter_context(
mock.patch.object(DataSourceAzure, 'get_metadata_from_fabric',
mock.MagicMock(return_value={})))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure.util, 'read_dmi_data',
+ mock.MagicMock(return_value='test-instance-id')))
def setUp(self):
super(TestAzureBounce, self).setUp()
@@ -639,11 +635,11 @@ class TestReadAzureOvf(TestCase):
def test_invalid_xml_raises_non_azure_ds(self):
invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
self.assertRaises(DataSourceAzure.BrokenAzureDataSource,
- DataSourceAzure.read_azure_ovf, invalid_xml)
+ DataSourceAzure.read_azure_ovf, invalid_xml)
def test_load_with_pubkeys(self):
- mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}]
- pubkeys = [(x['fingerprint'], x['path']) for x in mypklist]
+ mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}]
+ pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
content = construct_valid_ovf_env(pubkeys=pubkeys)
(_md, _ud, cfg) = DataSourceAzure.read_azure_ovf(content)
for mypk in mypklist:
diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py
index a5228870..1134199b 100644
--- a/tests/unittests/test_datasource/test_azure_helper.py
+++ b/tests/unittests/test_datasource/test_azure_helper.py
@@ -1,6 +1,4 @@
import os
-import struct
-import unittest
from cloudinit.sources.helpers import azure as azure_helper
from ..helpers import TestCase
@@ -40,7 +38,7 @@ GOAL_STATE_TEMPLATE = """\
<HostingEnvironmentConfig>
http://100.86.192.70:80/...hostingEnvironmentConfig...
</HostingEnvironmentConfig>
- <SharedConfig>{shared_config_url}</SharedConfig>
+ <SharedConfig>http://100.86.192.70:80/..SharedConfig..</SharedConfig>
<ExtensionsConfig>
http://100.86.192.70:80/...extensionsConfig...
</ExtensionsConfig>
@@ -55,21 +53,6 @@ GOAL_STATE_TEMPLATE = """\
"""
-class TestReadAzureSharedConfig(unittest.TestCase):
-
- def test_valid_content(self):
- xml = """<?xml version="1.0" encoding="utf-8"?>
- <SharedConfig>
- <Deployment name="MY_INSTANCE_ID">
- <Service name="myservice"/>
- <ServiceInstance name="INSTANCE_ID.0" guid="{abcd-uuid}" />
- </Deployment>
- <Incarnation number="1"/>
- </SharedConfig>"""
- ret = azure_helper.iid_from_shared_config_content(xml)
- self.assertEqual("MY_INSTANCE_ID", ret)
-
-
class TestFindEndpoint(TestCase):
def setUp(self):
@@ -90,48 +73,64 @@ class TestFindEndpoint(TestCase):
self.assertRaises(Exception,
azure_helper.WALinuxAgentShim.find_endpoint)
- def _build_lease_content(self, ip_address, use_hex=True):
- ip_address_repr = ':'.join(
- [hex(int(part)).replace('0x', '')
- for part in ip_address.split('.')])
- if not use_hex:
- ip_address_repr = struct.pack(
- '>L', int(ip_address_repr.replace(':', ''), 16))
- ip_address_repr = '"{0}"'.format(ip_address_repr.decode('utf-8'))
+ @staticmethod
+ def _build_lease_content(encoded_address):
return '\n'.join([
'lease {',
' interface "eth0";',
- ' option unknown-245 {0};'.format(ip_address_repr),
+ ' option unknown-245 {0};'.format(encoded_address),
'}'])
- def test_hex_string(self):
- ip_address = '98.76.54.32'
- file_content = self._build_lease_content(ip_address)
+ def test_latest_lease_used(self):
+ encoded_addresses = ['5:4:3:2', '4:3:2:1']
+ file_content = '\n'.join([self._build_lease_content(encoded_address)
+ for encoded_address in encoded_addresses])
self.load_file.return_value = file_content
- self.assertEqual(ip_address,
+ self.assertEqual(encoded_addresses[-1].replace(':', '.'),
azure_helper.WALinuxAgentShim.find_endpoint())
+
+class TestExtractIpAddressFromLeaseValue(TestCase):
+
+ def test_hex_string(self):
+ ip_address, encoded_address = '98.76.54.32', '62:4c:36:20'
+ self.assertEqual(
+ ip_address,
+ azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
+ encoded_address
+ ))
+
def test_hex_string_with_single_character_part(self):
- ip_address = '4.3.2.1'
- file_content = self._build_lease_content(ip_address)
- self.load_file.return_value = file_content
- self.assertEqual(ip_address,
- azure_helper.WALinuxAgentShim.find_endpoint())
+ ip_address, encoded_address = '4.3.2.1', '4:3:2:1'
+ self.assertEqual(
+ ip_address,
+ azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
+ encoded_address
+ ))
def test_packed_string(self):
- ip_address = '98.76.54.32'
- file_content = self._build_lease_content(ip_address, use_hex=False)
- self.load_file.return_value = file_content
- self.assertEqual(ip_address,
- azure_helper.WALinuxAgentShim.find_endpoint())
+ ip_address, encoded_address = '98.76.54.32', 'bL6 '
+ self.assertEqual(
+ ip_address,
+ azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
+ encoded_address
+ ))
- def test_latest_lease_used(self):
- ip_addresses = ['4.3.2.1', '98.76.54.32']
- file_content = '\n'.join([self._build_lease_content(ip_address)
- for ip_address in ip_addresses])
- self.load_file.return_value = file_content
- self.assertEqual(ip_addresses[-1],
- azure_helper.WALinuxAgentShim.find_endpoint())
+ def test_packed_string_with_escaped_quote(self):
+ ip_address, encoded_address = '100.72.34.108', 'dH\\"l'
+ self.assertEqual(
+ ip_address,
+ azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
+ encoded_address
+ ))
+
+ def test_packed_string_containing_a_colon(self):
+ ip_address, encoded_address = '100.72.58.108', 'dH:l'
+ self.assertEqual(
+ ip_address,
+ azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
+ encoded_address
+ ))
class TestGoalStateParsing(TestCase):
@@ -140,7 +139,6 @@ class TestGoalStateParsing(TestCase):
'incarnation': 1,
'container_id': 'MyContainerId',
'instance_id': 'MyInstanceId',
- 'shared_config_url': 'MySharedConfigUrl',
'certificates_url': 'MyCertificatesUrl',
}
@@ -174,20 +172,9 @@ class TestGoalStateParsing(TestCase):
goal_state = self._get_goal_state(instance_id=instance_id)
self.assertEqual(instance_id, goal_state.instance_id)
- def test_shared_config_xml_parsed_and_fetched_correctly(self):
- http_client = mock.MagicMock()
- shared_config_url = 'TestSharedConfigUrl'
- goal_state = self._get_goal_state(
- http_client=http_client, shared_config_url=shared_config_url)
- shared_config_xml = goal_state.shared_config_xml
- self.assertEqual(1, http_client.get.call_count)
- self.assertEqual(shared_config_url, http_client.get.call_args[0][0])
- self.assertEqual(http_client.get.return_value.contents,
- shared_config_xml)
-
def test_certificates_xml_parsed_and_fetched_correctly(self):
http_client = mock.MagicMock()
- certificates_url = 'TestSharedConfigUrl'
+ certificates_url = 'TestCertificatesUrl'
goal_state = self._get_goal_state(
http_client=http_client, certificates_url=certificates_url)
certificates_xml = goal_state.certificates_xml
@@ -324,8 +311,6 @@ class TestWALinuxAgentShim(TestCase):
azure_helper.WALinuxAgentShim, 'find_endpoint'))
self.GoalState = patches.enter_context(
mock.patch.object(azure_helper, 'GoalState'))
- self.iid_from_shared_config_content = patches.enter_context(
- mock.patch.object(azure_helper, 'iid_from_shared_config_content'))
self.OpenSSLManager = patches.enter_context(
mock.patch.object(azure_helper, 'OpenSSLManager'))
patches.enter_context(
@@ -367,15 +352,6 @@ class TestWALinuxAgentShim(TestCase):
data = shim.register_with_azure_and_fetch_data()
self.assertEqual([], data['public-keys'])
- def test_instance_id_returned_in_data(self):
- shim = azure_helper.WALinuxAgentShim()
- data = shim.register_with_azure_and_fetch_data()
- self.assertEqual(
- [mock.call(self.GoalState.return_value.shared_config_xml)],
- self.iid_from_shared_config_content.call_args_list)
- self.assertEqual(self.iid_from_shared_config_content.return_value,
- data['instance-id'])
-
def test_correct_url_used_for_report_ready(self):
self.find_endpoint.return_value = 'test_endpoint'
shim = azure_helper.WALinuxAgentShim()
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 83aca505..89b15f54 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -59,18 +59,50 @@ OSTACK_META = {
CONTENT_0 = b'This is contents of /etc/foo.cfg\n'
CONTENT_1 = b'# this is /etc/bar/bar.cfg\n'
+NETWORK_DATA = {
+ 'services': [
+ {'type': 'dns', 'address': '199.204.44.24'},
+ {'type': 'dns', 'address': '199.204.47.54'}
+ ],
+ 'links': [
+ {'vif_id': '2ecc7709-b3f7-4448-9580-e1ec32d75bbd',
+ 'ethernet_mac_address': 'fa:16:3e:69:b0:58',
+ 'type': 'ovs', 'mtu': None, 'id': 'tap2ecc7709-b3'},
+ {'vif_id': '2f88d109-5b57-40e6-af32-2472df09dc33',
+ 'ethernet_mac_address': 'fa:16:3e:d4:57:ad',
+ 'type': 'ovs', 'mtu': None, 'id': 'tap2f88d109-5b'},
+ {'vif_id': '1a5382f8-04c5-4d75-ab98-d666c1ef52cc',
+ 'ethernet_mac_address': 'fa:16:3e:05:30:fe',
+ 'type': 'ovs', 'mtu': None, 'id': 'tap1a5382f8-04'}
+ ],
+ 'networks': [
+ {'link': 'tap2ecc7709-b3', 'type': 'ipv4_dhcp',
+ 'network_id': '6d6357ac-0f70-4afa-8bd7-c274cc4ea235',
+ 'id': 'network0'},
+ {'link': 'tap2f88d109-5b', 'type': 'ipv4_dhcp',
+ 'network_id': 'd227a9b3-6960-4d94-8976-ee5788b44f54',
+ 'id': 'network1'},
+ {'link': 'tap1a5382f8-04', 'type': 'ipv4_dhcp',
+ 'network_id': 'dab2ba57-cae2-4311-a5ed-010b263891f5',
+ 'id': 'network2'}
+ ]
+}
CFG_DRIVE_FILES_V2 = {
- 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
- 'ec2/2009-04-04/user-data': USER_DATA,
- 'ec2/latest/meta-data.json': json.dumps(EC2_META),
- 'ec2/latest/user-data': USER_DATA,
- 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META),
- 'openstack/2012-08-10/user_data': USER_DATA,
- 'openstack/content/0000': CONTENT_0,
- 'openstack/content/0001': CONTENT_1,
- 'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
- 'openstack/latest/user_data': USER_DATA}
+ 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
+ 'ec2/2009-04-04/user-data': USER_DATA,
+ 'ec2/latest/meta-data.json': json.dumps(EC2_META),
+ 'ec2/latest/user-data': USER_DATA,
+ 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META),
+ 'openstack/2012-08-10/user_data': USER_DATA,
+ 'openstack/content/0000': CONTENT_0,
+ 'openstack/content/0001': CONTENT_1,
+ 'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
+ 'openstack/latest/user_data': USER_DATA,
+ 'openstack/latest/network_data.json': json.dumps(NETWORK_DATA),
+ 'openstack/2015-10-15/meta_data.json': json.dumps(OSTACK_META),
+ 'openstack/2015-10-15/user_data': USER_DATA,
+ 'openstack/2015-10-15/network_data.json': json.dumps(NETWORK_DATA)}
class TestConfigDriveDataSource(TestCase):
@@ -225,6 +257,7 @@ class TestConfigDriveDataSource(TestCase):
self.assertEqual(USER_DATA, found['userdata'])
self.assertEqual(expected_md, found['metadata'])
+ self.assertEqual(NETWORK_DATA, found['networkdata'])
self.assertEqual(found['files']['/etc/foo.cfg'], CONTENT_0)
self.assertEqual(found['files']['/etc/bar/bar.cfg'], CONTENT_1)
@@ -250,6 +283,7 @@ class TestConfigDriveDataSource(TestCase):
data = copy(CFG_DRIVE_FILES_V2)
data["openstack/2012-08-10/meta_data.json"] = "non-json garbage {}"
+ data["openstack/2015-10-15/meta_data.json"] = "non-json garbage {}"
data["openstack/latest/meta_data.json"] = "non-json garbage {}"
populate_dir(self.tmp, data)
@@ -293,9 +327,8 @@ class TestConfigDriveDataSource(TestCase):
util.is_partition = my_is_partition
devs_with_answers = {"TYPE=vfat": [],
- "TYPE=iso9660": ["/dev/vdb"],
- "LABEL=config-2": ["/dev/vdb"],
- }
+ "TYPE=iso9660": ["/dev/vdb"],
+ "LABEL=config-2": ["/dev/vdb"]}
self.assertEqual(["/dev/vdb"], ds.find_candidate_devs())
# add a vfat item
@@ -306,9 +339,10 @@ class TestConfigDriveDataSource(TestCase):
# verify that partitions are considered, that have correct label.
devs_with_answers = {"TYPE=vfat": ["/dev/sda1"],
- "TYPE=iso9660": [], "LABEL=config-2": ["/dev/vdb3"]}
+ "TYPE=iso9660": [],
+ "LABEL=config-2": ["/dev/vdb3"]}
self.assertEqual(["/dev/vdb3"],
- ds.find_candidate_devs())
+ ds.find_candidate_devs())
finally:
util.find_devs_with = orig_find_devs_with
@@ -319,7 +353,20 @@ class TestConfigDriveDataSource(TestCase):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
myds = cfg_ds_from_dir(self.tmp)
self.assertEqual(myds.get_public_ssh_keys(),
- [OSTACK_META['public_keys']['mykey']])
+ [OSTACK_META['public_keys']['mykey']])
+
+ def test_network_data_is_found(self):
+ """Verify that network_data is present in ds in config-drive-v2."""
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ myds = cfg_ds_from_dir(self.tmp)
+ self.assertEqual(myds.network_json, NETWORK_DATA)
+
+ def test_network_config_is_converted(self):
+ """Verify that network_data is converted and present on ds object."""
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ myds = cfg_ds_from_dir(self.tmp)
+ network_config = ds.convert_network_data(NETWORK_DATA)
+ self.assertEqual(myds.network_config, network_config)
def cfg_ds_from_dir(seed_d):
@@ -339,6 +386,8 @@ def populate_ds_from_read_config(cfg_ds, source, results):
cfg_ds.ec2_metadata = results.get('ec2-metadata')
cfg_ds.userdata_raw = results.get('userdata')
cfg_ds.version = results.get('version')
+ cfg_ds.network_json = results.get('networkdata')
+ cfg_ds._network_config = ds.convert_network_data(cfg_ds.network_json)
def populate_dir(seed_dir, files):
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index f109bb04..77d15cac 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -25,9 +25,9 @@ class TestMAASDataSource(TestCase):
"""Verify a valid seeddir is read as such."""
data = {'instance-id': 'i-valid01',
- 'local-hostname': 'valid01-hostname',
- 'user-data': b'valid01-userdata',
- 'public-keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname'}
+ 'local-hostname': 'valid01-hostname',
+ 'user-data': b'valid01-userdata',
+ 'public-keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname'}
my_d = os.path.join(self.tmp, "valid")
populate_dir(my_d, data)
@@ -45,8 +45,8 @@ class TestMAASDataSource(TestCase):
"""Verify extra files do not affect seed_dir validity."""
data = {'instance-id': 'i-valid-extra',
- 'local-hostname': 'valid-extra-hostname',
- 'user-data': b'valid-extra-userdata', 'foo': 'bar'}
+ 'local-hostname': 'valid-extra-hostname',
+ 'user-data': b'valid-extra-userdata', 'foo': 'bar'}
my_d = os.path.join(self.tmp, "valid_extra")
populate_dir(my_d, data)
@@ -64,7 +64,7 @@ class TestMAASDataSource(TestCase):
"""Verify that invalid seed_dir raises MAASSeedDirMalformed."""
valid = {'instance-id': 'i-instanceid',
- 'local-hostname': 'test-hostname', 'user-data': ''}
+ 'local-hostname': 'test-hostname', 'user-data': ''}
my_based = os.path.join(self.tmp, "valid_extra")
@@ -94,8 +94,8 @@ class TestMAASDataSource(TestCase):
def test_seed_dir_missing(self):
"""Verify that missing seed_dir raises MAASSeedDirNone."""
self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
- DataSourceMAAS.read_maas_seed_dir,
- os.path.join(self.tmp, "nonexistantdirectory"))
+ DataSourceMAAS.read_maas_seed_dir,
+ os.path.join(self.tmp, "nonexistantdirectory"))
def test_seed_url_valid(self):
"""Verify that valid seed_url is read as such."""
@@ -141,7 +141,7 @@ class TestMAASDataSource(TestCase):
with mock.patch.object(url_helper, 'readurl',
side_effect=side_effect()) as mockobj:
userdata, metadata = DataSourceMAAS.read_maas_seed_url(
- my_seed, header_cb=my_headers_cb, version=my_ver)
+ my_seed, version=my_ver)
self.assertEqual(b"foodata", userdata)
self.assertEqual(metadata['instance-id'],
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index 85b4c25a..2d5fc37c 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -121,7 +121,7 @@ class TestNoCloudDataSource(TestCase):
ret = dsrc.get_data()
self.assertEqual(dsrc.userdata_raw, ud)
self.assertEqual(dsrc.metadata, md)
- self.assertEqual(dsrc.vendordata, vd)
+ self.assertEqual(dsrc.vendordata_raw, vd)
self.assertTrue(ret)
def test_nocloud_no_vendordata(self):
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index 27adf21b..d796f030 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -20,7 +20,7 @@ TEST_VARS = {
'VAR7': 'single\\t',
'VAR8': 'double\\tword',
'VAR9': 'multi\\t\nline\n',
- 'VAR10': '\\', # expect \
+ 'VAR10': '\\', # expect '\'
'VAR11': '\'', # expect '
'VAR12': '$', # expect $
}
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index adee9019..5c49966a 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -56,12 +56,13 @@ MOCK_RETURNS = {
'cloud-init:user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']),
'sdc:datacenter_name': 'somewhere2',
'sdc:operator-script': '\n'.join(['bin/true', '']),
+ 'sdc:uuid': str(uuid.uuid4()),
'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']),
'user-data': '\n'.join(['something', '']),
'user-script': '\n'.join(['/bin/true', '']),
}
-DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc')
+DMI_DATA_RETURN = 'smartdc'
def get_mock_client(mockdata):
@@ -111,7 +112,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
ret = apply_patches(patches)
self.unapply += ret
- def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None):
+ def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None,
+ is_lxbrand=False):
mod = DataSourceSmartOS
if mockdata is None:
@@ -124,9 +126,13 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
return dmi_data
def _os_uname():
- # LP: #1243287. tests assume this runs, but running test on
- # arm would cause them all to fail.
- return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', 'x86_64')
+ if not is_lxbrand:
+ # LP: #1243287. tests assume this runs, but running test on
+ # arm would cause them all to fail.
+ return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', 'x86_64')
+ else:
+ return ('LINUX', 'NODENAME', 'RELEASE', 'BRANDZ VIRTUAL LINUX',
+ 'X86_64')
if sys_cfg is None:
sys_cfg = {}
@@ -136,7 +142,6 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
sys_cfg['datasource']['SmartOS'] = ds_cfg
self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)])
- self.apply_patches([(mod, 'get_serial', mock.MagicMock())])
self.apply_patches([
(mod, 'JoyentMetadataClient', get_mock_client(mockdata))])
self.apply_patches([(mod, 'dmi_data', _dmi_data)])
@@ -144,6 +149,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
self.apply_patches([(mod, 'device_exists', lambda d: True)])
dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None,
paths=self.paths)
+ self.apply_patches([(dsrc, '_get_seed_file_object', mock.MagicMock())])
return dsrc
def test_seed(self):
@@ -151,14 +157,29 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
dsrc = self._get_ds()
ret = dsrc.get_data()
self.assertTrue(ret)
+ self.assertEquals('kvm', dsrc.smartos_type)
self.assertEquals('/dev/ttyS1', dsrc.seed)
+ def test_seed_lxbrand(self):
+ # default seed should be /dev/ttyS1
+ dsrc = self._get_ds(is_lxbrand=True)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals('lx-brand', dsrc.smartos_type)
+ self.assertEquals('/native/.zonecontrol/metadata.sock', dsrc.seed)
+
def test_issmartdc(self):
dsrc = self._get_ds()
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertTrue(dsrc.is_smartdc)
+ def test_issmartdc_lxbrand(self):
+ dsrc = self._get_ds(is_lxbrand=True)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertTrue(dsrc.is_smartdc)
+
def test_no_base64(self):
ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
dsrc = self._get_ds(ds_cfg=ds_cfg)
@@ -169,7 +190,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(DMI_DATA_RETURN[0], dsrc.metadata['instance-id'])
+ self.assertEquals(MOCK_RETURNS['sdc:uuid'],
+ dsrc.metadata['instance-id'])
def test_root_keys(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
@@ -407,18 +429,6 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
self.assertEqual(dsrc.device_name_to_device('FOO'),
mydscfg['disk_aliases']['FOO'])
- @mock.patch('cloudinit.sources.DataSourceSmartOS.JoyentMetadataClient')
- @mock.patch('cloudinit.sources.DataSourceSmartOS.get_serial')
- def test_serial_console_closed_on_error(self, get_serial, metadata_client):
- class OurException(Exception):
- pass
- metadata_client.side_effect = OurException
- try:
- DataSourceSmartOS.query_data('noun', 'device', 0)
- except OurException:
- pass
- self.assertEqual(1, get_serial.return_value.close.call_count)
-
def apply_patches(patches):
ret = []
@@ -447,14 +457,25 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
}
def make_response():
- payload = ''
- if self.response_parts['payload']:
- payload = ' {0}'.format(self.response_parts['payload'])
- del self.response_parts['payload']
- return (
- 'V2 {length} {crc} {request_id} {command}{payload}\n'.format(
- payload=payload, **self.response_parts).encode('ascii'))
- self.serial.readline.side_effect = make_response
+ payloadstr = ''
+ if 'payload' in self.response_parts:
+ payloadstr = ' {0}'.format(self.response_parts['payload'])
+ return ('V2 {length} {crc} {request_id} '
+ '{command}{payloadstr}\n'.format(
+ payloadstr=payloadstr,
+ **self.response_parts).encode('ascii'))
+
+ self.metasource_data = None
+
+ def read_response(length):
+ if not self.metasource_data:
+ self.metasource_data = make_response()
+ self.metasource_data_len = len(self.metasource_data)
+ resp = self.metasource_data[:length]
+ self.metasource_data = self.metasource_data[length:]
+ return resp
+
+ self.serial.read.side_effect = read_response
self.patched_funcs.enter_context(
mock.patch('cloudinit.sources.DataSourceSmartOS.random.randint',
mock.Mock(return_value=self.request_id)))
@@ -477,7 +498,9 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
client.get_metadata('some_key')
self.assertEqual(1, self.serial.write.call_count)
written_line = self.serial.write.call_args[0][0]
- self.assertEndsWith(written_line, b'\n')
+ print(type(written_line))
+ self.assertEndsWith(written_line.decode('ascii'),
+ b'\n'.decode('ascii'))
self.assertEqual(1, written_line.count(b'\n'))
def _get_written_line(self, key='some_key'):
@@ -489,7 +512,8 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
self.assertIsInstance(self._get_written_line(), six.binary_type)
def test_get_metadata_line_starts_with_v2(self):
- self.assertStartsWith(self._get_written_line(), b'V2')
+ foo = self._get_written_line()
+ self.assertStartsWith(foo.decode('ascii'), b'V2'.decode('ascii'))
def test_get_metadata_uses_get_command(self):
parts = self._get_written_line().decode('ascii').strip().split(' ')
@@ -526,7 +550,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
def test_get_metadata_reads_a_line(self):
client = self._get_client()
client.get_metadata('some_key')
- self.assertEqual(1, self.serial.readline.call_count)
+ self.assertEqual(self.metasource_data_len, self.serial.read.call_count)
def test_get_metadata_returns_valid_value(self):
client = self._get_client()
diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py
index 6d30c5b8..2c2a424d 100644
--- a/tests/unittests/test_distros/test_netconfig.py
+++ b/tests/unittests/test_distros/test_netconfig.py
@@ -109,8 +109,9 @@ class TestNetCfgDistro(TestCase):
ub_distro.apply_network(BASE_NET_CFG, False)
self.assertEquals(len(write_bufs), 1)
- self.assertIn('/etc/network/interfaces', write_bufs)
- write_buf = write_bufs['/etc/network/interfaces']
+ eni_name = '/etc/network/interfaces.d/50-cloud-init.cfg'
+ self.assertIn(eni_name, write_bufs)
+ write_buf = write_bufs[eni_name]
self.assertEquals(str(write_buf).strip(), BASE_NET_CFG.strip())
self.assertEquals(write_buf.mode, 0o644)
diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py
index e4488e2a..4525f487 100644
--- a/tests/unittests/test_distros/test_user_data_normalize.py
+++ b/tests/unittests/test_distros/test_user_data_normalize.py
@@ -6,13 +6,13 @@ from ..helpers import TestCase
bcfg = {
- 'name': 'bob',
- 'plain_text_passwd': 'ubuntu',
- 'home': "/home/ubuntu",
- 'shell': "/bin/bash",
- 'lock_passwd': True,
- 'gecos': "Ubuntu",
- 'groups': ["foo"]
+ 'name': 'bob',
+ 'plain_text_passwd': 'ubuntu',
+ 'home': "/home/ubuntu",
+ 'shell': "/bin/bash",
+ 'lock_passwd': True,
+ 'gecos': "Ubuntu",
+ 'groups': ["foo"]
}
@@ -34,16 +34,11 @@ class TestUGNormalize(TestCase):
def test_group_dict(self):
distro = self._make_distro('ubuntu')
g = {'groups': [
- {
- 'ubuntu': ['foo', 'bar'],
- 'bob': 'users',
- },
- 'cloud-users',
- {
- 'bob': 'users2',
- },
- ]
- }
+ {'ubuntu': ['foo', 'bar'],
+ 'bob': 'users'},
+ 'cloud-users',
+ {'bob': 'users2'}
+ ]}
(_users, groups) = self._norm(g, distro)
self.assertIn('ubuntu', groups)
ub_members = groups['ubuntu']
diff --git a/tests/unittests/test_handler/test_handler_lxd.py b/tests/unittests/test_handler/test_handler_lxd.py
new file mode 100644
index 00000000..7ffa2a53
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_lxd.py
@@ -0,0 +1,75 @@
+from cloudinit.config import cc_lxd
+from cloudinit import (distros, helpers, cloud)
+from cloudinit.sources import DataSourceNoCloud
+from .. import helpers as t_help
+
+import logging
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+LOG = logging.getLogger(__name__)
+
+
+class TestLxd(t_help.TestCase):
+ lxd_cfg = {
+ 'lxd': {
+ 'init': {
+ 'network_address': '0.0.0.0',
+ 'storage_backend': 'zfs',
+ 'storage_pool': 'poolname',
+ }
+ }
+ }
+
+ def setUp(self):
+ super(TestLxd, self).setUp()
+
+ def _get_cloud(self, distro):
+ cls = distros.fetch(distro)
+ paths = helpers.Paths({})
+ d = cls(distro, {}, paths)
+ ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths)
+ cc = cloud.Cloud(ds, paths, {}, d, None)
+ return cc
+
+ @mock.patch("cloudinit.config.cc_lxd.util")
+ def test_lxd_init(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ mock_util.which.return_value = True
+ cc_lxd.handle('cc_lxd', self.lxd_cfg, cc, LOG, [])
+ self.assertTrue(mock_util.which.called)
+ init_call = mock_util.subp.call_args_list[0][0][0]
+ self.assertEquals(init_call,
+ ['lxd', 'init', '--auto',
+ '--network-address=0.0.0.0',
+ '--storage-backend=zfs',
+ '--storage-pool=poolname'])
+
+ @mock.patch("cloudinit.config.cc_lxd.util")
+ def test_lxd_install(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ mock_util.which.return_value = None
+ cc_lxd.handle('cc_lxd', self.lxd_cfg, cc, LOG, [])
+ self.assertTrue(cc.distro.install_packages.called)
+ install_pkg = cc.distro.install_packages.call_args_list[0][0][0]
+ self.assertEquals(sorted(install_pkg), ['lxd', 'zfs'])
+
+ @mock.patch("cloudinit.config.cc_lxd.util")
+ def test_no_init_does_nothing(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc_lxd.handle('cc_lxd', {'lxd': {}}, cc, LOG, [])
+ self.assertFalse(cc.distro.install_packages.called)
+ self.assertFalse(mock_util.subp.called)
+
+ @mock.patch("cloudinit.config.cc_lxd.util")
+ def test_no_lxd_does_nothing(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc_lxd.handle('cc_lxd', {'package_update': True}, cc, LOG, [])
+ self.assertFalse(cc.distro.install_packages.called)
+ self.assertFalse(mock_util.subp.called)
diff --git a/tests/unittests/test_handler/test_handler_power_state.py b/tests/unittests/test_handler/test_handler_power_state.py
index 2f86b8f8..04ce5687 100644
--- a/tests/unittests/test_handler/test_handler_power_state.py
+++ b/tests/unittests/test_handler/test_handler_power_state.py
@@ -1,6 +1,9 @@
+import sys
+
from cloudinit.config import cc_power_state_change as psc
from .. import helpers as t_help
+from ..helpers import mock
class TestLoadPowerState(t_help.TestCase):
@@ -9,12 +12,12 @@ class TestLoadPowerState(t_help.TestCase):
def test_no_config(self):
# completely empty config should mean do nothing
- (cmd, _timeout) = psc.load_power_state({})
+ (cmd, _timeout, _condition) = psc.load_power_state({})
self.assertEqual(cmd, None)
def test_irrelevant_config(self):
# no power_state field in config should return None for cmd
- (cmd, _timeout) = psc.load_power_state({'foo': 'bar'})
+ (cmd, _timeout, _condition) = psc.load_power_state({'foo': 'bar'})
self.assertEqual(cmd, None)
def test_invalid_mode(self):
@@ -53,23 +56,59 @@ class TestLoadPowerState(t_help.TestCase):
def test_no_message(self):
# if message is not present, then no argument should be passed for it
cfg = {'power_state': {'mode': 'poweroff'}}
- (cmd, _timeout) = psc.load_power_state(cfg)
+ (cmd, _timeout, _condition) = psc.load_power_state(cfg)
self.assertNotIn("", cmd)
check_lps_ret(psc.load_power_state(cfg))
self.assertTrue(len(cmd) == 3)
+ def test_condition_null_raises(self):
+ cfg = {'power_state': {'mode': 'poweroff', 'condition': None}}
+ self.assertRaises(TypeError, psc.load_power_state, cfg)
+
+ def test_condition_default_is_true(self):
+ cfg = {'power_state': {'mode': 'poweroff'}}
+ _cmd, _timeout, cond = psc.load_power_state(cfg)
+ self.assertEqual(cond, True)
+
+
+class TestCheckCondition(t_help.TestCase):
+ def cmd_with_exit(self, rc):
+ return([sys.executable, '-c', 'import sys; sys.exit(%s)' % rc])
+
+ def test_true_is_true(self):
+ self.assertEqual(psc.check_condition(True), True)
+
+ def test_false_is_false(self):
+ self.assertEqual(psc.check_condition(False), False)
+
+ def test_cmd_exit_zero_true(self):
+ self.assertEqual(psc.check_condition(self.cmd_with_exit(0)), True)
+
+ def test_cmd_exit_one_false(self):
+ self.assertEqual(psc.check_condition(self.cmd_with_exit(1)), False)
+
+ def test_cmd_exit_nonzero_warns(self):
+ mocklog = mock.Mock()
+ self.assertEqual(
+ psc.check_condition(self.cmd_with_exit(2), mocklog), False)
+ self.assertEqual(mocklog.warn.call_count, 1)
+
def check_lps_ret(psc_return, mode=None):
- if len(psc_return) != 2:
+ if len(psc_return) != 3:
raise TypeError("length returned = %d" % len(psc_return))
errs = []
cmd = psc_return[0]
timeout = psc_return[1]
+ condition = psc_return[2]
if 'shutdown' not in psc_return[0][0]:
errs.append("string 'shutdown' not in cmd")
+ if condition is None:
+ errs.append("condition was not returned")
+
if mode is not None:
opt = {'halt': '-H', 'poweroff': '-P', 'reboot': '-r'}[mode]
if opt not in psc_return[0]:
diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py
index 0bcdcb31..98bc9b81 100644
--- a/tests/unittests/test_handler/test_handler_seed_random.py
+++ b/tests/unittests/test_handler/test_handler_seed_random.py
@@ -170,27 +170,30 @@ class TestRandomSeed(t_help.TestCase):
contents = util.load_file(self._seed_file)
self.assertEquals('tiny-tim-was-here-so-was-josh', contents)
- def test_seed_command_not_provided_pollinate_available(self):
+ def test_seed_command_provided_and_available(self):
c = self._get_cloud('ubuntu', {})
self.whichdata = {'pollinate': '/usr/bin/pollinate'}
- cc_seed_random.handle('test', {}, c, LOG, [])
+ cfg = {'random_seed': {'command': ['pollinate', '-q']}}
+ cc_seed_random.handle('test', cfg, c, LOG, [])
subp_args = [f['args'] for f in self.subp_called]
self.assertIn(['pollinate', '-q'], subp_args)
- def test_seed_command_not_provided_pollinate_not_available(self):
+ def test_seed_command_not_provided(self):
c = self._get_cloud('ubuntu', {})
self.whichdata = {}
cc_seed_random.handle('test', {}, c, LOG, [])
# subp should not have been called as which would say not available
- self.assertEquals(self.subp_called, list())
+ self.assertFalse(self.subp_called)
def test_unavailable_seed_command_and_required_raises_error(self):
c = self._get_cloud('ubuntu', {})
self.whichdata = {}
+ cfg = {'random_seed': {'command': ['THIS_NO_COMMAND'],
+ 'command_required': True}}
self.assertRaises(ValueError, cc_seed_random.handle,
- 'test', {'random_seed': {'command_required': True}}, c, LOG, [])
+ 'test', cfg, c, LOG, [])
def test_seed_command_and_required(self):
c = self._get_cloud('ubuntu', {})
diff --git a/tests/unittests/test_handler/test_handler_snappy.py b/tests/unittests/test_handler/test_handler_snappy.py
index eceb14d9..8aeff53c 100644
--- a/tests/unittests/test_handler/test_handler_snappy.py
+++ b/tests/unittests/test_handler/test_handler_snappy.py
@@ -125,8 +125,7 @@ class TestInstallPackages(t_help.TestCase):
"pkg1.smoser.config": "pkg1.smoser.config-data",
"pkg1.config": "pkg1.config-data",
"pkg2.smoser_0.0_amd64.snap": "pkg2-snapdata",
- "pkg2.smoser_0.0_amd64.config": "pkg2.config",
- })
+ "pkg2.smoser_0.0_amd64.config": "pkg2.config"})
ret = get_package_ops(
packages=[], configs={}, installed=[], fspath=self.tmp)
diff --git a/tests/unittests/test_handler/test_handler_write_files.py b/tests/unittests/test_handler/test_handler_write_files.py
new file mode 100644
index 00000000..f1c7f7b4
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_write_files.py
@@ -0,0 +1,112 @@
+from cloudinit import util
+from cloudinit import log as logging
+from cloudinit.config.cc_write_files import write_files
+
+from ..helpers import FilesystemMockingTestCase
+
+import base64
+import gzip
+import shutil
+import six
+import tempfile
+
+LOG = logging.getLogger(__name__)
+
+YAML_TEXT = """
+write_files:
+ - encoding: gzip
+ content: !!binary |
+ H4sIAIDb/U8C/1NW1E/KzNMvzuBKTc7IV8hIzcnJVyjPL8pJ4QIA6N+MVxsAAAA=
+ path: /usr/bin/hello
+ permissions: '0755'
+ - content: !!binary |
+ Zm9vYmFyCg==
+ path: /wark
+ permissions: '0755'
+ - content: |
+ hi mom line 1
+ hi mom line 2
+ path: /tmp/message
+"""
+
+YAML_CONTENT_EXPECTED = {
+ '/usr/bin/hello': "#!/bin/sh\necho hello world\n",
+ '/wark': "foobar\n",
+ '/tmp/message': "hi mom line 1\nhi mom line 2\n",
+}
+
+
+class TestWriteFiles(FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestWriteFiles, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+
+ def test_simple(self):
+ self.patchUtils(self.tmp)
+ expected = "hello world\n"
+ filename = "/tmp/my.file"
+ write_files(
+ "test_simple", [{"content": expected, "path": filename}], LOG)
+ self.assertEqual(util.load_file(filename), expected)
+
+ def test_yaml_binary(self):
+ self.patchUtils(self.tmp)
+ data = util.load_yaml(YAML_TEXT)
+ write_files("testname", data['write_files'], LOG)
+ for path, content in YAML_CONTENT_EXPECTED.items():
+ self.assertEqual(util.load_file(path), content)
+
+ def test_all_decodings(self):
+ self.patchUtils(self.tmp)
+
+ # build a 'files' array that has a dictionary of encodings
+ # for 'gz', 'gzip', 'gz+base64' ...
+ data = b"foobzr"
+ utf8_valid = b"foobzr"
+ utf8_invalid = b'ab\xaadef'
+ files = []
+ expected = []
+
+ gz_aliases = ('gz', 'gzip')
+ gz_b64_aliases = ('gz+base64', 'gzip+base64', 'gz+b64', 'gzip+b64')
+ b64_aliases = ('base64', 'b64')
+
+ datum = (("utf8", utf8_valid), ("no-utf8", utf8_invalid))
+ for name, data in datum:
+ gz = (_gzip_bytes(data), gz_aliases)
+ gz_b64 = (base64.b64encode(_gzip_bytes(data)), gz_b64_aliases)
+ b64 = (base64.b64encode(data), b64_aliases)
+ for content, aliases in (gz, gz_b64, b64):
+ for enc in aliases:
+ cur = {'content': content,
+ 'path': '/tmp/file-%s-%s' % (name, enc),
+ 'encoding': enc}
+ files.append(cur)
+ expected.append((cur['path'], data))
+
+ write_files("test_decoding", files, LOG)
+
+ for path, content in expected:
+ self.assertEqual(util.load_file(path, decode=False), content)
+
+ # make sure we actually wrote *some* files.
+ flen_expected = (
+ len(gz_aliases + gz_b64_aliases + b64_aliases) * len(datum))
+ self.assertEqual(len(expected), flen_expected)
+
+
+def _gzip_bytes(data):
+ buf = six.BytesIO()
+ fp = None
+ try:
+ fp = gzip.GzipFile(fileobj=buf, mode="wb")
+ fp.write(data)
+ fp.close()
+ return buf.getvalue()
+ finally:
+ if fp:
+ fp.close()
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py
new file mode 100644
index 00000000..dfb31710
--- /dev/null
+++ b/tests/unittests/test_net.py
@@ -0,0 +1,127 @@
+from cloudinit import util
+from cloudinit import net
+from .helpers import TestCase
+
+import base64
+import copy
+import io
+import gzip
+import json
+import os
+
+DHCP_CONTENT_1 = """
+DEVICE='eth0'
+PROTO='dhcp'
+IPV4ADDR='192.168.122.89'
+IPV4BROADCAST='192.168.122.255'
+IPV4NETMASK='255.255.255.0'
+IPV4GATEWAY='192.168.122.1'
+IPV4DNS0='192.168.122.1'
+IPV4DNS1='0.0.0.0'
+HOSTNAME='foohost'
+DNSDOMAIN=''
+NISDOMAIN=''
+ROOTSERVER='192.168.122.1'
+ROOTPATH=''
+filename=''
+UPTIME='21'
+DHCPLEASETIME='3600'
+DOMAINSEARCH='foo.com'
+"""
+
+DHCP_EXPECTED_1 = {
+ 'name': 'eth0',
+ 'type': 'physical',
+ 'subnets': [{'broadcast': '192.168.122.255',
+ 'gateway': '192.168.122.1',
+ 'dns_search': ['foo.com'],
+ 'type': 'dhcp',
+ 'netmask': '255.255.255.0',
+ 'dns_nameservers': ['192.168.122.1']}],
+}
+
+
+STATIC_CONTENT_1 = """
+DEVICE='eth1'
+PROTO='static'
+IPV4ADDR='10.0.0.2'
+IPV4BROADCAST='10.0.0.255'
+IPV4NETMASK='255.255.255.0'
+IPV4GATEWAY='10.0.0.1'
+IPV4DNS0='10.0.1.1'
+IPV4DNS1='0.0.0.0'
+HOSTNAME='foohost'
+UPTIME='21'
+DHCPLEASETIME='3600'
+DOMAINSEARCH='foo.com'
+"""
+
+STATIC_EXPECTED_1 = {
+ 'name': 'eth1',
+ 'type': 'physical',
+ 'subnets': [{'broadcast': '10.0.0.255', 'gateway': '10.0.0.1',
+ 'dns_search': ['foo.com'], 'type': 'static',
+ 'netmask': '255.255.255.0',
+ 'dns_nameservers': ['10.0.1.1']}],
+}
+
+
+class TestNetConfigParsing(TestCase):
+ simple_cfg = {
+ 'config': [{"type": "physical", "name": "eth0",
+ "mac_address": "c0:d6:9f:2c:e8:80",
+ "subnets": [{"type": "dhcp"}]}]}
+
+ def test_klibc_convert_dhcp(self):
+ found = net._klibc_to_config_entry(DHCP_CONTENT_1)
+ self.assertEqual(found, ('eth0', DHCP_EXPECTED_1))
+
+ def test_klibc_convert_static(self):
+ found = net._klibc_to_config_entry(STATIC_CONTENT_1)
+ self.assertEqual(found, ('eth1', STATIC_EXPECTED_1))
+
+ def test_config_from_klibc_net_cfg(self):
+ files = []
+ pairs = (('net-eth0.cfg', DHCP_CONTENT_1),
+ ('net-eth1.cfg', STATIC_CONTENT_1))
+
+ macs = {'eth1': 'b8:ae:ed:75:ff:2b',
+ 'eth0': 'b8:ae:ed:75:ff:2a'}
+
+ dhcp = copy.deepcopy(DHCP_EXPECTED_1)
+ dhcp['mac_address'] = macs['eth0']
+
+ static = copy.deepcopy(STATIC_EXPECTED_1)
+ static['mac_address'] = macs['eth1']
+
+ expected = {'version': 1, 'config': [dhcp, static]}
+ with util.tempdir() as tmpd:
+ for fname, content in pairs:
+ fp = os.path.join(tmpd, fname)
+ files.append(fp)
+ util.write_file(fp, content)
+
+ found = net.config_from_klibc_net_cfg(files=files, mac_addrs=macs)
+ self.assertEqual(found, expected)
+
+ def test_cmdline_with_b64(self):
+ data = base64.b64encode(json.dumps(self.simple_cfg).encode())
+ encoded_text = data.decode()
+ cmdline = 'ro network-config=' + encoded_text + ' root=foo'
+ found = net.read_kernel_cmdline_config(cmdline=cmdline)
+ self.assertEqual(found, self.simple_cfg)
+
+ def test_cmdline_with_b64_gz(self):
+ data = _gzip_data(json.dumps(self.simple_cfg).encode())
+ encoded_text = base64.b64encode(data).decode()
+ cmdline = 'ro network-config=' + encoded_text + ' root=foo'
+ found = net.read_kernel_cmdline_config(cmdline=cmdline)
+ self.assertEqual(found, self.simple_cfg)
+
+
+def _gzip_data(data):
+ with io.BytesIO() as iobuf:
+ gzfp = gzip.GzipFile(mode="wb", fileobj=iobuf)
+ gzfp.write(data)
+ gzfp.close()
+ return iobuf.getvalue()
diff --git a/tests/unittests/test_registry.py b/tests/unittests/test_registry.py
new file mode 100644
index 00000000..bcf01475
--- /dev/null
+++ b/tests/unittests/test_registry.py
@@ -0,0 +1,28 @@
+from cloudinit.registry import DictRegistry
+
+from .helpers import (mock, TestCase)
+
+
+class TestDictRegistry(TestCase):
+
+ def test_added_item_included_in_output(self):
+ registry = DictRegistry()
+ item_key, item_to_register = 'test_key', mock.Mock()
+ registry.register_item(item_key, item_to_register)
+ self.assertEqual({item_key: item_to_register},
+ registry.registered_items)
+
+ def test_registry_starts_out_empty(self):
+ self.assertEqual({}, DictRegistry().registered_items)
+
+ def test_modifying_registered_items_isnt_exposed_to_other_callers(self):
+ registry = DictRegistry()
+ registry.registered_items['test_item'] = mock.Mock()
+ self.assertEqual({}, registry.registered_items)
+
+ def test_keys_cannot_be_replaced(self):
+ registry = DictRegistry()
+ item_key = 'test_key'
+ registry.register_item(item_key, mock.Mock())
+ self.assertRaises(ValueError,
+ registry.register_item, item_key, mock.Mock())
diff --git a/tests/unittests/test_reporting.py b/tests/unittests/test_reporting.py
new file mode 100644
index 00000000..32356ef9
--- /dev/null
+++ b/tests/unittests/test_reporting.py
@@ -0,0 +1,369 @@
+# Copyright 2015 Canonical Ltd.
+# This file is part of cloud-init. See LICENCE file for license information.
+#
+# vi: ts=4 expandtab
+
+from cloudinit import reporting
+from cloudinit.reporting import handlers
+from cloudinit.reporting import events
+
+from .helpers import (mock, TestCase)
+
+
+def _fake_registry():
+ return mock.Mock(registered_items={'a': mock.MagicMock(),
+ 'b': mock.MagicMock()})
+
+
+class TestReportStartEvent(TestCase):
+
+ @mock.patch('cloudinit.reporting.events.instantiated_handler_registry',
+ new_callable=_fake_registry)
+ def test_report_start_event_passes_something_with_as_string_to_handlers(
+ self, instantiated_handler_registry):
+ event_name, event_description = 'my_test_event', 'my description'
+ events.report_start_event(event_name, event_description)
+ expected_string_representation = ': '.join(
+ ['start', event_name, event_description])
+ for _, handler in (
+ instantiated_handler_registry.registered_items.items()):
+ self.assertEqual(1, handler.publish_event.call_count)
+ event = handler.publish_event.call_args[0][0]
+ self.assertEqual(expected_string_representation, event.as_string())
+
+
+class TestReportFinishEvent(TestCase):
+
+ def _report_finish_event(self, result=events.status.SUCCESS):
+ event_name, event_description = 'my_test_event', 'my description'
+ events.report_finish_event(
+ event_name, event_description, result=result)
+ return event_name, event_description
+
+ def assertHandlersPassedObjectWithAsString(
+ self, handlers, expected_as_string):
+ for _, handler in handlers.items():
+ self.assertEqual(1, handler.publish_event.call_count)
+ event = handler.publish_event.call_args[0][0]
+ self.assertEqual(expected_as_string, event.as_string())
+
+ @mock.patch('cloudinit.reporting.events.instantiated_handler_registry',
+ new_callable=_fake_registry)
+ def test_report_finish_event_passes_something_with_as_string_to_handlers(
+ self, instantiated_handler_registry):
+ event_name, event_description = self._report_finish_event()
+ expected_string_representation = ': '.join(
+ ['finish', event_name, events.status.SUCCESS,
+ event_description])
+ self.assertHandlersPassedObjectWithAsString(
+ instantiated_handler_registry.registered_items,
+ expected_string_representation)
+
+ @mock.patch('cloudinit.reporting.events.instantiated_handler_registry',
+ new_callable=_fake_registry)
+ def test_reporting_successful_finish_has_sensible_string_repr(
+ self, instantiated_handler_registry):
+ event_name, event_description = self._report_finish_event(
+ result=events.status.SUCCESS)
+ expected_string_representation = ': '.join(
+ ['finish', event_name, events.status.SUCCESS,
+ event_description])
+ self.assertHandlersPassedObjectWithAsString(
+ instantiated_handler_registry.registered_items,
+ expected_string_representation)
+
+ @mock.patch('cloudinit.reporting.events.instantiated_handler_registry',
+ new_callable=_fake_registry)
+ def test_reporting_unsuccessful_finish_has_sensible_string_repr(
+ self, instantiated_handler_registry):
+ event_name, event_description = self._report_finish_event(
+ result=events.status.FAIL)
+ expected_string_representation = ': '.join(
+ ['finish', event_name, events.status.FAIL, event_description])
+ self.assertHandlersPassedObjectWithAsString(
+ instantiated_handler_registry.registered_items,
+ expected_string_representation)
+
+    def test_invalid_result_raises_value_error(self):
+        self.assertRaises(ValueError, self._report_finish_event, ("BOGUS",))
+
+
+class TestReportingEvent(TestCase):
+
+ def test_as_string(self):
+ event_type, name, description = 'test_type', 'test_name', 'test_desc'
+ event = events.ReportingEvent(event_type, name, description)
+ expected_string_representation = ': '.join(
+ [event_type, name, description])
+ self.assertEqual(expected_string_representation, event.as_string())
+
+ def test_as_dict(self):
+ event_type, name, desc = 'test_type', 'test_name', 'test_desc'
+ event = events.ReportingEvent(event_type, name, desc)
+ expected = {'event_type': event_type, 'name': name,
+ 'description': desc, 'origin': 'cloudinit'}
+
+ # allow for timestamp to differ, but must be present
+ as_dict = event.as_dict()
+ self.assertIn('timestamp', as_dict)
+ del as_dict['timestamp']
+
+ self.assertEqual(expected, as_dict)
+
+
+class TestFinishReportingEvent(TestCase):
+    def test_as_dict_has_result(self):
+        result = events.status.SUCCESS
+        name, desc = 'test_name', 'test_desc'
+        event = events.FinishReportingEvent(name, desc, result)
+        ret = event.as_dict()
+        self.assertIn('result', ret)
+        self.assertEqual(ret['result'], result)
+
+
+class TestBaseReportingHandler(TestCase):
+
+ def test_base_reporting_handler_is_abstract(self):
+ regexp = r".*abstract.*publish_event.*"
+ self.assertRaisesRegexp(TypeError, regexp, handlers.ReportingHandler)
+
+
+class TestLogHandler(TestCase):
+
+ @mock.patch.object(reporting.handlers.logging, 'getLogger')
+ def test_appropriate_logger_used(self, getLogger):
+ event_type, event_name = 'test_type', 'test_name'
+ event = events.ReportingEvent(event_type, event_name, 'description')
+ reporting.handlers.LogHandler().publish_event(event)
+ self.assertEqual(
+ [mock.call(
+ 'cloudinit.reporting.{0}.{1}'.format(event_type, event_name))],
+ getLogger.call_args_list)
+
+ @mock.patch.object(reporting.handlers.logging, 'getLogger')
+ def test_single_log_message_at_info_published(self, getLogger):
+ event = events.ReportingEvent('type', 'name', 'description')
+ reporting.handlers.LogHandler().publish_event(event)
+ self.assertEqual(1, getLogger.return_value.log.call_count)
+
+ @mock.patch.object(reporting.handlers.logging, 'getLogger')
+ def test_log_message_uses_event_as_string(self, getLogger):
+ event = events.ReportingEvent('type', 'name', 'description')
+ reporting.handlers.LogHandler(level="INFO").publish_event(event)
+ self.assertIn(event.as_string(),
+ getLogger.return_value.log.call_args[0][1])
+
+
+class TestDefaultRegisteredHandler(TestCase):
+
+ def test_log_handler_registered_by_default(self):
+ registered_items = (
+ reporting.instantiated_handler_registry.registered_items)
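+        # for/else: the else branch runs only if the loop finishes without
+        # finding a LogHandler among the registered handlers.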
+ for _, item in registered_items.items():
+ if isinstance(item, reporting.handlers.LogHandler):
+ break
+ else:
+ self.fail('No reporting LogHandler registered by default.')
+
+
+class TestReportingConfiguration(TestCase):
+
+ @mock.patch.object(reporting, 'instantiated_handler_registry')
+ def test_empty_configuration_doesnt_add_handlers(
+ self, instantiated_handler_registry):
+ reporting.update_configuration({})
+ self.assertEqual(
+ 0, instantiated_handler_registry.register_item.call_count)
+
+ @mock.patch.object(
+ reporting, 'instantiated_handler_registry', reporting.DictRegistry())
+ @mock.patch.object(reporting, 'available_handlers')
+ def test_looks_up_handler_by_type_and_adds_it(self, available_handlers):
+ handler_type_name = 'test_handler'
+ handler_cls = mock.Mock()
+ available_handlers.registered_items = {handler_type_name: handler_cls}
+ handler_name = 'my_test_handler'
+ reporting.update_configuration(
+ {handler_name: {'type': handler_type_name}})
+ self.assertEqual(
+ {handler_name: handler_cls.return_value},
+ reporting.instantiated_handler_registry.registered_items)
+
+ @mock.patch.object(
+ reporting, 'instantiated_handler_registry', reporting.DictRegistry())
+ @mock.patch.object(reporting, 'available_handlers')
+ def test_uses_non_type_parts_of_config_dict_as_kwargs(
+ self, available_handlers):
+ handler_type_name = 'test_handler'
+ handler_cls = mock.Mock()
+ available_handlers.registered_items = {handler_type_name: handler_cls}
+ extra_kwargs = {'foo': 'bar', 'bar': 'baz'}
+ handler_config = extra_kwargs.copy()
+ handler_config.update({'type': handler_type_name})
+ handler_name = 'my_test_handler'
+ reporting.update_configuration({handler_name: handler_config})
+ self.assertEqual(
+ handler_cls.return_value,
+ reporting.instantiated_handler_registry.registered_items[
+ handler_name])
+ self.assertEqual([mock.call(**extra_kwargs)],
+ handler_cls.call_args_list)
+
+ @mock.patch.object(
+ reporting, 'instantiated_handler_registry', reporting.DictRegistry())
+ @mock.patch.object(reporting, 'available_handlers')
+ def test_handler_config_not_modified(self, available_handlers):
+ handler_type_name = 'test_handler'
+ handler_cls = mock.Mock()
+ available_handlers.registered_items = {handler_type_name: handler_cls}
+ handler_config = {'type': handler_type_name, 'foo': 'bar'}
+ expected_handler_config = handler_config.copy()
+ reporting.update_configuration({'my_test_handler': handler_config})
+ self.assertEqual(expected_handler_config, handler_config)
+
+ @mock.patch.object(
+ reporting, 'instantiated_handler_registry', reporting.DictRegistry())
+ @mock.patch.object(reporting, 'available_handlers')
+ def test_handlers_removed_if_falseish_specified(self, available_handlers):
+ handler_type_name = 'test_handler'
+ handler_cls = mock.Mock()
+ available_handlers.registered_items = {handler_type_name: handler_cls}
+ handler_name = 'my_test_handler'
+ reporting.update_configuration(
+ {handler_name: {'type': handler_type_name}})
+ self.assertEqual(
+ 1, len(reporting.instantiated_handler_registry.registered_items))
+ reporting.update_configuration({handler_name: None})
+ self.assertEqual(
+ 0, len(reporting.instantiated_handler_registry.registered_items))
+
+
+class TestReportingEventStack(TestCase):
+ @mock.patch('cloudinit.reporting.events.report_finish_event')
+ @mock.patch('cloudinit.reporting.events.report_start_event')
+ def test_start_and_finish_success(self, report_start, report_finish):
+ with events.ReportEventStack(name="myname", description="mydesc"):
+ pass
+ self.assertEqual(
+ [mock.call('myname', 'mydesc')], report_start.call_args_list)
+ self.assertEqual(
+ [mock.call('myname', 'mydesc', events.status.SUCCESS,
+ post_files=[])],
+ report_finish.call_args_list)
+
+ @mock.patch('cloudinit.reporting.events.report_finish_event')
+ @mock.patch('cloudinit.reporting.events.report_start_event')
+    def test_finish_exception_defaults_fail(
+            self, report_start, report_finish):
+ name = "myname"
+ desc = "mydesc"
+ try:
+ with events.ReportEventStack(name, description=desc):
+ raise ValueError("This didnt work")
+ except ValueError:
+ pass
+ self.assertEqual([mock.call(name, desc)], report_start.call_args_list)
+ self.assertEqual(
+ [mock.call(name, desc, events.status.FAIL, post_files=[])],
+ report_finish.call_args_list)
+
+ @mock.patch('cloudinit.reporting.events.report_finish_event')
+ @mock.patch('cloudinit.reporting.events.report_start_event')
+ def test_result_on_exception_used(self, report_start, report_finish):
+ name = "myname"
+ desc = "mydesc"
+ try:
+ with events.ReportEventStack(
+ name, desc, result_on_exception=events.status.WARN):
+ raise ValueError("This didnt work")
+ except ValueError:
+ pass
+ self.assertEqual([mock.call(name, desc)], report_start.call_args_list)
+ self.assertEqual(
+ [mock.call(name, desc, events.status.WARN, post_files=[])],
+ report_finish.call_args_list)
+
+ @mock.patch('cloudinit.reporting.events.report_start_event')
+ def test_child_fullname_respects_parent(self, report_start):
+ parent_name = "topname"
+ c1_name = "c1name"
+ c2_name = "c2name"
+ c2_expected_fullname = '/'.join([parent_name, c1_name, c2_name])
+ c1_expected_fullname = '/'.join([parent_name, c1_name])
+
+ parent = events.ReportEventStack(parent_name, "topdesc")
+ c1 = events.ReportEventStack(c1_name, "c1desc", parent=parent)
+ c2 = events.ReportEventStack(c2_name, "c2desc", parent=c1)
+ with c1:
+ report_start.assert_called_with(c1_expected_fullname, "c1desc")
+ with c2:
+ report_start.assert_called_with(c2_expected_fullname, "c2desc")
+
+ @mock.patch('cloudinit.reporting.events.report_finish_event')
+ @mock.patch('cloudinit.reporting.events.report_start_event')
+ def test_child_result_bubbles_up(self, report_start, report_finish):
+ parent = events.ReportEventStack("topname", "topdesc")
+ child = events.ReportEventStack("c_name", "c_desc", parent=parent)
+ with parent:
+ with child:
+ child.result = events.status.WARN
+
+ report_finish.assert_called_with(
+ "topname", "topdesc", events.status.WARN, post_files=[])
+
+ @mock.patch('cloudinit.reporting.events.report_finish_event')
+ def test_message_used_in_finish(self, report_finish):
+ with events.ReportEventStack("myname", "mydesc",
+ message="mymessage"):
+ pass
+ self.assertEqual(
+ [mock.call("myname", "mymessage", events.status.SUCCESS,
+ post_files=[])],
+ report_finish.call_args_list)
+
+ @mock.patch('cloudinit.reporting.events.report_finish_event')
+ def test_message_updatable(self, report_finish):
+ with events.ReportEventStack("myname", "mydesc") as c:
+ c.message = "all good"
+ self.assertEqual(
+ [mock.call("myname", "all good", events.status.SUCCESS,
+ post_files=[])],
+ report_finish.call_args_list)
+
+    @mock.patch('cloudinit.reporting.events.report_finish_event')
+    @mock.patch('cloudinit.reporting.events.report_start_event')
+ def test_reporting_disabled_does_not_report_events(
+ self, report_start, report_finish):
+ with events.ReportEventStack("a", "b", reporting_enabled=False):
+ pass
+ self.assertEqual(report_start.call_count, 0)
+ self.assertEqual(report_finish.call_count, 0)
+
+    @mock.patch('cloudinit.reporting.events.report_finish_event')
+    @mock.patch('cloudinit.reporting.events.report_start_event')
+    def test_reporting_child_defaults_to_parent(
+            self, report_start, report_finish):
+ parent = events.ReportEventStack(
+ "pname", "pdesc", reporting_enabled=False)
+ child = events.ReportEventStack("cname", "cdesc", parent=parent)
+ with parent:
+ with child:
+ pass
+ self.assertEqual(report_start.call_count, 0)
+ self.assertEqual(report_finish.call_count, 0)
+
+    def test_reporting_event_has_sane_repr(self):
+        myrep = repr(events.ReportEventStack("fooname", "foodesc",
+                                             reporting_enabled=True))
+ self.assertIn("fooname", myrep)
+ self.assertIn("foodesc", myrep)
+ self.assertIn("True", myrep)
+
+ def test_set_invalid_result_raises_value_error(self):
+ f = events.ReportEventStack("myname", "mydesc")
+ self.assertRaises(ValueError, setattr, f, "result", "BOGUS")
+
+
+class TestStatusAccess(TestCase):
+    def test_invalid_status_access_raises_attribute_error(self):
+ self.assertRaises(AttributeError, getattr, events.status, "BOGUS")
diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py
index 3b317121..9aeb1cde 100644
--- a/tests/unittests/test_sshutil.py
+++ b/tests/unittests/test_sshutil.py
@@ -32,7 +32,8 @@ VALID_CONTENT = {
),
}
-TEST_OPTIONS = ("no-port-forwarding,no-agent-forwarding,no-X11-forwarding,"
+TEST_OPTIONS = (
+ "no-port-forwarding,no-agent-forwarding,no-X11-forwarding,"
'command="echo \'Please login as the user \"ubuntu\" rather than the'
'user \"root\".\';echo;sleep 10"')
diff --git a/tests/unittests/test_templating.py b/tests/unittests/test_templating.py
index 0c19a2c2..b9863650 100644
--- a/tests/unittests/test_templating.py
+++ b/tests/unittests/test_templating.py
@@ -114,5 +114,6 @@ $a,$b'''
codename)
out_data = templater.basic_render(in_data,
- {'mirror': mirror, 'codename': codename})
+ {'mirror': mirror,
+ 'codename': codename})
self.assertEqual(ex_data, out_data)
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 95990165..37a984ac 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -385,6 +385,16 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):
self.patch_mapping({})
self.assertEqual(None, util.read_dmi_data('expect-fail'))
+    def test_empty_string_returned_for_uninitialized_dmi_values(self):
+        # uninitialized dmi values show as all \xff; return an empty string
+ my_len = 32
+ dmi_value = b'\xff' * my_len + b'\n'
+ expected = ""
+ dmi_key = 'system-product-name'
+ sysfs_key = 'product_name'
+ self._create_sysfs_file(sysfs_key, dmi_value)
+ self.assertEqual(expected, util.read_dmi_data(dmi_key))
+
class TestMultiLog(helpers.FilesystemMockingTestCase):
diff --git a/tests/unittests/test_vmware_config_file.py b/tests/unittests/test_vmware_config_file.py
new file mode 100644
index 00000000..d5c7367b
--- /dev/null
+++ b/tests/unittests/test_vmware_config_file.py
@@ -0,0 +1,103 @@
+# vi: ts=4 expandtab
+#
+# Copyright (C) 2015 Canonical Ltd.
+# Copyright (C) 2016 VMware Inc.
+#
+# Author: Sankar Tanguturi <stanguturi@vmware.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+import sys
+import unittest
+
+from cloudinit.sources.helpers.vmware.imc.boot_proto import BootProtoEnum
+from cloudinit.sources.helpers.vmware.imc.config import Config
+from cloudinit.sources.helpers.vmware.imc.config_file import ConfigFile
+
+logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
+logger = logging.getLogger(__name__)
+
+
+class TestVmwareConfigFile(unittest.TestCase):
+
+ def test_utility_methods(self):
+ cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
+
+ cf.clear()
+
+ self.assertEqual(0, len(cf), "clear size")
+
+ cf._insertKey(" PASSWORD|-PASS ", " foo ")
+ cf._insertKey("BAR", " ")
+
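+        # _insertKey strips surrounding whitespace from keys and values; a key
+        # inserted with an empty value (BAR) is marked for removal below.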
+ self.assertEqual(2, len(cf), "insert size")
+ self.assertEqual('foo', cf["PASSWORD|-PASS"], "password")
+ self.assertTrue("PASSWORD|-PASS" in cf, "hasPassword")
+ self.assertFalse(cf.should_keep_current_value("PASSWORD|-PASS"),
+ "keepPassword")
+ self.assertFalse(cf.should_remove_current_value("PASSWORD|-PASS"),
+ "removePassword")
+ self.assertFalse("FOO" in cf, "hasFoo")
+ self.assertTrue(cf.should_keep_current_value("FOO"), "keepFoo")
+ self.assertFalse(cf.should_remove_current_value("FOO"), "removeFoo")
+ self.assertTrue("BAR" in cf, "hasBar")
+ self.assertFalse(cf.should_keep_current_value("BAR"), "keepBar")
+ self.assertTrue(cf.should_remove_current_value("BAR"), "removeBar")
+
+ def test_configfile_static_2nics(self):
+ cf = ConfigFile("tests/data/vmware/cust-static-2nic.cfg")
+
+ conf = Config(cf)
+
+ self.assertEqual('myhost1', conf.host_name, "hostName")
+ self.assertEqual('Africa/Abidjan', conf.timezone, "tz")
+ self.assertTrue(conf.utc, "utc")
+
+ self.assertEqual(['10.20.145.1', '10.20.145.2'],
+ conf.name_servers,
+ "dns")
+ self.assertEqual(['eng.vmware.com', 'proxy.vmware.com'],
+ conf.dns_suffixes,
+ "suffixes")
+
+ nics = conf.nics
+ ipv40 = nics[0].staticIpv4
+
+ self.assertEqual(2, len(nics), "nics")
+ self.assertEqual('NIC1', nics[0].name, "nic0")
+ self.assertEqual('00:50:56:a6:8c:08', nics[0].mac, "mac0")
+ self.assertEqual(BootProtoEnum.STATIC, nics[0].bootProto, "bootproto0")
+ self.assertEqual('10.20.87.154', ipv40[0].ip, "ipv4Addr0")
+ self.assertEqual('255.255.252.0', ipv40[0].netmask, "ipv4Mask0")
+ self.assertEqual(2, len(ipv40[0].gateways), "ipv4Gw0")
+ self.assertEqual('10.20.87.253', ipv40[0].gateways[0], "ipv4Gw0_0")
+ self.assertEqual('10.20.87.105', ipv40[0].gateways[1], "ipv4Gw0_1")
+
+ self.assertEqual(1, len(nics[0].staticIpv6), "ipv6Cnt0")
+ self.assertEqual('fc00:10:20:87::154',
+ nics[0].staticIpv6[0].ip,
+ "ipv6Addr0")
+
+ self.assertEqual('NIC2', nics[1].name, "nic1")
+ self.assertTrue(not nics[1].staticIpv6, "ipv61 dhcp")
+
+ def test_config_file_dhcp_2nics(self):
+ cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
+
+ conf = Config(cf)
+ nics = conf.nics
+ self.assertEqual(2, len(nics), "nics")
+ self.assertEqual('NIC1', nics[0].name, "nic0")
+ self.assertEqual('00:50:56:a6:8c:08', nics[0].mac, "mac0")
+ self.assertEqual(BootProtoEnum.DHCP, nics[0].bootProto, "bootproto0")