summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py95
-rw-r--r--tests/unittests/test_datasource/test_azure.py526
-rw-r--r--tests/unittests/test_datasource/test_azure_helper.py420
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py1
-rw-r--r--tests/unittests/test_datasource/test_cloudstack.py86
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py207
-rw-r--r--tests/unittests/test_datasource/test_digitalocean.py10
-rw-r--r--tests/unittests/test_datasource/test_gce.py86
-rw-r--r--tests/unittests/test_datasource/test_maas.py92
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py73
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py25
-rw-r--r--tests/unittests/test_datasource/test_openstack.py15
-rw-r--r--tests/unittests/test_datasource/test_smartos.py284
13 files changed, 1429 insertions, 491 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index eaaa90e6..85759c68 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -26,6 +26,7 @@ import shutil
import tempfile
from cloudinit import helpers
+from cloudinit import util
from unittest import TestCase
# Get the cloudinit.sources.DataSourceAltCloud import items needed.
@@ -45,7 +46,7 @@ def _write_cloud_info_file(value):
cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w')
cifile.write(value)
cifile.close()
- os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664)
+ os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0o664)
def _remove_cloud_info_file():
@@ -66,12 +67,12 @@ def _write_user_data_files(mount_dir, value):
udfile = open(deltacloud_user_data_file, 'w')
udfile.write(value)
udfile.close()
- os.chmod(deltacloud_user_data_file, 0664)
+ os.chmod(deltacloud_user_data_file, 0o664)
udfile = open(user_data_file, 'w')
udfile.write(value)
udfile.close()
- os.chmod(user_data_file, 0664)
+ os.chmod(user_data_file, 0o664)
def _remove_user_data_files(mount_dir,
@@ -98,6 +99,16 @@ def _remove_user_data_files(mount_dir,
pass
+def _dmi_data(expected):
+ '''
+ Spoof the data received over DMI
+ '''
+ def _data(key):
+ return expected
+
+ return _data
+
+
class TestGetCloudType(TestCase):
'''
Test to exercise method: DataSourceAltCloud.get_cloud_type()
@@ -106,69 +117,42 @@ class TestGetCloudType(TestCase):
def setUp(self):
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.dmi_data = util.read_dmi_data
# We have a different code path for arm to deal with LP1243287
# We have to switch arch to x86_64 to avoid test failure
force_arch('x86_64')
def tearDown(self):
# Reset
- cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
- ['dmidecode', '--string', 'system-product-name']
- # Return back to original arch
+ util.read_dmi_data = self.dmi_data
force_arch()
def test_rhev(self):
'''
Test method get_cloud_type() for RHEVm systems.
- Forcing dmidecode return to match a RHEVm system: RHEV Hypervisor
+ Forcing read_dmi_data return to match a RHEVm system: RHEV Hypervisor
'''
- cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
- ['echo', 'RHEV Hypervisor']
+ util.read_dmi_data = _dmi_data('RHEV')
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals('RHEV', \
- dsrc.get_cloud_type())
+ self.assertEquals('RHEV', dsrc.get_cloud_type())
def test_vsphere(self):
'''
Test method get_cloud_type() for vSphere systems.
- Forcing dmidecode return to match a vSphere system: RHEV Hypervisor
+ Forcing read_dmi_data return to match a vSphere system: RHEV Hypervisor
'''
- cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
- ['echo', 'VMware Virtual Platform']
+ util.read_dmi_data = _dmi_data('VMware Virtual Platform')
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals('VSPHERE', \
- dsrc.get_cloud_type())
+ self.assertEquals('VSPHERE', dsrc.get_cloud_type())
def test_unknown(self):
'''
Test method get_cloud_type() for unknown systems.
- Forcing dmidecode return to match an unrecognized return.
- '''
- cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
- ['echo', 'Unrecognized Platform']
- dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals('UNKNOWN', \
- dsrc.get_cloud_type())
-
- def test_exception1(self):
- '''
- Test method get_cloud_type() where command dmidecode fails.
- '''
- cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
- ['ls', 'bad command']
- dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals('UNKNOWN', \
- dsrc.get_cloud_type())
-
- def test_exception2(self):
- '''
- Test method get_cloud_type() where command dmidecode is not available.
+ Forcing read_dmi_data return to match an unrecognized return.
'''
- cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
- ['bad command']
+ util.read_dmi_data = _dmi_data('Unrecognized Platform')
dsrc = DataSourceAltCloud({}, None, self.paths)
- self.assertEquals('UNKNOWN', \
- dsrc.get_cloud_type())
+ self.assertEquals('UNKNOWN', dsrc.get_cloud_type())
class TestGetDataCloudInfoFile(TestCase):
@@ -180,6 +164,7 @@ class TestGetDataCloudInfoFile(TestCase):
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
self.cloud_info_file = tempfile.mkstemp()[1]
+ self.dmi_data = util.read_dmi_data
cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
self.cloud_info_file
@@ -192,6 +177,7 @@ class TestGetDataCloudInfoFile(TestCase):
except OSError:
pass
+ util.read_dmi_data = self.dmi_data
cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
'/etc/sysconfig/cloud-info'
@@ -243,6 +229,7 @@ class TestGetDataNoCloudInfoFile(TestCase):
def setUp(self):
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.dmi_data = util.read_dmi_data
cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
'no such file'
# We have a different code path for arm to deal with LP1243287
@@ -253,16 +240,14 @@ class TestGetDataNoCloudInfoFile(TestCase):
# Reset
cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
'/etc/sysconfig/cloud-info'
- cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
- ['dmidecode', '--string', 'system-product-name']
+ util.read_dmi_data = self.dmi_data
# Return back to original arch
force_arch()
def test_rhev_no_cloud_file(self):
'''Test No cloud info file module get_data() forcing RHEV.'''
- cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
- ['echo', 'RHEV Hypervisor']
+ util.read_dmi_data = _dmi_data('RHEV Hypervisor')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda: True
self.assertEquals(True, dsrc.get_data())
@@ -270,8 +255,7 @@ class TestGetDataNoCloudInfoFile(TestCase):
def test_vsphere_no_cloud_file(self):
'''Test No cloud info file module get_data() forcing VSPHERE.'''
- cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
- ['echo', 'VMware Virtual Platform']
+ util.read_dmi_data = _dmi_data('VMware Virtual Platform')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda: True
self.assertEquals(True, dsrc.get_data())
@@ -279,8 +263,7 @@ class TestGetDataNoCloudInfoFile(TestCase):
def test_failure_no_cloud_file(self):
'''Test No cloud info file module get_data() forcing unrecognized.'''
- cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
- ['echo', 'Unrecognized Platform']
+ util.read_dmi_data = _dmi_data('Unrecognized Platform')
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals(False, dsrc.get_data())
@@ -426,27 +409,27 @@ class TestReadUserDataCallback(TestCase):
'''Test read_user_data_callback() with both files.'''
self.assertEquals('test user data',
- read_user_data_callback(self.mount_dir))
+ read_user_data_callback(self.mount_dir))
def test_callback_dc(self):
'''Test read_user_data_callback() with only DC file.'''
_remove_user_data_files(self.mount_dir,
- dc_file=False,
- non_dc_file=True)
+ dc_file=False,
+ non_dc_file=True)
self.assertEquals('test user data',
- read_user_data_callback(self.mount_dir))
+ read_user_data_callback(self.mount_dir))
def test_callback_non_dc(self):
'''Test read_user_data_callback() with only non-DC file.'''
_remove_user_data_files(self.mount_dir,
- dc_file=True,
- non_dc_file=False)
+ dc_file=True,
+ non_dc_file=False)
self.assertEquals('test user data',
- read_user_data_callback(self.mount_dir))
+ read_user_data_callback(self.mount_dir))
def test_callback_none(self):
'''Test read_user_data_callback() no files are found.'''
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index e992a006..444e2799 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -1,14 +1,24 @@
from cloudinit import helpers
-from cloudinit.util import load_file
+from cloudinit.util import b64e, decode_binary, load_file
from cloudinit.sources import DataSourceAzure
-from ..helpers import populate_dir
+from ..helpers import TestCase, populate_dir
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
-import base64
import crypt
-from mocker import MockerTestCase
import os
import stat
import yaml
+import shutil
+import tempfile
+import xml.etree.ElementTree as ET
def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
@@ -40,14 +50,17 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
content += "<%s%s>%s</%s>\n" % (key, attrs, val, key)
if userdata:
- content += "<UserData>%s</UserData>\n" % (base64.b64encode(userdata))
+ content += "<UserData>%s</UserData>\n" % (b64e(userdata))
if pubkeys:
content += "<SSH><PublicKeys>\n"
- for fp, path in pubkeys:
+ for fp, path, value in pubkeys:
content += " <PublicKey>"
- content += ("<Fingerprint>%s</Fingerprint><Path>%s</Path>" %
- (fp, path))
+ if fp and path:
+ content += ("<Fingerprint>%s</Fingerprint><Path>%s</Path>" %
+ (fp, path))
+ if value:
+ content += "<Value>%s</Value>" % value
content += "</PublicKey>\n"
content += "</PublicKeys></SSH>"
content += """
@@ -66,26 +79,25 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
return content
-class TestAzureDataSource(MockerTestCase):
+class TestAzureDataSource(TestCase):
def setUp(self):
- # makeDir comes from MockerTestCase
- self.tmp = self.makeDir()
+ super(TestAzureDataSource, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
# patch cloud_dir, so our 'seed_dir' is guaranteed empty
self.paths = helpers.Paths({'cloud_dir': self.tmp})
self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
- self.unapply = []
- super(TestAzureDataSource, self).setUp()
+ self.patches = ExitStack()
+ self.addCleanup(self.patches.close)
- def tearDown(self):
- apply_patches([i for i in reversed(self.unapply)])
- super(TestAzureDataSource, self).tearDown()
+ super(TestAzureDataSource, self).setUp()
def apply_patches(self, patches):
- ret = apply_patches(patches)
- self.unapply += ret
+ for module, name, new in patches:
+ self.patches.enter_context(mock.patch.object(module, name, new))
def _get_ds(self, data):
@@ -103,13 +115,6 @@ class TestAzureDataSource(MockerTestCase):
data['pubkey_files'] = flist
return ["pubkey_from: %s" % f for f in flist]
- def _iid_from_shared_config(path):
- data['iid_from_shared_cfg'] = path
- return 'i-my-azure-id'
-
- def _apply_hostname_bounce(**kwargs):
- data['apply_hostname_bounce'] = kwargs
-
if data.get('ovfcontent') is not None:
populate_dir(os.path.join(self.paths.seed_dir, "azure"),
{'ovf-env.xml': data['ovfcontent']})
@@ -117,22 +122,63 @@ class TestAzureDataSource(MockerTestCase):
mod = DataSourceAzure
mod.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
- self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)])
-
- self.apply_patches([(mod, 'invoke_agent', _invoke_agent),
- (mod, 'wait_for_files', _wait_for_files),
- (mod, 'pubkeys_from_crt_files',
- _pubkeys_from_crt_files),
- (mod, 'iid_from_shared_config',
- _iid_from_shared_config),
- (mod, 'apply_hostname_bounce',
- _apply_hostname_bounce), ])
+ self.get_metadata_from_fabric = mock.MagicMock(return_value={
+ 'public-keys': [],
+ })
+
+ self.instance_id = 'test-instance-id'
+
+ self.apply_patches([
+ (mod, 'list_possible_azure_ds_devs', dsdevs),
+ (mod, 'invoke_agent', _invoke_agent),
+ (mod, 'wait_for_files', _wait_for_files),
+ (mod, 'pubkeys_from_crt_files', _pubkeys_from_crt_files),
+ (mod, 'perform_hostname_bounce', mock.MagicMock()),
+ (mod, 'get_hostname', mock.MagicMock()),
+ (mod, 'set_hostname', mock.MagicMock()),
+ (mod, 'get_metadata_from_fabric', self.get_metadata_from_fabric),
+ (mod.util, 'read_dmi_data', mock.MagicMock(
+ return_value=self.instance_id)),
+ ])
dsrc = mod.DataSourceAzureNet(
data.get('sys_cfg', {}), distro=None, paths=self.paths)
return dsrc
+ def xml_equals(self, oxml, nxml):
+ """Compare two sets of XML to make sure they are equal"""
+
+ def create_tag_index(xml):
+ et = ET.fromstring(xml)
+ ret = {}
+ for x in et.iter():
+ ret[x.tag] = x
+ return ret
+
+ def tags_exists(x, y):
+ for tag in x.keys():
+ self.assertIn(tag, y)
+ for tag in y.keys():
+ self.assertIn(tag, x)
+
+ def tags_equal(x, y):
+ for x_tag, x_val in x.items():
+ y_val = y.get(x_val.tag)
+ self.assertEquals(x_val.text, y_val.text)
+
+ old_cnt = create_tag_index(oxml)
+ new_cnt = create_tag_index(nxml)
+ tags_exists(old_cnt, new_cnt)
+ tags_equal(old_cnt, new_cnt)
+
+ def xml_notequals(self, oxml, nxml):
+ try:
+ self.xml_equals(oxml, nxml)
+ except AssertionError:
+ return
+ raise AssertionError("XML is the same")
+
def test_basic_seed_dir(self):
odata = {'HostName': "myhost", 'UserName': "myuser"}
data = {'ovfcontent': construct_valid_ovf_env(data=odata),
@@ -145,7 +191,6 @@ class TestAzureDataSource(MockerTestCase):
self.assertEqual(dsrc.metadata['local-hostname'], odata['HostName'])
self.assertTrue(os.path.isfile(
os.path.join(self.waagent_d, 'ovf-env.xml')))
- self.assertEqual(dsrc.metadata['instance-id'], 'i-my-azure-id')
def test_waagent_d_has_0700_perms(self):
# we expect /var/lib/waagent to be created 0700
@@ -153,7 +198,7 @@ class TestAzureDataSource(MockerTestCase):
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertTrue(os.path.isdir(self.waagent_d))
- self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0700)
+ self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0o700)
def test_user_cfg_set_agent_command_plain(self):
# set dscfg in via plaintext
@@ -162,7 +207,7 @@ class TestAzureDataSource(MockerTestCase):
yaml_cfg = "{agent_command: my_command}\n"
cfg = yaml.safe_load(yaml_cfg)
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': yaml_cfg, 'encoding': 'plain'}}
+ 'dscfg': {'text': yaml_cfg, 'encoding': 'plain'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
@@ -174,8 +219,8 @@ class TestAzureDataSource(MockerTestCase):
# set dscfg in via base64 encoded yaml
cfg = {'agent_command': "my_command"}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
- 'encoding': 'base64'}}
+ 'dscfg': {'text': b64e(yaml.dump(cfg)),
+ 'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
@@ -222,17 +267,28 @@ class TestAzureDataSource(MockerTestCase):
# should equal that after the '$'
pos = defuser['passwd'].rfind("$") + 1
self.assertEqual(defuser['passwd'],
- crypt.crypt(odata['UserPassword'], defuser['passwd'][0:pos]))
+ crypt.crypt(odata['UserPassword'],
+ defuser['passwd'][0:pos]))
+
+ def test_userdata_plain(self):
+ mydata = "FOOBAR"
+ odata = {'UserData': {'text': mydata, 'encoding': 'plain'}}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(decode_binary(dsrc.userdata_raw), mydata)
def test_userdata_found(self):
mydata = "FOOBAR"
- odata = {'UserData': base64.b64encode(mydata)}
+ odata = {'UserData': {'text': b64e(mydata), 'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(dsrc.userdata_raw, mydata)
+ self.assertEqual(dsrc.userdata_raw, mydata.encode('utf-8'))
def test_no_datasource_expected(self):
# no source should be found if no seed_dir and no devs
@@ -242,10 +298,10 @@ class TestAzureDataSource(MockerTestCase):
self.assertFalse(ret)
self.assertFalse('agent_invoked' in data)
- def test_cfg_has_pubkeys(self):
+ def test_cfg_has_pubkeys_fingerprint(self):
odata = {'HostName': "myhost", 'UserName': "myuser"}
- mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}]
- pubkeys = [(x['fingerprint'], x['path']) for x in mypklist]
+ mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}]
+ pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
data = {'ovfcontent': construct_valid_ovf_env(data=odata,
pubkeys=pubkeys)}
@@ -254,47 +310,38 @@ class TestAzureDataSource(MockerTestCase):
self.assertTrue(ret)
for mypk in mypklist:
self.assertIn(mypk, dsrc.cfg['_pubkeys'])
+ self.assertIn('pubkey_from', dsrc.metadata['public-keys'][-1])
- def test_disabled_bounce(self):
- pass
-
- def test_apply_bounce_call_1(self):
- # hostname needs to get through to apply_hostname_bounce
- odata = {'HostName': 'my-random-hostname'}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ def test_cfg_has_pubkeys_value(self):
+ # make sure that provided key is used over fingerprint
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': 'value1'}]
+ pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata,
+ pubkeys=pubkeys)}
- self._get_ds(data).get_data()
- self.assertIn('hostname', data['apply_hostname_bounce'])
- self.assertEqual(data['apply_hostname_bounce']['hostname'],
- odata['HostName'])
-
- def test_apply_bounce_call_configurable(self):
- # hostname_bounce should be configurable in datasource cfg
- cfg = {'hostname_bounce': {'interface': 'eth1', 'policy': 'off',
- 'command': 'my-bounce-command',
- 'hostname_command': 'my-hostname-command'}}
- odata = {'HostName': "xhost",
- 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
- 'encoding': 'base64'}}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
- self._get_ds(data).get_data()
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
- for k in cfg['hostname_bounce']:
- self.assertIn(k, data['apply_hostname_bounce'])
+ for mypk in mypklist:
+ self.assertIn(mypk, dsrc.cfg['_pubkeys'])
+ self.assertIn(mypk['value'], dsrc.metadata['public-keys'])
- for k, v in cfg['hostname_bounce'].items():
- self.assertEqual(data['apply_hostname_bounce'][k], v)
+ def test_cfg_has_no_fingerprint_has_value(self):
+ # test value is used when fingerprint not provided
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ mypklist = [{'fingerprint': None, 'path': 'path1', 'value': 'value1'}]
+ pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata,
+ pubkeys=pubkeys)}
- def test_set_hostname_disabled(self):
- # config specifying set_hostname off should not bounce
- cfg = {'set_hostname': False}
- odata = {'HostName': "xhost",
- 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
- 'encoding': 'base64'}}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
- self._get_ds(data).get_data()
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
- self.assertEqual(data.get('apply_hostname_bounce', "N/A"), "N/A")
+ for mypk in mypklist:
+ self.assertIn(mypk['value'], dsrc.metadata['public-keys'])
def test_default_ephemeral(self):
# make sure the ephemeral device works
@@ -318,8 +365,8 @@ class TestAzureDataSource(MockerTestCase):
# Make sure that user can affect disk aliases
dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': base64.b64encode(yaml.dump(dscfg)),
- 'encoding': 'base64'}}
+ 'dscfg': {'text': b64e(yaml.dump(dscfg)),
+ 'encoding': 'base64'}}
usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'},
'ephemeral0': False}}
userdata = '#cloud-config' + yaml.dump(usercfg) + "\n"
@@ -340,7 +387,32 @@ class TestAzureDataSource(MockerTestCase):
dsrc = self._get_ds(data)
dsrc.get_data()
- self.assertEqual(userdata, dsrc.userdata_raw)
+ self.assertEqual(userdata.encode('us-ascii'), dsrc.userdata_raw)
+
+ def test_password_redacted_in_ovf(self):
+ odata = {'HostName': "myhost", 'UserName': "myuser",
+ 'UserPassword': "mypass"}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+
+ self.assertTrue(ret)
+ ovf_env_path = os.path.join(self.waagent_d, 'ovf-env.xml')
+
+ # The XML should not be same since the user password is redacted
+ on_disk_ovf = load_file(ovf_env_path)
+ self.xml_notequals(data['ovfcontent'], on_disk_ovf)
+
+ # Make sure that the redacted password on disk is not used by CI
+ self.assertNotEquals(dsrc.cfg.get('password'),
+ DataSourceAzure.DEF_PASSWD_REDACTION)
+
+ # Make sure that the password was really encrypted
+ et = ET.fromstring(on_disk_ovf)
+ for elem in et.iter():
+ if 'UserPassword' in elem.tag:
+ self.assertEquals(DataSourceAzure.DEF_PASSWD_REDACTION,
+ elem.text)
def test_ovf_env_arrives_in_waagent_dir(self):
xml = construct_valid_ovf_env(data={}, userdata="FOODATA")
@@ -351,92 +423,224 @@ class TestAzureDataSource(MockerTestCase):
# we expect that the ovf-env.xml file is copied there.
ovf_env_path = os.path.join(self.waagent_d, 'ovf-env.xml')
self.assertTrue(os.path.exists(ovf_env_path))
- self.assertEqual(xml, load_file(ovf_env_path))
-
- def test_existing_ovf_same(self):
- # waagent/SharedConfig left alone if found ovf-env.xml same as cached
- odata = {'UserData': base64.b64encode("SOMEUSERDATA")}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ self.xml_equals(xml, load_file(ovf_env_path))
- populate_dir(self.waagent_d,
- {'ovf-env.xml': data['ovfcontent'],
- 'otherfile': 'otherfile-content',
- 'SharedConfig.xml': 'mysharedconfig'})
+ def test_ovf_can_include_unicode(self):
+ xml = construct_valid_ovf_env(data={})
+ xml = u'\ufeff{0}'.format(xml)
+ dsrc = self._get_ds({'ovfcontent': xml})
+ dsrc.get_data()
- dsrc = self._get_ds(data)
- ret = dsrc.get_data()
- self.assertTrue(ret)
- self.assertTrue(os.path.exists(
- os.path.join(self.waagent_d, 'ovf-env.xml')))
- self.assertTrue(os.path.exists(
- os.path.join(self.waagent_d, 'otherfile')))
- self.assertTrue(os.path.exists(
- os.path.join(self.waagent_d, 'SharedConfig.xml')))
-
- def test_existing_ovf_diff(self):
- # waagent/SharedConfig must be removed if ovfenv is found elsewhere
-
- # 'get_data' should remove SharedConfig.xml in /var/lib/waagent
- # if ovf-env.xml differs.
- cached_ovfenv = construct_valid_ovf_env(
- {'userdata': base64.b64encode("FOO_USERDATA")})
- new_ovfenv = construct_valid_ovf_env(
- {'userdata': base64.b64encode("NEW_USERDATA")})
-
- populate_dir(self.waagent_d,
- {'ovf-env.xml': cached_ovfenv,
- 'SharedConfig.xml': "mysharedconfigxml",
- 'otherfile': 'otherfilecontent'})
-
- dsrc = self._get_ds({'ovfcontent': new_ovfenv})
- ret = dsrc.get_data()
+ def test_exception_fetching_fabric_data_doesnt_propagate(self):
+ ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ ds.ds_cfg['agent_command'] = '__builtin__'
+ self.get_metadata_from_fabric.side_effect = Exception
+ self.assertFalse(ds.get_data())
+
+ def test_fabric_data_included_in_metadata(self):
+ ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ ds.ds_cfg['agent_command'] = '__builtin__'
+ self.get_metadata_from_fabric.return_value = {'test': 'value'}
+ ret = ds.get_data()
self.assertTrue(ret)
- self.assertEqual(dsrc.userdata_raw, "NEW_USERDATA")
- self.assertTrue(os.path.exists(
- os.path.join(self.waagent_d, 'otherfile')))
- self.assertFalse(
- os.path.exists(os.path.join(self.waagent_d, 'SharedConfig.xml')))
- self.assertTrue(
- os.path.exists(os.path.join(self.waagent_d, 'ovf-env.xml')))
- self.assertEqual(new_ovfenv,
- load_file(os.path.join(self.waagent_d, 'ovf-env.xml')))
-
-
-class TestReadAzureOvf(MockerTestCase):
+ self.assertEqual('value', ds.metadata['test'])
+
+ def test_instance_id_from_dmidecode_used(self):
+ ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ ds.get_data()
+ self.assertEqual(self.instance_id, ds.metadata['instance-id'])
+
+ def test_instance_id_from_dmidecode_used_for_builtin(self):
+ ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ ds.ds_cfg['agent_command'] = '__builtin__'
+ ds.get_data()
+ self.assertEqual(self.instance_id, ds.metadata['instance-id'])
+
+
+class TestAzureBounce(TestCase):
+
+ def mock_out_azure_moving_parts(self):
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'invoke_agent'))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'wait_for_files'))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'list_possible_azure_ds_devs',
+ mock.MagicMock(return_value=[])))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure,
+ 'find_fabric_formatted_ephemeral_disk',
+ mock.MagicMock(return_value=None)))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure,
+ 'find_fabric_formatted_ephemeral_part',
+ mock.MagicMock(return_value=None)))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'get_metadata_from_fabric',
+ mock.MagicMock(return_value={})))
+ self.patches.enter_context(
+ mock.patch.object(DataSourceAzure.util, 'read_dmi_data',
+ mock.MagicMock(return_value='test-instance-id')))
+
+ def setUp(self):
+ super(TestAzureBounce, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
+ self.addCleanup(shutil.rmtree, self.tmp)
+ DataSourceAzure.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
+ self.patches = ExitStack()
+ self.mock_out_azure_moving_parts()
+ self.get_hostname = self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'get_hostname'))
+ self.set_hostname = self.patches.enter_context(
+ mock.patch.object(DataSourceAzure, 'set_hostname'))
+ self.subp = self.patches.enter_context(
+ mock.patch('cloudinit.sources.DataSourceAzure.util.subp'))
+
+ def tearDown(self):
+ self.patches.close()
+
+ def _get_ds(self, ovfcontent=None):
+ if ovfcontent is not None:
+ populate_dir(os.path.join(self.paths.seed_dir, "azure"),
+ {'ovf-env.xml': ovfcontent})
+ return DataSourceAzure.DataSourceAzureNet(
+ {}, distro=None, paths=self.paths)
+
+ def get_ovf_env_with_dscfg(self, hostname, cfg):
+ odata = {
+ 'HostName': hostname,
+ 'dscfg': {
+ 'text': b64e(yaml.dump(cfg)),
+ 'encoding': 'base64'
+ }
+ }
+ return construct_valid_ovf_env(data=odata)
+
+ def test_disabled_bounce_does_not_change_hostname(self):
+ cfg = {'hostname_bounce': {'policy': 'off'}}
+ self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data()
+ self.assertEqual(0, self.set_hostname.call_count)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_disabled_bounce_does_not_perform_bounce(
+ self, perform_hostname_bounce):
+ cfg = {'hostname_bounce': {'policy': 'off'}}
+ self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data()
+ self.assertEqual(0, perform_hostname_bounce.call_count)
+
+ def test_same_hostname_does_not_change_hostname(self):
+ host_name = 'unchanged-host-name'
+ self.get_hostname.return_value = host_name
+ cfg = {'hostname_bounce': {'policy': 'yes'}}
+ self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ self.assertEqual(0, self.set_hostname.call_count)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_unchanged_hostname_does_not_perform_bounce(
+ self, perform_hostname_bounce):
+ host_name = 'unchanged-host-name'
+ self.get_hostname.return_value = host_name
+ cfg = {'hostname_bounce': {'policy': 'yes'}}
+ self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ self.assertEqual(0, perform_hostname_bounce.call_count)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_force_performs_bounce_regardless(self, perform_hostname_bounce):
+ host_name = 'unchanged-host-name'
+ self.get_hostname.return_value = host_name
+ cfg = {'hostname_bounce': {'policy': 'force'}}
+ self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ self.assertEqual(1, perform_hostname_bounce.call_count)
+
+ def test_different_hostnames_sets_hostname(self):
+ expected_hostname = 'azure-expected-host-name'
+ self.get_hostname.return_value = 'default-host-name'
+ self._get_ds(
+ self.get_ovf_env_with_dscfg(expected_hostname, {})).get_data()
+ self.assertEqual(expected_hostname,
+ self.set_hostname.call_args_list[0][0][0])
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_different_hostnames_performs_bounce(
+ self, perform_hostname_bounce):
+ expected_hostname = 'azure-expected-host-name'
+ self.get_hostname.return_value = 'default-host-name'
+ self._get_ds(
+ self.get_ovf_env_with_dscfg(expected_hostname, {})).get_data()
+ self.assertEqual(1, perform_hostname_bounce.call_count)
+
+ def test_different_hostnames_sets_hostname_back(self):
+ initial_host_name = 'default-host-name'
+ self.get_hostname.return_value = initial_host_name
+ self._get_ds(
+ self.get_ovf_env_with_dscfg('some-host-name', {})).get_data()
+ self.assertEqual(initial_host_name,
+ self.set_hostname.call_args_list[-1][0][0])
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_failure_in_bounce_still_resets_host_name(
+ self, perform_hostname_bounce):
+ perform_hostname_bounce.side_effect = Exception
+ initial_host_name = 'default-host-name'
+ self.get_hostname.return_value = initial_host_name
+ self._get_ds(
+ self.get_ovf_env_with_dscfg('some-host-name', {})).get_data()
+ self.assertEqual(initial_host_name,
+ self.set_hostname.call_args_list[-1][0][0])
+
+ def test_environment_correct_for_bounce_command(self):
+ interface = 'int0'
+ hostname = 'my-new-host'
+ old_hostname = 'my-old-host'
+ self.get_hostname.return_value = old_hostname
+ cfg = {'hostname_bounce': {'interface': interface, 'policy': 'force'}}
+ data = self.get_ovf_env_with_dscfg(hostname, cfg)
+ self._get_ds(data).get_data()
+ self.assertEqual(1, self.subp.call_count)
+ bounce_env = self.subp.call_args[1]['env']
+ self.assertEqual(interface, bounce_env['interface'])
+ self.assertEqual(hostname, bounce_env['hostname'])
+ self.assertEqual(old_hostname, bounce_env['old_hostname'])
+
+ def test_default_bounce_command_used_by_default(self):
+ cmd = 'default-bounce-command'
+ DataSourceAzure.BUILTIN_DS_CONFIG['hostname_bounce']['command'] = cmd
+ cfg = {'hostname_bounce': {'policy': 'force'}}
+ data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
+ self._get_ds(data).get_data()
+ self.assertEqual(1, self.subp.call_count)
+ bounce_args = self.subp.call_args[1]['args']
+ self.assertEqual(cmd, bounce_args)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ def test_set_hostname_option_can_disable_bounce(
+ self, perform_hostname_bounce):
+ cfg = {'set_hostname': False, 'hostname_bounce': {'policy': 'force'}}
+ data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
+ self._get_ds(data).get_data()
+
+ self.assertEqual(0, perform_hostname_bounce.call_count)
+
+ def test_set_hostname_option_can_disable_hostname_set(self):
+ cfg = {'set_hostname': False, 'hostname_bounce': {'policy': 'force'}}
+ data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
+ self._get_ds(data).get_data()
+
+ self.assertEqual(0, self.set_hostname.call_count)
+
+
+class TestReadAzureOvf(TestCase):
def test_invalid_xml_raises_non_azure_ds(self):
invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
self.assertRaises(DataSourceAzure.BrokenAzureDataSource,
- DataSourceAzure.read_azure_ovf, invalid_xml)
+ DataSourceAzure.read_azure_ovf, invalid_xml)
def test_load_with_pubkeys(self):
- mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}]
- pubkeys = [(x['fingerprint'], x['path']) for x in mypklist]
+ mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}]
+ pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
content = construct_valid_ovf_env(pubkeys=pubkeys)
(_md, _ud, cfg) = DataSourceAzure.read_azure_ovf(content)
for mypk in mypklist:
self.assertIn(mypk, cfg['_pubkeys'])
-
-
-class TestReadAzureSharedConfig(MockerTestCase):
- def test_valid_content(self):
- xml = """<?xml version="1.0" encoding="utf-8"?>
- <SharedConfig>
- <Deployment name="MY_INSTANCE_ID">
- <Service name="myservice"/>
- <ServiceInstance name="INSTANCE_ID.0" guid="{abcd-uuid}" />
- </Deployment>
- <Incarnation number="1"/>
- </SharedConfig>"""
- ret = DataSourceAzure.iid_from_shared_config_content(xml)
- self.assertEqual("MY_INSTANCE_ID", ret)
-
-
-def apply_patches(patches):
- ret = []
- for (ref, name, replace) in patches:
- if replace is None:
- continue
- orig = getattr(ref, name)
- setattr(ref, name, replace)
- ret.append((ref, name, orig))
- return ret
diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py
new file mode 100644
index 00000000..1134199b
--- /dev/null
+++ b/tests/unittests/test_datasource/test_azure_helper.py
@@ -0,0 +1,420 @@
+import os
+
+from cloudinit.sources.helpers import azure as azure_helper
+from ..helpers import TestCase
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
+
+
+GOAL_STATE_TEMPLATE = """\
+<?xml version="1.0" encoding="utf-8"?>
+<GoalState xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:noNamespaceSchemaLocation="goalstate10.xsd">
+ <Version>2012-11-30</Version>
+ <Incarnation>{incarnation}</Incarnation>
+ <Machine>
+ <ExpectedState>Started</ExpectedState>
+ <StopRolesDeadlineHint>300000</StopRolesDeadlineHint>
+ <LBProbePorts>
+ <Port>16001</Port>
+ </LBProbePorts>
+ <ExpectHealthReport>FALSE</ExpectHealthReport>
+ </Machine>
+ <Container>
+ <ContainerId>{container_id}</ContainerId>
+ <RoleInstanceList>
+ <RoleInstance>
+ <InstanceId>{instance_id}</InstanceId>
+ <State>Started</State>
+ <Configuration>
+ <HostingEnvironmentConfig>
+ http://100.86.192.70:80/...hostingEnvironmentConfig...
+ </HostingEnvironmentConfig>
+ <SharedConfig>http://100.86.192.70:80/..SharedConfig..</SharedConfig>
+ <ExtensionsConfig>
+ http://100.86.192.70:80/...extensionsConfig...
+ </ExtensionsConfig>
+ <FullConfig>http://100.86.192.70:80/...fullConfig...</FullConfig>
+ <Certificates>{certificates_url}</Certificates>
+ <ConfigName>68ce47.0.68ce47.0.utl-trusty--292258.1.xml</ConfigName>
+ </Configuration>
+ </RoleInstance>
+ </RoleInstanceList>
+ </Container>
+</GoalState>
+"""
+
+
+class TestFindEndpoint(TestCase):
+
+ def setUp(self):
+ super(TestFindEndpoint, self).setUp()
+ patches = ExitStack()
+ self.addCleanup(patches.close)
+
+ self.load_file = patches.enter_context(
+ mock.patch.object(azure_helper.util, 'load_file'))
+
+ def test_missing_file(self):
+ self.load_file.side_effect = IOError
+ self.assertRaises(IOError,
+ azure_helper.WALinuxAgentShim.find_endpoint)
+
+ def test_missing_special_azure_line(self):
+ self.load_file.return_value = ''
+ self.assertRaises(Exception,
+ azure_helper.WALinuxAgentShim.find_endpoint)
+
+ @staticmethod
+ def _build_lease_content(encoded_address):
+ return '\n'.join([
+ 'lease {',
+ ' interface "eth0";',
+ ' option unknown-245 {0};'.format(encoded_address),
+ '}'])
+
+ def test_latest_lease_used(self):
+ encoded_addresses = ['5:4:3:2', '4:3:2:1']
+ file_content = '\n'.join([self._build_lease_content(encoded_address)
+ for encoded_address in encoded_addresses])
+ self.load_file.return_value = file_content
+ self.assertEqual(encoded_addresses[-1].replace(':', '.'),
+ azure_helper.WALinuxAgentShim.find_endpoint())
+
+
+class TestExtractIpAddressFromLeaseValue(TestCase):
+
+ def test_hex_string(self):
+ ip_address, encoded_address = '98.76.54.32', '62:4c:36:20'
+ self.assertEqual(
+ ip_address,
+ azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
+ encoded_address
+ ))
+
+ def test_hex_string_with_single_character_part(self):
+ ip_address, encoded_address = '4.3.2.1', '4:3:2:1'
+ self.assertEqual(
+ ip_address,
+ azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
+ encoded_address
+ ))
+
+ def test_packed_string(self):
+ ip_address, encoded_address = '98.76.54.32', 'bL6 '
+ self.assertEqual(
+ ip_address,
+ azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
+ encoded_address
+ ))
+
+ def test_packed_string_with_escaped_quote(self):
+ ip_address, encoded_address = '100.72.34.108', 'dH\\"l'
+ self.assertEqual(
+ ip_address,
+ azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
+ encoded_address
+ ))
+
+ def test_packed_string_containing_a_colon(self):
+ ip_address, encoded_address = '100.72.58.108', 'dH:l'
+ self.assertEqual(
+ ip_address,
+ azure_helper.WALinuxAgentShim.get_ip_from_lease_value(
+ encoded_address
+ ))
+
+
+class TestGoalStateParsing(TestCase):
+
+ default_parameters = {
+ 'incarnation': 1,
+ 'container_id': 'MyContainerId',
+ 'instance_id': 'MyInstanceId',
+ 'certificates_url': 'MyCertificatesUrl',
+ }
+
+ def _get_goal_state(self, http_client=None, **kwargs):
+ if http_client is None:
+ http_client = mock.MagicMock()
+ parameters = self.default_parameters.copy()
+ parameters.update(kwargs)
+ xml = GOAL_STATE_TEMPLATE.format(**parameters)
+ if parameters['certificates_url'] is None:
+ new_xml_lines = []
+ for line in xml.splitlines():
+ if 'Certificates' in line:
+ continue
+ new_xml_lines.append(line)
+ xml = '\n'.join(new_xml_lines)
+ return azure_helper.GoalState(xml, http_client)
+
+ def test_incarnation_parsed_correctly(self):
+ incarnation = '123'
+ goal_state = self._get_goal_state(incarnation=incarnation)
+ self.assertEqual(incarnation, goal_state.incarnation)
+
+ def test_container_id_parsed_correctly(self):
+ container_id = 'TestContainerId'
+ goal_state = self._get_goal_state(container_id=container_id)
+ self.assertEqual(container_id, goal_state.container_id)
+
+ def test_instance_id_parsed_correctly(self):
+ instance_id = 'TestInstanceId'
+ goal_state = self._get_goal_state(instance_id=instance_id)
+ self.assertEqual(instance_id, goal_state.instance_id)
+
+ def test_certificates_xml_parsed_and_fetched_correctly(self):
+ http_client = mock.MagicMock()
+ certificates_url = 'TestCertificatesUrl'
+ goal_state = self._get_goal_state(
+ http_client=http_client, certificates_url=certificates_url)
+ certificates_xml = goal_state.certificates_xml
+ self.assertEqual(1, http_client.get.call_count)
+ self.assertEqual(certificates_url, http_client.get.call_args[0][0])
+ self.assertTrue(http_client.get.call_args[1].get('secure', False))
+ self.assertEqual(http_client.get.return_value.contents,
+ certificates_xml)
+
+ def test_missing_certificates_skips_http_get(self):
+ http_client = mock.MagicMock()
+ goal_state = self._get_goal_state(
+ http_client=http_client, certificates_url=None)
+ certificates_xml = goal_state.certificates_xml
+ self.assertEqual(0, http_client.get.call_count)
+ self.assertIsNone(certificates_xml)
+
+
+class TestAzureEndpointHttpClient(TestCase):
+
+ regular_headers = {
+ 'x-ms-agent-name': 'WALinuxAgent',
+ 'x-ms-version': '2012-11-30',
+ }
+
+ def setUp(self):
+ super(TestAzureEndpointHttpClient, self).setUp()
+ patches = ExitStack()
+ self.addCleanup(patches.close)
+
+ self.read_file_or_url = patches.enter_context(
+ mock.patch.object(azure_helper.util, 'read_file_or_url'))
+
+ def test_non_secure_get(self):
+ client = azure_helper.AzureEndpointHttpClient(mock.MagicMock())
+ url = 'MyTestUrl'
+ response = client.get(url, secure=False)
+ self.assertEqual(1, self.read_file_or_url.call_count)
+ self.assertEqual(self.read_file_or_url.return_value, response)
+ self.assertEqual(mock.call(url, headers=self.regular_headers),
+ self.read_file_or_url.call_args)
+
+ def test_secure_get(self):
+ url = 'MyTestUrl'
+ certificate = mock.MagicMock()
+ expected_headers = self.regular_headers.copy()
+ expected_headers.update({
+ "x-ms-cipher-name": "DES_EDE3_CBC",
+ "x-ms-guest-agent-public-x509-cert": certificate,
+ })
+ client = azure_helper.AzureEndpointHttpClient(certificate)
+ response = client.get(url, secure=True)
+ self.assertEqual(1, self.read_file_or_url.call_count)
+ self.assertEqual(self.read_file_or_url.return_value, response)
+ self.assertEqual(mock.call(url, headers=expected_headers),
+ self.read_file_or_url.call_args)
+
+ def test_post(self):
+ data = mock.MagicMock()
+ url = 'MyTestUrl'
+ client = azure_helper.AzureEndpointHttpClient(mock.MagicMock())
+ response = client.post(url, data=data)
+ self.assertEqual(1, self.read_file_or_url.call_count)
+ self.assertEqual(self.read_file_or_url.return_value, response)
+ self.assertEqual(
+ mock.call(url, data=data, headers=self.regular_headers),
+ self.read_file_or_url.call_args)
+
+ def test_post_with_extra_headers(self):
+ url = 'MyTestUrl'
+ client = azure_helper.AzureEndpointHttpClient(mock.MagicMock())
+ extra_headers = {'test': 'header'}
+ client.post(url, extra_headers=extra_headers)
+ self.assertEqual(1, self.read_file_or_url.call_count)
+ expected_headers = self.regular_headers.copy()
+ expected_headers.update(extra_headers)
+ self.assertEqual(
+ mock.call(mock.ANY, data=mock.ANY, headers=expected_headers),
+ self.read_file_or_url.call_args)
+
+
+class TestOpenSSLManager(TestCase):
+
+ def setUp(self):
+ super(TestOpenSSLManager, self).setUp()
+ patches = ExitStack()
+ self.addCleanup(patches.close)
+
+ self.subp = patches.enter_context(
+ mock.patch.object(azure_helper.util, 'subp'))
+ try:
+ self.open = patches.enter_context(
+ mock.patch('__builtin__.open'))
+ except ImportError:
+ self.open = patches.enter_context(
+ mock.patch('builtins.open'))
+
+ @mock.patch.object(azure_helper, 'cd', mock.MagicMock())
+ @mock.patch.object(azure_helper.tempfile, 'mkdtemp')
+ def test_openssl_manager_creates_a_tmpdir(self, mkdtemp):
+ manager = azure_helper.OpenSSLManager()
+ self.assertEqual(mkdtemp.return_value, manager.tmpdir)
+
+ def test_generate_certificate_uses_tmpdir(self):
+ subp_directory = {}
+
+ def capture_directory(*args, **kwargs):
+ subp_directory['path'] = os.getcwd()
+
+ self.subp.side_effect = capture_directory
+ manager = azure_helper.OpenSSLManager()
+ self.assertEqual(manager.tmpdir, subp_directory['path'])
+
+ @mock.patch.object(azure_helper, 'cd', mock.MagicMock())
+ @mock.patch.object(azure_helper.tempfile, 'mkdtemp', mock.MagicMock())
+ @mock.patch.object(azure_helper.util, 'del_dir')
+ def test_clean_up(self, del_dir):
+ manager = azure_helper.OpenSSLManager()
+ manager.clean_up()
+ self.assertEqual([mock.call(manager.tmpdir)], del_dir.call_args_list)
+
+
+class TestWALinuxAgentShim(TestCase):
+
+ def setUp(self):
+ super(TestWALinuxAgentShim, self).setUp()
+ patches = ExitStack()
+ self.addCleanup(patches.close)
+
+ self.AzureEndpointHttpClient = patches.enter_context(
+ mock.patch.object(azure_helper, 'AzureEndpointHttpClient'))
+ self.find_endpoint = patches.enter_context(
+ mock.patch.object(
+ azure_helper.WALinuxAgentShim, 'find_endpoint'))
+ self.GoalState = patches.enter_context(
+ mock.patch.object(azure_helper, 'GoalState'))
+ self.OpenSSLManager = patches.enter_context(
+ mock.patch.object(azure_helper, 'OpenSSLManager'))
+ patches.enter_context(
+ mock.patch.object(azure_helper.time, 'sleep', mock.MagicMock()))
+
+ def test_http_client_uses_certificate(self):
+ shim = azure_helper.WALinuxAgentShim()
+ shim.register_with_azure_and_fetch_data()
+ self.assertEqual(
+ [mock.call(self.OpenSSLManager.return_value.certificate)],
+ self.AzureEndpointHttpClient.call_args_list)
+
+ def test_correct_url_used_for_goalstate(self):
+ self.find_endpoint.return_value = 'test_endpoint'
+ shim = azure_helper.WALinuxAgentShim()
+ shim.register_with_azure_and_fetch_data()
+ get = self.AzureEndpointHttpClient.return_value.get
+ self.assertEqual(
+ [mock.call('http://test_endpoint/machine/?comp=goalstate')],
+ get.call_args_list)
+ self.assertEqual(
+ [mock.call(get.return_value.contents,
+ self.AzureEndpointHttpClient.return_value)],
+ self.GoalState.call_args_list)
+
+ def test_certificates_used_to_determine_public_keys(self):
+ shim = azure_helper.WALinuxAgentShim()
+ data = shim.register_with_azure_and_fetch_data()
+ self.assertEqual(
+ [mock.call(self.GoalState.return_value.certificates_xml)],
+ self.OpenSSLManager.return_value.parse_certificates.call_args_list)
+ self.assertEqual(
+ self.OpenSSLManager.return_value.parse_certificates.return_value,
+ data['public-keys'])
+
+ def test_absent_certificates_produces_empty_public_keys(self):
+ self.GoalState.return_value.certificates_xml = None
+ shim = azure_helper.WALinuxAgentShim()
+ data = shim.register_with_azure_and_fetch_data()
+ self.assertEqual([], data['public-keys'])
+
+ def test_correct_url_used_for_report_ready(self):
+ self.find_endpoint.return_value = 'test_endpoint'
+ shim = azure_helper.WALinuxAgentShim()
+ shim.register_with_azure_and_fetch_data()
+ expected_url = 'http://test_endpoint/machine?comp=health'
+ self.assertEqual(
+ [mock.call(expected_url, data=mock.ANY, extra_headers=mock.ANY)],
+ self.AzureEndpointHttpClient.return_value.post.call_args_list)
+
+ def test_goal_state_values_used_for_report_ready(self):
+ self.GoalState.return_value.incarnation = 'TestIncarnation'
+ self.GoalState.return_value.container_id = 'TestContainerId'
+ self.GoalState.return_value.instance_id = 'TestInstanceId'
+ shim = azure_helper.WALinuxAgentShim()
+ shim.register_with_azure_and_fetch_data()
+ posted_document = (
+ self.AzureEndpointHttpClient.return_value.post.call_args[1]['data']
+ )
+ self.assertIn('TestIncarnation', posted_document)
+ self.assertIn('TestContainerId', posted_document)
+ self.assertIn('TestInstanceId', posted_document)
+
+ def test_clean_up_can_be_called_at_any_time(self):
+ shim = azure_helper.WALinuxAgentShim()
+ shim.clean_up()
+
+ def test_clean_up_will_clean_up_openssl_manager_if_instantiated(self):
+ shim = azure_helper.WALinuxAgentShim()
+ shim.register_with_azure_and_fetch_data()
+ shim.clean_up()
+ self.assertEqual(
+ 1, self.OpenSSLManager.return_value.clean_up.call_count)
+
+ def test_failure_to_fetch_goalstate_bubbles_up(self):
+ class SentinelException(Exception):
+ pass
+ self.AzureEndpointHttpClient.return_value.get.side_effect = (
+ SentinelException)
+ shim = azure_helper.WALinuxAgentShim()
+ self.assertRaises(SentinelException,
+ shim.register_with_azure_and_fetch_data)
+
+
+class TestGetMetadataFromFabric(TestCase):
+
+ @mock.patch.object(azure_helper, 'WALinuxAgentShim')
+ def test_data_from_shim_returned(self, shim):
+ ret = azure_helper.get_metadata_from_fabric()
+ self.assertEqual(
+ shim.return_value.register_with_azure_and_fetch_data.return_value,
+ ret)
+
+ @mock.patch.object(azure_helper, 'WALinuxAgentShim')
+ def test_success_calls_clean_up(self, shim):
+ azure_helper.get_metadata_from_fabric()
+ self.assertEqual(1, shim.return_value.clean_up.call_count)
+
+ @mock.patch.object(azure_helper, 'WALinuxAgentShim')
+ def test_failure_in_registration_calls_clean_up(self, shim):
+ class SentinelException(Exception):
+ pass
+ shim.return_value.register_with_azure_and_fetch_data.side_effect = (
+ SentinelException)
+ self.assertRaises(SentinelException,
+ azure_helper.get_metadata_from_fabric)
+ self.assertEqual(1, shim.return_value.clean_up.call_count)
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index 306ac7d8..772d189a 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -39,6 +39,7 @@ class CepkoMock(Cepko):
class DataSourceCloudSigmaTest(test_helpers.TestCase):
def setUp(self):
+ super(DataSourceCloudSigmaTest, self).setUp()
self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
self.datasource.is_running_in_cloudsigma = lambda: True
self.datasource.cepko = CepkoMock(SERVER_CONTEXT)
diff --git a/tests/unittests/test_datasource/test_cloudstack.py b/tests/unittests/test_datasource/test_cloudstack.py
new file mode 100644
index 00000000..656d80d1
--- /dev/null
+++ b/tests/unittests/test_datasource/test_cloudstack.py
@@ -0,0 +1,86 @@
+from cloudinit import helpers
+from cloudinit.sources.DataSourceCloudStack import DataSourceCloudStack
+from ..helpers import TestCase
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
+
+
+class TestCloudStackPasswordFetching(TestCase):
+
+ def setUp(self):
+ super(TestCloudStackPasswordFetching, self).setUp()
+ self.patches = ExitStack()
+ self.addCleanup(self.patches.close)
+ mod_name = 'cloudinit.sources.DataSourceCloudStack'
+ self.patches.enter_context(mock.patch('{0}.ec2'.format(mod_name)))
+ self.patches.enter_context(mock.patch('{0}.uhelp'.format(mod_name)))
+
+ def _set_password_server_response(self, response_string):
+ subp = mock.MagicMock(return_value=(response_string, ''))
+ self.patches.enter_context(
+ mock.patch('cloudinit.sources.DataSourceCloudStack.util.subp',
+ subp))
+ return subp
+
+ def test_empty_password_doesnt_create_config(self):
+ self._set_password_server_response('')
+ ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds.get_data()
+ self.assertEqual({}, ds.get_config_obj())
+
+ def test_saved_password_doesnt_create_config(self):
+ self._set_password_server_response('saved_password')
+ ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds.get_data()
+ self.assertEqual({}, ds.get_config_obj())
+
+ def test_password_sets_password(self):
+ password = 'SekritSquirrel'
+ self._set_password_server_response(password)
+ ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds.get_data()
+ self.assertEqual(password, ds.get_config_obj()['password'])
+
+ def test_bad_request_doesnt_stop_ds_from_working(self):
+ self._set_password_server_response('bad_request')
+ ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ self.assertTrue(ds.get_data())
+
+ def assertRequestTypesSent(self, subp, expected_request_types):
+ request_types = []
+ for call in subp.call_args_list:
+ args = call[0][0]
+ for arg in args:
+ if arg.startswith('DomU_Request'):
+ request_types.append(arg.split()[1])
+ self.assertEqual(expected_request_types, request_types)
+
+ def test_valid_response_means_password_marked_as_saved(self):
+ password = 'SekritSquirrel'
+ subp = self._set_password_server_response(password)
+ ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds.get_data()
+ self.assertRequestTypesSent(subp,
+ ['send_my_password', 'saved_password'])
+
+ def _check_password_not_saved_for(self, response_string):
+ subp = self._set_password_server_response(response_string)
+ ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds.get_data()
+ self.assertRequestTypesSent(subp, ['send_my_password'])
+
+ def test_password_not_saved_if_empty(self):
+ self._check_password_not_saved_for('')
+
+ def test_password_not_saved_if_already_saved(self):
+ self._check_password_not_saved_for('saved_password')
+
+ def test_password_not_saved_if_bad_request(self):
+ self._check_password_not_saved_for('bad_request')
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index d88066e5..89b15f54 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -1,10 +1,18 @@
from copy import copy
import json
import os
-import os.path
-
-import mocker
-from mocker import MockerTestCase
+import shutil
+import six
+import tempfile
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
from cloudinit import helpers
from cloudinit import settings
@@ -12,7 +20,8 @@ from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
-from .. import helpers as unit_helpers
+from ..helpers import TestCase
+
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
EC2_META = {
@@ -37,7 +46,7 @@ EC2_META = {
'reservation-id': 'r-iru5qm4m',
'security-groups': ['default']
}
-USER_DATA = '#!/bin/sh\necho This is user data\n'
+USER_DATA = b'#!/bin/sh\necho This is user data\n'
OSTACK_META = {
'availability_zone': 'nova',
'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'},
@@ -48,27 +57,60 @@ OSTACK_META = {
'public_keys': {'mykey': PUBKEY},
'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'}
-CONTENT_0 = 'This is contents of /etc/foo.cfg\n'
-CONTENT_1 = '# this is /etc/bar/bar.cfg\n'
+CONTENT_0 = b'This is contents of /etc/foo.cfg\n'
+CONTENT_1 = b'# this is /etc/bar/bar.cfg\n'
+NETWORK_DATA = {
+ 'services': [
+ {'type': 'dns', 'address': '199.204.44.24'},
+ {'type': 'dns', 'address': '199.204.47.54'}
+ ],
+ 'links': [
+ {'vif_id': '2ecc7709-b3f7-4448-9580-e1ec32d75bbd',
+ 'ethernet_mac_address': 'fa:16:3e:69:b0:58',
+ 'type': 'ovs', 'mtu': None, 'id': 'tap2ecc7709-b3'},
+ {'vif_id': '2f88d109-5b57-40e6-af32-2472df09dc33',
+ 'ethernet_mac_address': 'fa:16:3e:d4:57:ad',
+ 'type': 'ovs', 'mtu': None, 'id': 'tap2f88d109-5b'},
+ {'vif_id': '1a5382f8-04c5-4d75-ab98-d666c1ef52cc',
+ 'ethernet_mac_address': 'fa:16:3e:05:30:fe',
+ 'type': 'ovs', 'mtu': None, 'id': 'tap1a5382f8-04'}
+ ],
+ 'networks': [
+ {'link': 'tap2ecc7709-b3', 'type': 'ipv4_dhcp',
+ 'network_id': '6d6357ac-0f70-4afa-8bd7-c274cc4ea235',
+ 'id': 'network0'},
+ {'link': 'tap2f88d109-5b', 'type': 'ipv4_dhcp',
+ 'network_id': 'd227a9b3-6960-4d94-8976-ee5788b44f54',
+ 'id': 'network1'},
+ {'link': 'tap1a5382f8-04', 'type': 'ipv4_dhcp',
+ 'network_id': 'dab2ba57-cae2-4311-a5ed-010b263891f5',
+ 'id': 'network2'}
+ ]
+}
CFG_DRIVE_FILES_V2 = {
- 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
- 'ec2/2009-04-04/user-data': USER_DATA,
- 'ec2/latest/meta-data.json': json.dumps(EC2_META),
- 'ec2/latest/user-data': USER_DATA,
- 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META),
- 'openstack/2012-08-10/user_data': USER_DATA,
- 'openstack/content/0000': CONTENT_0,
- 'openstack/content/0001': CONTENT_1,
- 'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
- 'openstack/latest/user_data': USER_DATA}
-
-
-class TestConfigDriveDataSource(MockerTestCase):
+ 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META),
+ 'ec2/2009-04-04/user-data': USER_DATA,
+ 'ec2/latest/meta-data.json': json.dumps(EC2_META),
+ 'ec2/latest/user-data': USER_DATA,
+ 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META),
+ 'openstack/2012-08-10/user_data': USER_DATA,
+ 'openstack/content/0000': CONTENT_0,
+ 'openstack/content/0001': CONTENT_1,
+ 'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
+ 'openstack/latest/user_data': USER_DATA,
+ 'openstack/latest/network_data.json': json.dumps(NETWORK_DATA),
+ 'openstack/2015-10-15/meta_data.json': json.dumps(OSTACK_META),
+ 'openstack/2015-10-15/user_data': USER_DATA,
+ 'openstack/2015-10-15/network_data.json': json.dumps(NETWORK_DATA)}
+
+
+class TestConfigDriveDataSource(TestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_ec2_metadata(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -91,23 +133,29 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker() as my_mock:
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
+ with ExitStack() as mocks:
provided_name = dev_name[len('/dev/'):]
provided_name = "s" + provided_name[1:]
- find_mock(mocker.ARGS)
- my_mock.result([provided_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ find_mock = mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ return_value=[provided_name]))
+ # We want os.path.exists() to return False on its first call,
+ # and True on its second call. We use a handy generator as
+ # the mock side effect for this. The mocked function returns
+ # what the side effect returns.
+
+ def exists_side_effect():
+ yield False
+ yield True
+ exists_mock = mocks.enter_context(
+ mock.patch.object(os.path, 'exists',
+ side_effect=exists_side_effect()))
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ find_mock.assert_called_once_with(mock.ANY)
+ self.assertEqual(exists_mock.call_count, 2)
+
def test_dev_os_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
@@ -123,19 +171,19 @@ class TestConfigDriveDataSource(MockerTestCase):
'swap': '/dev/vda3',
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker() as my_mock:
- find_mock = my_mock.replace(util.find_devs_with,
- spec=False, passthrough=False)
- find_mock(mocker.ARGS)
- my_mock.result([dev_name])
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ with ExitStack() as mocks:
+ find_mock = mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ return_value=[dev_name]))
+ exists_mock = mocks.enter_context(
+ mock.patch.object(os.path, 'exists',
+ return_value=True))
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ find_mock.assert_called_once_with(mock.ANY)
+ exists_mock.assert_called_once_with(mock.ANY)
+
def test_dev_ec2_remap(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN,
@@ -156,16 +204,21 @@ class TestConfigDriveDataSource(MockerTestCase):
'root2k': None,
}
for name, dev_name in name_tests.items():
- with unit_helpers.mocker(verify_calls=False) as my_mock:
- exists_mock = my_mock.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- my_mock.result(False)
- exists_mock(mocker.ARGS)
- my_mock.result(True)
- my_mock.replay()
+ # We want os.path.exists() to return False on its first call,
+ # and True on its second call. We use a handy generator as
+ # the mock side effect for this. The mocked function returns
+ # what the side effect returns.
+ def exists_side_effect():
+ yield False
+ yield True
+ with mock.patch.object(os.path, 'exists',
+ side_effect=exists_side_effect()):
device = cfg_ds.device_name_to_device(name)
self.assertEquals(dev_name, device)
+ # We don't assert the call count for os.path.exists() because
+ # not all of the entries in name_tests results in two calls to
+ # that function. Specifically, 'root2k' doesn't seem to call
+ # it at all.
def test_dev_ec2_map(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -173,12 +226,6 @@ class TestConfigDriveDataSource(MockerTestCase):
None,
helpers.Paths({}))
found = ds.read_config_drive(self.tmp)
- exists_mock = self.mocker.replace(os.path.exists,
- spec=False, passthrough=False)
- exists_mock(mocker.ARGS)
- self.mocker.count(0, None)
- self.mocker.result(True)
- self.mocker.replay()
ec2_md = found['ec2-metadata']
os_md = found['metadata']
cfg_ds.ec2_metadata = ec2_md
@@ -193,8 +240,9 @@ class TestConfigDriveDataSource(MockerTestCase):
'root2k': None,
}
for name, dev_name in name_tests.items():
- device = cfg_ds.device_name_to_device(name)
- self.assertEquals(dev_name, device)
+ with mock.patch.object(os.path, 'exists', return_value=True):
+ device = cfg_ds.device_name_to_device(name)
+ self.assertEquals(dev_name, device)
def test_dir_valid(self):
"""Verify a dir is read as such."""
@@ -209,6 +257,7 @@ class TestConfigDriveDataSource(MockerTestCase):
self.assertEqual(USER_DATA, found['userdata'])
self.assertEqual(expected_md, found['metadata'])
+ self.assertEqual(NETWORK_DATA, found['networkdata'])
self.assertEqual(found['files']['/etc/foo.cfg'], CONTENT_0)
self.assertEqual(found['files']['/etc/bar/bar.cfg'], CONTENT_1)
@@ -234,6 +283,7 @@ class TestConfigDriveDataSource(MockerTestCase):
data = copy(CFG_DRIVE_FILES_V2)
data["openstack/2012-08-10/meta_data.json"] = "non-json garbage {}"
+ data["openstack/2015-10-15/meta_data.json"] = "non-json garbage {}"
data["openstack/latest/meta_data.json"] = "non-json garbage {}"
populate_dir(self.tmp, data)
@@ -277,9 +327,8 @@ class TestConfigDriveDataSource(MockerTestCase):
util.is_partition = my_is_partition
devs_with_answers = {"TYPE=vfat": [],
- "TYPE=iso9660": ["/dev/vdb"],
- "LABEL=config-2": ["/dev/vdb"],
- }
+ "TYPE=iso9660": ["/dev/vdb"],
+ "LABEL=config-2": ["/dev/vdb"]}
self.assertEqual(["/dev/vdb"], ds.find_candidate_devs())
# add a vfat item
@@ -290,9 +339,10 @@ class TestConfigDriveDataSource(MockerTestCase):
# verify that partitions are considered, that have correct label.
devs_with_answers = {"TYPE=vfat": ["/dev/sda1"],
- "TYPE=iso9660": [], "LABEL=config-2": ["/dev/vdb3"]}
+ "TYPE=iso9660": [],
+ "LABEL=config-2": ["/dev/vdb3"]}
self.assertEqual(["/dev/vdb3"],
- ds.find_candidate_devs())
+ ds.find_candidate_devs())
finally:
util.find_devs_with = orig_find_devs_with
@@ -303,7 +353,20 @@ class TestConfigDriveDataSource(MockerTestCase):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
myds = cfg_ds_from_dir(self.tmp)
self.assertEqual(myds.get_public_ssh_keys(),
- [OSTACK_META['public_keys']['mykey']])
+ [OSTACK_META['public_keys']['mykey']])
+
+ def test_network_data_is_found(self):
+ """Verify that network_data is present in ds in config-drive-v2."""
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ myds = cfg_ds_from_dir(self.tmp)
+ self.assertEqual(myds.network_json, NETWORK_DATA)
+
+ def test_network_config_is_converted(self):
+ """Verify that network_data is converted and present on ds object."""
+ populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
+ myds = cfg_ds_from_dir(self.tmp)
+ network_config = ds.convert_network_data(NETWORK_DATA)
+ self.assertEqual(myds.network_config, network_config)
def cfg_ds_from_dir(seed_d):
@@ -323,16 +386,22 @@ def populate_ds_from_read_config(cfg_ds, source, results):
cfg_ds.ec2_metadata = results.get('ec2-metadata')
cfg_ds.userdata_raw = results.get('userdata')
cfg_ds.version = results.get('version')
+ cfg_ds.network_json = results.get('networkdata')
+ cfg_ds._network_config = ds.convert_network_data(cfg_ds.network_json)
def populate_dir(seed_dir, files):
- for (name, content) in files.iteritems():
+ for (name, content) in files.items():
path = os.path.join(seed_dir, name)
dirname = os.path.dirname(path)
if not os.path.isdir(dirname):
os.makedirs(dirname)
- with open(path, "w") as fp:
+ if isinstance(content, six.text_type):
+ mode = "w"
+ else:
+ mode = "wb"
+
+ with open(path, mode) as fp:
fp.write(content)
- fp.close()
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
index d1270fc2..679d1b82 100644
--- a/tests/unittests/test_datasource/test_digitalocean.py
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -15,11 +15,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import httpretty
import re
-from types import ListType
-from urlparse import urlparse
+from six.moves.urllib_parse import urlparse
from cloudinit import settings
from cloudinit import helpers
@@ -27,6 +25,8 @@ from cloudinit.sources import DataSourceDigitalOcean
from .. import helpers as test_helpers
+httpretty = test_helpers.import_httpretty()
+
# Abbreviated for the test
DO_INDEX = """id
hostname
@@ -110,7 +110,7 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
self.assertEqual([DO_META.get('public-keys')],
self.ds.get_public_ssh_keys())
- self.assertIs(type(self.ds.get_public_ssh_keys()), ListType)
+ self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
@httpretty.activate
def test_multiple_ssh_keys(self):
@@ -124,4 +124,4 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):
self.assertEqual(DO_META.get('public-keys').splitlines(),
self.ds.get_public_ssh_keys())
- self.assertIs(type(self.ds.get_public_ssh_keys()), ListType)
+ self.assertIsInstance(self.ds.get_public_ssh_keys(), list)
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index 06050bb1..fa714070 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -15,11 +15,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import httpretty
import re
from base64 import b64encode, b64decode
-from urlparse import urlparse
+from six.moves.urllib_parse import urlparse
from cloudinit import settings
from cloudinit import helpers
@@ -27,12 +26,14 @@ from cloudinit.sources import DataSourceGCE
from .. import helpers as test_helpers
+httpretty = test_helpers.import_httpretty()
+
GCE_META = {
'instance/id': '123',
'instance/zone': 'foo/bar',
'project/attributes/sshKeys': 'user:ssh-rsa AA2..+aRD0fyVw== root@server',
'instance/hostname': 'server.project-foo.local',
- 'instance/attributes/user-data': '/bin/echo foo\n',
+ 'instance/attributes/user-data': b'/bin/echo foo\n',
}
GCE_META_PARTIAL = {
@@ -45,7 +46,7 @@ GCE_META_ENCODING = {
'instance/id': '12345',
'instance/hostname': 'server.project-baz.local',
'instance/zone': 'baz/bang',
- 'instance/attributes/user-data': b64encode('/bin/echo baz\n'),
+ 'instance/attributes/user-data': b64encode(b'/bin/echo baz\n'),
'instance/attributes/user-data-encoding': 'base64',
}
@@ -54,8 +55,8 @@ MD_URL_RE = re.compile(
r'http://metadata.google.internal./computeMetadata/v1/.*')
-def _new_request_callback(gce_meta=None):
- if not gce_meta:
+def _set_mock_metadata(gce_meta=None):
+ if gce_meta is None:
gce_meta = GCE_META
def _request_callback(method, uri, headers):
@@ -69,9 +70,10 @@ def _new_request_callback(gce_meta=None):
else:
return (404, headers, '')
- return _request_callback
+ httpretty.register_uri(httpretty.GET, MD_URL_RE, body=_request_callback)
+@httpretty.activate
class TestDataSourceGCE(test_helpers.HttprettyTestCase):
def setUp(self):
@@ -80,23 +82,16 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
helpers.Paths({}))
super(TestDataSourceGCE, self).setUp()
- @httpretty.activate
def test_connection(self):
- httpretty.register_uri(
- httpretty.GET, MD_URL_RE,
- body=_new_request_callback())
-
+ _set_mock_metadata()
success = self.ds.get_data()
self.assertTrue(success)
req_header = httpretty.last_request().headers
self.assertDictContainsSubset(HEADERS, req_header)
- @httpretty.activate
def test_metadata(self):
- httpretty.register_uri(
- httpretty.GET, MD_URL_RE,
- body=_new_request_callback())
+ _set_mock_metadata()
self.ds.get_data()
shostname = GCE_META.get('instance/hostname').split('.')[0]
@@ -106,22 +101,12 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
self.assertEqual(GCE_META.get('instance/id'),
self.ds.get_instance_id())
- self.assertEqual(GCE_META.get('instance/zone'),
- self.ds.availability_zone)
-
self.assertEqual(GCE_META.get('instance/attributes/user-data'),
self.ds.get_userdata_raw())
- # we expect a list of public ssh keys with user names stripped
- self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'],
- self.ds.get_public_ssh_keys())
-
# test partial metadata (missing user-data in particular)
- @httpretty.activate
def test_metadata_partial(self):
- httpretty.register_uri(
- httpretty.GET, MD_URL_RE,
- body=_new_request_callback(GCE_META_PARTIAL))
+ _set_mock_metadata(GCE_META_PARTIAL)
self.ds.get_data()
self.assertEqual(GCE_META_PARTIAL.get('instance/id'),
@@ -130,13 +115,52 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
shostname = GCE_META_PARTIAL.get('instance/hostname').split('.')[0]
self.assertEqual(shostname, self.ds.get_hostname())
- @httpretty.activate
def test_metadata_encoding(self):
- httpretty.register_uri(
- httpretty.GET, MD_URL_RE,
- body=_new_request_callback(GCE_META_ENCODING))
+ _set_mock_metadata(GCE_META_ENCODING)
self.ds.get_data()
decoded = b64decode(
GCE_META_ENCODING.get('instance/attributes/user-data'))
self.assertEqual(decoded, self.ds.get_userdata_raw())
+
+ def test_missing_required_keys_return_false(self):
+ for required_key in ['instance/id', 'instance/zone',
+ 'instance/hostname']:
+ meta = GCE_META_PARTIAL.copy()
+ del meta[required_key]
+ _set_mock_metadata(meta)
+ self.assertEqual(False, self.ds.get_data())
+ httpretty.reset()
+
+ def test_project_level_ssh_keys_are_used(self):
+ _set_mock_metadata()
+ self.ds.get_data()
+
+ # we expect a list of public ssh keys with user names stripped
+ self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'],
+ self.ds.get_public_ssh_keys())
+
+ def test_instance_level_ssh_keys_are_used(self):
+ key_content = 'ssh-rsa JustAUser root@server'
+ meta = GCE_META.copy()
+ meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content)
+
+ _set_mock_metadata(meta)
+ self.ds.get_data()
+
+ self.assertIn(key_content, self.ds.get_public_ssh_keys())
+
+ def test_instance_level_keys_replace_project_level_keys(self):
+ key_content = 'ssh-rsa JustAUser root@server'
+ meta = GCE_META.copy()
+ meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content)
+
+ _set_mock_metadata(meta)
+ self.ds.get_data()
+
+ self.assertEqual([key_content], self.ds.get_public_ssh_keys())
+
+ def test_only_last_part_of_zone_used_for_availability_zone(self):
+ _set_mock_metadata()
+ self.ds.get_data()
+ self.assertEqual('bar', self.ds.availability_zone)
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index c157beb8..77d15cac 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -1,27 +1,33 @@
from copy import copy
import os
+import shutil
+import tempfile
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
-from ..helpers import populate_dir
+from ..helpers import TestCase, populate_dir
-import mocker
+try:
+ from unittest import mock
+except ImportError:
+ import mock
-class TestMAASDataSource(mocker.MockerTestCase):
+class TestMAASDataSource(TestCase):
def setUp(self):
super(TestMAASDataSource, self).setUp()
# Make a temp directoy for tests to use.
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_seed_dir_valid(self):
"""Verify a valid seeddir is read as such."""
data = {'instance-id': 'i-valid01',
- 'local-hostname': 'valid01-hostname',
- 'user-data': 'valid01-userdata',
- 'public-keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname'}
+ 'local-hostname': 'valid01-hostname',
+ 'user-data': b'valid01-userdata',
+ 'public-keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname'}
my_d = os.path.join(self.tmp, "valid")
populate_dir(my_d, data)
@@ -39,8 +45,8 @@ class TestMAASDataSource(mocker.MockerTestCase):
"""Verify extra files do not affect seed_dir validity."""
data = {'instance-id': 'i-valid-extra',
- 'local-hostname': 'valid-extra-hostname',
- 'user-data': 'valid-extra-userdata', 'foo': 'bar'}
+ 'local-hostname': 'valid-extra-hostname',
+ 'user-data': b'valid-extra-userdata', 'foo': 'bar'}
my_d = os.path.join(self.tmp, "valid_extra")
populate_dir(my_d, data)
@@ -58,7 +64,7 @@ class TestMAASDataSource(mocker.MockerTestCase):
"""Verify that invalid seed_dir raises MAASSeedDirMalformed."""
valid = {'instance-id': 'i-instanceid',
- 'local-hostname': 'test-hostname', 'user-data': ''}
+ 'local-hostname': 'test-hostname', 'user-data': ''}
my_based = os.path.join(self.tmp, "valid_extra")
@@ -88,21 +94,23 @@ class TestMAASDataSource(mocker.MockerTestCase):
def test_seed_dir_missing(self):
"""Verify that missing seed_dir raises MAASSeedDirNone."""
self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
- DataSourceMAAS.read_maas_seed_dir,
- os.path.join(self.tmp, "nonexistantdirectory"))
+ DataSourceMAAS.read_maas_seed_dir,
+ os.path.join(self.tmp, "nonexistantdirectory"))
def test_seed_url_valid(self):
"""Verify that valid seed_url is read as such."""
- valid = {'meta-data/instance-id': 'i-instanceid',
+ valid = {
+ 'meta-data/instance-id': 'i-instanceid',
'meta-data/local-hostname': 'test-hostname',
'meta-data/public-keys': 'test-hostname',
- 'user-data': 'foodata'}
+ 'user-data': b'foodata',
+ }
valid_order = [
'meta-data/local-hostname',
'meta-data/instance-id',
'meta-data/public-keys',
'user-data',
- ]
+ ]
my_seed = "http://example.com/xmeta"
my_ver = "1999-99-99"
my_headers = {'header1': 'value1', 'header2': 'value2'}
@@ -110,28 +118,38 @@ class TestMAASDataSource(mocker.MockerTestCase):
def my_headers_cb(url):
return my_headers
- mock_request = self.mocker.replace(url_helper.readurl,
- passthrough=False)
-
- for key in valid_order:
- url = "%s/%s/%s" % (my_seed, my_ver, key)
- mock_request(url, headers=None, timeout=mocker.ANY,
- data=mocker.ANY, sec_between=mocker.ANY,
- ssl_details=mocker.ANY, retries=mocker.ANY,
- headers_cb=my_headers_cb,
- exception_cb=mocker.ANY)
- resp = valid.get(key)
- self.mocker.result(url_helper.StringResponse(resp))
- self.mocker.replay()
-
- (userdata, metadata) = DataSourceMAAS.read_maas_seed_url(my_seed,
- header_cb=my_headers_cb, version=my_ver)
-
- self.assertEqual("foodata", userdata)
- self.assertEqual(metadata['instance-id'],
- valid['meta-data/instance-id'])
- self.assertEqual(metadata['local-hostname'],
- valid['meta-data/local-hostname'])
+ # Each time url_helper.readurl() is called, something different is
+ # returned based on the canned data above. We need to build up a list
+ # of side effect return values, which the mock will return. At the
+ # same time, we'll build up a list of expected call arguments for
+ # asserting after the code under test is run.
+ calls = []
+
+ def side_effect():
+ for key in valid_order:
+ resp = valid.get(key)
+ url = "%s/%s/%s" % (my_seed, my_ver, key)
+ calls.append(
+ mock.call(url, headers=None, timeout=mock.ANY,
+ data=mock.ANY, sec_between=mock.ANY,
+ ssl_details=mock.ANY, retries=mock.ANY,
+ headers_cb=my_headers_cb,
+ exception_cb=mock.ANY))
+ yield url_helper.StringResponse(resp)
+
+ # Now do the actual call of the code under test.
+ with mock.patch.object(url_helper, 'readurl',
+ side_effect=side_effect()) as mockobj:
+ userdata, metadata = DataSourceMAAS.read_maas_seed_url(
+ my_seed, version=my_ver)
+
+ self.assertEqual(b"foodata", userdata)
+ self.assertEqual(metadata['instance-id'],
+ valid['meta-data/instance-id'])
+ self.assertEqual(metadata['local-hostname'],
+ valid['meta-data/local-hostname'])
+
+ mockobj.has_calls(calls)
def test_seed_url_invalid(self):
"""Verify that invalid seed_url raises MAASSeedDirMalformed."""
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index e9235951..2d5fc37c 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -1,39 +1,43 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceNoCloud
from cloudinit import util
-from ..helpers import populate_dir
+from ..helpers import TestCase, populate_dir
-from mocker import MockerTestCase
import os
import yaml
+import shutil
+import tempfile
+import unittest
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
-class TestNoCloudDataSource(MockerTestCase):
+
+class TestNoCloudDataSource(TestCase):
def setUp(self):
- self.tmp = self.makeDir()
+ super(TestNoCloudDataSource, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
self.paths = helpers.Paths({'cloud_dir': self.tmp})
self.cmdline = "root=TESTCMDLINE"
- self.unapply = []
- self.apply_patches([(util, 'get_cmdline', self._getcmdline)])
- super(TestNoCloudDataSource, self).setUp()
-
- def tearDown(self):
- apply_patches([i for i in reversed(self.unapply)])
- super(TestNoCloudDataSource, self).tearDown()
+ self.mocks = ExitStack()
+ self.addCleanup(self.mocks.close)
- def apply_patches(self, patches):
- ret = apply_patches(patches)
- self.unapply += ret
-
- def _getcmdline(self):
- return self.cmdline
+ self.mocks.enter_context(
+ mock.patch.object(util, 'get_cmdline', return_value=self.cmdline))
def test_nocloud_seed_dir(self):
md = {'instance-id': 'IID', 'dsmode': 'local'}
- ud = "USER_DATA_HERE"
+ ud = b"USER_DATA_HERE"
populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
{'user-data': ud, 'meta-data': yaml.safe_dump(md)})
@@ -59,7 +63,9 @@ class TestNoCloudDataSource(MockerTestCase):
def my_find_devs_with(*args, **kwargs):
raise PsuedoException
- self.apply_patches([(util, 'find_devs_with', my_find_devs_with)])
+ self.mocks.enter_context(
+ mock.patch.object(util, 'find_devs_with',
+ side_effect=PsuedoException))
# by default, NoCloud should search for filesystems by label
sys_cfg = {'datasource': {'NoCloud': {}}}
@@ -85,21 +91,21 @@ class TestNoCloudDataSource(MockerTestCase):
data = {
'fs_label': None,
- 'meta-data': {'instance-id': 'IID'},
- 'user-data': "USER_DATA_RAW",
+ 'meta-data': yaml.safe_dump({'instance-id': 'IID'}),
+ 'user-data': b"USER_DATA_RAW",
}
sys_cfg = {'datasource': {'NoCloud': data}}
dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
ret = dsrc.get_data()
- self.assertEqual(dsrc.userdata_raw, "USER_DATA_RAW")
+ self.assertEqual(dsrc.userdata_raw, b"USER_DATA_RAW")
self.assertEqual(dsrc.metadata.get('instance-id'), 'IID')
self.assertTrue(ret)
def test_nocloud_seed_with_vendordata(self):
md = {'instance-id': 'IID', 'dsmode': 'local'}
- ud = "USER_DATA_HERE"
- vd = "THIS IS MY VENDOR_DATA"
+ ud = b"USER_DATA_HERE"
+ vd = b"THIS IS MY VENDOR_DATA"
populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
{'user-data': ud, 'meta-data': yaml.safe_dump(md),
@@ -115,12 +121,12 @@ class TestNoCloudDataSource(MockerTestCase):
ret = dsrc.get_data()
self.assertEqual(dsrc.userdata_raw, ud)
self.assertEqual(dsrc.metadata, md)
- self.assertEqual(dsrc.vendordata, vd)
+ self.assertEqual(dsrc.vendordata_raw, vd)
self.assertTrue(ret)
def test_nocloud_no_vendordata(self):
populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
- {'user-data': "ud", 'meta-data': "instance-id: IID\n"})
+ {'user-data': b"ud", 'meta-data': "instance-id: IID\n"})
sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
@@ -128,12 +134,12 @@ class TestNoCloudDataSource(MockerTestCase):
dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
ret = dsrc.get_data()
- self.assertEqual(dsrc.userdata_raw, "ud")
+ self.assertEqual(dsrc.userdata_raw, b"ud")
self.assertFalse(dsrc.vendordata)
self.assertTrue(ret)
-class TestParseCommandLineData(MockerTestCase):
+class TestParseCommandLineData(unittest.TestCase):
def test_parse_cmdline_data_valid(self):
ds_id = "ds=nocloud"
@@ -178,15 +184,4 @@ class TestParseCommandLineData(MockerTestCase):
self.assertFalse(ret)
-def apply_patches(patches):
- ret = []
- for (ref, name, replace) in patches:
- if replace is None:
- continue
- orig = getattr(ref, name)
- setattr(ref, name, replace)
- ret.append((ref, name, orig))
- return ret
-
-
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index b4fd1f4d..d796f030 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -1,12 +1,14 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceOpenNebula as ds
from cloudinit import util
-from mocker import MockerTestCase
-from ..helpers import populate_dir
+from ..helpers import TestCase, populate_dir
-from base64 import b64encode
import os
import pwd
+import shutil
+import tempfile
+import unittest
+
TEST_VARS = {
'VAR1': 'single',
@@ -18,7 +20,7 @@ TEST_VARS = {
'VAR7': 'single\\t',
'VAR8': 'double\\tword',
'VAR9': 'multi\\t\nline\n',
- 'VAR10': '\\', # expect \
+ 'VAR10': '\\', # expect '\'
'VAR11': '\'', # expect '
'VAR12': '$', # expect $
}
@@ -37,12 +39,13 @@ CMD_IP_OUT = '''\
'''
-class TestOpenNebulaDataSource(MockerTestCase):
+class TestOpenNebulaDataSource(TestCase):
parsed_user = None
def setUp(self):
super(TestOpenNebulaDataSource, self).setUp()
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
self.paths = helpers.Paths({'cloud_dir': self.tmp})
# defaults for few tests
@@ -176,7 +179,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
self.assertEqual(USER_DATA, results['userdata'])
def test_user_data_encoding_required_for_decode(self):
- b64userdata = b64encode(USER_DATA)
+ b64userdata = util.b64e(USER_DATA)
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
populate_context_dir(my_d, {k: b64userdata})
@@ -188,7 +191,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
def test_user_data_base64_encoding(self):
for k in ('USER_DATA', 'USERDATA'):
my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: b64encode(USER_DATA),
+ populate_context_dir(my_d, {k: util.b64e(USER_DATA),
'USERDATA_ENCODING': 'base64'})
results = ds.read_context_disk_dir(my_d)
@@ -228,7 +231,7 @@ class TestOpenNebulaDataSource(MockerTestCase):
util.find_devs_with = orig_find_devs_with
-class TestOpenNebulaNetwork(MockerTestCase):
+class TestOpenNebulaNetwork(unittest.TestCase):
def setUp(self):
super(TestOpenNebulaNetwork, self).setUp()
@@ -280,7 +283,7 @@ iface eth0 inet static
''')
-class TestParseShellConfig(MockerTestCase):
+class TestParseShellConfig(unittest.TestCase):
def test_no_seconds(self):
cfg = '\n'.join(["foo=bar", "SECONDS=2", "xx=foo"])
# we could test 'sleep 2', but that would make the test run slower.
@@ -290,7 +293,7 @@ class TestParseShellConfig(MockerTestCase):
def populate_context_dir(path, variables):
data = "# Context variables generated by OpenNebula\n"
- for (k, v) in variables.iteritems():
+ for k, v in variables.items():
data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''")))
populate_dir(path, {'context.sh': data})
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index 49894e51..0aa1ba84 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -20,19 +20,18 @@ import copy
import json
import re
-from StringIO import StringIO
-
-from urlparse import urlparse
-
from .. import helpers as test_helpers
+from six import StringIO
+from six.moves.urllib.parse import urlparse
+
from cloudinit import helpers
from cloudinit import settings
from cloudinit.sources import DataSourceOpenStack as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
-import httpretty as hp
+hp = test_helpers.import_httpretty()
BASE_URL = "http://169.254.169.254"
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
@@ -50,7 +49,7 @@ EC2_META = {
'public-ipv4': '0.0.0.1',
'reservation-id': 'r-iru5qm4m',
}
-USER_DATA = '#!/bin/sh\necho This is user data\n'
+USER_DATA = b'#!/bin/sh\necho This is user data\n'
VENDOR_DATA = {
'magic': '',
}
@@ -64,8 +63,8 @@ OSTACK_META = {
'public_keys': {'mykey': PUBKEY},
'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c',
}
-CONTENT_0 = 'This is contents of /etc/foo.cfg\n'
-CONTENT_1 = '# this is /etc/bar/bar.cfg\n'
+CONTENT_0 = b'This is contents of /etc/foo.cfg\n'
+CONTENT_1 = b'# this is /etc/bar/bar.cfg\n'
OS_FILES = {
'openstack/latest/meta_data.json': json.dumps(OSTACK_META),
'openstack/latest/user_data': USER_DATA,
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 65675106..5c49966a 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -22,15 +22,30 @@
# return responses.
#
-import base64
-from cloudinit import helpers as c_helpers
-from cloudinit.sources import DataSourceSmartOS
-from .. import helpers
+from __future__ import print_function
+
import os
import os.path
import re
+import shutil
import stat
+import tempfile
import uuid
+from binascii import crc32
+
+import serial
+import six
+
+from cloudinit import helpers as c_helpers
+from cloudinit.sources import DataSourceSmartOS
+from cloudinit.util import b64e
+
+from .. import helpers
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
MOCK_RETURNS = {
'hostname': 'test-host',
@@ -41,77 +56,34 @@ MOCK_RETURNS = {
'cloud-init:user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']),
'sdc:datacenter_name': 'somewhere2',
'sdc:operator-script': '\n'.join(['bin/true', '']),
+ 'sdc:uuid': str(uuid.uuid4()),
'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']),
'user-data': '\n'.join(['something', '']),
'user-script': '\n'.join(['/bin/true', '']),
}
-DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc')
-
-
-class MockSerial(object):
- """Fake a serial terminal for testing the code that
- interfaces with the serial"""
-
- port = None
-
- def __init__(self, mockdata):
- self.last = None
- self.last = None
- self.new = True
- self.count = 0
- self.mocked_out = []
- self.mockdata = mockdata
-
- def open(self):
- return True
-
- def close(self):
- return True
-
- def isOpen(self):
- return True
+DMI_DATA_RETURN = 'smartdc'
- def write(self, line):
- line = line.replace('GET ', '')
- self.last = line.rstrip()
- def readline(self):
- if self.new:
- self.new = False
- if self.last in self.mockdata:
- return 'SUCCESS\n'
- else:
- return 'NOTFOUND %s\n' % self.last
-
- if self.last in self.mockdata:
- if not self.mocked_out:
- self.mocked_out = [x for x in self._format_out()]
-
- if len(self.mocked_out) > self.count:
- self.count += 1
- return self.mocked_out[self.count - 1]
+def get_mock_client(mockdata):
+ class MockMetadataClient(object):
- def _format_out(self):
- if self.last in self.mockdata:
- _mret = self.mockdata[self.last]
- try:
- for l in _mret.splitlines():
- yield "%s\n" % l.rstrip()
- except:
- yield "%s\n" % _mret.rstrip()
+ def __init__(self, serial):
+ pass
- yield '.'
- yield '\n'
+ def get_metadata(self, metadata_key):
+ return mockdata.get(metadata_key)
+ return MockMetadataClient
class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
def setUp(self):
- helpers.FilesystemMockingTestCase.setUp(self)
+ super(TestSmartOSDataSource, self).setUp()
- # makeDir comes from MockerTestCase
- self.tmp = self.makeDir()
- self.legacy_user_d = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+ self.legacy_user_d = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.legacy_user_d)
# If you should want to watch the logs...
self._log = None
@@ -140,7 +112,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
ret = apply_patches(patches)
self.unapply += ret
- def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None):
+ def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None,
+ is_lxbrand=False):
mod = DataSourceSmartOS
if mockdata is None:
@@ -149,16 +122,17 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
if dmi_data is None:
dmi_data = DMI_DATA_RETURN
- def _get_serial(*_):
- return MockSerial(mockdata)
-
def _dmi_data():
return dmi_data
def _os_uname():
- # LP: #1243287. tests assume this runs, but running test on
- # arm would cause them all to fail.
- return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', 'x86_64')
+ if not is_lxbrand:
+ # LP: #1243287. tests assume this runs, but running test on
+ # arm would cause them all to fail.
+ return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', 'x86_64')
+ else:
+ return ('LINUX', 'NODENAME', 'RELEASE', 'BRANDZ VIRTUAL LINUX',
+ 'X86_64')
if sys_cfg is None:
sys_cfg = {}
@@ -168,12 +142,14 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
sys_cfg['datasource']['SmartOS'] = ds_cfg
self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)])
- self.apply_patches([(mod, 'get_serial', _get_serial)])
+ self.apply_patches([
+ (mod, 'JoyentMetadataClient', get_mock_client(mockdata))])
self.apply_patches([(mod, 'dmi_data', _dmi_data)])
self.apply_patches([(os, 'uname', _os_uname)])
self.apply_patches([(mod, 'device_exists', lambda d: True)])
dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None,
paths=self.paths)
+ self.apply_patches([(dsrc, '_get_seed_file_object', mock.MagicMock())])
return dsrc
def test_seed(self):
@@ -181,14 +157,29 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
dsrc = self._get_ds()
ret = dsrc.get_data()
self.assertTrue(ret)
+ self.assertEquals('kvm', dsrc.smartos_type)
self.assertEquals('/dev/ttyS1', dsrc.seed)
+ def test_seed_lxbrand(self):
+ # default seed should be /dev/ttyS1
+ dsrc = self._get_ds(is_lxbrand=True)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals('lx-brand', dsrc.smartos_type)
+ self.assertEquals('/native/.zonecontrol/metadata.sock', dsrc.seed)
+
def test_issmartdc(self):
dsrc = self._get_ds()
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertTrue(dsrc.is_smartdc)
+ def test_issmartdc_lxbrand(self):
+ dsrc = self._get_ds(is_lxbrand=True)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertTrue(dsrc.is_smartdc)
+
def test_no_base64(self):
ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
dsrc = self._get_ds(ds_cfg=ds_cfg)
@@ -199,7 +190,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEquals(DMI_DATA_RETURN[0], dsrc.metadata['instance-id'])
+ self.assertEquals(MOCK_RETURNS['sdc:uuid'],
+ dsrc.metadata['instance-id'])
def test_root_keys(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
@@ -227,7 +219,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns = MOCK_RETURNS.copy()
my_returns['base64_all'] = "true"
for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = base64.b64encode(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -248,7 +240,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns['b64-cloud-init:user-data'] = "true"
my_returns['b64-hostname'] = "true"
for k in ('hostname', 'cloud-init:user-data'):
- my_returns[k] = base64.b64encode(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -264,7 +256,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
my_returns = MOCK_RETURNS.copy()
my_returns['base64_keys'] = 'hostname,ignored'
for k in ('hostname',):
- my_returns[k] = base64.b64encode(my_returns[k])
+ my_returns[k] = b64e(my_returns[k])
dsrc = self._get_ds(mockdata=my_returns)
ret = dsrc.get_data()
@@ -365,7 +357,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):
permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:]
if re.match(r'.*\/mdata-user-data$', name_f):
found_new = True
- print name_f
+ print(name_f)
self.assertEquals(permissions, '400')
self.assertFalse(found_new)
@@ -447,3 +439,147 @@ def apply_patches(patches):
setattr(ref, name, replace)
ret.append((ref, name, orig))
return ret
+
+
+class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase):
+
+ def setUp(self):
+ super(TestJoyentMetadataClient, self).setUp()
+ self.serial = mock.MagicMock(spec=serial.Serial)
+ self.request_id = 0xabcdef12
+ self.metadata_value = 'value'
+ self.response_parts = {
+ 'command': 'SUCCESS',
+ 'crc': 'b5a9ff00',
+ 'length': 17 + len(b64e(self.metadata_value)),
+ 'payload': b64e(self.metadata_value),
+ 'request_id': '{0:08x}'.format(self.request_id),
+ }
+
+ def make_response():
+ payloadstr = ''
+ if 'payload' in self.response_parts:
+ payloadstr = ' {0}'.format(self.response_parts['payload'])
+ return ('V2 {length} {crc} {request_id} '
+ '{command}{payloadstr}\n'.format(
+ payloadstr=payloadstr,
+ **self.response_parts).encode('ascii'))
+
+ self.metasource_data = None
+
+ def read_response(length):
+ if not self.metasource_data:
+ self.metasource_data = make_response()
+ self.metasource_data_len = len(self.metasource_data)
+ resp = self.metasource_data[:length]
+ self.metasource_data = self.metasource_data[length:]
+ return resp
+
+ self.serial.read.side_effect = read_response
+ self.patched_funcs.enter_context(
+ mock.patch('cloudinit.sources.DataSourceSmartOS.random.randint',
+ mock.Mock(return_value=self.request_id)))
+
+ def _get_client(self):
+ return DataSourceSmartOS.JoyentMetadataClient(self.serial)
+
+ def assertEndsWith(self, haystack, prefix):
+ self.assertTrue(haystack.endswith(prefix),
+ "{0} does not end with '{1}'".format(
+ repr(haystack), prefix))
+
+ def assertStartsWith(self, haystack, prefix):
+ self.assertTrue(haystack.startswith(prefix),
+ "{0} does not start with '{1}'".format(
+ repr(haystack), prefix))
+
+ def test_get_metadata_writes_a_single_line(self):
+ client = self._get_client()
+ client.get_metadata('some_key')
+ self.assertEqual(1, self.serial.write.call_count)
+ written_line = self.serial.write.call_args[0][0]
+ print(type(written_line))
+ self.assertEndsWith(written_line.decode('ascii'),
+ b'\n'.decode('ascii'))
+ self.assertEqual(1, written_line.count(b'\n'))
+
+ def _get_written_line(self, key='some_key'):
+ client = self._get_client()
+ client.get_metadata(key)
+ return self.serial.write.call_args[0][0]
+
+ def test_get_metadata_writes_bytes(self):
+ self.assertIsInstance(self._get_written_line(), six.binary_type)
+
+ def test_get_metadata_line_starts_with_v2(self):
+ foo = self._get_written_line()
+ self.assertStartsWith(foo.decode('ascii'), b'V2'.decode('ascii'))
+
+ def test_get_metadata_uses_get_command(self):
+ parts = self._get_written_line().decode('ascii').strip().split(' ')
+ self.assertEqual('GET', parts[4])
+
+ def test_get_metadata_base64_encodes_argument(self):
+ key = 'my_key'
+ parts = self._get_written_line(key).decode('ascii').strip().split(' ')
+ self.assertEqual(b64e(key), parts[5])
+
+ def test_get_metadata_calculates_length_correctly(self):
+ parts = self._get_written_line().decode('ascii').strip().split(' ')
+ expected_length = len(' '.join(parts[3:]))
+ self.assertEqual(expected_length, int(parts[1]))
+
+ def test_get_metadata_uses_appropriate_request_id(self):
+ parts = self._get_written_line().decode('ascii').strip().split(' ')
+ request_id = parts[3]
+ self.assertEqual(8, len(request_id))
+ self.assertEqual(request_id, request_id.lower())
+
+ def test_get_metadata_uses_random_number_for_request_id(self):
+ line = self._get_written_line()
+ request_id = line.decode('ascii').strip().split(' ')[3]
+ self.assertEqual('{0:08x}'.format(self.request_id), request_id)
+
+ def test_get_metadata_checksums_correctly(self):
+ parts = self._get_written_line().decode('ascii').strip().split(' ')
+ expected_checksum = '{0:08x}'.format(
+ crc32(' '.join(parts[3:]).encode('utf-8')) & 0xffffffff)
+ checksum = parts[2]
+ self.assertEqual(expected_checksum, checksum)
+
+ def test_get_metadata_reads_a_line(self):
+ client = self._get_client()
+ client.get_metadata('some_key')
+ self.assertEqual(self.metasource_data_len, self.serial.read.call_count)
+
+ def test_get_metadata_returns_valid_value(self):
+ client = self._get_client()
+ value = client.get_metadata('some_key')
+ self.assertEqual(self.metadata_value, value)
+
+ def test_get_metadata_throws_exception_for_incorrect_length(self):
+ self.response_parts['length'] = 0
+ client = self._get_client()
+ self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
+ client.get_metadata, 'some_key')
+
+ def test_get_metadata_throws_exception_for_incorrect_crc(self):
+ self.response_parts['crc'] = 'deadbeef'
+ client = self._get_client()
+ self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
+ client.get_metadata, 'some_key')
+
+ def test_get_metadata_throws_exception_for_request_id_mismatch(self):
+ self.response_parts['request_id'] = 'deadbeef'
+ client = self._get_client()
+ client._checksum = lambda _: self.response_parts['crc']
+ self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException,
+ client.get_metadata, 'some_key')
+
+ def test_get_metadata_returns_None_if_value_not_found(self):
+ self.response_parts['payload'] = ''
+ self.response_parts['command'] = 'NOTFOUND'
+ self.response_parts['length'] = 17
+ client = self._get_client()
+ client._checksum = lambda _: self.response_parts['crc']
+ self.assertIsNone(client.get_metadata('some_key'))