summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/helpers.py4
-rw-r--r--tests/unittests/test__init__.py36
-rw-r--r--tests/unittests/test_builtin_handlers.py22
-rw-r--r--tests/unittests/test_datasource/test_azure.py331
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py5
-rw-r--r--tests/unittests/test_datasource/test_maas.py12
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py4
-rw-r--r--tests/unittests/test_datasource/test_smartos.py273
-rw-r--r--tests/unittests/test_handler/test_handler_apt_configure.py106
-rw-r--r--tests/unittests/test_handler/test_handler_growpart.py258
-rw-r--r--tests/unittests/test_handler/test_handler_locale.py64
-rw-r--r--tests/unittests/test_handler/test_handler_set_hostname.py14
-rw-r--r--tests/unittests/test_handler/test_handler_timezone.py75
-rw-r--r--tests/unittests/test_merging.py309
-rw-r--r--tests/unittests/test_sshutil.py101
-rw-r--r--tests/unittests/test_userdata.py207
-rw-r--r--tests/unittests/test_util.py59
17 files changed, 1769 insertions, 111 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 904677f1..c0da0983 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -146,7 +146,8 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
('chmod', 1),
('delete_dir_contents', 1),
('del_file', 1),
- ('sym_link', -1)],
+ ('sym_link', -1),
+ ('copy', -1)],
}
for (mod, funcs) in patch_funcs.items():
for (f, am) in funcs:
@@ -175,6 +176,7 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
def patchOS(self, new_root):
patch_funcs = {
os.path: ['isfile', 'exists', 'islink', 'isdir'],
+ os: ['listdir'],
}
for (mod, funcs) in patch_funcs.items():
for f in funcs:
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py
index ac082076..b4b20e51 100644
--- a/tests/unittests/test__init__.py
+++ b/tests/unittests/test__init__.py
@@ -22,7 +22,8 @@ class FakeModule(handlers.Handler):
def list_types(self):
return self.types
- def _handle_part(self, data, ctype, filename, payload, frequency):
+ def handle_part(self, _data, ctype, filename, # pylint: disable=W0221
+ payload, frequency):
pass
@@ -103,6 +104,9 @@ class TestHandlerHandlePart(MockerTestCase):
self.filename = "fake filename"
self.payload = "fake payload"
self.frequency = settings.PER_INSTANCE
+ self.headers = {
+ 'Content-Type': self.ctype,
+ }
def test_normal_version_1(self):
"""
@@ -118,8 +122,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.payload)
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
def test_normal_version_2(self):
"""
@@ -135,8 +139,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.payload, self.frequency)
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
def test_modfreq_per_always(self):
"""
@@ -152,8 +156,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.payload)
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
def test_no_handle_when_modfreq_once(self):
"""C{handle_part} is not called if frequency is once."""
@@ -163,8 +167,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.mocker.result(settings.PER_ONCE)
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
def test_exception_is_caught(self):
"""Exceptions within C{handle_part} are caught and logged."""
@@ -178,8 +182,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.mocker.throw(Exception())
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
class TestCmdlineUrl(MockerTestCase):
@@ -191,8 +195,8 @@ class TestCmdlineUrl(MockerTestCase):
mock_readurl = self.mocker.replace(url_helper.readurl,
passthrough=False)
- mock_readurl(url)
- self.mocker.result(url_helper.UrlResponse(200, payload))
+ mock_readurl(url, ARGS, KWARGS)
+ self.mocker.result(util.StringResponse(payload))
self.mocker.replay()
self.assertEqual((key, url, None),
@@ -207,8 +211,8 @@ class TestCmdlineUrl(MockerTestCase):
mock_readurl = self.mocker.replace(url_helper.readurl,
passthrough=False)
- mock_readurl(url)
- self.mocker.result(url_helper.UrlResponse(200, payload))
+ mock_readurl(url, ARGS, KWARGS)
+ self.mocker.result(util.StringResponse(payload))
self.mocker.replay()
self.assertEqual((key, url, payload),
@@ -221,7 +225,7 @@ class TestCmdlineUrl(MockerTestCase):
cmdline = "ro %s=%s bar=1" % (key, url)
self.mocker.replace(url_helper.readurl, passthrough=False)
- self.mocker.result(url_helper.UrlResponse(400))
+ self.mocker.result(util.StringResponse(""))
self.mocker.replay()
self.assertEqual((None, None, None),
diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py
index 5f41cb3d..b387f13b 100644
--- a/tests/unittests/test_builtin_handlers.py
+++ b/tests/unittests/test_builtin_handlers.py
@@ -2,7 +2,7 @@
import os
-from mocker import MockerTestCase
+from tests.unittests import helpers as test_helpers
from cloudinit import handlers
from cloudinit import helpers
@@ -13,7 +13,7 @@ from cloudinit.handlers import upstart_job
from cloudinit.settings import (PER_ALWAYS, PER_INSTANCE)
-class TestBuiltins(MockerTestCase):
+class TestBuiltins(test_helpers.FilesystemMockingTestCase):
def test_upstart_frequency_no_out(self):
c_root = self.makeDir()
@@ -36,13 +36,18 @@ class TestBuiltins(MockerTestCase):
def test_upstart_frequency_single(self):
# files should be written out when frequency is ! per-instance
- c_root = self.makeDir()
- up_root = self.makeDir()
+ new_root = self.makeDir()
+ freq = PER_INSTANCE
+
+ self.patchOS(new_root)
+ self.patchUtils(new_root)
paths = helpers.Paths({
- 'cloud_dir': c_root,
- 'upstart_dir': up_root,
+ 'upstart_dir': "/etc/upstart",
})
- freq = PER_INSTANCE
+
+ upstart_job.SUITABLE_UPSTART = True
+ util.ensure_dir("/run")
+ util.ensure_dir("/etc/upstart")
mock_subp = self.mocker.replace(util.subp, passthrough=False)
mock_subp(["initctl", "reload-configuration"], capture=False)
@@ -55,4 +60,5 @@ class TestBuiltins(MockerTestCase):
'test.conf', 'blah', freq)
h.handle_part('', handlers.CONTENT_END,
None, None, None)
- self.assertEquals(1, len(os.listdir(up_root)))
+
+ self.assertEquals(1, len(os.listdir('/etc/upstart')))
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
new file mode 100644
index 00000000..1ca6a79d
--- /dev/null
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -0,0 +1,331 @@
+from cloudinit import helpers
+from cloudinit.sources import DataSourceAzure
+from tests.unittests.helpers import populate_dir
+
+import crypt
+import base64
+from mocker import MockerTestCase
+import os
+import yaml
+
+
+def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
+ if data is None:
+ data = {'HostName': 'FOOHOST'}
+ if pubkeys is None:
+ pubkeys = {}
+
+ content = """<?xml version="1.0" encoding="utf-8"?>
+<Environment xmlns="http://schemas.dmtf.org/ovf/environment/1"
+ xmlns:oe="http://schemas.dmtf.org/ovf/environment/1"
+ xmlns:wa="http://schemas.microsoft.com/windowsazure"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+ <wa:ProvisioningSection><wa:Version>1.0</wa:Version>
+ <LinuxProvisioningConfigurationSet
+ xmlns="http://schemas.microsoft.com/windowsazure"
+ xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
+ <ConfigurationSetType>LinuxProvisioningConfiguration</ConfigurationSetType>
+ """
+ for key, dval in data.items():
+ if isinstance(dval, dict):
+ val = dval.get('text')
+ attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v in dval.items()
+ if k != 'text'])
+ else:
+ val = dval
+ attrs = ""
+ content += "<%s%s>%s</%s>\n" % (key, attrs, val, key)
+
+ if userdata:
+ content += "<UserData>%s</UserData>\n" % (base64.b64encode(userdata))
+
+ if pubkeys:
+ content += "<SSH><PublicKeys>\n"
+ for fp, path in pubkeys:
+ content += " <PublicKey>"
+ content += ("<Fingerprint>%s</Fingerprint><Path>%s</Path>" %
+ (fp, path))
+ content += "</PublicKey>\n"
+ content += "</PublicKeys></SSH>"
+ content += """
+ </LinuxProvisioningConfigurationSet>
+ </wa:ProvisioningSection>
+ <wa:PlatformSettingsSection><wa:Version>1.0</wa:Version>
+ <PlatformSettings xmlns="http://schemas.microsoft.com/windowsazure"
+ xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
+ <KmsServerHostname>kms.core.windows.net</KmsServerHostname>
+ <ProvisionGuestAgent>false</ProvisionGuestAgent>
+ <GuestAgentPackageName i:nil="true" />
+ </PlatformSettings></wa:PlatformSettingsSection>
+</Environment>
+ """
+
+ return content
+
+
+class TestAzureDataSource(MockerTestCase):
+
+ def setUp(self):
+ # makeDir comes from MockerTestCase
+ self.tmp = self.makeDir()
+
+ # patch cloud_dir, so our 'seed_dir' is guaranteed empty
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
+
+ self.unapply = []
+ super(TestAzureDataSource, self).setUp()
+
+ def tearDown(self):
+ apply_patches([i for i in reversed(self.unapply)])
+ super(TestAzureDataSource, self).tearDown()
+
+ def apply_patches(self, patches):
+ ret = apply_patches(patches)
+ self.unapply += ret
+
+ def _get_ds(self, data):
+
+ def dsdevs():
+ return data.get('dsdevs', [])
+
+ def _invoke_agent(cmd):
+ data['agent_invoked'] = cmd
+
+ def _write_files(datadir, files, dirmode):
+ data['files'] = {}
+ data['datadir'] = datadir
+ data['datadir_mode'] = dirmode
+ for (fname, content) in files.items():
+ data['files'][fname] = content
+
+ def _wait_for_files(flist, _maxwait=None, _naplen=None):
+ data['waited'] = flist
+ return []
+
+ def _pubkeys_from_crt_files(flist):
+ data['pubkey_files'] = flist
+ return ["pubkey_from: %s" % f for f in flist]
+
+ def _iid_from_shared_config(path):
+ data['iid_from_shared_cfg'] = path
+ return 'i-my-azure-id'
+
+ def _apply_hostname_bounce(**kwargs):
+ data['apply_hostname_bounce'] = kwargs
+
+ if data.get('ovfcontent') is not None:
+ populate_dir(os.path.join(self.paths.seed_dir, "azure"),
+ {'ovf-env.xml': data['ovfcontent']})
+
+ mod = DataSourceAzure
+
+ if data.get('dsdevs'):
+ self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)])
+
+ self.apply_patches([(mod, 'invoke_agent', _invoke_agent),
+ (mod, 'write_files', _write_files),
+ (mod, 'wait_for_files', _wait_for_files),
+ (mod, 'pubkeys_from_crt_files',
+ _pubkeys_from_crt_files),
+ (mod, 'iid_from_shared_config',
+ _iid_from_shared_config),
+ (mod, 'apply_hostname_bounce',
+ _apply_hostname_bounce), ])
+
+ dsrc = mod.DataSourceAzureNet(
+ data.get('sys_cfg', {}), distro=None, paths=self.paths)
+
+ return dsrc
+
+ def test_basic_seed_dir(self):
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata),
+ 'sys_cfg': {}}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(dsrc.userdata_raw, "")
+ self.assertEqual(dsrc.metadata['local-hostname'], odata['HostName'])
+ self.assertTrue('ovf-env.xml' in data['files'])
+ self.assertEqual(0700, data['datadir_mode'])
+ self.assertEqual(dsrc.metadata['instance-id'], 'i-my-azure-id')
+
+ def test_user_cfg_set_agent_command_plain(self):
+ # set dscfg in via plaintext
+ cfg = {'agent_command': "my_command"}
+ odata = {'HostName': "myhost", 'UserName': "myuser",
+ 'dscfg': {'text': yaml.dump(cfg), 'encoding': 'plain'}}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(data['agent_invoked'], cfg['agent_command'])
+
+ def test_user_cfg_set_agent_command(self):
+ # set dscfg in via base64 encoded yaml
+ cfg = {'agent_command': "my_command"}
+ odata = {'HostName': "myhost", 'UserName': "myuser",
+ 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'encoding': 'base64'}}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(data['agent_invoked'], cfg['agent_command'])
+
+ def test_sys_cfg_set_agent_command(self):
+ sys_cfg = {'datasource': {'Azure': {'agent_command': '_COMMAND'}}}
+ data = {'ovfcontent': construct_valid_ovf_env(data={}),
+ 'sys_cfg': sys_cfg}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(data['agent_invoked'], '_COMMAND')
+
+ def test_username_used(self):
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(dsrc.cfg['system_info']['default_user']['name'],
+ "myuser")
+
+ def test_password_given(self):
+ odata = {'HostName': "myhost", 'UserName': "myuser",
+ 'UserPassword': "mypass"}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertTrue('default_user' in dsrc.cfg['system_info'])
+ defuser = dsrc.cfg['system_info']['default_user']
+
+ # default user should be updated username and should not be locked.
+ self.assertEqual(defuser['name'], odata['UserName'])
+ self.assertFalse(defuser['lock_passwd'])
+ # passwd is crypt formated string $id$salt$encrypted
+ # encrypting plaintext with salt value of everything up to final '$'
+ # should equal that after the '$'
+ pos = defuser['passwd'].rfind("$") + 1
+ self.assertEqual(defuser['passwd'],
+ crypt.crypt(odata['UserPassword'], defuser['passwd'][0:pos]))
+
+ def test_userdata_found(self):
+ mydata = "FOOBAR"
+ odata = {'UserData': base64.b64encode(mydata)}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(dsrc.userdata_raw, mydata)
+
+ def test_no_datasource_expected(self):
+ #no source should be found if no seed_dir and no devs
+ data = {}
+ dsrc = self._get_ds({})
+ ret = dsrc.get_data()
+ self.assertFalse(ret)
+ self.assertFalse('agent_invoked' in data)
+
+ def test_cfg_has_pubkeys(self):
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}]
+ pubkeys = [(x['fingerprint'], x['path']) for x in mypklist]
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata,
+ pubkeys=pubkeys)}
+
+ dsrc = self._get_ds(data)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ for mypk in mypklist:
+ self.assertIn(mypk, dsrc.cfg['_pubkeys'])
+
+ def test_disabled_bounce(self):
+ pass
+
+ def test_apply_bounce_call_1(self):
+ # hostname needs to get through to apply_hostname_bounce
+ odata = {'HostName': 'my-random-hostname'}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+
+ self._get_ds(data).get_data()
+ self.assertIn('hostname', data['apply_hostname_bounce'])
+ self.assertEqual(data['apply_hostname_bounce']['hostname'],
+ odata['HostName'])
+
+ def test_apply_bounce_call_configurable(self):
+ # hostname_bounce should be configurable in datasource cfg
+ cfg = {'hostname_bounce': {'interface': 'eth1', 'policy': 'off',
+ 'command': 'my-bounce-command',
+ 'hostname_command': 'my-hostname-command'}}
+ odata = {'HostName': "xhost",
+ 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'encoding': 'base64'}}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ self._get_ds(data).get_data()
+
+ for k in cfg['hostname_bounce']:
+ self.assertIn(k, data['apply_hostname_bounce'])
+
+ for k, v in cfg['hostname_bounce'].items():
+ self.assertEqual(data['apply_hostname_bounce'][k], v)
+
+ def test_set_hostname_disabled(self):
+ # config specifying set_hostname off should not bounce
+ cfg = {'set_hostname': False}
+ odata = {'HostName': "xhost",
+ 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)),
+ 'encoding': 'base64'}}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ self._get_ds(data).get_data()
+
+ self.assertEqual(data.get('apply_hostname_bounce', "N/A"), "N/A")
+
+
+class TestReadAzureOvf(MockerTestCase):
+ def test_invalid_xml_raises_non_azure_ds(self):
+ invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
+ self.assertRaises(DataSourceAzure.NonAzureDataSource,
+ DataSourceAzure.read_azure_ovf, invalid_xml)
+
+ def test_load_with_pubkeys(self):
+ mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}]
+ pubkeys = [(x['fingerprint'], x['path']) for x in mypklist]
+ content = construct_valid_ovf_env(pubkeys=pubkeys)
+ (_md, _ud, cfg) = DataSourceAzure.read_azure_ovf(content)
+ for mypk in mypklist:
+ self.assertIn(mypk, cfg['_pubkeys'])
+
+
+class TestReadAzureSharedConfig(MockerTestCase):
+ def test_valid_content(self):
+ xml = """<?xml version="1.0" encoding="utf-8"?>
+ <SharedConfig>
+ <Deployment name="MY_INSTANCE_ID">
+ <Service name="myservice"/>
+ <ServiceInstance name="INSTANCE_ID.0" guid="{abcd-uuid}" />
+ </Deployment>
+ <Incarnation number="1"/>
+ </SharedConfig>"""
+ ret = DataSourceAzure.iid_from_shared_config_content(xml)
+ self.assertEqual("MY_INSTANCE_ID", ret)
+
+
+def apply_patches(patches):
+ ret = []
+ for (ref, name, replace) in patches:
+ if replace is None:
+ continue
+ orig = getattr(ref, name)
+ setattr(ref, name, replace)
+ ret.append((ref, name, orig))
+ return ret
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 930086db..d5935294 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -259,8 +259,9 @@ class TestConfigDriveDataSource(MockerTestCase):
def test_find_candidates(self):
devs_with_answers = {}
- def my_devs_with(criteria):
- return devs_with_answers[criteria]
+ def my_devs_with(*args, **kwargs):
+ criteria = args[0] if len(args) else kwargs.pop('criteria', None)
+ return devs_with_answers.get(criteria, [])
def my_is_partition(dev):
return dev[-1] in "0123456789" and not dev.startswith("sr")
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index b56fea82..2007a6df 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -3,12 +3,13 @@ import os
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
+from cloudinit import util
from tests.unittests.helpers import populate_dir
-from mocker import MockerTestCase
+import mocker
-class TestMAASDataSource(MockerTestCase):
+class TestMAASDataSource(mocker.MockerTestCase):
def setUp(self):
super(TestMAASDataSource, self).setUp()
@@ -115,9 +116,12 @@ class TestMAASDataSource(MockerTestCase):
for key in valid_order:
url = "%s/%s/%s" % (my_seed, my_ver, key)
- mock_request(url, headers=my_headers, timeout=None)
+ mock_request(url, headers=None, timeout=mocker.ANY,
+ data=mocker.ANY, sec_between=mocker.ANY,
+ ssl_details=mocker.ANY, retries=mocker.ANY,
+ headers_cb=my_headers_cb)
resp = valid.get(key)
- self.mocker.result(url_helper.UrlResponse(200, resp))
+ self.mocker.result(util.StringResponse(resp))
self.mocker.replay()
(userdata, metadata) = DataSourceMAAS.read_maas_seed_url(my_seed,
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index 28e0a472..7328b240 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -1,7 +1,7 @@
from cloudinit import helpers
-from tests.unittests.helpers import populate_dir
from cloudinit.sources import DataSourceNoCloud
from cloudinit import util
+from tests.unittests.helpers import populate_dir
from mocker import MockerTestCase
import os
@@ -22,7 +22,7 @@ class TestNoCloudDataSource(MockerTestCase):
def tearDown(self):
apply_patches([i for i in reversed(self.unapply)])
- super(TestNoCloudDataSource, self).setUp()
+ super(TestNoCloudDataSource, self).tearDown()
def apply_patches(self, patches):
ret = apply_patches(patches)
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
new file mode 100644
index 00000000..f53715b0
--- /dev/null
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -0,0 +1,273 @@
+# vi: ts=4 expandtab
+#
+# Copyright (C) 2013 Canonical Ltd.
+#
+# Author: Ben Howard <ben.howard@canonical.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+# This is a testcase for the SmartOS datasource. It replicates a serial
+# console and acts like the SmartOS console does in order to validate
+# return responses.
+#
+
+import base64
+from cloudinit import helpers
+from cloudinit.sources import DataSourceSmartOS
+
+from mocker import MockerTestCase
+import uuid
+
+MOCK_RETURNS = {
+ 'hostname': 'test-host',
+ 'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname',
+ 'disable_iptables_flag': None,
+ 'enable_motd_sys_info': None,
+ 'test-var1': 'some data',
+ 'user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']),
+}
+
+DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc')
+
+
+class MockSerial(object):
+ """Fake a serial terminal for testing the code that
+ interfaces with the serial"""
+
+ port = None
+
+ def __init__(self, mockdata):
+ self.last = None
+ self.last = None
+ self.new = True
+ self.count = 0
+ self.mocked_out = []
+ self.mockdata = mockdata
+
+ def open(self):
+ return True
+
+ def close(self):
+ return True
+
+ def isOpen(self):
+ return True
+
+ def write(self, line):
+ line = line.replace('GET ', '')
+ self.last = line.rstrip()
+
+ def readline(self):
+ if self.new:
+ self.new = False
+ if self.last in self.mockdata:
+ return 'SUCCESS\n'
+ else:
+ return 'NOTFOUND %s\n' % self.last
+
+ if self.last in self.mockdata:
+ if not self.mocked_out:
+ self.mocked_out = [x for x in self._format_out()]
+ print self.mocked_out
+
+ if len(self.mocked_out) > self.count:
+ self.count += 1
+ return self.mocked_out[self.count - 1]
+
+ def _format_out(self):
+ if self.last in self.mockdata:
+ _mret = self.mockdata[self.last]
+ try:
+ for l in _mret.splitlines():
+ yield "%s\n" % l.rstrip()
+ except:
+ yield "%s\n" % _mret.rstrip()
+
+ yield '.'
+ yield '\n'
+
+
+class TestSmartOSDataSource(MockerTestCase):
+ def setUp(self):
+ # makeDir comes from MockerTestCase
+ self.tmp = self.makeDir()
+
+ # patch cloud_dir, so our 'seed_dir' is guaranteed empty
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
+
+ self.unapply = []
+ super(TestSmartOSDataSource, self).setUp()
+
+ def tearDown(self):
+ apply_patches([i for i in reversed(self.unapply)])
+ super(TestSmartOSDataSource, self).tearDown()
+
+ def apply_patches(self, patches):
+ ret = apply_patches(patches)
+ self.unapply += ret
+
+ def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None):
+ mod = DataSourceSmartOS
+
+ if mockdata is None:
+ mockdata = MOCK_RETURNS
+
+ if dmi_data is None:
+ dmi_data = DMI_DATA_RETURN
+
+ def _get_serial(*_):
+ return MockSerial(mockdata)
+
+ def _dmi_data():
+ return dmi_data
+
+ if sys_cfg is None:
+ sys_cfg = {}
+
+ if ds_cfg is not None:
+ sys_cfg['datasource'] = sys_cfg.get('datasource', {})
+ sys_cfg['datasource']['SmartOS'] = ds_cfg
+
+ self.apply_patches([(mod, 'get_serial', _get_serial)])
+ self.apply_patches([(mod, 'dmi_data', _dmi_data)])
+ dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None,
+ paths=self.paths)
+ return dsrc
+
+ def test_seed(self):
+ # default seed should be /dev/ttyS1
+ dsrc = self._get_ds()
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals('/dev/ttyS1', dsrc.seed)
+
+ def test_issmartdc(self):
+ dsrc = self._get_ds()
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertTrue(dsrc.is_smartdc)
+
+ def test_no_base64(self):
+ ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True}
+ dsrc = self._get_ds(ds_cfg=ds_cfg)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+
+ def test_uuid(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(DMI_DATA_RETURN[0], dsrc.metadata['instance-id'])
+
+ def test_root_keys(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
+ dsrc.metadata['public-keys'])
+
+ def test_hostname_b64(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
+
+ def test_hostname(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
+
+ def test_base64_all(self):
+ # metadata provided base64_all of true
+ my_returns = MOCK_RETURNS.copy()
+ my_returns['base64_all'] = "true"
+ for k in ('hostname', 'user-data'):
+ my_returns[k] = base64.b64encode(my_returns[k])
+
+ dsrc = self._get_ds(mockdata=my_returns)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
+ self.assertEquals(MOCK_RETURNS['user-data'],
+ dsrc.userdata_raw)
+ self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
+ dsrc.metadata['public-keys'])
+ self.assertEquals(MOCK_RETURNS['disable_iptables_flag'],
+ dsrc.metadata['iptables_disable'])
+ self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'],
+ dsrc.metadata['motd_sys_info'])
+
+ def test_b64_userdata(self):
+ my_returns = MOCK_RETURNS.copy()
+ my_returns['b64-user-data'] = "true"
+ my_returns['b64-hostname'] = "true"
+ for k in ('hostname', 'user-data'):
+ my_returns[k] = base64.b64encode(my_returns[k])
+
+ dsrc = self._get_ds(mockdata=my_returns)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
+ self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw)
+ self.assertEquals(MOCK_RETURNS['root_authorized_keys'],
+ dsrc.metadata['public-keys'])
+
+ def test_b64_keys(self):
+ my_returns = MOCK_RETURNS.copy()
+ my_returns['base64_keys'] = 'hostname,ignored'
+ for k in ('hostname',):
+ my_returns[k] = base64.b64encode(my_returns[k])
+
+ dsrc = self._get_ds(mockdata=my_returns)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['hostname'],
+ dsrc.metadata['local-hostname'])
+ self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw)
+
+ def test_userdata(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw)
+
+ def test_disable_iptables_flag(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['disable_iptables_flag'],
+ dsrc.metadata['iptables_disable'])
+
+ def test_motd_sys_info(self):
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'],
+ dsrc.metadata['motd_sys_info'])
+
+
+def apply_patches(patches):
+ ret = []
+ for (ref, name, replace) in patches:
+ if replace is None:
+ continue
+ orig = getattr(ref, name)
+ setattr(ref, name, replace)
+ ret.append((ref, name, orig))
+ return ret
diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_configure.py
new file mode 100644
index 00000000..203dd2aa
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_apt_configure.py
@@ -0,0 +1,106 @@
+from mocker import MockerTestCase
+
+from cloudinit import util
+
+from cloudinit.config import cc_apt_configure
+
+import os
+import re
+
+
+class TestAptProxyConfig(MockerTestCase):
+ def setUp(self):
+ super(TestAptProxyConfig, self).setUp()
+ self.tmp = self.makeDir()
+ self.pfile = os.path.join(self.tmp, "proxy.cfg")
+ self.cfile = os.path.join(self.tmp, "config.cfg")
+
+ def _search_apt_config(self, contents, ptype, value):
+ print(
+ r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value),
+ contents, "flags=re.IGNORECASE")
+ return(re.search(
+ r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value),
+ contents, flags=re.IGNORECASE))
+
+ def test_apt_proxy_written(self):
+ cfg = {'apt_proxy': 'myproxy'}
+ cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile)
+
+ self.assertTrue(os.path.isfile(self.pfile))
+ self.assertFalse(os.path.isfile(self.cfile))
+
+ contents = str(util.read_file_or_url(self.pfile))
+ self.assertTrue(self._search_apt_config(contents, "http", "myproxy"))
+
+ def test_apt_http_proxy_written(self):
+ cfg = {'apt_http_proxy': 'myproxy'}
+ cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile)
+
+ self.assertTrue(os.path.isfile(self.pfile))
+ self.assertFalse(os.path.isfile(self.cfile))
+
+ contents = str(util.read_file_or_url(self.pfile))
+ self.assertTrue(self._search_apt_config(contents, "http", "myproxy"))
+
+ def test_apt_all_proxy_written(self):
+ cfg = {'apt_http_proxy': 'myproxy_http_proxy',
+ 'apt_https_proxy': 'myproxy_https_proxy',
+ 'apt_ftp_proxy': 'myproxy_ftp_proxy'}
+
+ values = {'http': cfg['apt_http_proxy'],
+ 'https': cfg['apt_https_proxy'],
+ 'ftp': cfg['apt_ftp_proxy'],
+ }
+
+ cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile)
+
+ self.assertTrue(os.path.isfile(self.pfile))
+ self.assertFalse(os.path.isfile(self.cfile))
+
+ contents = str(util.read_file_or_url(self.pfile))
+
+ for ptype, pval in values.iteritems():
+ self.assertTrue(self._search_apt_config(contents, ptype, pval))
+
+ def test_proxy_deleted(self):
+ util.write_file(self.cfile, "content doesnt matter")
+ cc_apt_configure.apply_apt_config({}, self.pfile, self.cfile)
+ self.assertFalse(os.path.isfile(self.pfile))
+ self.assertFalse(os.path.isfile(self.cfile))
+
+ def test_proxy_replaced(self):
+ util.write_file(self.cfile, "content doesnt matter")
+ cc_apt_configure.apply_apt_config({'apt_proxy': "foo"},
+ self.pfile, self.cfile)
+ self.assertTrue(os.path.isfile(self.pfile))
+ contents = str(util.read_file_or_url(self.pfile))
+ self.assertTrue(self._search_apt_config(contents, "http", "foo"))
+
+ def test_config_written(self):
+ payload = 'this is my apt config'
+ cfg = {'apt_config': payload}
+
+ cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile)
+
+ self.assertTrue(os.path.isfile(self.cfile))
+ self.assertFalse(os.path.isfile(self.pfile))
+
+ self.assertEqual(str(util.read_file_or_url(self.cfile)), payload)
+
+ def test_config_replaced(self):
+ util.write_file(self.pfile, "content doesnt matter")
+ cc_apt_configure.apply_apt_config({'apt_config': "foo"},
+ self.pfile, self.cfile)
+ self.assertTrue(os.path.isfile(self.cfile))
+ self.assertEqual(str(util.read_file_or_url(self.cfile)), "foo")
+
+ def test_config_deleted(self):
+ # if no 'apt_config' is provided, delete any previously written file
+ util.write_file(self.pfile, "content doesnt matter")
+ cc_apt_configure.apply_apt_config({}, self.pfile, self.cfile)
+ self.assertFalse(os.path.isfile(self.pfile))
+ self.assertFalse(os.path.isfile(self.cfile))
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py
new file mode 100644
index 00000000..c0497e08
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_growpart.py
@@ -0,0 +1,258 @@
+from mocker import MockerTestCase
+
+from cloudinit import cloud
+from cloudinit import util
+
+from cloudinit.config import cc_growpart
+
+import errno
+import logging
+import os
+import re
+import unittest
+
+# growpart:
+# mode: auto # off, on, auto, 'growpart', 'parted'
+# devices: ['root']
+
+HELP_PARTED_NO_RESIZE = """
+Usage: parted [OPTION]... [DEVICE [COMMAND [PARAMETERS]...]...]
+Apply COMMANDs with PARAMETERS to DEVICE. If no COMMAND(s) are given, run in
+interactive mode.
+
+OPTIONs:
+<SNIP>
+
+COMMANDs:
+<SNIP>
+ quit exit program
+ rescue START END rescue a lost partition near START
+ and END
+ resize NUMBER START END resize partition NUMBER and its file
+ system
+ rm NUMBER delete partition NUMBER
+<SNIP>
+Report bugs to bug-parted@gnu.org
+"""
+
+HELP_PARTED_RESIZE = """
+Usage: parted [OPTION]... [DEVICE [COMMAND [PARAMETERS]...]...]
+Apply COMMANDs with PARAMETERS to DEVICE. If no COMMAND(s) are given, run in
+interactive mode.
+
+OPTIONs:
+<SNIP>
+
+COMMANDs:
+<SNIP>
+ quit exit program
+ rescue START END rescue a lost partition near START
+ and END
+ resize NUMBER START END resize partition NUMBER and its file
+ system
+ resizepart NUMBER END resize partition NUMBER
+ rm NUMBER delete partition NUMBER
+<SNIP>
+Report bugs to bug-parted@gnu.org
+"""
+
+HELP_GROWPART_RESIZE = """
+growpart disk partition
+ rewrite partition table so that partition takes up all the space it can
+ options:
+ -h | --help print Usage and exit
+<SNIP>
+ -u | --update R update the the kernel partition table info after growing
+ this requires kernel support and 'partx --update'
+ R is one of:
+ - 'auto' : [default] update partition if possible
+<SNIP>
+ Example:
+ - growpart /dev/sda 1
+ Resize partition 1 on /dev/sda
+"""
+
+HELP_GROWPART_NO_RESIZE = """
+growpart disk partition
+ rewrite partition table so that partition takes up all the space it can
+ options:
+ -h | --help print Usage and exit
+<SNIP>
+ Example:
+ - growpart /dev/sda 1
+ Resize partition 1 on /dev/sda
+"""
+
+
+class TestDisabled(MockerTestCase):
+ def setUp(self):
+ super(TestDisabled, self).setUp()
+ self.name = "growpart"
+ self.cloud_init = None
+ self.log = logging.getLogger("TestDisabled")
+ self.args = []
+
+ self.handle = cc_growpart.handle
+
+ def test_mode_off(self):
+ #Test that nothing is done if mode is off.
+
+ # this really only verifies that resizer_factory isn't called
+ config = {'growpart': {'mode': 'off'}}
+ self.mocker.replace(cc_growpart.resizer_factory,
+ passthrough=False)
+ self.mocker.replay()
+
+ self.handle(self.name, config, self.cloud_init, self.log, self.args)
+
+
+class TestConfig(MockerTestCase):
+ def setUp(self):
+ super(TestConfig, self).setUp()
+ self.name = "growpart"
+ self.paths = None
+ self.cloud = cloud.Cloud(None, self.paths, None, None, None)
+ self.log = logging.getLogger("TestConfig")
+ self.args = []
+ os.environ = {}
+
+ self.cloud_init = None
+ self.handle = cc_growpart.handle
+
+ # Order must be correct
+ self.mocker.order()
+
+ @unittest.skip("until LP: #1212444 fixed")
+ def test_no_resizers_auto_is_fine(self):
+ subp = self.mocker.replace(util.subp, passthrough=False)
+ subp(['parted', '--help'], env={'LANG': 'C'})
+ self.mocker.result((HELP_PARTED_NO_RESIZE, ""))
+ subp(['growpart', '--help'], env={'LANG': 'C'})
+ self.mocker.result((HELP_GROWPART_NO_RESIZE, ""))
+ self.mocker.replay()
+
+ config = {'growpart': {'mode': 'auto'}}
+ self.handle(self.name, config, self.cloud_init, self.log, self.args)
+
+ def test_no_resizers_mode_growpart_is_exception(self):
+ subp = self.mocker.replace(util.subp, passthrough=False)
+ subp(['growpart', '--help'], env={'LANG': 'C'})
+ self.mocker.result((HELP_GROWPART_NO_RESIZE, ""))
+ self.mocker.replay()
+
+ config = {'growpart': {'mode': "growpart"}}
+ self.assertRaises(ValueError, self.handle, self.name, config,
+ self.cloud_init, self.log, self.args)
+
+ @unittest.skip("until LP: #1212444 fixed")
+ def test_mode_auto_prefers_parted(self):
+ subp = self.mocker.replace(util.subp, passthrough=False)
+ subp(['parted', '--help'], env={'LANG': 'C'})
+ self.mocker.result((HELP_PARTED_RESIZE, ""))
+ self.mocker.replay()
+
+ ret = cc_growpart.resizer_factory(mode="auto")
+ self.assertTrue(isinstance(ret, cc_growpart.ResizeParted))
+
+ def test_handle_with_no_growpart_entry(self):
+ #if no 'growpart' entry in config, then mode=auto should be used
+
+ myresizer = object()
+
+ factory = self.mocker.replace(cc_growpart.resizer_factory,
+ passthrough=False)
+ rsdevs = self.mocker.replace(cc_growpart.resize_devices,
+ passthrough=False)
+ factory("auto")
+ self.mocker.result(myresizer)
+ rsdevs(myresizer, ["/"])
+ self.mocker.result((("/", cc_growpart.RESIZE.CHANGED, "my-message",),))
+ self.mocker.replay()
+
+ try:
+ orig_resizers = cc_growpart.RESIZERS
+ cc_growpart.RESIZERS = (('mysizer', object),)
+ self.handle(self.name, {}, self.cloud_init, self.log, self.args)
+ finally:
+ cc_growpart.RESIZERS = orig_resizers
+
+
+class TestResize(MockerTestCase):
+ def setUp(self):
+ super(TestResize, self).setUp()
+ self.name = "growpart"
+ self.log = logging.getLogger("TestResize")
+
+ # Order must be correct
+ self.mocker.order()
+
+ def test_simple_devices(self):
+ #test simple device list
+ # this patches out devent2dev, os.stat, and device_part_info
+ # so in the end, doesn't test a lot
+ devs = ["/dev/XXda1", "/dev/YYda2"]
+ devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5L,
+ st_nlink=1, st_uid=0, st_gid=6, st_size=0,
+ st_atime=0, st_mtime=0, st_ctime=0)
+ enoent = ["/dev/NOENT"]
+ real_stat = os.stat
+ resize_calls = []
+
+ class myresizer(object):
+ def resize(self, diskdev, partnum, partdev):
+ resize_calls.append((diskdev, partnum, partdev))
+ if partdev == "/dev/YYda2":
+ return (1024, 2048)
+ return (1024, 1024) # old size, new size
+
+ def mystat(path):
+ if path in devs:
+ return devstat_ret
+ if path in enoent:
+ e = OSError("%s: does not exist" % path)
+ e.errno = errno.ENOENT
+ raise e
+ return real_stat(path)
+
+ try:
+ opinfo = cc_growpart.device_part_info
+ cc_growpart.device_part_info = simple_device_part_info
+ os.stat = mystat
+
+ resized = cc_growpart.resize_devices(myresizer(), devs + enoent)
+
+ def find(name, res):
+ for f in res:
+ if f[0] == name:
+ return f
+ return None
+
+ self.assertEqual(cc_growpart.RESIZE.NOCHANGE,
+ find("/dev/XXda1", resized)[1])
+ self.assertEqual(cc_growpart.RESIZE.CHANGED,
+ find("/dev/YYda2", resized)[1])
+ self.assertEqual(cc_growpart.RESIZE.SKIPPED,
+ find(enoent[0], resized)[1])
+ #self.assertEqual(resize_calls,
+ #[("/dev/XXda", "1", "/dev/XXda1"),
+ #("/dev/YYda", "2", "/dev/YYda2")])
+ finally:
+ cc_growpart.device_part_info = opinfo
+ os.stat = real_stat
+
+
+def simple_device_part_info(devpath):
+ # simple stupid return (/dev/vda, 1) for /dev/vda
+ ret = re.search("([^0-9]*)([0-9]*)$", devpath)
+ x = (ret.group(1), ret.group(2))
+ return x
+
+
+class Bunch:
+ st_mode = None # fix pylint complaint
+
+ def __init__(self, **kwds):
+ self.__dict__.update(kwds)
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_locale.py b/tests/unittests/test_handler/test_handler_locale.py
new file mode 100644
index 00000000..72ad00fd
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_locale.py
@@ -0,0 +1,64 @@
+# Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
+#
+# Author: Juerg Haefliger <juerg.haefliger@hp.com>
+#
+# Based on test_handler_set_hostname.py
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from cloudinit.config import cc_locale
+
+from cloudinit import cloud
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import util
+
+from cloudinit.sources import DataSourceNoCloud
+
+from tests.unittests import helpers as t_help
+
+from configobj import ConfigObj
+
+from StringIO import StringIO
+
+import logging
+
+LOG = logging.getLogger(__name__)
+
+
+class TestLocale(t_help.FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestLocale, self).setUp()
+ self.new_root = self.makeDir(prefix="unittest_")
+
+ def _get_cloud(self, distro):
+ self.patchUtils(self.new_root)
+ paths = helpers.Paths({})
+
+ cls = distros.fetch(distro)
+ d = cls(distro, {}, paths)
+ ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths)
+ cc = cloud.Cloud(ds, paths, {}, d, None)
+ return cc
+
+ def test_set_locale_sles(self):
+
+ cfg = {
+ 'locale': 'My.Locale',
+ }
+ cc = self._get_cloud('sles')
+ cc_locale.handle('cc_locale', cfg, cc, LOG, [])
+
+ contents = util.load_file('/etc/sysconfig/language')
+ n_cfg = ConfigObj(StringIO(contents))
+ self.assertEquals({'RC_LANG': cfg['locale']}, dict(n_cfg))
diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py
index a1aba62f..6344ec0c 100644
--- a/tests/unittests/test_handler/test_handler_set_hostname.py
+++ b/tests/unittests/test_handler/test_handler_set_hostname.py
@@ -35,7 +35,6 @@ class TestHostname(t_help.FilesystemMockingTestCase):
ds = None
cc = cloud.Cloud(ds, paths, {}, distro, None)
self.patchUtils(self.tmp)
- self.patchOS(self.tmp)
cc_set_hostname.handle('cc_set_hostname',
cfg, cc, LOG, [])
contents = util.load_file("/etc/sysconfig/network")
@@ -56,3 +55,16 @@ class TestHostname(t_help.FilesystemMockingTestCase):
cfg, cc, LOG, [])
contents = util.load_file("/etc/hostname")
self.assertEquals('blah', contents.strip())
+
+ def test_write_hostname_sles(self):
+ cfg = {
+ 'hostname': 'blah.blah.blah.suse.com',
+ }
+ distro = self._fetch_distro('sles')
+ paths = helpers.Paths({})
+ ds = None
+ cc = cloud.Cloud(ds, paths, {}, distro, None)
+ self.patchUtils(self.tmp)
+ cc_set_hostname.handle('cc_set_hostname', cfg, cc, LOG, [])
+ contents = util.load_file("/etc/HOSTNAME")
+ self.assertEquals('blah', contents.strip())
diff --git a/tests/unittests/test_handler/test_handler_timezone.py b/tests/unittests/test_handler/test_handler_timezone.py
new file mode 100644
index 00000000..40b69773
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_timezone.py
@@ -0,0 +1,75 @@
+# Copyright (C) 2013 Hewlett-Packard Development Company, L.P.
+#
+# Author: Juerg Haefliger <juerg.haefliger@hp.com>
+#
+# Based on test_handler_set_hostname.py
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from cloudinit.config import cc_timezone
+
+from cloudinit import cloud
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import util
+
+from cloudinit.sources import DataSourceNoCloud
+
+from tests.unittests import helpers as t_help
+
+from configobj import ConfigObj
+
+from StringIO import StringIO
+
+import logging
+
+LOG = logging.getLogger(__name__)
+
+
+class TestTimezone(t_help.FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestTimezone, self).setUp()
+ self.new_root = self.makeDir(prefix="unittest_")
+
+ def _get_cloud(self, distro):
+ self.patchUtils(self.new_root)
+ self.patchOS(self.new_root)
+
+ paths = helpers.Paths({})
+
+ cls = distros.fetch(distro)
+ d = cls(distro, {}, paths)
+ ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths)
+ cc = cloud.Cloud(ds, paths, {}, d, None)
+ return cc
+
+ def test_set_timezone_sles(self):
+
+ cfg = {
+ 'timezone': 'Tatooine/Bestine',
+ }
+ cc = self._get_cloud('sles')
+
+ # Create a dummy timezone file
+ dummy_contents = '0123456789abcdefgh'
+ util.write_file('/usr/share/zoneinfo/%s' % cfg['timezone'],
+ dummy_contents)
+
+ cc_timezone.handle('cc_timezone', cfg, cc, LOG, [])
+
+ contents = util.load_file('/etc/sysconfig/clock')
+ n_cfg = ConfigObj(StringIO(contents))
+ self.assertEquals({'TIMEZONE': cfg['timezone']}, dict(n_cfg))
+
+ contents = util.load_file('/etc/localtime')
+ self.assertEquals(dummy_contents, contents.strip())
diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py
index 0037b966..486b9158 100644
--- a/tests/unittests/test_merging.py
+++ b/tests/unittests/test_merging.py
@@ -1,62 +1,255 @@
-from mocker import MockerTestCase
+from tests.unittests import helpers
+from cloudinit.handlers import cloud_config
+from cloudinit.handlers import (CONTENT_START, CONTENT_END)
+
+from cloudinit import helpers as c_helpers
from cloudinit import util
+import collections
+import glob
+import os
+import random
+import re
+import string # pylint: disable=W0402
+
+SOURCE_PAT = "source*.*yaml"
+EXPECTED_PAT = "expected%s.yaml"
+TYPES = [long, int, dict, str, list, tuple, None]
+
+
+def _old_mergedict(src, cand):
+ """
+ Merge values from C{cand} into C{src}.
+ If C{src} has a key C{cand} will not override.
+ Nested dictionaries are merged recursively.
+ """
+ if isinstance(src, dict) and isinstance(cand, dict):
+ for (k, v) in cand.iteritems():
+ if k not in src:
+ src[k] = v
+ else:
+ src[k] = _old_mergedict(src[k], v)
+ return src
+
+
+def _old_mergemanydict(*args):
+ out = {}
+ for a in args:
+ out = _old_mergedict(out, a)
+ return out
+
+
+def _random_str(rand):
+ base = ''
+ for _i in xrange(rand.randint(1, 2 ** 8)):
+ base += rand.choice(string.letters + string.digits)
+ return base
+
+
+class _NoMoreException(Exception):
+ pass
+
+
+def _make_dict(current_depth, max_depth, rand):
+ if current_depth >= max_depth:
+ raise _NoMoreException()
+ if current_depth == 0:
+ t = dict
+ else:
+ t = rand.choice(TYPES)
+ base = None
+ if t in [None]:
+ return base
+ if t in [dict, list, tuple]:
+ if t in [dict]:
+ amount = rand.randint(0, 5)
+ keys = [_random_str(rand) for _i in xrange(0, amount)]
+ base = {}
+ for k in keys:
+ try:
+ base[k] = _make_dict(current_depth + 1, max_depth, rand)
+ except _NoMoreException:
+ pass
+ elif t in [list, tuple]:
+ base = []
+ amount = rand.randint(0, 5)
+ for _i in xrange(0, amount):
+ try:
+ base.append(_make_dict(current_depth + 1, max_depth, rand))
+ except _NoMoreException:
+ pass
+ if t in [tuple]:
+ base = tuple(base)
+ elif t in [long, int]:
+ base = rand.randint(0, 2 ** 8)
+ elif t in [str]:
+ base = _random_str(rand)
+ return base
+
+
+def make_dict(max_depth, seed=None):
+ max_depth = max(1, max_depth)
+ rand = random.Random(seed)
+ return _make_dict(0, max_depth, rand)
+
+
+class TestSimpleRun(helpers.ResourceUsingTestCase):
+ def _load_merge_files(self):
+ merge_root = self.resourceLocation('merge_sources')
+ tests = []
+ source_ids = collections.defaultdict(list)
+ expected_files = {}
+ for fn in glob.glob(os.path.join(merge_root, SOURCE_PAT)):
+ base_fn = os.path.basename(fn)
+ file_id = re.match(r"source(\d+)\-(\d+)[.]yaml", base_fn)
+ if not file_id:
+ raise IOError("File %s does not have a numeric identifier"
+ % (fn))
+ file_id = int(file_id.group(1))
+ source_ids[file_id].append(fn)
+ expected_fn = os.path.join(merge_root, EXPECTED_PAT % (file_id))
+ if not os.path.isfile(expected_fn):
+ raise IOError("No expected file found at %s" % (expected_fn))
+ expected_files[file_id] = expected_fn
+ for i in sorted(source_ids.keys()):
+ source_file_contents = []
+ for fn in sorted(source_ids[i]):
+ source_file_contents.append([fn, util.load_file(fn)])
+ expected = util.load_yaml(util.load_file(expected_files[i]))
+ entry = [source_file_contents, [expected, expected_files[i]]]
+ tests.append(entry)
+ return tests
+
+ def test_seed_runs(self):
+ test_dicts = []
+ for i in range(1, 50):
+ base_dicts = []
+ for j in range(1, 50):
+ base_dicts.append(make_dict(5, i * j))
+ test_dicts.append(base_dicts)
+ for test in test_dicts:
+ c = _old_mergemanydict(*test)
+ d = util.mergemanydict(test)
+ self.assertEquals(c, d)
+
+ def test_merge_cc_samples(self):
+ tests = self._load_merge_files()
+ paths = c_helpers.Paths({})
+ cc_handler = cloud_config.CloudConfigPartHandler(paths)
+ cc_handler.cloud_fn = None
+ for (payloads, (expected_merge, expected_fn)) in tests:
+ cc_handler.handle_part(None, CONTENT_START, None,
+ None, None, None)
+ merging_fns = []
+ for (fn, contents) in payloads:
+ cc_handler.handle_part(None, None, "%s.yaml" % (fn),
+ contents, None, {})
+ merging_fns.append(fn)
+ merged_buf = cc_handler.cloud_buf
+ cc_handler.handle_part(None, CONTENT_END, None,
+ None, None, None)
+ fail_msg = "Equality failure on checking %s with %s: %s != %s"
+ fail_msg = fail_msg % (expected_fn,
+ ",".join(merging_fns), merged_buf,
+ expected_merge)
+ self.assertEquals(expected_merge, merged_buf, msg=fail_msg)
+
+ def test_compat_merges_dict(self):
+ a = {
+ '1': '2',
+ 'b': 'c',
+ }
+ b = {
+ 'b': 'e',
+ }
+ c = _old_mergedict(a, b)
+ d = util.mergemanydict([a, b])
+ self.assertEquals(c, d)
+
+ def test_compat_merges_dict2(self):
+ a = {
+ 'Blah': 1,
+ 'Blah2': 2,
+ 'Blah3': 3,
+ }
+ b = {
+ 'Blah': 1,
+ 'Blah2': 2,
+ 'Blah3': [1],
+ }
+ c = _old_mergedict(a, b)
+ d = util.mergemanydict([a, b])
+ self.assertEquals(c, d)
+
+ def test_compat_merges_list(self):
+ a = {'b': [1, 2, 3]}
+ b = {'b': [4, 5]}
+ c = {'b': [6, 7]}
+ e = _old_mergemanydict(a, b, c)
+ f = util.mergemanydict([a, b, c])
+ self.assertEquals(e, f)
+
+ def test_compat_merges_str(self):
+ a = {'b': "hi"}
+ b = {'b': "howdy"}
+ c = {'b': "hallo"}
+ e = _old_mergemanydict(a, b, c)
+ f = util.mergemanydict([a, b, c])
+ self.assertEquals(e, f)
+
+ def test_compat_merge_sub_dict(self):
+ a = {
+ '1': '2',
+ 'b': {
+ 'f': 'g',
+ 'e': 'c',
+ 'h': 'd',
+ 'hh': {
+ '1': 2,
+ },
+ }
+ }
+ b = {
+ 'b': {
+ 'e': 'c',
+ 'hh': {
+ '3': 4,
+ }
+ }
+ }
+ c = _old_mergedict(a, b)
+ d = util.mergemanydict([a, b])
+ self.assertEquals(c, d)
+
+ def test_compat_merge_sub_dict2(self):
+ a = {
+ '1': '2',
+ 'b': {
+ 'f': 'g',
+ }
+ }
+ b = {
+ 'b': {
+ 'e': 'c',
+ }
+ }
+ c = _old_mergedict(a, b)
+ d = util.mergemanydict([a, b])
+ self.assertEquals(c, d)
-class TestMergeDict(MockerTestCase):
- def test_simple_merge(self):
- """Test simple non-conflict merge."""
- source = {"key1": "value1"}
- candidate = {"key2": "value2"}
- result = util.mergedict(source, candidate)
- self.assertEqual({"key1": "value1", "key2": "value2"}, result)
-
- def test_nested_merge(self):
- """Test nested merge."""
- source = {"key1": {"key1.1": "value1.1"}}
- candidate = {"key1": {"key1.2": "value1.2"}}
- result = util.mergedict(source, candidate)
- self.assertEqual(
- {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)
-
- def test_merge_does_not_override(self):
- """Test that candidate doesn't override source."""
- source = {"key1": "value1", "key2": "value2"}
- candidate = {"key1": "value2", "key2": "NEW VALUE"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_candidate(self):
- """Test empty candidate doesn't change source."""
- source = {"key": "value"}
- candidate = {}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_source(self):
- """Test empty source is replaced by candidate."""
- source = {}
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(candidate, result)
-
- def test_non_dict_candidate(self):
- """Test non-dict candidate is discarded."""
- source = {"key": "value"}
- candidate = "not a dict"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_non_dict_source(self):
- """Test non-dict source is not modified with a dict candidate."""
- source = "not a dict"
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_neither_dict(self):
- """Test if neither candidate or source is dict source wins."""
- source = "source"
- candidate = "candidate"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
+ def test_compat_merge_sub_list(self):
+ a = {
+ '1': '2',
+ 'b': {
+ 'f': ['1'],
+ }
+ }
+ b = {
+ 'b': {
+ 'f': [],
+ }
+ }
+ c = _old_mergedict(a, b)
+ d = util.mergemanydict([a, b])
+ self.assertEquals(c, d)
diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py
new file mode 100644
index 00000000..d8662cac
--- /dev/null
+++ b/tests/unittests/test_sshutil.py
@@ -0,0 +1,101 @@
+from cloudinit import ssh_util
+from unittest import TestCase
+
+
+VALID_CONTENT = {
+ 'dsa': (
+ "AAAAB3NzaC1kc3MAAACBAIrjOQSlSea19bExXBMBKBvcLhBoVvNBjCppNzllipF"
+ "W4jgIOMcNanULRrZGjkOKat6MWJNetSbV1E6IOFDQ16rQgsh/OvYU9XhzM8seLa"
+ "A21VszZuhIV7/2DE3vxu7B54zVzueG1O1Deq6goQCRGWBUnqO2yluJiG4HzrnDa"
+ "jzRAAAAFQDMPO96qXd4F5A+5b2f2MO7SpVomQAAAIBpC3K2zIbDLqBBs1fn7rsv"
+ "KcJvwihdlVjG7UXsDB76P2GNqVG+IlYPpJZ8TO/B/fzTMtrdXp9pSm9OY1+BgN4"
+ "REsZ2WNcvfgY33aWaEM+ieCcQigvxrNAF2FTVcbUIIxAn6SmHuQSWrLSfdHc8H7"
+ "hsrgeUPPdzjBD/cv2ZmqwZ1AAAAIAplIsScrJut5wJMgyK1JG0Kbw9JYQpLe95P"
+ "obB069g8+mYR8U0fysmTEdR44mMu0VNU5E5OhTYoTGfXrVrkR134LqFM2zpVVbE"
+ "JNDnIqDHxTkc6LY2vu8Y2pQ3/bVnllZZOda2oD5HQ7ovygQa6CH+fbaZHbdDUX/"
+ "5z7u2rVAlDw=="
+ ),
+ 'ecdsa': (
+ "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBITrGBB3cgJ"
+ "J7fPxvtMW9H3oRisNpJ3OAslxZeyP7I0A9BPAW0RQIwHVtVnM7zrp4nI+JLZov/"
+ "Ql7lc2leWL7CY="
+ ),
+ 'rsa': (
+ "AAAAB3NzaC1yc2EAAAABIwAAAQEA3I7VUf2l5gSn5uavROsc5HRDpZdQueUq5oz"
+ "emNSj8T7enqKHOEaFoU2VoPgGEWC9RyzSQVeyD6s7APMcE82EtmW4skVEgEGSbD"
+ "c1pvxzxtchBj78hJP6Cf5TCMFSXw+Fz5rF1dR23QDbN1mkHs7adr8GW4kSWqU7Q"
+ "7NDwfIrJJtO7Hi42GyXtvEONHbiRPOe8stqUly7MvUoN+5kfjBM8Qqpfl2+FNhT"
+ "YWpMfYdPUnE7u536WqzFmsaqJctz3gBxH9Ex7dFtrxR4qiqEr9Qtlu3xGn7Bw07"
+ "/+i1D+ey3ONkZLN+LQ714cgj8fRS4Hj29SCmXp5Kt5/82cD/VN3NtHw=="
+ ),
+}
+
+TEST_OPTIONS = ("no-port-forwarding,no-agent-forwarding,no-X11-forwarding,"
+ 'command="echo \'Please login as the user \"ubuntu\" rather than the'
+ 'user \"root\".\';echo;sleep 10"')
+
+
+class TestAuthKeyLineParser(TestCase):
+ def test_simple_parse(self):
+ # test key line with common 3 fields (keytype, base64, comment)
+ parser = ssh_util.AuthKeyLineParser()
+ for ktype in ['rsa', 'ecdsa', 'dsa']:
+ content = VALID_CONTENT[ktype]
+ comment = 'user-%s@host' % ktype
+ line = ' '.join((ktype, content, comment,))
+ key = parser.parse(line)
+
+ self.assertEqual(key.base64, content)
+ self.assertFalse(key.options)
+ self.assertEqual(key.comment, comment)
+ self.assertEqual(key.keytype, ktype)
+
+ def test_parse_no_comment(self):
+ # test key line with key type and base64 only
+ parser = ssh_util.AuthKeyLineParser()
+ for ktype in ['rsa', 'ecdsa', 'dsa']:
+ content = VALID_CONTENT[ktype]
+ line = ' '.join((ktype, content,))
+ key = parser.parse(line)
+
+ self.assertEqual(key.base64, content)
+ self.assertFalse(key.options)
+ self.assertFalse(key.comment)
+ self.assertEqual(key.keytype, ktype)
+
+ def test_parse_with_keyoptions(self):
+ # test key line with options in it
+ parser = ssh_util.AuthKeyLineParser()
+ options = TEST_OPTIONS
+ for ktype in ['rsa', 'ecdsa', 'dsa']:
+ content = VALID_CONTENT[ktype]
+ comment = 'user-%s@host' % ktype
+ line = ' '.join((options, ktype, content, comment,))
+ key = parser.parse(line)
+
+ self.assertEqual(key.base64, content)
+ self.assertEqual(key.options, options)
+ self.assertEqual(key.comment, comment)
+ self.assertEqual(key.keytype, ktype)
+
+ def test_parse_with_options_passed_in(self):
+ # test key line with key type and base64 only
+ parser = ssh_util.AuthKeyLineParser()
+
+ baseline = ' '.join(("rsa", VALID_CONTENT['rsa'], "user@host"))
+ myopts = "no-port-forwarding,no-agent-forwarding"
+
+ key = parser.parse("allowedopt" + " " + baseline)
+ self.assertEqual(key.options, "allowedopt")
+
+ key = parser.parse("overridden_opt " + baseline, options=myopts)
+ self.assertEqual(key.options, myopts)
+
+ def test_parse_invalid_keytype(self):
+ parser = ssh_util.AuthKeyLineParser()
+ key = parser.parse(' '.join(["badkeytype", VALID_CONTENT['rsa']]))
+
+ self.assertFalse(key.valid())
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py
index 82a4c555..b227616c 100644
--- a/tests/unittests/test_userdata.py
+++ b/tests/unittests/test_userdata.py
@@ -2,19 +2,25 @@
import StringIO
+import gzip
import logging
import os
from email.mime.base import MIMEBase
+from email.mime.multipart import MIMEMultipart
+from email.mime.application import MIMEApplication
-from mocker import MockerTestCase
-
+from cloudinit import handlers
+from cloudinit import helpers as c_helpers
from cloudinit import log
from cloudinit import sources
from cloudinit import stages
+from cloudinit import util
INSTANCE_ID = "i-testing"
+from tests.unittests import helpers
+
class FakeDataSource(sources.DataSource):
@@ -26,22 +32,16 @@ class FakeDataSource(sources.DataSource):
# FIXME: these tests shouldn't be checking log output??
# Weirddddd...
-
-
-class TestConsumeUserData(MockerTestCase):
+class TestConsumeUserData(helpers.FilesystemMockingTestCase):
def setUp(self):
- MockerTestCase.setUp(self)
- # Replace the write so no actual files
- # get written out...
- self.mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
+ helpers.FilesystemMockingTestCase.setUp(self)
self._log = None
self._log_file = None
self._log_handler = None
def tearDown(self):
- MockerTestCase.tearDown(self)
+ helpers.FilesystemMockingTestCase.tearDown(self)
if self._log_handler and self._log:
self._log.removeHandler(self._log_handler)
@@ -53,13 +53,134 @@ class TestConsumeUserData(MockerTestCase):
self._log.addHandler(self._log_handler)
return log_file
+ def test_simple_jsonp(self):
+ blob = '''
+#cloud-config-jsonp
+[
+ { "op": "add", "path": "/baz", "value": "qux" },
+ { "op": "add", "path": "/bar", "value": "qux2" }
+]
+'''
+
+ ci = stages.Init()
+ ci.datasource = FakeDataSource(blob)
+ new_root = self.makeDir()
+ self.patchUtils(new_root)
+ self.patchOS(new_root)
+ ci.fetch()
+ ci.consume_userdata()
+ cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
+ cc = util.load_yaml(cc_contents)
+ self.assertEquals(2, len(cc))
+ self.assertEquals('qux', cc['baz'])
+ self.assertEquals('qux2', cc['bar'])
+
+ def test_mixed_cloud_config(self):
+ blob_cc = '''
+#cloud-config
+a: b
+c: d
+'''
+ message_cc = MIMEBase("text", "cloud-config")
+ message_cc.set_payload(blob_cc)
+
+ blob_jp = '''
+#cloud-config-jsonp
+[
+ { "op": "replace", "path": "/a", "value": "c" },
+ { "op": "remove", "path": "/c" }
+]
+'''
+
+ message_jp = MIMEBase('text', "cloud-config-jsonp")
+ message_jp.set_payload(blob_jp)
+
+ message = MIMEMultipart()
+ message.attach(message_cc)
+ message.attach(message_jp)
+
+ ci = stages.Init()
+ ci.datasource = FakeDataSource(str(message))
+ new_root = self.makeDir()
+ self.patchUtils(new_root)
+ self.patchOS(new_root)
+ ci.fetch()
+ ci.consume_userdata()
+ cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
+ cc = util.load_yaml(cc_contents)
+ self.assertEquals(1, len(cc))
+ self.assertEquals('c', cc['a'])
+
+ def test_merging_cloud_config(self):
+ blob = '''
+#cloud-config
+a: b
+e: f
+run:
+ - b
+ - c
+'''
+ message1 = MIMEBase("text", "cloud-config")
+ message1.set_payload(blob)
+
+ blob2 = '''
+#cloud-config
+a: e
+e: g
+run:
+ - stuff
+ - morestuff
+'''
+ message2 = MIMEBase("text", "cloud-config")
+ message2['X-Merge-Type'] = ('dict(recurse_array,'
+ 'recurse_str)+list(append)+str(append)')
+ message2.set_payload(blob2)
+
+ blob3 = '''
+#cloud-config
+e:
+ - 1
+ - 2
+ - 3
+p: 1
+'''
+ message3 = MIMEBase("text", "cloud-config")
+ message3.set_payload(blob3)
+
+ messages = [message1, message2, message3]
+
+ paths = c_helpers.Paths({}, ds=FakeDataSource(''))
+ cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
+
+ new_root = self.makeDir()
+ self.patchUtils(new_root)
+ self.patchOS(new_root)
+ cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None,
+ None)
+ for i, m in enumerate(messages):
+ headers = dict(m)
+ fn = "part-%s" % (i + 1)
+ payload = m.get_payload(decode=True)
+ cloud_cfg.handle_part(None, headers['Content-Type'],
+ fn, payload, None, headers)
+ cloud_cfg.handle_part(None, handlers.CONTENT_END, None, None, None,
+ None)
+ contents = util.load_file(paths.get_ipath('cloud_config'))
+ contents = util.load_yaml(contents)
+ self.assertEquals(contents['run'], ['b', 'c', 'stuff', 'morestuff'])
+ self.assertEquals(contents['a'], 'be')
+ self.assertEquals(contents['e'], [1, 2, 3])
+ self.assertEquals(contents['p'], 1)
+
def test_unhandled_type_warning(self):
"""Raw text without magic is ignored but shows warning."""
ci = stages.Init()
data = "arbitrary text\n"
ci.datasource = FakeDataSource(data)
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)
@@ -69,6 +190,46 @@ class TestConsumeUserData(MockerTestCase):
"Unhandled non-multipart (text/x-not-multipart) userdata:",
log_file.getvalue())
+ def test_mime_gzip_compressed(self):
+ """Tests that individual message gzip encoding works."""
+
+ def gzip_part(text):
+ contents = StringIO.StringIO()
+ f = gzip.GzipFile(fileobj=contents, mode='w')
+ f.write(str(text))
+ f.flush()
+ f.close()
+ return MIMEApplication(contents.getvalue(), 'gzip')
+
+ base_content1 = '''
+#cloud-config
+a: 2
+'''
+
+ base_content2 = '''
+#cloud-config
+b: 3
+c: 4
+'''
+
+ message = MIMEMultipart('test')
+ message.attach(gzip_part(base_content1))
+ message.attach(gzip_part(base_content2))
+ ci = stages.Init()
+ ci.datasource = FakeDataSource(str(message))
+ new_root = self.makeDir()
+ self.patchUtils(new_root)
+ self.patchOS(new_root)
+ ci.fetch()
+ ci.consume_userdata()
+ contents = util.load_file(ci.paths.get_ipath("cloud_config"))
+ contents = util.load_yaml(contents)
+ self.assertTrue(isinstance(contents, dict))
+ self.assertEquals(3, len(contents))
+ self.assertEquals(2, contents['a'])
+ self.assertEquals(3, contents['b'])
+ self.assertEquals(4, contents['c'])
+
def test_mime_text_plain(self):
"""Mime message of type text/plain is ignored but shows warning."""
ci = stages.Init()
@@ -76,7 +237,9 @@ class TestConsumeUserData(MockerTestCase):
message.set_payload("Just text")
ci.datasource = FakeDataSource(message.as_string())
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)
@@ -93,8 +256,10 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(script)
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mock_write(outpath, script, 0700)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write(outpath, script, 0700)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)
@@ -111,8 +276,10 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mock_write(outpath, script, 0700)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write(outpath, script, 0700)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)
@@ -129,8 +296,10 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- self.mock_write(outpath, script, 0700)
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(outpath, script, 0700)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 02611581..87415cb5 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -1,9 +1,12 @@
+# pylint: disable=C0301
+# the mountinfo data lines are too long
import os
import stat
import yaml
from mocker import MockerTestCase
from unittest import TestCase
+from tests.unittests import helpers
from cloudinit import importer
from cloudinit import util
@@ -248,4 +251,60 @@ class TestLoadYaml(TestCase):
myobj)
+class TestMountinfoParsing(helpers.ResourceUsingTestCase):
+ def test_invalid_mountinfo(self):
+ line = ("20 1 252:1 / / rw,relatime - ext4 /dev/mapper/vg0-root"
+ "rw,errors=remount-ro,data=ordered")
+ elements = line.split()
+ for i in range(len(elements) + 1):
+ lines = [' '.join(elements[0:i])]
+ if i < 10:
+ expected = None
+ else:
+ expected = ('/dev/mapper/vg0-root', 'ext4', '/')
+ self.assertEqual(expected, util.parse_mount_info('/', lines))
+
+ def test_precise_ext4_root(self):
+
+ lines = self.readResource('mountinfo_precise_ext4.txt').splitlines()
+
+ expected = ('/dev/mapper/vg0-root', 'ext4', '/')
+ self.assertEqual(expected, util.parse_mount_info('/', lines))
+ self.assertEqual(expected, util.parse_mount_info('/usr', lines))
+ self.assertEqual(expected, util.parse_mount_info('/usr/bin', lines))
+
+ expected = ('/dev/md0', 'ext4', '/boot')
+ self.assertEqual(expected, util.parse_mount_info('/boot', lines))
+ self.assertEqual(expected, util.parse_mount_info('/boot/grub', lines))
+
+ expected = ('/dev/mapper/vg0-root', 'ext4', '/')
+ self.assertEqual(expected, util.parse_mount_info('/home', lines))
+ self.assertEqual(expected, util.parse_mount_info('/home/me', lines))
+
+ expected = ('tmpfs', 'tmpfs', '/run')
+ self.assertEqual(expected, util.parse_mount_info('/run', lines))
+
+ expected = ('none', 'tmpfs', '/run/lock')
+ self.assertEqual(expected, util.parse_mount_info('/run/lock', lines))
+
+ def test_raring_btrfs_root(self):
+ lines = self.readResource('mountinfo_raring_btrfs.txt').splitlines()
+
+ expected = ('/dev/vda1', 'btrfs', '/')
+ self.assertEqual(expected, util.parse_mount_info('/', lines))
+ self.assertEqual(expected, util.parse_mount_info('/usr', lines))
+ self.assertEqual(expected, util.parse_mount_info('/usr/bin', lines))
+ self.assertEqual(expected, util.parse_mount_info('/boot', lines))
+ self.assertEqual(expected, util.parse_mount_info('/boot/grub', lines))
+
+ expected = ('/dev/vda1', 'btrfs', '/home')
+ self.assertEqual(expected, util.parse_mount_info('/home', lines))
+ self.assertEqual(expected, util.parse_mount_info('/home/me', lines))
+
+ expected = ('tmpfs', 'tmpfs', '/run')
+ self.assertEqual(expected, util.parse_mount_info('/run', lines))
+
+ expected = ('none', 'tmpfs', '/run/lock')
+ self.assertEqual(expected, util.parse_mount_info('/run/lock', lines))
+
# vi: ts=4 expandtab