diff options
author | Scott Moser <smoser@brickies.net> | 2016-08-23 16:48:42 -0400 |
---|---|---|
committer | Scott Moser <smoser@brickies.net> | 2016-08-23 16:48:42 -0400 |
commit | 86e2614b6c3db342aa5a6590e91b9e459bbcb484 (patch) | |
tree | 6996805b91a0c1c31f3afea3a689348bf760de63 /tests | |
parent | c937c66dd0d1ad7b73dcc2efb5eb4c16b05f4479 (diff) | |
parent | 9f7ce5f090689b664ffce7e0b4ac78bfeafd1a79 (diff) | |
download | vyos-cloud-init-86e2614b6c3db342aa5a6590e91b9e459bbcb484.tar.gz vyos-cloud-init-86e2614b6c3db342aa5a6590e91b9e459bbcb484.zip |
merge trunk at 0.7.7~bzr1245
Diffstat (limited to 'tests')
51 files changed, 1705 insertions, 761 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py index 7f4b8784..8d46a8bf 100644 --- a/tests/unittests/helpers.py +++ b/tests/unittests/helpers.py @@ -2,18 +2,16 @@ from __future__ import print_function import functools import os -import sys import shutil +import sys import tempfile import unittest +import mock import six +import unittest2 try: - from unittest import mock -except ImportError: - import mock -try: from contextlib import ExitStack except ImportError: from contextlib2 import ExitStack @@ -21,6 +19,9 @@ except ImportError: from cloudinit import helpers as ch from cloudinit import util +# Used for skipping tests +SkipTest = unittest2.SkipTest + # Used for detecting different python versions PY2 = False PY26 = False @@ -44,78 +45,6 @@ else: if _PY_MINOR == 4 and _PY_MICRO < 3: FIX_HTTPRETTY = True -if PY26: - # For now add these on, taken from python 2.7 + slightly adjusted. Drop - # all this once Python 2.6 is dropped as a minimum requirement. - class TestCase(unittest.TestCase): - def setUp(self): - super(TestCase, self).setUp() - self.__all_cleanups = ExitStack() - - def tearDown(self): - self.__all_cleanups.close() - unittest.TestCase.tearDown(self) - - def addCleanup(self, function, *args, **kws): - self.__all_cleanups.callback(function, *args, **kws) - - def assertIs(self, expr1, expr2, msg=None): - if expr1 is not expr2: - standardMsg = '%r is not %r' % (expr1, expr2) - self.fail(self._formatMessage(msg, standardMsg)) - - def assertIn(self, member, container, msg=None): - if member not in container: - standardMsg = '%r not found in %r' % (member, container) - self.fail(self._formatMessage(msg, standardMsg)) - - def assertNotIn(self, member, container, msg=None): - if member in container: - standardMsg = '%r unexpectedly found in %r' - standardMsg = standardMsg % (member, container) - self.fail(self._formatMessage(msg, standardMsg)) - - def assertIsNone(self, value, msg=None): - if value is not None: - standardMsg = '%r is not None' - standardMsg = standardMsg % (value) - self.fail(self._formatMessage(msg, standardMsg)) - - def assertIsInstance(self, obj, cls, msg=None): - """Same as self.assertTrue(isinstance(obj, cls)), with a nicer - default message.""" - if not isinstance(obj, cls): - standardMsg = '%s is not an instance of %r' % (repr(obj), cls) - self.fail(self._formatMessage(msg, standardMsg)) - - def assertDictContainsSubset(self, expected, actual, msg=None): - missing = [] - mismatched = [] - for k, v in expected.items(): - if k not in actual: - missing.append(k) - elif actual[k] != v: - mismatched.append('%r, expected: %r, actual: %r' - % (k, v, actual[k])) - - if len(missing) == 0 and len(mismatched) == 0: - return - - standardMsg = '' - if missing: - standardMsg = 'Missing: %r' % ','.join(m for m in missing) - if mismatched: - if standardMsg: - standardMsg += '; ' - standardMsg += 'Mismatched values: %s' % ','.join(mismatched) - - self.fail(self._formatMessage(msg, standardMsg)) - - -else: - class TestCase(unittest.TestCase): - pass - # Makes the old path start # with new base instead of whatever @@ -151,6 +80,10 @@ def retarget_many_wrapper(new_base, am, old_func): return wrapper +class TestCase(unittest2.TestCase): + pass + + class ResourceUsingTestCase(TestCase): def setUp(self): super(ResourceUsingTestCase, self).setUp() @@ -180,13 +113,12 @@ class ResourceUsingTestCase(TestCase): with open(where, 'r') as fh: return fh.read() - def getCloudPaths(self): + def getCloudPaths(self, ds=None): tmpdir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, tmpdir) - cp = ch.Paths({ - 'cloud_dir': tmpdir, - 'templates_dir': self.resourceLocation(), - }) + cp = ch.Paths({'cloud_dir': tmpdir, + 'templates_dir': self.resourceLocation()}, + ds=ds) return cp diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py index 153f1658..0154784a 100644 --- a/tests/unittests/test__init__.py +++ b/tests/unittests/test__init__.py @@ -1,16 +1,6 @@ import os import shutil import tempfile -import unittest - -try: - from unittest import mock -except ImportError: - import mock -try: - from contextlib import ExitStack -except ImportError: - from contextlib2 import ExitStack from cloudinit import handlers from cloudinit import helpers @@ -18,7 +8,7 @@ from cloudinit import settings from cloudinit import url_helper from cloudinit import util -from .helpers import TestCase +from .helpers import TestCase, ExitStack, mock class FakeModule(handlers.Handler): @@ -99,9 +89,10 @@ class TestWalkerHandleHandler(TestCase): self.assertEqual(self.data['handlercount'], 0) -class TestHandlerHandlePart(unittest.TestCase): +class TestHandlerHandlePart(TestCase): def setUp(self): + super(TestHandlerHandlePart, self).setUp() self.data = "fake data" self.ctype = "fake ctype" self.filename = "fake filename" @@ -177,7 +168,7 @@ class TestHandlerHandlePart(unittest.TestCase): self.data, self.ctype, self.filename, self.payload) -class TestCmdlineUrl(unittest.TestCase): +class TestCmdlineUrl(TestCase): def test_invalid_content(self): url = "http://example.com/foo" key = "mykey" diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py index ad32d0b2..dea908d7 100644 --- a/tests/unittests/test_builtin_handlers.py +++ b/tests/unittests/test_builtin_handlers.py @@ -40,7 +40,7 @@ class TestBuiltins(test_helpers.FilesystemMockingTestCase): 'test.conf', 'blah', freq) h.handle_part('', handlers.CONTENT_END, None, None, None) - self.assertEquals(0, len(os.listdir(up_root))) + self.assertEqual(0, len(os.listdir(up_root))) def test_upstart_frequency_single(self): # files should be written out when frequency is ! per-instance @@ -67,7 +67,7 @@ class TestBuiltins(test_helpers.FilesystemMockingTestCase): h.handle_part('', handlers.CONTENT_END, None, None, None) - self.assertEquals(len(os.listdir('/etc/upstart')), 1) + self.assertEqual(len(os.listdir('/etc/upstart')), 1) mockobj.assert_called_once_with( ['initctl', 'reload-configuration'], capture=False) diff --git a/tests/unittests/test_cli.py b/tests/unittests/test_cli.py index ed863399..5fa252f7 100644 --- a/tests/unittests/test_cli.py +++ b/tests/unittests/test_cli.py @@ -1,17 +1,10 @@ -import imp -import os -import sys import six from . import helpers as test_helpers -try: - from unittest import mock -except ImportError: - import mock +from cloudinit.cmd import main as cli - -BIN_CLOUDINIT = "bin/cloud-init" +mock = test_helpers.mock class TestCLI(test_helpers.FilesystemMockingTestCase): @@ -20,35 +13,22 @@ class TestCLI(test_helpers.FilesystemMockingTestCase): super(TestCLI, self).setUp() self.stderr = six.StringIO() self.patchStdoutAndStderr(stderr=self.stderr) - self.sys_exit = mock.MagicMock() - self.patched_funcs.enter_context( - mock.patch.object(sys, 'exit', self.sys_exit)) - - def _call_main(self): - self.patched_funcs.enter_context( - mock.patch.object(sys, 'argv', ['cloud-init'])) - cli = imp.load_module( - 'cli', open(BIN_CLOUDINIT), '', ('', 'r', imp.PY_SOURCE)) + + def _call_main(self, sysv_args=None): + if not sysv_args: + sysv_args = ['cloud-init'] try: - return cli.main() - except: - pass + return cli.main(sysv_args=sysv_args) + except SystemExit as e: + return e.code - @test_helpers.skipIf(not os.path.isfile(BIN_CLOUDINIT), "no bin/cloudinit") def test_no_arguments_shows_usage(self): - self._call_main() - self.assertIn('usage: cloud-init', self.stderr.getvalue()) - - @test_helpers.skipIf(not os.path.isfile(BIN_CLOUDINIT), "no bin/cloudinit") - def test_no_arguments_exits_2(self): exit_code = self._call_main() - if self.sys_exit.call_count: - self.assertEqual(mock.call(2), self.sys_exit.call_args) - else: - self.assertEqual(2, exit_code) + self.assertIn('usage: cloud-init', self.stderr.getvalue()) + self.assertEqual(2, exit_code) - @test_helpers.skipIf(not os.path.isfile(BIN_CLOUDINIT), "no bin/cloudinit") def test_no_arguments_shows_error_message(self): - self._call_main() + exit_code = self._call_main() self.assertIn('cloud-init: error: too few arguments', self.stderr.getvalue()) + self.assertEqual(2, exit_code) diff --git a/tests/unittests/test_cs_util.py b/tests/unittests/test_cs_util.py index d7273035..56c9ce9e 100644 --- a/tests/unittests/test_cs_util.py +++ b/tests/unittests/test_cs_util.py @@ -1,21 +1,9 @@ from __future__ import print_function -import sys -import unittest +from . import helpers as test_helpers from cloudinit.cs_utils import Cepko -try: - skip = unittest.skip -except AttributeError: - # Python 2.6. Doesn't have to be high fidelity. - def skip(reason): - def decorator(func): - def wrapper(*args, **kws): - print(reason, file=sys.stderr) - return wrapper - return decorator - SERVER_CONTEXT = { "cpu": 1000, @@ -43,18 +31,9 @@ class CepkoMock(Cepko): # 2015-01-22 BAW: This test is completely useless because it only ever tests # the CepkoMock object. Even in its original form, I don't think it ever # touched the underlying Cepko class methods. -@skip('This test is completely useless') -class CepkoResultTests(unittest.TestCase): +class CepkoResultTests(test_helpers.TestCase): def setUp(self): - pass - # self.mocked = self.mocker.replace("cloudinit.cs_utils.Cepko", - # spec=CepkoMock, - # count=False, - # passthrough=False) - # self.mocked() - # self.mocker.result(CepkoMock()) - # self.mocker.replay() - # self.c = Cepko() + raise test_helpers.SkipTest('This test is completely useless') def test_getitem(self): result = self.c.all() diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py index 9c1ec1d4..13db8a4c 100644 --- a/tests/unittests/test_data.py +++ b/tests/unittests/test_data.py @@ -106,9 +106,9 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): ci.consume_data() cc_contents = util.load_file(ci.paths.get_ipath("cloud_config")) cc = util.load_yaml(cc_contents) - self.assertEquals(2, len(cc)) - self.assertEquals('qux', cc['baz']) - self.assertEquals('qux2', cc['bar']) + self.assertEqual(2, len(cc)) + self.assertEqual('qux', cc['baz']) + self.assertEqual('qux2', cc['bar']) def test_simple_jsonp_vendor_and_user(self): # test that user-data wins over vendor @@ -145,9 +145,9 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): (_which_ran, _failures) = mods.run_section('cloud_init_modules') cfg = mods.cfg self.assertIn('vendor_data', cfg) - self.assertEquals('qux', cfg['baz']) - self.assertEquals('qux2', cfg['bar']) - self.assertEquals('quxC', cfg['foo']) + self.assertEqual('qux', cfg['baz']) + self.assertEqual('qux2', cfg['bar']) + self.assertEqual('quxC', cfg['foo']) def test_simple_jsonp_no_vendor_consumed(self): # make sure that vendor data is not consumed @@ -184,8 +184,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): mods = stages.Modules(initer) (_which_ran, _failures) = mods.run_section('cloud_init_modules') cfg = mods.cfg - self.assertEquals('qux', cfg['baz']) - self.assertEquals('qux2', cfg['bar']) + self.assertEqual('qux', cfg['baz']) + self.assertEqual('qux2', cfg['bar']) self.assertNotIn('foo', cfg) def test_mixed_cloud_config(self): @@ -222,8 +222,8 @@ c: d ci.consume_data() cc_contents = util.load_file(ci.paths.get_ipath("cloud_config")) cc = util.load_yaml(cc_contents) - self.assertEquals(1, len(cc)) - self.assertEquals('c', cc['a']) + self.assertEqual(1, len(cc)) + self.assertEqual('c', cc['a']) def test_vendor_user_yaml_cloud_config(self): vendor_blob = ''' @@ -263,8 +263,8 @@ run: (_which_ran, _failures) = mods.run_section('cloud_init_modules') cfg = mods.cfg self.assertIn('vendor_data', cfg) - self.assertEquals('c', cfg['a']) - self.assertEquals('user', cfg['name']) + self.assertEqual('c', cfg['a']) + self.assertEqual('user', cfg['name']) self.assertNotIn('x', cfg['run']) self.assertNotIn('y', cfg['run']) self.assertIn('z', cfg['run']) @@ -358,10 +358,10 @@ p: 1 None) contents = util.load_file(paths.get_ipath('cloud_config')) contents = util.load_yaml(contents) - self.assertEquals(contents['run'], ['b', 'c', 'stuff', 'morestuff']) - self.assertEquals(contents['a'], 'be') - self.assertEquals(contents['e'], [1, 2, 3]) - self.assertEquals(contents['p'], 1) + self.assertEqual(contents['run'], ['b', 'c', 'stuff', 'morestuff']) + self.assertEqual(contents['a'], 'be') + self.assertEqual(contents['e'], [1, 2, 3]) + self.assertEqual(contents['p'], 1) def test_unhandled_type_warning(self): """Raw text without magic is ignored but shows warning.""" @@ -411,10 +411,10 @@ c: 4 contents = util.load_file(ci.paths.get_ipath("cloud_config")) contents = util.load_yaml(contents) self.assertTrue(isinstance(contents, dict)) - self.assertEquals(3, len(contents)) - self.assertEquals(2, contents['a']) - self.assertEquals(3, contents['b']) - self.assertEquals(4, contents['c']) + self.assertEqual(3, len(contents)) + self.assertEqual(2, contents['a']) + self.assertEqual(3, contents['b']) + self.assertEqual(4, contents['c']) def test_mime_text_plain(self): """Mime message of type text/plain is ignored but shows warning.""" @@ -449,8 +449,7 @@ c: 4 mockobj.assert_has_calls([ mock.call(outpath, script, 0o700), - mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), - ]) + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)]) def test_mime_text_x_shellscript(self): """Mime message of type text/x-shellscript is treated as script.""" @@ -470,8 +469,7 @@ c: 4 mockobj.assert_has_calls([ mock.call(outpath, script, 0o700), - mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), - ]) + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)]) def test_mime_text_plain_shell(self): """Mime type text/plain starting #!/bin/sh is treated as script.""" @@ -491,8 +489,7 @@ c: 4 mockobj.assert_has_calls([ mock.call(outpath, script, 0o700), - mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), - ]) + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)]) def test_mime_application_octet_stream(self): """Mime type application/octet-stream is ignored but shows warning.""" @@ -560,3 +557,20 @@ class TestUDProcess(helpers.ResourceUsingTestCase): ud_proc = ud.UserDataProcessor(self.getCloudPaths()) message = ud_proc.process(msg) self.assertTrue(count_messages(message) == 1) + + +class TestConvertString(helpers.TestCase): + def test_handles_binary_non_utf8_decodable(self): + blob = b'\x32\x99' + msg = ud.convert_string(blob) + self.assertEqual(blob, msg.get_payload(decode=True)) + + def test_handles_binary_utf8_decodable(self): + blob = b'\x32\x32' + msg = ud.convert_string(blob) + self.assertEqual(blob, msg.get_payload(decode=True)) + + def test_handle_headers(self): + text = "hi mom" + msg = ud.convert_string(text) + self.assertEqual(text, msg.get_payload(decode=False)) diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py index 85759c68..12966563 100644 --- a/tests/unittests/test_datasource/test_altcloud.py +++ b/tests/unittests/test_datasource/test_altcloud.py @@ -134,7 +134,7 @@ class TestGetCloudType(TestCase): ''' util.read_dmi_data = _dmi_data('RHEV') dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals('RHEV', dsrc.get_cloud_type()) + self.assertEqual('RHEV', dsrc.get_cloud_type()) def test_vsphere(self): ''' @@ -143,7 +143,7 @@ class TestGetCloudType(TestCase): ''' util.read_dmi_data = _dmi_data('VMware Virtual Platform') dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals('VSPHERE', dsrc.get_cloud_type()) + self.assertEqual('VSPHERE', dsrc.get_cloud_type()) def test_unknown(self): ''' @@ -152,7 +152,7 @@ class TestGetCloudType(TestCase): ''' util.read_dmi_data = _dmi_data('Unrecognized Platform') dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals('UNKNOWN', dsrc.get_cloud_type()) + self.assertEqual('UNKNOWN', dsrc.get_cloud_type()) class TestGetDataCloudInfoFile(TestCase): @@ -187,7 +187,7 @@ class TestGetDataCloudInfoFile(TestCase): _write_cloud_info_file('RHEV') dsrc = DataSourceAltCloud({}, None, self.paths) dsrc.user_data_rhevm = lambda: True - self.assertEquals(True, dsrc.get_data()) + self.assertEqual(True, dsrc.get_data()) def test_vsphere(self): '''Success Test module get_data() forcing VSPHERE.''' @@ -195,7 +195,7 @@ class TestGetDataCloudInfoFile(TestCase): _write_cloud_info_file('VSPHERE') dsrc = DataSourceAltCloud({}, None, self.paths) dsrc.user_data_vsphere = lambda: True - self.assertEquals(True, dsrc.get_data()) + self.assertEqual(True, dsrc.get_data()) def test_fail_rhev(self): '''Failure Test module get_data() forcing RHEV.''' @@ -203,7 +203,7 @@ class TestGetDataCloudInfoFile(TestCase): _write_cloud_info_file('RHEV') dsrc = DataSourceAltCloud({}, None, self.paths) dsrc.user_data_rhevm = lambda: False - self.assertEquals(False, dsrc.get_data()) + self.assertEqual(False, dsrc.get_data()) def test_fail_vsphere(self): '''Failure Test module get_data() forcing VSPHERE.''' @@ -211,14 +211,14 @@ class TestGetDataCloudInfoFile(TestCase): _write_cloud_info_file('VSPHERE') dsrc = DataSourceAltCloud({}, None, self.paths) dsrc.user_data_vsphere = lambda: False - self.assertEquals(False, dsrc.get_data()) + self.assertEqual(False, dsrc.get_data()) def test_unrecognized(self): '''Failure Test module get_data() forcing unrecognized.''' _write_cloud_info_file('unrecognized') dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.get_data()) + self.assertEqual(False, dsrc.get_data()) class TestGetDataNoCloudInfoFile(TestCase): @@ -250,7 +250,7 @@ class TestGetDataNoCloudInfoFile(TestCase): util.read_dmi_data = _dmi_data('RHEV Hypervisor') dsrc = DataSourceAltCloud({}, None, self.paths) dsrc.user_data_rhevm = lambda: True - self.assertEquals(True, dsrc.get_data()) + self.assertEqual(True, dsrc.get_data()) def test_vsphere_no_cloud_file(self): '''Test No cloud info file module get_data() forcing VSPHERE.''' @@ -258,14 +258,14 @@ class TestGetDataNoCloudInfoFile(TestCase): util.read_dmi_data = _dmi_data('VMware Virtual Platform') dsrc = DataSourceAltCloud({}, None, self.paths) dsrc.user_data_vsphere = lambda: True - self.assertEquals(True, dsrc.get_data()) + self.assertEqual(True, dsrc.get_data()) def test_failure_no_cloud_file(self): '''Test No cloud info file module get_data() forcing unrecognized.''' util.read_dmi_data = _dmi_data('Unrecognized Platform') dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.get_data()) + self.assertEqual(False, dsrc.get_data()) class TestUserDataRhevm(TestCase): @@ -305,7 +305,7 @@ class TestUserDataRhevm(TestCase): dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.user_data_rhevm()) + self.assertEqual(False, dsrc.user_data_rhevm()) def test_modprobe_fails(self): '''Test user_data_rhevm() where modprobe fails.''' @@ -315,7 +315,7 @@ class TestUserDataRhevm(TestCase): dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.user_data_rhevm()) + self.assertEqual(False, dsrc.user_data_rhevm()) def test_no_modprobe_cmd(self): '''Test user_data_rhevm() with no modprobe command.''' @@ -325,7 +325,7 @@ class TestUserDataRhevm(TestCase): dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.user_data_rhevm()) + self.assertEqual(False, dsrc.user_data_rhevm()) def test_udevadm_fails(self): '''Test user_data_rhevm() where udevadm fails.''' @@ -335,7 +335,7 @@ class TestUserDataRhevm(TestCase): dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.user_data_rhevm()) + self.assertEqual(False, dsrc.user_data_rhevm()) def test_no_udevadm_cmd(self): '''Test user_data_rhevm() with no udevadm command.''' @@ -345,7 +345,7 @@ class TestUserDataRhevm(TestCase): dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.user_data_rhevm()) + self.assertEqual(False, dsrc.user_data_rhevm()) class TestUserDataVsphere(TestCase): @@ -380,7 +380,7 @@ class TestUserDataVsphere(TestCase): dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals(False, dsrc.user_data_vsphere()) + self.assertEqual(False, dsrc.user_data_vsphere()) class TestReadUserDataCallback(TestCase): @@ -408,8 +408,8 @@ class TestReadUserDataCallback(TestCase): def test_callback_both(self): '''Test read_user_data_callback() with both files.''' - self.assertEquals('test user data', - read_user_data_callback(self.mount_dir)) + self.assertEqual('test user data', + read_user_data_callback(self.mount_dir)) def test_callback_dc(self): '''Test read_user_data_callback() with only DC file.''' @@ -418,8 +418,8 @@ class TestReadUserDataCallback(TestCase): dc_file=False, non_dc_file=True) - self.assertEquals('test user data', - read_user_data_callback(self.mount_dir)) + self.assertEqual('test user data', + read_user_data_callback(self.mount_dir)) def test_callback_non_dc(self): '''Test read_user_data_callback() with only non-DC file.''' @@ -428,14 +428,14 @@ class TestReadUserDataCallback(TestCase): dc_file=True, non_dc_file=False) - self.assertEquals('test user data', - read_user_data_callback(self.mount_dir)) + self.assertEqual('test user data', + read_user_data_callback(self.mount_dir)) def test_callback_none(self): '''Test read_user_data_callback() no files are found.''' _remove_user_data_files(self.mount_dir) - self.assertEquals(None, read_user_data_callback(self.mount_dir)) + self.assertEqual(None, read_user_data_callback(self.mount_dir)) def force_arch(arch=None): diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py index 444e2799..e90e903c 100644 --- a/tests/unittests/test_datasource/test_azure.py +++ b/tests/unittests/test_datasource/test_azure.py @@ -1,24 +1,16 @@ from cloudinit import helpers from cloudinit.util import b64e, decode_binary, load_file from cloudinit.sources import DataSourceAzure -from ..helpers import TestCase, populate_dir -try: - from unittest import mock -except ImportError: - import mock -try: - from contextlib import ExitStack -except ImportError: - from contextlib2 import ExitStack +from ..helpers import TestCase, populate_dir, mock, ExitStack, PY26, SkipTest import crypt import os -import stat -import yaml import shutil +import stat import tempfile import xml.etree.ElementTree as ET +import yaml def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None): @@ -83,6 +75,8 @@ class TestAzureDataSource(TestCase): def setUp(self): super(TestAzureDataSource, self).setUp() + if PY26: + raise SkipTest("Does not work on python 2.6") self.tmp = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.tmp) @@ -165,7 +159,7 @@ class TestAzureDataSource(TestCase): def tags_equal(x, y): for x_tag, x_val in x.items(): y_val = y.get(x_val.tag) - self.assertEquals(x_val.text, y_val.text) + self.assertEqual(x_val.text, y_val.text) old_cnt = create_tag_index(oxml) new_cnt = create_tag_index(nxml) @@ -354,8 +348,8 @@ class TestAzureDataSource(TestCase): self.assertTrue(ret) cfg = dsrc.get_config_obj() - self.assertEquals(dsrc.device_name_to_device("ephemeral0"), - "/dev/sdb") + self.assertEqual(dsrc.device_name_to_device("ephemeral0"), + "/dev/sdb") assert 'disk_setup' in cfg assert 'fs_setup' in cfg self.assertIsInstance(cfg['disk_setup'], dict) @@ -404,15 +398,15 @@ class TestAzureDataSource(TestCase): self.xml_notequals(data['ovfcontent'], on_disk_ovf) # Make sure that the redacted password on disk is not used by CI - self.assertNotEquals(dsrc.cfg.get('password'), - DataSourceAzure.DEF_PASSWD_REDACTION) + self.assertNotEqual(dsrc.cfg.get('password'), + DataSourceAzure.DEF_PASSWD_REDACTION) # Make sure that the password was really encrypted et = ET.fromstring(on_disk_ovf) for elem in et.iter(): if 'UserPassword' in elem.tag: - self.assertEquals(DataSourceAzure.DEF_PASSWD_REDACTION, - elem.text) + self.assertEqual(DataSourceAzure.DEF_PASSWD_REDACTION, + elem.text) def test_ovf_env_arrives_in_waagent_dir(self): xml = construct_valid_ovf_env(data={}, userdata="FOODATA") diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py index 1134199b..65202ff0 100644 --- a/tests/unittests/test_datasource/test_azure_helper.py +++ b/tests/unittests/test_datasource/test_azure_helper.py @@ -1,17 +1,8 @@ import os from cloudinit.sources.helpers import azure as azure_helper -from ..helpers import TestCase -try: - from unittest import mock -except ImportError: - import mock - -try: - from contextlib import ExitStack -except ImportError: - from contextlib2 import ExitStack +from ..helpers import ExitStack, mock, TestCase GOAL_STATE_TEMPLATE = """\ @@ -70,7 +61,7 @@ class TestFindEndpoint(TestCase): def test_missing_special_azure_line(self): self.load_file.return_value = '' - self.assertRaises(Exception, + self.assertRaises(ValueError, azure_helper.WALinuxAgentShim.find_endpoint) @staticmethod @@ -287,6 +278,7 @@ class TestOpenSSLManager(TestCase): self.subp.side_effect = capture_directory manager = azure_helper.OpenSSLManager() self.assertEqual(manager.tmpdir, subp_directory['path']) + manager.clean_up() @mock.patch.object(azure_helper, 'cd', mock.MagicMock()) @mock.patch.object(azure_helper.tempfile, 'mkdtemp', mock.MagicMock()) diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py index 772d189a..2a42ce0c 100644 --- a/tests/unittests/test_datasource/test_cloudsigma.py +++ b/tests/unittests/test_datasource/test_cloudsigma.py @@ -1,4 +1,5 @@ # coding: utf-8 + import copy from cloudinit.cs_utils import Cepko @@ -6,7 +7,6 @@ from cloudinit.sources import DataSourceCloudSigma from .. import helpers as test_helpers - SERVER_CONTEXT = { "cpu": 1000, "cpus_instead_of_cores": False, diff --git a/tests/unittests/test_datasource/test_cloudstack.py b/tests/unittests/test_datasource/test_cloudstack.py index 656d80d1..b1aab17b 100644 --- a/tests/unittests/test_datasource/test_cloudstack.py +++ b/tests/unittests/test_datasource/test_cloudstack.py @@ -1,15 +1,7 @@ from cloudinit import helpers from cloudinit.sources.DataSourceCloudStack import DataSourceCloudStack -from ..helpers import TestCase -try: - from unittest import mock -except ImportError: - import mock -try: - from contextlib import ExitStack -except ImportError: - from contextlib2 import ExitStack +from ..helpers import TestCase, mock, ExitStack class TestCloudStackPasswordFetching(TestCase): diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py index 89b15f54..18551b92 100644 --- a/tests/unittests/test_datasource/test_configdrive.py +++ b/tests/unittests/test_datasource/test_configdrive.py @@ -5,22 +5,15 @@ import shutil import six import tempfile -try: - from unittest import mock -except ImportError: - import mock -try: - from contextlib import ExitStack -except ImportError: - from contextlib2 import ExitStack - from cloudinit import helpers +from cloudinit.net import eni +from cloudinit.net import network_state from cloudinit import settings from cloudinit.sources import DataSourceConfigDrive as ds from cloudinit.sources.helpers import openstack from cloudinit import util -from ..helpers import TestCase +from ..helpers import TestCase, ExitStack, mock PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n' @@ -73,7 +66,7 @@ NETWORK_DATA = { 'type': 'ovs', 'mtu': None, 'id': 'tap2f88d109-5b'}, {'vif_id': '1a5382f8-04c5-4d75-ab98-d666c1ef52cc', 'ethernet_mac_address': 'fa:16:3e:05:30:fe', - 'type': 'ovs', 'mtu': None, 'id': 'tap1a5382f8-04'} + 'type': 'ovs', 'mtu': None, 'id': 'tap1a5382f8-04', 'name': 'nic0'} ], 'networks': [ {'link': 'tap2ecc7709-b3', 'type': 'ipv4_dhcp', @@ -88,6 +81,35 @@ NETWORK_DATA = { ] } +NETWORK_DATA_2 = { + "services": [ + {"type": "dns", "address": "1.1.1.191"}, + {"type": "dns", "address": "1.1.1.4"}], + "networks": [ + {"network_id": "d94bbe94-7abc-48d4-9c82-4628ea26164a", "type": "ipv4", + "netmask": "255.255.255.248", "link": "eth0", + "routes": [{"netmask": "0.0.0.0", "network": "0.0.0.0", + "gateway": "2.2.2.9"}], + "ip_address": "2.2.2.10", "id": "network0-ipv4"}, + {"network_id": "ca447c83-6409-499b-aaef-6ad1ae995348", "type": "ipv4", + "netmask": "255.255.255.224", "link": "eth1", + "routes": [], "ip_address": "3.3.3.24", "id": "network1-ipv4"}], + "links": [ + {"ethernet_mac_address": "fa:16:3e:dd:50:9a", "mtu": 1500, + "type": "vif", "id": "eth0", "vif_id": "vif-foo1"}, + {"ethernet_mac_address": "fa:16:3e:a8:14:69", "mtu": 1500, + "type": "vif", "id": "eth1", "vif_id": "vif-foo2"}] +} + + +KNOWN_MACS = { + 'fa:16:3e:69:b0:58': 'enp0s1', + 'fa:16:3e:d4:57:ad': 'enp0s2', + 'fa:16:3e:dd:50:9a': 'foo1', + 'fa:16:3e:a8:14:69': 'foo2', + 'fa:16:3e:ed:9a:59': 'foo3', +} + CFG_DRIVE_FILES_V2 = { 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META), 'ec2/2009-04-04/user-data': USER_DATA, @@ -151,7 +173,7 @@ class TestConfigDriveDataSource(TestCase): mock.patch.object(os.path, 'exists', side_effect=exists_side_effect())) device = cfg_ds.device_name_to_device(name) - self.assertEquals(dev_name, device) + self.assertEqual(dev_name, device) find_mock.assert_called_once_with(mock.ANY) self.assertEqual(exists_mock.call_count, 2) @@ -179,7 +201,7 @@ class TestConfigDriveDataSource(TestCase): mock.patch.object(os.path, 'exists', return_value=True)) device = cfg_ds.device_name_to_device(name) - self.assertEquals(dev_name, device) + self.assertEqual(dev_name, device) find_mock.assert_called_once_with(mock.ANY) exists_mock.assert_called_once_with(mock.ANY) @@ -214,7 +236,7 @@ class TestConfigDriveDataSource(TestCase): with mock.patch.object(os.path, 'exists', side_effect=exists_side_effect()): device = cfg_ds.device_name_to_device(name) - self.assertEquals(dev_name, device) + self.assertEqual(dev_name, device) # We don't assert the call count for os.path.exists() because # not all of the entries in name_tests results in two calls to # that function. Specifically, 'root2k' doesn't seem to call @@ -242,7 +264,7 @@ class TestConfigDriveDataSource(TestCase): for name, dev_name in name_tests.items(): with mock.patch.object(os.path, 'exists', return_value=True): device = cfg_ds.device_name_to_device(name) - self.assertEquals(dev_name, device) + self.assertEqual(dev_name, device) def test_dir_valid(self): """Verify a dir is read as such.""" @@ -348,32 +370,200 @@ class TestConfigDriveDataSource(TestCase): util.find_devs_with = orig_find_devs_with util.is_partition = orig_is_partition - def test_pubkeys_v2(self): + @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot') + def test_pubkeys_v2(self, on_first_boot): """Verify that public-keys work in config-drive-v2.""" populate_dir(self.tmp, CFG_DRIVE_FILES_V2) myds = cfg_ds_from_dir(self.tmp) self.assertEqual(myds.get_public_ssh_keys(), [OSTACK_META['public_keys']['mykey']]) - def test_network_data_is_found(self): + +class TestNetJson(TestCase): + def setUp(self): + super(TestNetJson, self).setUp() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) + self.maxDiff = None + + @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot') + def test_network_data_is_found(self, on_first_boot): """Verify that network_data is present in ds in config-drive-v2.""" populate_dir(self.tmp, CFG_DRIVE_FILES_V2) myds = cfg_ds_from_dir(self.tmp) - self.assertEqual(myds.network_json, NETWORK_DATA) + self.assertIsNotNone(myds.network_json) - def test_network_config_is_converted(self): + @mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot') + def test_network_config_is_converted(self, on_first_boot): """Verify that network_data is converted and present on ds object.""" populate_dir(self.tmp, CFG_DRIVE_FILES_V2) myds = cfg_ds_from_dir(self.tmp) - network_config = ds.convert_network_data(NETWORK_DATA) + network_config = openstack.convert_net_json(NETWORK_DATA, + known_macs=KNOWN_MACS) self.assertEqual(myds.network_config, network_config) + def test_network_config_conversions(self): + """Tests a bunch of input network json and checks the + expected conversions.""" + in_datas = [ + NETWORK_DATA, + { + 'services': [{'type': 'dns', 'address': '172.19.0.12'}], + 'networks': [{ + 'network_id': 'dacd568d-5be6-4786-91fe-750c374b78b4', + 'type': 'ipv4', + 'netmask': '255.255.252.0', + 'link': 'tap1a81968a-79', + 'routes': [{ + 'netmask': '0.0.0.0', + 'network': '0.0.0.0', + 'gateway': '172.19.3.254', + }], + 'ip_address': '172.19.1.34', + 'id': 'network0', + }], + 'links': [{ + 'type': 'bridge', + 'vif_id': '1a81968a-797a-400f-8a80-567f997eb93f', + 'ethernet_mac_address': 'fa:16:3e:ed:9a:59', + 'id': 'tap1a81968a-79', + 'mtu': None, + }], + }, + ] + out_datas = [ + { + 'version': 1, + 'config': [ + { + 'subnets': [{'type': 'dhcp4'}], + 'type': 'physical', + 'mac_address': 'fa:16:3e:69:b0:58', + 'name': 'enp0s1', + 'mtu': None, + }, + { + 'subnets': [{'type': 'dhcp4'}], + 'type': 'physical', + 'mac_address': 'fa:16:3e:d4:57:ad', + 'name': 'enp0s2', + 'mtu': None, + }, + { + 'subnets': [{'type': 'dhcp4'}], + 'type': 'physical', + 'mac_address': 'fa:16:3e:05:30:fe', + 'name': 'nic0', + 'mtu': None, + }, + { + 'type': 'nameserver', + 'address': '199.204.44.24', + }, + { + 'type': 'nameserver', + 'address': '199.204.47.54', + } + ], + + }, + { + 'version': 1, + 'config': [ + { + 'name': 'foo3', + 'mac_address': 'fa:16:3e:ed:9a:59', + 'mtu': None, + 'type': 'physical', + 'subnets': [ + { + 'address': '172.19.1.34', + 'netmask': '255.255.252.0', + 'type': 'static', + 'ipv4': True, + 'routes': [{ + 'gateway': '172.19.3.254', + 'netmask': '0.0.0.0', + 'network': '0.0.0.0', + }], + } + ] + }, + { + 'type': 'nameserver', + 'address': '172.19.0.12', + } + ], + }, + ] + for in_data, out_data in zip(in_datas, out_datas): + conv_data = openstack.convert_net_json(in_data, + known_macs=KNOWN_MACS) + self.assertEqual(out_data, conv_data) + + +class TestConvertNetworkData(TestCase): + def setUp(self): + super(TestConvertNetworkData, self).setUp() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) + + def _getnames_in_config(self, ncfg): + return set([n['name'] for n in ncfg['config'] + if n['type'] == 'physical']) + + def test_conversion_fills_names(self): + ncfg = openstack.convert_net_json(NETWORK_DATA, known_macs=KNOWN_MACS) + expected = set(['nic0', 'enp0s1', 'enp0s2']) + found = self._getnames_in_config(ncfg) + self.assertEqual(found, expected) + + @mock.patch('cloudinit.net.get_interfaces_by_mac') + def test_convert_reads_system_prefers_name(self, get_interfaces_by_mac): + macs = KNOWN_MACS.copy() + macs.update({'fa:16:3e:05:30:fe': 'foonic1', + 'fa:16:3e:69:b0:58': 'ens1'}) + get_interfaces_by_mac.return_value = macs + + ncfg = openstack.convert_net_json(NETWORK_DATA) + expected = set(['nic0', 'ens1', 'enp0s2']) + found = self._getnames_in_config(ncfg) + self.assertEqual(found, expected) + + def test_convert_raises_value_error_on_missing_name(self): + macs = {'aa:aa:aa:aa:aa:00': 'ens1'} + self.assertRaises(ValueError, openstack.convert_net_json, + NETWORK_DATA, known_macs=macs) + + def test_conversion_with_route(self): + ncfg = openstack.convert_net_json(NETWORK_DATA_2, + known_macs=KNOWN_MACS) + # not the best test, but see that we get a route in the + # network config and that it gets rendered to an ENI file + routes = [] + for n in ncfg['config']: + for s in n.get('subnets', []): + routes.extend(s.get('routes', [])) + self.assertIn( + {'network': '0.0.0.0', 'netmask': '0.0.0.0', 'gateway': '2.2.2.9'}, + routes) + eni_renderer = eni.Renderer() + eni_renderer.render_network_state( + self.tmp, network_state.parse_net_config_data(ncfg)) + with open(os.path.join(self.tmp, "etc", + "network", "interfaces"), 'r') as f: + eni_rendering = f.read() + self.assertIn("route add default gw 2.2.2.9", eni_rendering) + def cfg_ds_from_dir(seed_d): - found = ds.read_config_drive(seed_d) cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None, helpers.Paths({})) - populate_ds_from_read_config(cfg_ds, seed_d, found) + cfg_ds.seed_dir = seed_d + cfg_ds.known_macs = KNOWN_MACS.copy() + if not cfg_ds.get_data(): + raise RuntimeError("Data source did not extract itself from" + " seed directory %s" % seed_d) return cfg_ds @@ -387,7 +577,8 @@ def populate_ds_from_read_config(cfg_ds, source, results): cfg_ds.userdata_raw = results.get('userdata') cfg_ds.version = results.get('version') cfg_ds.network_json = results.get('networkdata') - cfg_ds._network_config = ds.convert_network_data(cfg_ds.network_json) + cfg_ds._network_config = openstack.convert_net_json( + cfg_ds.network_json, known_macs=KNOWN_MACS) def populate_dir(seed_dir, files): @@ -400,7 +591,6 @@ def populate_dir(seed_dir, files): mode = "w" else: mode = "wb" - with open(path, mode) as fp: fp.write(content) diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py index 679d1b82..8936a1e3 100644 --- a/tests/unittests/test_datasource/test_digitalocean.py +++ b/tests/unittests/test_datasource/test_digitalocean.py @@ -19,8 +19,8 @@ import re from six.moves.urllib_parse import urlparse -from cloudinit import settings from cloudinit import helpers +from cloudinit import settings from cloudinit.sources import DataSourceDigitalOcean from .. import helpers as test_helpers diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py index fa714070..6e62a4d2 100644 --- a/tests/unittests/test_datasource/test_gce.py +++ b/tests/unittests/test_datasource/test_gce.py @@ -20,8 +20,8 @@ import re from base64 import b64encode, b64decode from six.moves.urllib_parse import urlparse -from cloudinit import settings from cloudinit import helpers +from cloudinit import settings from cloudinit.sources import DataSourceGCE from .. import helpers as test_helpers @@ -52,7 +52,7 @@ GCE_META_ENCODING = { HEADERS = {'X-Google-Metadata-Request': 'True'} MD_URL_RE = re.compile( - r'http://metadata.google.internal./computeMetadata/v1/.*') + r'http://metadata.google.internal/computeMetadata/v1/.*') def _set_mock_metadata(gce_meta=None): diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py index 77d15cac..f66f1c6d 100644 --- a/tests/unittests/test_datasource/test_maas.py +++ b/tests/unittests/test_datasource/test_maas.py @@ -104,13 +104,13 @@ class TestMAASDataSource(TestCase): 'meta-data/local-hostname': 'test-hostname', 'meta-data/public-keys': 'test-hostname', 'user-data': b'foodata', - } + } valid_order = [ 'meta-data/local-hostname', 'meta-data/instance-id', 'meta-data/public-keys', 'user-data', - ] + ] my_seed = "http://example.com/xmeta" my_ver = "1999-99-99" my_headers = {'header1': 'value1', 'header2': 'value2'} diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py index 2d5fc37c..b0fa1130 100644 --- a/tests/unittests/test_datasource/test_nocloud.py +++ b/tests/unittests/test_datasource/test_nocloud.py @@ -1,22 +1,13 @@ from cloudinit import helpers from cloudinit.sources import DataSourceNoCloud from cloudinit import util -from ..helpers import TestCase, populate_dir +from ..helpers import TestCase, populate_dir, mock, ExitStack import os -import yaml import shutil import tempfile -import unittest -try: - from unittest import mock -except ImportError: - import mock -try: - from contextlib import ExitStack -except ImportError: - from contextlib2 import ExitStack +import yaml class TestNoCloudDataSource(TestCase): @@ -139,7 +130,7 @@ class TestNoCloudDataSource(TestCase): self.assertTrue(ret) -class TestParseCommandLineData(unittest.TestCase): +class TestParseCommandLineData(TestCase): def test_parse_cmdline_data_valid(self): ds_id = "ds=nocloud" diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py index 0aa1ba84..5c8592c5 100644 --- a/tests/unittests/test_datasource/test_openstack.py +++ b/tests/unittests/test_datasource/test_openstack.py @@ -22,8 +22,8 @@ import re from .. import helpers as test_helpers -from six import StringIO from six.moves.urllib.parse import urlparse +from six import StringIO from cloudinit import helpers from cloudinit import settings @@ -135,41 +135,45 @@ def _register_uris(version, ec2_files, ec2_meta, os_files): body=get_request_callback) +def _read_metadata_service(): + return ds.read_metadata_service(BASE_URL, retries=0, timeout=0.1) + + class TestOpenStackDataSource(test_helpers.HttprettyTestCase): VERSION = 'latest' @hp.activate def test_successful(self): _register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES) - f = ds.read_metadata_service(BASE_URL) - self.assertEquals(VENDOR_DATA, f.get('vendordata')) - self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg']) - self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg']) - self.assertEquals(2, len(f['files'])) - self.assertEquals(USER_DATA, f.get('userdata')) - self.assertEquals(EC2_META, f.get('ec2-metadata')) - self.assertEquals(2, f.get('version')) + f = _read_metadata_service() + self.assertEqual(VENDOR_DATA, f.get('vendordata')) + self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg']) + self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg']) + self.assertEqual(2, len(f['files'])) + self.assertEqual(USER_DATA, f.get('userdata')) + self.assertEqual(EC2_META, f.get('ec2-metadata')) + self.assertEqual(2, f.get('version')) metadata = f['metadata'] - self.assertEquals('nova', metadata.get('availability_zone')) - self.assertEquals('sm-foo-test.novalocal', metadata.get('hostname')) - self.assertEquals('sm-foo-test.novalocal', - metadata.get('local-hostname')) - self.assertEquals('sm-foo-test', metadata.get('name')) - self.assertEquals('b0fa911b-69d4-4476-bbe2-1c92bff6535c', - metadata.get('uuid')) - self.assertEquals('b0fa911b-69d4-4476-bbe2-1c92bff6535c', - metadata.get('instance-id')) + self.assertEqual('nova', metadata.get('availability_zone')) + self.assertEqual('sm-foo-test.novalocal', metadata.get('hostname')) + self.assertEqual('sm-foo-test.novalocal', + metadata.get('local-hostname')) + self.assertEqual('sm-foo-test', metadata.get('name')) + self.assertEqual('b0fa911b-69d4-4476-bbe2-1c92bff6535c', + metadata.get('uuid')) + self.assertEqual('b0fa911b-69d4-4476-bbe2-1c92bff6535c', + metadata.get('instance-id')) @hp.activate def test_no_ec2(self): _register_uris(self.VERSION, {}, {}, OS_FILES) - f = ds.read_metadata_service(BASE_URL) - self.assertEquals(VENDOR_DATA, f.get('vendordata')) - self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg']) - self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg']) - self.assertEquals(USER_DATA, f.get('userdata')) - self.assertEquals({}, f.get('ec2-metadata')) - self.assertEquals(2, f.get('version')) + f = _read_metadata_service() + self.assertEqual(VENDOR_DATA, f.get('vendordata')) + self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg']) + self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg']) + self.assertEqual(USER_DATA, f.get('userdata')) + self.assertEqual({}, f.get('ec2-metadata')) + self.assertEqual(2, f.get('version')) @hp.activate def test_bad_metadata(self): @@ -178,8 +182,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('meta_data.json'): os_files.pop(k, None) _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.NonReadable, ds.read_metadata_service, - BASE_URL) + self.assertRaises(openstack.NonReadable, _read_metadata_service) @hp.activate def test_bad_uuid(self): @@ -190,8 +193,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('meta_data.json'): os_files[k] = json.dumps(os_meta) _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service, - BASE_URL) + self.assertRaises(openstack.BrokenMetadata, _read_metadata_service) @hp.activate def test_userdata_empty(self): @@ -200,10 +202,10 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('user_data'): os_files.pop(k, None) _register_uris(self.VERSION, {}, {}, os_files) - f = ds.read_metadata_service(BASE_URL) - self.assertEquals(VENDOR_DATA, f.get('vendordata')) - self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg']) - self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg']) + f = _read_metadata_service() + self.assertEqual(VENDOR_DATA, f.get('vendordata')) + self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg']) + self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg']) self.assertFalse(f.get('userdata')) @hp.activate @@ -213,9 +215,9 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('vendor_data.json'): os_files.pop(k, None) _register_uris(self.VERSION, {}, {}, os_files) - f = ds.read_metadata_service(BASE_URL) - self.assertEquals(CONTENT_0, f['files']['/etc/foo.cfg']) - self.assertEquals(CONTENT_1, f['files']['/etc/bar/bar.cfg']) + f = _read_metadata_service() + self.assertEqual(CONTENT_0, f['files']['/etc/foo.cfg']) + self.assertEqual(CONTENT_1, f['files']['/etc/bar/bar.cfg']) self.assertFalse(f.get('vendordata')) @hp.activate @@ -225,8 +227,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('vendor_data.json'): os_files[k] = '{' # some invalid json _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service, - BASE_URL) + self.assertRaises(openstack.BrokenMetadata, _read_metadata_service) @hp.activate def test_metadata_invalid(self): @@ -235,8 +236,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): if k.endswith('meta_data.json'): os_files[k] = '{' # some invalid json _register_uris(self.VERSION, {}, {}, os_files) - self.assertRaises(openstack.BrokenMetadata, ds.read_metadata_service, - BASE_URL) + self.assertRaises(openstack.BrokenMetadata, _read_metadata_service) @hp.activate def test_datasource(self): @@ -245,18 +245,18 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): None, helpers.Paths({})) self.assertIsNone(ds_os.version) - found = ds_os.get_data() + found = ds_os.get_data(timeout=0.1, retries=0) self.assertTrue(found) - self.assertEquals(2, ds_os.version) + self.assertEqual(2, ds_os.version) md = dict(ds_os.metadata) md.pop('instance-id', None) md.pop('local-hostname', None) - self.assertEquals(OSTACK_META, md) - self.assertEquals(EC2_META, ds_os.ec2_metadata) - self.assertEquals(USER_DATA, ds_os.userdata_raw) - self.assertEquals(2, len(ds_os.files)) - self.assertEquals(VENDOR_DATA, ds_os.vendordata_pure) - self.assertEquals(ds_os.vendordata_raw, None) + self.assertEqual(OSTACK_META, md) + self.assertEqual(EC2_META, ds_os.ec2_metadata) + self.assertEqual(USER_DATA, ds_os.userdata_raw) + self.assertEqual(2, len(ds_os.files)) + self.assertEqual(VENDOR_DATA, ds_os.vendordata_pure) + self.assertEqual(ds_os.vendordata_raw, None) @hp.activate def test_bad_datasource_meta(self): @@ -269,7 +269,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): None, helpers.Paths({})) self.assertIsNone(ds_os.version) - found = ds_os.get_data() + found = ds_os.get_data(timeout=0.1, retries=0) self.assertFalse(found) self.assertIsNone(ds_os.version) @@ -288,7 +288,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): 'timeout': 0, } self.assertIsNone(ds_os.version) - found = ds_os.get_data() + found = ds_os.get_data(timeout=0.1, retries=0) self.assertFalse(found) self.assertIsNone(ds_os.version) @@ -311,7 +311,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase): 'timeout': 0, } self.assertIsNone(ds_os.version) - found = ds_os.get_data() + found = ds_os.get_data(timeout=0.1, retries=0) self.assertFalse(found) self.assertIsNone(ds_os.version) diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index 5c49966a..9c6c8768 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -24,6 +24,8 @@ from __future__ import print_function +from binascii import crc32 +import json import os import os.path import re @@ -31,21 +33,58 @@ import shutil import stat import tempfile import uuid -from binascii import crc32 -import serial +from cloudinit import serial +from cloudinit.sources import DataSourceSmartOS + import six from cloudinit import helpers as c_helpers -from cloudinit.sources import DataSourceSmartOS from cloudinit.util import b64e -from .. import helpers - -try: - from unittest import mock -except ImportError: - import mock +from ..helpers import mock, FilesystemMockingTestCase, TestCase + +SDC_NICS = json.loads(""" +[ + { + "nic_tag": "external", + "primary": true, + "mtu": 1500, + "model": "virtio", + "gateway": "8.12.42.1", + "netmask": "255.255.255.0", + "ip": "8.12.42.102", + "network_uuid": "992fc7ce-6aac-4b74-aed6-7b9d2c6c0bfe", + "gateways": [ + "8.12.42.1" + ], + "vlan_id": 324, + "mac": "90:b8:d0:f5:e4:f5", + "interface": "net0", + "ips": [ + "8.12.42.102/24" + ] + }, + { + "nic_tag": "sdc_overlay/16187209", + "gateway": "192.168.128.1", + "model": "virtio", + "mac": "90:b8:d0:a5:ff:cd", + "netmask": "255.255.252.0", + "ip": "192.168.128.93", + "network_uuid": "4cad71da-09bc-452b-986d-03562a03a0a9", + "gateways": [ + "192.168.128.1" + ], + "vlan_id": 2, + "mtu": 8500, + "interface": "net1", + "ips": [ + "192.168.128.93/22" + ] + } +] +""") MOCK_RETURNS = { 'hostname': 'test-host', @@ -60,79 +99,68 @@ MOCK_RETURNS = { 'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']), 'user-data': '\n'.join(['something', '']), 'user-script': '\n'.join(['/bin/true', '']), + 'sdc:nics': json.dumps(SDC_NICS), } DMI_DATA_RETURN = 'smartdc' -def get_mock_client(mockdata): - class MockMetadataClient(object): +class PsuedoJoyentClient(object): + def __init__(self, data=None): + if data is None: + data = MOCK_RETURNS.copy() + self.data = data + return + + def get(self, key, default=None, strip=False): + if key in self.data: + r = self.data[key] + if strip: + r = r.strip() + else: + r = default + return r - def __init__(self, serial): - pass + def get_json(self, key, default=None): + result = self.get(key, default=default) + if result is None: + return default + return json.loads(result) - def get_metadata(self, metadata_key): - return mockdata.get(metadata_key) - return MockMetadataClient + def exists(self): + return True -class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): +class TestSmartOSDataSource(FilesystemMockingTestCase): def setUp(self): super(TestSmartOSDataSource, self).setUp() + dsmos = 'cloudinit.sources.DataSourceSmartOS' + patcher = mock.patch(dsmos + ".jmc_client_factory") + self.jmc_cfact = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch(dsmos + ".get_smartos_environ") + self.get_smartos_environ = patcher.start() + self.addCleanup(patcher.stop) + self.tmp = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.tmp) - self.legacy_user_d = tempfile.mkdtemp() - self.addCleanup(shutil.rmtree, self.legacy_user_d) - - # If you should want to watch the logs... - self._log = None - self._log_file = None - self._log_handler = None - - # patch cloud_dir, so our 'seed_dir' is guaranteed empty self.paths = c_helpers.Paths({'cloud_dir': self.tmp}) - self.unapply = [] - super(TestSmartOSDataSource, self).setUp() + self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp') + os.mkdir(self.legacy_user_d) + + self.orig_lud = DataSourceSmartOS.LEGACY_USER_D + DataSourceSmartOS.LEGACY_USER_D = self.legacy_user_d def tearDown(self): - helpers.FilesystemMockingTestCase.tearDown(self) - if self._log_handler and self._log: - self._log.removeHandler(self._log_handler) - apply_patches([i for i in reversed(self.unapply)]) + DataSourceSmartOS.LEGACY_USER_D = self.orig_lud super(TestSmartOSDataSource, self).tearDown() - def _patchIn(self, root): - self.restore() - self.patchOS(root) - self.patchUtils(root) - - def apply_patches(self, patches): - ret = apply_patches(patches) - self.unapply += ret - - def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None, - is_lxbrand=False): - mod = DataSourceSmartOS - - if mockdata is None: - mockdata = MOCK_RETURNS - - if dmi_data is None: - dmi_data = DMI_DATA_RETURN - - def _dmi_data(): - return dmi_data - - def _os_uname(): - if not is_lxbrand: - # LP: #1243287. tests assume this runs, but running test on - # arm would cause them all to fail. - return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', 'x86_64') - else: - return ('LINUX', 'NODENAME', 'RELEASE', 'BRANDZ VIRTUAL LINUX', - 'X86_64') + def _get_ds(self, mockdata=None, mode=DataSourceSmartOS.SMARTOS_ENV_KVM, + sys_cfg=None, ds_cfg=None): + self.jmc_cfact.return_value = PsuedoJoyentClient(mockdata) + self.get_smartos_environ.return_value = mode if sys_cfg is None: sys_cfg = {} @@ -141,44 +169,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): sys_cfg['datasource'] = sys_cfg.get('datasource', {}) sys_cfg['datasource']['SmartOS'] = ds_cfg - self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)]) - self.apply_patches([ - (mod, 'JoyentMetadataClient', get_mock_client(mockdata))]) - self.apply_patches([(mod, 'dmi_data', _dmi_data)]) - self.apply_patches([(os, 'uname', _os_uname)]) - self.apply_patches([(mod, 'device_exists', lambda d: True)]) - dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None, - paths=self.paths) - self.apply_patches([(dsrc, '_get_seed_file_object', mock.MagicMock())]) - return dsrc - - def test_seed(self): - # default seed should be /dev/ttyS1 - dsrc = self._get_ds() - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEquals('kvm', dsrc.smartos_type) - self.assertEquals('/dev/ttyS1', dsrc.seed) - - def test_seed_lxbrand(self): - # default seed should be /dev/ttyS1 - dsrc = self._get_ds(is_lxbrand=True) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEquals('lx-brand', dsrc.smartos_type) - self.assertEquals('/native/.zonecontrol/metadata.sock', dsrc.seed) - - def test_issmartdc(self): - dsrc = self._get_ds() - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertTrue(dsrc.is_smartdc) - - def test_issmartdc_lxbrand(self): - dsrc = self._get_ds(is_lxbrand=True) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertTrue(dsrc.is_smartdc) + return DataSourceSmartOS.DataSourceSmartOS( + sys_cfg, distro=None, paths=self.paths) def test_no_base64(self): ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True} @@ -190,110 +182,65 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['sdc:uuid'], - dsrc.metadata['instance-id']) + self.assertEqual(MOCK_RETURNS['sdc:uuid'], + dsrc.metadata['instance-id']) def test_root_keys(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['root_authorized_keys'], - dsrc.metadata['public-keys']) + self.assertEqual(MOCK_RETURNS['root_authorized_keys'], + dsrc.metadata['public-keys']) def test_hostname_b64(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) + self.assertEqual(MOCK_RETURNS['hostname'], + dsrc.metadata['local-hostname']) def test_hostname(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) - - def test_base64_all(self): - # metadata provided base64_all of true - my_returns = MOCK_RETURNS.copy() - my_returns['base64_all'] = "true" - for k in ('hostname', 'cloud-init:user-data'): - my_returns[k] = b64e(my_returns[k]) - - dsrc = self._get_ds(mockdata=my_returns) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) - self.assertEquals(MOCK_RETURNS['cloud-init:user-data'], - dsrc.userdata_raw) - self.assertEquals(MOCK_RETURNS['root_authorized_keys'], - dsrc.metadata['public-keys']) - self.assertEquals(MOCK_RETURNS['disable_iptables_flag'], - dsrc.metadata['iptables_disable']) - self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'], - dsrc.metadata['motd_sys_info']) - - def test_b64_userdata(self): - my_returns = MOCK_RETURNS.copy() - my_returns['b64-cloud-init:user-data'] = "true" - my_returns['b64-hostname'] = "true" - for k in ('hostname', 'cloud-init:user-data'): - my_returns[k] = b64e(my_returns[k]) + self.assertEqual(MOCK_RETURNS['hostname'], + dsrc.metadata['local-hostname']) - dsrc = self._get_ds(mockdata=my_returns) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) - self.assertEquals(MOCK_RETURNS['cloud-init:user-data'], - dsrc.userdata_raw) - self.assertEquals(MOCK_RETURNS['root_authorized_keys'], - dsrc.metadata['public-keys']) - - def test_b64_keys(self): - my_returns = MOCK_RETURNS.copy() - my_returns['base64_keys'] = 'hostname,ignored' - for k in ('hostname',): - my_returns[k] = b64e(my_returns[k]) - - dsrc = self._get_ds(mockdata=my_returns) + def test_userdata(self): + dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['hostname'], - dsrc.metadata['local-hostname']) - self.assertEquals(MOCK_RETURNS['cloud-init:user-data'], - dsrc.userdata_raw) + self.assertEqual(MOCK_RETURNS['user-data'], + dsrc.metadata['legacy-user-data']) + self.assertEqual(MOCK_RETURNS['cloud-init:user-data'], + dsrc.userdata_raw) - def test_userdata(self): + def test_sdc_nics(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['user-data'], - dsrc.metadata['legacy-user-data']) - self.assertEquals(MOCK_RETURNS['cloud-init:user-data'], - dsrc.userdata_raw) + self.assertEqual(json.loads(MOCK_RETURNS['sdc:nics']), + dsrc.metadata['network-data']) def test_sdc_scripts(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['user-script'], - dsrc.metadata['user-script']) + self.assertEqual(MOCK_RETURNS['user-script'], + dsrc.metadata['user-script']) legacy_script_f = "%s/user-script" % self.legacy_user_d self.assertTrue(os.path.exists(legacy_script_f)) self.assertTrue(os.path.islink(legacy_script_f)) user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] - self.assertEquals(user_script_perm, '700') + self.assertEqual(user_script_perm, '700') def test_scripts_shebanged(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['user-script'], - dsrc.metadata['user-script']) + self.assertEqual(MOCK_RETURNS['user-script'], + dsrc.metadata['user-script']) legacy_script_f = "%s/user-script" % self.legacy_user_d self.assertTrue(os.path.exists(legacy_script_f)) @@ -301,9 +248,9 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): shebang = None with open(legacy_script_f, 'r') as f: shebang = f.readlines()[0].strip() - self.assertEquals(shebang, "#!/bin/bash") + self.assertEqual(shebang, "#!/bin/bash") user_script_perm = oct(os.stat(legacy_script_f)[stat.ST_MODE])[-3:] - self.assertEquals(user_script_perm, '700') + self.assertEqual(user_script_perm, '700') def test_scripts_shebang_not_added(self): """ @@ -319,8 +266,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): dsrc = self._get_ds(mockdata=my_returns) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(my_returns['user-script'], - dsrc.metadata['user-script']) + self.assertEqual(my_returns['user-script'], + dsrc.metadata['user-script']) legacy_script_f = "%s/user-script" % self.legacy_user_d self.assertTrue(os.path.exists(legacy_script_f)) @@ -328,7 +275,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): shebang = None with open(legacy_script_f, 'r') as f: shebang = f.readlines()[0].strip() - self.assertEquals(shebang, "#!/usr/bin/perl") + self.assertEqual(shebang, "#!/usr/bin/perl") def test_userdata_removed(self): """ @@ -358,7 +305,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): if re.match(r'.*\/mdata-user-data$', name_f): found_new = True print(name_f) - self.assertEquals(permissions, '400') + self.assertEqual(permissions, '400') self.assertFalse(found_new) @@ -366,8 +313,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['sdc:vendor-data'], - dsrc.metadata['vendor-data']) + self.assertEqual(MOCK_RETURNS['sdc:vendor-data'], + dsrc.metadata['vendor-data']) def test_default_vendor_data(self): my_returns = MOCK_RETURNS.copy() @@ -376,7 +323,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): dsrc = self._get_ds(mockdata=my_returns) ret = dsrc.get_data() self.assertTrue(ret) - self.assertNotEquals(def_op_script, dsrc.metadata['vendor-data']) + self.assertNotEqual(def_op_script, dsrc.metadata['vendor-data']) # we expect default vendor-data is a boothook self.assertTrue(dsrc.vendordata_raw.startswith("#cloud-boothook")) @@ -385,15 +332,15 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['disable_iptables_flag'], - dsrc.metadata['iptables_disable']) + self.assertEqual(MOCK_RETURNS['disable_iptables_flag'], + dsrc.metadata['iptables_disable']) def test_motd_sys_info(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(MOCK_RETURNS['enable_motd_sys_info'], - dsrc.metadata['motd_sys_info']) + self.assertEqual(MOCK_RETURNS['enable_motd_sys_info'], + dsrc.metadata['motd_sys_info']) def test_default_ephemeral(self): # Test to make sure that the builtin config has the ephemeral @@ -430,21 +377,11 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): mydscfg['disk_aliases']['FOO']) -def apply_patches(patches): - ret = [] - for (ref, name, replace) in patches: - if replace is None: - continue - orig = getattr(ref, name) - setattr(ref, name, replace) - ret.append((ref, name, orig)) - return ret - - -class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): +class TestJoyentMetadataClient(FilesystemMockingTestCase): def setUp(self): super(TestJoyentMetadataClient, self).setUp() + self.serial = mock.MagicMock(spec=serial.Serial) self.request_id = 0xabcdef12 self.metadata_value = 'value' @@ -481,7 +418,8 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): mock.Mock(return_value=self.request_id))) def _get_client(self): - return DataSourceSmartOS.JoyentMetadataClient(self.serial) + return DataSourceSmartOS.JoyentMetadataClient( + fp=self.serial, smartos_type=DataSourceSmartOS.SMARTOS_ENV_KVM) def assertEndsWith(self, haystack, prefix): self.assertTrue(haystack.endswith(prefix), @@ -495,7 +433,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): def test_get_metadata_writes_a_single_line(self): client = self._get_client() - client.get_metadata('some_key') + client.get('some_key') self.assertEqual(1, self.serial.write.call_count) written_line = self.serial.write.call_args[0][0] print(type(written_line)) @@ -505,7 +443,7 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): def _get_written_line(self, key='some_key'): client = self._get_client() - client.get_metadata(key) + client.get(key) return self.serial.write.call_args[0][0] def test_get_metadata_writes_bytes(self): @@ -549,32 +487,32 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): def test_get_metadata_reads_a_line(self): client = self._get_client() - client.get_metadata('some_key') + client.get('some_key') self.assertEqual(self.metasource_data_len, self.serial.read.call_count) def test_get_metadata_returns_valid_value(self): client = self._get_client() - value = client.get_metadata('some_key') + value = client.get('some_key') self.assertEqual(self.metadata_value, value) def test_get_metadata_throws_exception_for_incorrect_length(self): self.response_parts['length'] = 0 client = self._get_client() self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get_metadata, 'some_key') + client.get, 'some_key') def test_get_metadata_throws_exception_for_incorrect_crc(self): self.response_parts['crc'] = 'deadbeef' client = self._get_client() self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get_metadata, 'some_key') + client.get, 'some_key') def test_get_metadata_throws_exception_for_request_id_mismatch(self): self.response_parts['request_id'] = 'deadbeef' client = self._get_client() client._checksum = lambda _: self.response_parts['crc'] self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, - client.get_metadata, 'some_key') + client.get, 'some_key') def test_get_metadata_returns_None_if_value_not_found(self): self.response_parts['payload'] = '' @@ -582,4 +520,24 @@ class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): self.response_parts['length'] = 17 client = self._get_client() client._checksum = lambda _: self.response_parts['crc'] - self.assertIsNone(client.get_metadata('some_key')) + self.assertIsNone(client.get('some_key')) + + +class TestNetworkConversion(TestCase): + + def test_convert_simple(self): + expected = { + 'version': 1, + 'config': [ + {'name': 'net0', 'type': 'physical', + 'subnets': [{'type': 'static', 'gateway': '8.12.42.1', + 'netmask': '255.255.255.0', + 'address': '8.12.42.102/24'}], + 'mtu': 1500, 'mac_address': '90:b8:d0:f5:e4:f5'}, + {'name': 'net1', 'type': 'physical', + 'subnets': [{'type': 'static', 'gateway': '192.168.128.1', + 'netmask': '255.255.252.0', + 'address': '192.168.128.93/22'}], + 'mtu': 8500, 'mac_address': '90:b8:d0:a5:ff:cd'}]} + found = DataSourceSmartOS.convert_smartos_network_data(SDC_NICS) + self.assertEqual(expected, found) diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py index 6ed1704c..96fa0811 100644 --- a/tests/unittests/test_distros/test_generic.py +++ b/tests/unittests/test_distros/test_generic.py @@ -87,13 +87,13 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): rules = 'ALL=(ALL:ALL) ALL' contents = self._write_load_sudoers('harlowja', rules) expected = ['harlowja ALL=(ALL:ALL) ALL'] - self.assertEquals(len(expected), self._count_in(expected, contents)) + self.assertEqual(len(expected), self._count_in(expected, contents)) not_expected = [ 'harlowja A', 'harlowja L', 'harlowja L', ] - self.assertEquals(0, self._count_in(not_expected, contents)) + self.assertEqual(0, self._count_in(not_expected, contents)) def test_sudoers_ensure_rules_list(self): rules = [ @@ -107,13 +107,13 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): 'harlowja B-ALL=(ALL:ALL) ALL', 'harlowja C-ALL=(ALL:ALL) ALL', ] - self.assertEquals(len(expected), self._count_in(expected, contents)) + self.assertEqual(len(expected), self._count_in(expected, contents)) not_expected = [ 'harlowja A', 'harlowja L', 'harlowja L', ] - self.assertEquals(0, self._count_in(not_expected, contents)) + self.assertEqual(0, self._count_in(not_expected, contents)) def test_sudoers_ensure_new(self): cls = distros.fetch("ubuntu") @@ -136,7 +136,7 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): self.assertIn("includedir /b", contents) self.assertTrue(os.path.isdir("/b")) self.assertIn("josh", contents) - self.assertEquals(2, contents.count("josh")) + self.assertEqual(2, contents.count("josh")) def test_arch_package_mirror_info_unknown(self): """for an unknown arch, we should get back that with arch 'default'.""" diff --git a/tests/unittests/test_distros/test_hostname.py b/tests/unittests/test_distros/test_hostname.py index 143e29a9..5f28a868 100644 --- a/tests/unittests/test_distros/test_hostname.py +++ b/tests/unittests/test_distros/test_hostname.py @@ -15,24 +15,24 @@ BASE_HOSTNAME = BASE_HOSTNAME.strip() class TestHostnameHelper(unittest.TestCase): def test_parse_same(self): hn = hostname.HostnameConf(BASE_HOSTNAME) - self.assertEquals(str(hn).strip(), BASE_HOSTNAME) - self.assertEquals(hn.hostname, 'blahblah') + self.assertEqual(str(hn).strip(), BASE_HOSTNAME) + self.assertEqual(hn.hostname, 'blahblah') def test_no_adjust_hostname(self): hn = hostname.HostnameConf(BASE_HOSTNAME) prev_name = hn.hostname hn.set_hostname("") - self.assertEquals(hn.hostname, prev_name) + self.assertEqual(hn.hostname, prev_name) def test_adjust_hostname(self): hn = hostname.HostnameConf(BASE_HOSTNAME) prev_name = hn.hostname - self.assertEquals(prev_name, 'blahblah') + self.assertEqual(prev_name, 'blahblah') hn.set_hostname("bbbbd") - self.assertEquals(hn.hostname, 'bbbbd') + self.assertEqual(hn.hostname, 'bbbbd') expected_out = ''' # My super-duper-hostname bbbbd ''' - self.assertEquals(str(hn).strip(), expected_out.strip()) + self.assertEqual(str(hn).strip(), expected_out.strip()) diff --git a/tests/unittests/test_distros/test_hosts.py b/tests/unittests/test_distros/test_hosts.py index fc701eaa..ab867c6f 100644 --- a/tests/unittests/test_distros/test_hosts.py +++ b/tests/unittests/test_distros/test_hosts.py @@ -17,25 +17,25 @@ BASE_ETC = BASE_ETC.strip() class TestHostsHelper(unittest.TestCase): def test_parse(self): eh = hosts.HostsConf(BASE_ETC) - self.assertEquals(eh.get_entry('127.0.0.1'), [['localhost']]) - self.assertEquals(eh.get_entry('192.168.1.10'), - [['foo.mydomain.org', 'foo'], - ['bar.mydomain.org', 'bar']]) + self.assertEqual(eh.get_entry('127.0.0.1'), [['localhost']]) + self.assertEqual(eh.get_entry('192.168.1.10'), + [['foo.mydomain.org', 'foo'], + ['bar.mydomain.org', 'bar']]) eh = str(eh) self.assertTrue(eh.startswith('# Example')) def test_add(self): eh = hosts.HostsConf(BASE_ETC) eh.add_entry('127.0.0.0', 'blah') - self.assertEquals(eh.get_entry('127.0.0.0'), [['blah']]) + self.assertEqual(eh.get_entry('127.0.0.0'), [['blah']]) eh.add_entry('127.0.0.3', 'blah', 'blah2', 'blah3') - self.assertEquals(eh.get_entry('127.0.0.3'), - [['blah', 'blah2', 'blah3']]) + self.assertEqual(eh.get_entry('127.0.0.3'), + [['blah', 'blah2', 'blah3']]) def test_del(self): eh = hosts.HostsConf(BASE_ETC) eh.add_entry('127.0.0.0', 'blah') - self.assertEquals(eh.get_entry('127.0.0.0'), [['blah']]) + self.assertEqual(eh.get_entry('127.0.0.0'), [['blah']]) eh.del_entries('127.0.0.0') - self.assertEquals(eh.get_entry('127.0.0.0'), []) + self.assertEqual(eh.get_entry('127.0.0.0'), []) diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py index 2c2a424d..9172e3aa 100644 --- a/tests/unittests/test_distros/test_netconfig.py +++ b/tests/unittests/test_distros/test_netconfig.py @@ -1,4 +1,5 @@ import os +from six import StringIO try: from unittest import mock @@ -9,16 +10,14 @@ try: except ImportError: from contextlib2 import ExitStack -from six import StringIO from ..helpers import TestCase from cloudinit import distros +from cloudinit.distros.parsers.sys_conf import SysConf from cloudinit import helpers from cloudinit import settings from cloudinit import util -from cloudinit.distros.parsers.sys_conf import SysConf - BASE_NET_CFG = ''' auto lo @@ -108,23 +107,23 @@ class TestNetCfgDistro(TestCase): ub_distro.apply_network(BASE_NET_CFG, False) - self.assertEquals(len(write_bufs), 1) + self.assertEqual(len(write_bufs), 1) eni_name = '/etc/network/interfaces.d/50-cloud-init.cfg' self.assertIn(eni_name, write_bufs) write_buf = write_bufs[eni_name] - self.assertEquals(str(write_buf).strip(), BASE_NET_CFG.strip()) - self.assertEquals(write_buf.mode, 0o644) + self.assertEqual(str(write_buf).strip(), BASE_NET_CFG.strip()) + self.assertEqual(write_buf.mode, 0o644) def assertCfgEquals(self, blob1, blob2): b1 = dict(SysConf(blob1.strip().splitlines())) b2 = dict(SysConf(blob2.strip().splitlines())) - self.assertEquals(b1, b2) + self.assertEqual(b1, b2) for (k, v) in b1.items(): self.assertIn(k, b2) for (k, v) in b2.items(): self.assertIn(k, b1) for (k, v) in b1.items(): - self.assertEquals(v, b2[k]) + self.assertEqual(v, b2[k]) def test_simple_write_rh(self): rh_distro = self._get_distro('rhel') @@ -148,7 +147,7 @@ class TestNetCfgDistro(TestCase): rh_distro.apply_network(BASE_NET_CFG, False) - self.assertEquals(len(write_bufs), 4) + self.assertEqual(len(write_bufs), 4) self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', write_bufs) write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo'] @@ -157,7 +156,7 @@ DEVICE="lo" ONBOOT=yes ''' self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0o644) + self.assertEqual(write_buf.mode, 0o644) self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', write_bufs) @@ -172,7 +171,7 @@ GATEWAY="192.168.1.254" BROADCAST="192.168.1.0" ''' self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0o644) + self.assertEqual(write_buf.mode, 0o644) self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', write_bufs) @@ -183,7 +182,7 @@ BOOTPROTO="dhcp" ONBOOT=yes ''' self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0o644) + self.assertEqual(write_buf.mode, 0o644) self.assertIn('/etc/sysconfig/network', write_bufs) write_buf = write_bufs['/etc/sysconfig/network'] @@ -192,7 +191,7 @@ ONBOOT=yes NETWORKING=yes ''' self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0o644) + self.assertEqual(write_buf.mode, 0o644) def test_write_ipv6_rhel(self): rh_distro = self._get_distro('rhel') @@ -216,7 +215,7 @@ NETWORKING=yes rh_distro.apply_network(BASE_NET_CFG_IPV6, False) - self.assertEquals(len(write_bufs), 4) + self.assertEqual(len(write_bufs), 4) self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', write_bufs) write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo'] @@ -225,7 +224,7 @@ DEVICE="lo" ONBOOT=yes ''' self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0o644) + self.assertEqual(write_buf.mode, 0o644) self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', write_bufs) @@ -243,7 +242,7 @@ IPV6ADDR="2607:f0d0:1002:0011::2" IPV6_DEFAULTGW="2607:f0d0:1002:0011::1" ''' self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0o644) + self.assertEqual(write_buf.mode, 0o644) self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', write_bufs) write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1'] @@ -260,7 +259,7 @@ IPV6ADDR="2607:f0d0:1002:0011::3" IPV6_DEFAULTGW="2607:f0d0:1002:0011::1" ''' self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0o644) + self.assertEqual(write_buf.mode, 0o644) self.assertIn('/etc/sysconfig/network', write_bufs) write_buf = write_bufs['/etc/sysconfig/network'] @@ -271,7 +270,7 @@ NETWORKING_IPV6=yes IPV6_AUTOCONF=no ''' self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0o644) + self.assertEqual(write_buf.mode, 0o644) def test_simple_write_freebsd(self): fbsd_distro = self._get_distro('freebsd') @@ -319,4 +318,4 @@ ifconfig_vtnet1="DHCP" defaultrouter="192.168.1.254" ''' self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0o644) + self.assertEqual(write_buf.mode, 0o644) diff --git a/tests/unittests/test_distros/test_resolv.py b/tests/unittests/test_distros/test_resolv.py index 9edeb6e7..9402b5ea 100644 --- a/tests/unittests/test_distros/test_resolv.py +++ b/tests/unittests/test_distros/test_resolv.py @@ -1,9 +1,10 @@ from cloudinit.distros.parsers import resolv_conf from cloudinit.distros import rhel_util +from ..helpers import TestCase + import re import tempfile -from ..helpers import TestCase BASE_RESOLVE = ''' @@ -19,7 +20,7 @@ class TestResolvHelper(TestCase): def test_parse_same(self): rp = resolv_conf.ResolvConf(BASE_RESOLVE) rp_r = str(rp).strip() - self.assertEquals(BASE_RESOLVE, rp_r) + self.assertEqual(BASE_RESOLVE, rp_r) def test_write_works(self): with tempfile.NamedTemporaryFile() as fh: @@ -27,10 +28,10 @@ class TestResolvHelper(TestCase): def test_local_domain(self): rp = resolv_conf.ResolvConf(BASE_RESOLVE) - self.assertEquals(None, rp.local_domain) + self.assertEqual(None, rp.local_domain) rp.local_domain = "bob" - self.assertEquals('bob', rp.local_domain) + self.assertEqual('bob', rp.local_domain) self.assertIn('domain bob', str(rp)) def test_nameservers(self): @@ -41,7 +42,7 @@ class TestResolvHelper(TestCase): self.assertIn('10.2', rp.nameservers) self.assertIn('nameserver 10.2', str(rp)) self.assertNotIn('10.3', rp.nameservers) - self.assertEquals(len(rp.nameservers), 3) + self.assertEqual(len(rp.nameservers), 3) rp.add_nameserver('10.2') self.assertRaises(ValueError, rp.add_nameserver, '10.3') self.assertNotIn('10.3', rp.nameservers) @@ -55,12 +56,12 @@ class TestResolvHelper(TestCase): self.assertTrue(re.search(r'search(.*)bbb.y.com(.*)', str(rp))) self.assertIn('bbb.y.com', rp.search_domains) rp.add_search_domain('bbb.y.com') - self.assertEquals(len(rp.search_domains), 3) + self.assertEqual(len(rp.search_domains), 3) rp.add_search_domain('bbb2.y.com') - self.assertEquals(len(rp.search_domains), 4) + self.assertEqual(len(rp.search_domains), 4) rp.add_search_domain('bbb3.y.com') - self.assertEquals(len(rp.search_domains), 5) + self.assertEqual(len(rp.search_domains), 5) rp.add_search_domain('bbb4.y.com') - self.assertEquals(len(rp.search_domains), 6) + self.assertEqual(len(rp.search_domains), 6) self.assertRaises(ValueError, rp.add_search_domain, 'bbb5.y.com') - self.assertEquals(len(rp.search_domains), 6) + self.assertEqual(len(rp.search_domains), 6) diff --git a/tests/unittests/test_distros/test_sysconfig.py b/tests/unittests/test_distros/test_sysconfig.py index 03d89a10..8cb55522 100644 --- a/tests/unittests/test_distros/test_sysconfig.py +++ b/tests/unittests/test_distros/test_sysconfig.py @@ -1,6 +1,7 @@ import re from cloudinit.distros.parsers.sys_conf import SysConf + from ..helpers import TestCase @@ -27,34 +28,34 @@ IPV6TO4_ROUTING='eth0-:0004::1/64 eth1-:0005::1/64' ETHTOOL_OPTS="-K ${DEVICE} tso on; -G ${DEVICE} rx 256 tx 256" USEMD5=no''' conf = SysConf(contents.splitlines()) - self.assertEquals(conf['HOSTNAME'], 'blahblah') - self.assertEquals(conf['SHORTDATE'], '$(date +%y:%m:%d:%H:%M)') + self.assertEqual(conf['HOSTNAME'], 'blahblah') + self.assertEqual(conf['SHORTDATE'], '$(date +%y:%m:%d:%H:%M)') # Should be unquoted - self.assertEquals(conf['ETHTOOL_OPTS'], ('-K ${DEVICE} tso on; ' - '-G ${DEVICE} rx 256 tx 256')) - self.assertEquals(contents, str(conf)) + self.assertEqual(conf['ETHTOOL_OPTS'], ('-K ${DEVICE} tso on; ' + '-G ${DEVICE} rx 256 tx 256')) + self.assertEqual(contents, str(conf)) def test_parse_shell_vars(self): contents = 'USESMBAUTH=$XYZ' conf = SysConf(contents.splitlines()) - self.assertEquals(contents, str(conf)) + self.assertEqual(contents, str(conf)) conf = SysConf('') conf['B'] = '${ZZ}d apples' # Should be quoted - self.assertEquals('B="${ZZ}d apples"', str(conf)) + self.assertEqual('B="${ZZ}d apples"', str(conf)) conf = SysConf('') conf['B'] = '$? d apples' - self.assertEquals('B="$? d apples"', str(conf)) + self.assertEqual('B="$? d apples"', str(conf)) contents = 'IPMI_WATCHDOG_OPTIONS="timeout=60"' conf = SysConf(contents.splitlines()) - self.assertEquals('IPMI_WATCHDOG_OPTIONS=timeout=60', str(conf)) + self.assertEqual('IPMI_WATCHDOG_OPTIONS=timeout=60', str(conf)) def test_parse_adjust(self): contents = 'IPV6TO4_ROUTING="eth0-:0004::1/64 eth1-:0005::1/64"' conf = SysConf(contents.splitlines()) # Should be unquoted - self.assertEquals('eth0-:0004::1/64 eth1-:0005::1/64', - conf['IPV6TO4_ROUTING']) + self.assertEqual('eth0-:0004::1/64 eth1-:0005::1/64', + conf['IPV6TO4_ROUTING']) conf['IPV6TO4_ROUTING'] = "blah \tblah" contents2 = str(conf).strip() # Should be requoted due to whitespace @@ -65,12 +66,12 @@ USEMD5=no''' conf = SysConf(''.splitlines()) conf['B'] = ' $(time)' contents = str(conf) - self.assertEquals('B= $(time)', contents) + self.assertEqual('B= $(time)', contents) def test_parse_empty(self): contents = '' conf = SysConf(contents.splitlines()) - self.assertEquals('', str(conf).strip()) + self.assertEqual('', str(conf).strip()) def test_parse_add_new(self): contents = 'BLAH=b' diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py index 4525f487..a887a930 100644 --- a/tests/unittests/test_distros/test_user_data_normalize.py +++ b/tests/unittests/test_distros/test_user_data_normalize.py @@ -33,20 +33,19 @@ class TestUGNormalize(TestCase): def test_group_dict(self): distro = self._make_distro('ubuntu') - g = {'groups': [ - {'ubuntu': ['foo', 'bar'], - 'bob': 'users'}, - 'cloud-users', - {'bob': 'users2'} - ]} + g = {'groups': + [{'ubuntu': ['foo', 'bar'], + 'bob': 'users'}, + 'cloud-users', + {'bob': 'users2'}]} (_users, groups) = self._norm(g, distro) self.assertIn('ubuntu', groups) ub_members = groups['ubuntu'] - self.assertEquals(sorted(['foo', 'bar']), sorted(ub_members)) + self.assertEqual(sorted(['foo', 'bar']), sorted(ub_members)) self.assertIn('bob', groups) b_members = groups['bob'] - self.assertEquals(sorted(['users', 'users2']), - sorted(b_members)) + self.assertEqual(sorted(['users', 'users2']), + sorted(b_members)) def test_basic_groups(self): distro = self._make_distro('ubuntu') @@ -55,7 +54,7 @@ class TestUGNormalize(TestCase): } (users, groups) = self._norm(ug_cfg, distro) self.assertIn('bob', groups) - self.assertEquals({}, users) + self.assertEqual({}, users) def test_csv_groups(self): distro = self._make_distro('ubuntu') @@ -66,7 +65,7 @@ class TestUGNormalize(TestCase): self.assertIn('bob', groups) self.assertIn('joe', groups) self.assertIn('steve', groups) - self.assertEquals({}, users) + self.assertEqual({}, users) def test_more_groups(self): distro = self._make_distro('ubuntu') @@ -77,7 +76,7 @@ class TestUGNormalize(TestCase): self.assertIn('bob', groups) self.assertIn('joe', groups) self.assertIn('steve', groups) - self.assertEquals({}, users) + self.assertEqual({}, users) def test_member_groups(self): distro = self._make_distro('ubuntu') @@ -90,11 +89,11 @@ class TestUGNormalize(TestCase): } (users, groups) = self._norm(ug_cfg, distro) self.assertIn('bob', groups) - self.assertEquals(['s'], groups['bob']) - self.assertEquals([], groups['joe']) + self.assertEqual(['s'], groups['bob']) + self.assertEqual([], groups['joe']) self.assertIn('joe', groups) self.assertIn('steve', groups) - self.assertEquals({}, users) + self.assertEqual({}, users) def test_users_simple_dict(self): distro = self._make_distro('ubuntu', bcfg) @@ -128,14 +127,14 @@ class TestUGNormalize(TestCase): } } (users, _groups) = self._norm(ug_cfg, distro) - self.assertEquals({}, users) + self.assertEqual({}, users) ug_cfg = { 'users': { 'default': 'no', } } (users, _groups) = self._norm(ug_cfg, distro) - self.assertEquals({}, users) + self.assertEqual({}, users) def test_users_simple_csv(self): distro = self._make_distro('ubuntu') @@ -145,8 +144,8 @@ class TestUGNormalize(TestCase): (users, _groups) = self._norm(ug_cfg, distro) self.assertIn('joe', users) self.assertIn('bob', users) - self.assertEquals({'default': False}, users['joe']) - self.assertEquals({'default': False}, users['bob']) + self.assertEqual({'default': False}, users['joe']) + self.assertEqual({'default': False}, users['bob']) def test_users_simple(self): distro = self._make_distro('ubuntu') @@ -159,8 +158,8 @@ class TestUGNormalize(TestCase): (users, _groups) = self._norm(ug_cfg, distro) self.assertIn('joe', users) self.assertIn('bob', users) - self.assertEquals({'default': False}, users['joe']) - self.assertEquals({'default': False}, users['bob']) + self.assertEqual({'default': False}, users['joe']) + self.assertEqual({'default': False}, users['bob']) def test_users_old_user(self): distro = self._make_distro('ubuntu', bcfg) @@ -211,8 +210,8 @@ class TestUGNormalize(TestCase): self.assertIn('zetta', users) ug_cfg = {} (users, groups) = self._norm(ug_cfg, distro) - self.assertEquals({}, users) - self.assertEquals({}, groups) + self.assertEqual({}, users) + self.assertEqual({}, groups) def test_users_dict_default_additional(self): distro = self._make_distro('ubuntu', bcfg) @@ -223,12 +222,10 @@ class TestUGNormalize(TestCase): } (users, _groups) = self._norm(ug_cfg, distro) self.assertIn('bob', users) - self.assertEquals(",".join(distro.get_default_user()['groups']), - users['bob']['groups']) - self.assertEquals(True, - users['bob']['blah']) - self.assertEquals(True, - users['bob']['default']) + self.assertEqual(",".join(distro.get_default_user()['groups']), + users['bob']['groups']) + self.assertEqual(True, users['bob']['blah']) + self.assertEqual(True, users['bob']['default']) def test_users_dict_extract(self): distro = self._make_distro('ubuntu', bcfg) @@ -240,7 +237,7 @@ class TestUGNormalize(TestCase): (users, _groups) = self._norm(ug_cfg, distro) self.assertIn('bob', users) (name, config) = distros.extract_default(users) - self.assertEquals(name, 'bob') + self.assertEqual(name, 'bob') expected_config = {} def_config = None try: @@ -255,7 +252,7 @@ class TestUGNormalize(TestCase): expected_config.pop('name', None) expected_config.pop('groups', None) config.pop('groups', None) - self.assertEquals(config, expected_config) + self.assertEqual(config, expected_config) def test_users_dict_default(self): distro = self._make_distro('ubuntu', bcfg) @@ -266,10 +263,9 @@ class TestUGNormalize(TestCase): } (users, _groups) = self._norm(ug_cfg, distro) self.assertIn('bob', users) - self.assertEquals(",".join(distro.get_default_user()['groups']), - users['bob']['groups']) - self.assertEquals(True, - users['bob']['default']) + self.assertEqual(",".join(distro.get_default_user()['groups']), + users['bob']['groups']) + self.assertEqual(True, users['bob']['default']) def test_users_dict_trans(self): distro = self._make_distro('ubuntu') @@ -283,8 +279,8 @@ class TestUGNormalize(TestCase): (users, _groups) = self._norm(ug_cfg, distro) self.assertIn('joe', users) self.assertIn('bob', users) - self.assertEquals({'tr_me': True, 'default': False}, users['joe']) - self.assertEquals({'default': False}, users['bob']) + self.assertEqual({'tr_me': True, 'default': False}, users['joe']) + self.assertEqual({'default': False}, users['bob']) def test_users_dict(self): distro = self._make_distro('ubuntu') @@ -297,5 +293,5 @@ class TestUGNormalize(TestCase): (users, _groups) = self._norm(ug_cfg, distro) self.assertIn('joe', users) self.assertIn('bob', users) - self.assertEquals({'default': False}, users['joe']) - self.assertEquals({'default': False}, users['bob']) + self.assertEqual({'default': False}, users['joe']) + self.assertEqual({'default': False}, users['bob']) diff --git a/tests/unittests/test_ec2_util.py b/tests/unittests/test_ec2_util.py index 99fc54be..d6cf17fa 100644 --- a/tests/unittests/test_ec2_util.py +++ b/tests/unittests/test_ec2_util.py @@ -16,7 +16,7 @@ class TestEc2Util(helpers.HttprettyTestCase): body='stuff', status=200) userdata = eu.get_instance_userdata(self.VERSION) - self.assertEquals('stuff', userdata.decode('utf-8')) + self.assertEqual('stuff', userdata.decode('utf-8')) @hp.activate def test_userdata_fetch_fail_not_found(self): @@ -24,7 +24,7 @@ class TestEc2Util(helpers.HttprettyTestCase): 'http://169.254.169.254/%s/user-data' % (self.VERSION), status=404) userdata = eu.get_instance_userdata(self.VERSION, retries=0) - self.assertEquals('', userdata) + self.assertEqual('', userdata) @hp.activate def test_userdata_fetch_fail_server_dead(self): @@ -32,7 +32,7 @@ class TestEc2Util(helpers.HttprettyTestCase): 'http://169.254.169.254/%s/user-data' % (self.VERSION), status=500) userdata = eu.get_instance_userdata(self.VERSION, retries=0) - self.assertEquals('', userdata) + self.assertEqual('', userdata) @hp.activate def test_userdata_fetch_fail_server_not_found(self): @@ -40,7 +40,7 @@ class TestEc2Util(helpers.HttprettyTestCase): 'http://169.254.169.254/%s/user-data' % (self.VERSION), status=404) userdata = eu.get_instance_userdata(self.VERSION) - self.assertEquals('', userdata) + self.assertEqual('', userdata) @hp.activate def test_metadata_fetch_no_keys(self): @@ -56,9 +56,9 @@ class TestEc2Util(helpers.HttprettyTestCase): hp.register_uri(hp.GET, uh.combine_url(base_url, 'ami-launch-index'), status=200, body='1') md = eu.get_instance_metadata(self.VERSION, retries=0) - self.assertEquals(md['hostname'], 'ec2.fake.host.name.com') - self.assertEquals(md['instance-id'], '123') - self.assertEquals(md['ami-launch-index'], '1') + self.assertEqual(md['hostname'], 'ec2.fake.host.name.com') + self.assertEqual(md['instance-id'], '123') + self.assertEqual(md['ami-launch-index'], '1') @hp.activate def test_metadata_fetch_key(self): @@ -77,9 +77,9 @@ class TestEc2Util(helpers.HttprettyTestCase): uh.combine_url(base_url, 'public-keys/0/openssh-key'), status=200, body='ssh-rsa AAAA.....wZEf my-public-key') md = eu.get_instance_metadata(self.VERSION, retries=0, timeout=0.1) - self.assertEquals(md['hostname'], 'ec2.fake.host.name.com') - self.assertEquals(md['instance-id'], '123') - self.assertEquals(1, len(md['public-keys'])) + self.assertEqual(md['hostname'], 'ec2.fake.host.name.com') + self.assertEqual(md['instance-id'], '123') + self.assertEqual(1, len(md['public-keys'])) @hp.activate def test_metadata_fetch_with_2_keys(self): @@ -102,9 +102,9 @@ class TestEc2Util(helpers.HttprettyTestCase): uh.combine_url(base_url, 'public-keys/1/openssh-key'), status=200, body='ssh-rsa AAAA.....wZEf my-other-key') md = eu.get_instance_metadata(self.VERSION, retries=0, timeout=0.1) - self.assertEquals(md['hostname'], 'ec2.fake.host.name.com') - self.assertEquals(md['instance-id'], '123') - self.assertEquals(2, len(md['public-keys'])) + self.assertEqual(md['hostname'], 'ec2.fake.host.name.com') + self.assertEqual(md['instance-id'], '123') + self.assertEqual(2, len(md['public-keys'])) @hp.activate def test_metadata_fetch_bdm(self): @@ -131,9 +131,9 @@ class TestEc2Util(helpers.HttprettyTestCase): status=200, body="sdc") md = eu.get_instance_metadata(self.VERSION, retries=0, timeout=0.1) - self.assertEquals(md['hostname'], 'ec2.fake.host.name.com') - self.assertEquals(md['instance-id'], '123') + self.assertEqual(md['hostname'], 'ec2.fake.host.name.com') + self.assertEqual(md['instance-id'], '123') bdm = md['block-device-mapping'] - self.assertEquals(2, len(bdm)) - self.assertEquals(bdm['ami'], 'sdb') - self.assertEquals(bdm['ephemeral0'], 'sdc') + self.assertEqual(2, len(bdm)) + self.assertEqual(bdm['ami'], 'sdb') + self.assertEqual(bdm['ephemeral0'], 'sdc') diff --git a/tests/unittests/test_filters/test_launch_index.py b/tests/unittests/test_filters/test_launch_index.py index 95d24b9b..395713e6 100644 --- a/tests/unittests/test_filters/test_launch_index.py +++ b/tests/unittests/test_filters/test_launch_index.py @@ -25,7 +25,7 @@ class TestLaunchFilter(helpers.ResourceUsingTestCase): for (index, count) in expected_counts.items(): index = util.safe_int(index) filtered_message = launch_index.Filter(index).apply(message) - self.assertEquals(count_messages(filtered_message), count) + self.assertEqual(count_messages(filtered_message), count) # Ensure original message still ok/not modified self.assertTrue(self.equivalentMessage(message, orig_message)) diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_configure.py index 1ed185ca..d1dca2c4 100644 --- a/tests/unittests/test_handler/test_handler_apt_configure.py +++ b/tests/unittests/test_handler/test_handler_apt_configure.py @@ -1,6 +1,6 @@ +from cloudinit.config import cc_apt_configure from cloudinit import util -from cloudinit.config import cc_apt_configure from ..helpers import TestCase import os diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py new file mode 100644 index 00000000..acde0863 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_apt_configure_sources_list.py @@ -0,0 +1,180 @@ +""" test_handler_apt_configure_sources_list +Test templating of sources list +""" +import logging +import os +import shutil +import tempfile + +try: + from unittest import mock +except ImportError: + import mock + +from cloudinit import cloud +from cloudinit import distros +from cloudinit import helpers +from cloudinit import templater +from cloudinit import util + +from cloudinit.config import cc_apt_configure +from cloudinit.sources import DataSourceNone + +from cloudinit.distros.debian import Distro + +from .. import helpers as t_help + +LOG = logging.getLogger(__name__) + +YAML_TEXT_CUSTOM_SL = """ +apt_mirror: http://archive.ubuntu.com/ubuntu/ +apt_custom_sources_list: | + ## template:jinja + ## Note, this file is written by cloud-init on first boot of an instance + ## modifications made here will not survive a re-bundle. + ## if you wish to make changes you can: + ## a.) add 'apt_preserve_sources_list: true' to /etc/cloud/cloud.cfg + ## or do the same in user-data + ## b.) add sources in /etc/apt/sources.list.d + ## c.) make changes to template file /etc/cloud/templates/sources.list.tmpl + + # See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to + # newer versions of the distribution. + deb {{mirror}} {{codename}} main restricted + deb-src {{mirror}} {{codename}} main restricted + # FIND_SOMETHING_SPECIAL +""" + +EXPECTED_CONVERTED_CONTENT = ( + """## Note, this file is written by cloud-init on first boot of an instance +## modifications made here will not survive a re-bundle. +## if you wish to make changes you can: +## a.) add 'apt_preserve_sources_list: true' to /etc/cloud/cloud.cfg +## or do the same in user-data +## b.) add sources in /etc/apt/sources.list.d +## c.) make changes to template file /etc/cloud/templates/sources.list.tmpl + +# See http://help.ubuntu.com/community/UpgradeNotes for how to upgrade to +# newer versions of the distribution. +deb http://archive.ubuntu.com/ubuntu/ fakerelease main restricted +deb-src http://archive.ubuntu.com/ubuntu/ fakerelease main restricted +# FIND_SOMETHING_SPECIAL +""") + + +def load_tfile_or_url(*args, **kwargs): + """load_tfile_or_url + load file and return content after decoding + """ + return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents) + + +class TestAptSourceConfigSourceList(t_help.FilesystemMockingTestCase): + """TestAptSourceConfigSourceList + Main Class to test sources list rendering + """ + def setUp(self): + super(TestAptSourceConfigSourceList, self).setUp() + self.subp = util.subp + self.new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.new_root) + + def _get_cloud(self, distro, metadata=None): + self.patchUtils(self.new_root) + paths = helpers.Paths({}) + cls = distros.fetch(distro) + mydist = cls(distro, {}, paths) + myds = DataSourceNone.DataSourceNone({}, mydist, paths) + if metadata: + myds.metadata.update(metadata) + return cloud.Cloud(myds, paths, {}, mydist, None) + + def apt_source_list(self, distro, mirror, mirrorcheck=None): + """apt_source_list + Test rendering of a source.list from template for a given distro + """ + if mirrorcheck is None: + mirrorcheck = mirror + + if isinstance(mirror, list): + cfg = {'apt_mirror_search': mirror} + else: + cfg = {'apt_mirror': mirror} + mycloud = self._get_cloud(distro) + + with mock.patch.object(templater, 'render_to_file') as mocktmpl: + with mock.patch.object(os.path, 'isfile', + return_value=True) as mockisfile: + with mock.patch.object(util, 'rename'): + cc_apt_configure.handle("notimportant", cfg, mycloud, + LOG, None) + + mockisfile.assert_any_call( + ('/etc/cloud/templates/sources.list.%s.tmpl' % distro)) + mocktmpl.assert_called_once_with( + ('/etc/cloud/templates/sources.list.%s.tmpl' % distro), + '/etc/apt/sources.list', + {'codename': '', 'primary': mirrorcheck, 'mirror': mirrorcheck}) + + def test_apt_source_list_debian(self): + """Test rendering of a source.list from template for debian""" + self.apt_source_list('debian', 'http://httpredir.debian.org/debian') + + def test_apt_source_list_ubuntu(self): + """Test rendering of a source.list from template for ubuntu""" + self.apt_source_list('ubuntu', 'http://archive.ubuntu.com/ubuntu/') + + @staticmethod + def myresolve(name): + """Fake util.is_resolvable for mirrorfail tests""" + if name == "does.not.exist": + print("Faking FAIL for '%s'" % name) + return False + else: + print("Faking SUCCESS for '%s'" % name) + return True + + def test_apt_srcl_debian_mirrorfail(self): + """Test rendering of a source.list from template for debian""" + with mock.patch.object(util, 'is_resolvable', + side_effect=self.myresolve) as mockresolve: + self.apt_source_list('debian', + ['http://does.not.exist', + 'http://httpredir.debian.org/debian'], + 'http://httpredir.debian.org/debian') + mockresolve.assert_any_call("does.not.exist") + mockresolve.assert_any_call("httpredir.debian.org") + + def test_apt_srcl_ubuntu_mirrorfail(self): + """Test rendering of a source.list from template for ubuntu""" + with mock.patch.object(util, 'is_resolvable', + side_effect=self.myresolve) as mockresolve: + self.apt_source_list('ubuntu', + ['http://does.not.exist', + 'http://archive.ubuntu.com/ubuntu/'], + 'http://archive.ubuntu.com/ubuntu/') + mockresolve.assert_any_call("does.not.exist") + mockresolve.assert_any_call("archive.ubuntu.com") + + def test_apt_srcl_custom(self): + """Test rendering from a custom source.list template""" + cfg = util.load_yaml(YAML_TEXT_CUSTOM_SL) + mycloud = self._get_cloud('ubuntu') + + # the second mock restores the original subp + with mock.patch.object(util, 'write_file') as mockwrite: + with mock.patch.object(util, 'subp', self.subp): + with mock.patch.object(cc_apt_configure, 'get_release', + return_value='fakerelease'): + with mock.patch.object(Distro, 'get_primary_arch', + return_value='amd64'): + cc_apt_configure.handle("notimportant", cfg, mycloud, + LOG, None) + + mockwrite.assert_called_once_with( + '/etc/apt/sources.list', + EXPECTED_CONVERTED_CONTENT, + mode=420) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_apt_source.py b/tests/unittests/test_handler/test_handler_apt_source.py new file mode 100644 index 00000000..99a4d860 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_apt_source.py @@ -0,0 +1,516 @@ +""" test_handler_apt_source +Testing various config variations of the apt_source config +""" +import os +import re +import shutil +import tempfile + +try: + from unittest import mock +except ImportError: + import mock +from mock import call + +from cloudinit.config import cc_apt_configure +from cloudinit import gpg +from cloudinit import util + +from ..helpers import TestCase + +EXPECTEDKEY = """-----BEGIN PGP PUBLIC KEY BLOCK----- +Version: GnuPG v1 + +mI0ESuZLUgEEAKkqq3idtFP7g9hzOu1a8+v8ImawQN4TrvlygfScMU1TIS1eC7UQ +NUA8Qqgr9iUaGnejb0VciqftLrU9D6WYHSKz+EITefgdyJ6SoQxjoJdsCpJ7o9Jy +8PQnpRttiFm4qHu6BVnKnBNxw/z3ST9YMqW5kbMQpfxbGe+obRox59NpABEBAAG0 +HUxhdW5jaHBhZCBQUEEgZm9yIFNjb3R0IE1vc2VyiLYEEwECACAFAkrmS1ICGwMG +CwkIBwMCBBUCCAMEFgIDAQIeAQIXgAAKCRAGILvPA2g/d3aEA/9tVjc10HOZwV29 +OatVuTeERjjrIbxflO586GLA8cp0C9RQCwgod/R+cKYdQcHjbqVcP0HqxveLg0RZ +FJpWLmWKamwkABErwQLGlM/Hwhjfade8VvEQutH5/0JgKHmzRsoqfR+LMO6OS+Sm +S0ORP6HXET3+jC8BMG4tBWCTK/XEZw== +=ACB2 +-----END PGP PUBLIC KEY BLOCK-----""" + + +def load_tfile_or_url(*args, **kwargs): + """load_tfile_or_url + load file and return content after decoding + """ + return util.decode_binary(util.read_file_or_url(*args, **kwargs).contents) + + +class TestAptSourceConfig(TestCase): + """TestAptSourceConfig + Main Class to test apt_source configs + """ + release = "fantastic" + + def setUp(self): + super(TestAptSourceConfig, self).setUp() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) + self.aptlistfile = os.path.join(self.tmp, "single-deb.list") + self.aptlistfile2 = os.path.join(self.tmp, "single-deb2.list") + self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list") + self.join = os.path.join + # mock fallback filename into writable tmp dir + self.fallbackfn = os.path.join(self.tmp, "etc/apt/sources.list.d/", + "cloud_config_sources.list") + + patcher = mock.patch("cloudinit.config.cc_apt_configure.get_release") + get_rel = patcher.start() + get_rel.return_value = self.release + self.addCleanup(patcher.stop) + + @staticmethod + def _get_default_params(): + """get_default_params + Get the most basic default mrror and release info to be used in tests + """ + params = {} + params['RELEASE'] = cc_apt_configure.get_release() + params['MIRROR'] = "http://archive.ubuntu.com/ubuntu" + return params + + def myjoin(self, *args, **kwargs): + """myjoin - redir into writable tmpdir""" + if (args[0] == "/etc/apt/sources.list.d/" and + args[1] == "cloud_config_sources.list" and + len(args) == 2): + return self.join(self.tmp, args[0].lstrip("/"), args[1]) + else: + return self.join(*args, **kwargs) + + def apt_src_basic(self, filename, cfg): + """apt_src_basic + Test Fix deb source string, has to overwrite mirror conf in params + """ + params = self._get_default_params() + + cc_apt_configure.add_apt_sources(cfg, params) + + self.assertTrue(os.path.isfile(filename)) + + contents = load_tfile_or_url(filename) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", "http://archive.ubuntu.com/ubuntu", + "karmic-backports", + "main universe multiverse restricted"), + contents, flags=re.IGNORECASE)) + + def test_apt_src_basic(self): + """Test deb source string, overwrite mirror and filename""" + cfg = {'source': ('deb http://archive.ubuntu.com/ubuntu' + ' karmic-backports' + ' main universe multiverse restricted'), + 'filename': self.aptlistfile} + self.apt_src_basic(self.aptlistfile, [cfg]) + + def test_apt_src_basic_dict(self): + """Test deb source string, overwrite mirror and filename (dict)""" + cfg = {self.aptlistfile: {'source': + ('deb http://archive.ubuntu.com/ubuntu' + ' karmic-backports' + ' main universe multiverse restricted')}} + self.apt_src_basic(self.aptlistfile, cfg) + + def apt_src_basic_tri(self, cfg): + """apt_src_basic_tri + Test Fix three deb source string, has to overwrite mirror conf in + params. Test with filenames provided in config. + generic part to check three files with different content + """ + self.apt_src_basic(self.aptlistfile, cfg) + + # extra verify on two extra files of this test + contents = load_tfile_or_url(self.aptlistfile2) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", "http://archive.ubuntu.com/ubuntu", + "precise-backports", + "main universe multiverse restricted"), + contents, flags=re.IGNORECASE)) + contents = load_tfile_or_url(self.aptlistfile3) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", "http://archive.ubuntu.com/ubuntu", + "lucid-backports", + "main universe multiverse restricted"), + contents, flags=re.IGNORECASE)) + + def test_apt_src_basic_tri(self): + """Test Fix three deb source string with filenames""" + cfg1 = {'source': ('deb http://archive.ubuntu.com/ubuntu' + ' karmic-backports' + ' main universe multiverse restricted'), + 'filename': self.aptlistfile} + cfg2 = {'source': ('deb http://archive.ubuntu.com/ubuntu' + ' precise-backports' + ' main universe multiverse restricted'), + 'filename': self.aptlistfile2} + cfg3 = {'source': ('deb http://archive.ubuntu.com/ubuntu' + ' lucid-backports' + ' main universe multiverse restricted'), + 'filename': self.aptlistfile3} + self.apt_src_basic_tri([cfg1, cfg2, cfg3]) + + def test_apt_src_basic_dict_tri(self): + """Test Fix three deb source string with filenames (dict)""" + cfg = {self.aptlistfile: {'source': + ('deb http://archive.ubuntu.com/ubuntu' + ' karmic-backports' + ' main universe multiverse restricted')}, + self.aptlistfile2: {'source': + ('deb http://archive.ubuntu.com/ubuntu' + ' precise-backports' + ' main universe multiverse restricted')}, + self.aptlistfile3: {'source': + ('deb http://archive.ubuntu.com/ubuntu' + ' lucid-backports' + ' main universe multiverse restricted')}} + self.apt_src_basic_tri(cfg) + + def test_apt_src_basic_nofn(self): + """Test Fix three deb source string without filenames (dict)""" + cfg = {'source': ('deb http://archive.ubuntu.com/ubuntu' + ' karmic-backports' + ' main universe multiverse restricted')} + with mock.patch.object(os.path, 'join', side_effect=self.myjoin): + self.apt_src_basic(self.fallbackfn, [cfg]) + + def apt_src_replacement(self, filename, cfg): + """apt_src_replace + Test Autoreplacement of MIRROR and RELEASE in source specs + """ + params = self._get_default_params() + cc_apt_configure.add_apt_sources(cfg, params) + + self.assertTrue(os.path.isfile(filename)) + + contents = load_tfile_or_url(filename) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", params['MIRROR'], params['RELEASE'], + "multiverse"), + contents, flags=re.IGNORECASE)) + + def test_apt_src_replace(self): + """Test Autoreplacement of MIRROR and RELEASE in source specs""" + cfg = {'source': 'deb $MIRROR $RELEASE multiverse', + 'filename': self.aptlistfile} + self.apt_src_replacement(self.aptlistfile, [cfg]) + + def apt_src_replace_tri(self, cfg): + """apt_src_replace_tri + Test three autoreplacements of MIRROR and RELEASE in source specs with + generic part + """ + self.apt_src_replacement(self.aptlistfile, cfg) + + # extra verify on two extra files of this test + params = self._get_default_params() + contents = load_tfile_or_url(self.aptlistfile2) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", params['MIRROR'], params['RELEASE'], + "main"), + contents, flags=re.IGNORECASE)) + contents = load_tfile_or_url(self.aptlistfile3) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", params['MIRROR'], params['RELEASE'], + "universe"), + contents, flags=re.IGNORECASE)) + + def test_apt_src_replace_tri(self): + """Test triple Autoreplacement of MIRROR and RELEASE in source specs""" + cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse', + 'filename': self.aptlistfile} + cfg2 = {'source': 'deb $MIRROR $RELEASE main', + 'filename': self.aptlistfile2} + cfg3 = {'source': 'deb $MIRROR $RELEASE universe', + 'filename': self.aptlistfile3} + self.apt_src_replace_tri([cfg1, cfg2, cfg3]) + + def test_apt_src_replace_dict_tri(self): + """Test triple Autoreplacement in source specs (dict)""" + cfg = {self.aptlistfile: {'source': 'deb $MIRROR $RELEASE multiverse'}, + 'notused': {'source': 'deb $MIRROR $RELEASE main', + 'filename': self.aptlistfile2}, + self.aptlistfile3: {'source': 'deb $MIRROR $RELEASE universe'}} + self.apt_src_replace_tri(cfg) + + def test_apt_src_replace_nofn(self): + """Test Autoreplacement of MIRROR and RELEASE in source specs nofile""" + cfg = {'source': 'deb $MIRROR $RELEASE multiverse'} + with mock.patch.object(os.path, 'join', side_effect=self.myjoin): + self.apt_src_replacement(self.fallbackfn, [cfg]) + + def apt_src_keyid(self, filename, cfg, keynum): + """apt_src_keyid + Test specification of a source + keyid + """ + params = self._get_default_params() + + with mock.patch.object(util, 'subp', + return_value=('fakekey 1234', '')) as mockobj: + cc_apt_configure.add_apt_sources(cfg, params) + + # check if it added the right ammount of keys + calls = [] + for _ in range(keynum): + calls.append(call(('apt-key', 'add', '-'), 'fakekey 1234')) + mockobj.assert_has_calls(calls, any_order=True) + + self.assertTrue(os.path.isfile(filename)) + + contents = load_tfile_or_url(filename) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "main"), + contents, flags=re.IGNORECASE)) + + def test_apt_src_keyid(self): + """Test specification of a source + keyid with filename being set""" + cfg = {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'keyid': "03683F77", + 'filename': self.aptlistfile} + self.apt_src_keyid(self.aptlistfile, [cfg], 1) + + def test_apt_src_keyid_tri(self): + """Test 3x specification of a source + keyid with filename being set""" + cfg1 = {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'keyid': "03683F77", + 'filename': self.aptlistfile} + cfg2 = {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial universe'), + 'keyid': "03683F77", + 'filename': self.aptlistfile2} + cfg3 = {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial multiverse'), + 'keyid': "03683F77", + 'filename': self.aptlistfile3} + + self.apt_src_keyid(self.aptlistfile, [cfg1, cfg2, cfg3], 3) + contents = load_tfile_or_url(self.aptlistfile2) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "universe"), + contents, flags=re.IGNORECASE)) + contents = load_tfile_or_url(self.aptlistfile3) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "multiverse"), + contents, flags=re.IGNORECASE)) + + def test_apt_src_keyid_nofn(self): + """Test specification of a source + keyid without filename being set""" + cfg = {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'keyid': "03683F77"} + with mock.patch.object(os.path, 'join', side_effect=self.myjoin): + self.apt_src_keyid(self.fallbackfn, [cfg], 1) + + def apt_src_key(self, filename, cfg): + """apt_src_key + Test specification of a source + key + """ + params = self._get_default_params() + + with mock.patch.object(util, 'subp') as mockobj: + cc_apt_configure.add_apt_sources([cfg], params) + + mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 4321') + + self.assertTrue(os.path.isfile(filename)) + + contents = load_tfile_or_url(filename) + self.assertTrue(re.search(r"%s %s %s %s\n" % + ("deb", + ('http://ppa.launchpad.net/smoser/' + 'cloud-init-test/ubuntu'), + "xenial", "main"), + contents, flags=re.IGNORECASE)) + + def test_apt_src_key(self): + """Test specification of a source + key with filename being set""" + cfg = {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'key': "fakekey 4321", + 'filename': self.aptlistfile} + self.apt_src_key(self.aptlistfile, cfg) + + def test_apt_src_key_nofn(self): + """Test specification of a source + key without filename being set""" + cfg = {'source': ('deb ' + 'http://ppa.launchpad.net/' + 'smoser/cloud-init-test/ubuntu' + ' xenial main'), + 'key': "fakekey 4321"} + with mock.patch.object(os.path, 'join', side_effect=self.myjoin): + self.apt_src_key(self.fallbackfn, cfg) + + def test_apt_src_keyonly(self): + """Test specifying key without source""" + params = self._get_default_params() + cfg = {'key': "fakekey 4242", + 'filename': self.aptlistfile} + + with mock.patch.object(util, 'subp') as mockobj: + cc_apt_configure.add_apt_sources([cfg], params) + + mockobj.assert_called_once_with(('apt-key', 'add', '-'), + 'fakekey 4242') + + # filename should be ignored on key only + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def test_apt_src_keyidonly(self): + """Test specification of a keyid without source""" + params = self._get_default_params() + cfg = {'keyid': "03683F77", + 'filename': self.aptlistfile} + + with mock.patch.object(util, 'subp', + return_value=('fakekey 1212', '')) as mockobj: + cc_apt_configure.add_apt_sources([cfg], params) + + mockobj.assert_called_with(('apt-key', 'add', '-'), 'fakekey 1212') + + # filename should be ignored on key only + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def apt_src_keyid_real(self, cfg, expectedkey): + """apt_src_keyid_real + Test specification of a keyid without source including + up to addition of the key (add_apt_key_raw mocked to keep the + environment as is) + """ + params = self._get_default_params() + + with mock.patch.object(cc_apt_configure, 'add_apt_key_raw') as mockkey: + with mock.patch.object(gpg, 'get_key_by_id', + return_value=expectedkey) as mockgetkey: + cc_apt_configure.add_apt_sources([cfg], params) + + mockgetkey.assert_called_with(cfg['keyid'], + cfg.get('keyserver', + 'keyserver.ubuntu.com')) + mockkey.assert_called_with(expectedkey) + + # filename should be ignored on key only + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def test_apt_src_keyid_real(self): + """test_apt_src_keyid_real - Test keyid including key add""" + keyid = "03683F77" + cfg = {'keyid': keyid, + 'filename': self.aptlistfile} + + self.apt_src_keyid_real(cfg, EXPECTEDKEY) + + def test_apt_src_longkeyid_real(self): + """test_apt_src_longkeyid_real - Test long keyid including key add""" + keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77" + cfg = {'keyid': keyid, + 'filename': self.aptlistfile} + + self.apt_src_keyid_real(cfg, EXPECTEDKEY) + + def test_apt_src_longkeyid_ks_real(self): + """test_apt_src_longkeyid_ks_real - Test long keyid from other ks""" + keyid = "B59D 5F15 97A5 04B7 E230 6DCA 0620 BBCF 0368 3F77" + cfg = {'keyid': keyid, + 'keyserver': 'keys.gnupg.net', + 'filename': self.aptlistfile} + + self.apt_src_keyid_real(cfg, EXPECTEDKEY) + + def test_apt_src_ppa(self): + """Test adding a ppa""" + params = self._get_default_params() + cfg = {'source': 'ppa:smoser/cloud-init-test', + 'filename': self.aptlistfile} + + # default matcher needed for ppa + matcher = re.compile(r'^[\w-]+:\w').search + + with mock.patch.object(util, 'subp') as mockobj: + cc_apt_configure.add_apt_sources([cfg], params, + aa_repo_match=matcher) + mockobj.assert_called_once_with(['add-apt-repository', + 'ppa:smoser/cloud-init-test']) + + # adding ppa should ignore filename (uses add-apt-repository) + self.assertFalse(os.path.isfile(self.aptlistfile)) + + def test_apt_src_ppa_tri(self): + """Test adding three ppa's""" + params = self._get_default_params() + cfg1 = {'source': 'ppa:smoser/cloud-init-test', + 'filename': self.aptlistfile} + cfg2 = {'source': 'ppa:smoser/cloud-init-test2', + 'filename': self.aptlistfile2} + cfg3 = {'source': 'ppa:smoser/cloud-init-test3', + 'filename': self.aptlistfile3} + + # default matcher needed for ppa + matcher = re.compile(r'^[\w-]+:\w').search + + with mock.patch.object(util, 'subp') as mockobj: + cc_apt_configure.add_apt_sources([cfg1, cfg2, cfg3], params, + aa_repo_match=matcher) + calls = [call(['add-apt-repository', 'ppa:smoser/cloud-init-test']), + call(['add-apt-repository', 'ppa:smoser/cloud-init-test2']), + call(['add-apt-repository', 'ppa:smoser/cloud-init-test3'])] + mockobj.assert_has_calls(calls, any_order=True) + + # adding ppa should ignore all filenames (uses add-apt-repository) + self.assertFalse(os.path.isfile(self.aptlistfile)) + self.assertFalse(os.path.isfile(self.aptlistfile2)) + self.assertFalse(os.path.isfile(self.aptlistfile3)) + + def test_convert_to_new_format(self): + """Test the conversion of old to new format""" + cfg1 = {'source': 'deb $MIRROR $RELEASE multiverse', + 'filename': self.aptlistfile} + cfg2 = {'source': 'deb $MIRROR $RELEASE main', + 'filename': self.aptlistfile2} + cfg3 = {'source': 'deb $MIRROR $RELEASE universe', + 'filename': self.aptlistfile3} + checkcfg = {self.aptlistfile: {'filename': self.aptlistfile, + 'source': 'deb $MIRROR $RELEASE ' + 'multiverse'}, + self.aptlistfile2: {'filename': self.aptlistfile2, + 'source': 'deb $MIRROR $RELEASE main'}, + self.aptlistfile3: {'filename': self.aptlistfile3, + 'source': 'deb $MIRROR $RELEASE ' + 'universe'}} + + newcfg = cc_apt_configure.convert_to_new_format([cfg1, cfg2, cfg3]) + self.assertEqual(newcfg, checkcfg) + + newcfg2 = cc_apt_configure.convert_to_new_format(newcfg) + self.assertEqual(newcfg2, checkcfg) + + with self.assertRaises(ValueError): + cc_apt_configure.convert_to_new_format(5) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index a6b9c0fd..5e771731 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -1,8 +1,8 @@ from cloudinit import cloud +from cloudinit.config import cc_ca_certs from cloudinit import helpers from cloudinit import util -from cloudinit.config import cc_ca_certs from ..helpers import TestCase import logging @@ -176,8 +176,7 @@ class TestAddCaCerts(TestCase): mock_write.assert_has_calls([ mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", cert, mode=0o644), - mock.call("/etc/ca-certificates.conf", expected, omode="wb"), - ]) + mock.call("/etc/ca-certificates.conf", expected, omode="wb")]) mock_load.assert_called_once_with("/etc/ca-certificates.conf") def test_single_cert_no_trailing_cr(self): @@ -202,8 +201,7 @@ class TestAddCaCerts(TestCase): mock.call("/etc/ca-certificates.conf", "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt"), - omode="wb"), - ]) + omode="wb")]) mock_load.assert_called_once_with("/etc/ca-certificates.conf") @@ -228,8 +226,7 @@ class TestAddCaCerts(TestCase): mock.call("/etc/ca-certificates.conf", "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt"), - omode='wb'), - ]) + omode='wb')]) mock_load.assert_called_once_with("/etc/ca-certificates.conf") @@ -264,8 +261,7 @@ class TestRemoveDefaultCaCerts(TestCase): mock_delete.assert_has_calls([ mock.call("/usr/share/ca-certificates/"), - mock.call("/etc/ssl/certs/"), - ]) + mock.call("/etc/ssl/certs/")]) mock_write.assert_called_once_with( "/etc/ca-certificates.conf", "", mode=0o644) diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py index 7763f23b..7a1bc317 100644 --- a/tests/unittests/test_handler/test_handler_chef.py +++ b/tests/unittests/test_handler/test_handler_chef.py @@ -1,21 +1,19 @@ import json +import logging import os - -from cloudinit.config import cc_chef +import shutil +import six +import tempfile from cloudinit import cloud +from cloudinit.config import cc_chef from cloudinit import distros from cloudinit import helpers -from cloudinit import util from cloudinit.sources import DataSourceNone +from cloudinit import util from .. import helpers as t_help -import six -import logging -import shutil -import tempfile - LOG = logging.getLogger(__name__) CLIENT_TEMPL = os.path.sep.join(["templates", "chef_client.rb.tmpl"]) diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py index bef0d80d..e653488a 100644 --- a/tests/unittests/test_handler/test_handler_growpart.py +++ b/tests/unittests/test_handler/test_handler_growpart.py @@ -1,7 +1,7 @@ from cloudinit import cloud +from cloudinit.config import cc_growpart from cloudinit import util -from cloudinit.config import cc_growpart from ..helpers import TestCase import errno diff --git a/tests/unittests/test_handler/test_handler_locale.py b/tests/unittests/test_handler/test_handler_locale.py index de85eff6..c91908f4 100644 --- a/tests/unittests/test_handler/test_handler_locale.py +++ b/tests/unittests/test_handler/test_handler_locale.py @@ -64,4 +64,4 @@ class TestLocale(t_help.FilesystemMockingTestCase): contents = util.load_file('/etc/sysconfig/language', decode=False) n_cfg = ConfigObj(BytesIO(contents)) - self.assertEquals({'RC_LANG': cfg['locale']}, dict(n_cfg)) + self.assertEqual({'RC_LANG': cfg['locale']}, dict(n_cfg)) diff --git a/tests/unittests/test_handler/test_handler_lxd.py b/tests/unittests/test_handler/test_handler_lxd.py index 5f61ba6a..6f90defb 100644 --- a/tests/unittests/test_handler/test_handler_lxd.py +++ b/tests/unittests/test_handler/test_handler_lxd.py @@ -1,6 +1,6 @@ from cloudinit.config import cc_lxd -from cloudinit import (distros, helpers, cloud) from cloudinit.sources import DataSourceNoCloud +from cloudinit import (distros, helpers, cloud) from .. import helpers as t_help import logging @@ -42,11 +42,11 @@ class TestLxd(t_help.TestCase): cc_lxd.handle('cc_lxd', self.lxd_cfg, cc, LOG, []) self.assertTrue(mock_util.which.called) init_call = mock_util.subp.call_args_list[0][0][0] - self.assertEquals(init_call, - ['lxd', 'init', '--auto', - '--network-address=0.0.0.0', - '--storage-backend=zfs', - '--storage-pool=poolname']) + self.assertEqual(init_call, + ['lxd', 'init', '--auto', + '--network-address=0.0.0.0', + '--storage-backend=zfs', + '--storage-pool=poolname']) @mock.patch("cloudinit.config.cc_lxd.util") def test_lxd_install(self, mock_util): @@ -56,7 +56,7 @@ class TestLxd(t_help.TestCase): cc_lxd.handle('cc_lxd', self.lxd_cfg, cc, LOG, []) self.assertTrue(cc.distro.install_packages.called) install_pkg = cc.distro.install_packages.call_args_list[0][0][0] - self.assertEquals(sorted(install_pkg), ['lxd', 'zfs']) + self.assertEqual(sorted(install_pkg), ['lxd', 'zfs']) @mock.patch("cloudinit.config.cc_lxd.util") def test_no_init_does_nothing(self, mock_util): @@ -87,7 +87,7 @@ class TestLxd(t_help.TestCase): "ipv6_netmask": "64", "ipv6_nat": "true", "domain": "lxd"} - self.assertEquals( + self.assertEqual( cc_lxd.bridge_to_debconf(data), {"lxd/setup-bridge": "true", "lxd/bridge-name": "testbr0", @@ -109,7 +109,7 @@ class TestLxd(t_help.TestCase): "ipv6_address": "fd98:9e0:3744::1", "ipv6_netmask": "64", "ipv6_nat": "true"} - self.assertEquals( + self.assertEqual( cc_lxd.bridge_to_debconf(data), {"lxd/setup-bridge": "true", "lxd/bridge-ipv6": "true", @@ -120,7 +120,7 @@ class TestLxd(t_help.TestCase): def test_lxd_debconf_existing(self): data = {"mode": "existing", "name": "testbr0"} - self.assertEquals( + self.assertEqual( cc_lxd.bridge_to_debconf(data), {"lxd/setup-bridge": "false", "lxd/use-existing-bridge": "true", @@ -128,7 +128,7 @@ class TestLxd(t_help.TestCase): def test_lxd_debconf_none(self): data = {"mode": "none"} - self.assertEquals( + self.assertEqual( cc_lxd.bridge_to_debconf(data), {"lxd/setup-bridge": "false", "lxd/bridge-name": ""}) diff --git a/tests/unittests/test_handler/test_handler_power_state.py b/tests/unittests/test_handler/test_handler_power_state.py index 04ce5687..feff319d 100644 --- a/tests/unittests/test_handler/test_handler_power_state.py +++ b/tests/unittests/test_handler/test_handler_power_state.py @@ -119,7 +119,7 @@ def check_lps_ret(psc_return, mode=None): try: float(timeout) - except: + except Exception: errs.append("timeout failed convert to float") if len(errs): diff --git a/tests/unittests/test_handler/test_handler_rsyslog.py b/tests/unittests/test_handler/test_handler_rsyslog.py index b932165c..38636063 100644 --- a/tests/unittests/test_handler/test_handler_rsyslog.py +++ b/tests/unittests/test_handler/test_handler_rsyslog.py @@ -31,7 +31,7 @@ class TestLoadConfig(t_help.TestCase): 'config_dir': "mydir", 'config_filename': 'myfilename', 'service_reload_command': 'auto'} - ) + ) self.assertEqual(found, self.basecfg) diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py index 98bc9b81..a0390da9 100644 --- a/tests/unittests/test_handler/test_handler_seed_random.py +++ b/tests/unittests/test_handler/test_handler_seed_random.py @@ -92,7 +92,7 @@ class TestRandomSeed(t_help.TestCase): } cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) - self.assertEquals("tiny-tim-was-here", contents) + self.assertEqual("tiny-tim-was-here", contents) def test_append_random_unknown_encoding(self): data = self._compress(b"tiny-toe") @@ -117,7 +117,7 @@ class TestRandomSeed(t_help.TestCase): } cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) - self.assertEquals("tiny-toe", contents) + self.assertEqual("tiny-toe", contents) def test_append_random_gz(self): data = self._compress(b"big-toe") @@ -130,7 +130,7 @@ class TestRandomSeed(t_help.TestCase): } cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) - self.assertEquals("big-toe", contents) + self.assertEqual("big-toe", contents) def test_append_random_base64(self): data = util.b64e('bubbles') @@ -143,7 +143,7 @@ class TestRandomSeed(t_help.TestCase): } cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) - self.assertEquals("bubbles", contents) + self.assertEqual("bubbles", contents) def test_append_random_b64(self): data = util.b64e('kit-kat') @@ -156,7 +156,7 @@ class TestRandomSeed(t_help.TestCase): } cc_seed_random.handle('test', cfg, self._get_cloud('ubuntu'), LOG, []) contents = util.load_file(self._seed_file) - self.assertEquals("kit-kat", contents) + self.assertEqual("kit-kat", contents) def test_append_random_metadata(self): cfg = { @@ -168,7 +168,7 @@ class TestRandomSeed(t_help.TestCase): c = self._get_cloud('ubuntu', {'random_seed': '-so-was-josh'}) cc_seed_random.handle('test', cfg, c, LOG, []) contents = util.load_file(self._seed_file) - self.assertEquals('tiny-tim-was-here-so-was-josh', contents) + self.assertEqual('tiny-tim-was-here-so-was-josh', contents) def test_seed_command_provided_and_available(self): c = self._get_cloud('ubuntu', {}) diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py index d358b069..7effa124 100644 --- a/tests/unittests/test_handler/test_handler_set_hostname.py +++ b/tests/unittests/test_handler/test_handler_set_hostname.py @@ -7,13 +7,11 @@ from cloudinit import util from .. import helpers as t_help -import shutil -import tempfile +from configobj import ConfigObj import logging - +import shutil from six import BytesIO - -from configobj import ConfigObj +import tempfile LOG = logging.getLogger(__name__) @@ -43,8 +41,8 @@ class TestHostname(t_help.FilesystemMockingTestCase): if not distro.uses_systemd(): contents = util.load_file("/etc/sysconfig/network", decode=False) n_cfg = ConfigObj(BytesIO(contents)) - self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'}, - dict(n_cfg)) + self.assertEqual({'HOSTNAME': 'blah.blah.blah.yahoo.com'}, + dict(n_cfg)) def test_write_hostname_debian(self): cfg = { @@ -58,7 +56,7 @@ class TestHostname(t_help.FilesystemMockingTestCase): cc_set_hostname.handle('cc_set_hostname', cfg, cc, LOG, []) contents = util.load_file("/etc/hostname") - self.assertEquals('blah', contents.strip()) + self.assertEqual('blah', contents.strip()) def test_write_hostname_sles(self): cfg = { @@ -71,4 +69,4 @@ class TestHostname(t_help.FilesystemMockingTestCase): self.patchUtils(self.tmp) cc_set_hostname.handle('cc_set_hostname', cfg, cc, LOG, []) contents = util.load_file("/etc/HOSTNAME") - self.assertEquals('blah', contents.strip()) + self.assertEqual('blah', contents.strip()) diff --git a/tests/unittests/test_handler/test_handler_snappy.py b/tests/unittests/test_handler/test_handler_snappy.py index 8aeff53c..57dce1bc 100644 --- a/tests/unittests/test_handler/test_handler_snappy.py +++ b/tests/unittests/test_handler/test_handler_snappy.py @@ -1,6 +1,7 @@ from cloudinit.config.cc_snappy import ( makeop, get_package_ops, render_snap_op) from cloudinit import util + from .. import helpers as t_help import os diff --git a/tests/unittests/test_handler/test_handler_timezone.py b/tests/unittests/test_handler/test_handler_timezone.py index e3df8759..b7e6b03d 100644 --- a/tests/unittests/test_handler/test_handler_timezone.py +++ b/tests/unittests/test_handler/test_handler_timezone.py @@ -28,12 +28,10 @@ from cloudinit.sources import DataSourceNoCloud from .. import helpers as t_help from configobj import ConfigObj - -from six import BytesIO - +import logging import shutil +from six import BytesIO import tempfile -import logging LOG = logging.getLogger(__name__) @@ -72,7 +70,7 @@ class TestTimezone(t_help.FilesystemMockingTestCase): contents = util.load_file('/etc/sysconfig/clock', decode=False) n_cfg = ConfigObj(BytesIO(contents)) - self.assertEquals({'TIMEZONE': cfg['timezone']}, dict(n_cfg)) + self.assertEqual({'TIMEZONE': cfg['timezone']}, dict(n_cfg)) contents = util.load_file('/etc/localtime') - self.assertEquals(dummy_contents, contents.strip()) + self.assertEqual(dummy_contents, contents.strip()) diff --git a/tests/unittests/test_handler/test_handler_write_files.py b/tests/unittests/test_handler/test_handler_write_files.py index f1c7f7b4..466e45f8 100644 --- a/tests/unittests/test_handler/test_handler_write_files.py +++ b/tests/unittests/test_handler/test_handler_write_files.py @@ -1,6 +1,6 @@ -from cloudinit import util -from cloudinit import log as logging from cloudinit.config.cc_write_files import write_files +from cloudinit import log as logging +from cloudinit import util from ..helpers import FilesystemMockingTestCase diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py index 3a8aa7c1..28b060f8 100644 --- a/tests/unittests/test_handler/test_handler_yum_add_repo.py +++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py @@ -1,16 +1,13 @@ -from cloudinit import util - from cloudinit.config import cc_yum_add_repo +from cloudinit import util from .. import helpers -import shutil -import tempfile +import configobj import logging - +import shutil from six import BytesIO - -import configobj +import tempfile LOG = logging.getLogger(__name__) @@ -68,4 +65,4 @@ class TestConfig(helpers.FilesystemMockingTestCase): 'gpgcheck': '1', } } - self.assertEquals(expected, dict(contents)) + self.assertEqual(expected, dict(contents)) diff --git a/tests/unittests/test_helpers.py b/tests/unittests/test_helpers.py new file mode 100644 index 00000000..943a5723 --- /dev/null +++ b/tests/unittests/test_helpers.py @@ -0,0 +1,33 @@ +"""Tests of the built-in user data handlers.""" + +import os + +from . import helpers as test_helpers + +from cloudinit import sources + + +class MyDataSource(sources.DataSource): + _instance_id = None + + def get_instance_id(self): + return self._instance_id + + +class TestPaths(test_helpers.ResourceUsingTestCase): + def test_get_ipath_and_instance_id_with_slashes(self): + myds = MyDataSource(sys_cfg={}, distro=None, paths={}) + myds._instance_id = "/foo/bar" + safe_iid = "_foo_bar" + mypaths = self.getCloudPaths(myds) + + self.assertEqual( + os.path.join(mypaths.cloud_dir, 'instances', safe_iid), + mypaths.get_ipath()) + + def test_get_ipath_and_empty_instance_id_returns_none(self): + myds = MyDataSource(sys_cfg={}, distro=None, paths={}) + myds._instance_id = None + mypaths = self.getCloudPaths(myds) + + self.assertEqual(None, mypaths.get_ipath()) diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py index 976d8283..a33ec184 100644 --- a/tests/unittests/test_merging.py +++ b/tests/unittests/test_merging.py @@ -125,15 +125,15 @@ class TestSimpleRun(helpers.ResourceUsingTestCase): def test_seed_runs(self): test_dicts = [] - for i in range(1, 50): + for i in range(1, 10): base_dicts = [] - for j in range(1, 50): + for j in range(1, 10): base_dicts.append(make_dict(5, i * j)) test_dicts.append(base_dicts) for test in test_dicts: c = _old_mergemanydict(*test) d = util.mergemanydict(test) - self.assertEquals(c, d) + self.assertEqual(c, d) def test_merge_cc_samples(self): tests = self._load_merge_files() @@ -155,7 +155,7 @@ class TestSimpleRun(helpers.ResourceUsingTestCase): fail_msg = fail_msg % (expected_fn, ",".join(merging_fns), merged_buf, expected_merge) - self.assertEquals(expected_merge, merged_buf, msg=fail_msg) + self.assertEqual(expected_merge, merged_buf, msg=fail_msg) def test_compat_merges_dict(self): a = { @@ -167,7 +167,7 @@ class TestSimpleRun(helpers.ResourceUsingTestCase): } c = _old_mergedict(a, b) d = util.mergemanydict([a, b]) - self.assertEquals(c, d) + self.assertEqual(c, d) def test_compat_merges_dict2(self): a = { @@ -182,7 +182,7 @@ class TestSimpleRun(helpers.ResourceUsingTestCase): } c = _old_mergedict(a, b) d = util.mergemanydict([a, b]) - self.assertEquals(c, d) + self.assertEqual(c, d) def test_compat_merges_list(self): a = {'b': [1, 2, 3]} @@ -190,7 +190,7 @@ class TestSimpleRun(helpers.ResourceUsingTestCase): c = {'b': [6, 7]} e = _old_mergemanydict(a, b, c) f = util.mergemanydict([a, b, c]) - self.assertEquals(e, f) + self.assertEqual(e, f) def test_compat_merges_str(self): a = {'b': "hi"} @@ -198,7 +198,7 @@ class TestSimpleRun(helpers.ResourceUsingTestCase): c = {'b': "hallo"} e = _old_mergemanydict(a, b, c) f = util.mergemanydict([a, b, c]) - self.assertEquals(e, f) + self.assertEqual(e, f) def test_compat_merge_sub_dict(self): a = { @@ -222,7 +222,7 @@ class TestSimpleRun(helpers.ResourceUsingTestCase): } c = _old_mergedict(a, b) d = util.mergemanydict([a, b]) - self.assertEquals(c, d) + self.assertEqual(c, d) def test_compat_merge_sub_dict2(self): a = { @@ -238,7 +238,7 @@ class TestSimpleRun(helpers.ResourceUsingTestCase): } c = _old_mergedict(a, b) d = util.mergemanydict([a, b]) - self.assertEquals(c, d) + self.assertEqual(c, d) def test_compat_merge_sub_list(self): a = { @@ -254,4 +254,4 @@ class TestSimpleRun(helpers.ResourceUsingTestCase): } c = _old_mergedict(a, b) d = util.mergemanydict([a, b]) - self.assertEquals(c, d) + self.assertEqual(c, d) diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py index 09235c4d..3ae00fc6 100644 --- a/tests/unittests/test_net.py +++ b/tests/unittests/test_net.py @@ -1,13 +1,22 @@ -from cloudinit import util from cloudinit import net +from cloudinit.net import cmdline +from cloudinit.net import eni +from cloudinit.net import network_state +from cloudinit.net import sysconfig +from cloudinit.sources.helpers import openstack +from cloudinit import util + +from .helpers import mock from .helpers import TestCase import base64 import copy -import io import gzip +import io import json import os +import shutil +import tempfile DHCP_CONTENT_1 = """ DEVICE='eth0' @@ -67,22 +76,214 @@ STATIC_EXPECTED_1 = { 'dns_nameservers': ['10.0.1.1']}], } +# Examples (and expected outputs for various renderers). +OS_SAMPLES = [ + { + 'in_data': { + "services": [{"type": "dns", "address": "172.19.0.12"}], + "networks": [{ + "network_id": "dacd568d-5be6-4786-91fe-750c374b78b4", + "type": "ipv4", "netmask": "255.255.252.0", + "link": "tap1a81968a-79", + "routes": [{ + "netmask": "0.0.0.0", + "network": "0.0.0.0", + "gateway": "172.19.3.254", + }], + "ip_address": "172.19.1.34", "id": "network0" + }], + "links": [ + { + "ethernet_mac_address": "fa:16:3e:ed:9a:59", + "mtu": None, "type": "bridge", "id": + "tap1a81968a-79", + "vif_id": "1a81968a-797a-400f-8a80-567f997eb93f" + }, + ], + }, + 'in_macs': { + 'fa:16:3e:ed:9a:59': 'eth0', + }, + 'out_sysconfig': [ + ('etc/sysconfig/network-scripts/ifcfg-eth0', + """ +# Created by cloud-init on instance boot automatically, do not edit. +# +BOOTPROTO=static +DEFROUTE=yes +DEVICE=eth0 +GATEWAY=172.19.3.254 +HWADDR=fa:16:3e:ed:9a:59 +IPADDR=172.19.1.34 +NETMASK=255.255.252.0 +NM_CONTROLLED=no +ONBOOT=yes +TYPE=Ethernet +USERCTL=no +""".lstrip()), + ('etc/sysconfig/network-scripts/route-eth0', + """ +# Created by cloud-init on instance boot automatically, do not edit. +# +ADDRESS0=0.0.0.0 +GATEWAY0=172.19.3.254 +NETMASK0=0.0.0.0 +""".lstrip()), + ('etc/resolv.conf', + """ +; Created by cloud-init on instance boot automatically, do not edit. +; +nameserver 172.19.0.12 +""".lstrip()), + ('etc/udev/rules.d/70-persistent-net.rules', + "".join(['SUBSYSTEM=="net", ACTION=="add", DRIVERS=="?*", ', + 'ATTR{address}=="fa:16:3e:ed:9a:59", NAME="eth0"\n']))] + } +] + + +def _setup_test(tmp_dir, mock_get_devicelist, mock_sys_netdev_info, + mock_sys_dev_path): + mock_get_devicelist.return_value = ['eth1000'] + dev_characteristics = { + 'eth1000': { + "bridge": False, + "carrier": False, + "dormant": False, + "operstate": "down", + "address": "07-1C-C6-75-A4-BE", + } + } + + def netdev_info(name, field): + return dev_characteristics[name][field] + + mock_sys_netdev_info.side_effect = netdev_info + + def sys_dev_path(devname, path=""): + return tmp_dir + devname + "/" + path + + for dev in dev_characteristics: + os.makedirs(os.path.join(tmp_dir, dev)) + with open(os.path.join(tmp_dir, dev, 'operstate'), 'w') as fh: + fh.write("down") + + mock_sys_dev_path.side_effect = sys_dev_path + + +class TestSysConfigRendering(TestCase): + + @mock.patch("cloudinit.net.sys_dev_path") + @mock.patch("cloudinit.net.sys_netdev_info") + @mock.patch("cloudinit.net.get_devicelist") + def test_default_generation(self, mock_get_devicelist, + mock_sys_netdev_info, + mock_sys_dev_path): + tmp_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmp_dir) + _setup_test(tmp_dir, mock_get_devicelist, + mock_sys_netdev_info, mock_sys_dev_path) + + network_cfg = net.generate_fallback_config() + ns = network_state.parse_net_config_data(network_cfg, + skip_broken=False) + + render_dir = os.path.join(tmp_dir, "render") + os.makedirs(render_dir) + + renderer = sysconfig.Renderer() + renderer.render_network_state(render_dir, ns) + + render_file = 'etc/sysconfig/network-scripts/ifcfg-eth1000' + with open(os.path.join(render_dir, render_file)) as fh: + content = fh.read() + expected_content = """ +# Created by cloud-init on instance boot automatically, do not edit. +# +BOOTPROTO=dhcp +DEVICE=eth1000 +HWADDR=07-1C-C6-75-A4-BE +NM_CONTROLLED=no +ONBOOT=yes +TYPE=Ethernet +USERCTL=no +""".lstrip() + self.assertEqual(expected_content, content) + + def test_openstack_rendering_samples(self): + tmp_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmp_dir) + render_dir = os.path.join(tmp_dir, "render") + for os_sample in OS_SAMPLES: + ex_input = os_sample['in_data'] + ex_mac_addrs = os_sample['in_macs'] + network_cfg = openstack.convert_net_json( + ex_input, known_macs=ex_mac_addrs) + ns = network_state.parse_net_config_data(network_cfg, + skip_broken=False) + renderer = sysconfig.Renderer() + renderer.render_network_state(render_dir, ns) + for fn, expected_content in os_sample.get('out_sysconfig', []): + with open(os.path.join(render_dir, fn)) as fh: + self.assertEqual(expected_content, fh.read()) + + +class TestEniNetRendering(TestCase): + + @mock.patch("cloudinit.net.sys_dev_path") + @mock.patch("cloudinit.net.sys_netdev_info") + @mock.patch("cloudinit.net.get_devicelist") + def test_default_generation(self, mock_get_devicelist, + mock_sys_netdev_info, + mock_sys_dev_path): + tmp_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmp_dir) + _setup_test(tmp_dir, mock_get_devicelist, + mock_sys_netdev_info, mock_sys_dev_path) + + network_cfg = net.generate_fallback_config() + ns = network_state.parse_net_config_data(network_cfg, + skip_broken=False) + + render_dir = os.path.join(tmp_dir, "render") + os.makedirs(render_dir) + + renderer = eni.Renderer( + {'links_path_prefix': None, + 'eni_path': 'interfaces', 'netrules_path': None, + }) + renderer.render_network_state(render_dir, ns) + + self.assertTrue(os.path.exists(os.path.join(render_dir, + 'interfaces'))) + with open(os.path.join(render_dir, 'interfaces')) as fh: + contents = fh.read() + + expected = """ +auto lo +iface lo inet loopback + +auto eth1000 +iface eth1000 inet dhcp +""" + self.assertEqual(expected.lstrip(), contents.lstrip()) + -class TestNetConfigParsing(TestCase): +class TestCmdlineConfigParsing(TestCase): simple_cfg = { 'config': [{"type": "physical", "name": "eth0", "mac_address": "c0:d6:9f:2c:e8:80", "subnets": [{"type": "dhcp"}]}]} - def test_klibc_convert_dhcp(self): - found = net._klibc_to_config_entry(DHCP_CONTENT_1) + def test_cmdline_convert_dhcp(self): + found = cmdline._klibc_to_config_entry(DHCP_CONTENT_1) self.assertEqual(found, ('eth0', DHCP_EXPECTED_1)) - def test_klibc_convert_static(self): - found = net._klibc_to_config_entry(STATIC_CONTENT_1) + def test_cmdline_convert_static(self): + found = cmdline._klibc_to_config_entry(STATIC_CONTENT_1) self.assertEqual(found, ('eth1', STATIC_EXPECTED_1)) - def test_config_from_klibc_net_cfg(self): + def test_config_from_cmdline_net_cfg(self): files = [] pairs = (('net-eth0.cfg', DHCP_CONTENT_1), ('net-eth1.cfg', STATIC_CONTENT_1)) @@ -103,21 +304,22 @@ class TestNetConfigParsing(TestCase): files.append(fp) util.write_file(fp, content) - found = net.config_from_klibc_net_cfg(files=files, mac_addrs=macs) + found = cmdline.config_from_klibc_net_cfg(files=files, + mac_addrs=macs) self.assertEqual(found, expected) def test_cmdline_with_b64(self): data = base64.b64encode(json.dumps(self.simple_cfg).encode()) encoded_text = data.decode() - cmdline = 'ro network-config=' + encoded_text + ' root=foo' - found = net.read_kernel_cmdline_config(cmdline=cmdline) + raw_cmdline = 'ro network-config=' + encoded_text + ' root=foo' + found = cmdline.read_kernel_cmdline_config(cmdline=raw_cmdline) self.assertEqual(found, self.simple_cfg) def test_cmdline_with_b64_gz(self): data = _gzip_data(json.dumps(self.simple_cfg).encode()) encoded_text = base64.b64encode(data).decode() - cmdline = 'ro network-config=' + encoded_text + ' root=foo' - found = net.read_kernel_cmdline_config(cmdline=cmdline) + raw_cmdline = 'ro network-config=' + encoded_text + ' root=foo' + found = cmdline.read_kernel_cmdline_config(cmdline=raw_cmdline) self.assertEqual(found, self.simple_cfg) diff --git a/tests/unittests/test_reporting.py b/tests/unittests/test_reporting.py index 32356ef9..20ca23df 100644 --- a/tests/unittests/test_reporting.py +++ b/tests/unittests/test_reporting.py @@ -4,10 +4,12 @@ # vi: ts=4 expandtab from cloudinit import reporting -from cloudinit.reporting import handlers from cloudinit.reporting import events +from cloudinit.reporting import handlers + +import mock -from .helpers import (mock, TestCase) +from .helpers import TestCase def _fake_registry(): diff --git a/tests/unittests/test_rh_subscription.py b/tests/unittests/test_rh_subscription.py index 8c586ad7..891dbe77 100644 --- a/tests/unittests/test_rh_subscription.py +++ b/tests/unittests/test_rh_subscription.py @@ -1,11 +1,24 @@ -from cloudinit import util -from cloudinit.config import cc_rh_subscription +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + import logging -import mock -import unittest + +from cloudinit.config import cc_rh_subscription +from cloudinit import util + +from .helpers import TestCase, mock -class GoodTests(unittest.TestCase): +class GoodTests(TestCase): def setUp(self): super(GoodTests, self).setUp() self.name = "cc_rh_subscription" @@ -92,7 +105,7 @@ class GoodTests(unittest.TestCase): self.assertEqual(self.SM._sub_man_cli.call_count, 9) -class TestBadInput(unittest.TestCase): +class TestBadInput(TestCase): name = "cc_rh_subscription" cloud_init = None log = logging.getLogger("bad_tests") diff --git a/tests/unittests/test_runs/test_merge_run.py b/tests/unittests/test_runs/test_merge_run.py index d0ec36a9..ce43798e 100644 --- a/tests/unittests/test_runs/test_merge_run.py +++ b/tests/unittests/test_runs/test_merge_run.py @@ -42,13 +42,13 @@ class TestMergeRun(helpers.FilesystemMockingTestCase): args=[PER_INSTANCE], freq=PER_INSTANCE) mirrors = initer.distro.get_option('package_mirrors') - self.assertEquals(1, len(mirrors)) + self.assertEqual(1, len(mirrors)) mirror = mirrors[0] - self.assertEquals(mirror['arches'], ['i386', 'amd64', 'blah']) + self.assertEqual(mirror['arches'], ['i386', 'amd64', 'blah']) mods = stages.Modules(initer) (which_ran, failures) = mods.run_section('cloud_init_modules') self.assertTrue(len(failures) == 0) self.assertTrue(os.path.exists('/etc/blah.ini')) self.assertIn('write-files', which_ran) contents = util.load_file('/etc/blah.ini') - self.assertEquals(contents, 'blah') + self.assertEqual(contents, 'blah') diff --git a/tests/unittests/test_runs/test_simple_run.py b/tests/unittests/test_runs/test_simple_run.py index e19e65cd..07e7b1a8 100644 --- a/tests/unittests/test_runs/test_simple_run.py +++ b/tests/unittests/test_runs/test_simple_run.py @@ -63,7 +63,7 @@ class TestSimpleRun(helpers.FilesystemMockingTestCase): initer.fetch() iid = initer.instancify() - self.assertEquals(iid, 'iid-datasource-none') + self.assertEqual(iid, 'iid-datasource-none') initer.update() self.assertTrue(os.path.islink("var/lib/cloud/instance")) @@ -78,4 +78,4 @@ class TestSimpleRun(helpers.FilesystemMockingTestCase): self.assertTrue(os.path.exists('/etc/blah.ini')) self.assertIn('write-files', which_ran) contents = util.load_file('/etc/blah.ini') - self.assertEquals(contents, 'blah') + self.assertEqual(contents, 'blah') diff --git a/tests/unittests/test_templating.py b/tests/unittests/test_templating.py index b9863650..94b6e061 100644 --- a/tests/unittests/test_templating.py +++ b/tests/unittests/test_templating.py @@ -58,7 +58,7 @@ class TestTemplates(test_helpers.TestCase): blob = "blahblah $blah" (template_type, renderer, contents) = templater.detect_template(blob) self.assertIn("cheetah", template_type) - self.assertEquals(blob, contents) + self.assertEqual(blob, contents) blob = '##template:something-new' self.assertRaises(ValueError, templater.detect_template, blob) @@ -67,18 +67,18 @@ class TestTemplates(test_helpers.TestCase): blob = '''## template:cheetah $a,$b''' c = templater.render_string(blob, {"a": 1, "b": 2}) - self.assertEquals("1,2", c) + self.assertEqual("1,2", c) def test_render_jinja(self): blob = '''## template:jinja {{a}},{{b}}''' c = templater.render_string(blob, {"a": 1, "b": 2}) - self.assertEquals("1,2", c) + self.assertEqual("1,2", c) def test_render_default(self): blob = '''$a,$b''' c = templater.render_string(blob, {"a": 1, "b": 2}) - self.assertEquals("1,2", c) + self.assertEqual("1,2", c) def test_render_basic_deeper(self): hn = 'myfoohost.yahoo.com' |