summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--tests/unittests/test_datasource/test_azure.py327
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py2
-rw-r--r--tests/unittests/test_datasource/test_smartos.py191
3 files changed, 519 insertions, 1 deletions
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
new file mode 100644
index 00000000..4cd3f213
--- /dev/null
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -0,0 +1,327 @@
+from cloudinit import helpers
+from cloudinit.sources import DataSourceAzure
+from tests.unittests.helpers import populate_dir
+
+import base64
+from mocker import MockerTestCase
+import os
+import yaml
+
+
+def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
+ if data is None:
+ data = {'HostName': 'FOOHOST'}
+ if pubkeys is None:
+ pubkeys = {}
+
+ content = """<?xml version="1.0" encoding="utf-8"?>
+<Environment xmlns="http://schemas.dmtf.org/ovf/environment/1"
+ xmlns:oe="http://schemas.dmtf.org/ovf/environment/1"
+ xmlns:wa="http://schemas.microsoft.com/windowsazure"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+ <wa:ProvisioningSection><wa:Version>1.0</wa:Version>
+ <LinuxProvisioningConfigurationSet
+ xmlns="http://schemas.microsoft.com/windowsazure"
+ xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
+ <ConfigurationSetType>LinuxProvisioningConfiguration</ConfigurationSetType>
+ """
+ for key, dval in data.items():
+ if isinstance(dval, dict):
+ val = dval.get('text')
+ attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v in dval.items()
+ if k != 'text'])
+ else:
+ val = dval
+ attrs = ""
+ content += "<%s%s>%s</%s>\n" % (key, attrs, val, key)
+
+ if userdata:
+ content += "<UserData>%s</UserData>\n" % (base64.b64encode(userdata))
+
+ if pubkeys:
+ content += "<SSH><PublicKeys>\n"
+ for fp, path in pubkeys:
+ content += " <PublicKey>"
+ content += ("<Fingerprint>%s</Fingerprint><Path>%s</Path>" %
+ (fp, path))
+ content += "</PublicKey>\n"
+ content += "</PublicKeys></SSH>"
+ content += """
+ </LinuxProvisioningConfigurationSet>
+ </wa:ProvisioningSection>
+ <wa:PlatformSettingsSection><wa:Version>1.0</wa:Version>
+ <PlatformSettings xmlns="http://schemas.microsoft.com/windowsazure"
+ xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
+ <KmsServerHostname>kms.core.windows.net</KmsServerHostname>
+ <ProvisionGuestAgent>false</ProvisionGuestAgent>
+ <GuestAgentPackageName i:nil="true" />
+ </PlatformSettings></wa:PlatformSettingsSection>
+</Environment>
+ """
+
+ return content
+
+
+class TestAzureDataSource(MockerTestCase):
+
+ def setUp(self):
+ # makeDir comes from MockerTestCase
+ self.tmp = self.makeDir()
+
+ # patch cloud_dir, so our 'seed_dir' is guaranteed empty
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
+
+ self.unapply = []
+ super(TestAzureDataSource, self).setUp()
+
+ def tearDown(self):
+ apply_patches([i for i in reversed(self.unapply)])
+ super(TestAzureDataSource, self).tearDown()
+
+ def apply_patches(self, patches):
+ ret = apply_patches(patches)
+ self.unapply += ret
+
+ def _get_ds(self, data):
+
+ def dsdevs():
+ return data.get('dsdevs', [])
+
+ def _invoke_agent(cmd):
+ data['agent_invoked'] = cmd
+
+ def _write_files(datadir, files, dirmode):
+ data['files'] = {}
+ data['datadir'] = datadir
+ data['datadir_mode'] = dirmode
+ for (fname, content) in files.items():
+ data['files'][fname] = content
+
+ def _wait_for_files(flist, _maxwait=None, _naplen=None):
+ data['waited'] = flist
+ return []
+
+ def _pubkeys_from_crt_files(flist):
+ data['pubkey_files'] = flist
+ return ["pubkey_from: %s" % f for f in flist]
+
+ def _iid_from_shared_config(path):
+ data['iid_from_shared_cfg'] = path
+ return 'i-my-azure-id'
+
+ def _apply_hostname_bounce(**kwargs):
+ data['apply_hostname_bounce'] = kwargs
+
+ if data.get('ovfcontent') is not None:
+ populate_dir(os.path.join(self.paths.seed_dir, "azure"),
+ {'ovf-env.xml': data['ovfcontent']})
+
+ mod = DataSourceAzure
+
+ if data.get('dsdevs'):
+ self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)])
+
+ self.apply_patches([(mod, 'invoke_agent', _invoke_agent),
+ (mod, 'write_files', _write_files),
+ (mod, 'wait_for_files', _wait_for_files),
+ (mod, 'pubkeys_from_crt_files',
+ _pubkeys_from_crt_files),
+ (mod, 'iid_from_shared_config',
+ _iid_from_shared_config),
+ (mod, 'apply_hostname_bounce',
+ _apply_hostname_bounce), ])
+
+ dsrc = mod.DataSourceAzureNet(
+ data.get('sys_cfg', {}), distro=None, paths=self.paths)
+
+ return dsrc
+
+ def test_basic_seed_dir(self):
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata),
+ 'sys_cfg': {}}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(dsrc.userdata_raw, "")
+ self.assertEqual(dsrc.metadata['local-hostname'], odata['HostName'])
+ self.assertTrue('ovf-env.xml' in data['files'])
+ self.assertEqual(0700, data['datadir_mode'])
+ self.assertEqual(dsrc.metadata['instance-id'], 'i-my-azure-id')
+
+ def test_user_cfg_set_agent_command_plain(self):
+ # set dscfg in via plaintext
+ cfg = {'agent_command': "my_command"}
+ odata = {'HostName': "myhost", 'UserName': "myuser",
+ 'dscfg': {'text': yaml.dump(cfg), 'encoding': 'plain'}}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(data['agent_invoked'], cfg['agent_command'])
+
+ def test_user_cfg_set_agent_command(self):
+ # set dscfg in via base64 encoded yaml
+ cfg = {'agent_command': "my_command"}
+ odata = {'HostName': "myhost", 'UserName': "myuser",
+ 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'encoding': 'base64'}}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(data['agent_invoked'], cfg['agent_command'])
+
+ def test_sys_cfg_set_agent_command(self):
+ sys_cfg = {'datasource': {'Azure': {'agent_command': '_COMMAND'}}}
+ data = {'ovfcontent': construct_valid_ovf_env(data={}),
+ 'sys_cfg': sys_cfg}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(data['agent_invoked'], '_COMMAND')
+
+ def test_username_used(self):
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(dsrc.cfg['system_info']['default_user']['name'],
+ "myuser")
+
+ def test_password_given(self):
+ odata = {'HostName': "myhost", 'UserName': "myuser",
+ 'UserPassword': "mypass"}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertTrue('default_user' in dsrc.cfg['system_info'])
+ defuser = dsrc.cfg['system_info']['default_user']
+
+ # default user shoudl be updated for password and username
+ # and should not be locked.
+ self.assertEqual(defuser['name'], odata['UserName'])
+ self.assertEqual(defuser['password'], odata['UserPassword'])
+ self.assertFalse(defuser['lock_passwd'])
+
+ def test_userdata_found(self):
+ mydata = "FOOBAR"
+ odata = {'UserData': base64.b64encode(mydata)}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(dsrc.userdata_raw, mydata)
+
+ def test_no_datasource_expected(self):
+ #no source should be found if no seed_dir and no devs
+ data = {}
+ dsrc = self._get_ds({})
+ ret = dsrc.get_data()
+ self.assertFalse(ret)
+ self.assertFalse('agent_invoked' in data)
+
+ def test_cfg_has_pubkeys(self):
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}]
+ pubkeys = [(x['fingerprint'], x['path']) for x in mypklist]
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata,
+ pubkeys=pubkeys)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ for mypk in mypklist:
+ self.assertIn(mypk, dsrc.cfg['_pubkeys'])
+
+ def test_disabled_bounce(self):
+ pass
+
+ def test_apply_bounce_call_1(self):
+ # hostname needs to get through to apply_hostname_bounce
+ mydata = "FOOBAR"
+ odata = {'HostName': 'my-random-hostname'}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ self._get_ds(data).get_data()
+ self.assertIn('hostname', data['apply_hostname_bounce'])
+ self.assertEqual(data['apply_hostname_bounce']['hostname'],
+ odata['HostName'])
+
+ def test_apply_bounce_call_configurable(self):
+ # hostname_bounce should be configurable in datasource cfg
+ cfg = {'hostname_bounce': {'interface': 'eth1', 'policy': 'off',
+ 'command': 'my-bounce-command',
+ 'hostname_command': 'my-hostname-command'}}
+ odata = {'HostName': "xhost",
+ 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'encoding': 'base64'}}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ self._get_ds(data).get_data()
+
+ for k in cfg['hostname_bounce']:
+ self.assertIn(k, data['apply_hostname_bounce'])
+
+ for k, v in cfg['hostname_bounce'].items():
+ self.assertEqual(data['apply_hostname_bounce'][k], v)
+
+ def test_set_hostname_disabled(self):
+ # config specifying set_hostname off should not bounce
+ cfg = {'set_hostname': False}
+ odata = {'HostName': "xhost",
+ 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'encoding': 'base64'}}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ self._get_ds(data).get_data()
+
+ self.assertEqual(data.get('apply_hostname_bounce', "N/A"), "N/A")
+
+
+class TestReadAzureOvf(MockerTestCase):
+ def test_invalid_xml_raises_non_azure_ds(self):
+ invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
+ self.assertRaises(DataSourceAzure.NonAzureDataSource,
+ DataSourceAzure.read_azure_ovf, invalid_xml)
+
+ def test_load_with_pubkeys(self):
+ mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}]
+ pubkeys = [(x['fingerprint'], x['path']) for x in mypklist]
+ content = construct_valid_ovf_env(pubkeys=pubkeys)
+ (_md, _ud, cfg) = DataSourceAzure.read_azure_ovf(content)
+ for mypk in mypklist:
+ self.assertIn(mypk, cfg['_pubkeys'])
+
+
+class TestReadAzureSharedConfig(MockerTestCase):
+ def test_valid_content(self):
+ xml = """<?xml version="1.0" encoding="utf-8"?>
+ <SharedConfig>
+ <Deployment name="MY_INSTANCE_ID">
+ <Service name="myservice"/>
+ <ServiceInstance name="INSTANCE_ID.0" guid="{abcd-uuid}" />
+ </Deployment>
+ <Incarnation number="1"/>
+ </SharedConfig>"""
+ ret = DataSourceAzure.iid_from_shared_config_content(xml)
+ self.assertEqual("MY_INSTANCE_ID", ret)
+
+
+def apply_patches(patches):
+ ret = []
+ for (ref, name, replace) in patches:
+ if replace is None:
+ continue
+ orig = getattr(ref, name)
+ setattr(ref, name, replace)
+ ret.append((ref, name, orig))
+ return ret
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index 62fc5358..7328b240 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -22,7 +22,7 @@ class TestNoCloudDataSource(MockerTestCase):
def tearDown(self):
apply_patches([i for i in reversed(self.unapply)])
- super(TestNoCloudDataSource, self).setUp()
+ super(TestNoCloudDataSource, self).tearDown()
def apply_patches(self, patches):
ret = apply_patches(patches)
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
new file mode 100644
index 00000000..6c12f1e2
--- /dev/null
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -0,0 +1,191 @@
+# vi: ts=4 expandtab
+#
+# Copyright (C) 2013 Canonical Ltd.
+#
+# Author: Ben Howard <ben.howard@canonical.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+# This is a testcase for the SmartOS datasource. It replicates a serial
+# console and acts like the SmartOS console does in order to validate
+# return responses.
+#
+
+from cloudinit import helpers
+from cloudinit.sources import DataSourceSmartOS
+
+from mocker import MockerTestCase
+import uuid
+
+mock_returns = {
+ 'hostname': 'test-host',
+ 'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname',
+ 'disable_iptables_flag': None,
+ 'enable_motd_sys_info': None,
+ 'system_uuid': str(uuid.uuid4()),
+ 'smartdc': 'smartdc',
+ 'userdata': """
+#!/bin/sh
+/bin/true
+""",
+}
+
+
+class MockSerial(object):
+ """Fake a serial terminal for testing the code that
+ interfaces with the serial"""
+
+ port = None
+
+ def __init__(self):
+ self.last = None
+ self.last = None
+ self.new = True
+ self.count = 0
+ self.mocked_out = []
+
+ def open(self):
+ return True
+
+ def close(self):
+ return True
+
+ def isOpen(self):
+ return True
+
+ def write(self, line):
+ line = line.replace('GET ', '')
+ self.last = line.rstrip()
+
+ def readline(self):
+ if self.new:
+ self.new = False
+ if self.last in mock_returns:
+ return 'SUCCESS\n'
+ else:
+ return 'NOTFOUND %s\n' % self.last
+
+ if self.last in mock_returns:
+ if not self.mocked_out:
+ self.mocked_out = [x for x in self._format_out()]
+ print self.mocked_out
+
+ if len(self.mocked_out) > self.count:
+ self.count += 1
+ return self.mocked_out[self.count - 1]
+
+ def _format_out(self):
+ if self.last in mock_returns:
+ try:
+ for l in mock_returns[self.last].splitlines():
+ yield "%s\n" % l
+ except:
+ yield "%s\n" % mock_returns[self.last]
+
+ yield '\n'
+ yield '.'
+
+
+class TestSmartOSDataSource(MockerTestCase):
+ def setUp(self):
+ # makeDir comes from MockerTestCase
+ self.tmp = self.makeDir()
+
+ # patch cloud_dir, so our 'seed_dir' is guaranteed empty
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
+
+ self.unapply = []
+ super(TestSmartOSDataSource, self).setUp()
+
+ def tearDown(self):
+ apply_patches([i for i in reversed(self.unapply)])
+ super(TestSmartOSDataSource, self).tearDown()
+
+ def apply_patches(self, patches):
+ ret = apply_patches(patches)
+ self.unapply += ret
+
+ def _get_ds(self):
+
+ def _get_serial(*_):
+ return MockSerial()
+
+ def _dmi_data():
+ return mock_returns['system_uuid'], 'smartdc'
+
+ data = {'sys_cfg': {}}
+ mod = DataSourceSmartOS
+ self.apply_patches([(mod, 'get_serial', _get_serial)])
+ self.apply_patches([(mod, 'dmi_data', _dmi_data)])
+ dsrc = mod.DataSourceSmartOS(
+ data.get('sys_cfg', {}), distro=None, paths=self.paths)
+ return dsrc
+
+ def test_seed(self):
+ dsrc = self._get_ds()
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals('/dev/ttyS1', dsrc.seed)
+
+ def test_issmartdc(self):
+ dsrc = self._get_ds()
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertTrue(dsrc.is_smartdc)
+
+ def test_uuid(self):
+ dsrc = self._get_ds()
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(mock_returns['system_uuid'],
+ dsrc.metadata['instance-id'])
+
+ def test_root_keys(self):
+ dsrc = self._get_ds()
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(mock_returns['root_authorized_keys'],
+ dsrc.metadata['public-keys'])
+
+ def test_hostname(self):
+ dsrc = self._get_ds()
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(mock_returns['hostname'],
+ dsrc.metadata['local-hostname'])
+
+ def test_disable_iptables_flag(self):
+ dsrc = self._get_ds()
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(mock_returns['disable_iptables_flag'],
+ dsrc.metadata['iptables_disable'])
+
+ def test_motd_sys_info(self):
+ dsrc = self._get_ds()
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(mock_returns['enable_motd_sys_info'],
+ dsrc.metadata['motd_sys_info'])
+
+
+def apply_patches(patches):
+ ret = []
+ for (ref, name, replace) in patches:
+ if replace is None:
+ continue
+ orig = getattr(ref, name)
+ setattr(ref, name, replace)
+ ret.append((ref, name, orig))
+ return ret