diff options
Diffstat (limited to 'tests/unittests')
-rw-r--r-- | tests/unittests/helpers.py | 4 | ||||
-rw-r--r-- | tests/unittests/test__init__.py | 36 | ||||
-rw-r--r-- | tests/unittests/test_builtin_handlers.py | 22 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_azure.py | 331 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_configdrive.py | 5 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_maas.py | 12 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_nocloud.py | 4 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_smartos.py | 273 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_apt_configure.py | 106 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_growpart.py | 258 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_locale.py | 64 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_set_hostname.py | 14 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_timezone.py | 75 | ||||
-rw-r--r-- | tests/unittests/test_merging.py | 309 | ||||
-rw-r--r-- | tests/unittests/test_sshutil.py | 101 | ||||
-rw-r--r-- | tests/unittests/test_userdata.py | 207 | ||||
-rw-r--r-- | tests/unittests/test_util.py | 59 |
17 files changed, 1769 insertions, 111 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py index 904677f1..c0da0983 100644 --- a/tests/unittests/helpers.py +++ b/tests/unittests/helpers.py @@ -146,7 +146,8 @@ class FilesystemMockingTestCase(ResourceUsingTestCase): ('chmod', 1), ('delete_dir_contents', 1), ('del_file', 1), - ('sym_link', -1)], + ('sym_link', -1), + ('copy', -1)], } for (mod, funcs) in patch_funcs.items(): for (f, am) in funcs: @@ -175,6 +176,7 @@ class FilesystemMockingTestCase(ResourceUsingTestCase): def patchOS(self, new_root): patch_funcs = { os.path: ['isfile', 'exists', 'islink', 'isdir'], + os: ['listdir'], } for (mod, funcs) in patch_funcs.items(): for f in funcs: diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py index ac082076..b4b20e51 100644 --- a/tests/unittests/test__init__.py +++ b/tests/unittests/test__init__.py @@ -22,7 +22,8 @@ class FakeModule(handlers.Handler): def list_types(self): return self.types - def _handle_part(self, data, ctype, filename, payload, frequency): + def handle_part(self, _data, ctype, filename, # pylint: disable=W0221 + payload, frequency): pass @@ -103,6 +104,9 @@ class TestHandlerHandlePart(MockerTestCase): self.filename = "fake filename" self.payload = "fake payload" self.frequency = settings.PER_INSTANCE + self.headers = { + 'Content-Type': self.ctype, + } def test_normal_version_1(self): """ @@ -118,8 +122,8 @@ class TestHandlerHandlePart(MockerTestCase): self.payload) self.mocker.replay() - handlers.run_part(mod_mock, self.data, self.ctype, self.filename, - self.payload, self.frequency) + handlers.run_part(mod_mock, self.data, self.filename, + self.payload, self.frequency, self.headers) def test_normal_version_2(self): """ @@ -135,8 +139,8 @@ class TestHandlerHandlePart(MockerTestCase): self.payload, self.frequency) self.mocker.replay() - handlers.run_part(mod_mock, self.data, self.ctype, self.filename, - self.payload, self.frequency) + handlers.run_part(mod_mock, self.data, self.filename, + self.payload, self.frequency, self.headers) def test_modfreq_per_always(self): """ @@ -152,8 +156,8 @@ class TestHandlerHandlePart(MockerTestCase): self.payload) self.mocker.replay() - handlers.run_part(mod_mock, self.data, self.ctype, self.filename, - self.payload, self.frequency) + handlers.run_part(mod_mock, self.data, self.filename, + self.payload, self.frequency, self.headers) def test_no_handle_when_modfreq_once(self): """C{handle_part} is not called if frequency is once.""" @@ -163,8 +167,8 @@ class TestHandlerHandlePart(MockerTestCase): self.mocker.result(settings.PER_ONCE) self.mocker.replay() - handlers.run_part(mod_mock, self.data, self.ctype, self.filename, - self.payload, self.frequency) + handlers.run_part(mod_mock, self.data, self.filename, + self.payload, self.frequency, self.headers) def test_exception_is_caught(self): """Exceptions within C{handle_part} are caught and logged.""" @@ -178,8 +182,8 @@ class TestHandlerHandlePart(MockerTestCase): self.mocker.throw(Exception()) self.mocker.replay() - handlers.run_part(mod_mock, self.data, self.ctype, self.filename, - self.payload, self.frequency) + handlers.run_part(mod_mock, self.data, self.filename, + self.payload, self.frequency, self.headers) class TestCmdlineUrl(MockerTestCase): @@ -191,8 +195,8 @@ class TestCmdlineUrl(MockerTestCase): mock_readurl = self.mocker.replace(url_helper.readurl, passthrough=False) - mock_readurl(url) - self.mocker.result(url_helper.UrlResponse(200, payload)) + mock_readurl(url, ARGS, KWARGS) + self.mocker.result(util.StringResponse(payload)) self.mocker.replay() self.assertEqual((key, url, None), @@ -207,8 +211,8 @@ class TestCmdlineUrl(MockerTestCase): mock_readurl = self.mocker.replace(url_helper.readurl, passthrough=False) - mock_readurl(url) - self.mocker.result(url_helper.UrlResponse(200, payload)) + mock_readurl(url, ARGS, KWARGS) + self.mocker.result(util.StringResponse(payload)) self.mocker.replay() self.assertEqual((key, url, payload), @@ -221,7 +225,7 @@ class TestCmdlineUrl(MockerTestCase): cmdline = "ro %s=%s bar=1" % (key, url) self.mocker.replace(url_helper.readurl, passthrough=False) - self.mocker.result(url_helper.UrlResponse(400)) + self.mocker.result(util.StringResponse("")) self.mocker.replay() self.assertEqual((None, None, None), diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py index 5f41cb3d..b387f13b 100644 --- a/tests/unittests/test_builtin_handlers.py +++ b/tests/unittests/test_builtin_handlers.py @@ -2,7 +2,7 @@ import os -from mocker import MockerTestCase +from tests.unittests import helpers as test_helpers from cloudinit import handlers from cloudinit import helpers @@ -13,7 +13,7 @@ from cloudinit.handlers import upstart_job from cloudinit.settings import (PER_ALWAYS, PER_INSTANCE) -class TestBuiltins(MockerTestCase): +class TestBuiltins(test_helpers.FilesystemMockingTestCase): def test_upstart_frequency_no_out(self): c_root = self.makeDir() @@ -36,13 +36,18 @@ class TestBuiltins(MockerTestCase): def test_upstart_frequency_single(self): # files should be written out when frequency is ! per-instance - c_root = self.makeDir() - up_root = self.makeDir() + new_root = self.makeDir() + freq = PER_INSTANCE + + self.patchOS(new_root) + self.patchUtils(new_root) paths = helpers.Paths({ - 'cloud_dir': c_root, - 'upstart_dir': up_root, + 'upstart_dir': "/etc/upstart", }) - freq = PER_INSTANCE + + upstart_job.SUITABLE_UPSTART = True + util.ensure_dir("/run") + util.ensure_dir("/etc/upstart") mock_subp = self.mocker.replace(util.subp, passthrough=False) mock_subp(["initctl", "reload-configuration"], capture=False) @@ -55,4 +60,5 @@ class TestBuiltins(MockerTestCase): 'test.conf', 'blah', freq) h.handle_part('', handlers.CONTENT_END, None, None, None) - self.assertEquals(1, len(os.listdir(up_root))) + + self.assertEquals(1, len(os.listdir('/etc/upstart'))) diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py new file mode 100644 index 00000000..1ca6a79d --- /dev/null +++ b/tests/unittests/test_datasource/test_azure.py @@ -0,0 +1,331 @@ +from cloudinit import helpers +from cloudinit.sources import DataSourceAzure +from tests.unittests.helpers import populate_dir + +import crypt +import base64 +from mocker import MockerTestCase +import os +import yaml + + +def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None): + if data is None: + data = {'HostName': 'FOOHOST'} + if pubkeys is None: + pubkeys = {} + + content = """<?xml version="1.0" encoding="utf-8"?> +<Environment xmlns="http://schemas.dmtf.org/ovf/environment/1" + xmlns:oe="http://schemas.dmtf.org/ovf/environment/1" + xmlns:wa="http://schemas.microsoft.com/windowsazure" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + + <wa:ProvisioningSection><wa:Version>1.0</wa:Version> + <LinuxProvisioningConfigurationSet + xmlns="http://schemas.microsoft.com/windowsazure" + xmlns:i="http://www.w3.org/2001/XMLSchema-instance"> + <ConfigurationSetType>LinuxProvisioningConfiguration</ConfigurationSetType> + """ + for key, dval in data.items(): + if isinstance(dval, dict): + val = dval.get('text') + attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v in dval.items() + if k != 'text']) + else: + val = dval + attrs = "" + content += "<%s%s>%s</%s>\n" % (key, attrs, val, key) + + if userdata: + content += "<UserData>%s</UserData>\n" % (base64.b64encode(userdata)) + + if pubkeys: + content += "<SSH><PublicKeys>\n" + for fp, path in pubkeys: + content += " <PublicKey>" + content += ("<Fingerprint>%s</Fingerprint><Path>%s</Path>" % + (fp, path)) + content += "</PublicKey>\n" + content += "</PublicKeys></SSH>" + content += """ + </LinuxProvisioningConfigurationSet> + </wa:ProvisioningSection> + <wa:PlatformSettingsSection><wa:Version>1.0</wa:Version> + <PlatformSettings xmlns="http://schemas.microsoft.com/windowsazure" + xmlns:i="http://www.w3.org/2001/XMLSchema-instance"> + <KmsServerHostname>kms.core.windows.net</KmsServerHostname> + <ProvisionGuestAgent>false</ProvisionGuestAgent> + <GuestAgentPackageName i:nil="true" /> + </PlatformSettings></wa:PlatformSettingsSection> +</Environment> + """ + + return content + + +class TestAzureDataSource(MockerTestCase): + + def setUp(self): + # makeDir comes from MockerTestCase + self.tmp = self.makeDir() + + # patch cloud_dir, so our 'seed_dir' is guaranteed empty + self.paths = helpers.Paths({'cloud_dir': self.tmp}) + + self.unapply = [] + super(TestAzureDataSource, self).setUp() + + def tearDown(self): + apply_patches([i for i in reversed(self.unapply)]) + super(TestAzureDataSource, self).tearDown() + + def apply_patches(self, patches): + ret = apply_patches(patches) + self.unapply += ret + + def _get_ds(self, data): + + def dsdevs(): + return data.get('dsdevs', []) + + def _invoke_agent(cmd): + data['agent_invoked'] = cmd + + def _write_files(datadir, files, dirmode): + data['files'] = {} + data['datadir'] = datadir + data['datadir_mode'] = dirmode + for (fname, content) in files.items(): + data['files'][fname] = content + + def _wait_for_files(flist, _maxwait=None, _naplen=None): + data['waited'] = flist + return [] + + def _pubkeys_from_crt_files(flist): + data['pubkey_files'] = flist + return ["pubkey_from: %s" % f for f in flist] + + def _iid_from_shared_config(path): + data['iid_from_shared_cfg'] = path + return 'i-my-azure-id' + + def _apply_hostname_bounce(**kwargs): + data['apply_hostname_bounce'] = kwargs + + if data.get('ovfcontent') is not None: + populate_dir(os.path.join(self.paths.seed_dir, "azure"), + {'ovf-env.xml': data['ovfcontent']}) + + mod = DataSourceAzure + + if data.get('dsdevs'): + self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)]) + + self.apply_patches([(mod, 'invoke_agent', _invoke_agent), + (mod, 'write_files', _write_files), + (mod, 'wait_for_files', _wait_for_files), + (mod, 'pubkeys_from_crt_files', + _pubkeys_from_crt_files), + (mod, 'iid_from_shared_config', + _iid_from_shared_config), + (mod, 'apply_hostname_bounce', + _apply_hostname_bounce), ]) + + dsrc = mod.DataSourceAzureNet( + data.get('sys_cfg', {}), distro=None, paths=self.paths) + + return dsrc + + def test_basic_seed_dir(self): + odata = {'HostName': "myhost", 'UserName': "myuser"} + data = {'ovfcontent': construct_valid_ovf_env(data=odata), + 'sys_cfg': {}} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(dsrc.userdata_raw, "") + self.assertEqual(dsrc.metadata['local-hostname'], odata['HostName']) + self.assertTrue('ovf-env.xml' in data['files']) + self.assertEqual(0700, data['datadir_mode']) + self.assertEqual(dsrc.metadata['instance-id'], 'i-my-azure-id') + + def test_user_cfg_set_agent_command_plain(self): + # set dscfg in via plaintext + cfg = {'agent_command': "my_command"} + odata = {'HostName': "myhost", 'UserName': "myuser", + 'dscfg': {'text': yaml.dump(cfg), 'encoding': 'plain'}} + data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(data['agent_invoked'], cfg['agent_command']) + + def test_user_cfg_set_agent_command(self): + # set dscfg in via base64 encoded yaml + cfg = {'agent_command': "my_command"} + odata = {'HostName': "myhost", 'UserName': "myuser", + 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)), + 'encoding': 'base64'}} + data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(data['agent_invoked'], cfg['agent_command']) + + def test_sys_cfg_set_agent_command(self): + sys_cfg = {'datasource': {'Azure': {'agent_command': '_COMMAND'}}} + data = {'ovfcontent': construct_valid_ovf_env(data={}), + 'sys_cfg': sys_cfg} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(data['agent_invoked'], '_COMMAND') + + def test_username_used(self): + odata = {'HostName': "myhost", 'UserName': "myuser"} + data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(dsrc.cfg['system_info']['default_user']['name'], + "myuser") + + def test_password_given(self): + odata = {'HostName': "myhost", 'UserName': "myuser", + 'UserPassword': "mypass"} + data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertTrue('default_user' in dsrc.cfg['system_info']) + defuser = dsrc.cfg['system_info']['default_user'] + + # default user should be updated username and should not be locked. + self.assertEqual(defuser['name'], odata['UserName']) + self.assertFalse(defuser['lock_passwd']) + # passwd is crypt formated string $id$salt$encrypted + # encrypting plaintext with salt value of everything up to final '$' + # should equal that after the '$' + pos = defuser['passwd'].rfind("$") + 1 + self.assertEqual(defuser['passwd'], + crypt.crypt(odata['UserPassword'], defuser['passwd'][0:pos])) + + def test_userdata_found(self): + mydata = "FOOBAR" + odata = {'UserData': base64.b64encode(mydata)} + data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(dsrc.userdata_raw, mydata) + + def test_no_datasource_expected(self): + #no source should be found if no seed_dir and no devs + data = {} + dsrc = self._get_ds({}) + ret = dsrc.get_data() + self.assertFalse(ret) + self.assertFalse('agent_invoked' in data) + + def test_cfg_has_pubkeys(self): + odata = {'HostName': "myhost", 'UserName': "myuser"} + mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}] + pubkeys = [(x['fingerprint'], x['path']) for x in mypklist] + data = {'ovfcontent': construct_valid_ovf_env(data=odata, + pubkeys=pubkeys)} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + for mypk in mypklist: + self.assertIn(mypk, dsrc.cfg['_pubkeys']) + + def test_disabled_bounce(self): + pass + + def test_apply_bounce_call_1(self): + # hostname needs to get through to apply_hostname_bounce + odata = {'HostName': 'my-random-hostname'} + data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + + self._get_ds(data).get_data() + self.assertIn('hostname', data['apply_hostname_bounce']) + self.assertEqual(data['apply_hostname_bounce']['hostname'], + odata['HostName']) + + def test_apply_bounce_call_configurable(self): + # hostname_bounce should be configurable in datasource cfg + cfg = {'hostname_bounce': {'interface': 'eth1', 'policy': 'off', + 'command': 'my-bounce-command', + 'hostname_command': 'my-hostname-command'}} + odata = {'HostName': "xhost", + 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)), + 'encoding': 'base64'}} + data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + self._get_ds(data).get_data() + + for k in cfg['hostname_bounce']: + self.assertIn(k, data['apply_hostname_bounce']) + + for k, v in cfg['hostname_bounce'].items(): + self.assertEqual(data['apply_hostname_bounce'][k], v) + + def test_set_hostname_disabled(self): + # config specifying set_hostname off should not bounce + cfg = {'set_hostname': False} + odata = {'HostName': "xhost", + 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)), + 'encoding': 'base64'}} + data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + self._get_ds(data).get_data() + + self.assertEqual(data.get('apply_hostname_bounce', "N/A"), "N/A") + + +class TestReadAzureOvf(MockerTestCase): + def test_invalid_xml_raises_non_azure_ds(self): + invalid_xml = "<foo>" + construct_valid_ovf_env(data={}) + self.assertRaises(DataSourceAzure.NonAzureDataSource, + DataSourceAzure.read_azure_ovf, invalid_xml) + + def test_load_with_pubkeys(self): + mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}] + pubkeys = [(x['fingerprint'], x['path']) for x in mypklist] + content = construct_valid_ovf_env(pubkeys=pubkeys) + (_md, _ud, cfg) = DataSourceAzure.read_azure_ovf(content) + for mypk in mypklist: + self.assertIn(mypk, cfg['_pubkeys']) + + +class TestReadAzureSharedConfig(MockerTestCase): + def test_valid_content(self): + xml = """<?xml version="1.0" encoding="utf-8"?> + <SharedConfig> + <Deployment name="MY_INSTANCE_ID"> + <Service name="myservice"/> + <ServiceInstance name="INSTANCE_ID.0" guid="{abcd-uuid}" /> + </Deployment> + <Incarnation number="1"/> + </SharedConfig>""" + ret = DataSourceAzure.iid_from_shared_config_content(xml) + self.assertEqual("MY_INSTANCE_ID", ret) + + +def apply_patches(patches): + ret = [] + for (ref, name, replace) in patches: + if replace is None: + continue + orig = getattr(ref, name) + setattr(ref, name, replace) + ret.append((ref, name, orig)) + return ret diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py index 930086db..d5935294 100644 --- a/tests/unittests/test_datasource/test_configdrive.py +++ b/tests/unittests/test_datasource/test_configdrive.py @@ -259,8 +259,9 @@ class TestConfigDriveDataSource(MockerTestCase): def test_find_candidates(self): devs_with_answers = {} - def my_devs_with(criteria): - return devs_with_answers[criteria] + def my_devs_with(*args, **kwargs): + criteria = args[0] if len(args) else kwargs.pop('criteria', None) + return devs_with_answers.get(criteria, []) def my_is_partition(dev): return dev[-1] in "0123456789" and not dev.startswith("sr") diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py index b56fea82..2007a6df 100644 --- a/tests/unittests/test_datasource/test_maas.py +++ b/tests/unittests/test_datasource/test_maas.py @@ -3,12 +3,13 @@ import os from cloudinit.sources import DataSourceMAAS from cloudinit import url_helper +from cloudinit import util from tests.unittests.helpers import populate_dir -from mocker import MockerTestCase +import mocker -class TestMAASDataSource(MockerTestCase): +class TestMAASDataSource(mocker.MockerTestCase): def setUp(self): super(TestMAASDataSource, self).setUp() @@ -115,9 +116,12 @@ class TestMAASDataSource(MockerTestCase): for key in valid_order: url = "%s/%s/%s" % (my_seed, my_ver, key) - mock_request(url, headers=my_headers, timeout=None) + mock_request(url, headers=None, timeout=mocker.ANY, + data=mocker.ANY, sec_between=mocker.ANY, + ssl_details=mocker.ANY, retries=mocker.ANY, + headers_cb=my_headers_cb) resp = valid.get(key) - self.mocker.result(url_helper.UrlResponse(200, resp)) + self.mocker.result(util.StringResponse(resp)) self.mocker.replay() (userdata, metadata) = DataSourceMAAS.read_maas_seed_url(my_seed, diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py index 28e0a472..7328b240 100644 --- a/tests/unittests/test_datasource/test_nocloud.py +++ b/tests/unittests/test_datasource/test_nocloud.py @@ -1,7 +1,7 @@ from cloudinit import helpers -from tests.unittests.helpers import populate_dir from cloudinit.sources import DataSourceNoCloud from cloudinit import util +from tests.unittests.helpers import populate_dir from mocker import MockerTestCase import os @@ -22,7 +22,7 @@ class TestNoCloudDataSource(MockerTestCase): def tearDown(self): apply_patches([i for i in reversed(self.unapply)]) - super(TestNoCloudDataSource, self).setUp() + super(TestNoCloudDataSource, self).tearDown() def apply_patches(self, patches): ret = apply_patches(patches) diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py new file mode 100644 index 00000000..f53715b0 --- /dev/null +++ b/tests/unittests/test_datasource/test_smartos.py @@ -0,0 +1,273 @@ +# vi: ts=4 expandtab +# +# Copyright (C) 2013 Canonical Ltd. +# +# Author: Ben Howard <ben.howard@canonical.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# +# This is a testcase for the SmartOS datasource. It replicates a serial +# console and acts like the SmartOS console does in order to validate +# return responses. +# + +import base64 +from cloudinit import helpers +from cloudinit.sources import DataSourceSmartOS + +from mocker import MockerTestCase +import uuid + +MOCK_RETURNS = { + 'hostname': 'test-host', + 'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname', + 'disable_iptables_flag': None, + 'enable_motd_sys_info': None, + 'test-var1': 'some data', + 'user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']), +} + +DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc') + + +class MockSerial(object): + """Fake a serial terminal for testing the code that + interfaces with the serial""" + + port = None + + def __init__(self, mockdata): + self.last = None + self.last = None + self.new = True + self.count = 0 + self.mocked_out = [] + self.mockdata = mockdata + + def open(self): + return True + + def close(self): + return True + + def isOpen(self): + return True + + def write(self, line): + line = line.replace('GET ', '') + self.last = line.rstrip() + + def readline(self): + if self.new: + self.new = False + if self.last in self.mockdata: + return 'SUCCESS\n' + else: + return 'NOTFOUND %s\n' % self.last + + if self.last in self.mockdata: + if not self.mocked_out: + self.mocked_out = [x for x in self._format_out()] + print self.mocked_out + + if len(self.mocked_out) > self.count: + self.count += 1 + return self.mocked_out[self.count - 1] + + def _format_out(self): + if self.last in self.mockdata: + _mret = self.mockdata[self.last] + try: + for l in _mret.splitlines(): + yield "%s\n" % l.rstrip() + except: + yield "%s\n" % _mret.rstrip() + + yield '.' + yield '\n' + + +class TestSmartOSDataSource(MockerTestCase): + def setUp(self): + # makeDir comes from MockerTestCase + self.tmp = self.makeDir() + + # patch cloud_dir, so our 'seed_dir' is guaranteed empty + self.paths = helpers.Paths({'cloud_dir': self.tmp}) + + self.unapply = [] + super(TestSmartOSDataSource, self).setUp() + + def tearDown(self): + apply_patches([i for i in reversed(self.unapply)]) + super(TestSmartOSDataSource, self).tearDown() + + def apply_patches(self, patches): + ret = apply_patches(patches) + self.unapply += ret + + def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None): + mod = DataSourceSmartOS + + if mockdata is None: + mockdata = MOCK_RETURNS + + if dmi_data is None: + dmi_data = DMI_DATA_RETURN + + def _get_serial(*_): + return MockSerial(mockdata) + + def _dmi_data(): + return dmi_data + + if sys_cfg is None: + sys_cfg = {} + + if ds_cfg is not None: + sys_cfg['datasource'] = sys_cfg.get('datasource', {}) + sys_cfg['datasource']['SmartOS'] = ds_cfg + + self.apply_patches([(mod, 'get_serial', _get_serial)]) + self.apply_patches([(mod, 'dmi_data', _dmi_data)]) + dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None, + paths=self.paths) + return dsrc + + def test_seed(self): + # default seed should be /dev/ttyS1 + dsrc = self._get_ds() + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals('/dev/ttyS1', dsrc.seed) + + def test_issmartdc(self): + dsrc = self._get_ds() + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertTrue(dsrc.is_smartdc) + + def test_no_base64(self): + ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True} + dsrc = self._get_ds(ds_cfg=ds_cfg) + ret = dsrc.get_data() + self.assertTrue(ret) + + def test_uuid(self): + dsrc = self._get_ds(mockdata=MOCK_RETURNS) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(DMI_DATA_RETURN[0], dsrc.metadata['instance-id']) + + def test_root_keys(self): + dsrc = self._get_ds(mockdata=MOCK_RETURNS) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['root_authorized_keys'], + dsrc.metadata['public-keys']) + + def test_hostname_b64(self): + dsrc = self._get_ds(mockdata=MOCK_RETURNS) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['hostname'], + dsrc.metadata['local-hostname']) + + def test_hostname(self): + dsrc = self._get_ds(mockdata=MOCK_RETURNS) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['hostname'], + dsrc.metadata['local-hostname']) + + def test_base64_all(self): + # metadata provided base64_all of true + my_returns = MOCK_RETURNS.copy() + my_returns['base64_all'] = "true" + for k in ('hostname', 'user-data'): + my_returns[k] = base64.b64encode(my_returns[k]) + + dsrc = self._get_ds(mockdata=my_returns) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['hostname'], + dsrc.metadata['local-hostname']) + self.assertEquals(MOCK_RETURNS['user-data'], + dsrc.userdata_raw) + self.assertEquals(MOCK_RETURNS['root_authorized_keys'], + dsrc.metadata['public-keys']) + self.assertEquals(MOCK_RETURNS['disable_iptables_flag'], + dsrc.metadata['iptables_disable']) + self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'], + dsrc.metadata['motd_sys_info']) + + def test_b64_userdata(self): + my_returns = MOCK_RETURNS.copy() + my_returns['b64-user-data'] = "true" + my_returns['b64-hostname'] = "true" + for k in ('hostname', 'user-data'): + my_returns[k] = base64.b64encode(my_returns[k]) + + dsrc = self._get_ds(mockdata=my_returns) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['hostname'], + dsrc.metadata['local-hostname']) + self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw) + self.assertEquals(MOCK_RETURNS['root_authorized_keys'], + dsrc.metadata['public-keys']) + + def test_b64_keys(self): + my_returns = MOCK_RETURNS.copy() + my_returns['base64_keys'] = 'hostname,ignored' + for k in ('hostname',): + my_returns[k] = base64.b64encode(my_returns[k]) + + dsrc = self._get_ds(mockdata=my_returns) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['hostname'], + dsrc.metadata['local-hostname']) + self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw) + + def test_userdata(self): + dsrc = self._get_ds(mockdata=MOCK_RETURNS) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['user-data'], dsrc.userdata_raw) + + def test_disable_iptables_flag(self): + dsrc = self._get_ds(mockdata=MOCK_RETURNS) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['disable_iptables_flag'], + dsrc.metadata['iptables_disable']) + + def test_motd_sys_info(self): + dsrc = self._get_ds(mockdata=MOCK_RETURNS) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'], + dsrc.metadata['motd_sys_info']) + + +def apply_patches(patches): + ret = [] + for (ref, name, replace) in patches: + if replace is None: + continue + orig = getattr(ref, name) + setattr(ref, name, replace) + ret.append((ref, name, orig)) + return ret diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_configure.py new file mode 100644 index 00000000..203dd2aa --- /dev/null +++ b/tests/unittests/test_handler/test_handler_apt_configure.py @@ -0,0 +1,106 @@ +from mocker import MockerTestCase + +from cloudinit import util + +from cloudinit.config import cc_apt_configure + +import os +import re + + +class TestAptProxyConfig(MockerTestCase): + def setUp(self): + super(TestAptProxyConfig, self).setUp() + self.tmp = self.makeDir() + self.pfile = os.path.join(self.tmp, "proxy.cfg") + self.cfile = os.path.join(self.tmp, "config.cfg") + + def _search_apt_config(self, contents, ptype, value): + print( + r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value), + contents, "flags=re.IGNORECASE") + return(re.search( + r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value), + contents, flags=re.IGNORECASE)) + + def test_apt_proxy_written(self): + cfg = {'apt_proxy': 'myproxy'} + cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) + + self.assertTrue(os.path.isfile(self.pfile)) + self.assertFalse(os.path.isfile(self.cfile)) + + contents = str(util.read_file_or_url(self.pfile)) + self.assertTrue(self._search_apt_config(contents, "http", "myproxy")) + + def test_apt_http_proxy_written(self): + cfg = {'apt_http_proxy': 'myproxy'} + cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) + + self.assertTrue(os.path.isfile(self.pfile)) + self.assertFalse(os.path.isfile(self.cfile)) + + contents = str(util.read_file_or_url(self.pfile)) + self.assertTrue(self._search_apt_config(contents, "http", "myproxy")) + + def test_apt_all_proxy_written(self): + cfg = {'apt_http_proxy': 'myproxy_http_proxy', + 'apt_https_proxy': 'myproxy_https_proxy', + 'apt_ftp_proxy': 'myproxy_ftp_proxy'} + + values = {'http': cfg['apt_http_proxy'], + 'https': cfg['apt_https_proxy'], + 'ftp': cfg['apt_ftp_proxy'], + } + + cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) + + self.assertTrue(os.path.isfile(self.pfile)) + self.assertFalse(os.path.isfile(self.cfile)) + + contents = str(util.read_file_or_url(self.pfile)) + + for ptype, pval in values.iteritems(): + self.assertTrue(self._search_apt_config(contents, ptype, pval)) + + def test_proxy_deleted(self): + util.write_file(self.cfile, "content doesnt matter") + cc_apt_configure.apply_apt_config({}, self.pfile, self.cfile) + self.assertFalse(os.path.isfile(self.pfile)) + self.assertFalse(os.path.isfile(self.cfile)) + + def test_proxy_replaced(self): + util.write_file(self.cfile, "content doesnt matter") + cc_apt_configure.apply_apt_config({'apt_proxy': "foo"}, + self.pfile, self.cfile) + self.assertTrue(os.path.isfile(self.pfile)) + contents = str(util.read_file_or_url(self.pfile)) + self.assertTrue(self._search_apt_config(contents, "http", "foo")) + + def test_config_written(self): + payload = 'this is my apt config' + cfg = {'apt_config': payload} + + cc_apt_configure.apply_apt_config(cfg, self.pfile, self.cfile) + + self.assertTrue(os.path.isfile(self.cfile)) + self.assertFalse(os.path.isfile(self.pfile)) + + self.assertEqual(str(util.read_file_or_url(self.cfile)), payload) + + def test_config_replaced(self): + util.write_file(self.pfile, "content doesnt matter") + cc_apt_configure.apply_apt_config({'apt_config': "foo"}, + self.pfile, self.cfile) + self.assertTrue(os.path.isfile(self.cfile)) + self.assertEqual(str(util.read_file_or_url(self.cfile)), "foo") + + def test_config_deleted(self): + # if no 'apt_config' is provided, delete any previously written file + util.write_file(self.pfile, "content doesnt matter") + cc_apt_configure.apply_apt_config({}, self.pfile, self.cfile) + self.assertFalse(os.path.isfile(self.pfile)) + self.assertFalse(os.path.isfile(self.cfile)) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py new file mode 100644 index 00000000..c0497e08 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_growpart.py @@ -0,0 +1,258 @@ +from mocker import MockerTestCase + +from cloudinit import cloud +from cloudinit import util + +from cloudinit.config import cc_growpart + +import errno +import logging +import os +import re +import unittest + +# growpart: +# mode: auto # off, on, auto, 'growpart', 'parted' +# devices: ['root'] + +HELP_PARTED_NO_RESIZE = """ +Usage: parted [OPTION]... [DEVICE [COMMAND [PARAMETERS]...]...] +Apply COMMANDs with PARAMETERS to DEVICE. If no COMMAND(s) are given, run in +interactive mode. + +OPTIONs: +<SNIP> + +COMMANDs: +<SNIP> + quit exit program + rescue START END rescue a lost partition near START + and END + resize NUMBER START END resize partition NUMBER and its file + system + rm NUMBER delete partition NUMBER +<SNIP> +Report bugs to bug-parted@gnu.org +""" + +HELP_PARTED_RESIZE = """ +Usage: parted [OPTION]... [DEVICE [COMMAND [PARAMETERS]...]...] +Apply COMMANDs with PARAMETERS to DEVICE. If no COMMAND(s) are given, run in +interactive mode. + +OPTIONs: +<SNIP> + +COMMANDs: +<SNIP> + quit exit program + rescue START END rescue a lost partition near START + and END + resize NUMBER START END resize partition NUMBER and its file + system + resizepart NUMBER END resize partition NUMBER + rm NUMBER delete partition NUMBER +<SNIP> +Report bugs to bug-parted@gnu.org +""" + +HELP_GROWPART_RESIZE = """ +growpart disk partition + rewrite partition table so that partition takes up all the space it can + options: + -h | --help print Usage and exit +<SNIP> + -u | --update R update the the kernel partition table info after growing + this requires kernel support and 'partx --update' + R is one of: + - 'auto' : [default] update partition if possible +<SNIP> + Example: + - growpart /dev/sda 1 + Resize partition 1 on /dev/sda +""" + +HELP_GROWPART_NO_RESIZE = """ +growpart disk partition + rewrite partition table so that partition takes up all the space it can + options: + -h | --help print Usage and exit +<SNIP> + Example: + - growpart /dev/sda 1 + Resize partition 1 on /dev/sda +""" + + +class TestDisabled(MockerTestCase): + def setUp(self): + super(TestDisabled, self).setUp() + self.name = "growpart" + self.cloud_init = None + self.log = logging.getLogger("TestDisabled") + self.args = [] + + self.handle = cc_growpart.handle + + def test_mode_off(self): + #Test that nothing is done if mode is off. + + # this really only verifies that resizer_factory isn't called + config = {'growpart': {'mode': 'off'}} + self.mocker.replace(cc_growpart.resizer_factory, + passthrough=False) + self.mocker.replay() + + self.handle(self.name, config, self.cloud_init, self.log, self.args) + + +class TestConfig(MockerTestCase): + def setUp(self): + super(TestConfig, self).setUp() + self.name = "growpart" + self.paths = None + self.cloud = cloud.Cloud(None, self.paths, None, None, None) + self.log = logging.getLogger("TestConfig") + self.args = [] + os.environ = {} + + self.cloud_init = None + self.handle = cc_growpart.handle + + # Order must be correct + self.mocker.order() + + @unittest.skip("until LP: #1212444 fixed") + def test_no_resizers_auto_is_fine(self): + subp = self.mocker.replace(util.subp, passthrough=False) + subp(['parted', '--help'], env={'LANG': 'C'}) + self.mocker.result((HELP_PARTED_NO_RESIZE, "")) + subp(['growpart', '--help'], env={'LANG': 'C'}) + self.mocker.result((HELP_GROWPART_NO_RESIZE, "")) + self.mocker.replay() + + config = {'growpart': {'mode': 'auto'}} + self.handle(self.name, config, self.cloud_init, self.log, self.args) + + def test_no_resizers_mode_growpart_is_exception(self): + subp = self.mocker.replace(util.subp, passthrough=False) + subp(['growpart', '--help'], env={'LANG': 'C'}) + self.mocker.result((HELP_GROWPART_NO_RESIZE, "")) + self.mocker.replay() + + config = {'growpart': {'mode': "growpart"}} + self.assertRaises(ValueError, self.handle, self.name, config, + self.cloud_init, self.log, self.args) + + @unittest.skip("until LP: #1212444 fixed") + def test_mode_auto_prefers_parted(self): + subp = self.mocker.replace(util.subp, passthrough=False) + subp(['parted', '--help'], env={'LANG': 'C'}) + self.mocker.result((HELP_PARTED_RESIZE, "")) + self.mocker.replay() + + ret = cc_growpart.resizer_factory(mode="auto") + self.assertTrue(isinstance(ret, cc_growpart.ResizeParted)) + + def test_handle_with_no_growpart_entry(self): + #if no 'growpart' entry in config, then mode=auto should be used + + myresizer = object() + + factory = self.mocker.replace(cc_growpart.resizer_factory, + passthrough=False) + rsdevs = self.mocker.replace(cc_growpart.resize_devices, + passthrough=False) + factory("auto") + self.mocker.result(myresizer) + rsdevs(myresizer, ["/"]) + self.mocker.result((("/", cc_growpart.RESIZE.CHANGED, "my-message",),)) + self.mocker.replay() + + try: + orig_resizers = cc_growpart.RESIZERS + cc_growpart.RESIZERS = (('mysizer', object),) + self.handle(self.name, {}, self.cloud_init, self.log, self.args) + finally: + cc_growpart.RESIZERS = orig_resizers + + +class TestResize(MockerTestCase): + def setUp(self): + super(TestResize, self).setUp() + self.name = "growpart" + self.log = logging.getLogger("TestResize") + + # Order must be correct + self.mocker.order() + + def test_simple_devices(self): + #test simple device list + # this patches out devent2dev, os.stat, and device_part_info + # so in the end, doesn't test a lot + devs = ["/dev/XXda1", "/dev/YYda2"] + devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5L, + st_nlink=1, st_uid=0, st_gid=6, st_size=0, + st_atime=0, st_mtime=0, st_ctime=0) + enoent = ["/dev/NOENT"] + real_stat = os.stat + resize_calls = [] + + class myresizer(object): + def resize(self, diskdev, partnum, partdev): + resize_calls.append((diskdev, partnum, partdev)) + if partdev == "/dev/YYda2": + return (1024, 2048) + return (1024, 1024) # old size, new size + + def mystat(path): + if path in devs: + return devstat_ret + if path in enoent: + e = OSError("%s: does not exist" % path) + e.errno = errno.ENOENT + raise e + return real_stat(path) + + try: + opinfo = cc_growpart.device_part_info + cc_growpart.device_part_info = simple_device_part_info + os.stat = mystat + + resized = cc_growpart.resize_devices(myresizer(), devs + enoent) + + def find(name, res): + for f in res: + if f[0] == name: + return f + return None + + self.assertEqual(cc_growpart.RESIZE.NOCHANGE, + find("/dev/XXda1", resized)[1]) + self.assertEqual(cc_growpart.RESIZE.CHANGED, + find("/dev/YYda2", resized)[1]) + self.assertEqual(cc_growpart.RESIZE.SKIPPED, + find(enoent[0], resized)[1]) + #self.assertEqual(resize_calls, + #[("/dev/XXda", "1", "/dev/XXda1"), + #("/dev/YYda", "2", "/dev/YYda2")]) + finally: + cc_growpart.device_part_info = opinfo + os.stat = real_stat + + +def simple_device_part_info(devpath): + # simple stupid return (/dev/vda, 1) for /dev/vda + ret = re.search("([^0-9]*)([0-9]*)$", devpath) + x = (ret.group(1), ret.group(2)) + return x + + +class Bunch: + st_mode = None # fix pylint complaint + + def __init__(self, **kwds): + self.__dict__.update(kwds) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_locale.py b/tests/unittests/test_handler/test_handler_locale.py new file mode 100644 index 00000000..72ad00fd --- /dev/null +++ b/tests/unittests/test_handler/test_handler_locale.py @@ -0,0 +1,64 @@ +# Copyright (C) 2013 Hewlett-Packard Development Company, L.P. +# +# Author: Juerg Haefliger <juerg.haefliger@hp.com> +# +# Based on test_handler_set_hostname.py +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from cloudinit.config import cc_locale + +from cloudinit import cloud +from cloudinit import distros +from cloudinit import helpers +from cloudinit import util + +from cloudinit.sources import DataSourceNoCloud + +from tests.unittests import helpers as t_help + +from configobj import ConfigObj + +from StringIO import StringIO + +import logging + +LOG = logging.getLogger(__name__) + + +class TestLocale(t_help.FilesystemMockingTestCase): + def setUp(self): + super(TestLocale, self).setUp() + self.new_root = self.makeDir(prefix="unittest_") + + def _get_cloud(self, distro): + self.patchUtils(self.new_root) + paths = helpers.Paths({}) + + cls = distros.fetch(distro) + d = cls(distro, {}, paths) + ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths) + cc = cloud.Cloud(ds, paths, {}, d, None) + return cc + + def test_set_locale_sles(self): + + cfg = { + 'locale': 'My.Locale', + } + cc = self._get_cloud('sles') + cc_locale.handle('cc_locale', cfg, cc, LOG, []) + + contents = util.load_file('/etc/sysconfig/language') + n_cfg = ConfigObj(StringIO(contents)) + self.assertEquals({'RC_LANG': cfg['locale']}, dict(n_cfg)) diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py index a1aba62f..6344ec0c 100644 --- a/tests/unittests/test_handler/test_handler_set_hostname.py +++ b/tests/unittests/test_handler/test_handler_set_hostname.py @@ -35,7 +35,6 @@ class TestHostname(t_help.FilesystemMockingTestCase): ds = None cc = cloud.Cloud(ds, paths, {}, distro, None) self.patchUtils(self.tmp) - self.patchOS(self.tmp) cc_set_hostname.handle('cc_set_hostname', cfg, cc, LOG, []) contents = util.load_file("/etc/sysconfig/network") @@ -56,3 +55,16 @@ class TestHostname(t_help.FilesystemMockingTestCase): cfg, cc, LOG, []) contents = util.load_file("/etc/hostname") self.assertEquals('blah', contents.strip()) + + def test_write_hostname_sles(self): + cfg = { + 'hostname': 'blah.blah.blah.suse.com', + } + distro = self._fetch_distro('sles') + paths = helpers.Paths({}) + ds = None + cc = cloud.Cloud(ds, paths, {}, distro, None) + self.patchUtils(self.tmp) + cc_set_hostname.handle('cc_set_hostname', cfg, cc, LOG, []) + contents = util.load_file("/etc/HOSTNAME") + self.assertEquals('blah', contents.strip()) diff --git a/tests/unittests/test_handler/test_handler_timezone.py b/tests/unittests/test_handler/test_handler_timezone.py new file mode 100644 index 00000000..40b69773 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_timezone.py @@ -0,0 +1,75 @@ +# Copyright (C) 2013 Hewlett-Packard Development Company, L.P. +# +# Author: Juerg Haefliger <juerg.haefliger@hp.com> +# +# Based on test_handler_set_hostname.py +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from cloudinit.config import cc_timezone + +from cloudinit import cloud +from cloudinit import distros +from cloudinit import helpers +from cloudinit import util + +from cloudinit.sources import DataSourceNoCloud + +from tests.unittests import helpers as t_help + +from configobj import ConfigObj + +from StringIO import StringIO + +import logging + +LOG = logging.getLogger(__name__) + + +class TestTimezone(t_help.FilesystemMockingTestCase): + def setUp(self): + super(TestTimezone, self).setUp() + self.new_root = self.makeDir(prefix="unittest_") + + def _get_cloud(self, distro): + self.patchUtils(self.new_root) + self.patchOS(self.new_root) + + paths = helpers.Paths({}) + + cls = distros.fetch(distro) + d = cls(distro, {}, paths) + ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths) + cc = cloud.Cloud(ds, paths, {}, d, None) + return cc + + def test_set_timezone_sles(self): + + cfg = { + 'timezone': 'Tatooine/Bestine', + } + cc = self._get_cloud('sles') + + # Create a dummy timezone file + dummy_contents = '0123456789abcdefgh' + util.write_file('/usr/share/zoneinfo/%s' % cfg['timezone'], + dummy_contents) + + cc_timezone.handle('cc_timezone', cfg, cc, LOG, []) + + contents = util.load_file('/etc/sysconfig/clock') + n_cfg = ConfigObj(StringIO(contents)) + self.assertEquals({'TIMEZONE': cfg['timezone']}, dict(n_cfg)) + + contents = util.load_file('/etc/localtime') + self.assertEquals(dummy_contents, contents.strip()) diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py index 0037b966..486b9158 100644 --- a/tests/unittests/test_merging.py +++ b/tests/unittests/test_merging.py @@ -1,62 +1,255 @@ -from mocker import MockerTestCase +from tests.unittests import helpers +from cloudinit.handlers import cloud_config +from cloudinit.handlers import (CONTENT_START, CONTENT_END) + +from cloudinit import helpers as c_helpers from cloudinit import util +import collections +import glob +import os +import random +import re +import string # pylint: disable=W0402 + +SOURCE_PAT = "source*.*yaml" +EXPECTED_PAT = "expected%s.yaml" +TYPES = [long, int, dict, str, list, tuple, None] + + +def _old_mergedict(src, cand): + """ + Merge values from C{cand} into C{src}. + If C{src} has a key C{cand} will not override. + Nested dictionaries are merged recursively. + """ + if isinstance(src, dict) and isinstance(cand, dict): + for (k, v) in cand.iteritems(): + if k not in src: + src[k] = v + else: + src[k] = _old_mergedict(src[k], v) + return src + + +def _old_mergemanydict(*args): + out = {} + for a in args: + out = _old_mergedict(out, a) + return out + + +def _random_str(rand): + base = '' + for _i in xrange(rand.randint(1, 2 ** 8)): + base += rand.choice(string.letters + string.digits) + return base + + +class _NoMoreException(Exception): + pass + + +def _make_dict(current_depth, max_depth, rand): + if current_depth >= max_depth: + raise _NoMoreException() + if current_depth == 0: + t = dict + else: + t = rand.choice(TYPES) + base = None + if t in [None]: + return base + if t in [dict, list, tuple]: + if t in [dict]: + amount = rand.randint(0, 5) + keys = [_random_str(rand) for _i in xrange(0, amount)] + base = {} + for k in keys: + try: + base[k] = _make_dict(current_depth + 1, max_depth, rand) + except _NoMoreException: + pass + elif t in [list, tuple]: + base = [] + amount = rand.randint(0, 5) + for _i in xrange(0, amount): + try: + base.append(_make_dict(current_depth + 1, max_depth, rand)) + except _NoMoreException: + pass + if t in [tuple]: + base = tuple(base) + elif t in [long, int]: + base = rand.randint(0, 2 ** 8) + elif t in [str]: + base = _random_str(rand) + return base + + +def make_dict(max_depth, seed=None): + max_depth = max(1, max_depth) + rand = random.Random(seed) + return _make_dict(0, max_depth, rand) + + +class TestSimpleRun(helpers.ResourceUsingTestCase): + def _load_merge_files(self): + merge_root = self.resourceLocation('merge_sources') + tests = [] + source_ids = collections.defaultdict(list) + expected_files = {} + for fn in glob.glob(os.path.join(merge_root, SOURCE_PAT)): + base_fn = os.path.basename(fn) + file_id = re.match(r"source(\d+)\-(\d+)[.]yaml", base_fn) + if not file_id: + raise IOError("File %s does not have a numeric identifier" + % (fn)) + file_id = int(file_id.group(1)) + source_ids[file_id].append(fn) + expected_fn = os.path.join(merge_root, EXPECTED_PAT % (file_id)) + if not os.path.isfile(expected_fn): + raise IOError("No expected file found at %s" % (expected_fn)) + expected_files[file_id] = expected_fn + for i in sorted(source_ids.keys()): + source_file_contents = [] + for fn in sorted(source_ids[i]): + source_file_contents.append([fn, util.load_file(fn)]) + expected = util.load_yaml(util.load_file(expected_files[i])) + entry = [source_file_contents, [expected, expected_files[i]]] + tests.append(entry) + return tests + + def test_seed_runs(self): + test_dicts = [] + for i in range(1, 50): + base_dicts = [] + for j in range(1, 50): + base_dicts.append(make_dict(5, i * j)) + test_dicts.append(base_dicts) + for test in test_dicts: + c = _old_mergemanydict(*test) + d = util.mergemanydict(test) + self.assertEquals(c, d) + + def test_merge_cc_samples(self): + tests = self._load_merge_files() + paths = c_helpers.Paths({}) + cc_handler = cloud_config.CloudConfigPartHandler(paths) + cc_handler.cloud_fn = None + for (payloads, (expected_merge, expected_fn)) in tests: + cc_handler.handle_part(None, CONTENT_START, None, + None, None, None) + merging_fns = [] + for (fn, contents) in payloads: + cc_handler.handle_part(None, None, "%s.yaml" % (fn), + contents, None, {}) + merging_fns.append(fn) + merged_buf = cc_handler.cloud_buf + cc_handler.handle_part(None, CONTENT_END, None, + None, None, None) + fail_msg = "Equality failure on checking %s with %s: %s != %s" + fail_msg = fail_msg % (expected_fn, + ",".join(merging_fns), merged_buf, + expected_merge) + self.assertEquals(expected_merge, merged_buf, msg=fail_msg) + + def test_compat_merges_dict(self): + a = { + '1': '2', + 'b': 'c', + } + b = { + 'b': 'e', + } + c = _old_mergedict(a, b) + d = util.mergemanydict([a, b]) + self.assertEquals(c, d) + + def test_compat_merges_dict2(self): + a = { + 'Blah': 1, + 'Blah2': 2, + 'Blah3': 3, + } + b = { + 'Blah': 1, + 'Blah2': 2, + 'Blah3': [1], + } + c = _old_mergedict(a, b) + d = util.mergemanydict([a, b]) + self.assertEquals(c, d) + + def test_compat_merges_list(self): + a = {'b': [1, 2, 3]} + b = {'b': [4, 5]} + c = {'b': [6, 7]} + e = _old_mergemanydict(a, b, c) + f = util.mergemanydict([a, b, c]) + self.assertEquals(e, f) + + def test_compat_merges_str(self): + a = {'b': "hi"} + b = {'b': "howdy"} + c = {'b': "hallo"} + e = _old_mergemanydict(a, b, c) + f = util.mergemanydict([a, b, c]) + self.assertEquals(e, f) + + def test_compat_merge_sub_dict(self): + a = { + '1': '2', + 'b': { + 'f': 'g', + 'e': 'c', + 'h': 'd', + 'hh': { + '1': 2, + }, + } + } + b = { + 'b': { + 'e': 'c', + 'hh': { + '3': 4, + } + } + } + c = _old_mergedict(a, b) + d = util.mergemanydict([a, b]) + self.assertEquals(c, d) + + def test_compat_merge_sub_dict2(self): + a = { + '1': '2', + 'b': { + 'f': 'g', + } + } + b = { + 'b': { + 'e': 'c', + } + } + c = _old_mergedict(a, b) + d = util.mergemanydict([a, b]) + self.assertEquals(c, d) -class TestMergeDict(MockerTestCase): - def test_simple_merge(self): - """Test simple non-conflict merge.""" - source = {"key1": "value1"} - candidate = {"key2": "value2"} - result = util.mergedict(source, candidate) - self.assertEqual({"key1": "value1", "key2": "value2"}, result) - - def test_nested_merge(self): - """Test nested merge.""" - source = {"key1": {"key1.1": "value1.1"}} - candidate = {"key1": {"key1.2": "value1.2"}} - result = util.mergedict(source, candidate) - self.assertEqual( - {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result) - - def test_merge_does_not_override(self): - """Test that candidate doesn't override source.""" - source = {"key1": "value1", "key2": "value2"} - candidate = {"key1": "value2", "key2": "NEW VALUE"} - result = util.mergedict(source, candidate) - self.assertEqual(source, result) - - def test_empty_candidate(self): - """Test empty candidate doesn't change source.""" - source = {"key": "value"} - candidate = {} - result = util.mergedict(source, candidate) - self.assertEqual(source, result) - - def test_empty_source(self): - """Test empty source is replaced by candidate.""" - source = {} - candidate = {"key": "value"} - result = util.mergedict(source, candidate) - self.assertEqual(candidate, result) - - def test_non_dict_candidate(self): - """Test non-dict candidate is discarded.""" - source = {"key": "value"} - candidate = "not a dict" - result = util.mergedict(source, candidate) - self.assertEqual(source, result) - - def test_non_dict_source(self): - """Test non-dict source is not modified with a dict candidate.""" - source = "not a dict" - candidate = {"key": "value"} - result = util.mergedict(source, candidate) - self.assertEqual(source, result) - - def test_neither_dict(self): - """Test if neither candidate or source is dict source wins.""" - source = "source" - candidate = "candidate" - result = util.mergedict(source, candidate) - self.assertEqual(source, result) + def test_compat_merge_sub_list(self): + a = { + '1': '2', + 'b': { + 'f': ['1'], + } + } + b = { + 'b': { + 'f': [], + } + } + c = _old_mergedict(a, b) + d = util.mergemanydict([a, b]) + self.assertEquals(c, d) diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py new file mode 100644 index 00000000..d8662cac --- /dev/null +++ b/tests/unittests/test_sshutil.py @@ -0,0 +1,101 @@ +from cloudinit import ssh_util +from unittest import TestCase + + +VALID_CONTENT = { + 'dsa': ( + "AAAAB3NzaC1kc3MAAACBAIrjOQSlSea19bExXBMBKBvcLhBoVvNBjCppNzllipF" + "W4jgIOMcNanULRrZGjkOKat6MWJNetSbV1E6IOFDQ16rQgsh/OvYU9XhzM8seLa" + "A21VszZuhIV7/2DE3vxu7B54zVzueG1O1Deq6goQCRGWBUnqO2yluJiG4HzrnDa" + "jzRAAAAFQDMPO96qXd4F5A+5b2f2MO7SpVomQAAAIBpC3K2zIbDLqBBs1fn7rsv" + "KcJvwihdlVjG7UXsDB76P2GNqVG+IlYPpJZ8TO/B/fzTMtrdXp9pSm9OY1+BgN4" + "REsZ2WNcvfgY33aWaEM+ieCcQigvxrNAF2FTVcbUIIxAn6SmHuQSWrLSfdHc8H7" + "hsrgeUPPdzjBD/cv2ZmqwZ1AAAAIAplIsScrJut5wJMgyK1JG0Kbw9JYQpLe95P" + "obB069g8+mYR8U0fysmTEdR44mMu0VNU5E5OhTYoTGfXrVrkR134LqFM2zpVVbE" + "JNDnIqDHxTkc6LY2vu8Y2pQ3/bVnllZZOda2oD5HQ7ovygQa6CH+fbaZHbdDUX/" + "5z7u2rVAlDw==" + ), + 'ecdsa': ( + "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBITrGBB3cgJ" + "J7fPxvtMW9H3oRisNpJ3OAslxZeyP7I0A9BPAW0RQIwHVtVnM7zrp4nI+JLZov/" + "Ql7lc2leWL7CY=" + ), + 'rsa': ( + "AAAAB3NzaC1yc2EAAAABIwAAAQEA3I7VUf2l5gSn5uavROsc5HRDpZdQueUq5oz" + "emNSj8T7enqKHOEaFoU2VoPgGEWC9RyzSQVeyD6s7APMcE82EtmW4skVEgEGSbD" + "c1pvxzxtchBj78hJP6Cf5TCMFSXw+Fz5rF1dR23QDbN1mkHs7adr8GW4kSWqU7Q" + "7NDwfIrJJtO7Hi42GyXtvEONHbiRPOe8stqUly7MvUoN+5kfjBM8Qqpfl2+FNhT" + "YWpMfYdPUnE7u536WqzFmsaqJctz3gBxH9Ex7dFtrxR4qiqEr9Qtlu3xGn7Bw07" + "/+i1D+ey3ONkZLN+LQ714cgj8fRS4Hj29SCmXp5Kt5/82cD/VN3NtHw==" + ), +} + +TEST_OPTIONS = ("no-port-forwarding,no-agent-forwarding,no-X11-forwarding," + 'command="echo \'Please login as the user \"ubuntu\" rather than the' + 'user \"root\".\';echo;sleep 10"') + + +class TestAuthKeyLineParser(TestCase): + def test_simple_parse(self): + # test key line with common 3 fields (keytype, base64, comment) + parser = ssh_util.AuthKeyLineParser() + for ktype in ['rsa', 'ecdsa', 'dsa']: + content = VALID_CONTENT[ktype] + comment = 'user-%s@host' % ktype + line = ' '.join((ktype, content, comment,)) + key = parser.parse(line) + + self.assertEqual(key.base64, content) + self.assertFalse(key.options) + self.assertEqual(key.comment, comment) + self.assertEqual(key.keytype, ktype) + + def test_parse_no_comment(self): + # test key line with key type and base64 only + parser = ssh_util.AuthKeyLineParser() + for ktype in ['rsa', 'ecdsa', 'dsa']: + content = VALID_CONTENT[ktype] + line = ' '.join((ktype, content,)) + key = parser.parse(line) + + self.assertEqual(key.base64, content) + self.assertFalse(key.options) + self.assertFalse(key.comment) + self.assertEqual(key.keytype, ktype) + + def test_parse_with_keyoptions(self): + # test key line with options in it + parser = ssh_util.AuthKeyLineParser() + options = TEST_OPTIONS + for ktype in ['rsa', 'ecdsa', 'dsa']: + content = VALID_CONTENT[ktype] + comment = 'user-%s@host' % ktype + line = ' '.join((options, ktype, content, comment,)) + key = parser.parse(line) + + self.assertEqual(key.base64, content) + self.assertEqual(key.options, options) + self.assertEqual(key.comment, comment) + self.assertEqual(key.keytype, ktype) + + def test_parse_with_options_passed_in(self): + # test key line with key type and base64 only + parser = ssh_util.AuthKeyLineParser() + + baseline = ' '.join(("rsa", VALID_CONTENT['rsa'], "user@host")) + myopts = "no-port-forwarding,no-agent-forwarding" + + key = parser.parse("allowedopt" + " " + baseline) + self.assertEqual(key.options, "allowedopt") + + key = parser.parse("overridden_opt " + baseline, options=myopts) + self.assertEqual(key.options, myopts) + + def test_parse_invalid_keytype(self): + parser = ssh_util.AuthKeyLineParser() + key = parser.parse(' '.join(["badkeytype", VALID_CONTENT['rsa']])) + + self.assertFalse(key.valid()) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py index 82a4c555..b227616c 100644 --- a/tests/unittests/test_userdata.py +++ b/tests/unittests/test_userdata.py @@ -2,19 +2,25 @@ import StringIO +import gzip import logging import os from email.mime.base import MIMEBase +from email.mime.multipart import MIMEMultipart +from email.mime.application import MIMEApplication -from mocker import MockerTestCase - +from cloudinit import handlers +from cloudinit import helpers as c_helpers from cloudinit import log from cloudinit import sources from cloudinit import stages +from cloudinit import util INSTANCE_ID = "i-testing" +from tests.unittests import helpers + class FakeDataSource(sources.DataSource): @@ -26,22 +32,16 @@ class FakeDataSource(sources.DataSource): # FIXME: these tests shouldn't be checking log output?? # Weirddddd... - - -class TestConsumeUserData(MockerTestCase): +class TestConsumeUserData(helpers.FilesystemMockingTestCase): def setUp(self): - MockerTestCase.setUp(self) - # Replace the write so no actual files - # get written out... - self.mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) + helpers.FilesystemMockingTestCase.setUp(self) self._log = None self._log_file = None self._log_handler = None def tearDown(self): - MockerTestCase.tearDown(self) + helpers.FilesystemMockingTestCase.tearDown(self) if self._log_handler and self._log: self._log.removeHandler(self._log_handler) @@ -53,13 +53,134 @@ class TestConsumeUserData(MockerTestCase): self._log.addHandler(self._log_handler) return log_file + def test_simple_jsonp(self): + blob = ''' +#cloud-config-jsonp +[ + { "op": "add", "path": "/baz", "value": "qux" }, + { "op": "add", "path": "/bar", "value": "qux2" } +] +''' + + ci = stages.Init() + ci.datasource = FakeDataSource(blob) + new_root = self.makeDir() + self.patchUtils(new_root) + self.patchOS(new_root) + ci.fetch() + ci.consume_userdata() + cc_contents = util.load_file(ci.paths.get_ipath("cloud_config")) + cc = util.load_yaml(cc_contents) + self.assertEquals(2, len(cc)) + self.assertEquals('qux', cc['baz']) + self.assertEquals('qux2', cc['bar']) + + def test_mixed_cloud_config(self): + blob_cc = ''' +#cloud-config +a: b +c: d +''' + message_cc = MIMEBase("text", "cloud-config") + message_cc.set_payload(blob_cc) + + blob_jp = ''' +#cloud-config-jsonp +[ + { "op": "replace", "path": "/a", "value": "c" }, + { "op": "remove", "path": "/c" } +] +''' + + message_jp = MIMEBase('text', "cloud-config-jsonp") + message_jp.set_payload(blob_jp) + + message = MIMEMultipart() + message.attach(message_cc) + message.attach(message_jp) + + ci = stages.Init() + ci.datasource = FakeDataSource(str(message)) + new_root = self.makeDir() + self.patchUtils(new_root) + self.patchOS(new_root) + ci.fetch() + ci.consume_userdata() + cc_contents = util.load_file(ci.paths.get_ipath("cloud_config")) + cc = util.load_yaml(cc_contents) + self.assertEquals(1, len(cc)) + self.assertEquals('c', cc['a']) + + def test_merging_cloud_config(self): + blob = ''' +#cloud-config +a: b +e: f +run: + - b + - c +''' + message1 = MIMEBase("text", "cloud-config") + message1.set_payload(blob) + + blob2 = ''' +#cloud-config +a: e +e: g +run: + - stuff + - morestuff +''' + message2 = MIMEBase("text", "cloud-config") + message2['X-Merge-Type'] = ('dict(recurse_array,' + 'recurse_str)+list(append)+str(append)') + message2.set_payload(blob2) + + blob3 = ''' +#cloud-config +e: + - 1 + - 2 + - 3 +p: 1 +''' + message3 = MIMEBase("text", "cloud-config") + message3.set_payload(blob3) + + messages = [message1, message2, message3] + + paths = c_helpers.Paths({}, ds=FakeDataSource('')) + cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths) + + new_root = self.makeDir() + self.patchUtils(new_root) + self.patchOS(new_root) + cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None, + None) + for i, m in enumerate(messages): + headers = dict(m) + fn = "part-%s" % (i + 1) + payload = m.get_payload(decode=True) + cloud_cfg.handle_part(None, headers['Content-Type'], + fn, payload, None, headers) + cloud_cfg.handle_part(None, handlers.CONTENT_END, None, None, None, + None) + contents = util.load_file(paths.get_ipath('cloud_config')) + contents = util.load_yaml(contents) + self.assertEquals(contents['run'], ['b', 'c', 'stuff', 'morestuff']) + self.assertEquals(contents['a'], 'be') + self.assertEquals(contents['e'], [1, 2, 3]) + self.assertEquals(contents['p'], 1) + def test_unhandled_type_warning(self): """Raw text without magic is ignored but shows warning.""" ci = stages.Init() data = "arbitrary text\n" ci.datasource = FakeDataSource(data) - self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) + mock_write = self.mocker.replace("cloudinit.util.write_file", + passthrough=False) + mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) self.mocker.replay() log_file = self.capture_log(logging.WARNING) @@ -69,6 +190,46 @@ class TestConsumeUserData(MockerTestCase): "Unhandled non-multipart (text/x-not-multipart) userdata:", log_file.getvalue()) + def test_mime_gzip_compressed(self): + """Tests that individual message gzip encoding works.""" + + def gzip_part(text): + contents = StringIO.StringIO() + f = gzip.GzipFile(fileobj=contents, mode='w') + f.write(str(text)) + f.flush() + f.close() + return MIMEApplication(contents.getvalue(), 'gzip') + + base_content1 = ''' +#cloud-config +a: 2 +''' + + base_content2 = ''' +#cloud-config +b: 3 +c: 4 +''' + + message = MIMEMultipart('test') + message.attach(gzip_part(base_content1)) + message.attach(gzip_part(base_content2)) + ci = stages.Init() + ci.datasource = FakeDataSource(str(message)) + new_root = self.makeDir() + self.patchUtils(new_root) + self.patchOS(new_root) + ci.fetch() + ci.consume_userdata() + contents = util.load_file(ci.paths.get_ipath("cloud_config")) + contents = util.load_yaml(contents) + self.assertTrue(isinstance(contents, dict)) + self.assertEquals(3, len(contents)) + self.assertEquals(2, contents['a']) + self.assertEquals(3, contents['b']) + self.assertEquals(4, contents['c']) + def test_mime_text_plain(self): """Mime message of type text/plain is ignored but shows warning.""" ci = stages.Init() @@ -76,7 +237,9 @@ class TestConsumeUserData(MockerTestCase): message.set_payload("Just text") ci.datasource = FakeDataSource(message.as_string()) - self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) + mock_write = self.mocker.replace("cloudinit.util.write_file", + passthrough=False) + mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) self.mocker.replay() log_file = self.capture_log(logging.WARNING) @@ -93,8 +256,10 @@ class TestConsumeUserData(MockerTestCase): ci.datasource = FakeDataSource(script) outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) - self.mock_write(outpath, script, 0700) + mock_write = self.mocker.replace("cloudinit.util.write_file", + passthrough=False) + mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) + mock_write(outpath, script, 0700) self.mocker.replay() log_file = self.capture_log(logging.WARNING) @@ -111,8 +276,10 @@ class TestConsumeUserData(MockerTestCase): ci.datasource = FakeDataSource(message.as_string()) outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) - self.mock_write(outpath, script, 0700) + mock_write = self.mocker.replace("cloudinit.util.write_file", + passthrough=False) + mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) + mock_write(outpath, script, 0700) self.mocker.replay() log_file = self.capture_log(logging.WARNING) @@ -129,8 +296,10 @@ class TestConsumeUserData(MockerTestCase): ci.datasource = FakeDataSource(message.as_string()) outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - self.mock_write(outpath, script, 0700) - self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) + mock_write = self.mocker.replace("cloudinit.util.write_file", + passthrough=False) + mock_write(outpath, script, 0700) + mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) self.mocker.replay() log_file = self.capture_log(logging.WARNING) diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 02611581..87415cb5 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -1,9 +1,12 @@ +# pylint: disable=C0301 +# the mountinfo data lines are too long import os import stat import yaml from mocker import MockerTestCase from unittest import TestCase +from tests.unittests import helpers from cloudinit import importer from cloudinit import util @@ -248,4 +251,60 @@ class TestLoadYaml(TestCase): myobj) +class TestMountinfoParsing(helpers.ResourceUsingTestCase): + def test_invalid_mountinfo(self): + line = ("20 1 252:1 / / rw,relatime - ext4 /dev/mapper/vg0-root" + "rw,errors=remount-ro,data=ordered") + elements = line.split() + for i in range(len(elements) + 1): + lines = [' '.join(elements[0:i])] + if i < 10: + expected = None + else: + expected = ('/dev/mapper/vg0-root', 'ext4', '/') + self.assertEqual(expected, util.parse_mount_info('/', lines)) + + def test_precise_ext4_root(self): + + lines = self.readResource('mountinfo_precise_ext4.txt').splitlines() + + expected = ('/dev/mapper/vg0-root', 'ext4', '/') + self.assertEqual(expected, util.parse_mount_info('/', lines)) + self.assertEqual(expected, util.parse_mount_info('/usr', lines)) + self.assertEqual(expected, util.parse_mount_info('/usr/bin', lines)) + + expected = ('/dev/md0', 'ext4', '/boot') + self.assertEqual(expected, util.parse_mount_info('/boot', lines)) + self.assertEqual(expected, util.parse_mount_info('/boot/grub', lines)) + + expected = ('/dev/mapper/vg0-root', 'ext4', '/') + self.assertEqual(expected, util.parse_mount_info('/home', lines)) + self.assertEqual(expected, util.parse_mount_info('/home/me', lines)) + + expected = ('tmpfs', 'tmpfs', '/run') + self.assertEqual(expected, util.parse_mount_info('/run', lines)) + + expected = ('none', 'tmpfs', '/run/lock') + self.assertEqual(expected, util.parse_mount_info('/run/lock', lines)) + + def test_raring_btrfs_root(self): + lines = self.readResource('mountinfo_raring_btrfs.txt').splitlines() + + expected = ('/dev/vda1', 'btrfs', '/') + self.assertEqual(expected, util.parse_mount_info('/', lines)) + self.assertEqual(expected, util.parse_mount_info('/usr', lines)) + self.assertEqual(expected, util.parse_mount_info('/usr/bin', lines)) + self.assertEqual(expected, util.parse_mount_info('/boot', lines)) + self.assertEqual(expected, util.parse_mount_info('/boot/grub', lines)) + + expected = ('/dev/vda1', 'btrfs', '/home') + self.assertEqual(expected, util.parse_mount_info('/home', lines)) + self.assertEqual(expected, util.parse_mount_info('/home/me', lines)) + + expected = ('tmpfs', 'tmpfs', '/run') + self.assertEqual(expected, util.parse_mount_info('/run', lines)) + + expected = ('none', 'tmpfs', '/run/lock') + self.assertEqual(expected, util.parse_mount_info('/run/lock', lines)) + # vi: ts=4 expandtab |