summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_azure.py86
-rw-r--r--tests/unittests/test_datasource/test_smartos.py273
2 files changed, 352 insertions, 7 deletions
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 2e8583f9..1ca6a79d 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -2,6 +2,7 @@ from cloudinit import helpers
from cloudinit.sources import DataSourceAzure
from tests.unittests.helpers import populate_dir
+import crypt
import base64
from mocker import MockerTestCase
import os
@@ -26,8 +27,15 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
<ConfigurationSetType>LinuxProvisioningConfiguration</ConfigurationSetType>
"""
- for key, val in data.items():
- content += "<%s>%s</%s>\n" % (key, val, key)
+ for key, dval in data.items():
+ if isinstance(dval, dict):
+ val = dval.get('text')
+ attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v in dval.items()
+ if k != 'text'])
+ else:
+ val = dval
+ attrs = ""
+ content += "<%s%s>%s</%s>\n" % (key, attrs, val, key)
if userdata:
content += "<UserData>%s</UserData>\n" % (base64.b64encode(userdata))
@@ -103,6 +111,9 @@ class TestAzureDataSource(MockerTestCase):
data['iid_from_shared_cfg'] = path
return 'i-my-azure-id'
+ def _apply_hostname_bounce(**kwargs):
+ data['apply_hostname_bounce'] = kwargs
+
if data.get('ovfcontent') is not None:
populate_dir(os.path.join(self.paths.seed_dir, "azure"),
{'ovf-env.xml': data['ovfcontent']})
@@ -118,7 +129,9 @@ class TestAzureDataSource(MockerTestCase):
(mod, 'pubkeys_from_crt_files',
_pubkeys_from_crt_files),
(mod, 'iid_from_shared_config',
- _iid_from_shared_config), ])
+ _iid_from_shared_config),
+ (mod, 'apply_hostname_bounce',
+ _apply_hostname_bounce), ])
dsrc = mod.DataSourceAzureNet(
data.get('sys_cfg', {}), distro=None, paths=self.paths)
@@ -139,10 +152,24 @@ class TestAzureDataSource(MockerTestCase):
self.assertEqual(0700, data['datadir_mode'])
self.assertEqual(dsrc.metadata['instance-id'], 'i-my-azure-id')
+ def test_user_cfg_set_agent_command_plain(self):
+ # set dscfg in via plaintext
+ cfg = {'agent_command': "my_command"}
+ odata = {'HostName': "myhost", 'UserName': "myuser",
+ 'dscfg': {'text': yaml.dump(cfg), 'encoding': 'plain'}}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(data['agent_invoked'], cfg['agent_command'])
+
def test_user_cfg_set_agent_command(self):
+ # set dscfg in via base64 encoded yaml
cfg = {'agent_command': "my_command"}
odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': yaml.dump(cfg)}
+ 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'encoding': 'base64'}}
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
@@ -181,11 +208,15 @@ class TestAzureDataSource(MockerTestCase):
self.assertTrue('default_user' in dsrc.cfg['system_info'])
defuser = dsrc.cfg['system_info']['default_user']
- # default user shoudl be updated for password and username
- # and should not be locked.
+ # default user should be updated username and should not be locked.
self.assertEqual(defuser['name'], odata['UserName'])
- self.assertEqual(defuser['password'], odata['UserPassword'])
self.assertFalse(defuser['lock_passwd'])
+ # passwd is crypt formated string $id$salt$encrypted
+ # encrypting plaintext with salt value of everything up to final '$'
+ # should equal that after the '$'
+ pos = defuser['passwd'].rfind("$") + 1
+ self.assertEqual(defuser['passwd'],
+ crypt.crypt(odata['UserPassword'], defuser['passwd'][0:pos]))
def test_userdata_found(self):
mydata = "FOOBAR"
@@ -218,6 +249,47 @@ class TestAzureDataSource(MockerTestCase):
for mypk in mypklist:
self.assertIn(mypk, dsrc.cfg['_pubkeys'])
+ def test_disabled_bounce(self):
+ pass
+
+ def test_apply_bounce_call_1(self):
+ # hostname needs to get through to apply_hostname_bounce
+ odata = {'HostName': 'my-random-hostname'}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ self._get_ds(data).get_data()
+ self.assertIn('hostname', data['apply_hostname_bounce'])
+ self.assertEqual(data['apply_hostname_bounce']['hostname'],
+ odata['HostName'])
+
+ def test_apply_bounce_call_configurable(self):
+ # hostname_bounce should be configurable in datasource cfg
+ cfg = {'hostname_bounce': {'interface': 'eth1', 'policy': 'off',
+ 'command': 'my-bounce-command',
+ 'hostname_command': 'my-hostname-command'}}
+ odata = {'HostName': "xhost",
+ 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'encoding': 'base64'}}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ self._get_ds(data).get_data()
+
+ for k in cfg['hostname_bounce']:
+ self.assertIn(k, data['apply_hostname_bounce'])
+
+ for k, v in cfg['hostname_bounce'].items():
+ self.assertEqual(data['apply_hostname_bounce'][k], v)
+
+ def test_set_hostname_disabled(self):
+ # config specifying set_hostname off should not bounce
+ cfg = {'set_hostname': False}
+ odata = {'HostName': "xhost",
+ 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'encoding': 'base64'}}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ self._get_ds(data).get_data()
+
+ self.assertEqual(data.get('apply_hostname_bounce', "N/A"), "N/A")
+
class TestReadAzureOvf(MockerTestCase):
def test_invalid_xml_raises_non_azure_ds(self):
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
new file mode 100644
index 00000000..f53715b0
--- /dev/null
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -0,0 +1,273 @@
+# vi: ts=4 expandtab
+#
+# Copyright (C) 2013 Canonical Ltd.
+#
+# Author: Ben Howard <ben.howard@canonical.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+# This is a testcase for the SmartOS datasource. It replicates a serial
+# console and acts like the SmartOS console does in order to validate
+# return responses.
+#
+
+import base64
+from cloudinit import helpers
+from cloudinit.sources import DataSourceSmartOS
+
+from mocker import MockerTestCase
+import uuid
+
+MOCK_RETURNS = {
+ 'hostname': 'test-host',
+ 'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname',
+ 'disable_iptables_flag': None,
+ 'enable_motd_sys_info': None,
+ 'test-var1': 'some data',
+ 'user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']),
+}
+
+DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc')
+
+
+class MockSerial(object):
+ """Fake a serial terminal for testing the code that
+ interfaces with the serial"""
+
+ port = None
+
+ def __init__(self, mockdata):
+ self.last = None
+ self.last = None
+ self.new = True
+ self.count = 0
+ self.mocked_out = []
+ self.mockdata = mockdata
+
+ def open(self):
+ return True
+
+ def close(self):
+ return True
+
+ def isOpen(self):
+ return True
+
+ def write(self, line):
+ line = line.replace('GET ', '')
+ self.last = line.rstrip()
+
+ def readline(self):
+ if self.new:
+ self.new = False
+ if self.last in self.mockdata:
+ return 'SUCCESS\n'
+ else:
+ return 'NOTFOUND %s\n' % self.last
+
+ if self.last in self.mockdata:
+ if not self.mocked_out:
+ self.mocked_out = [x for x in self._format_out()]
+ print self.mocked_out
+
+ if len(self.mocked_out) > self.count:
+ self.count += 1
+ return self.mocked_out[self.count - 1]
+
+ def _format_out(self):
+ if self.last in self.mockdata:
+ _mret = self.mockdata[self.last]
+ try:
+ for l in _mret.splitlines():
+ yield "%s\n" % l.rstrip()
+ except:
+ yield "%s\n" % _mret.rstrip()
+
+ yield '.'
+ yield '\n'
+
+
+class TestSmartOSDataSource(MockerTestCase):
+ def setUp(self):
+ # makeDir comes from MockerTestCase
+ self.tmp = self.makeDir()
+
+ # patch cloud_dir, so our 'seed_dir' is guaranteed empty
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
+
+ self.unapply = []
+ super(TestSmartOSDataSource, self).setUp()
+
+ def tearDown(self):
+ apply_patches([i for i in reversed(self.unapply)])
+ super(TestSmartOSDataSource, self).tearDown()
+
+ def apply_patches(self, patches):
+ ret = apply_patches(patches)
+ self.unapply += ret
+
+ def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None):
+ mod = DataSourceSmartOS
+
+ if mockdata is None:
+ mockdata = MOCK_RETURNS
+
+ if dmi_data is None:
+ dmi_data = DMI_DATA_RETURN
+
+ def _get_serial(*_):
+ return MockSerial(mockdata)
+
+ def _dmi_data():
+ return dmi_data
+
+ if sys_cfg is None:
+ sys_cfg = {}
+
+ if ds_cfg is not None:
+ sys_cfg['datasource'] = sys_cfg.get('datasource', {})
+ sys_cfg['datasource']['SmartOS'] = ds_cfg
+
+ self.apply_patches([(mod, 'get_serial', _get_serial)])
+ self.apply_patches([(mod, 'dmi_data', _dmi_data)])
+ dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None,
+ paths=self.paths)
+ return dsrc
+
+ def test_seed(self):
+ # default seed should be /dev/ttyS1
+ dsrc = self._get_ds()
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals('/dev/ttyS1', dsrc.seed)
+
+ def test_issmartdc(self):
+ dsrc = self._get_ds()
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertTrue(dsrc.is_smartdc)
+
+ def test_no_base64(self):
+ ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
+ dsrc = self._get_ds(ds_cfg=ds_cfg)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+
+ def test_uuid(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(DMI_DATA_RETURN[0], dsrc.metadata['instance-id'])
+
+ def test_root_keys(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
+ dsrc.metadata['public-keys'])
+
+ def test_hostname_b64(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
+
+ def test_hostname(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
+
+ def test_base64_all(self):
+ # metadata provided base64_all of true
+ my_returns = MOCK_RETURNS.copy()
+ my_returns['base64_all'] = "true"
+ for k in ('hostname', 'user-data'):
+ my_returns[k] = base64.b64encode(my_returns[k])
+
+ dsrc = self._get_ds(mockdata=my_returns)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
+ self.assertEquals(MOCK_RETURNS['user-data'],
+ dsrc.userdata_raw)
+ self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
+ dsrc.metadata['public-keys'])
+ self.assertEquals(MOCK_RETURNS['disable_iptables_flag'],
+ dsrc.metadata['iptables_disable'])
+ self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'],
+ dsrc.metadata['motd_sys_info'])
+
+ def test_b64_userdata(self):
+ my_returns = MOCK_RETURNS.copy()
+ my_returns['b64-user-data'] = "true"
+ my_returns['b64-hostname'] = "true"
+ for k in ('hostname', 'user-data'):
+ my_returns[k] = base64.b64encode(my_returns[k])
+
+ dsrc = self._get_ds(mockdata=my_returns)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
+ self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw)
+ self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
+ dsrc.metadata['public-keys'])
+
+ def test_b64_keys(self):
+ my_returns = MOCK_RETURNS.copy()
+ my_returns['base64_keys'] = 'hostname,ignored'
+ for k in ('hostname',):
+ my_returns[k] = base64.b64encode(my_returns[k])
+
+ dsrc = self._get_ds(mockdata=my_returns)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
+ self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw)
+
+ def test_userdata(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw)
+
+ def test_disable_iptables_flag(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['disable_iptables_flag'],
+ dsrc.metadata['iptables_disable'])
+
+ def test_motd_sys_info(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'],
+ dsrc.metadata['motd_sys_info'])
+
+
+def apply_patches(patches):
+ ret = []
+ for (ref, name, replace) in patches:
+ if replace is None:
+ continue
+ orig = getattr(ref, name)
+ setattr(ref, name, replace)
+ ret.append((ref, name, orig))
+ return ret