diff options
Diffstat (limited to 'tests/unittests')
56 files changed, 4282 insertions, 1236 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py index 52305397..7f4b8784 100644 --- a/tests/unittests/helpers.py +++ b/tests/unittests/helpers.py @@ -1,25 +1,35 @@ +from __future__ import print_function + +import functools import os import sys +import shutil +import tempfile import unittest -from contextlib import contextmanager +import six -from mocker import Mocker -from mocker import MockerTestCase +try: + from unittest import mock +except ImportError: + import mock +try: + from contextlib import ExitStack +except ImportError: + from contextlib2 import ExitStack from cloudinit import helpers as ch from cloudinit import util -import shutil - # Used for detecting different python versions PY2 = False PY26 = False PY27 = False PY3 = False +FIX_HTTPRETTY = False _PY_VER = sys.version_info -_PY_MAJOR, _PY_MINOR = _PY_VER[0:2] +_PY_MAJOR, _PY_MINOR, _PY_MICRO = _PY_VER[0:3] if (_PY_MAJOR, _PY_MINOR) <= (2, 6): if (_PY_MAJOR, _PY_MINOR) == (2, 6): PY26 = True @@ -31,10 +41,24 @@ else: PY2 = True if (_PY_MAJOR, _PY_MINOR) >= (3, 0): PY3 = True + if _PY_MINOR == 4 and _PY_MICRO < 3: + FIX_HTTPRETTY = True if PY26: - # For now add these on, taken from python 2.7 + slightly adjusted + # For now add these on, taken from python 2.7 + slightly adjusted. Drop + # all this once Python 2.6 is dropped as a minimum requirement. class TestCase(unittest.TestCase): + def setUp(self): + super(TestCase, self).setUp() + self.__all_cleanups = ExitStack() + + def tearDown(self): + self.__all_cleanups.close() + unittest.TestCase.tearDown(self) + + def addCleanup(self, function, *args, **kws): + self.__all_cleanups.callback(function, *args, **kws) + def assertIs(self, expr1, expr2, msg=None): if expr1 is not expr2: standardMsg = '%r is not %r' % (expr1, expr2) @@ -57,10 +81,17 @@ if PY26: standardMsg = standardMsg % (value) self.fail(self._formatMessage(msg, standardMsg)) + def assertIsInstance(self, obj, cls, msg=None): + """Same as self.assertTrue(isinstance(obj, cls)), with a nicer + default message.""" + if not isinstance(obj, cls): + standardMsg = '%s is not an instance of %r' % (repr(obj), cls) + self.fail(self._formatMessage(msg, standardMsg)) + def assertDictContainsSubset(self, expected, actual, msg=None): missing = [] mismatched = [] - for k, v in expected.iteritems(): + for k, v in expected.items(): if k not in actual: missing.append(k) elif actual[k] != v: @@ -86,17 +117,6 @@ else: pass -@contextmanager -def mocker(verify_calls=True): - m = Mocker() - try: - yield m - finally: - m.restore() - if verify_calls: - m.verify() - - # Makes the old path start # with new base instead of whatever # it previously had @@ -121,14 +141,19 @@ def retarget_many_wrapper(new_base, am, old_func): nam = len(n_args) for i in range(0, nam): path = args[i] - n_args[i] = rebase_path(path, new_base) + # patchOS() wraps various os and os.path functions, however in + # Python 3 some of these now accept file-descriptors (integers). + # That breaks rebase_path() so in lieu of a better solution, just + # don't rebase if we get a fd. + if isinstance(path, six.string_types): + n_args[i] = rebase_path(path, new_base) return old_func(*n_args, **kwds) return wrapper -class ResourceUsingTestCase(MockerTestCase): - def __init__(self, methodName="runTest"): - MockerTestCase.__init__(self, methodName) +class ResourceUsingTestCase(TestCase): + def setUp(self): + super(ResourceUsingTestCase, self).setUp() self.resource_path = None def resourceLocation(self, subname=None): @@ -156,17 +181,23 @@ class ResourceUsingTestCase(MockerTestCase): return fh.read() def getCloudPaths(self): + tmpdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmpdir) cp = ch.Paths({ - 'cloud_dir': self.makeDir(), + 'cloud_dir': tmpdir, 'templates_dir': self.resourceLocation(), }) return cp class FilesystemMockingTestCase(ResourceUsingTestCase): - def __init__(self, methodName="runTest"): - ResourceUsingTestCase.__init__(self, methodName) - self.patched_funcs = [] + def setUp(self): + super(FilesystemMockingTestCase, self).setUp() + self.patched_funcs = ExitStack() + + def tearDown(self): + self.patched_funcs.close() + ResourceUsingTestCase.tearDown(self) def replicateTestRoot(self, example_root, target_root): real_root = self.resourceLocation() @@ -180,15 +211,6 @@ class FilesystemMockingTestCase(ResourceUsingTestCase): make_path = util.abs_join(make_path, f) shutil.copy(real_path, make_path) - def tearDown(self): - self.restore() - ResourceUsingTestCase.tearDown(self) - - def restore(self): - for (mod, f, func) in self.patched_funcs: - setattr(mod, f, func) - self.patched_funcs = [] - def patchUtils(self, new_root): patch_funcs = { util: [('write_file', 1), @@ -205,8 +227,8 @@ class FilesystemMockingTestCase(ResourceUsingTestCase): for (f, am) in funcs: func = getattr(mod, f) trap_func = retarget_many_wrapper(new_root, am, func) - setattr(mod, f, trap_func) - self.patched_funcs.append((mod, f, func)) + self.patched_funcs.enter_context( + mock.patch.object(mod, f, trap_func)) # Handle subprocess calls func = getattr(util, 'subp') @@ -214,28 +236,73 @@ class FilesystemMockingTestCase(ResourceUsingTestCase): def nsubp(*_args, **_kwargs): return ('', '') - setattr(util, 'subp', nsubp) - self.patched_funcs.append((util, 'subp', func)) + self.patched_funcs.enter_context( + mock.patch.object(util, 'subp', nsubp)) def null_func(*_args, **_kwargs): return None for f in ['chownbyid', 'chownbyname']: - func = getattr(util, f) - setattr(util, f, null_func) - self.patched_funcs.append((util, f, func)) + self.patched_funcs.enter_context( + mock.patch.object(util, f, null_func)) def patchOS(self, new_root): patch_funcs = { - os.path: ['isfile', 'exists', 'islink', 'isdir'], - os: ['listdir'], + os.path: [('isfile', 1), ('exists', 1), + ('islink', 1), ('isdir', 1)], + os: [('listdir', 1), ('mkdir', 1), + ('lstat', 1), ('symlink', 2)], } for (mod, funcs) in patch_funcs.items(): - for f in funcs: + for f, nargs in funcs: func = getattr(mod, f) - trap_func = retarget_many_wrapper(new_root, 1, func) - setattr(mod, f, trap_func) - self.patched_funcs.append((mod, f, func)) + trap_func = retarget_many_wrapper(new_root, nargs, func) + self.patched_funcs.enter_context( + mock.patch.object(mod, f, trap_func)) + + def patchOpen(self, new_root): + trap_func = retarget_many_wrapper(new_root, 1, open) + name = 'builtins.open' if PY3 else '__builtin__.open' + self.patched_funcs.enter_context(mock.patch(name, trap_func)) + + def patchStdoutAndStderr(self, stdout=None, stderr=None): + if stdout is not None: + self.patched_funcs.enter_context( + mock.patch.object(sys, 'stdout', stdout)) + if stderr is not None: + self.patched_funcs.enter_context( + mock.patch.object(sys, 'stderr', stderr)) + + +def import_httpretty(): + """Import HTTPretty and monkey patch Python 3.4 issue. + See https://github.com/gabrielfalcao/HTTPretty/pull/193 and + as well as https://github.com/gabrielfalcao/HTTPretty/issues/221. + + Lifted from + https://github.com/inveniosoftware/datacite/blob/master/tests/helpers.py + """ + if not FIX_HTTPRETTY: + import httpretty + else: + import socket + old_SocketType = socket.SocketType + + import httpretty + from httpretty import core + + def sockettype_patch(f): + @functools.wraps(f) + def inner(*args, **kwargs): + f(*args, **kwargs) + socket.SocketType = old_SocketType + socket.__dict__['SocketType'] = old_SocketType + return inner + + core.httpretty.disable = sockettype_patch( + httpretty.httpretty.disable + ) + return httpretty class HttprettyTestCase(TestCase): @@ -256,7 +323,25 @@ class HttprettyTestCase(TestCase): def populate_dir(path, files): if not os.path.exists(path): os.makedirs(path) - for (name, content) in files.iteritems(): - with open(os.path.join(path, name), "w") as fp: - fp.write(content) + for (name, content) in files.items(): + with open(os.path.join(path, name), "wb") as fp: + if isinstance(content, six.binary_type): + fp.write(content) + else: + fp.write(content.encode('utf-8')) fp.close() + + +try: + skipIf = unittest.skipIf +except AttributeError: + # Python 2.6. Doesn't have to be high fidelity. + def skipIf(condition, reason): + def decorator(func): + def wrapper(*args, **kws): + if condition: + return func(*args, **kws) + else: + print(reason, file=sys.stderr) + return wrapper + return decorator diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py index 17965488..153f1658 100644 --- a/tests/unittests/test__init__.py +++ b/tests/unittests/test__init__.py @@ -1,14 +1,25 @@ import os - -from mocker import MockerTestCase, ARGS, KWARGS +import shutil +import tempfile +import unittest + +try: + from unittest import mock +except ImportError: + import mock +try: + from contextlib import ExitStack +except ImportError: + from contextlib2 import ExitStack from cloudinit import handlers from cloudinit import helpers -from cloudinit import importer from cloudinit import settings from cloudinit import url_helper from cloudinit import util +from .helpers import TestCase + class FakeModule(handlers.Handler): def __init__(self): @@ -22,76 +33,73 @@ class FakeModule(handlers.Handler): pass -class TestWalkerHandleHandler(MockerTestCase): +class TestWalkerHandleHandler(TestCase): def setUp(self): - - MockerTestCase.setUp(self) + super(TestWalkerHandleHandler, self).setUp() + tmpdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmpdir) self.data = { "handlercount": 0, "frequency": "", - "handlerdir": self.makeDir(), + "handlerdir": tmpdir, "handlers": helpers.ContentHandlers(), "data": None} self.expected_module_name = "part-handler-%03d" % ( self.data["handlercount"],) expected_file_name = "%s.py" % self.expected_module_name - expected_file_fullname = os.path.join(self.data["handlerdir"], - expected_file_name) + self.expected_file_fullname = os.path.join( + self.data["handlerdir"], expected_file_name) self.module_fake = FakeModule() self.ctype = None self.filename = None self.payload = "dummy payload" - # Mock the write_file function - write_file_mock = self.mocker.replace(util.write_file, - passthrough=False) - write_file_mock(expected_file_fullname, self.payload, 0600) + # Mock the write_file() function. We'll assert that it got called as + # expected in each of the individual tests. + resources = ExitStack() + self.addCleanup(resources.close) + self.write_file_mock = resources.enter_context( + mock.patch('cloudinit.util.write_file')) def test_no_errors(self): """Payload gets written to file and added to C{pdata}.""" - import_mock = self.mocker.replace(importer.import_module, - passthrough=False) - import_mock(self.expected_module_name) - self.mocker.result(self.module_fake) - self.mocker.replay() - - handlers.walker_handle_handler(self.data, self.ctype, self.filename, - self.payload) - - self.assertEqual(1, self.data["handlercount"]) + with mock.patch('cloudinit.importer.import_module', + return_value=self.module_fake) as mockobj: + handlers.walker_handle_handler(self.data, self.ctype, + self.filename, self.payload) + mockobj.assert_called_once_with(self.expected_module_name) + self.write_file_mock.assert_called_once_with( + self.expected_file_fullname, self.payload, 0o600) + self.assertEqual(self.data['handlercount'], 1) def test_import_error(self): """Module import errors are logged. No handler added to C{pdata}.""" - import_mock = self.mocker.replace(importer.import_module, - passthrough=False) - import_mock(self.expected_module_name) - self.mocker.throw(ImportError()) - self.mocker.replay() - - handlers.walker_handle_handler(self.data, self.ctype, self.filename, - self.payload) - - self.assertEqual(0, self.data["handlercount"]) + with mock.patch('cloudinit.importer.import_module', + side_effect=ImportError) as mockobj: + handlers.walker_handle_handler(self.data, self.ctype, + self.filename, self.payload) + mockobj.assert_called_once_with(self.expected_module_name) + self.write_file_mock.assert_called_once_with( + self.expected_file_fullname, self.payload, 0o600) + self.assertEqual(self.data['handlercount'], 0) def test_attribute_error(self): """Attribute errors are logged. No handler added to C{pdata}.""" - import_mock = self.mocker.replace(importer.import_module, - passthrough=False) - import_mock(self.expected_module_name) - self.mocker.result(self.module_fake) - self.mocker.throw(AttributeError()) - self.mocker.replay() - - handlers.walker_handle_handler(self.data, self.ctype, self.filename, - self.payload) - - self.assertEqual(0, self.data["handlercount"]) + with mock.patch('cloudinit.importer.import_module', + side_effect=AttributeError, + return_value=self.module_fake) as mockobj: + handlers.walker_handle_handler(self.data, self.ctype, + self.filename, self.payload) + mockobj.assert_called_once_with(self.expected_module_name) + self.write_file_mock.assert_called_once_with( + self.expected_file_fullname, self.payload, 0o600) + self.assertEqual(self.data['handlercount'], 0) -class TestHandlerHandlePart(MockerTestCase): +class TestHandlerHandlePart(unittest.TestCase): def setUp(self): self.data = "fake data" @@ -108,123 +116,105 @@ class TestHandlerHandlePart(MockerTestCase): C{handle_part} is called without C{frequency} for C{handler_version} == 1. """ - mod_mock = self.mocker.mock() - getattr(mod_mock, "frequency") - self.mocker.result(settings.PER_INSTANCE) - getattr(mod_mock, "handler_version") - self.mocker.result(1) - mod_mock.handle_part(self.data, self.ctype, self.filename, - self.payload) - self.mocker.replay() - - handlers.run_part(mod_mock, self.data, self.filename, - self.payload, self.frequency, self.headers) + mod_mock = mock.Mock(frequency=settings.PER_INSTANCE, + handler_version=1) + handlers.run_part(mod_mock, self.data, self.filename, self.payload, + self.frequency, self.headers) + # Assert that the handle_part() method of the mock object got + # called with the expected arguments. + mod_mock.handle_part.assert_called_once_with( + self.data, self.ctype, self.filename, self.payload) def test_normal_version_2(self): """ C{handle_part} is called with C{frequency} for C{handler_version} == 2. """ - mod_mock = self.mocker.mock() - getattr(mod_mock, "frequency") - self.mocker.result(settings.PER_INSTANCE) - getattr(mod_mock, "handler_version") - self.mocker.result(2) - mod_mock.handle_part(self.data, self.ctype, self.filename, - self.payload, self.frequency) - self.mocker.replay() - - handlers.run_part(mod_mock, self.data, self.filename, - self.payload, self.frequency, self.headers) + mod_mock = mock.Mock(frequency=settings.PER_INSTANCE, + handler_version=2) + handlers.run_part(mod_mock, self.data, self.filename, self.payload, + self.frequency, self.headers) + # Assert that the handle_part() method of the mock object got + # called with the expected arguments. + mod_mock.handle_part.assert_called_once_with( + self.data, self.ctype, self.filename, self.payload, + settings.PER_INSTANCE) def test_modfreq_per_always(self): """ C{handle_part} is called regardless of frequency if nofreq is always. """ self.frequency = "once" - mod_mock = self.mocker.mock() - getattr(mod_mock, "frequency") - self.mocker.result(settings.PER_ALWAYS) - getattr(mod_mock, "handler_version") - self.mocker.result(1) - mod_mock.handle_part(self.data, self.ctype, self.filename, - self.payload) - self.mocker.replay() - - handlers.run_part(mod_mock, self.data, self.filename, - self.payload, self.frequency, self.headers) + mod_mock = mock.Mock(frequency=settings.PER_ALWAYS, + handler_version=1) + handlers.run_part(mod_mock, self.data, self.filename, self.payload, + self.frequency, self.headers) + # Assert that the handle_part() method of the mock object got + # called with the expected arguments. + mod_mock.handle_part.assert_called_once_with( + self.data, self.ctype, self.filename, self.payload) def test_no_handle_when_modfreq_once(self): """C{handle_part} is not called if frequency is once.""" self.frequency = "once" - mod_mock = self.mocker.mock() - getattr(mod_mock, "frequency") - self.mocker.result(settings.PER_ONCE) - self.mocker.replay() - - handlers.run_part(mod_mock, self.data, self.filename, - self.payload, self.frequency, self.headers) + mod_mock = mock.Mock(frequency=settings.PER_ONCE) + handlers.run_part(mod_mock, self.data, self.filename, self.payload, + self.frequency, self.headers) + self.assertEqual(0, mod_mock.handle_part.call_count) def test_exception_is_caught(self): """Exceptions within C{handle_part} are caught and logged.""" - mod_mock = self.mocker.mock() - getattr(mod_mock, "frequency") - self.mocker.result(settings.PER_INSTANCE) - getattr(mod_mock, "handler_version") - self.mocker.result(1) - mod_mock.handle_part(self.data, self.ctype, self.filename, - self.payload) - self.mocker.throw(Exception()) - self.mocker.replay() + mod_mock = mock.Mock(frequency=settings.PER_INSTANCE, + handler_version=1) + mod_mock.handle_part.side_effect = Exception + try: + handlers.run_part(mod_mock, self.data, self.filename, + self.payload, self.frequency, self.headers) + except Exception: + self.fail("Exception was not caught in handle_part") - handlers.run_part(mod_mock, self.data, self.filename, - self.payload, self.frequency, self.headers) + mod_mock.handle_part.assert_called_once_with( + self.data, self.ctype, self.filename, self.payload) -class TestCmdlineUrl(MockerTestCase): +class TestCmdlineUrl(unittest.TestCase): def test_invalid_content(self): url = "http://example.com/foo" key = "mykey" - payload = "0" + payload = b"0" cmdline = "ro %s=%s bar=1" % (key, url) - mock_readurl = self.mocker.replace(url_helper.readurl, - passthrough=False) - mock_readurl(url, ARGS, KWARGS) - self.mocker.result(url_helper.StringResponse(payload)) - self.mocker.replay() - - self.assertEqual((key, url, None), - util.get_cmdline_url(names=[key], starts="xxxxxx", - cmdline=cmdline)) + with mock.patch('cloudinit.url_helper.readurl', + return_value=url_helper.StringResponse(payload)): + self.assertEqual( + util.get_cmdline_url(names=[key], starts="xxxxxx", + cmdline=cmdline), + (key, url, None)) def test_valid_content(self): url = "http://example.com/foo" key = "mykey" - payload = "xcloud-config\nmydata: foo\nbar: wark\n" + payload = b"xcloud-config\nmydata: foo\nbar: wark\n" cmdline = "ro %s=%s bar=1" % (key, url) - mock_readurl = self.mocker.replace(url_helper.readurl, - passthrough=False) - mock_readurl(url, ARGS, KWARGS) - self.mocker.result(url_helper.StringResponse(payload)) - self.mocker.replay() - - self.assertEqual((key, url, payload), - util.get_cmdline_url(names=[key], starts="xcloud-config", - cmdline=cmdline)) + with mock.patch('cloudinit.url_helper.readurl', + return_value=url_helper.StringResponse(payload)): + self.assertEqual( + util.get_cmdline_url(names=[key], starts=b"xcloud-config", + cmdline=cmdline), + (key, url, payload)) def test_no_key_found(self): url = "http://example.com/foo" key = "mykey" cmdline = "ro %s=%s bar=1" % (key, url) - self.mocker.replace(url_helper.readurl, passthrough=False) - self.mocker.result(url_helper.StringResponse("")) - self.mocker.replay() + with mock.patch('cloudinit.url_helper.readurl', + return_value=url_helper.StringResponse(b'')): + self.assertEqual( + util.get_cmdline_url(names=["does-not-appear"], + starts="#cloud-config", cmdline=cmdline), + (None, None, None)) - self.assertEqual((None, None, None), - util.get_cmdline_url(names=["does-not-appear"], - starts="#cloud-config", cmdline=cmdline)) # vi: ts=4 expandtab diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py index af7f442e..ad32d0b2 100644 --- a/tests/unittests/test_builtin_handlers.py +++ b/tests/unittests/test_builtin_handlers.py @@ -1,6 +1,13 @@ """Tests of the built-in user data handlers.""" import os +import shutil +import tempfile + +try: + from unittest import mock +except ImportError: + import mock from . import helpers as test_helpers @@ -14,10 +21,11 @@ from cloudinit.settings import (PER_ALWAYS, PER_INSTANCE) class TestBuiltins(test_helpers.FilesystemMockingTestCase): - def test_upstart_frequency_no_out(self): - c_root = self.makeDir() - up_root = self.makeDir() + c_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, c_root) + up_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, up_root) paths = helpers.Paths({ 'cloud_dir': c_root, 'upstart_dir': up_root, @@ -36,7 +44,8 @@ class TestBuiltins(test_helpers.FilesystemMockingTestCase): def test_upstart_frequency_single(self): # files should be written out when frequency is ! per-instance - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) freq = PER_INSTANCE self.patchOS(new_root) @@ -49,16 +58,16 @@ class TestBuiltins(test_helpers.FilesystemMockingTestCase): util.ensure_dir("/run") util.ensure_dir("/etc/upstart") - mock_subp = self.mocker.replace(util.subp, passthrough=False) - mock_subp(["initctl", "reload-configuration"], capture=False) - self.mocker.replay() + with mock.patch.object(util, 'subp') as mockobj: + h = upstart_job.UpstartJobPartHandler(paths) + h.handle_part('', handlers.CONTENT_START, + None, None, None) + h.handle_part('blah', 'text/upstart-job', + 'test.conf', 'blah', freq) + h.handle_part('', handlers.CONTENT_END, + None, None, None) - h = upstart_job.UpstartJobPartHandler(paths) - h.handle_part('', handlers.CONTENT_START, - None, None, None) - h.handle_part('blah', 'text/upstart-job', - 'test.conf', 'blah', freq) - h.handle_part('', handlers.CONTENT_END, - None, None, None) + self.assertEquals(len(os.listdir('/etc/upstart')), 1) - self.assertEquals(1, len(os.listdir('/etc/upstart'))) + mockobj.assert_called_once_with( + ['initctl', 'reload-configuration'], capture=False) diff --git a/tests/unittests/test_cli.py b/tests/unittests/test_cli.py new file mode 100644 index 00000000..ed863399 --- /dev/null +++ b/tests/unittests/test_cli.py @@ -0,0 +1,54 @@ +import imp +import os +import sys +import six + +from . import helpers as test_helpers + +try: + from unittest import mock +except ImportError: + import mock + + +BIN_CLOUDINIT = "bin/cloud-init" + + +class TestCLI(test_helpers.FilesystemMockingTestCase): + + def setUp(self): + super(TestCLI, self).setUp() + self.stderr = six.StringIO() + self.patchStdoutAndStderr(stderr=self.stderr) + self.sys_exit = mock.MagicMock() + self.patched_funcs.enter_context( + mock.patch.object(sys, 'exit', self.sys_exit)) + + def _call_main(self): + self.patched_funcs.enter_context( + mock.patch.object(sys, 'argv', ['cloud-init'])) + cli = imp.load_module( + 'cli', open(BIN_CLOUDINIT), '', ('', 'r', imp.PY_SOURCE)) + try: + return cli.main() + except: + pass + + @test_helpers.skipIf(not os.path.isfile(BIN_CLOUDINIT), "no bin/cloudinit") + def test_no_arguments_shows_usage(self): + self._call_main() + self.assertIn('usage: cloud-init', self.stderr.getvalue()) + + @test_helpers.skipIf(not os.path.isfile(BIN_CLOUDINIT), "no bin/cloudinit") + def test_no_arguments_exits_2(self): + exit_code = self._call_main() + if self.sys_exit.call_count: + self.assertEqual(mock.call(2), self.sys_exit.call_args) + else: + self.assertEqual(2, exit_code) + + @test_helpers.skipIf(not os.path.isfile(BIN_CLOUDINIT), "no bin/cloudinit") + def test_no_arguments_shows_error_message(self): + self._call_main() + self.assertIn('cloud-init: error: too few arguments', + self.stderr.getvalue()) diff --git a/tests/unittests/test_cs_util.py b/tests/unittests/test_cs_util.py index 7d59222b..d7273035 100644 --- a/tests/unittests/test_cs_util.py +++ b/tests/unittests/test_cs_util.py @@ -1,7 +1,21 @@ -from mocker import MockerTestCase +from __future__ import print_function + +import sys +import unittest from cloudinit.cs_utils import Cepko +try: + skip = unittest.skip +except AttributeError: + # Python 2.6. Doesn't have to be high fidelity. + def skip(reason): + def decorator(func): + def wrapper(*args, **kws): + print(reason, file=sys.stderr) + return wrapper + return decorator + SERVER_CONTEXT = { "cpu": 1000, @@ -26,16 +40,21 @@ class CepkoMock(Cepko): return SERVER_CONTEXT['tags'] -class CepkoResultTests(MockerTestCase): +# 2015-01-22 BAW: This test is completely useless because it only ever tests +# the CepkoMock object. Even in its original form, I don't think it ever +# touched the underlying Cepko class methods. +@skip('This test is completely useless') +class CepkoResultTests(unittest.TestCase): def setUp(self): - self.mocked = self.mocker.replace("cloudinit.cs_utils.Cepko", - spec=CepkoMock, - count=False, - passthrough=False) - self.mocked() - self.mocker.result(CepkoMock()) - self.mocker.replay() - self.c = Cepko() + pass + # self.mocked = self.mocker.replace("cloudinit.cs_utils.Cepko", + # spec=CepkoMock, + # count=False, + # passthrough=False) + # self.mocked() + # self.mocker.result(CepkoMock()) + # self.mocker.replay() + # self.c = Cepko() def test_getitem(self): result = self.c.all() diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py index fd6bd8a1..9c1ec1d4 100644 --- a/tests/unittests/test_data.py +++ b/tests/unittests/test_data.py @@ -1,11 +1,19 @@ """Tests for handling of userdata within cloud init.""" -import StringIO - import gzip import logging import os +import shutil +import tempfile + +try: + from unittest import mock +except ImportError: + import mock + +from six import BytesIO, StringIO +from email import encoders from email.mime.application import MIMEApplication from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart @@ -16,13 +24,15 @@ from cloudinit import log from cloudinit.settings import (PER_INSTANCE) from cloudinit import sources from cloudinit import stages +from cloudinit import user_data as ud from cloudinit import util -INSTANCE_ID = "i-testing" - from . import helpers +INSTANCE_ID = "i-testing" + + class FakeDataSource(sources.DataSource): def __init__(self, userdata=None, vendordata=None): @@ -32,28 +42,45 @@ class FakeDataSource(sources.DataSource): self.vendordata_raw = vendordata +def count_messages(root): + am = 0 + for m in root.walk(): + if ud.is_skippable(m): + continue + am += 1 + return am + + +def gzip_text(text): + contents = BytesIO() + f = gzip.GzipFile(fileobj=contents, mode='wb') + f.write(util.encode_text(text)) + f.flush() + f.close() + return contents.getvalue() + + # FIXME: these tests shouldn't be checking log output?? # Weirddddd... class TestConsumeUserData(helpers.FilesystemMockingTestCase): def setUp(self): - helpers.FilesystemMockingTestCase.setUp(self) + super(TestConsumeUserData, self).setUp() self._log = None self._log_file = None self._log_handler = None def tearDown(self): - helpers.FilesystemMockingTestCase.tearDown(self) if self._log_handler and self._log: self._log.removeHandler(self._log_handler) + helpers.FilesystemMockingTestCase.tearDown(self) def _patchIn(self, root): - self.restore() self.patchOS(root) self.patchUtils(root) def capture_log(self, lvl=logging.DEBUG): - log_file = StringIO.StringIO() + log_file = StringIO() self._log_handler = logging.StreamHandler(log_file) self._log_handler.setLevel(lvl) self._log = log.getLogger() @@ -71,7 +98,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): ci = stages.Init() ci.datasource = FakeDataSource(blob) - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self.patchUtils(new_root) self.patchOS(new_root) ci.fetch() @@ -99,7 +127,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): { "op": "add", "path": "/foo", "value": "quxC" } ] ''' - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self._patchIn(new_root) initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -138,7 +167,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase): { "op": "add", "path": "/foo", "value": "quxC" } ] ''' - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self._patchIn(new_root) initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -184,7 +214,8 @@ c: d ci = stages.Init() ci.datasource = FakeDataSource(str(message)) - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self.patchUtils(new_root) self.patchOS(new_root) ci.fetch() @@ -214,7 +245,8 @@ name: user run: - z ''' - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self._patchIn(new_root) initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -249,7 +281,8 @@ vendor_data: enabled: True prefix: /bin/true ''' - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self._patchIn(new_root) initer = stages.Init() initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -309,7 +342,8 @@ p: 1 paths = c_helpers.Paths({}, ds=FakeDataSource('')) cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths) - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self.patchUtils(new_root) self.patchOS(new_root) cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None, @@ -335,28 +369,22 @@ p: 1 data = "arbitrary text\n" ci.datasource = FakeDataSource(data) - mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) - mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) - self.mocker.replay() + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertIn( + "Unhandled non-multipart (text/x-not-multipart) userdata:", + log_file.getvalue()) - log_file = self.capture_log(logging.WARNING) - ci.fetch() - ci.consume_data() - self.assertIn( - "Unhandled non-multipart (text/x-not-multipart) userdata:", - log_file.getvalue()) + mockobj.assert_called_once_with( + ci.paths.get_ipath("cloud_config"), "", 0o600) def test_mime_gzip_compressed(self): """Tests that individual message gzip encoding works.""" def gzip_part(text): - contents = StringIO.StringIO() - f = gzip.GzipFile(fileobj=contents, mode='w') - f.write(str(text)) - f.flush() - f.close() - return MIMEApplication(contents.getvalue(), 'gzip') + return MIMEApplication(gzip_text(text), 'gzip') base_content1 = ''' #cloud-config @@ -374,7 +402,8 @@ c: 4 message.attach(gzip_part(base_content2)) ci = stages.Init() ci.datasource = FakeDataSource(str(message)) - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self.patchUtils(new_root) self.patchOS(new_root) ci.fetch() @@ -392,19 +421,17 @@ c: 4 ci = stages.Init() message = MIMEBase("text", "plain") message.set_payload("Just text") - ci.datasource = FakeDataSource(message.as_string()) - - mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) - mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) - self.mocker.replay() - - log_file = self.capture_log(logging.WARNING) - ci.fetch() - ci.consume_data() - self.assertIn( - "Unhandled unknown content-type (text/plain)", - log_file.getvalue()) + ci.datasource = FakeDataSource(message.as_string().encode()) + + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertIn( + "Unhandled unknown content-type (text/plain)", + log_file.getvalue()) + mockobj.assert_called_once_with( + ci.paths.get_ipath("cloud_config"), "", 0o600) def test_shellscript(self): """Raw text starting #!/bin/sh is treated as script.""" @@ -413,16 +440,17 @@ c: 4 ci.datasource = FakeDataSource(script) outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) - mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) - mock_write(outpath, script, 0700) - self.mocker.replay() - log_file = self.capture_log(logging.WARNING) - ci.fetch() - ci.consume_data() - self.assertEqual("", log_file.getvalue()) + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertEqual("", log_file.getvalue()) + + mockobj.assert_has_calls([ + mock.call(outpath, script, 0o700), + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), + ]) def test_mime_text_x_shellscript(self): """Mime message of type text/x-shellscript is treated as script.""" @@ -433,16 +461,17 @@ c: 4 ci.datasource = FakeDataSource(message.as_string()) outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) - mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) - mock_write(outpath, script, 0700) - self.mocker.replay() - log_file = self.capture_log(logging.WARNING) - ci.fetch() - ci.consume_data() - self.assertEqual("", log_file.getvalue()) + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertEqual("", log_file.getvalue()) + + mockobj.assert_has_calls([ + mock.call(outpath, script, 0o700), + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), + ]) def test_mime_text_plain_shell(self): """Mime type text/plain starting #!/bin/sh is treated as script.""" @@ -453,13 +482,81 @@ c: 4 ci.datasource = FakeDataSource(message.as_string()) outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") - mock_write = self.mocker.replace("cloudinit.util.write_file", - passthrough=False) - mock_write(outpath, script, 0700) - mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) - self.mocker.replay() - log_file = self.capture_log(logging.WARNING) - ci.fetch() - ci.consume_data() - self.assertEqual("", log_file.getvalue()) + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertEqual("", log_file.getvalue()) + + mockobj.assert_has_calls([ + mock.call(outpath, script, 0o700), + mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), + ]) + + def test_mime_application_octet_stream(self): + """Mime type application/octet-stream is ignored but shows warning.""" + ci = stages.Init() + message = MIMEBase("application", "octet-stream") + message.set_payload(b'\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc') + encoders.encode_base64(message) + ci.datasource = FakeDataSource(message.as_string().encode()) + + with mock.patch('cloudinit.util.write_file') as mockobj: + log_file = self.capture_log(logging.WARNING) + ci.fetch() + ci.consume_data() + self.assertIn( + "Unhandled unknown content-type (application/octet-stream)", + log_file.getvalue()) + mockobj.assert_called_once_with( + ci.paths.get_ipath("cloud_config"), "", 0o600) + + def test_cloud_config_archive(self): + non_decodable = b'\x11\xc9\xb4gTH\xee\x12' + data = [{'content': '#cloud-config\npassword: gocubs\n'}, + {'content': '#cloud-config\nlocale: chicago\n'}, + {'content': non_decodable}] + message = b'#cloud-config-archive\n' + util.yaml_dumps(data).encode() + + ci = stages.Init() + ci.datasource = FakeDataSource(message) + + fs = {} + + def fsstore(filename, content, mode=0o0644, omode="wb"): + fs[filename] = content + + # consuming the user-data provided should write 'cloud_config' file + # which will have our yaml in it. + with mock.patch('cloudinit.util.write_file') as mockobj: + mockobj.side_effect = fsstore + ci.fetch() + ci.consume_data() + + cfg = util.load_yaml(fs[ci.paths.get_ipath("cloud_config")]) + self.assertEqual(cfg.get('password'), 'gocubs') + self.assertEqual(cfg.get('locale'), 'chicago') + + +class TestUDProcess(helpers.ResourceUsingTestCase): + + def test_bytes_in_userdata(self): + msg = b'#cloud-config\napt_update: True\n' + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(msg) + self.assertTrue(count_messages(message) == 1) + + def test_string_in_userdata(self): + msg = '#cloud-config\napt_update: True\n' + + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(msg) + self.assertTrue(count_messages(message) == 1) + + def test_compressed_in_userdata(self): + msg = gzip_text('#cloud-config\napt_update: True\n') + + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(msg) + self.assertTrue(count_messages(message) == 1) diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py index eaaa90e6..85759c68 100644 --- a/tests/unittests/test_datasource/test_altcloud.py +++ b/tests/unittests/test_datasource/test_altcloud.py @@ -26,6 +26,7 @@ import shutil import tempfile from cloudinit import helpers +from cloudinit import util from unittest import TestCase # Get the cloudinit.sources.DataSourceAltCloud import items needed. @@ -45,7 +46,7 @@ def _write_cloud_info_file(value): cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w') cifile.write(value) cifile.close() - os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664) + os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0o664) def _remove_cloud_info_file(): @@ -66,12 +67,12 @@ def _write_user_data_files(mount_dir, value): udfile = open(deltacloud_user_data_file, 'w') udfile.write(value) udfile.close() - os.chmod(deltacloud_user_data_file, 0664) + os.chmod(deltacloud_user_data_file, 0o664) udfile = open(user_data_file, 'w') udfile.write(value) udfile.close() - os.chmod(user_data_file, 0664) + os.chmod(user_data_file, 0o664) def _remove_user_data_files(mount_dir, @@ -98,6 +99,16 @@ def _remove_user_data_files(mount_dir, pass +def _dmi_data(expected): + ''' + Spoof the data received over DMI + ''' + def _data(key): + return expected + + return _data + + class TestGetCloudType(TestCase): ''' Test to exercise method: DataSourceAltCloud.get_cloud_type() @@ -106,69 +117,42 @@ class TestGetCloudType(TestCase): def setUp(self): '''Set up.''' self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + self.dmi_data = util.read_dmi_data # We have a different code path for arm to deal with LP1243287 # We have to switch arch to x86_64 to avoid test failure force_arch('x86_64') def tearDown(self): # Reset - cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ - ['dmidecode', '--string', 'system-product-name'] - # Return back to original arch + util.read_dmi_data = self.dmi_data force_arch() def test_rhev(self): ''' Test method get_cloud_type() for RHEVm systems. - Forcing dmidecode return to match a RHEVm system: RHEV Hypervisor + Forcing read_dmi_data return to match a RHEVm system: RHEV Hypervisor ''' - cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ - ['echo', 'RHEV Hypervisor'] + util.read_dmi_data = _dmi_data('RHEV') dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals('RHEV', \ - dsrc.get_cloud_type()) + self.assertEquals('RHEV', dsrc.get_cloud_type()) def test_vsphere(self): ''' Test method get_cloud_type() for vSphere systems. - Forcing dmidecode return to match a vSphere system: RHEV Hypervisor + Forcing read_dmi_data return to match a vSphere system: RHEV Hypervisor ''' - cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ - ['echo', 'VMware Virtual Platform'] + util.read_dmi_data = _dmi_data('VMware Virtual Platform') dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals('VSPHERE', \ - dsrc.get_cloud_type()) + self.assertEquals('VSPHERE', dsrc.get_cloud_type()) def test_unknown(self): ''' Test method get_cloud_type() for unknown systems. - Forcing dmidecode return to match an unrecognized return. - ''' - cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ - ['echo', 'Unrecognized Platform'] - dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals('UNKNOWN', \ - dsrc.get_cloud_type()) - - def test_exception1(self): - ''' - Test method get_cloud_type() where command dmidecode fails. - ''' - cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ - ['ls', 'bad command'] - dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals('UNKNOWN', \ - dsrc.get_cloud_type()) - - def test_exception2(self): - ''' - Test method get_cloud_type() where command dmidecode is not available. + Forcing read_dmi_data return to match an unrecognized return. ''' - cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ - ['bad command'] + util.read_dmi_data = _dmi_data('Unrecognized Platform') dsrc = DataSourceAltCloud({}, None, self.paths) - self.assertEquals('UNKNOWN', \ - dsrc.get_cloud_type()) + self.assertEquals('UNKNOWN', dsrc.get_cloud_type()) class TestGetDataCloudInfoFile(TestCase): @@ -180,6 +164,7 @@ class TestGetDataCloudInfoFile(TestCase): '''Set up.''' self.paths = helpers.Paths({'cloud_dir': '/tmp'}) self.cloud_info_file = tempfile.mkstemp()[1] + self.dmi_data = util.read_dmi_data cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ self.cloud_info_file @@ -192,6 +177,7 @@ class TestGetDataCloudInfoFile(TestCase): except OSError: pass + util.read_dmi_data = self.dmi_data cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ '/etc/sysconfig/cloud-info' @@ -243,6 +229,7 @@ class TestGetDataNoCloudInfoFile(TestCase): def setUp(self): '''Set up.''' self.paths = helpers.Paths({'cloud_dir': '/tmp'}) + self.dmi_data = util.read_dmi_data cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ 'no such file' # We have a different code path for arm to deal with LP1243287 @@ -253,16 +240,14 @@ class TestGetDataNoCloudInfoFile(TestCase): # Reset cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ '/etc/sysconfig/cloud-info' - cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ - ['dmidecode', '--string', 'system-product-name'] + util.read_dmi_data = self.dmi_data # Return back to original arch force_arch() def test_rhev_no_cloud_file(self): '''Test No cloud info file module get_data() forcing RHEV.''' - cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ - ['echo', 'RHEV Hypervisor'] + util.read_dmi_data = _dmi_data('RHEV Hypervisor') dsrc = DataSourceAltCloud({}, None, self.paths) dsrc.user_data_rhevm = lambda: True self.assertEquals(True, dsrc.get_data()) @@ -270,8 +255,7 @@ class TestGetDataNoCloudInfoFile(TestCase): def test_vsphere_no_cloud_file(self): '''Test No cloud info file module get_data() forcing VSPHERE.''' - cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ - ['echo', 'VMware Virtual Platform'] + util.read_dmi_data = _dmi_data('VMware Virtual Platform') dsrc = DataSourceAltCloud({}, None, self.paths) dsrc.user_data_vsphere = lambda: True self.assertEquals(True, dsrc.get_data()) @@ -279,8 +263,7 @@ class TestGetDataNoCloudInfoFile(TestCase): def test_failure_no_cloud_file(self): '''Test No cloud info file module get_data() forcing unrecognized.''' - cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \ - ['echo', 'Unrecognized Platform'] + util.read_dmi_data = _dmi_data('Unrecognized Platform') dsrc = DataSourceAltCloud({}, None, self.paths) self.assertEquals(False, dsrc.get_data()) @@ -426,27 +409,27 @@ class TestReadUserDataCallback(TestCase): '''Test read_user_data_callback() with both files.''' self.assertEquals('test user data', - read_user_data_callback(self.mount_dir)) + read_user_data_callback(self.mount_dir)) def test_callback_dc(self): '''Test read_user_data_callback() with only DC file.''' _remove_user_data_files(self.mount_dir, - dc_file=False, - non_dc_file=True) + dc_file=False, + non_dc_file=True) self.assertEquals('test user data', - read_user_data_callback(self.mount_dir)) + read_user_data_callback(self.mount_dir)) def test_callback_non_dc(self): '''Test read_user_data_callback() with only non-DC file.''' _remove_user_data_files(self.mount_dir, - dc_file=True, - non_dc_file=False) + dc_file=True, + non_dc_file=False) self.assertEquals('test user data', - read_user_data_callback(self.mount_dir)) + read_user_data_callback(self.mount_dir)) def test_callback_none(self): '''Test read_user_data_callback() no files are found.''' diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py index e992a006..444e2799 100644 --- a/tests/unittests/test_datasource/test_azure.py +++ b/tests/unittests/test_datasource/test_azure.py @@ -1,14 +1,24 @@ from cloudinit import helpers -from cloudinit.util import load_file +from cloudinit.util import b64e, decode_binary, load_file from cloudinit.sources import DataSourceAzure -from ..helpers import populate_dir +from ..helpers import TestCase, populate_dir + +try: + from unittest import mock +except ImportError: + import mock +try: + from contextlib import ExitStack +except ImportError: + from contextlib2 import ExitStack -import base64 import crypt -from mocker import MockerTestCase import os import stat import yaml +import shutil +import tempfile +import xml.etree.ElementTree as ET def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None): @@ -40,14 +50,17 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None): content += "<%s%s>%s</%s>\n" % (key, attrs, val, key) if userdata: - content += "<UserData>%s</UserData>\n" % (base64.b64encode(userdata)) + content += "<UserData>%s</UserData>\n" % (b64e(userdata)) if pubkeys: content += "<SSH><PublicKeys>\n" - for fp, path in pubkeys: + for fp, path, value in pubkeys: content += " <PublicKey>" - content += ("<Fingerprint>%s</Fingerprint><Path>%s</Path>" % - (fp, path)) + if fp and path: + content += ("<Fingerprint>%s</Fingerprint><Path>%s</Path>" % + (fp, path)) + if value: + content += "<Value>%s</Value>" % value content += "</PublicKey>\n" content += "</PublicKeys></SSH>" content += """ @@ -66,26 +79,25 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None): return content -class TestAzureDataSource(MockerTestCase): +class TestAzureDataSource(TestCase): def setUp(self): - # makeDir comes from MockerTestCase - self.tmp = self.makeDir() + super(TestAzureDataSource, self).setUp() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) # patch cloud_dir, so our 'seed_dir' is guaranteed empty self.paths = helpers.Paths({'cloud_dir': self.tmp}) self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent') - self.unapply = [] - super(TestAzureDataSource, self).setUp() + self.patches = ExitStack() + self.addCleanup(self.patches.close) - def tearDown(self): - apply_patches([i for i in reversed(self.unapply)]) - super(TestAzureDataSource, self).tearDown() + super(TestAzureDataSource, self).setUp() def apply_patches(self, patches): - ret = apply_patches(patches) - self.unapply += ret + for module, name, new in patches: + self.patches.enter_context(mock.patch.object(module, name, new)) def _get_ds(self, data): @@ -103,13 +115,6 @@ class TestAzureDataSource(MockerTestCase): data['pubkey_files'] = flist return ["pubkey_from: %s" % f for f in flist] - def _iid_from_shared_config(path): - data['iid_from_shared_cfg'] = path - return 'i-my-azure-id' - - def _apply_hostname_bounce(**kwargs): - data['apply_hostname_bounce'] = kwargs - if data.get('ovfcontent') is not None: populate_dir(os.path.join(self.paths.seed_dir, "azure"), {'ovf-env.xml': data['ovfcontent']}) @@ -117,22 +122,63 @@ class TestAzureDataSource(MockerTestCase): mod = DataSourceAzure mod.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d - self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)]) - - self.apply_patches([(mod, 'invoke_agent', _invoke_agent), - (mod, 'wait_for_files', _wait_for_files), - (mod, 'pubkeys_from_crt_files', - _pubkeys_from_crt_files), - (mod, 'iid_from_shared_config', - _iid_from_shared_config), - (mod, 'apply_hostname_bounce', - _apply_hostname_bounce), ]) + self.get_metadata_from_fabric = mock.MagicMock(return_value={ + 'public-keys': [], + }) + + self.instance_id = 'test-instance-id' + + self.apply_patches([ + (mod, 'list_possible_azure_ds_devs', dsdevs), + (mod, 'invoke_agent', _invoke_agent), + (mod, 'wait_for_files', _wait_for_files), + (mod, 'pubkeys_from_crt_files', _pubkeys_from_crt_files), + (mod, 'perform_hostname_bounce', mock.MagicMock()), + (mod, 'get_hostname', mock.MagicMock()), + (mod, 'set_hostname', mock.MagicMock()), + (mod, 'get_metadata_from_fabric', self.get_metadata_from_fabric), + (mod.util, 'read_dmi_data', mock.MagicMock( + return_value=self.instance_id)), + ]) dsrc = mod.DataSourceAzureNet( data.get('sys_cfg', {}), distro=None, paths=self.paths) return dsrc + def xml_equals(self, oxml, nxml): + """Compare two sets of XML to make sure they are equal""" + + def create_tag_index(xml): + et = ET.fromstring(xml) + ret = {} + for x in et.iter(): + ret[x.tag] = x + return ret + + def tags_exists(x, y): + for tag in x.keys(): + self.assertIn(tag, y) + for tag in y.keys(): + self.assertIn(tag, x) + + def tags_equal(x, y): + for x_tag, x_val in x.items(): + y_val = y.get(x_val.tag) + self.assertEquals(x_val.text, y_val.text) + + old_cnt = create_tag_index(oxml) + new_cnt = create_tag_index(nxml) + tags_exists(old_cnt, new_cnt) + tags_equal(old_cnt, new_cnt) + + def xml_notequals(self, oxml, nxml): + try: + self.xml_equals(oxml, nxml) + except AssertionError: + return + raise AssertionError("XML is the same") + def test_basic_seed_dir(self): odata = {'HostName': "myhost", 'UserName': "myuser"} data = {'ovfcontent': construct_valid_ovf_env(data=odata), @@ -145,7 +191,6 @@ class TestAzureDataSource(MockerTestCase): self.assertEqual(dsrc.metadata['local-hostname'], odata['HostName']) self.assertTrue(os.path.isfile( os.path.join(self.waagent_d, 'ovf-env.xml'))) - self.assertEqual(dsrc.metadata['instance-id'], 'i-my-azure-id') def test_waagent_d_has_0700_perms(self): # we expect /var/lib/waagent to be created 0700 @@ -153,7 +198,7 @@ class TestAzureDataSource(MockerTestCase): ret = dsrc.get_data() self.assertTrue(ret) self.assertTrue(os.path.isdir(self.waagent_d)) - self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0700) + self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0o700) def test_user_cfg_set_agent_command_plain(self): # set dscfg in via plaintext @@ -162,7 +207,7 @@ class TestAzureDataSource(MockerTestCase): yaml_cfg = "{agent_command: my_command}\n" cfg = yaml.safe_load(yaml_cfg) odata = {'HostName': "myhost", 'UserName': "myuser", - 'dscfg': {'text': yaml_cfg, 'encoding': 'plain'}} + 'dscfg': {'text': yaml_cfg, 'encoding': 'plain'}} data = {'ovfcontent': construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) @@ -174,8 +219,8 @@ class TestAzureDataSource(MockerTestCase): # set dscfg in via base64 encoded yaml cfg = {'agent_command': "my_command"} odata = {'HostName': "myhost", 'UserName': "myuser", - 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)), - 'encoding': 'base64'}} + 'dscfg': {'text': b64e(yaml.dump(cfg)), + 'encoding': 'base64'}} data = {'ovfcontent': construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) @@ -222,17 +267,28 @@ class TestAzureDataSource(MockerTestCase): # should equal that after the '$' pos = defuser['passwd'].rfind("$") + 1 self.assertEqual(defuser['passwd'], - crypt.crypt(odata['UserPassword'], defuser['passwd'][0:pos])) + crypt.crypt(odata['UserPassword'], + defuser['passwd'][0:pos])) + + def test_userdata_plain(self): + mydata = "FOOBAR" + odata = {'UserData': {'text': mydata, 'encoding': 'plain'}} + data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(decode_binary(dsrc.userdata_raw), mydata) def test_userdata_found(self): mydata = "FOOBAR" - odata = {'UserData': base64.b64encode(mydata)} + odata = {'UserData': {'text': b64e(mydata), 'encoding': 'base64'}} data = {'ovfcontent': construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(dsrc.userdata_raw, mydata) + self.assertEqual(dsrc.userdata_raw, mydata.encode('utf-8')) def test_no_datasource_expected(self): # no source should be found if no seed_dir and no devs @@ -242,10 +298,10 @@ class TestAzureDataSource(MockerTestCase): self.assertFalse(ret) self.assertFalse('agent_invoked' in data) - def test_cfg_has_pubkeys(self): + def test_cfg_has_pubkeys_fingerprint(self): odata = {'HostName': "myhost", 'UserName': "myuser"} - mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}] - pubkeys = [(x['fingerprint'], x['path']) for x in mypklist] + mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}] + pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist] data = {'ovfcontent': construct_valid_ovf_env(data=odata, pubkeys=pubkeys)} @@ -254,47 +310,38 @@ class TestAzureDataSource(MockerTestCase): self.assertTrue(ret) for mypk in mypklist: self.assertIn(mypk, dsrc.cfg['_pubkeys']) + self.assertIn('pubkey_from', dsrc.metadata['public-keys'][-1]) - def test_disabled_bounce(self): - pass - - def test_apply_bounce_call_1(self): - # hostname needs to get through to apply_hostname_bounce - odata = {'HostName': 'my-random-hostname'} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + def test_cfg_has_pubkeys_value(self): + # make sure that provided key is used over fingerprint + odata = {'HostName': "myhost", 'UserName': "myuser"} + mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': 'value1'}] + pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist] + data = {'ovfcontent': construct_valid_ovf_env(data=odata, + pubkeys=pubkeys)} - self._get_ds(data).get_data() - self.assertIn('hostname', data['apply_hostname_bounce']) - self.assertEqual(data['apply_hostname_bounce']['hostname'], - odata['HostName']) - - def test_apply_bounce_call_configurable(self): - # hostname_bounce should be configurable in datasource cfg - cfg = {'hostname_bounce': {'interface': 'eth1', 'policy': 'off', - 'command': 'my-bounce-command', - 'hostname_command': 'my-hostname-command'}} - odata = {'HostName': "xhost", - 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)), - 'encoding': 'base64'}} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} - self._get_ds(data).get_data() + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) - for k in cfg['hostname_bounce']: - self.assertIn(k, data['apply_hostname_bounce']) + for mypk in mypklist: + self.assertIn(mypk, dsrc.cfg['_pubkeys']) + self.assertIn(mypk['value'], dsrc.metadata['public-keys']) - for k, v in cfg['hostname_bounce'].items(): - self.assertEqual(data['apply_hostname_bounce'][k], v) + def test_cfg_has_no_fingerprint_has_value(self): + # test value is used when fingerprint not provided + odata = {'HostName': "myhost", 'UserName': "myuser"} + mypklist = [{'fingerprint': None, 'path': 'path1', 'value': 'value1'}] + pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist] + data = {'ovfcontent': construct_valid_ovf_env(data=odata, + pubkeys=pubkeys)} - def test_set_hostname_disabled(self): - # config specifying set_hostname off should not bounce - cfg = {'set_hostname': False} - odata = {'HostName': "xhost", - 'dscfg': {'text': base64.b64encode(yaml.dump(cfg)), - 'encoding': 'base64'}} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} - self._get_ds(data).get_data() + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) - self.assertEqual(data.get('apply_hostname_bounce', "N/A"), "N/A") + for mypk in mypklist: + self.assertIn(mypk['value'], dsrc.metadata['public-keys']) def test_default_ephemeral(self): # make sure the ephemeral device works @@ -318,8 +365,8 @@ class TestAzureDataSource(MockerTestCase): # Make sure that user can affect disk aliases dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}} odata = {'HostName': "myhost", 'UserName': "myuser", - 'dscfg': {'text': base64.b64encode(yaml.dump(dscfg)), - 'encoding': 'base64'}} + 'dscfg': {'text': b64e(yaml.dump(dscfg)), + 'encoding': 'base64'}} usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'}, 'ephemeral0': False}} userdata = '#cloud-config' + yaml.dump(usercfg) + "\n" @@ -340,7 +387,32 @@ class TestAzureDataSource(MockerTestCase): dsrc = self._get_ds(data) dsrc.get_data() - self.assertEqual(userdata, dsrc.userdata_raw) + self.assertEqual(userdata.encode('us-ascii'), dsrc.userdata_raw) + + def test_password_redacted_in_ovf(self): + odata = {'HostName': "myhost", 'UserName': "myuser", + 'UserPassword': "mypass"} + data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + dsrc = self._get_ds(data) + ret = dsrc.get_data() + + self.assertTrue(ret) + ovf_env_path = os.path.join(self.waagent_d, 'ovf-env.xml') + + # The XML should not be same since the user password is redacted + on_disk_ovf = load_file(ovf_env_path) + self.xml_notequals(data['ovfcontent'], on_disk_ovf) + + # Make sure that the redacted password on disk is not used by CI + self.assertNotEquals(dsrc.cfg.get('password'), + DataSourceAzure.DEF_PASSWD_REDACTION) + + # Make sure that the password was really encrypted + et = ET.fromstring(on_disk_ovf) + for elem in et.iter(): + if 'UserPassword' in elem.tag: + self.assertEquals(DataSourceAzure.DEF_PASSWD_REDACTION, + elem.text) def test_ovf_env_arrives_in_waagent_dir(self): xml = construct_valid_ovf_env(data={}, userdata="FOODATA") @@ -351,92 +423,224 @@ class TestAzureDataSource(MockerTestCase): # we expect that the ovf-env.xml file is copied there. ovf_env_path = os.path.join(self.waagent_d, 'ovf-env.xml') self.assertTrue(os.path.exists(ovf_env_path)) - self.assertEqual(xml, load_file(ovf_env_path)) - - def test_existing_ovf_same(self): - # waagent/SharedConfig left alone if found ovf-env.xml same as cached - odata = {'UserData': base64.b64encode("SOMEUSERDATA")} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + self.xml_equals(xml, load_file(ovf_env_path)) - populate_dir(self.waagent_d, - {'ovf-env.xml': data['ovfcontent'], - 'otherfile': 'otherfile-content', - 'SharedConfig.xml': 'mysharedconfig'}) + def test_ovf_can_include_unicode(self): + xml = construct_valid_ovf_env(data={}) + xml = u'\ufeff{0}'.format(xml) + dsrc = self._get_ds({'ovfcontent': xml}) + dsrc.get_data() - dsrc = self._get_ds(data) - ret = dsrc.get_data() - self.assertTrue(ret) - self.assertTrue(os.path.exists( - os.path.join(self.waagent_d, 'ovf-env.xml'))) - self.assertTrue(os.path.exists( - os.path.join(self.waagent_d, 'otherfile'))) - self.assertTrue(os.path.exists( - os.path.join(self.waagent_d, 'SharedConfig.xml'))) - - def test_existing_ovf_diff(self): - # waagent/SharedConfig must be removed if ovfenv is found elsewhere - - # 'get_data' should remove SharedConfig.xml in /var/lib/waagent - # if ovf-env.xml differs. - cached_ovfenv = construct_valid_ovf_env( - {'userdata': base64.b64encode("FOO_USERDATA")}) - new_ovfenv = construct_valid_ovf_env( - {'userdata': base64.b64encode("NEW_USERDATA")}) - - populate_dir(self.waagent_d, - {'ovf-env.xml': cached_ovfenv, - 'SharedConfig.xml': "mysharedconfigxml", - 'otherfile': 'otherfilecontent'}) - - dsrc = self._get_ds({'ovfcontent': new_ovfenv}) - ret = dsrc.get_data() + def test_exception_fetching_fabric_data_doesnt_propagate(self): + ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + ds.ds_cfg['agent_command'] = '__builtin__' + self.get_metadata_from_fabric.side_effect = Exception + self.assertFalse(ds.get_data()) + + def test_fabric_data_included_in_metadata(self): + ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + ds.ds_cfg['agent_command'] = '__builtin__' + self.get_metadata_from_fabric.return_value = {'test': 'value'} + ret = ds.get_data() self.assertTrue(ret) - self.assertEqual(dsrc.userdata_raw, "NEW_USERDATA") - self.assertTrue(os.path.exists( - os.path.join(self.waagent_d, 'otherfile'))) - self.assertFalse( - os.path.exists(os.path.join(self.waagent_d, 'SharedConfig.xml'))) - self.assertTrue( - os.path.exists(os.path.join(self.waagent_d, 'ovf-env.xml'))) - self.assertEqual(new_ovfenv, - load_file(os.path.join(self.waagent_d, 'ovf-env.xml'))) - - -class TestReadAzureOvf(MockerTestCase): + self.assertEqual('value', ds.metadata['test']) + + def test_instance_id_from_dmidecode_used(self): + ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + ds.get_data() + self.assertEqual(self.instance_id, ds.metadata['instance-id']) + + def test_instance_id_from_dmidecode_used_for_builtin(self): + ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + ds.ds_cfg['agent_command'] = '__builtin__' + ds.get_data() + self.assertEqual(self.instance_id, ds.metadata['instance-id']) + + +class TestAzureBounce(TestCase): + + def mock_out_azure_moving_parts(self): + self.patches.enter_context( + mock.patch.object(DataSourceAzure, 'invoke_agent')) + self.patches.enter_context( + mock.patch.object(DataSourceAzure, 'wait_for_files')) + self.patches.enter_context( + mock.patch.object(DataSourceAzure, 'list_possible_azure_ds_devs', + mock.MagicMock(return_value=[]))) + self.patches.enter_context( + mock.patch.object(DataSourceAzure, + 'find_fabric_formatted_ephemeral_disk', + mock.MagicMock(return_value=None))) + self.patches.enter_context( + mock.patch.object(DataSourceAzure, + 'find_fabric_formatted_ephemeral_part', + mock.MagicMock(return_value=None))) + self.patches.enter_context( + mock.patch.object(DataSourceAzure, 'get_metadata_from_fabric', + mock.MagicMock(return_value={}))) + self.patches.enter_context( + mock.patch.object(DataSourceAzure.util, 'read_dmi_data', + mock.MagicMock(return_value='test-instance-id'))) + + def setUp(self): + super(TestAzureBounce, self).setUp() + self.tmp = tempfile.mkdtemp() + self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent') + self.paths = helpers.Paths({'cloud_dir': self.tmp}) + self.addCleanup(shutil.rmtree, self.tmp) + DataSourceAzure.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d + self.patches = ExitStack() + self.mock_out_azure_moving_parts() + self.get_hostname = self.patches.enter_context( + mock.patch.object(DataSourceAzure, 'get_hostname')) + self.set_hostname = self.patches.enter_context( + mock.patch.object(DataSourceAzure, 'set_hostname')) + self.subp = self.patches.enter_context( + mock.patch('cloudinit.sources.DataSourceAzure.util.subp')) + + def tearDown(self): + self.patches.close() + + def _get_ds(self, ovfcontent=None): + if ovfcontent is not None: + populate_dir(os.path.join(self.paths.seed_dir, "azure"), + {'ovf-env.xml': ovfcontent}) + return DataSourceAzure.DataSourceAzureNet( + {}, distro=None, paths=self.paths) + + def get_ovf_env_with_dscfg(self, hostname, cfg): + odata = { + 'HostName': hostname, + 'dscfg': { + 'text': b64e(yaml.dump(cfg)), + 'encoding': 'base64' + } + } + return construct_valid_ovf_env(data=odata) + + def test_disabled_bounce_does_not_change_hostname(self): + cfg = {'hostname_bounce': {'policy': 'off'}} + self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data() + self.assertEqual(0, self.set_hostname.call_count) + + @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce') + def test_disabled_bounce_does_not_perform_bounce( + self, perform_hostname_bounce): + cfg = {'hostname_bounce': {'policy': 'off'}} + self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data() + self.assertEqual(0, perform_hostname_bounce.call_count) + + def test_same_hostname_does_not_change_hostname(self): + host_name = 'unchanged-host-name' + self.get_hostname.return_value = host_name + cfg = {'hostname_bounce': {'policy': 'yes'}} + self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data() + self.assertEqual(0, self.set_hostname.call_count) + + @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce') + def test_unchanged_hostname_does_not_perform_bounce( + self, perform_hostname_bounce): + host_name = 'unchanged-host-name' + self.get_hostname.return_value = host_name + cfg = {'hostname_bounce': {'policy': 'yes'}} + self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data() + self.assertEqual(0, perform_hostname_bounce.call_count) + + @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce') + def test_force_performs_bounce_regardless(self, perform_hostname_bounce): + host_name = 'unchanged-host-name' + self.get_hostname.return_value = host_name + cfg = {'hostname_bounce': {'policy': 'force'}} + self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data() + self.assertEqual(1, perform_hostname_bounce.call_count) + + def test_different_hostnames_sets_hostname(self): + expected_hostname = 'azure-expected-host-name' + self.get_hostname.return_value = 'default-host-name' + self._get_ds( + self.get_ovf_env_with_dscfg(expected_hostname, {})).get_data() + self.assertEqual(expected_hostname, + self.set_hostname.call_args_list[0][0][0]) + + @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce') + def test_different_hostnames_performs_bounce( + self, perform_hostname_bounce): + expected_hostname = 'azure-expected-host-name' + self.get_hostname.return_value = 'default-host-name' + self._get_ds( + self.get_ovf_env_with_dscfg(expected_hostname, {})).get_data() + self.assertEqual(1, perform_hostname_bounce.call_count) + + def test_different_hostnames_sets_hostname_back(self): + initial_host_name = 'default-host-name' + self.get_hostname.return_value = initial_host_name + self._get_ds( + self.get_ovf_env_with_dscfg('some-host-name', {})).get_data() + self.assertEqual(initial_host_name, + self.set_hostname.call_args_list[-1][0][0]) + + @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce') + def test_failure_in_bounce_still_resets_host_name( + self, perform_hostname_bounce): + perform_hostname_bounce.side_effect = Exception + initial_host_name = 'default-host-name' + self.get_hostname.return_value = initial_host_name + self._get_ds( + self.get_ovf_env_with_dscfg('some-host-name', {})).get_data() + self.assertEqual(initial_host_name, + self.set_hostname.call_args_list[-1][0][0]) + + def test_environment_correct_for_bounce_command(self): + interface = 'int0' + hostname = 'my-new-host' + old_hostname = 'my-old-host' + self.get_hostname.return_value = old_hostname + cfg = {'hostname_bounce': {'interface': interface, 'policy': 'force'}} + data = self.get_ovf_env_with_dscfg(hostname, cfg) + self._get_ds(data).get_data() + self.assertEqual(1, self.subp.call_count) + bounce_env = self.subp.call_args[1]['env'] + self.assertEqual(interface, bounce_env['interface']) + self.assertEqual(hostname, bounce_env['hostname']) + self.assertEqual(old_hostname, bounce_env['old_hostname']) + + def test_default_bounce_command_used_by_default(self): + cmd = 'default-bounce-command' + DataSourceAzure.BUILTIN_DS_CONFIG['hostname_bounce']['command'] = cmd + cfg = {'hostname_bounce': {'policy': 'force'}} + data = self.get_ovf_env_with_dscfg('some-hostname', cfg) + self._get_ds(data).get_data() + self.assertEqual(1, self.subp.call_count) + bounce_args = self.subp.call_args[1]['args'] + self.assertEqual(cmd, bounce_args) + + @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce') + def test_set_hostname_option_can_disable_bounce( + self, perform_hostname_bounce): + cfg = {'set_hostname': False, 'hostname_bounce': {'policy': 'force'}} + data = self.get_ovf_env_with_dscfg('some-hostname', cfg) + self._get_ds(data).get_data() + + self.assertEqual(0, perform_hostname_bounce.call_count) + + def test_set_hostname_option_can_disable_hostname_set(self): + cfg = {'set_hostname': False, 'hostname_bounce': {'policy': 'force'}} + data = self.get_ovf_env_with_dscfg('some-hostname', cfg) + self._get_ds(data).get_data() + + self.assertEqual(0, self.set_hostname.call_count) + + +class TestReadAzureOvf(TestCase): def test_invalid_xml_raises_non_azure_ds(self): invalid_xml = "<foo>" + construct_valid_ovf_env(data={}) self.assertRaises(DataSourceAzure.BrokenAzureDataSource, - DataSourceAzure.read_azure_ovf, invalid_xml) + DataSourceAzure.read_azure_ovf, invalid_xml) def test_load_with_pubkeys(self): - mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}] - pubkeys = [(x['fingerprint'], x['path']) for x in mypklist] + mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}] + pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist] content = construct_valid_ovf_env(pubkeys=pubkeys) (_md, _ud, cfg) = DataSourceAzure.read_azure_ovf(content) for mypk in mypklist: self.assertIn(mypk, cfg['_pubkeys']) - - -class TestReadAzureSharedConfig(MockerTestCase): - def test_valid_content(self): - xml = """<?xml version="1.0" encoding="utf-8"?> - <SharedConfig> - <Deployment name="MY_INSTANCE_ID"> - <Service name="myservice"/> - <ServiceInstance name="INSTANCE_ID.0" guid="{abcd-uuid}" /> - </Deployment> - <Incarnation number="1"/> - </SharedConfig>""" - ret = DataSourceAzure.iid_from_shared_config_content(xml) - self.assertEqual("MY_INSTANCE_ID", ret) - - -def apply_patches(patches): - ret = [] - for (ref, name, replace) in patches: - if replace is None: - continue - orig = getattr(ref, name) - setattr(ref, name, replace) - ret.append((ref, name, orig)) - return ret diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py new file mode 100644 index 00000000..1134199b --- /dev/null +++ b/tests/unittests/test_datasource/test_azure_helper.py @@ -0,0 +1,420 @@ +import os + +from cloudinit.sources.helpers import azure as azure_helper +from ..helpers import TestCase + +try: + from unittest import mock +except ImportError: + import mock + +try: + from contextlib import ExitStack +except ImportError: + from contextlib2 import ExitStack + + +GOAL_STATE_TEMPLATE = """\ +<?xml version="1.0" encoding="utf-8"?> +<GoalState xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:noNamespaceSchemaLocation="goalstate10.xsd"> + <Version>2012-11-30</Version> + <Incarnation>{incarnation}</Incarnation> + <Machine> + <ExpectedState>Started</ExpectedState> + <StopRolesDeadlineHint>300000</StopRolesDeadlineHint> + <LBProbePorts> + <Port>16001</Port> + </LBProbePorts> + <ExpectHealthReport>FALSE</ExpectHealthReport> + </Machine> + <Container> + <ContainerId>{container_id}</ContainerId> + <RoleInstanceList> + <RoleInstance> + <InstanceId>{instance_id}</InstanceId> + <State>Started</State> + <Configuration> + <HostingEnvironmentConfig> + http://100.86.192.70:80/...hostingEnvironmentConfig... + </HostingEnvironmentConfig> + <SharedConfig>http://100.86.192.70:80/..SharedConfig..</SharedConfig> + <ExtensionsConfig> + http://100.86.192.70:80/...extensionsConfig... + </ExtensionsConfig> + <FullConfig>http://100.86.192.70:80/...fullConfig...</FullConfig> + <Certificates>{certificates_url}</Certificates> + <ConfigName>68ce47.0.68ce47.0.utl-trusty--292258.1.xml</ConfigName> + </Configuration> + </RoleInstance> + </RoleInstanceList> + </Container> +</GoalState> +""" + + +class TestFindEndpoint(TestCase): + + def setUp(self): + super(TestFindEndpoint, self).setUp() + patches = ExitStack() + self.addCleanup(patches.close) + + self.load_file = patches.enter_context( + mock.patch.object(azure_helper.util, 'load_file')) + + def test_missing_file(self): + self.load_file.side_effect = IOError + self.assertRaises(IOError, + azure_helper.WALinuxAgentShim.find_endpoint) + + def test_missing_special_azure_line(self): + self.load_file.return_value = '' + self.assertRaises(Exception, + azure_helper.WALinuxAgentShim.find_endpoint) + + @staticmethod + def _build_lease_content(encoded_address): + return '\n'.join([ + 'lease {', + ' interface "eth0";', + ' option unknown-245 {0};'.format(encoded_address), + '}']) + + def test_latest_lease_used(self): + encoded_addresses = ['5:4:3:2', '4:3:2:1'] + file_content = '\n'.join([self._build_lease_content(encoded_address) + for encoded_address in encoded_addresses]) + self.load_file.return_value = file_content + self.assertEqual(encoded_addresses[-1].replace(':', '.'), + azure_helper.WALinuxAgentShim.find_endpoint()) + + +class TestExtractIpAddressFromLeaseValue(TestCase): + + def test_hex_string(self): + ip_address, encoded_address = '98.76.54.32', '62:4c:36:20' + self.assertEqual( + ip_address, + azure_helper.WALinuxAgentShim.get_ip_from_lease_value( + encoded_address + )) + + def test_hex_string_with_single_character_part(self): + ip_address, encoded_address = '4.3.2.1', '4:3:2:1' + self.assertEqual( + ip_address, + azure_helper.WALinuxAgentShim.get_ip_from_lease_value( + encoded_address + )) + + def test_packed_string(self): + ip_address, encoded_address = '98.76.54.32', 'bL6 ' + self.assertEqual( + ip_address, + azure_helper.WALinuxAgentShim.get_ip_from_lease_value( + encoded_address + )) + + def test_packed_string_with_escaped_quote(self): + ip_address, encoded_address = '100.72.34.108', 'dH\\"l' + self.assertEqual( + ip_address, + azure_helper.WALinuxAgentShim.get_ip_from_lease_value( + encoded_address + )) + + def test_packed_string_containing_a_colon(self): + ip_address, encoded_address = '100.72.58.108', 'dH:l' + self.assertEqual( + ip_address, + azure_helper.WALinuxAgentShim.get_ip_from_lease_value( + encoded_address + )) + + +class TestGoalStateParsing(TestCase): + + default_parameters = { + 'incarnation': 1, + 'container_id': 'MyContainerId', + 'instance_id': 'MyInstanceId', + 'certificates_url': 'MyCertificatesUrl', + } + + def _get_goal_state(self, http_client=None, **kwargs): + if http_client is None: + http_client = mock.MagicMock() + parameters = self.default_parameters.copy() + parameters.update(kwargs) + xml = GOAL_STATE_TEMPLATE.format(**parameters) + if parameters['certificates_url'] is None: + new_xml_lines = [] + for line in xml.splitlines(): + if 'Certificates' in line: + continue + new_xml_lines.append(line) + xml = '\n'.join(new_xml_lines) + return azure_helper.GoalState(xml, http_client) + + def test_incarnation_parsed_correctly(self): + incarnation = '123' + goal_state = self._get_goal_state(incarnation=incarnation) + self.assertEqual(incarnation, goal_state.incarnation) + + def test_container_id_parsed_correctly(self): + container_id = 'TestContainerId' + goal_state = self._get_goal_state(container_id=container_id) + self.assertEqual(container_id, goal_state.container_id) + + def test_instance_id_parsed_correctly(self): + instance_id = 'TestInstanceId' + goal_state = self._get_goal_state(instance_id=instance_id) + self.assertEqual(instance_id, goal_state.instance_id) + + def test_certificates_xml_parsed_and_fetched_correctly(self): + http_client = mock.MagicMock() + certificates_url = 'TestCertificatesUrl' + goal_state = self._get_goal_state( + http_client=http_client, certificates_url=certificates_url) + certificates_xml = goal_state.certificates_xml + self.assertEqual(1, http_client.get.call_count) + self.assertEqual(certificates_url, http_client.get.call_args[0][0]) + self.assertTrue(http_client.get.call_args[1].get('secure', False)) + self.assertEqual(http_client.get.return_value.contents, + certificates_xml) + + def test_missing_certificates_skips_http_get(self): + http_client = mock.MagicMock() + goal_state = self._get_goal_state( + http_client=http_client, certificates_url=None) + certificates_xml = goal_state.certificates_xml + self.assertEqual(0, http_client.get.call_count) + self.assertIsNone(certificates_xml) + + +class TestAzureEndpointHttpClient(TestCase): + + regular_headers = { + 'x-ms-agent-name': 'WALinuxAgent', + 'x-ms-version': '2012-11-30', + } + + def setUp(self): + super(TestAzureEndpointHttpClient, self).setUp() + patches = ExitStack() + self.addCleanup(patches.close) + + self.read_file_or_url = patches.enter_context( + mock.patch.object(azure_helper.util, 'read_file_or_url')) + + def test_non_secure_get(self): + client = azure_helper.AzureEndpointHttpClient(mock.MagicMock()) + url = 'MyTestUrl' + response = client.get(url, secure=False) + self.assertEqual(1, self.read_file_or_url.call_count) + self.assertEqual(self.read_file_or_url.return_value, response) + self.assertEqual(mock.call(url, headers=self.regular_headers), + self.read_file_or_url.call_args) + + def test_secure_get(self): + url = 'MyTestUrl' + certificate = mock.MagicMock() + expected_headers = self.regular_headers.copy() + expected_headers.update({ + "x-ms-cipher-name": "DES_EDE3_CBC", + "x-ms-guest-agent-public-x509-cert": certificate, + }) + client = azure_helper.AzureEndpointHttpClient(certificate) + response = client.get(url, secure=True) + self.assertEqual(1, self.read_file_or_url.call_count) + self.assertEqual(self.read_file_or_url.return_value, response) + self.assertEqual(mock.call(url, headers=expected_headers), + self.read_file_or_url.call_args) + + def test_post(self): + data = mock.MagicMock() + url = 'MyTestUrl' + client = azure_helper.AzureEndpointHttpClient(mock.MagicMock()) + response = client.post(url, data=data) + self.assertEqual(1, self.read_file_or_url.call_count) + self.assertEqual(self.read_file_or_url.return_value, response) + self.assertEqual( + mock.call(url, data=data, headers=self.regular_headers), + self.read_file_or_url.call_args) + + def test_post_with_extra_headers(self): + url = 'MyTestUrl' + client = azure_helper.AzureEndpointHttpClient(mock.MagicMock()) + extra_headers = {'test': 'header'} + client.post(url, extra_headers=extra_headers) + self.assertEqual(1, self.read_file_or_url.call_count) + expected_headers = self.regular_headers.copy() + expected_headers.update(extra_headers) + self.assertEqual( + mock.call(mock.ANY, data=mock.ANY, headers=expected_headers), + self.read_file_or_url.call_args) + + +class TestOpenSSLManager(TestCase): + + def setUp(self): + super(TestOpenSSLManager, self).setUp() + patches = ExitStack() + self.addCleanup(patches.close) + + self.subp = patches.enter_context( + mock.patch.object(azure_helper.util, 'subp')) + try: + self.open = patches.enter_context( + mock.patch('__builtin__.open')) + except ImportError: + self.open = patches.enter_context( + mock.patch('builtins.open')) + + @mock.patch.object(azure_helper, 'cd', mock.MagicMock()) + @mock.patch.object(azure_helper.tempfile, 'mkdtemp') + def test_openssl_manager_creates_a_tmpdir(self, mkdtemp): + manager = azure_helper.OpenSSLManager() + self.assertEqual(mkdtemp.return_value, manager.tmpdir) + + def test_generate_certificate_uses_tmpdir(self): + subp_directory = {} + + def capture_directory(*args, **kwargs): + subp_directory['path'] = os.getcwd() + + self.subp.side_effect = capture_directory + manager = azure_helper.OpenSSLManager() + self.assertEqual(manager.tmpdir, subp_directory['path']) + + @mock.patch.object(azure_helper, 'cd', mock.MagicMock()) + @mock.patch.object(azure_helper.tempfile, 'mkdtemp', mock.MagicMock()) + @mock.patch.object(azure_helper.util, 'del_dir') + def test_clean_up(self, del_dir): + manager = azure_helper.OpenSSLManager() + manager.clean_up() + self.assertEqual([mock.call(manager.tmpdir)], del_dir.call_args_list) + + +class TestWALinuxAgentShim(TestCase): + + def setUp(self): + super(TestWALinuxAgentShim, self).setUp() + patches = ExitStack() + self.addCleanup(patches.close) + + self.AzureEndpointHttpClient = patches.enter_context( + mock.patch.object(azure_helper, 'AzureEndpointHttpClient')) + self.find_endpoint = patches.enter_context( + mock.patch.object( + azure_helper.WALinuxAgentShim, 'find_endpoint')) + self.GoalState = patches.enter_context( + mock.patch.object(azure_helper, 'GoalState')) + self.OpenSSLManager = patches.enter_context( + mock.patch.object(azure_helper, 'OpenSSLManager')) + patches.enter_context( + mock.patch.object(azure_helper.time, 'sleep', mock.MagicMock())) + + def test_http_client_uses_certificate(self): + shim = azure_helper.WALinuxAgentShim() + shim.register_with_azure_and_fetch_data() + self.assertEqual( + [mock.call(self.OpenSSLManager.return_value.certificate)], + self.AzureEndpointHttpClient.call_args_list) + + def test_correct_url_used_for_goalstate(self): + self.find_endpoint.return_value = 'test_endpoint' + shim = azure_helper.WALinuxAgentShim() + shim.register_with_azure_and_fetch_data() + get = self.AzureEndpointHttpClient.return_value.get + self.assertEqual( + [mock.call('http://test_endpoint/machine/?comp=goalstate')], + get.call_args_list) + self.assertEqual( + [mock.call(get.return_value.contents, + self.AzureEndpointHttpClient.return_value)], + self.GoalState.call_args_list) + + def test_certificates_used_to_determine_public_keys(self): + shim = azure_helper.WALinuxAgentShim() + data = shim.register_with_azure_and_fetch_data() + self.assertEqual( + [mock.call(self.GoalState.return_value.certificates_xml)], + self.OpenSSLManager.return_value.parse_certificates.call_args_list) + self.assertEqual( + self.OpenSSLManager.return_value.parse_certificates.return_value, + data['public-keys']) + + def test_absent_certificates_produces_empty_public_keys(self): + self.GoalState.return_value.certificates_xml = None + shim = azure_helper.WALinuxAgentShim() + data = shim.register_with_azure_and_fetch_data() + self.assertEqual([], data['public-keys']) + + def test_correct_url_used_for_report_ready(self): + self.find_endpoint.return_value = 'test_endpoint' + shim = azure_helper.WALinuxAgentShim() + shim.register_with_azure_and_fetch_data() + expected_url = 'http://test_endpoint/machine?comp=health' + self.assertEqual( + [mock.call(expected_url, data=mock.ANY, extra_headers=mock.ANY)], + self.AzureEndpointHttpClient.return_value.post.call_args_list) + + def test_goal_state_values_used_for_report_ready(self): + self.GoalState.return_value.incarnation = 'TestIncarnation' + self.GoalState.return_value.container_id = 'TestContainerId' + self.GoalState.return_value.instance_id = 'TestInstanceId' + shim = azure_helper.WALinuxAgentShim() + shim.register_with_azure_and_fetch_data() + posted_document = ( + self.AzureEndpointHttpClient.return_value.post.call_args[1]['data'] + ) + self.assertIn('TestIncarnation', posted_document) + self.assertIn('TestContainerId', posted_document) + self.assertIn('TestInstanceId', posted_document) + + def test_clean_up_can_be_called_at_any_time(self): + shim = azure_helper.WALinuxAgentShim() + shim.clean_up() + + def test_clean_up_will_clean_up_openssl_manager_if_instantiated(self): + shim = azure_helper.WALinuxAgentShim() + shim.register_with_azure_and_fetch_data() + shim.clean_up() + self.assertEqual( + 1, self.OpenSSLManager.return_value.clean_up.call_count) + + def test_failure_to_fetch_goalstate_bubbles_up(self): + class SentinelException(Exception): + pass + self.AzureEndpointHttpClient.return_value.get.side_effect = ( + SentinelException) + shim = azure_helper.WALinuxAgentShim() + self.assertRaises(SentinelException, + shim.register_with_azure_and_fetch_data) + + +class TestGetMetadataFromFabric(TestCase): + + @mock.patch.object(azure_helper, 'WALinuxAgentShim') + def test_data_from_shim_returned(self, shim): + ret = azure_helper.get_metadata_from_fabric() + self.assertEqual( + shim.return_value.register_with_azure_and_fetch_data.return_value, + ret) + + @mock.patch.object(azure_helper, 'WALinuxAgentShim') + def test_success_calls_clean_up(self, shim): + azure_helper.get_metadata_from_fabric() + self.assertEqual(1, shim.return_value.clean_up.call_count) + + @mock.patch.object(azure_helper, 'WALinuxAgentShim') + def test_failure_in_registration_calls_clean_up(self, shim): + class SentinelException(Exception): + pass + shim.return_value.register_with_azure_and_fetch_data.side_effect = ( + SentinelException) + self.assertRaises(SentinelException, + azure_helper.get_metadata_from_fabric) + self.assertEqual(1, shim.return_value.clean_up.call_count) diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py index 306ac7d8..772d189a 100644 --- a/tests/unittests/test_datasource/test_cloudsigma.py +++ b/tests/unittests/test_datasource/test_cloudsigma.py @@ -39,6 +39,7 @@ class CepkoMock(Cepko): class DataSourceCloudSigmaTest(test_helpers.TestCase): def setUp(self): + super(DataSourceCloudSigmaTest, self).setUp() self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "") self.datasource.is_running_in_cloudsigma = lambda: True self.datasource.cepko = CepkoMock(SERVER_CONTEXT) diff --git a/tests/unittests/test_datasource/test_cloudstack.py b/tests/unittests/test_datasource/test_cloudstack.py new file mode 100644 index 00000000..656d80d1 --- /dev/null +++ b/tests/unittests/test_datasource/test_cloudstack.py @@ -0,0 +1,86 @@ +from cloudinit import helpers +from cloudinit.sources.DataSourceCloudStack import DataSourceCloudStack +from ..helpers import TestCase + +try: + from unittest import mock +except ImportError: + import mock +try: + from contextlib import ExitStack +except ImportError: + from contextlib2 import ExitStack + + +class TestCloudStackPasswordFetching(TestCase): + + def setUp(self): + super(TestCloudStackPasswordFetching, self).setUp() + self.patches = ExitStack() + self.addCleanup(self.patches.close) + mod_name = 'cloudinit.sources.DataSourceCloudStack' + self.patches.enter_context(mock.patch('{0}.ec2'.format(mod_name))) + self.patches.enter_context(mock.patch('{0}.uhelp'.format(mod_name))) + + def _set_password_server_response(self, response_string): + subp = mock.MagicMock(return_value=(response_string, '')) + self.patches.enter_context( + mock.patch('cloudinit.sources.DataSourceCloudStack.util.subp', + subp)) + return subp + + def test_empty_password_doesnt_create_config(self): + self._set_password_server_response('') + ds = DataSourceCloudStack({}, None, helpers.Paths({})) + ds.get_data() + self.assertEqual({}, ds.get_config_obj()) + + def test_saved_password_doesnt_create_config(self): + self._set_password_server_response('saved_password') + ds = DataSourceCloudStack({}, None, helpers.Paths({})) + ds.get_data() + self.assertEqual({}, ds.get_config_obj()) + + def test_password_sets_password(self): + password = 'SekritSquirrel' + self._set_password_server_response(password) + ds = DataSourceCloudStack({}, None, helpers.Paths({})) + ds.get_data() + self.assertEqual(password, ds.get_config_obj()['password']) + + def test_bad_request_doesnt_stop_ds_from_working(self): + self._set_password_server_response('bad_request') + ds = DataSourceCloudStack({}, None, helpers.Paths({})) + self.assertTrue(ds.get_data()) + + def assertRequestTypesSent(self, subp, expected_request_types): + request_types = [] + for call in subp.call_args_list: + args = call[0][0] + for arg in args: + if arg.startswith('DomU_Request'): + request_types.append(arg.split()[1]) + self.assertEqual(expected_request_types, request_types) + + def test_valid_response_means_password_marked_as_saved(self): + password = 'SekritSquirrel' + subp = self._set_password_server_response(password) + ds = DataSourceCloudStack({}, None, helpers.Paths({})) + ds.get_data() + self.assertRequestTypesSent(subp, + ['send_my_password', 'saved_password']) + + def _check_password_not_saved_for(self, response_string): + subp = self._set_password_server_response(response_string) + ds = DataSourceCloudStack({}, None, helpers.Paths({})) + ds.get_data() + self.assertRequestTypesSent(subp, ['send_my_password']) + + def test_password_not_saved_if_empty(self): + self._check_password_not_saved_for('') + + def test_password_not_saved_if_already_saved(self): + self._check_password_not_saved_for('saved_password') + + def test_password_not_saved_if_bad_request(self): + self._check_password_not_saved_for('bad_request') diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py index d88066e5..89b15f54 100644 --- a/tests/unittests/test_datasource/test_configdrive.py +++ b/tests/unittests/test_datasource/test_configdrive.py @@ -1,10 +1,18 @@ from copy import copy import json import os -import os.path - -import mocker -from mocker import MockerTestCase +import shutil +import six +import tempfile + +try: + from unittest import mock +except ImportError: + import mock +try: + from contextlib import ExitStack +except ImportError: + from contextlib2 import ExitStack from cloudinit import helpers from cloudinit import settings @@ -12,7 +20,8 @@ from cloudinit.sources import DataSourceConfigDrive as ds from cloudinit.sources.helpers import openstack from cloudinit import util -from .. import helpers as unit_helpers +from ..helpers import TestCase + PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n' EC2_META = { @@ -37,7 +46,7 @@ EC2_META = { 'reservation-id': 'r-iru5qm4m', 'security-groups': ['default'] } -USER_DATA = '#!/bin/sh\necho This is user data\n' +USER_DATA = b'#!/bin/sh\necho This is user data\n' OSTACK_META = { 'availability_zone': 'nova', 'files': [{'content_path': '/content/0000', 'path': '/etc/foo.cfg'}, @@ -48,27 +57,60 @@ OSTACK_META = { 'public_keys': {'mykey': PUBKEY}, 'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c'} -CONTENT_0 = 'This is contents of /etc/foo.cfg\n' -CONTENT_1 = '# this is /etc/bar/bar.cfg\n' +CONTENT_0 = b'This is contents of /etc/foo.cfg\n' +CONTENT_1 = b'# this is /etc/bar/bar.cfg\n' +NETWORK_DATA = { + 'services': [ + {'type': 'dns', 'address': '199.204.44.24'}, + {'type': 'dns', 'address': '199.204.47.54'} + ], + 'links': [ + {'vif_id': '2ecc7709-b3f7-4448-9580-e1ec32d75bbd', + 'ethernet_mac_address': 'fa:16:3e:69:b0:58', + 'type': 'ovs', 'mtu': None, 'id': 'tap2ecc7709-b3'}, + {'vif_id': '2f88d109-5b57-40e6-af32-2472df09dc33', + 'ethernet_mac_address': 'fa:16:3e:d4:57:ad', + 'type': 'ovs', 'mtu': None, 'id': 'tap2f88d109-5b'}, + {'vif_id': '1a5382f8-04c5-4d75-ab98-d666c1ef52cc', + 'ethernet_mac_address': 'fa:16:3e:05:30:fe', + 'type': 'ovs', 'mtu': None, 'id': 'tap1a5382f8-04'} + ], + 'networks': [ + {'link': 'tap2ecc7709-b3', 'type': 'ipv4_dhcp', + 'network_id': '6d6357ac-0f70-4afa-8bd7-c274cc4ea235', + 'id': 'network0'}, + {'link': 'tap2f88d109-5b', 'type': 'ipv4_dhcp', + 'network_id': 'd227a9b3-6960-4d94-8976-ee5788b44f54', + 'id': 'network1'}, + {'link': 'tap1a5382f8-04', 'type': 'ipv4_dhcp', + 'network_id': 'dab2ba57-cae2-4311-a5ed-010b263891f5', + 'id': 'network2'} + ] +} CFG_DRIVE_FILES_V2 = { - 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META), - 'ec2/2009-04-04/user-data': USER_DATA, - 'ec2/latest/meta-data.json': json.dumps(EC2_META), - 'ec2/latest/user-data': USER_DATA, - 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META), - 'openstack/2012-08-10/user_data': USER_DATA, - 'openstack/content/0000': CONTENT_0, - 'openstack/content/0001': CONTENT_1, - 'openstack/latest/meta_data.json': json.dumps(OSTACK_META), - 'openstack/latest/user_data': USER_DATA} - - -class TestConfigDriveDataSource(MockerTestCase): + 'ec2/2009-04-04/meta-data.json': json.dumps(EC2_META), + 'ec2/2009-04-04/user-data': USER_DATA, + 'ec2/latest/meta-data.json': json.dumps(EC2_META), + 'ec2/latest/user-data': USER_DATA, + 'openstack/2012-08-10/meta_data.json': json.dumps(OSTACK_META), + 'openstack/2012-08-10/user_data': USER_DATA, + 'openstack/content/0000': CONTENT_0, + 'openstack/content/0001': CONTENT_1, + 'openstack/latest/meta_data.json': json.dumps(OSTACK_META), + 'openstack/latest/user_data': USER_DATA, + 'openstack/latest/network_data.json': json.dumps(NETWORK_DATA), + 'openstack/2015-10-15/meta_data.json': json.dumps(OSTACK_META), + 'openstack/2015-10-15/user_data': USER_DATA, + 'openstack/2015-10-15/network_data.json': json.dumps(NETWORK_DATA)} + + +class TestConfigDriveDataSource(TestCase): def setUp(self): super(TestConfigDriveDataSource, self).setUp() - self.tmp = self.makeDir() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) def test_ec2_metadata(self): populate_dir(self.tmp, CFG_DRIVE_FILES_V2) @@ -91,23 +133,29 @@ class TestConfigDriveDataSource(MockerTestCase): 'swap': '/dev/vda3', } for name, dev_name in name_tests.items(): - with unit_helpers.mocker() as my_mock: - find_mock = my_mock.replace(util.find_devs_with, - spec=False, passthrough=False) + with ExitStack() as mocks: provided_name = dev_name[len('/dev/'):] provided_name = "s" + provided_name[1:] - find_mock(mocker.ARGS) - my_mock.result([provided_name]) - exists_mock = my_mock.replace(os.path.exists, - spec=False, passthrough=False) - exists_mock(mocker.ARGS) - my_mock.result(False) - exists_mock(mocker.ARGS) - my_mock.result(True) - my_mock.replay() + find_mock = mocks.enter_context( + mock.patch.object(util, 'find_devs_with', + return_value=[provided_name])) + # We want os.path.exists() to return False on its first call, + # and True on its second call. We use a handy generator as + # the mock side effect for this. The mocked function returns + # what the side effect returns. + + def exists_side_effect(): + yield False + yield True + exists_mock = mocks.enter_context( + mock.patch.object(os.path, 'exists', + side_effect=exists_side_effect())) device = cfg_ds.device_name_to_device(name) self.assertEquals(dev_name, device) + find_mock.assert_called_once_with(mock.ANY) + self.assertEqual(exists_mock.call_count, 2) + def test_dev_os_map(self): populate_dir(self.tmp, CFG_DRIVE_FILES_V2) cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, @@ -123,19 +171,19 @@ class TestConfigDriveDataSource(MockerTestCase): 'swap': '/dev/vda3', } for name, dev_name in name_tests.items(): - with unit_helpers.mocker() as my_mock: - find_mock = my_mock.replace(util.find_devs_with, - spec=False, passthrough=False) - find_mock(mocker.ARGS) - my_mock.result([dev_name]) - exists_mock = my_mock.replace(os.path.exists, - spec=False, passthrough=False) - exists_mock(mocker.ARGS) - my_mock.result(True) - my_mock.replay() + with ExitStack() as mocks: + find_mock = mocks.enter_context( + mock.patch.object(util, 'find_devs_with', + return_value=[dev_name])) + exists_mock = mocks.enter_context( + mock.patch.object(os.path, 'exists', + return_value=True)) device = cfg_ds.device_name_to_device(name) self.assertEquals(dev_name, device) + find_mock.assert_called_once_with(mock.ANY) + exists_mock.assert_called_once_with(mock.ANY) + def test_dev_ec2_remap(self): populate_dir(self.tmp, CFG_DRIVE_FILES_V2) cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, @@ -156,16 +204,21 @@ class TestConfigDriveDataSource(MockerTestCase): 'root2k': None, } for name, dev_name in name_tests.items(): - with unit_helpers.mocker(verify_calls=False) as my_mock: - exists_mock = my_mock.replace(os.path.exists, - spec=False, passthrough=False) - exists_mock(mocker.ARGS) - my_mock.result(False) - exists_mock(mocker.ARGS) - my_mock.result(True) - my_mock.replay() + # We want os.path.exists() to return False on its first call, + # and True on its second call. We use a handy generator as + # the mock side effect for this. The mocked function returns + # what the side effect returns. + def exists_side_effect(): + yield False + yield True + with mock.patch.object(os.path, 'exists', + side_effect=exists_side_effect()): device = cfg_ds.device_name_to_device(name) self.assertEquals(dev_name, device) + # We don't assert the call count for os.path.exists() because + # not all of the entries in name_tests results in two calls to + # that function. Specifically, 'root2k' doesn't seem to call + # it at all. def test_dev_ec2_map(self): populate_dir(self.tmp, CFG_DRIVE_FILES_V2) @@ -173,12 +226,6 @@ class TestConfigDriveDataSource(MockerTestCase): None, helpers.Paths({})) found = ds.read_config_drive(self.tmp) - exists_mock = self.mocker.replace(os.path.exists, - spec=False, passthrough=False) - exists_mock(mocker.ARGS) - self.mocker.count(0, None) - self.mocker.result(True) - self.mocker.replay() ec2_md = found['ec2-metadata'] os_md = found['metadata'] cfg_ds.ec2_metadata = ec2_md @@ -193,8 +240,9 @@ class TestConfigDriveDataSource(MockerTestCase): 'root2k': None, } for name, dev_name in name_tests.items(): - device = cfg_ds.device_name_to_device(name) - self.assertEquals(dev_name, device) + with mock.patch.object(os.path, 'exists', return_value=True): + device = cfg_ds.device_name_to_device(name) + self.assertEquals(dev_name, device) def test_dir_valid(self): """Verify a dir is read as such.""" @@ -209,6 +257,7 @@ class TestConfigDriveDataSource(MockerTestCase): self.assertEqual(USER_DATA, found['userdata']) self.assertEqual(expected_md, found['metadata']) + self.assertEqual(NETWORK_DATA, found['networkdata']) self.assertEqual(found['files']['/etc/foo.cfg'], CONTENT_0) self.assertEqual(found['files']['/etc/bar/bar.cfg'], CONTENT_1) @@ -234,6 +283,7 @@ class TestConfigDriveDataSource(MockerTestCase): data = copy(CFG_DRIVE_FILES_V2) data["openstack/2012-08-10/meta_data.json"] = "non-json garbage {}" + data["openstack/2015-10-15/meta_data.json"] = "non-json garbage {}" data["openstack/latest/meta_data.json"] = "non-json garbage {}" populate_dir(self.tmp, data) @@ -277,9 +327,8 @@ class TestConfigDriveDataSource(MockerTestCase): util.is_partition = my_is_partition devs_with_answers = {"TYPE=vfat": [], - "TYPE=iso9660": ["/dev/vdb"], - "LABEL=config-2": ["/dev/vdb"], - } + "TYPE=iso9660": ["/dev/vdb"], + "LABEL=config-2": ["/dev/vdb"]} self.assertEqual(["/dev/vdb"], ds.find_candidate_devs()) # add a vfat item @@ -290,9 +339,10 @@ class TestConfigDriveDataSource(MockerTestCase): # verify that partitions are considered, that have correct label. devs_with_answers = {"TYPE=vfat": ["/dev/sda1"], - "TYPE=iso9660": [], "LABEL=config-2": ["/dev/vdb3"]} + "TYPE=iso9660": [], + "LABEL=config-2": ["/dev/vdb3"]} self.assertEqual(["/dev/vdb3"], - ds.find_candidate_devs()) + ds.find_candidate_devs()) finally: util.find_devs_with = orig_find_devs_with @@ -303,7 +353,20 @@ class TestConfigDriveDataSource(MockerTestCase): populate_dir(self.tmp, CFG_DRIVE_FILES_V2) myds = cfg_ds_from_dir(self.tmp) self.assertEqual(myds.get_public_ssh_keys(), - [OSTACK_META['public_keys']['mykey']]) + [OSTACK_META['public_keys']['mykey']]) + + def test_network_data_is_found(self): + """Verify that network_data is present in ds in config-drive-v2.""" + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + myds = cfg_ds_from_dir(self.tmp) + self.assertEqual(myds.network_json, NETWORK_DATA) + + def test_network_config_is_converted(self): + """Verify that network_data is converted and present on ds object.""" + populate_dir(self.tmp, CFG_DRIVE_FILES_V2) + myds = cfg_ds_from_dir(self.tmp) + network_config = ds.convert_network_data(NETWORK_DATA) + self.assertEqual(myds.network_config, network_config) def cfg_ds_from_dir(seed_d): @@ -323,16 +386,22 @@ def populate_ds_from_read_config(cfg_ds, source, results): cfg_ds.ec2_metadata = results.get('ec2-metadata') cfg_ds.userdata_raw = results.get('userdata') cfg_ds.version = results.get('version') + cfg_ds.network_json = results.get('networkdata') + cfg_ds._network_config = ds.convert_network_data(cfg_ds.network_json) def populate_dir(seed_dir, files): - for (name, content) in files.iteritems(): + for (name, content) in files.items(): path = os.path.join(seed_dir, name) dirname = os.path.dirname(path) if not os.path.isdir(dirname): os.makedirs(dirname) - with open(path, "w") as fp: + if isinstance(content, six.text_type): + mode = "w" + else: + mode = "wb" + + with open(path, mode) as fp: fp.write(content) - fp.close() # vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py index d1270fc2..679d1b82 100644 --- a/tests/unittests/test_datasource/test_digitalocean.py +++ b/tests/unittests/test_datasource/test_digitalocean.py @@ -15,11 +15,9 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import httpretty import re -from types import ListType -from urlparse import urlparse +from six.moves.urllib_parse import urlparse from cloudinit import settings from cloudinit import helpers @@ -27,6 +25,8 @@ from cloudinit.sources import DataSourceDigitalOcean from .. import helpers as test_helpers +httpretty = test_helpers.import_httpretty() + # Abbreviated for the test DO_INDEX = """id hostname @@ -110,7 +110,7 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase): self.assertEqual([DO_META.get('public-keys')], self.ds.get_public_ssh_keys()) - self.assertIs(type(self.ds.get_public_ssh_keys()), ListType) + self.assertIsInstance(self.ds.get_public_ssh_keys(), list) @httpretty.activate def test_multiple_ssh_keys(self): @@ -124,4 +124,4 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase): self.assertEqual(DO_META.get('public-keys').splitlines(), self.ds.get_public_ssh_keys()) - self.assertIs(type(self.ds.get_public_ssh_keys()), ListType) + self.assertIsInstance(self.ds.get_public_ssh_keys(), list) diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py index 06050bb1..fa714070 100644 --- a/tests/unittests/test_datasource/test_gce.py +++ b/tests/unittests/test_datasource/test_gce.py @@ -15,11 +15,10 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import httpretty import re from base64 import b64encode, b64decode -from urlparse import urlparse +from six.moves.urllib_parse import urlparse from cloudinit import settings from cloudinit import helpers @@ -27,12 +26,14 @@ from cloudinit.sources import DataSourceGCE from .. import helpers as test_helpers +httpretty = test_helpers.import_httpretty() + GCE_META = { 'instance/id': '123', 'instance/zone': 'foo/bar', 'project/attributes/sshKeys': 'user:ssh-rsa AA2..+aRD0fyVw== root@server', 'instance/hostname': 'server.project-foo.local', - 'instance/attributes/user-data': '/bin/echo foo\n', + 'instance/attributes/user-data': b'/bin/echo foo\n', } GCE_META_PARTIAL = { @@ -45,7 +46,7 @@ GCE_META_ENCODING = { 'instance/id': '12345', 'instance/hostname': 'server.project-baz.local', 'instance/zone': 'baz/bang', - 'instance/attributes/user-data': b64encode('/bin/echo baz\n'), + 'instance/attributes/user-data': b64encode(b'/bin/echo baz\n'), 'instance/attributes/user-data-encoding': 'base64', } @@ -54,8 +55,8 @@ MD_URL_RE = re.compile( r'http://metadata.google.internal./computeMetadata/v1/.*') -def _new_request_callback(gce_meta=None): - if not gce_meta: +def _set_mock_metadata(gce_meta=None): + if gce_meta is None: gce_meta = GCE_META def _request_callback(method, uri, headers): @@ -69,9 +70,10 @@ def _new_request_callback(gce_meta=None): else: return (404, headers, '') - return _request_callback + httpretty.register_uri(httpretty.GET, MD_URL_RE, body=_request_callback) +@httpretty.activate class TestDataSourceGCE(test_helpers.HttprettyTestCase): def setUp(self): @@ -80,23 +82,16 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase): helpers.Paths({})) super(TestDataSourceGCE, self).setUp() - @httpretty.activate def test_connection(self): - httpretty.register_uri( - httpretty.GET, MD_URL_RE, - body=_new_request_callback()) - + _set_mock_metadata() success = self.ds.get_data() self.assertTrue(success) req_header = httpretty.last_request().headers self.assertDictContainsSubset(HEADERS, req_header) - @httpretty.activate def test_metadata(self): - httpretty.register_uri( - httpretty.GET, MD_URL_RE, - body=_new_request_callback()) + _set_mock_metadata() self.ds.get_data() shostname = GCE_META.get('instance/hostname').split('.')[0] @@ -106,22 +101,12 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase): self.assertEqual(GCE_META.get('instance/id'), self.ds.get_instance_id()) - self.assertEqual(GCE_META.get('instance/zone'), - self.ds.availability_zone) - self.assertEqual(GCE_META.get('instance/attributes/user-data'), self.ds.get_userdata_raw()) - # we expect a list of public ssh keys with user names stripped - self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'], - self.ds.get_public_ssh_keys()) - # test partial metadata (missing user-data in particular) - @httpretty.activate def test_metadata_partial(self): - httpretty.register_uri( - httpretty.GET, MD_URL_RE, - body=_new_request_callback(GCE_META_PARTIAL)) + _set_mock_metadata(GCE_META_PARTIAL) self.ds.get_data() self.assertEqual(GCE_META_PARTIAL.get('instance/id'), @@ -130,13 +115,52 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase): shostname = GCE_META_PARTIAL.get('instance/hostname').split('.')[0] self.assertEqual(shostname, self.ds.get_hostname()) - @httpretty.activate def test_metadata_encoding(self): - httpretty.register_uri( - httpretty.GET, MD_URL_RE, - body=_new_request_callback(GCE_META_ENCODING)) + _set_mock_metadata(GCE_META_ENCODING) self.ds.get_data() decoded = b64decode( GCE_META_ENCODING.get('instance/attributes/user-data')) self.assertEqual(decoded, self.ds.get_userdata_raw()) + + def test_missing_required_keys_return_false(self): + for required_key in ['instance/id', 'instance/zone', + 'instance/hostname']: + meta = GCE_META_PARTIAL.copy() + del meta[required_key] + _set_mock_metadata(meta) + self.assertEqual(False, self.ds.get_data()) + httpretty.reset() + + def test_project_level_ssh_keys_are_used(self): + _set_mock_metadata() + self.ds.get_data() + + # we expect a list of public ssh keys with user names stripped + self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'], + self.ds.get_public_ssh_keys()) + + def test_instance_level_ssh_keys_are_used(self): + key_content = 'ssh-rsa JustAUser root@server' + meta = GCE_META.copy() + meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content) + + _set_mock_metadata(meta) + self.ds.get_data() + + self.assertIn(key_content, self.ds.get_public_ssh_keys()) + + def test_instance_level_keys_replace_project_level_keys(self): + key_content = 'ssh-rsa JustAUser root@server' + meta = GCE_META.copy() + meta['instance/attributes/sshKeys'] = 'user:{0}'.format(key_content) + + _set_mock_metadata(meta) + self.ds.get_data() + + self.assertEqual([key_content], self.ds.get_public_ssh_keys()) + + def test_only_last_part_of_zone_used_for_availability_zone(self): + _set_mock_metadata() + self.ds.get_data() + self.assertEqual('bar', self.ds.availability_zone) diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py index c157beb8..77d15cac 100644 --- a/tests/unittests/test_datasource/test_maas.py +++ b/tests/unittests/test_datasource/test_maas.py @@ -1,27 +1,33 @@ from copy import copy import os +import shutil +import tempfile from cloudinit.sources import DataSourceMAAS from cloudinit import url_helper -from ..helpers import populate_dir +from ..helpers import TestCase, populate_dir -import mocker +try: + from unittest import mock +except ImportError: + import mock -class TestMAASDataSource(mocker.MockerTestCase): +class TestMAASDataSource(TestCase): def setUp(self): super(TestMAASDataSource, self).setUp() # Make a temp directoy for tests to use. - self.tmp = self.makeDir() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) def test_seed_dir_valid(self): """Verify a valid seeddir is read as such.""" data = {'instance-id': 'i-valid01', - 'local-hostname': 'valid01-hostname', - 'user-data': 'valid01-userdata', - 'public-keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname'} + 'local-hostname': 'valid01-hostname', + 'user-data': b'valid01-userdata', + 'public-keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname'} my_d = os.path.join(self.tmp, "valid") populate_dir(my_d, data) @@ -39,8 +45,8 @@ class TestMAASDataSource(mocker.MockerTestCase): """Verify extra files do not affect seed_dir validity.""" data = {'instance-id': 'i-valid-extra', - 'local-hostname': 'valid-extra-hostname', - 'user-data': 'valid-extra-userdata', 'foo': 'bar'} + 'local-hostname': 'valid-extra-hostname', + 'user-data': b'valid-extra-userdata', 'foo': 'bar'} my_d = os.path.join(self.tmp, "valid_extra") populate_dir(my_d, data) @@ -58,7 +64,7 @@ class TestMAASDataSource(mocker.MockerTestCase): """Verify that invalid seed_dir raises MAASSeedDirMalformed.""" valid = {'instance-id': 'i-instanceid', - 'local-hostname': 'test-hostname', 'user-data': ''} + 'local-hostname': 'test-hostname', 'user-data': ''} my_based = os.path.join(self.tmp, "valid_extra") @@ -88,21 +94,23 @@ class TestMAASDataSource(mocker.MockerTestCase): def test_seed_dir_missing(self): """Verify that missing seed_dir raises MAASSeedDirNone.""" self.assertRaises(DataSourceMAAS.MAASSeedDirNone, - DataSourceMAAS.read_maas_seed_dir, - os.path.join(self.tmp, "nonexistantdirectory")) + DataSourceMAAS.read_maas_seed_dir, + os.path.join(self.tmp, "nonexistantdirectory")) def test_seed_url_valid(self): """Verify that valid seed_url is read as such.""" - valid = {'meta-data/instance-id': 'i-instanceid', + valid = { + 'meta-data/instance-id': 'i-instanceid', 'meta-data/local-hostname': 'test-hostname', 'meta-data/public-keys': 'test-hostname', - 'user-data': 'foodata'} + 'user-data': b'foodata', + } valid_order = [ 'meta-data/local-hostname', 'meta-data/instance-id', 'meta-data/public-keys', 'user-data', - ] + ] my_seed = "http://example.com/xmeta" my_ver = "1999-99-99" my_headers = {'header1': 'value1', 'header2': 'value2'} @@ -110,28 +118,38 @@ class TestMAASDataSource(mocker.MockerTestCase): def my_headers_cb(url): return my_headers - mock_request = self.mocker.replace(url_helper.readurl, - passthrough=False) - - for key in valid_order: - url = "%s/%s/%s" % (my_seed, my_ver, key) - mock_request(url, headers=None, timeout=mocker.ANY, - data=mocker.ANY, sec_between=mocker.ANY, - ssl_details=mocker.ANY, retries=mocker.ANY, - headers_cb=my_headers_cb, - exception_cb=mocker.ANY) - resp = valid.get(key) - self.mocker.result(url_helper.StringResponse(resp)) - self.mocker.replay() - - (userdata, metadata) = DataSourceMAAS.read_maas_seed_url(my_seed, - header_cb=my_headers_cb, version=my_ver) - - self.assertEqual("foodata", userdata) - self.assertEqual(metadata['instance-id'], - valid['meta-data/instance-id']) - self.assertEqual(metadata['local-hostname'], - valid['meta-data/local-hostname']) + # Each time url_helper.readurl() is called, something different is + # returned based on the canned data above. We need to build up a list + # of side effect return values, which the mock will return. At the + # same time, we'll build up a list of expected call arguments for + # asserting after the code under test is run. + calls = [] + + def side_effect(): + for key in valid_order: + resp = valid.get(key) + url = "%s/%s/%s" % (my_seed, my_ver, key) + calls.append( + mock.call(url, headers=None, timeout=mock.ANY, + data=mock.ANY, sec_between=mock.ANY, + ssl_details=mock.ANY, retries=mock.ANY, + headers_cb=my_headers_cb, + exception_cb=mock.ANY)) + yield url_helper.StringResponse(resp) + + # Now do the actual call of the code under test. + with mock.patch.object(url_helper, 'readurl', + side_effect=side_effect()) as mockobj: + userdata, metadata = DataSourceMAAS.read_maas_seed_url( + my_seed, version=my_ver) + + self.assertEqual(b"foodata", userdata) + self.assertEqual(metadata['instance-id'], + valid['meta-data/instance-id']) + self.assertEqual(metadata['local-hostname'], + valid['meta-data/local-hostname']) + + mockobj.has_calls(calls) def test_seed_url_invalid(self): """Verify that invalid seed_url raises MAASSeedDirMalformed.""" diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py index e9235951..2d5fc37c 100644 --- a/tests/unittests/test_datasource/test_nocloud.py +++ b/tests/unittests/test_datasource/test_nocloud.py @@ -1,39 +1,43 @@ from cloudinit import helpers from cloudinit.sources import DataSourceNoCloud from cloudinit import util -from ..helpers import populate_dir +from ..helpers import TestCase, populate_dir -from mocker import MockerTestCase import os import yaml +import shutil +import tempfile +import unittest +try: + from unittest import mock +except ImportError: + import mock +try: + from contextlib import ExitStack +except ImportError: + from contextlib2 import ExitStack -class TestNoCloudDataSource(MockerTestCase): + +class TestNoCloudDataSource(TestCase): def setUp(self): - self.tmp = self.makeDir() + super(TestNoCloudDataSource, self).setUp() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) self.paths = helpers.Paths({'cloud_dir': self.tmp}) self.cmdline = "root=TESTCMDLINE" - self.unapply = [] - self.apply_patches([(util, 'get_cmdline', self._getcmdline)]) - super(TestNoCloudDataSource, self).setUp() - - def tearDown(self): - apply_patches([i for i in reversed(self.unapply)]) - super(TestNoCloudDataSource, self).tearDown() + self.mocks = ExitStack() + self.addCleanup(self.mocks.close) - def apply_patches(self, patches): - ret = apply_patches(patches) - self.unapply += ret - - def _getcmdline(self): - return self.cmdline + self.mocks.enter_context( + mock.patch.object(util, 'get_cmdline', return_value=self.cmdline)) def test_nocloud_seed_dir(self): md = {'instance-id': 'IID', 'dsmode': 'local'} - ud = "USER_DATA_HERE" + ud = b"USER_DATA_HERE" populate_dir(os.path.join(self.paths.seed_dir, "nocloud"), {'user-data': ud, 'meta-data': yaml.safe_dump(md)}) @@ -59,7 +63,9 @@ class TestNoCloudDataSource(MockerTestCase): def my_find_devs_with(*args, **kwargs): raise PsuedoException - self.apply_patches([(util, 'find_devs_with', my_find_devs_with)]) + self.mocks.enter_context( + mock.patch.object(util, 'find_devs_with', + side_effect=PsuedoException)) # by default, NoCloud should search for filesystems by label sys_cfg = {'datasource': {'NoCloud': {}}} @@ -85,21 +91,21 @@ class TestNoCloudDataSource(MockerTestCase): data = { 'fs_label': None, - 'meta-data': {'instance-id': 'IID'}, - 'user-data': "USER_DATA_RAW", + 'meta-data': yaml.safe_dump({'instance-id': 'IID'}), + 'user-data': b"USER_DATA_RAW", } sys_cfg = {'datasource': {'NoCloud': data}} dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) ret = dsrc.get_data() - self.assertEqual(dsrc.userdata_raw, "USER_DATA_RAW") + self.assertEqual(dsrc.userdata_raw, b"USER_DATA_RAW") self.assertEqual(dsrc.metadata.get('instance-id'), 'IID') self.assertTrue(ret) def test_nocloud_seed_with_vendordata(self): md = {'instance-id': 'IID', 'dsmode': 'local'} - ud = "USER_DATA_HERE" - vd = "THIS IS MY VENDOR_DATA" + ud = b"USER_DATA_HERE" + vd = b"THIS IS MY VENDOR_DATA" populate_dir(os.path.join(self.paths.seed_dir, "nocloud"), {'user-data': ud, 'meta-data': yaml.safe_dump(md), @@ -115,12 +121,12 @@ class TestNoCloudDataSource(MockerTestCase): ret = dsrc.get_data() self.assertEqual(dsrc.userdata_raw, ud) self.assertEqual(dsrc.metadata, md) - self.assertEqual(dsrc.vendordata, vd) + self.assertEqual(dsrc.vendordata_raw, vd) self.assertTrue(ret) def test_nocloud_no_vendordata(self): populate_dir(os.path.join(self.paths.seed_dir, "nocloud"), - {'user-data': "ud", 'meta-data': "instance-id: IID\n"}) + {'user-data': b"ud", 'meta-data': "instance-id: IID\n"}) sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}} @@ -128,12 +134,12 @@ class TestNoCloudDataSource(MockerTestCase): dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths) ret = dsrc.get_data() - self.assertEqual(dsrc.userdata_raw, "ud") + self.assertEqual(dsrc.userdata_raw, b"ud") self.assertFalse(dsrc.vendordata) self.assertTrue(ret) -class TestParseCommandLineData(MockerTestCase): +class TestParseCommandLineData(unittest.TestCase): def test_parse_cmdline_data_valid(self): ds_id = "ds=nocloud" @@ -178,15 +184,4 @@ class TestParseCommandLineData(MockerTestCase): self.assertFalse(ret) -def apply_patches(patches): - ret = [] - for (ref, name, replace) in patches: - if replace is None: - continue - orig = getattr(ref, name) - setattr(ref, name, replace) - ret.append((ref, name, orig)) - return ret - - # vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py index b4fd1f4d..d796f030 100644 --- a/tests/unittests/test_datasource/test_opennebula.py +++ b/tests/unittests/test_datasource/test_opennebula.py @@ -1,12 +1,14 @@ from cloudinit import helpers from cloudinit.sources import DataSourceOpenNebula as ds from cloudinit import util -from mocker import MockerTestCase -from ..helpers import populate_dir +from ..helpers import TestCase, populate_dir -from base64 import b64encode import os import pwd +import shutil +import tempfile +import unittest + TEST_VARS = { 'VAR1': 'single', @@ -18,7 +20,7 @@ TEST_VARS = { 'VAR7': 'single\\t', 'VAR8': 'double\\tword', 'VAR9': 'multi\\t\nline\n', - 'VAR10': '\\', # expect \ + 'VAR10': '\\', # expect '\' 'VAR11': '\'', # expect ' 'VAR12': '$', # expect $ } @@ -37,12 +39,13 @@ CMD_IP_OUT = '''\ ''' -class TestOpenNebulaDataSource(MockerTestCase): +class TestOpenNebulaDataSource(TestCase): parsed_user = None def setUp(self): super(TestOpenNebulaDataSource, self).setUp() - self.tmp = self.makeDir() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) self.paths = helpers.Paths({'cloud_dir': self.tmp}) # defaults for few tests @@ -176,7 +179,7 @@ class TestOpenNebulaDataSource(MockerTestCase): self.assertEqual(USER_DATA, results['userdata']) def test_user_data_encoding_required_for_decode(self): - b64userdata = b64encode(USER_DATA) + b64userdata = util.b64e(USER_DATA) for k in ('USER_DATA', 'USERDATA'): my_d = os.path.join(self.tmp, k) populate_context_dir(my_d, {k: b64userdata}) @@ -188,7 +191,7 @@ class TestOpenNebulaDataSource(MockerTestCase): def test_user_data_base64_encoding(self): for k in ('USER_DATA', 'USERDATA'): my_d = os.path.join(self.tmp, k) - populate_context_dir(my_d, {k: b64encode(USER_DATA), + populate_context_dir(my_d, {k: util.b64e(USER_DATA), 'USERDATA_ENCODING': 'base64'}) results = ds.read_context_disk_dir(my_d) @@ -228,7 +231,7 @@ class TestOpenNebulaDataSource(MockerTestCase): util.find_devs_with = orig_find_devs_with -class TestOpenNebulaNetwork(MockerTestCase): +class TestOpenNebulaNetwork(unittest.TestCase): def setUp(self): super(TestOpenNebulaNetwork, self).setUp() @@ -280,7 +283,7 @@ iface eth0 inet static ''') -class TestParseShellConfig(MockerTestCase): +class TestParseShellConfig(unittest.TestCase): def test_no_seconds(self): cfg = '\n'.join(["foo=bar", "SECONDS=2", "xx=foo"]) # we could test 'sleep 2', but that would make the test run slower. @@ -290,7 +293,7 @@ class TestParseShellConfig(MockerTestCase): def populate_context_dir(path, variables): data = "# Context variables generated by OpenNebula\n" - for (k, v) in variables.iteritems(): + for k, v in variables.items(): data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''"))) populate_dir(path, {'context.sh': data}) diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py index 49894e51..0aa1ba84 100644 --- a/tests/unittests/test_datasource/test_openstack.py +++ b/tests/unittests/test_datasource/test_openstack.py @@ -20,19 +20,18 @@ import copy import json import re -from StringIO import StringIO - -from urlparse import urlparse - from .. import helpers as test_helpers +from six import StringIO +from six.moves.urllib.parse import urlparse + from cloudinit import helpers from cloudinit import settings from cloudinit.sources import DataSourceOpenStack as ds from cloudinit.sources.helpers import openstack from cloudinit import util -import httpretty as hp +hp = test_helpers.import_httpretty() BASE_URL = "http://169.254.169.254" PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n' @@ -50,7 +49,7 @@ EC2_META = { 'public-ipv4': '0.0.0.1', 'reservation-id': 'r-iru5qm4m', } -USER_DATA = '#!/bin/sh\necho This is user data\n' +USER_DATA = b'#!/bin/sh\necho This is user data\n' VENDOR_DATA = { 'magic': '', } @@ -64,8 +63,8 @@ OSTACK_META = { 'public_keys': {'mykey': PUBKEY}, 'uuid': 'b0fa911b-69d4-4476-bbe2-1c92bff6535c', } -CONTENT_0 = 'This is contents of /etc/foo.cfg\n' -CONTENT_1 = '# this is /etc/bar/bar.cfg\n' +CONTENT_0 = b'This is contents of /etc/foo.cfg\n' +CONTENT_1 = b'# this is /etc/bar/bar.cfg\n' OS_FILES = { 'openstack/latest/meta_data.json': json.dumps(OSTACK_META), 'openstack/latest/user_data': USER_DATA, diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index 65675106..5c49966a 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -22,15 +22,30 @@ # return responses. # -import base64 -from cloudinit import helpers as c_helpers -from cloudinit.sources import DataSourceSmartOS -from .. import helpers +from __future__ import print_function + import os import os.path import re +import shutil import stat +import tempfile import uuid +from binascii import crc32 + +import serial +import six + +from cloudinit import helpers as c_helpers +from cloudinit.sources import DataSourceSmartOS +from cloudinit.util import b64e + +from .. import helpers + +try: + from unittest import mock +except ImportError: + import mock MOCK_RETURNS = { 'hostname': 'test-host', @@ -41,77 +56,34 @@ MOCK_RETURNS = { 'cloud-init:user-data': '\n'.join(['#!/bin/sh', '/bin/true', '']), 'sdc:datacenter_name': 'somewhere2', 'sdc:operator-script': '\n'.join(['bin/true', '']), + 'sdc:uuid': str(uuid.uuid4()), 'sdc:vendor-data': '\n'.join(['VENDOR_DATA', '']), 'user-data': '\n'.join(['something', '']), 'user-script': '\n'.join(['/bin/true', '']), } -DMI_DATA_RETURN = (str(uuid.uuid4()), 'smartdc') - - -class MockSerial(object): - """Fake a serial terminal for testing the code that - interfaces with the serial""" - - port = None - - def __init__(self, mockdata): - self.last = None - self.last = None - self.new = True - self.count = 0 - self.mocked_out = [] - self.mockdata = mockdata - - def open(self): - return True - - def close(self): - return True - - def isOpen(self): - return True +DMI_DATA_RETURN = 'smartdc' - def write(self, line): - line = line.replace('GET ', '') - self.last = line.rstrip() - def readline(self): - if self.new: - self.new = False - if self.last in self.mockdata: - return 'SUCCESS\n' - else: - return 'NOTFOUND %s\n' % self.last - - if self.last in self.mockdata: - if not self.mocked_out: - self.mocked_out = [x for x in self._format_out()] - - if len(self.mocked_out) > self.count: - self.count += 1 - return self.mocked_out[self.count - 1] +def get_mock_client(mockdata): + class MockMetadataClient(object): - def _format_out(self): - if self.last in self.mockdata: - _mret = self.mockdata[self.last] - try: - for l in _mret.splitlines(): - yield "%s\n" % l.rstrip() - except: - yield "%s\n" % _mret.rstrip() + def __init__(self, serial): + pass - yield '.' - yield '\n' + def get_metadata(self, metadata_key): + return mockdata.get(metadata_key) + return MockMetadataClient class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): def setUp(self): - helpers.FilesystemMockingTestCase.setUp(self) + super(TestSmartOSDataSource, self).setUp() - # makeDir comes from MockerTestCase - self.tmp = self.makeDir() - self.legacy_user_d = self.makeDir() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) + self.legacy_user_d = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.legacy_user_d) # If you should want to watch the logs... self._log = None @@ -140,7 +112,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): ret = apply_patches(patches) self.unapply += ret - def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None): + def _get_ds(self, sys_cfg=None, ds_cfg=None, mockdata=None, dmi_data=None, + is_lxbrand=False): mod = DataSourceSmartOS if mockdata is None: @@ -149,16 +122,17 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): if dmi_data is None: dmi_data = DMI_DATA_RETURN - def _get_serial(*_): - return MockSerial(mockdata) - def _dmi_data(): return dmi_data def _os_uname(): - # LP: #1243287. tests assume this runs, but running test on - # arm would cause them all to fail. - return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', 'x86_64') + if not is_lxbrand: + # LP: #1243287. tests assume this runs, but running test on + # arm would cause them all to fail. + return ('LINUX', 'NODENAME', 'RELEASE', 'VERSION', 'x86_64') + else: + return ('LINUX', 'NODENAME', 'RELEASE', 'BRANDZ VIRTUAL LINUX', + 'X86_64') if sys_cfg is None: sys_cfg = {} @@ -168,12 +142,14 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): sys_cfg['datasource']['SmartOS'] = ds_cfg self.apply_patches([(mod, 'LEGACY_USER_D', self.legacy_user_d)]) - self.apply_patches([(mod, 'get_serial', _get_serial)]) + self.apply_patches([ + (mod, 'JoyentMetadataClient', get_mock_client(mockdata))]) self.apply_patches([(mod, 'dmi_data', _dmi_data)]) self.apply_patches([(os, 'uname', _os_uname)]) self.apply_patches([(mod, 'device_exists', lambda d: True)]) dsrc = mod.DataSourceSmartOS(sys_cfg, distro=None, paths=self.paths) + self.apply_patches([(dsrc, '_get_seed_file_object', mock.MagicMock())]) return dsrc def test_seed(self): @@ -181,14 +157,29 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): dsrc = self._get_ds() ret = dsrc.get_data() self.assertTrue(ret) + self.assertEquals('kvm', dsrc.smartos_type) self.assertEquals('/dev/ttyS1', dsrc.seed) + def test_seed_lxbrand(self): + # default seed should be /dev/ttyS1 + dsrc = self._get_ds(is_lxbrand=True) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEquals('lx-brand', dsrc.smartos_type) + self.assertEquals('/native/.zonecontrol/metadata.sock', dsrc.seed) + def test_issmartdc(self): dsrc = self._get_ds() ret = dsrc.get_data() self.assertTrue(ret) self.assertTrue(dsrc.is_smartdc) + def test_issmartdc_lxbrand(self): + dsrc = self._get_ds(is_lxbrand=True) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertTrue(dsrc.is_smartdc) + def test_no_base64(self): ds_cfg = {'no_base64_decode': ['test_var1'], 'all_base': True} dsrc = self._get_ds(ds_cfg=ds_cfg) @@ -199,7 +190,8 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): dsrc = self._get_ds(mockdata=MOCK_RETURNS) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEquals(DMI_DATA_RETURN[0], dsrc.metadata['instance-id']) + self.assertEquals(MOCK_RETURNS['sdc:uuid'], + dsrc.metadata['instance-id']) def test_root_keys(self): dsrc = self._get_ds(mockdata=MOCK_RETURNS) @@ -227,7 +219,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): my_returns = MOCK_RETURNS.copy() my_returns['base64_all'] = "true" for k in ('hostname', 'cloud-init:user-data'): - my_returns[k] = base64.b64encode(my_returns[k]) + my_returns[k] = b64e(my_returns[k]) dsrc = self._get_ds(mockdata=my_returns) ret = dsrc.get_data() @@ -248,7 +240,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): my_returns['b64-cloud-init:user-data'] = "true" my_returns['b64-hostname'] = "true" for k in ('hostname', 'cloud-init:user-data'): - my_returns[k] = base64.b64encode(my_returns[k]) + my_returns[k] = b64e(my_returns[k]) dsrc = self._get_ds(mockdata=my_returns) ret = dsrc.get_data() @@ -264,7 +256,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): my_returns = MOCK_RETURNS.copy() my_returns['base64_keys'] = 'hostname,ignored' for k in ('hostname',): - my_returns[k] = base64.b64encode(my_returns[k]) + my_returns[k] = b64e(my_returns[k]) dsrc = self._get_ds(mockdata=my_returns) ret = dsrc.get_data() @@ -365,7 +357,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase): permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:] if re.match(r'.*\/mdata-user-data$', name_f): found_new = True - print name_f + print(name_f) self.assertEquals(permissions, '400') self.assertFalse(found_new) @@ -447,3 +439,147 @@ def apply_patches(patches): setattr(ref, name, replace) ret.append((ref, name, orig)) return ret + + +class TestJoyentMetadataClient(helpers.FilesystemMockingTestCase): + + def setUp(self): + super(TestJoyentMetadataClient, self).setUp() + self.serial = mock.MagicMock(spec=serial.Serial) + self.request_id = 0xabcdef12 + self.metadata_value = 'value' + self.response_parts = { + 'command': 'SUCCESS', + 'crc': 'b5a9ff00', + 'length': 17 + len(b64e(self.metadata_value)), + 'payload': b64e(self.metadata_value), + 'request_id': '{0:08x}'.format(self.request_id), + } + + def make_response(): + payloadstr = '' + if 'payload' in self.response_parts: + payloadstr = ' {0}'.format(self.response_parts['payload']) + return ('V2 {length} {crc} {request_id} ' + '{command}{payloadstr}\n'.format( + payloadstr=payloadstr, + **self.response_parts).encode('ascii')) + + self.metasource_data = None + + def read_response(length): + if not self.metasource_data: + self.metasource_data = make_response() + self.metasource_data_len = len(self.metasource_data) + resp = self.metasource_data[:length] + self.metasource_data = self.metasource_data[length:] + return resp + + self.serial.read.side_effect = read_response + self.patched_funcs.enter_context( + mock.patch('cloudinit.sources.DataSourceSmartOS.random.randint', + mock.Mock(return_value=self.request_id))) + + def _get_client(self): + return DataSourceSmartOS.JoyentMetadataClient(self.serial) + + def assertEndsWith(self, haystack, prefix): + self.assertTrue(haystack.endswith(prefix), + "{0} does not end with '{1}'".format( + repr(haystack), prefix)) + + def assertStartsWith(self, haystack, prefix): + self.assertTrue(haystack.startswith(prefix), + "{0} does not start with '{1}'".format( + repr(haystack), prefix)) + + def test_get_metadata_writes_a_single_line(self): + client = self._get_client() + client.get_metadata('some_key') + self.assertEqual(1, self.serial.write.call_count) + written_line = self.serial.write.call_args[0][0] + print(type(written_line)) + self.assertEndsWith(written_line.decode('ascii'), + b'\n'.decode('ascii')) + self.assertEqual(1, written_line.count(b'\n')) + + def _get_written_line(self, key='some_key'): + client = self._get_client() + client.get_metadata(key) + return self.serial.write.call_args[0][0] + + def test_get_metadata_writes_bytes(self): + self.assertIsInstance(self._get_written_line(), six.binary_type) + + def test_get_metadata_line_starts_with_v2(self): + foo = self._get_written_line() + self.assertStartsWith(foo.decode('ascii'), b'V2'.decode('ascii')) + + def test_get_metadata_uses_get_command(self): + parts = self._get_written_line().decode('ascii').strip().split(' ') + self.assertEqual('GET', parts[4]) + + def test_get_metadata_base64_encodes_argument(self): + key = 'my_key' + parts = self._get_written_line(key).decode('ascii').strip().split(' ') + self.assertEqual(b64e(key), parts[5]) + + def test_get_metadata_calculates_length_correctly(self): + parts = self._get_written_line().decode('ascii').strip().split(' ') + expected_length = len(' '.join(parts[3:])) + self.assertEqual(expected_length, int(parts[1])) + + def test_get_metadata_uses_appropriate_request_id(self): + parts = self._get_written_line().decode('ascii').strip().split(' ') + request_id = parts[3] + self.assertEqual(8, len(request_id)) + self.assertEqual(request_id, request_id.lower()) + + def test_get_metadata_uses_random_number_for_request_id(self): + line = self._get_written_line() + request_id = line.decode('ascii').strip().split(' ')[3] + self.assertEqual('{0:08x}'.format(self.request_id), request_id) + + def test_get_metadata_checksums_correctly(self): + parts = self._get_written_line().decode('ascii').strip().split(' ') + expected_checksum = '{0:08x}'.format( + crc32(' '.join(parts[3:]).encode('utf-8')) & 0xffffffff) + checksum = parts[2] + self.assertEqual(expected_checksum, checksum) + + def test_get_metadata_reads_a_line(self): + client = self._get_client() + client.get_metadata('some_key') + self.assertEqual(self.metasource_data_len, self.serial.read.call_count) + + def test_get_metadata_returns_valid_value(self): + client = self._get_client() + value = client.get_metadata('some_key') + self.assertEqual(self.metadata_value, value) + + def test_get_metadata_throws_exception_for_incorrect_length(self): + self.response_parts['length'] = 0 + client = self._get_client() + self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, + client.get_metadata, 'some_key') + + def test_get_metadata_throws_exception_for_incorrect_crc(self): + self.response_parts['crc'] = 'deadbeef' + client = self._get_client() + self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, + client.get_metadata, 'some_key') + + def test_get_metadata_throws_exception_for_request_id_mismatch(self): + self.response_parts['request_id'] = 'deadbeef' + client = self._get_client() + client._checksum = lambda _: self.response_parts['crc'] + self.assertRaises(DataSourceSmartOS.JoyentMetadataFetchException, + client.get_metadata, 'some_key') + + def test_get_metadata_returns_None_if_value_not_found(self): + self.response_parts['payload'] = '' + self.response_parts['command'] = 'NOTFOUND' + self.response_parts['length'] = 17 + client = self._get_client() + client._checksum = lambda _: self.response_parts['crc'] + self.assertIsNone(client.get_metadata('some_key')) diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py index db6aa0e8..6ed1704c 100644 --- a/tests/unittests/test_distros/test_generic.py +++ b/tests/unittests/test_distros/test_generic.py @@ -4,6 +4,13 @@ from cloudinit import util from .. import helpers import os +import shutil +import tempfile + +try: + from unittest import mock +except ImportError: + import mock unknown_arch_info = { 'arches': ['default'], @@ -53,7 +60,8 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): def setUp(self): super(TestGenericDistro, self).setUp() # Make a temp directoy for tests to use. - self.tmp = self.makeDir() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) def _write_load_sudoers(self, _user, rules): cls = distros.fetch("ubuntu") @@ -64,7 +72,6 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): self.patchUtils(self.tmp) d.write_sudo_rules("harlowja", rules) contents = util.load_file(d.ci_sudoers_fn) - self.restore() return contents def _count_in(self, lines_look_for, text_content): @@ -142,33 +149,35 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): def test_get_package_mirror_info_az_ec2(self): arch_mirrors = gapmi(package_mirrors, arch="amd64") + data_source_mock = mock.Mock(availability_zone="us-east-1a") - results = gpmi(arch_mirrors, availability_zone="us-east-1a", + results = gpmi(arch_mirrors, data_source=data_source_mock, mirror_filter=self.return_first) self.assertEqual(results, {'primary': 'http://us-east-1.ec2/', 'security': 'http://security-mirror1-intel'}) - results = gpmi(arch_mirrors, availability_zone="us-east-1a", + results = gpmi(arch_mirrors, data_source=data_source_mock, mirror_filter=self.return_second) self.assertEqual(results, {'primary': 'http://us-east-1a.clouds/', 'security': 'http://security-mirror2-intel'}) - results = gpmi(arch_mirrors, availability_zone="us-east-1a", + results = gpmi(arch_mirrors, data_source=data_source_mock, mirror_filter=self.return_none) self.assertEqual(results, package_mirrors[0]['failsafe']) def test_get_package_mirror_info_az_non_ec2(self): arch_mirrors = gapmi(package_mirrors, arch="amd64") + data_source_mock = mock.Mock(availability_zone="nova.cloudvendor") - results = gpmi(arch_mirrors, availability_zone="nova.cloudvendor", + results = gpmi(arch_mirrors, data_source=data_source_mock, mirror_filter=self.return_first) self.assertEqual(results, {'primary': 'http://nova.cloudvendor.clouds/', 'security': 'http://security-mirror1-intel'}) - results = gpmi(arch_mirrors, availability_zone="nova.cloudvendor", + results = gpmi(arch_mirrors, data_source=data_source_mock, mirror_filter=self.return_last) self.assertEqual(results, {'primary': 'http://nova.cloudvendor.clouds/', @@ -176,22 +185,46 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase): def test_get_package_mirror_info_none(self): arch_mirrors = gapmi(package_mirrors, arch="amd64") + data_source_mock = mock.Mock(availability_zone=None) # because both search entries here replacement based on # availability-zone, the filter will be called with an empty list and # failsafe should be taken. - results = gpmi(arch_mirrors, availability_zone=None, + results = gpmi(arch_mirrors, data_source=data_source_mock, mirror_filter=self.return_first) self.assertEqual(results, {'primary': 'http://fs-primary-intel', 'security': 'http://security-mirror1-intel'}) - results = gpmi(arch_mirrors, availability_zone=None, + results = gpmi(arch_mirrors, data_source=data_source_mock, mirror_filter=self.return_last) self.assertEqual(results, {'primary': 'http://fs-primary-intel', 'security': 'http://security-mirror2-intel'}) + def test_systemd_in_use(self): + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + self.patchOS(self.tmp) + self.patchUtils(self.tmp) + os.makedirs('/run/systemd/system') + self.assertTrue(d.uses_systemd()) + + def test_systemd_not_in_use(self): + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + self.patchOS(self.tmp) + self.patchUtils(self.tmp) + self.assertFalse(d.uses_systemd()) + + def test_systemd_symlink(self): + cls = distros.fetch("ubuntu") + d = cls("ubuntu", {}, None) + self.patchOS(self.tmp) + self.patchUtils(self.tmp) + os.makedirs('/run/systemd') + os.symlink('/', '/run/systemd/system') + self.assertFalse(d.uses_systemd()) # def _get_package_mirror_info(mirror_info, availability_zone=None, # mirror_filter=util.search_for_mirror): diff --git a/tests/unittests/test_distros/test_hostname.py b/tests/unittests/test_distros/test_hostname.py index 8e644f4d..143e29a9 100644 --- a/tests/unittests/test_distros/test_hostname.py +++ b/tests/unittests/test_distros/test_hostname.py @@ -1,4 +1,4 @@ -from mocker import MockerTestCase +import unittest from cloudinit.distros.parsers import hostname @@ -12,7 +12,7 @@ blahblah BASE_HOSTNAME = BASE_HOSTNAME.strip() -class TestHostnameHelper(MockerTestCase): +class TestHostnameHelper(unittest.TestCase): def test_parse_same(self): hn = hostname.HostnameConf(BASE_HOSTNAME) self.assertEquals(str(hn).strip(), BASE_HOSTNAME) diff --git a/tests/unittests/test_distros/test_hosts.py b/tests/unittests/test_distros/test_hosts.py index 687a0dab..fc701eaa 100644 --- a/tests/unittests/test_distros/test_hosts.py +++ b/tests/unittests/test_distros/test_hosts.py @@ -1,4 +1,4 @@ -from mocker import MockerTestCase +import unittest from cloudinit.distros.parsers import hosts @@ -14,7 +14,7 @@ BASE_ETC = ''' BASE_ETC = BASE_ETC.strip() -class TestHostsHelper(MockerTestCase): +class TestHostsHelper(unittest.TestCase): def test_parse(self): eh = hosts.HostsConf(BASE_ETC) self.assertEquals(eh.get_entry('127.0.0.1'), [['localhost']]) diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py index 193338e8..2c2a424d 100644 --- a/tests/unittests/test_distros/test_netconfig.py +++ b/tests/unittests/test_distros/test_netconfig.py @@ -1,8 +1,16 @@ -from mocker import MockerTestCase +import os -import mocker +try: + from unittest import mock +except ImportError: + import mock +try: + from contextlib import ExitStack +except ImportError: + from contextlib2 import ExitStack -import os +from six import StringIO +from ..helpers import TestCase from cloudinit import distros from cloudinit import helpers @@ -11,8 +19,6 @@ from cloudinit import util from cloudinit.distros.parsers.sys_conf import SysConf -from StringIO import StringIO - BASE_NET_CFG = ''' auto lo @@ -74,7 +80,7 @@ class WriteBuffer(object): return self.buffer.getvalue() -class TestNetCfgDistro(MockerTestCase): +class TestNetCfgDistro(TestCase): def _get_distro(self, dname): cls = distros.fetch(dname) @@ -85,34 +91,29 @@ class TestNetCfgDistro(MockerTestCase): def test_simple_write_ub(self): ub_distro = self._get_distro('ubuntu') - util_mock = self.mocker.replace(util.write_file, - spec=False, passthrough=False) - exists_mock = self.mocker.replace(os.path.isfile, - spec=False, passthrough=False) - - exists_mock(mocker.ARGS) - self.mocker.count(0, None) - self.mocker.result(False) - - write_bufs = {} - - def replace_write(filename, content, mode=0644, omode="wb"): - buf = WriteBuffer() - buf.mode = mode - buf.omode = omode - buf.write(content) - write_bufs[filename] = buf - - util_mock(mocker.ARGS) - self.mocker.call(replace_write) - self.mocker.replay() - ub_distro.apply_network(BASE_NET_CFG, False) - - self.assertEquals(len(write_bufs), 1) - self.assertIn('/etc/network/interfaces', write_bufs) - write_buf = write_bufs['/etc/network/interfaces'] - self.assertEquals(str(write_buf).strip(), BASE_NET_CFG.strip()) - self.assertEquals(write_buf.mode, 0644) + with ExitStack() as mocks: + write_bufs = {} + + def replace_write(filename, content, mode=0o644, omode="wb"): + buf = WriteBuffer() + buf.mode = mode + buf.omode = omode + buf.write(content) + write_bufs[filename] = buf + + mocks.enter_context( + mock.patch.object(util, 'write_file', replace_write)) + mocks.enter_context( + mock.patch.object(os.path, 'isfile', return_value=False)) + + ub_distro.apply_network(BASE_NET_CFG, False) + + self.assertEquals(len(write_bufs), 1) + eni_name = '/etc/network/interfaces.d/50-cloud-init.cfg' + self.assertIn(eni_name, write_bufs) + write_buf = write_bufs[eni_name] + self.assertEquals(str(write_buf).strip(), BASE_NET_CFG.strip()) + self.assertEquals(write_buf.mode, 0o644) def assertCfgEquals(self, blob1, blob2): b1 = dict(SysConf(blob1.strip().splitlines())) @@ -127,53 +128,41 @@ class TestNetCfgDistro(MockerTestCase): def test_simple_write_rh(self): rh_distro = self._get_distro('rhel') - write_mock = self.mocker.replace(util.write_file, - spec=False, passthrough=False) - load_mock = self.mocker.replace(util.load_file, - spec=False, passthrough=False) - exists_mock = self.mocker.replace(os.path.isfile, - spec=False, passthrough=False) write_bufs = {} - def replace_write(filename, content, mode=0644, omode="wb"): + def replace_write(filename, content, mode=0o644, omode="wb"): buf = WriteBuffer() buf.mode = mode buf.omode = omode buf.write(content) write_bufs[filename] = buf - exists_mock(mocker.ARGS) - self.mocker.count(0, None) - self.mocker.result(False) - - load_mock(mocker.ARGS) - self.mocker.count(0, None) - self.mocker.result('') - - for _i in range(0, 3): - write_mock(mocker.ARGS) - self.mocker.call(replace_write) - - write_mock(mocker.ARGS) - self.mocker.call(replace_write) - - self.mocker.replay() - rh_distro.apply_network(BASE_NET_CFG, False) - - self.assertEquals(len(write_bufs), 4) - self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', write_bufs) - write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo'] - expected_buf = ''' + with ExitStack() as mocks: + mocks.enter_context( + mock.patch.object(util, 'write_file', replace_write)) + mocks.enter_context( + mock.patch.object(util, 'load_file', return_value='')) + mocks.enter_context( + mock.patch.object(os.path, 'isfile', return_value=False)) + + rh_distro.apply_network(BASE_NET_CFG, False) + + self.assertEquals(len(write_bufs), 4) + self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', + write_bufs) + write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo'] + expected_buf = ''' DEVICE="lo" ONBOOT=yes ''' - self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0644) + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0o644) - self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', write_bufs) - write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0'] - expected_buf = ''' + self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', + write_bufs) + write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0'] + expected_buf = ''' DEVICE="eth0" BOOTPROTO="static" NETMASK="255.255.255.0" @@ -182,77 +171,66 @@ ONBOOT=yes GATEWAY="192.168.1.254" BROADCAST="192.168.1.0" ''' - self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0644) + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0o644) - self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', write_bufs) - write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1'] - expected_buf = ''' + self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', + write_bufs) + write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1'] + expected_buf = ''' DEVICE="eth1" BOOTPROTO="dhcp" ONBOOT=yes ''' - self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0644) + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0o644) - self.assertIn('/etc/sysconfig/network', write_bufs) - write_buf = write_bufs['/etc/sysconfig/network'] - expected_buf = ''' + self.assertIn('/etc/sysconfig/network', write_bufs) + write_buf = write_bufs['/etc/sysconfig/network'] + expected_buf = ''' # Created by cloud-init v. 0.7 NETWORKING=yes ''' - self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0644) + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0o644) def test_write_ipv6_rhel(self): rh_distro = self._get_distro('rhel') - write_mock = self.mocker.replace(util.write_file, - spec=False, passthrough=False) - load_mock = self.mocker.replace(util.load_file, - spec=False, passthrough=False) - exists_mock = self.mocker.replace(os.path.isfile, - spec=False, passthrough=False) write_bufs = {} - def replace_write(filename, content, mode=0644, omode="wb"): + def replace_write(filename, content, mode=0o644, omode="wb"): buf = WriteBuffer() buf.mode = mode buf.omode = omode buf.write(content) write_bufs[filename] = buf - exists_mock(mocker.ARGS) - self.mocker.count(0, None) - self.mocker.result(False) - - load_mock(mocker.ARGS) - self.mocker.count(0, None) - self.mocker.result('') - - for _i in range(0, 3): - write_mock(mocker.ARGS) - self.mocker.call(replace_write) - - write_mock(mocker.ARGS) - self.mocker.call(replace_write) - - self.mocker.replay() - rh_distro.apply_network(BASE_NET_CFG_IPV6, False) - - self.assertEquals(len(write_bufs), 4) - self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', write_bufs) - write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo'] - expected_buf = ''' + with ExitStack() as mocks: + mocks.enter_context( + mock.patch.object(util, 'write_file', replace_write)) + mocks.enter_context( + mock.patch.object(util, 'load_file', return_value='')) + mocks.enter_context( + mock.patch.object(os.path, 'isfile', return_value=False)) + + rh_distro.apply_network(BASE_NET_CFG_IPV6, False) + + self.assertEquals(len(write_bufs), 4) + self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', + write_bufs) + write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo'] + expected_buf = ''' DEVICE="lo" ONBOOT=yes ''' - self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0644) + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0o644) - self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', write_bufs) - write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0'] - expected_buf = ''' + self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', + write_bufs) + write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0'] + expected_buf = ''' DEVICE="eth0" BOOTPROTO="static" NETMASK="255.255.255.0" @@ -264,11 +242,12 @@ IPV6INIT=yes IPV6ADDR="2607:f0d0:1002:0011::2" IPV6_DEFAULTGW="2607:f0d0:1002:0011::1" ''' - self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0644) - self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', write_bufs) - write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1'] - expected_buf = ''' + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0o644) + self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', + write_bufs) + write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1'] + expected_buf = ''' DEVICE="eth1" BOOTPROTO="static" NETMASK="255.255.255.0" @@ -280,38 +259,22 @@ IPV6INIT=yes IPV6ADDR="2607:f0d0:1002:0011::3" IPV6_DEFAULTGW="2607:f0d0:1002:0011::1" ''' - self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0644) + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0o644) - self.assertIn('/etc/sysconfig/network', write_bufs) - write_buf = write_bufs['/etc/sysconfig/network'] - expected_buf = ''' + self.assertIn('/etc/sysconfig/network', write_bufs) + write_buf = write_bufs['/etc/sysconfig/network'] + expected_buf = ''' # Created by cloud-init v. 0.7 NETWORKING=yes NETWORKING_IPV6=yes IPV6_AUTOCONF=no ''' - self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0644) + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0o644) def test_simple_write_freebsd(self): fbsd_distro = self._get_distro('freebsd') - util_mock = self.mocker.replace(util.write_file, - spec=False, passthrough=False) - exists_mock = self.mocker.replace(os.path.isfile, - spec=False, passthrough=False) - load_mock = self.mocker.replace(util.load_file, - spec=False, passthrough=False) - subp_mock = self.mocker.replace(util.subp, - spec=False, passthrough=False) - - subp_mock(['ifconfig', '-a']) - self.mocker.count(0, None) - self.mocker.result(('vtnet0', '')) - - exists_mock(mocker.ARGS) - self.mocker.count(0, None) - self.mocker.result(False) write_bufs = {} read_bufs = { @@ -319,7 +282,7 @@ IPV6_AUTOCONF=no '/etc/resolv.conf': '', } - def replace_write(filename, content, mode=0644, omode="wb"): + def replace_write(filename, content, mode=0o644, omode="wb"): buf = WriteBuffer() buf.mode = mode buf.omode = omode @@ -336,23 +299,24 @@ IPV6_AUTOCONF=no return str(write_bufs[fname]) return read_bufs[fname] - util_mock(mocker.ARGS) - self.mocker.call(replace_write) - self.mocker.count(0, None) - - load_mock(mocker.ARGS) - self.mocker.call(replace_read) - self.mocker.count(0, None) - - self.mocker.replay() - fbsd_distro.apply_network(BASE_NET_CFG, False) - - self.assertIn('/etc/rc.conf', write_bufs) - write_buf = write_bufs['/etc/rc.conf'] - expected_buf = ''' + with ExitStack() as mocks: + mocks.enter_context( + mock.patch.object(util, 'subp', return_value=('vtnet0', ''))) + mocks.enter_context( + mock.patch.object(os.path, 'exists', return_value=False)) + mocks.enter_context( + mock.patch.object(util, 'write_file', replace_write)) + mocks.enter_context( + mock.patch.object(util, 'load_file', replace_read)) + + fbsd_distro.apply_network(BASE_NET_CFG, False) + + self.assertIn('/etc/rc.conf', write_bufs) + write_buf = write_bufs['/etc/rc.conf'] + expected_buf = ''' ifconfig_vtnet0="192.168.1.5 netmask 255.255.255.0" ifconfig_vtnet1="DHCP" defaultrouter="192.168.1.254" ''' - self.assertCfgEquals(expected_buf, str(write_buf)) - self.assertEquals(write_buf.mode, 0644) + self.assertCfgEquals(expected_buf, str(write_buf)) + self.assertEquals(write_buf.mode, 0o644) diff --git a/tests/unittests/test_distros/test_resolv.py b/tests/unittests/test_distros/test_resolv.py index 6b6ff6aa..faaf5b7f 100644 --- a/tests/unittests/test_distros/test_resolv.py +++ b/tests/unittests/test_distros/test_resolv.py @@ -1,8 +1,7 @@ -from mocker import MockerTestCase - from cloudinit.distros.parsers import resolv_conf import re +from ..helpers import TestCase BASE_RESOLVE = ''' @@ -14,7 +13,7 @@ nameserver 10.15.30.92 BASE_RESOLVE = BASE_RESOLVE.strip() -class TestResolvHelper(MockerTestCase): +class TestResolvHelper(TestCase): def test_parse_same(self): rp = resolv_conf.ResolvConf(BASE_RESOLVE) rp_r = str(rp).strip() diff --git a/tests/unittests/test_distros/test_sysconfig.py b/tests/unittests/test_distros/test_sysconfig.py index 0c651407..03d89a10 100644 --- a/tests/unittests/test_distros/test_sysconfig.py +++ b/tests/unittests/test_distros/test_sysconfig.py @@ -1,14 +1,13 @@ -from mocker import MockerTestCase - import re from cloudinit.distros.parsers.sys_conf import SysConf +from ..helpers import TestCase # Lots of good examples @ # http://content.hccfl.edu/pollock/AUnix1/SysconfigFilesDesc.txt -class TestSysConfHelper(MockerTestCase): +class TestSysConfHelper(TestCase): # This function was added in 2.7, make it work for 2.6 def assertRegMatches(self, text, regexp): regexp = re.compile(regexp) diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py index 50398c74..4525f487 100644 --- a/tests/unittests/test_distros/test_user_data_normalize.py +++ b/tests/unittests/test_distros/test_user_data_normalize.py @@ -1,21 +1,22 @@ -from mocker import MockerTestCase - from cloudinit import distros from cloudinit import helpers from cloudinit import settings +from ..helpers import TestCase + + bcfg = { - 'name': 'bob', - 'plain_text_passwd': 'ubuntu', - 'home': "/home/ubuntu", - 'shell': "/bin/bash", - 'lock_passwd': True, - 'gecos': "Ubuntu", - 'groups': ["foo"] + 'name': 'bob', + 'plain_text_passwd': 'ubuntu', + 'home': "/home/ubuntu", + 'shell': "/bin/bash", + 'lock_passwd': True, + 'gecos': "Ubuntu", + 'groups': ["foo"] } -class TestUGNormalize(MockerTestCase): +class TestUGNormalize(TestCase): def _make_distro(self, dtype, def_user=None): cfg = dict(settings.CFG_BUILTIN) @@ -33,16 +34,11 @@ class TestUGNormalize(MockerTestCase): def test_group_dict(self): distro = self._make_distro('ubuntu') g = {'groups': [ - { - 'ubuntu': ['foo', 'bar'], - 'bob': 'users', - }, - 'cloud-users', - { - 'bob': 'users2', - }, - ] - } + {'ubuntu': ['foo', 'bar'], + 'bob': 'users'}, + 'cloud-users', + {'bob': 'users2'} + ]} (_users, groups) = self._norm(g, distro) self.assertIn('ubuntu', groups) ub_members = groups['ubuntu'] diff --git a/tests/unittests/test_ec2_util.py b/tests/unittests/test_ec2_util.py index 84aa002e..99fc54be 100644 --- a/tests/unittests/test_ec2_util.py +++ b/tests/unittests/test_ec2_util.py @@ -3,7 +3,7 @@ from . import helpers from cloudinit import ec2_utils as eu from cloudinit import url_helper as uh -import httpretty as hp +hp = helpers.import_httpretty() class TestEc2Util(helpers.HttprettyTestCase): @@ -16,7 +16,7 @@ class TestEc2Util(helpers.HttprettyTestCase): body='stuff', status=200) userdata = eu.get_instance_userdata(self.VERSION) - self.assertEquals('stuff', userdata) + self.assertEquals('stuff', userdata.decode('utf-8')) @hp.activate def test_userdata_fetch_fail_not_found(self): diff --git a/tests/unittests/test_filters/test_launch_index.py b/tests/unittests/test_filters/test_launch_index.py index 2f4c2fda..95d24b9b 100644 --- a/tests/unittests/test_filters/test_launch_index.py +++ b/tests/unittests/test_filters/test_launch_index.py @@ -2,7 +2,7 @@ import copy from .. import helpers -import itertools +from six.moves import filterfalse from cloudinit.filters import launch_index from cloudinit import user_data as ud @@ -36,11 +36,9 @@ class TestLaunchFilter(helpers.ResourceUsingTestCase): return False # Do some basic payload checking msg1_msgs = [m for m in msg1.walk()] - msg1_msgs = [m for m in - itertools.ifilterfalse(ud.is_skippable, msg1_msgs)] + msg1_msgs = [m for m in filterfalse(ud.is_skippable, msg1_msgs)] msg2_msgs = [m for m in msg2.walk()] - msg2_msgs = [m for m in - itertools.ifilterfalse(ud.is_skippable, msg2_msgs)] + msg2_msgs = [m for m in filterfalse(ud.is_skippable, msg2_msgs)] for i in range(0, len(msg2_msgs)): m1_msg = msg1_msgs[i] m2_msg = msg2_msgs[i] diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_configure.py index 203dd2aa..1ed185ca 100644 --- a/tests/unittests/test_handler/test_handler_apt_configure.py +++ b/tests/unittests/test_handler/test_handler_apt_configure.py @@ -1,27 +1,30 @@ -from mocker import MockerTestCase - from cloudinit import util from cloudinit.config import cc_apt_configure +from ..helpers import TestCase import os import re +import shutil +import tempfile + +def load_tfile_or_url(*args, **kwargs): + return(util.decode_binary(util.read_file_or_url(*args, **kwargs).contents)) -class TestAptProxyConfig(MockerTestCase): + +class TestAptProxyConfig(TestCase): def setUp(self): super(TestAptProxyConfig, self).setUp() - self.tmp = self.makeDir() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) self.pfile = os.path.join(self.tmp, "proxy.cfg") self.cfile = os.path.join(self.tmp, "config.cfg") def _search_apt_config(self, contents, ptype, value): - print( - r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value), - contents, "flags=re.IGNORECASE") - return(re.search( + return re.search( r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value), - contents, flags=re.IGNORECASE)) + contents, flags=re.IGNORECASE) def test_apt_proxy_written(self): cfg = {'apt_proxy': 'myproxy'} @@ -30,7 +33,7 @@ class TestAptProxyConfig(MockerTestCase): self.assertTrue(os.path.isfile(self.pfile)) self.assertFalse(os.path.isfile(self.cfile)) - contents = str(util.read_file_or_url(self.pfile)) + contents = load_tfile_or_url(self.pfile) self.assertTrue(self._search_apt_config(contents, "http", "myproxy")) def test_apt_http_proxy_written(self): @@ -40,7 +43,7 @@ class TestAptProxyConfig(MockerTestCase): self.assertTrue(os.path.isfile(self.pfile)) self.assertFalse(os.path.isfile(self.cfile)) - contents = str(util.read_file_or_url(self.pfile)) + contents = load_tfile_or_url(self.pfile) self.assertTrue(self._search_apt_config(contents, "http", "myproxy")) def test_apt_all_proxy_written(self): @@ -58,9 +61,9 @@ class TestAptProxyConfig(MockerTestCase): self.assertTrue(os.path.isfile(self.pfile)) self.assertFalse(os.path.isfile(self.cfile)) - contents = str(util.read_file_or_url(self.pfile)) + contents = load_tfile_or_url(self.pfile) - for ptype, pval in values.iteritems(): + for ptype, pval in values.items(): self.assertTrue(self._search_apt_config(contents, ptype, pval)) def test_proxy_deleted(self): @@ -74,7 +77,7 @@ class TestAptProxyConfig(MockerTestCase): cc_apt_configure.apply_apt_config({'apt_proxy': "foo"}, self.pfile, self.cfile) self.assertTrue(os.path.isfile(self.pfile)) - contents = str(util.read_file_or_url(self.pfile)) + contents = load_tfile_or_url(self.pfile) self.assertTrue(self._search_apt_config(contents, "http", "foo")) def test_config_written(self): @@ -86,14 +89,14 @@ class TestAptProxyConfig(MockerTestCase): self.assertTrue(os.path.isfile(self.cfile)) self.assertFalse(os.path.isfile(self.pfile)) - self.assertEqual(str(util.read_file_or_url(self.cfile)), payload) + self.assertEqual(load_tfile_or_url(self.cfile), payload) def test_config_replaced(self): util.write_file(self.pfile, "content doesnt matter") cc_apt_configure.apply_apt_config({'apt_config': "foo"}, self.pfile, self.cfile) self.assertTrue(os.path.isfile(self.cfile)) - self.assertEqual(str(util.read_file_or_url(self.cfile)), "foo") + self.assertEqual(load_tfile_or_url(self.cfile), "foo") def test_config_deleted(self): # if no 'apt_config' is provided, delete any previously written file diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index 0558023a..a6b9c0fd 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -1,15 +1,26 @@ -from mocker import MockerTestCase - from cloudinit import cloud from cloudinit import helpers from cloudinit import util from cloudinit.config import cc_ca_certs +from ..helpers import TestCase import logging +import shutil +import tempfile +import unittest + +try: + from unittest import mock +except ImportError: + import mock +try: + from contextlib import ExitStack +except ImportError: + from contextlib2 import ExitStack -class TestNoConfig(MockerTestCase): +class TestNoConfig(unittest.TestCase): def setUp(self): super(TestNoConfig, self).setUp() self.name = "ca-certs" @@ -22,15 +33,20 @@ class TestNoConfig(MockerTestCase): Test that nothing is done if no ca-certs configuration is provided. """ config = util.get_builtin_cfg() - self.mocker.replace(util.write_file, passthrough=False) - self.mocker.replace(cc_ca_certs.update_ca_certs, passthrough=False) - self.mocker.replay() + with ExitStack() as mocks: + util_mock = mocks.enter_context( + mock.patch.object(util, 'write_file')) + certs_mock = mocks.enter_context( + mock.patch.object(cc_ca_certs, 'update_ca_certs')) - cc_ca_certs.handle(self.name, config, self.cloud_init, self.log, - self.args) + cc_ca_certs.handle(self.name, config, self.cloud_init, self.log, + self.args) + self.assertEqual(util_mock.call_count, 0) + self.assertEqual(certs_mock.call_count, 0) -class TestConfig(MockerTestCase): + +class TestConfig(TestCase): def setUp(self): super(TestConfig, self).setUp() self.name = "ca-certs" @@ -39,16 +55,16 @@ class TestConfig(MockerTestCase): self.log = logging.getLogger("TestNoConfig") self.args = [] - # Mock out the functions that actually modify the system - self.mock_add = self.mocker.replace(cc_ca_certs.add_ca_certs, - passthrough=False) - self.mock_update = self.mocker.replace(cc_ca_certs.update_ca_certs, - passthrough=False) - self.mock_remove = self.mocker.replace( - cc_ca_certs.remove_default_ca_certs, passthrough=False) + self.mocks = ExitStack() + self.addCleanup(self.mocks.close) - # Order must be correct - self.mocker.order() + # Mock out the functions that actually modify the system + self.mock_add = self.mocks.enter_context( + mock.patch.object(cc_ca_certs, 'add_ca_certs')) + self.mock_update = self.mocks.enter_context( + mock.patch.object(cc_ca_certs, 'update_ca_certs')) + self.mock_remove = self.mocks.enter_context( + mock.patch.object(cc_ca_certs, 'remove_default_ca_certs')) def test_no_trusted_list(self): """ @@ -57,86 +73,88 @@ class TestConfig(MockerTestCase): """ config = {"ca-certs": {}} - # No functions should be called - self.mock_update() - self.mocker.replay() - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + self.assertEqual(self.mock_add.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) + self.assertEqual(self.mock_remove.call_count, 0) + def test_empty_trusted_list(self): """Test that no certificate are written if 'trusted' list is empty.""" config = {"ca-certs": {"trusted": []}} - # No functions should be called - self.mock_update() - self.mocker.replay() - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + self.assertEqual(self.mock_add.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) + self.assertEqual(self.mock_remove.call_count, 0) + def test_single_trusted(self): """Test that a single cert gets passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1"]}} - self.mock_add(["CERT1"]) - self.mock_update() - self.mocker.replay() - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + self.mock_add.assert_called_once_with(['CERT1']) + self.assertEqual(self.mock_update.call_count, 1) + self.assertEqual(self.mock_remove.call_count, 0) + def test_multiple_trusted(self): """Test that multiple certs get passed to add_ca_certs.""" config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}} - self.mock_add(["CERT1", "CERT2"]) - self.mock_update() - self.mocker.replay() - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + self.mock_add.assert_called_once_with(['CERT1', 'CERT2']) + self.assertEqual(self.mock_update.call_count, 1) + self.assertEqual(self.mock_remove.call_count, 0) + def test_remove_default_ca_certs(self): """Test remove_defaults works as expected.""" config = {"ca-certs": {"remove-defaults": True}} - self.mock_remove() - self.mock_update() - self.mocker.replay() - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + self.assertEqual(self.mock_add.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) + self.assertEqual(self.mock_remove.call_count, 1) + def test_no_remove_defaults_if_false(self): """Test remove_defaults is not called when config value is False.""" config = {"ca-certs": {"remove-defaults": False}} - self.mock_update() - self.mocker.replay() - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + self.assertEqual(self.mock_add.call_count, 0) + self.assertEqual(self.mock_update.call_count, 1) + self.assertEqual(self.mock_remove.call_count, 0) + def test_correct_order_for_remove_then_add(self): """Test remove_defaults is not called when config value is False.""" config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}} - self.mock_remove() - self.mock_add(["CERT1"]) - self.mock_update() - self.mocker.replay() - cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) + self.mock_add.assert_called_once_with(['CERT1']) + self.assertEqual(self.mock_update.call_count, 1) + self.assertEqual(self.mock_remove.call_count, 1) -class TestAddCaCerts(MockerTestCase): + +class TestAddCaCerts(TestCase): def setUp(self): super(TestAddCaCerts, self).setUp() + tmpdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmpdir) self.paths = helpers.Paths({ - 'cloud_dir': self.makeDir() + 'cloud_dir': tmpdir, }) def test_no_certs_in_list(self): """Test that no certificate are written if not provided.""" - self.mocker.replace(util.write_file, passthrough=False) - self.mocker.replay() - cc_ca_certs.add_ca_certs([]) + with mock.patch.object(util, 'write_file') as mockobj: + cc_ca_certs.add_ca_certs([]) + self.assertEqual(mockobj.call_count, 0) def test_single_cert_trailing_cr(self): """Test adding a single certificate to the trusted CAs @@ -146,19 +164,21 @@ class TestAddCaCerts(MockerTestCase): ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n" expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n" - mock_write = self.mocker.replace(util.write_file, passthrough=False) - mock_load = self.mocker.replace(util.load_file, passthrough=False) - - mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt", - cert, mode=0644) - - mock_load("/etc/ca-certificates.conf") - self.mocker.result(ca_certs_content) + with ExitStack() as mocks: + mock_write = mocks.enter_context( + mock.patch.object(util, 'write_file')) + mock_load = mocks.enter_context( + mock.patch.object(util, 'load_file', + return_value=ca_certs_content)) - mock_write("/etc/ca-certificates.conf", expected, omode="wb") - self.mocker.replay() + cc_ca_certs.add_ca_certs([cert]) - cc_ca_certs.add_ca_certs([cert]) + mock_write.assert_has_calls([ + mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", + cert, mode=0o644), + mock.call("/etc/ca-certificates.conf", expected, omode="wb"), + ]) + mock_load.assert_called_once_with("/etc/ca-certificates.conf") def test_single_cert_no_trailing_cr(self): """Test adding a single certificate to the trusted CAs @@ -167,75 +187,89 @@ class TestAddCaCerts(MockerTestCase): ca_certs_content = "line1\nline2\nline3" - mock_write = self.mocker.replace(util.write_file, passthrough=False) - mock_load = self.mocker.replace(util.load_file, passthrough=False) + with ExitStack() as mocks: + mock_write = mocks.enter_context( + mock.patch.object(util, 'write_file')) + mock_load = mocks.enter_context( + mock.patch.object(util, 'load_file', + return_value=ca_certs_content)) - mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt", - cert, mode=0644) + cc_ca_certs.add_ca_certs([cert]) - mock_load("/etc/ca-certificates.conf") - self.mocker.result(ca_certs_content) + mock_write.assert_has_calls([ + mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", + cert, mode=0o644), + mock.call("/etc/ca-certificates.conf", + "%s\n%s\n" % (ca_certs_content, + "cloud-init-ca-certs.crt"), + omode="wb"), + ]) - mock_write("/etc/ca-certificates.conf", - "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt"), - omode="wb") - self.mocker.replay() - - cc_ca_certs.add_ca_certs([cert]) + mock_load.assert_called_once_with("/etc/ca-certificates.conf") def test_multiple_certs(self): """Test adding multiple certificates to the trusted CAs.""" certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"] expected_cert_file = "\n".join(certs) - - mock_write = self.mocker.replace(util.write_file, passthrough=False) - mock_load = self.mocker.replace(util.load_file, passthrough=False) - - mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt", - expected_cert_file, mode=0644) - ca_certs_content = "line1\nline2\nline3" - mock_load("/etc/ca-certificates.conf") - self.mocker.result(ca_certs_content) - out = "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt") - mock_write("/etc/ca-certificates.conf", out, omode="wb") + with ExitStack() as mocks: + mock_write = mocks.enter_context( + mock.patch.object(util, 'write_file')) + mock_load = mocks.enter_context( + mock.patch.object(util, 'load_file', + return_value=ca_certs_content)) - self.mocker.replay() + cc_ca_certs.add_ca_certs(certs) - cc_ca_certs.add_ca_certs(certs) + mock_write.assert_has_calls([ + mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", + expected_cert_file, mode=0o644), + mock.call("/etc/ca-certificates.conf", + "%s\n%s\n" % (ca_certs_content, + "cloud-init-ca-certs.crt"), + omode='wb'), + ]) + mock_load.assert_called_once_with("/etc/ca-certificates.conf") -class TestUpdateCaCerts(MockerTestCase): - def test_commands(self): - mock_check_call = self.mocker.replace(util.subp, - passthrough=False) - mock_check_call(["update-ca-certificates"], capture=False) - self.mocker.replay() - cc_ca_certs.update_ca_certs() +class TestUpdateCaCerts(unittest.TestCase): + def test_commands(self): + with mock.patch.object(util, 'subp') as mockobj: + cc_ca_certs.update_ca_certs() + mockobj.assert_called_once_with( + ["update-ca-certificates"], capture=False) -class TestRemoveDefaultCaCerts(MockerTestCase): +class TestRemoveDefaultCaCerts(TestCase): def setUp(self): super(TestRemoveDefaultCaCerts, self).setUp() + tmpdir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, tmpdir) self.paths = helpers.Paths({ - 'cloud_dir': self.makeDir() + 'cloud_dir': tmpdir, }) def test_commands(self): - mock_delete_dir_contents = self.mocker.replace( - util.delete_dir_contents, passthrough=False) - mock_write = self.mocker.replace(util.write_file, passthrough=False) - mock_subp = self.mocker.replace(util.subp, - passthrough=False) - - mock_delete_dir_contents("/usr/share/ca-certificates/") - mock_delete_dir_contents("/etc/ssl/certs/") - mock_write("/etc/ca-certificates.conf", "", mode=0644) - mock_subp(('debconf-set-selections', '-'), - "ca-certificates ca-certificates/trust_new_crts select no") - self.mocker.replay() - - cc_ca_certs.remove_default_ca_certs() + with ExitStack() as mocks: + mock_delete = mocks.enter_context( + mock.patch.object(util, 'delete_dir_contents')) + mock_write = mocks.enter_context( + mock.patch.object(util, 'write_file')) + mock_subp = mocks.enter_context(mock.patch.object(util, 'subp')) + + cc_ca_certs.remove_default_ca_certs() + + mock_delete.assert_has_calls([ + mock.call("/usr/share/ca-certificates/"), + mock.call("/etc/ssl/certs/"), + ]) + + mock_write.assert_called_once_with( + "/etc/ca-certificates.conf", "", mode=0o644) + + mock_subp.assert_called_once_with( + ('debconf-set-selections', '-'), + "ca-certificates ca-certificates/trust_new_crts select no") diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py index ef1aa208..edad88cb 100644 --- a/tests/unittests/test_handler/test_handler_chef.py +++ b/tests/unittests/test_handler/test_handler_chef.py @@ -11,15 +11,21 @@ from cloudinit.sources import DataSourceNone from .. import helpers as t_help +import six import logging +import shutil +import tempfile LOG = logging.getLogger(__name__) +CLIENT_TEMPL = os.path.sep.join(["templates", "chef_client.rb.tmpl"]) + class TestChef(t_help.FilesystemMockingTestCase): def setUp(self): super(TestChef, self).setUp() - self.tmp = self.makeDir(prefix="unittest_") + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) def fetch_cloud(self, distro_kind): cls = distros.fetch(distro_kind) @@ -37,9 +43,13 @@ class TestChef(t_help.FilesystemMockingTestCase): for d in cc_chef.CHEF_DIRS: self.assertFalse(os.path.isdir(d)) + @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL), + CLIENT_TEMPL + " is not available") def test_basic_config(self): - # This should create a file of the format... """ + test basic config looks sane + + # This should create a file of the format... # Created by cloud-init v. 0.7.6 on Sat, 11 Oct 2014 23:57:21 +0000 log_level :info ssl_verify_mode :verify_none @@ -74,7 +84,7 @@ class TestChef(t_help.FilesystemMockingTestCase): for k, v in cfg['chef'].items(): self.assertIn(v, c) for k, v in cc_chef.CHEF_RB_TPL_DEFAULTS.items(): - if isinstance(v, basestring): + if isinstance(v, six.string_types): self.assertIn(v, c) c = util.load_file(cc_chef.CHEF_FB_PATH) self.assertEqual({}, json.loads(c)) @@ -101,6 +111,8 @@ class TestChef(t_help.FilesystemMockingTestCase): 'c': 'd', }, json.loads(c)) + @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL), + CLIENT_TEMPL + " is not available") def test_template_deletes(self): tpl_file = util.load_file('templates/chef_client.rb.tmpl') self.patchUtils(self.tmp) diff --git a/tests/unittests/test_handler/test_handler_debug.py b/tests/unittests/test_handler/test_handler_debug.py index 8891ca04..80708d7b 100644 --- a/tests/unittests/test_handler/test_handler_debug.py +++ b/tests/unittests/test_handler/test_handler_debug.py @@ -26,6 +26,8 @@ from cloudinit.sources import DataSourceNone from .. import helpers as t_help import logging +import shutil +import tempfile LOG = logging.getLogger(__name__) @@ -33,7 +35,8 @@ LOG = logging.getLogger(__name__) class TestDebug(t_help.FilesystemMockingTestCase): def setUp(self): super(TestDebug, self).setUp() - self.new_root = self.makeDir(prefix="unittest_") + self.new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.new_root) def _get_cloud(self, distro, metadata=None): self.patchUtils(self.new_root) diff --git a/tests/unittests/test_handler/test_handler_disk_setup.py b/tests/unittests/test_handler/test_handler_disk_setup.py new file mode 100644 index 00000000..ddef8d48 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_disk_setup.py @@ -0,0 +1,30 @@ +from cloudinit.config import cc_disk_setup +from ..helpers import ExitStack, mock, TestCase + + +class TestIsDiskUsed(TestCase): + + def setUp(self): + super(TestIsDiskUsed, self).setUp() + self.patches = ExitStack() + mod_name = 'cloudinit.config.cc_disk_setup' + self.enumerate_disk = self.patches.enter_context( + mock.patch('{0}.enumerate_disk'.format(mod_name))) + self.check_fs = self.patches.enter_context( + mock.patch('{0}.check_fs'.format(mod_name))) + + def test_multiple_child_nodes_returns_true(self): + self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(2)) + self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock()) + self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock())) + + def test_valid_filesystem_returns_true(self): + self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1)) + self.check_fs.return_value = ( + mock.MagicMock(), 'ext4', mock.MagicMock()) + self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock())) + + def test_one_child_nodes_and_no_fs_returns_false(self): + self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1)) + self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock()) + self.assertFalse(cc_disk_setup.is_disk_used(mock.MagicMock())) diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py index 5d0636d1..bef0d80d 100644 --- a/tests/unittests/test_handler/test_handler_growpart.py +++ b/tests/unittests/test_handler/test_handler_growpart.py @@ -1,14 +1,23 @@ -from mocker import MockerTestCase - from cloudinit import cloud from cloudinit import util from cloudinit.config import cc_growpart +from ..helpers import TestCase import errno import logging import os import re +import unittest + +try: + from unittest import mock +except ImportError: + import mock +try: + from contextlib import ExitStack +except ImportError: + from contextlib2 import ExitStack # growpart: # mode: auto # off, on, auto, 'growpart' @@ -42,7 +51,7 @@ growpart disk partition """ -class TestDisabled(MockerTestCase): +class TestDisabled(unittest.TestCase): def setUp(self): super(TestDisabled, self).setUp() self.name = "growpart" @@ -57,14 +66,14 @@ class TestDisabled(MockerTestCase): # this really only verifies that resizer_factory isn't called config = {'growpart': {'mode': 'off'}} - self.mocker.replace(cc_growpart.resizer_factory, - passthrough=False) - self.mocker.replay() - self.handle(self.name, config, self.cloud_init, self.log, self.args) + with mock.patch.object(cc_growpart, 'resizer_factory') as mockobj: + self.handle(self.name, config, self.cloud_init, self.log, + self.args) + self.assertEqual(mockobj.call_count, 0) -class TestConfig(MockerTestCase): +class TestConfig(TestCase): def setUp(self): super(TestConfig, self).setUp() self.name = "growpart" @@ -77,75 +86,76 @@ class TestConfig(MockerTestCase): self.cloud_init = None self.handle = cc_growpart.handle - # Order must be correct - self.mocker.order() - def test_no_resizers_auto_is_fine(self): - subp = self.mocker.replace(util.subp, passthrough=False) - subp(['growpart', '--help'], env={'LANG': 'C'}) - self.mocker.result((HELP_GROWPART_NO_RESIZE, "")) - self.mocker.replay() + with mock.patch.object( + util, 'subp', + return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj: - config = {'growpart': {'mode': 'auto'}} - self.handle(self.name, config, self.cloud_init, self.log, self.args) + config = {'growpart': {'mode': 'auto'}} + self.handle(self.name, config, self.cloud_init, self.log, + self.args) + + mockobj.assert_called_once_with( + ['growpart', '--help'], env={'LANG': 'C'}) def test_no_resizers_mode_growpart_is_exception(self): - subp = self.mocker.replace(util.subp, passthrough=False) - subp(['growpart', '--help'], env={'LANG': 'C'}) - self.mocker.result((HELP_GROWPART_NO_RESIZE, "")) - self.mocker.replay() + with mock.patch.object( + util, 'subp', + return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj: + config = {'growpart': {'mode': "growpart"}} + self.assertRaises( + ValueError, self.handle, self.name, config, + self.cloud_init, self.log, self.args) - config = {'growpart': {'mode': "growpart"}} - self.assertRaises(ValueError, self.handle, self.name, config, - self.cloud_init, self.log, self.args) + mockobj.assert_called_once_with( + ['growpart', '--help'], env={'LANG': 'C'}) def test_mode_auto_prefers_growpart(self): - subp = self.mocker.replace(util.subp, passthrough=False) - subp(['growpart', '--help'], env={'LANG': 'C'}) - self.mocker.result((HELP_GROWPART_RESIZE, "")) - self.mocker.replay() + with mock.patch.object( + util, 'subp', + return_value=(HELP_GROWPART_RESIZE, "")) as mockobj: + ret = cc_growpart.resizer_factory(mode="auto") + self.assertIsInstance(ret, cc_growpart.ResizeGrowPart) - ret = cc_growpart.resizer_factory(mode="auto") - self.assertTrue(isinstance(ret, cc_growpart.ResizeGrowPart)) + mockobj.assert_called_once_with( + ['growpart', '--help'], env={'LANG': 'C'}) def test_handle_with_no_growpart_entry(self): # if no 'growpart' entry in config, then mode=auto should be used myresizer = object() + retval = (("/", cc_growpart.RESIZE.CHANGED, "my-message",),) + + with ExitStack() as mocks: + factory = mocks.enter_context( + mock.patch.object(cc_growpart, 'resizer_factory', + return_value=myresizer)) + rsdevs = mocks.enter_context( + mock.patch.object(cc_growpart, 'resize_devices', + return_value=retval)) + mocks.enter_context( + mock.patch.object(cc_growpart, 'RESIZERS', + (('mysizer', object),) + )) - factory = self.mocker.replace(cc_growpart.resizer_factory, - passthrough=False) - rsdevs = self.mocker.replace(cc_growpart.resize_devices, - passthrough=False) - factory("auto") - self.mocker.result(myresizer) - rsdevs(myresizer, ["/"]) - self.mocker.result((("/", cc_growpart.RESIZE.CHANGED, "my-message",),)) - self.mocker.replay() - - try: - orig_resizers = cc_growpart.RESIZERS - cc_growpart.RESIZERS = (('mysizer', object),) self.handle(self.name, {}, self.cloud_init, self.log, self.args) - finally: - cc_growpart.RESIZERS = orig_resizers + factory.assert_called_once_with('auto') + rsdevs.assert_called_once_with(myresizer, ['/']) -class TestResize(MockerTestCase): + +class TestResize(unittest.TestCase): def setUp(self): super(TestResize, self).setUp() self.name = "growpart" self.log = logging.getLogger("TestResize") - # Order must be correct - self.mocker.order() - def test_simple_devices(self): # test simple device list # this patches out devent2dev, os.stat, and device_part_info # so in the end, doesn't test a lot devs = ["/dev/XXda1", "/dev/YYda2"] - devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5L, + devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5, st_nlink=1, st_uid=0, st_gid=6, st_size=0, st_atime=0, st_mtime=0, st_ctime=0) enoent = ["/dev/NOENT"] diff --git a/tests/unittests/test_handler/test_handler_locale.py b/tests/unittests/test_handler/test_handler_locale.py index eb251636..de85eff6 100644 --- a/tests/unittests/test_handler/test_handler_locale.py +++ b/tests/unittests/test_handler/test_handler_locale.py @@ -29,9 +29,11 @@ from .. import helpers as t_help from configobj import ConfigObj -from StringIO import StringIO +from six import BytesIO import logging +import shutil +import tempfile LOG = logging.getLogger(__name__) @@ -39,7 +41,8 @@ LOG = logging.getLogger(__name__) class TestLocale(t_help.FilesystemMockingTestCase): def setUp(self): super(TestLocale, self).setUp() - self.new_root = self.makeDir(prefix="unittest_") + self.new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.new_root) def _get_cloud(self, distro): self.patchUtils(self.new_root) @@ -59,6 +62,6 @@ class TestLocale(t_help.FilesystemMockingTestCase): cc = self._get_cloud('sles') cc_locale.handle('cc_locale', cfg, cc, LOG, []) - contents = util.load_file('/etc/sysconfig/language') - n_cfg = ConfigObj(StringIO(contents)) + contents = util.load_file('/etc/sysconfig/language', decode=False) + n_cfg = ConfigObj(BytesIO(contents)) self.assertEquals({'RC_LANG': cfg['locale']}, dict(n_cfg)) diff --git a/tests/unittests/test_handler/test_handler_lxd.py b/tests/unittests/test_handler/test_handler_lxd.py new file mode 100644 index 00000000..7ffa2a53 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_lxd.py @@ -0,0 +1,75 @@ +from cloudinit.config import cc_lxd +from cloudinit import (distros, helpers, cloud) +from cloudinit.sources import DataSourceNoCloud +from .. import helpers as t_help + +import logging + +try: + from unittest import mock +except ImportError: + import mock + +LOG = logging.getLogger(__name__) + + +class TestLxd(t_help.TestCase): + lxd_cfg = { + 'lxd': { + 'init': { + 'network_address': '0.0.0.0', + 'storage_backend': 'zfs', + 'storage_pool': 'poolname', + } + } + } + + def setUp(self): + super(TestLxd, self).setUp() + + def _get_cloud(self, distro): + cls = distros.fetch(distro) + paths = helpers.Paths({}) + d = cls(distro, {}, paths) + ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths) + cc = cloud.Cloud(ds, paths, {}, d, None) + return cc + + @mock.patch("cloudinit.config.cc_lxd.util") + def test_lxd_init(self, mock_util): + cc = self._get_cloud('ubuntu') + mock_util.which.return_value = True + cc_lxd.handle('cc_lxd', self.lxd_cfg, cc, LOG, []) + self.assertTrue(mock_util.which.called) + init_call = mock_util.subp.call_args_list[0][0][0] + self.assertEquals(init_call, + ['lxd', 'init', '--auto', + '--network-address=0.0.0.0', + '--storage-backend=zfs', + '--storage-pool=poolname']) + + @mock.patch("cloudinit.config.cc_lxd.util") + def test_lxd_install(self, mock_util): + cc = self._get_cloud('ubuntu') + cc.distro = mock.MagicMock() + mock_util.which.return_value = None + cc_lxd.handle('cc_lxd', self.lxd_cfg, cc, LOG, []) + self.assertTrue(cc.distro.install_packages.called) + install_pkg = cc.distro.install_packages.call_args_list[0][0][0] + self.assertEquals(sorted(install_pkg), ['lxd', 'zfs']) + + @mock.patch("cloudinit.config.cc_lxd.util") + def test_no_init_does_nothing(self, mock_util): + cc = self._get_cloud('ubuntu') + cc.distro = mock.MagicMock() + cc_lxd.handle('cc_lxd', {'lxd': {}}, cc, LOG, []) + self.assertFalse(cc.distro.install_packages.called) + self.assertFalse(mock_util.subp.called) + + @mock.patch("cloudinit.config.cc_lxd.util") + def test_no_lxd_does_nothing(self, mock_util): + cc = self._get_cloud('ubuntu') + cc.distro = mock.MagicMock() + cc_lxd.handle('cc_lxd', {'package_update': True}, cc, LOG, []) + self.assertFalse(cc.distro.install_packages.called) + self.assertFalse(mock_util.subp.called) diff --git a/tests/unittests/test_handler/test_handler_mounts.py b/tests/unittests/test_handler/test_handler_mounts.py new file mode 100644 index 00000000..355674b2 --- /dev/null +++ b/tests/unittests/test_handler/test_handler_mounts.py @@ -0,0 +1,133 @@ +import os.path +import shutil +import tempfile + +from cloudinit.config import cc_mounts + +from .. import helpers as test_helpers + +try: + from unittest import mock +except ImportError: + import mock + + +class TestSanitizeDevname(test_helpers.FilesystemMockingTestCase): + + def setUp(self): + super(TestSanitizeDevname, self).setUp() + self.new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.new_root) + self.patchOS(self.new_root) + + def _touch(self, path): + path = os.path.join(self.new_root, path.lstrip('/')) + basedir = os.path.dirname(path) + if not os.path.exists(basedir): + os.makedirs(basedir) + open(path, 'a').close() + + def _makedirs(self, directory): + directory = os.path.join(self.new_root, directory.lstrip('/')) + if not os.path.exists(directory): + os.makedirs(directory) + + def mock_existence_of_disk(self, disk_path): + self._touch(disk_path) + self._makedirs(os.path.join('/sys/block', disk_path.split('/')[-1])) + + def mock_existence_of_partition(self, disk_path, partition_number): + self.mock_existence_of_disk(disk_path) + self._touch(disk_path + str(partition_number)) + disk_name = disk_path.split('/')[-1] + self._makedirs(os.path.join('/sys/block', + disk_name, + disk_name + str(partition_number))) + + def test_existent_full_disk_path_is_returned(self): + disk_path = '/dev/sda' + self.mock_existence_of_disk(disk_path) + self.assertEqual(disk_path, + cc_mounts.sanitize_devname(disk_path, + lambda x: None, + mock.Mock())) + + def test_existent_disk_name_returns_full_path(self): + disk_name = 'sda' + disk_path = '/dev/' + disk_name + self.mock_existence_of_disk(disk_path) + self.assertEqual(disk_path, + cc_mounts.sanitize_devname(disk_name, + lambda x: None, + mock.Mock())) + + def test_existent_meta_disk_is_returned(self): + actual_disk_path = '/dev/sda' + self.mock_existence_of_disk(actual_disk_path) + self.assertEqual( + actual_disk_path, + cc_mounts.sanitize_devname('ephemeral0', + lambda x: actual_disk_path, + mock.Mock())) + + def test_existent_meta_partition_is_returned(self): + disk_name, partition_part = '/dev/sda', '1' + actual_partition_path = disk_name + partition_part + self.mock_existence_of_partition(disk_name, partition_part) + self.assertEqual( + actual_partition_path, + cc_mounts.sanitize_devname('ephemeral0.1', + lambda x: disk_name, + mock.Mock())) + + def test_existent_meta_partition_with_p_is_returned(self): + disk_name, partition_part = '/dev/sda', 'p1' + actual_partition_path = disk_name + partition_part + self.mock_existence_of_partition(disk_name, partition_part) + self.assertEqual( + actual_partition_path, + cc_mounts.sanitize_devname('ephemeral0.1', + lambda x: disk_name, + mock.Mock())) + + def test_first_partition_returned_if_existent_disk_is_partitioned(self): + disk_name, partition_part = '/dev/sda', '1' + actual_partition_path = disk_name + partition_part + self.mock_existence_of_partition(disk_name, partition_part) + self.assertEqual( + actual_partition_path, + cc_mounts.sanitize_devname('ephemeral0', + lambda x: disk_name, + mock.Mock())) + + def test_nth_partition_returned_if_requested(self): + disk_name, partition_part = '/dev/sda', '3' + actual_partition_path = disk_name + partition_part + self.mock_existence_of_partition(disk_name, partition_part) + self.assertEqual( + actual_partition_path, + cc_mounts.sanitize_devname('ephemeral0.3', + lambda x: disk_name, + mock.Mock())) + + def test_transformer_returning_none_returns_none(self): + self.assertIsNone( + cc_mounts.sanitize_devname( + 'ephemeral0', lambda x: None, mock.Mock())) + + def test_missing_device_returns_none(self): + self.assertIsNone( + cc_mounts.sanitize_devname('/dev/sda', None, mock.Mock())) + + def test_missing_sys_returns_none(self): + disk_path = '/dev/sda' + self._makedirs(disk_path) + self.assertIsNone( + cc_mounts.sanitize_devname(disk_path, None, mock.Mock())) + + def test_existent_disk_but_missing_partition_returns_none(self): + disk_path = '/dev/sda' + self.mock_existence_of_disk(disk_path) + self.assertIsNone( + cc_mounts.sanitize_devname( + 'ephemeral0.1', lambda x: disk_path, mock.Mock())) diff --git a/tests/unittests/test_handler/test_handler_power_state.py b/tests/unittests/test_handler/test_handler_power_state.py index 2f86b8f8..04ce5687 100644 --- a/tests/unittests/test_handler/test_handler_power_state.py +++ b/tests/unittests/test_handler/test_handler_power_state.py @@ -1,6 +1,9 @@ +import sys + from cloudinit.config import cc_power_state_change as psc from .. import helpers as t_help +from ..helpers import mock class TestLoadPowerState(t_help.TestCase): @@ -9,12 +12,12 @@ class TestLoadPowerState(t_help.TestCase): def test_no_config(self): # completely empty config should mean do nothing - (cmd, _timeout) = psc.load_power_state({}) + (cmd, _timeout, _condition) = psc.load_power_state({}) self.assertEqual(cmd, None) def test_irrelevant_config(self): # no power_state field in config should return None for cmd - (cmd, _timeout) = psc.load_power_state({'foo': 'bar'}) + (cmd, _timeout, _condition) = psc.load_power_state({'foo': 'bar'}) self.assertEqual(cmd, None) def test_invalid_mode(self): @@ -53,23 +56,59 @@ class TestLoadPowerState(t_help.TestCase): def test_no_message(self): # if message is not present, then no argument should be passed for it cfg = {'power_state': {'mode': 'poweroff'}} - (cmd, _timeout) = psc.load_power_state(cfg) + (cmd, _timeout, _condition) = psc.load_power_state(cfg) self.assertNotIn("", cmd) check_lps_ret(psc.load_power_state(cfg)) self.assertTrue(len(cmd) == 3) + def test_condition_null_raises(self): + cfg = {'power_state': {'mode': 'poweroff', 'condition': None}} + self.assertRaises(TypeError, psc.load_power_state, cfg) + + def test_condition_default_is_true(self): + cfg = {'power_state': {'mode': 'poweroff'}} + _cmd, _timeout, cond = psc.load_power_state(cfg) + self.assertEqual(cond, True) + + +class TestCheckCondition(t_help.TestCase): + def cmd_with_exit(self, rc): + return([sys.executable, '-c', 'import sys; sys.exit(%s)' % rc]) + + def test_true_is_true(self): + self.assertEqual(psc.check_condition(True), True) + + def test_false_is_false(self): + self.assertEqual(psc.check_condition(False), False) + + def test_cmd_exit_zero_true(self): + self.assertEqual(psc.check_condition(self.cmd_with_exit(0)), True) + + def test_cmd_exit_one_false(self): + self.assertEqual(psc.check_condition(self.cmd_with_exit(1)), False) + + def test_cmd_exit_nonzero_warns(self): + mocklog = mock.Mock() + self.assertEqual( + psc.check_condition(self.cmd_with_exit(2), mocklog), False) + self.assertEqual(mocklog.warn.call_count, 1) + def check_lps_ret(psc_return, mode=None): - if len(psc_return) != 2: + if len(psc_return) != 3: raise TypeError("length returned = %d" % len(psc_return)) errs = [] cmd = psc_return[0] timeout = psc_return[1] + condition = psc_return[2] if 'shutdown' not in psc_return[0][0]: errs.append("string 'shutdown' not in cmd") + if condition is None: + errs.append("condition was not returned") + if mode is not None: opt = {'halt': '-H', 'poweroff': '-P', 'reboot': '-r'}[mode] if opt not in psc_return[0]: diff --git a/tests/unittests/test_handler/test_handler_rsyslog.py b/tests/unittests/test_handler/test_handler_rsyslog.py new file mode 100644 index 00000000..b932165c --- /dev/null +++ b/tests/unittests/test_handler/test_handler_rsyslog.py @@ -0,0 +1,174 @@ +import os +import shutil +import tempfile + +from cloudinit.config.cc_rsyslog import ( + apply_rsyslog_changes, DEF_DIR, DEF_FILENAME, DEF_RELOAD, load_config, + parse_remotes_line, remotes_to_rsyslog_cfg) +from cloudinit import util + +from .. import helpers as t_help + + +class TestLoadConfig(t_help.TestCase): + def setUp(self): + super(TestLoadConfig, self).setUp() + self.basecfg = { + 'config_filename': DEF_FILENAME, + 'config_dir': DEF_DIR, + 'service_reload_command': DEF_RELOAD, + 'configs': [], + 'remotes': {}, + } + + def test_legacy_full(self): + found = load_config({ + 'rsyslog': ['*.* @192.168.1.1'], + 'rsyslog_dir': "mydir", + 'rsyslog_filename': "myfilename"}) + self.basecfg.update({ + 'configs': ['*.* @192.168.1.1'], + 'config_dir': "mydir", + 'config_filename': 'myfilename', + 'service_reload_command': 'auto'} + ) + + self.assertEqual(found, self.basecfg) + + def test_legacy_defaults(self): + found = load_config({ + 'rsyslog': ['*.* @192.168.1.1']}) + self.basecfg.update({ + 'configs': ['*.* @192.168.1.1']}) + self.assertEqual(found, self.basecfg) + + def test_new_defaults(self): + self.assertEqual(load_config({}), self.basecfg) + + def test_new_configs(self): + cfgs = ['*.* myhost', '*.* my2host'] + self.basecfg.update({'configs': cfgs}) + self.assertEqual( + load_config({'rsyslog': {'configs': cfgs}}), + self.basecfg) + + +class TestApplyChanges(t_help.TestCase): + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) + + def test_simple(self): + cfgline = "*.* foohost" + changed = apply_rsyslog_changes( + configs=[cfgline], def_fname="foo.cfg", cfg_dir=self.tmp) + + fname = os.path.join(self.tmp, "foo.cfg") + self.assertEqual([fname], changed) + self.assertEqual( + util.load_file(fname), cfgline + "\n") + + def test_multiple_files(self): + configs = [ + '*.* foohost', + {'content': 'abc', 'filename': 'my.cfg'}, + {'content': 'filefoo-content', + 'filename': os.path.join(self.tmp, 'mydir/mycfg')}, + ] + + changed = apply_rsyslog_changes( + configs=configs, def_fname="default.cfg", cfg_dir=self.tmp) + + expected = [ + (os.path.join(self.tmp, "default.cfg"), + "*.* foohost\n"), + (os.path.join(self.tmp, "my.cfg"), "abc\n"), + (os.path.join(self.tmp, "mydir/mycfg"), "filefoo-content\n"), + ] + self.assertEqual([f[0] for f in expected], changed) + actual = [] + for fname, _content in expected: + util.load_file(fname) + actual.append((fname, util.load_file(fname),)) + self.assertEqual(expected, actual) + + def test_repeat_def(self): + configs = ['*.* foohost', "*.warn otherhost"] + + changed = apply_rsyslog_changes( + configs=configs, def_fname="default.cfg", cfg_dir=self.tmp) + + fname = os.path.join(self.tmp, "default.cfg") + self.assertEqual([fname], changed) + + expected_content = '\n'.join([c for c in configs]) + '\n' + found_content = util.load_file(fname) + self.assertEqual(expected_content, found_content) + + def test_multiline_content(self): + configs = ['line1', 'line2\nline3\n'] + + apply_rsyslog_changes( + configs=configs, def_fname="default.cfg", cfg_dir=self.tmp) + + fname = os.path.join(self.tmp, "default.cfg") + expected_content = '\n'.join([c for c in configs]) + found_content = util.load_file(fname) + self.assertEqual(expected_content, found_content) + + +class TestParseRemotesLine(t_help.TestCase): + def test_valid_port(self): + r = parse_remotes_line("foo:9") + self.assertEqual(9, r.port) + + def test_invalid_port(self): + with self.assertRaises(ValueError): + parse_remotes_line("*.* foo:abc") + + def test_valid_ipv6(self): + r = parse_remotes_line("*.* [::1]") + self.assertEqual("*.* @[::1]", str(r)) + + def test_valid_ipv6_with_port(self): + r = parse_remotes_line("*.* [::1]:100") + self.assertEqual(r.port, 100) + self.assertEqual(r.addr, "::1") + self.assertEqual("*.* @[::1]:100", str(r)) + + def test_invalid_multiple_colon(self): + with self.assertRaises(ValueError): + parse_remotes_line("*.* ::1:100") + + def test_name_in_string(self): + r = parse_remotes_line("syslog.host", name="foobar") + self.assertEqual("*.* @syslog.host # foobar", str(r)) + + +class TestRemotesToSyslog(t_help.TestCase): + def test_simple(self): + # str rendered line must appear in remotes_to_ryslog_cfg return + mycfg = "*.* myhost" + myline = str(parse_remotes_line(mycfg, name="myname")) + r = remotes_to_rsyslog_cfg({'myname': mycfg}) + lines = r.splitlines() + self.assertEqual(1, len(lines)) + self.assertTrue(myline in r.splitlines()) + + def test_header_footer(self): + header = "#foo head" + footer = "#foo foot" + r = remotes_to_rsyslog_cfg( + {'myname': "*.* myhost"}, header=header, footer=footer) + lines = r.splitlines() + self.assertTrue(header, lines[0]) + self.assertTrue(footer, lines[-1]) + + def test_with_empty_or_null(self): + mycfg = "*.* myhost" + myline = str(parse_remotes_line(mycfg, name="myname")) + r = remotes_to_rsyslog_cfg( + {'myname': mycfg, 'removed': None, 'removed2': ""}) + lines = r.splitlines() + self.assertEqual(1, len(lines)) + self.assertTrue(myline in r.splitlines()) diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py index 40481f16..98bc9b81 100644 --- a/tests/unittests/test_handler/test_handler_seed_random.py +++ b/tests/unittests/test_handler/test_handler_seed_random.py @@ -18,11 +18,10 @@ from cloudinit.config import cc_seed_random -import base64 import gzip import tempfile -from StringIO import StringIO +from six import BytesIO from cloudinit import cloud from cloudinit import distros @@ -69,7 +68,7 @@ class TestRandomSeed(t_help.TestCase): return def _compress(self, text): - contents = StringIO() + contents = BytesIO() gz_fh = gzip.GzipFile(mode='wb', fileobj=contents) gz_fh.write(text) gz_fh.close() @@ -96,7 +95,7 @@ class TestRandomSeed(t_help.TestCase): self.assertEquals("tiny-tim-was-here", contents) def test_append_random_unknown_encoding(self): - data = self._compress("tiny-toe") + data = self._compress(b"tiny-toe") cfg = { 'random_seed': { 'file': self._seed_file, @@ -108,7 +107,7 @@ class TestRandomSeed(t_help.TestCase): self._get_cloud('ubuntu'), LOG, []) def test_append_random_gzip(self): - data = self._compress("tiny-toe") + data = self._compress(b"tiny-toe") cfg = { 'random_seed': { 'file': self._seed_file, @@ -121,7 +120,7 @@ class TestRandomSeed(t_help.TestCase): self.assertEquals("tiny-toe", contents) def test_append_random_gz(self): - data = self._compress("big-toe") + data = self._compress(b"big-toe") cfg = { 'random_seed': { 'file': self._seed_file, @@ -134,7 +133,7 @@ class TestRandomSeed(t_help.TestCase): self.assertEquals("big-toe", contents) def test_append_random_base64(self): - data = base64.b64encode('bubbles') + data = util.b64e('bubbles') cfg = { 'random_seed': { 'file': self._seed_file, @@ -147,7 +146,7 @@ class TestRandomSeed(t_help.TestCase): self.assertEquals("bubbles", contents) def test_append_random_b64(self): - data = base64.b64encode('kit-kat') + data = util.b64e('kit-kat') cfg = { 'random_seed': { 'file': self._seed_file, @@ -171,27 +170,30 @@ class TestRandomSeed(t_help.TestCase): contents = util.load_file(self._seed_file) self.assertEquals('tiny-tim-was-here-so-was-josh', contents) - def test_seed_command_not_provided_pollinate_available(self): + def test_seed_command_provided_and_available(self): c = self._get_cloud('ubuntu', {}) self.whichdata = {'pollinate': '/usr/bin/pollinate'} - cc_seed_random.handle('test', {}, c, LOG, []) + cfg = {'random_seed': {'command': ['pollinate', '-q']}} + cc_seed_random.handle('test', cfg, c, LOG, []) subp_args = [f['args'] for f in self.subp_called] self.assertIn(['pollinate', '-q'], subp_args) - def test_seed_command_not_provided_pollinate_not_available(self): + def test_seed_command_not_provided(self): c = self._get_cloud('ubuntu', {}) self.whichdata = {} cc_seed_random.handle('test', {}, c, LOG, []) # subp should not have been called as which would say not available - self.assertEquals(self.subp_called, list()) + self.assertFalse(self.subp_called) def test_unavailable_seed_command_and_required_raises_error(self): c = self._get_cloud('ubuntu', {}) self.whichdata = {} + cfg = {'random_seed': {'command': ['THIS_NO_COMMAND'], + 'command_required': True}} self.assertRaises(ValueError, cc_seed_random.handle, - 'test', {'random_seed': {'command_required': True}}, c, LOG, []) + 'test', cfg, c, LOG, []) def test_seed_command_and_required(self): c = self._get_cloud('ubuntu', {}) diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py index e1530e30..d358b069 100644 --- a/tests/unittests/test_handler/test_handler_set_hostname.py +++ b/tests/unittests/test_handler/test_handler_set_hostname.py @@ -7,9 +7,11 @@ from cloudinit import util from .. import helpers as t_help +import shutil +import tempfile import logging -from StringIO import StringIO +from six import BytesIO from configobj import ConfigObj @@ -19,7 +21,8 @@ LOG = logging.getLogger(__name__) class TestHostname(t_help.FilesystemMockingTestCase): def setUp(self): super(TestHostname, self).setUp() - self.tmp = self.makeDir(prefix="unittest_") + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) def _fetch_distro(self, kind): cls = distros.fetch(kind) @@ -38,8 +41,8 @@ class TestHostname(t_help.FilesystemMockingTestCase): cc_set_hostname.handle('cc_set_hostname', cfg, cc, LOG, []) if not distro.uses_systemd(): - contents = util.load_file("/etc/sysconfig/network") - n_cfg = ConfigObj(StringIO(contents)) + contents = util.load_file("/etc/sysconfig/network", decode=False) + n_cfg = ConfigObj(BytesIO(contents)) self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'}, dict(n_cfg)) diff --git a/tests/unittests/test_handler/test_handler_snappy.py b/tests/unittests/test_handler/test_handler_snappy.py new file mode 100644 index 00000000..8aeff53c --- /dev/null +++ b/tests/unittests/test_handler/test_handler_snappy.py @@ -0,0 +1,305 @@ +from cloudinit.config.cc_snappy import ( + makeop, get_package_ops, render_snap_op) +from cloudinit import util +from .. import helpers as t_help + +import os +import shutil +import tempfile +import yaml + +ALLOWED = (dict, list, int, str) + + +class TestInstallPackages(t_help.TestCase): + def setUp(self): + super(TestInstallPackages, self).setUp() + self.unapply = [] + + # by default 'which' has nothing in its path + self.apply_patches([(util, 'subp', self._subp)]) + self.subp_called = [] + self.snapcmds = [] + self.tmp = tempfile.mkdtemp(prefix="TestInstallPackages") + + def tearDown(self): + apply_patches([i for i in reversed(self.unapply)]) + shutil.rmtree(self.tmp) + + def apply_patches(self, patches): + ret = apply_patches(patches) + self.unapply += ret + + def populate_tmp(self, files): + return t_help.populate_dir(self.tmp, files) + + def _subp(self, *args, **kwargs): + # supports subp calling with cmd as args or kwargs + if 'args' not in kwargs: + kwargs['args'] = args[0] + self.subp_called.append(kwargs) + args = kwargs['args'] + # here we basically parse the snappy command invoked + # and append to snapcmds a list of (mode, pkg, config) + if args[0:2] == ['snappy', 'config']: + if args[3] == "-": + config = kwargs.get('data', '') + else: + with open(args[3], "rb") as fp: + config = yaml.safe_load(fp.read()) + self.snapcmds.append(['config', args[2], config]) + elif args[0:2] == ['snappy', 'install']: + config = None + pkg = None + for arg in args[2:]: + if arg.startswith("-"): + continue + if not pkg: + pkg = arg + elif not config: + cfgfile = arg + if cfgfile == "-": + config = kwargs.get('data', '') + elif cfgfile: + with open(cfgfile, "rb") as fp: + config = yaml.safe_load(fp.read()) + self.snapcmds.append(['install', pkg, config]) + + def test_package_ops_1(self): + ret = get_package_ops( + packages=['pkg1', 'pkg2', 'pkg3'], + configs={'pkg2': b'mycfg2'}, installed=[]) + self.assertEqual( + ret, [makeop('install', 'pkg1', None, None), + makeop('install', 'pkg2', b'mycfg2', None), + makeop('install', 'pkg3', None, None)]) + + def test_package_ops_config_only(self): + ret = get_package_ops( + packages=None, + configs={'pkg2': b'mycfg2'}, installed=['pkg1', 'pkg2']) + self.assertEqual( + ret, [makeop('config', 'pkg2', b'mycfg2')]) + + def test_package_ops_install_and_config(self): + ret = get_package_ops( + packages=['pkg3', 'pkg2'], + configs={'pkg2': b'mycfg2', 'xinstalled': b'xcfg'}, + installed=['xinstalled']) + self.assertEqual( + ret, [makeop('install', 'pkg3'), + makeop('install', 'pkg2', b'mycfg2'), + makeop('config', 'xinstalled', b'xcfg')]) + + def test_package_ops_install_long_config_short(self): + # a package can be installed by full name, but have config by short + cfg = {'k1': 'k2'} + ret = get_package_ops( + packages=['config-example.canonical'], + configs={'config-example': cfg}, installed=[]) + self.assertEqual( + ret, [makeop('install', 'config-example.canonical', cfg)]) + + def test_package_ops_with_file(self): + self.populate_tmp( + {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg", + "snapf2.snap": b"foo2", "foo.bar": "ignored"}) + ret = get_package_ops( + packages=['pkg1'], configs={}, installed=[], fspath=self.tmp) + self.assertEqual( + ret, + [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap", + cfgfile="snapf1.config"), + makeop_tmpd(self.tmp, 'install', 'snapf2', path="snapf2.snap"), + makeop('install', 'pkg1')]) + + def test_package_ops_common_filename(self): + # fish package name from filename + # package names likely look like: pkgname.namespace_version_arch.snap + + # find filenames + self.populate_tmp( + {"pkg-ws.smoser_0.3.4_all.snap": "pkg-ws-snapdata", + "pkg-ws.config": "pkg-ws-config", + "pkg1.smoser_1.2.3_all.snap": "pkg1.snapdata", + "pkg1.smoser.config": "pkg1.smoser.config-data", + "pkg1.config": "pkg1.config-data", + "pkg2.smoser_0.0_amd64.snap": "pkg2-snapdata", + "pkg2.smoser_0.0_amd64.config": "pkg2.config"}) + + ret = get_package_ops( + packages=[], configs={}, installed=[], fspath=self.tmp) + self.assertEqual( + ret, + [makeop_tmpd(self.tmp, 'install', 'pkg-ws.smoser', + path="pkg-ws.smoser_0.3.4_all.snap", + cfgfile="pkg-ws.config"), + makeop_tmpd(self.tmp, 'install', 'pkg1.smoser', + path="pkg1.smoser_1.2.3_all.snap", + cfgfile="pkg1.smoser.config"), + makeop_tmpd(self.tmp, 'install', 'pkg2.smoser', + path="pkg2.smoser_0.0_amd64.snap", + cfgfile="pkg2.smoser_0.0_amd64.config"), + ]) + + def test_package_ops_config_overrides_file(self): + # config data overrides local file .config + self.populate_tmp( + {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg"}) + ret = get_package_ops( + packages=[], configs={'snapf1': 'snapf1cfg-config'}, + installed=[], fspath=self.tmp) + self.assertEqual( + ret, [makeop_tmpd(self.tmp, 'install', 'snapf1', + path="snapf1.snap", config="snapf1cfg-config")]) + + def test_package_ops_namespacing(self): + cfgs = { + 'config-example': {'k1': 'v1'}, + 'pkg1': {'p1': 'p2'}, + 'ubuntu-core': {'c1': 'c2'}, + 'notinstalled.smoser': {'s1': 's2'}, + } + ret = get_package_ops( + packages=['config-example.canonical'], configs=cfgs, + installed=['config-example.smoser', 'pkg1.canonical', + 'ubuntu-core']) + + expected_configs = [ + makeop('config', 'pkg1', config=cfgs['pkg1']), + makeop('config', 'ubuntu-core', config=cfgs['ubuntu-core'])] + expected_installs = [ + makeop('install', 'config-example.canonical', + config=cfgs['config-example'])] + + installs = [i for i in ret if i['op'] == 'install'] + configs = [c for c in ret if c['op'] == 'config'] + + self.assertEqual(installs, expected_installs) + # configs are not ordered + self.assertEqual(len(configs), len(expected_configs)) + self.assertTrue(all(found in expected_configs for found in configs)) + + def test_render_op_localsnap(self): + self.populate_tmp({"snapf1.snap": b"foo1"}) + op = makeop_tmpd(self.tmp, 'install', 'snapf1', + path='snapf1.snap') + render_snap_op(**op) + self.assertEqual( + self.snapcmds, [['install', op['path'], None]]) + + def test_render_op_localsnap_localconfig(self): + self.populate_tmp( + {"snapf1.snap": b"foo1", 'snapf1.config': b'snapf1cfg'}) + op = makeop_tmpd(self.tmp, 'install', 'snapf1', + path='snapf1.snap', cfgfile='snapf1.config') + render_snap_op(**op) + self.assertEqual( + self.snapcmds, [['install', op['path'], 'snapf1cfg']]) + + def test_render_op_snap(self): + op = makeop('install', 'snapf1') + render_snap_op(**op) + self.assertEqual( + self.snapcmds, [['install', 'snapf1', None]]) + + def test_render_op_snap_config(self): + mycfg = {'key1': 'value1'} + name = "snapf1" + op = makeop('install', name, config=mycfg) + render_snap_op(**op) + self.assertEqual( + self.snapcmds, [['install', name, {'config': {name: mycfg}}]]) + + def test_render_op_config_bytes(self): + name = "snapf1" + mycfg = b'myconfig' + op = makeop('config', name, config=mycfg) + render_snap_op(**op) + self.assertEqual( + self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]]) + + def test_render_op_config_string(self): + name = 'snapf1' + mycfg = 'myconfig: foo\nhisconfig: bar\n' + op = makeop('config', name, config=mycfg) + render_snap_op(**op) + self.assertEqual( + self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]]) + + def test_render_op_config_dict(self): + # config entry for package can be a dict, not a string blob + mycfg = {'foo': 'bar'} + name = 'snapf1' + op = makeop('config', name, config=mycfg) + render_snap_op(**op) + # snapcmds is a list of 3-entry lists. data_found will be the + # blob of data in the file in 'snappy install --config=<file>' + data_found = self.snapcmds[0][2] + self.assertEqual(mycfg, data_found['config'][name]) + + def test_render_op_config_list(self): + # config entry for package can be a list, not a string blob + mycfg = ['foo', 'bar', 'wark', {'f1': 'b1'}] + name = "snapf1" + op = makeop('config', name, config=mycfg) + render_snap_op(**op) + data_found = self.snapcmds[0][2] + self.assertEqual(mycfg, data_found['config'][name]) + + def test_render_op_config_int(self): + # config entry for package can be a list, not a string blob + mycfg = 1 + name = 'snapf1' + op = makeop('config', name, config=mycfg) + render_snap_op(**op) + data_found = self.snapcmds[0][2] + self.assertEqual(mycfg, data_found['config'][name]) + + def test_render_long_configs_short(self): + # install a namespaced package should have un-namespaced config + mycfg = {'k1': 'k2'} + name = 'snapf1' + op = makeop('install', name + ".smoser", config=mycfg) + render_snap_op(**op) + data_found = self.snapcmds[0][2] + self.assertEqual(mycfg, data_found['config'][name]) + + def test_render_does_not_pad_cfgfile(self): + # package_ops with cfgfile should not modify --file= content. + mydata = "foo1: bar1\nk: [l1, l2, l3]\n" + self.populate_tmp( + {"snapf1.snap": b"foo1", "snapf1.config": mydata.encode()}) + ret = get_package_ops( + packages=[], configs={}, installed=[], fspath=self.tmp) + self.assertEqual( + ret, + [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap", + cfgfile="snapf1.config")]) + + # now the op was ok, but test that render didn't mess it up. + render_snap_op(**ret[0]) + data_found = self.snapcmds[0][2] + # the data found gets loaded in the snapcmd interpretation + # so this comparison is a bit lossy, but input to snappy config + # is expected to be yaml loadable, so it should be OK. + self.assertEqual(yaml.safe_load(mydata), data_found) + + +def makeop_tmpd(tmpd, op, name, config=None, path=None, cfgfile=None): + if cfgfile: + cfgfile = os.path.sep.join([tmpd, cfgfile]) + if path: + path = os.path.sep.join([tmpd, path]) + return(makeop(op=op, name=name, config=config, path=path, cfgfile=cfgfile)) + + +def apply_patches(patches): + ret = [] + for (ref, name, replace) in patches: + if replace is None: + continue + orig = getattr(ref, name) + setattr(ref, name, replace) + ret.append((ref, name, orig)) + return ret diff --git a/tests/unittests/test_handler/test_handler_timezone.py b/tests/unittests/test_handler/test_handler_timezone.py index 874db340..e3df8759 100644 --- a/tests/unittests/test_handler/test_handler_timezone.py +++ b/tests/unittests/test_handler/test_handler_timezone.py @@ -29,8 +29,10 @@ from .. import helpers as t_help from configobj import ConfigObj -from StringIO import StringIO +from six import BytesIO +import shutil +import tempfile import logging LOG = logging.getLogger(__name__) @@ -39,7 +41,8 @@ LOG = logging.getLogger(__name__) class TestTimezone(t_help.FilesystemMockingTestCase): def setUp(self): super(TestTimezone, self).setUp() - self.new_root = self.makeDir(prefix="unittest_") + self.new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.new_root) def _get_cloud(self, distro): self.patchUtils(self.new_root) @@ -67,8 +70,8 @@ class TestTimezone(t_help.FilesystemMockingTestCase): cc_timezone.handle('cc_timezone', cfg, cc, LOG, []) - contents = util.load_file('/etc/sysconfig/clock') - n_cfg = ConfigObj(StringIO(contents)) + contents = util.load_file('/etc/sysconfig/clock', decode=False) + n_cfg = ConfigObj(BytesIO(contents)) self.assertEquals({'TIMEZONE': cfg['timezone']}, dict(n_cfg)) contents = util.load_file('/etc/localtime') diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py index 435c9787..3a8aa7c1 100644 --- a/tests/unittests/test_handler/test_handler_yum_add_repo.py +++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py @@ -4,9 +4,11 @@ from cloudinit.config import cc_yum_add_repo from .. import helpers +import shutil +import tempfile import logging -from StringIO import StringIO +from six import BytesIO import configobj @@ -16,7 +18,8 @@ LOG = logging.getLogger(__name__) class TestConfig(helpers.FilesystemMockingTestCase): def setUp(self): super(TestConfig, self).setUp() - self.tmp = self.makeDir(prefix="unittest_") + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) def test_bad_config(self): cfg = { @@ -52,8 +55,9 @@ class TestConfig(helpers.FilesystemMockingTestCase): } self.patchUtils(self.tmp) cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, []) - contents = util.load_file("/etc/yum.repos.d/epel_testing.repo") - contents = configobj.ConfigObj(StringIO(contents)) + contents = util.load_file("/etc/yum.repos.d/epel_testing.repo", + decode=False) + contents = configobj.ConfigObj(BytesIO(contents)) expected = { 'epel_testing': { 'name': 'Extra Packages for Enterprise Linux 5 - Testing', diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py index 07b610f7..976d8283 100644 --- a/tests/unittests/test_merging.py +++ b/tests/unittests/test_merging.py @@ -11,11 +11,13 @@ import glob import os import random import re +import six import string SOURCE_PAT = "source*.*yaml" EXPECTED_PAT = "expected%s.yaml" -TYPES = [long, int, dict, str, list, tuple, None] +TYPES = [dict, str, list, tuple, None] +TYPES.extend(six.integer_types) def _old_mergedict(src, cand): @@ -25,7 +27,7 @@ def _old_mergedict(src, cand): Nested dictionaries are merged recursively. """ if isinstance(src, dict) and isinstance(cand, dict): - for (k, v) in cand.iteritems(): + for (k, v) in cand.items(): if k not in src: src[k] = v else: @@ -42,8 +44,8 @@ def _old_mergemanydict(*args): def _random_str(rand): base = '' - for _i in xrange(rand.randint(1, 2 ** 8)): - base += rand.choice(string.letters + string.digits) + for _i in range(rand.randint(1, 2 ** 8)): + base += rand.choice(string.ascii_letters + string.digits) return base @@ -64,7 +66,7 @@ def _make_dict(current_depth, max_depth, rand): if t in [dict, list, tuple]: if t in [dict]: amount = rand.randint(0, 5) - keys = [_random_str(rand) for _i in xrange(0, amount)] + keys = [_random_str(rand) for _i in range(0, amount)] base = {} for k in keys: try: @@ -74,14 +76,14 @@ def _make_dict(current_depth, max_depth, rand): elif t in [list, tuple]: base = [] amount = rand.randint(0, 5) - for _i in xrange(0, amount): + for _i in range(0, amount): try: base.append(_make_dict(current_depth + 1, max_depth, rand)) except _NoMoreException: pass if t in [tuple]: base = tuple(base) - elif t in [long, int]: + elif t in six.integer_types: base = rand.randint(0, 2 ** 8) elif t in [str]: base = _random_str(rand) diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py new file mode 100644 index 00000000..dfb31710 --- /dev/null +++ b/tests/unittests/test_net.py @@ -0,0 +1,127 @@ +from cloudinit import util +from cloudinit import net +from .helpers import TestCase + +import base64 +import copy +import io +import gzip +import json +import os + +DHCP_CONTENT_1 = """ +DEVICE='eth0' +PROTO='dhcp' +IPV4ADDR='192.168.122.89' +IPV4BROADCAST='192.168.122.255' +IPV4NETMASK='255.255.255.0' +IPV4GATEWAY='192.168.122.1' +IPV4DNS0='192.168.122.1' +IPV4DNS1='0.0.0.0' +HOSTNAME='foohost' +DNSDOMAIN='' +NISDOMAIN='' +ROOTSERVER='192.168.122.1' +ROOTPATH='' +filename='' +UPTIME='21' +DHCPLEASETIME='3600' +DOMAINSEARCH='foo.com' +""" + +DHCP_EXPECTED_1 = { + 'name': 'eth0', + 'type': 'physical', + 'subnets': [{'broadcast': '192.168.122.255', + 'gateway': '192.168.122.1', + 'dns_search': ['foo.com'], + 'type': 'dhcp', + 'netmask': '255.255.255.0', + 'dns_nameservers': ['192.168.122.1']}], +} + + +STATIC_CONTENT_1 = """ +DEVICE='eth1' +PROTO='static' +IPV4ADDR='10.0.0.2' +IPV4BROADCAST='10.0.0.255' +IPV4NETMASK='255.255.255.0' +IPV4GATEWAY='10.0.0.1' +IPV4DNS0='10.0.1.1' +IPV4DNS1='0.0.0.0' +HOSTNAME='foohost' +UPTIME='21' +DHCPLEASETIME='3600' +DOMAINSEARCH='foo.com' +""" + +STATIC_EXPECTED_1 = { + 'name': 'eth1', + 'type': 'physical', + 'subnets': [{'broadcast': '10.0.0.255', 'gateway': '10.0.0.1', + 'dns_search': ['foo.com'], 'type': 'static', + 'netmask': '255.255.255.0', + 'dns_nameservers': ['10.0.1.1']}], +} + + +class TestNetConfigParsing(TestCase): + simple_cfg = { + 'config': [{"type": "physical", "name": "eth0", + "mac_address": "c0:d6:9f:2c:e8:80", + "subnets": [{"type": "dhcp"}]}]} + + def test_klibc_convert_dhcp(self): + found = net._klibc_to_config_entry(DHCP_CONTENT_1) + self.assertEqual(found, ('eth0', DHCP_EXPECTED_1)) + + def test_klibc_convert_static(self): + found = net._klibc_to_config_entry(STATIC_CONTENT_1) + self.assertEqual(found, ('eth1', STATIC_EXPECTED_1)) + + def test_config_from_klibc_net_cfg(self): + files = [] + pairs = (('net-eth0.cfg', DHCP_CONTENT_1), + ('net-eth1.cfg', STATIC_CONTENT_1)) + + macs = {'eth1': 'b8:ae:ed:75:ff:2b', + 'eth0': 'b8:ae:ed:75:ff:2a'} + + dhcp = copy.deepcopy(DHCP_EXPECTED_1) + dhcp['mac_address'] = macs['eth0'] + + static = copy.deepcopy(STATIC_EXPECTED_1) + static['mac_address'] = macs['eth1'] + + expected = {'version': 1, 'config': [dhcp, static]} + with util.tempdir() as tmpd: + for fname, content in pairs: + fp = os.path.join(tmpd, fname) + files.append(fp) + util.write_file(fp, content) + + found = net.config_from_klibc_net_cfg(files=files, mac_addrs=macs) + self.assertEqual(found, expected) + + def test_cmdline_with_b64(self): + data = base64.b64encode(json.dumps(self.simple_cfg).encode()) + encoded_text = data.decode() + cmdline = 'ro network-config=' + encoded_text + ' root=foo' + found = net.read_kernel_cmdline_config(cmdline=cmdline) + self.assertEqual(found, self.simple_cfg) + + def test_cmdline_with_b64_gz(self): + data = _gzip_data(json.dumps(self.simple_cfg).encode()) + encoded_text = base64.b64encode(data).decode() + cmdline = 'ro network-config=' + encoded_text + ' root=foo' + found = net.read_kernel_cmdline_config(cmdline=cmdline) + self.assertEqual(found, self.simple_cfg) + + +def _gzip_data(data): + with io.BytesIO() as iobuf: + gzfp = gzip.GzipFile(mode="wb", fileobj=iobuf) + gzfp.write(data) + gzfp.close() + return iobuf.getvalue() diff --git a/tests/unittests/test_pathprefix2dict.py b/tests/unittests/test_pathprefix2dict.py index 590c4b82..38fd75b6 100644 --- a/tests/unittests/test_pathprefix2dict.py +++ b/tests/unittests/test_pathprefix2dict.py @@ -1,37 +1,41 @@ from cloudinit import util -from mocker import MockerTestCase -from .helpers import populate_dir +from .helpers import TestCase, populate_dir +import shutil +import tempfile -class TestPathPrefix2Dict(MockerTestCase): + +class TestPathPrefix2Dict(TestCase): def setUp(self): - self.tmp = self.makeDir() + super(TestPathPrefix2Dict, self).setUp() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) def test_required_only(self): - dirdata = {'f1': 'f1content', 'f2': 'f2content'} + dirdata = {'f1': b'f1content', 'f2': b'f2content'} populate_dir(self.tmp, dirdata) ret = util.pathprefix2dict(self.tmp, required=['f1', 'f2']) self.assertEqual(dirdata, ret) def test_required_missing(self): - dirdata = {'f1': 'f1content'} + dirdata = {'f1': b'f1content'} populate_dir(self.tmp, dirdata) kwargs = {'required': ['f1', 'f2']} self.assertRaises(ValueError, util.pathprefix2dict, self.tmp, **kwargs) def test_no_required_and_optional(self): - dirdata = {'f1': 'f1c', 'f2': 'f2c'} + dirdata = {'f1': b'f1c', 'f2': b'f2c'} populate_dir(self.tmp, dirdata) ret = util.pathprefix2dict(self.tmp, required=None, - optional=['f1', 'f2']) + optional=['f1', 'f2']) self.assertEqual(dirdata, ret) def test_required_and_optional(self): - dirdata = {'f1': 'f1c', 'f2': 'f2c'} + dirdata = {'f1': b'f1c', 'f2': b'f2c'} populate_dir(self.tmp, dirdata) ret = util.pathprefix2dict(self.tmp, required=['f1'], optional=['f2']) diff --git a/tests/unittests/test_registry.py b/tests/unittests/test_registry.py new file mode 100644 index 00000000..bcf01475 --- /dev/null +++ b/tests/unittests/test_registry.py @@ -0,0 +1,28 @@ +from cloudinit.registry import DictRegistry + +from .helpers import (mock, TestCase) + + +class TestDictRegistry(TestCase): + + def test_added_item_included_in_output(self): + registry = DictRegistry() + item_key, item_to_register = 'test_key', mock.Mock() + registry.register_item(item_key, item_to_register) + self.assertEqual({item_key: item_to_register}, + registry.registered_items) + + def test_registry_starts_out_empty(self): + self.assertEqual({}, DictRegistry().registered_items) + + def test_modifying_registered_items_isnt_exposed_to_other_callers(self): + registry = DictRegistry() + registry.registered_items['test_item'] = mock.Mock() + self.assertEqual({}, registry.registered_items) + + def test_keys_cannot_be_replaced(self): + registry = DictRegistry() + item_key = 'test_key' + registry.register_item(item_key, mock.Mock()) + self.assertRaises(ValueError, + registry.register_item, item_key, mock.Mock()) diff --git a/tests/unittests/test_reporting.py b/tests/unittests/test_reporting.py new file mode 100644 index 00000000..32356ef9 --- /dev/null +++ b/tests/unittests/test_reporting.py @@ -0,0 +1,369 @@ +# Copyright 2015 Canonical Ltd. +# This file is part of cloud-init. See LICENCE file for license information. +# +# vi: ts=4 expandtab + +from cloudinit import reporting +from cloudinit.reporting import handlers +from cloudinit.reporting import events + +from .helpers import (mock, TestCase) + + +def _fake_registry(): + return mock.Mock(registered_items={'a': mock.MagicMock(), + 'b': mock.MagicMock()}) + + +class TestReportStartEvent(TestCase): + + @mock.patch('cloudinit.reporting.events.instantiated_handler_registry', + new_callable=_fake_registry) + def test_report_start_event_passes_something_with_as_string_to_handlers( + self, instantiated_handler_registry): + event_name, event_description = 'my_test_event', 'my description' + events.report_start_event(event_name, event_description) + expected_string_representation = ': '.join( + ['start', event_name, event_description]) + for _, handler in ( + instantiated_handler_registry.registered_items.items()): + self.assertEqual(1, handler.publish_event.call_count) + event = handler.publish_event.call_args[0][0] + self.assertEqual(expected_string_representation, event.as_string()) + + +class TestReportFinishEvent(TestCase): + + def _report_finish_event(self, result=events.status.SUCCESS): + event_name, event_description = 'my_test_event', 'my description' + events.report_finish_event( + event_name, event_description, result=result) + return event_name, event_description + + def assertHandlersPassedObjectWithAsString( + self, handlers, expected_as_string): + for _, handler in handlers.items(): + self.assertEqual(1, handler.publish_event.call_count) + event = handler.publish_event.call_args[0][0] + self.assertEqual(expected_as_string, event.as_string()) + + @mock.patch('cloudinit.reporting.events.instantiated_handler_registry', + new_callable=_fake_registry) + def test_report_finish_event_passes_something_with_as_string_to_handlers( + self, instantiated_handler_registry): + event_name, event_description = self._report_finish_event() + expected_string_representation = ': '.join( + ['finish', event_name, events.status.SUCCESS, + event_description]) + self.assertHandlersPassedObjectWithAsString( + instantiated_handler_registry.registered_items, + expected_string_representation) + + @mock.patch('cloudinit.reporting.events.instantiated_handler_registry', + new_callable=_fake_registry) + def test_reporting_successful_finish_has_sensible_string_repr( + self, instantiated_handler_registry): + event_name, event_description = self._report_finish_event( + result=events.status.SUCCESS) + expected_string_representation = ': '.join( + ['finish', event_name, events.status.SUCCESS, + event_description]) + self.assertHandlersPassedObjectWithAsString( + instantiated_handler_registry.registered_items, + expected_string_representation) + + @mock.patch('cloudinit.reporting.events.instantiated_handler_registry', + new_callable=_fake_registry) + def test_reporting_unsuccessful_finish_has_sensible_string_repr( + self, instantiated_handler_registry): + event_name, event_description = self._report_finish_event( + result=events.status.FAIL) + expected_string_representation = ': '.join( + ['finish', event_name, events.status.FAIL, event_description]) + self.assertHandlersPassedObjectWithAsString( + instantiated_handler_registry.registered_items, + expected_string_representation) + + def test_invalid_result_raises_attribute_error(self): + self.assertRaises(ValueError, self._report_finish_event, ("BOGUS",)) + + +class TestReportingEvent(TestCase): + + def test_as_string(self): + event_type, name, description = 'test_type', 'test_name', 'test_desc' + event = events.ReportingEvent(event_type, name, description) + expected_string_representation = ': '.join( + [event_type, name, description]) + self.assertEqual(expected_string_representation, event.as_string()) + + def test_as_dict(self): + event_type, name, desc = 'test_type', 'test_name', 'test_desc' + event = events.ReportingEvent(event_type, name, desc) + expected = {'event_type': event_type, 'name': name, + 'description': desc, 'origin': 'cloudinit'} + + # allow for timestamp to differ, but must be present + as_dict = event.as_dict() + self.assertIn('timestamp', as_dict) + del as_dict['timestamp'] + + self.assertEqual(expected, as_dict) + + +class TestFinishReportingEvent(TestCase): + def test_as_has_result(self): + result = events.status.SUCCESS + name, desc = 'test_name', 'test_desc' + event = events.FinishReportingEvent(name, desc, result) + ret = event.as_dict() + self.assertTrue('result' in ret) + self.assertEqual(ret['result'], result) + + +class TestBaseReportingHandler(TestCase): + + def test_base_reporting_handler_is_abstract(self): + regexp = r".*abstract.*publish_event.*" + self.assertRaisesRegexp(TypeError, regexp, handlers.ReportingHandler) + + +class TestLogHandler(TestCase): + + @mock.patch.object(reporting.handlers.logging, 'getLogger') + def test_appropriate_logger_used(self, getLogger): + event_type, event_name = 'test_type', 'test_name' + event = events.ReportingEvent(event_type, event_name, 'description') + reporting.handlers.LogHandler().publish_event(event) + self.assertEqual( + [mock.call( + 'cloudinit.reporting.{0}.{1}'.format(event_type, event_name))], + getLogger.call_args_list) + + @mock.patch.object(reporting.handlers.logging, 'getLogger') + def test_single_log_message_at_info_published(self, getLogger): + event = events.ReportingEvent('type', 'name', 'description') + reporting.handlers.LogHandler().publish_event(event) + self.assertEqual(1, getLogger.return_value.log.call_count) + + @mock.patch.object(reporting.handlers.logging, 'getLogger') + def test_log_message_uses_event_as_string(self, getLogger): + event = events.ReportingEvent('type', 'name', 'description') + reporting.handlers.LogHandler(level="INFO").publish_event(event) + self.assertIn(event.as_string(), + getLogger.return_value.log.call_args[0][1]) + + +class TestDefaultRegisteredHandler(TestCase): + + def test_log_handler_registered_by_default(self): + registered_items = ( + reporting.instantiated_handler_registry.registered_items) + for _, item in registered_items.items(): + if isinstance(item, reporting.handlers.LogHandler): + break + else: + self.fail('No reporting LogHandler registered by default.') + + +class TestReportingConfiguration(TestCase): + + @mock.patch.object(reporting, 'instantiated_handler_registry') + def test_empty_configuration_doesnt_add_handlers( + self, instantiated_handler_registry): + reporting.update_configuration({}) + self.assertEqual( + 0, instantiated_handler_registry.register_item.call_count) + + @mock.patch.object( + reporting, 'instantiated_handler_registry', reporting.DictRegistry()) + @mock.patch.object(reporting, 'available_handlers') + def test_looks_up_handler_by_type_and_adds_it(self, available_handlers): + handler_type_name = 'test_handler' + handler_cls = mock.Mock() + available_handlers.registered_items = {handler_type_name: handler_cls} + handler_name = 'my_test_handler' + reporting.update_configuration( + {handler_name: {'type': handler_type_name}}) + self.assertEqual( + {handler_name: handler_cls.return_value}, + reporting.instantiated_handler_registry.registered_items) + + @mock.patch.object( + reporting, 'instantiated_handler_registry', reporting.DictRegistry()) + @mock.patch.object(reporting, 'available_handlers') + def test_uses_non_type_parts_of_config_dict_as_kwargs( + self, available_handlers): + handler_type_name = 'test_handler' + handler_cls = mock.Mock() + available_handlers.registered_items = {handler_type_name: handler_cls} + extra_kwargs = {'foo': 'bar', 'bar': 'baz'} + handler_config = extra_kwargs.copy() + handler_config.update({'type': handler_type_name}) + handler_name = 'my_test_handler' + reporting.update_configuration({handler_name: handler_config}) + self.assertEqual( + handler_cls.return_value, + reporting.instantiated_handler_registry.registered_items[ + handler_name]) + self.assertEqual([mock.call(**extra_kwargs)], + handler_cls.call_args_list) + + @mock.patch.object( + reporting, 'instantiated_handler_registry', reporting.DictRegistry()) + @mock.patch.object(reporting, 'available_handlers') + def test_handler_config_not_modified(self, available_handlers): + handler_type_name = 'test_handler' + handler_cls = mock.Mock() + available_handlers.registered_items = {handler_type_name: handler_cls} + handler_config = {'type': handler_type_name, 'foo': 'bar'} + expected_handler_config = handler_config.copy() + reporting.update_configuration({'my_test_handler': handler_config}) + self.assertEqual(expected_handler_config, handler_config) + + @mock.patch.object( + reporting, 'instantiated_handler_registry', reporting.DictRegistry()) + @mock.patch.object(reporting, 'available_handlers') + def test_handlers_removed_if_falseish_specified(self, available_handlers): + handler_type_name = 'test_handler' + handler_cls = mock.Mock() + available_handlers.registered_items = {handler_type_name: handler_cls} + handler_name = 'my_test_handler' + reporting.update_configuration( + {handler_name: {'type': handler_type_name}}) + self.assertEqual( + 1, len(reporting.instantiated_handler_registry.registered_items)) + reporting.update_configuration({handler_name: None}) + self.assertEqual( + 0, len(reporting.instantiated_handler_registry.registered_items)) + + +class TestReportingEventStack(TestCase): + @mock.patch('cloudinit.reporting.events.report_finish_event') + @mock.patch('cloudinit.reporting.events.report_start_event') + def test_start_and_finish_success(self, report_start, report_finish): + with events.ReportEventStack(name="myname", description="mydesc"): + pass + self.assertEqual( + [mock.call('myname', 'mydesc')], report_start.call_args_list) + self.assertEqual( + [mock.call('myname', 'mydesc', events.status.SUCCESS, + post_files=[])], + report_finish.call_args_list) + + @mock.patch('cloudinit.reporting.events.report_finish_event') + @mock.patch('cloudinit.reporting.events.report_start_event') + def test_finish_exception_defaults_fail(self, report_start, report_finish): + name = "myname" + desc = "mydesc" + try: + with events.ReportEventStack(name, description=desc): + raise ValueError("This didnt work") + except ValueError: + pass + self.assertEqual([mock.call(name, desc)], report_start.call_args_list) + self.assertEqual( + [mock.call(name, desc, events.status.FAIL, post_files=[])], + report_finish.call_args_list) + + @mock.patch('cloudinit.reporting.events.report_finish_event') + @mock.patch('cloudinit.reporting.events.report_start_event') + def test_result_on_exception_used(self, report_start, report_finish): + name = "myname" + desc = "mydesc" + try: + with events.ReportEventStack( + name, desc, result_on_exception=events.status.WARN): + raise ValueError("This didnt work") + except ValueError: + pass + self.assertEqual([mock.call(name, desc)], report_start.call_args_list) + self.assertEqual( + [mock.call(name, desc, events.status.WARN, post_files=[])], + report_finish.call_args_list) + + @mock.patch('cloudinit.reporting.events.report_start_event') + def test_child_fullname_respects_parent(self, report_start): + parent_name = "topname" + c1_name = "c1name" + c2_name = "c2name" + c2_expected_fullname = '/'.join([parent_name, c1_name, c2_name]) + c1_expected_fullname = '/'.join([parent_name, c1_name]) + + parent = events.ReportEventStack(parent_name, "topdesc") + c1 = events.ReportEventStack(c1_name, "c1desc", parent=parent) + c2 = events.ReportEventStack(c2_name, "c2desc", parent=c1) + with c1: + report_start.assert_called_with(c1_expected_fullname, "c1desc") + with c2: + report_start.assert_called_with(c2_expected_fullname, "c2desc") + + @mock.patch('cloudinit.reporting.events.report_finish_event') + @mock.patch('cloudinit.reporting.events.report_start_event') + def test_child_result_bubbles_up(self, report_start, report_finish): + parent = events.ReportEventStack("topname", "topdesc") + child = events.ReportEventStack("c_name", "c_desc", parent=parent) + with parent: + with child: + child.result = events.status.WARN + + report_finish.assert_called_with( + "topname", "topdesc", events.status.WARN, post_files=[]) + + @mock.patch('cloudinit.reporting.events.report_finish_event') + def test_message_used_in_finish(self, report_finish): + with events.ReportEventStack("myname", "mydesc", + message="mymessage"): + pass + self.assertEqual( + [mock.call("myname", "mymessage", events.status.SUCCESS, + post_files=[])], + report_finish.call_args_list) + + @mock.patch('cloudinit.reporting.events.report_finish_event') + def test_message_updatable(self, report_finish): + with events.ReportEventStack("myname", "mydesc") as c: + c.message = "all good" + self.assertEqual( + [mock.call("myname", "all good", events.status.SUCCESS, + post_files=[])], + report_finish.call_args_list) + + @mock.patch('cloudinit.reporting.events.report_start_event') + @mock.patch('cloudinit.reporting.events.report_finish_event') + def test_reporting_disabled_does_not_report_events( + self, report_start, report_finish): + with events.ReportEventStack("a", "b", reporting_enabled=False): + pass + self.assertEqual(report_start.call_count, 0) + self.assertEqual(report_finish.call_count, 0) + + @mock.patch('cloudinit.reporting.events.report_start_event') + @mock.patch('cloudinit.reporting.events.report_finish_event') + def test_reporting_child_default_to_parent( + self, report_start, report_finish): + parent = events.ReportEventStack( + "pname", "pdesc", reporting_enabled=False) + child = events.ReportEventStack("cname", "cdesc", parent=parent) + with parent: + with child: + pass + pass + self.assertEqual(report_start.call_count, 0) + self.assertEqual(report_finish.call_count, 0) + + def test_reporting_event_has_sane_repr(self): + myrep = events.ReportEventStack("fooname", "foodesc", + reporting_enabled=True).__repr__() + self.assertIn("fooname", myrep) + self.assertIn("foodesc", myrep) + self.assertIn("True", myrep) + + def test_set_invalid_result_raises_value_error(self): + f = events.ReportEventStack("myname", "mydesc") + self.assertRaises(ValueError, setattr, f, "result", "BOGUS") + + +class TestStatusAccess(TestCase): + def test_invalid_status_access_raises_value_error(self): + self.assertRaises(AttributeError, getattr, events.status, "BOGUS") diff --git a/tests/unittests/test_rh_subscription.py b/tests/unittests/test_rh_subscription.py new file mode 100644 index 00000000..38d5763a --- /dev/null +++ b/tests/unittests/test_rh_subscription.py @@ -0,0 +1,208 @@ +from cloudinit import util +from cloudinit.config import cc_rh_subscription +import logging +import mock +import unittest + + +class GoodTests(unittest.TestCase): + def setUp(self): + super(GoodTests, self).setUp() + self.name = "cc_rh_subscription" + self.cloud_init = None + self.log = logging.getLogger("good_tests") + self.args = [] + self.handle = cc_rh_subscription.handle + self.SM = cc_rh_subscription.SubscriptionManager + + self.config = {'rh_subscription': + {'username': 'scooby@do.com', + 'password': 'scooby-snacks' + }} + self.config_full = {'rh_subscription': + {'username': 'scooby@do.com', + 'password': 'scooby-snacks', + 'auto-attach': True, + 'service-level': 'self-support', + 'add-pool': ['pool1', 'pool2', 'pool3'], + 'enable-repo': ['repo1', 'repo2', 'repo3'], + 'disable-repo': ['repo4', 'repo5'] + }} + + def test_already_registered(self): + ''' + Emulates a system that is already registered. Ensure it gets + a non-ProcessExecution error from is_registered() + ''' + with mock.patch.object(cc_rh_subscription.SubscriptionManager, + '_sub_man_cli') as mockobj: + self.SM.log_success = mock.MagicMock() + self.handle(self.name, self.config, self.cloud_init, + self.log, self.args) + self.assertEqual(self.SM.log_success.call_count, 1) + self.assertEqual(mockobj.call_count, 1) + + def test_simple_registration(self): + ''' + Simple registration with username and password + ''' + self.SM.log_success = mock.MagicMock() + reg = "The system has been registered with ID:" \ + " 12345678-abde-abcde-1234-1234567890abc" + self.SM._sub_man_cli = mock.MagicMock( + side_effect=[util.ProcessExecutionError, (reg, 'bar')]) + self.handle(self.name, self.config, self.cloud_init, + self.log, self.args) + self.assertIn(mock.call(['identity']), + self.SM._sub_man_cli.call_args_list) + self.assertIn(mock.call(['register', '--username=scooby@do.com', + '--password=scooby-snacks'], + logstring_val=True), + self.SM._sub_man_cli.call_args_list) + + self.assertEqual(self.SM.log_success.call_count, 1) + self.assertEqual(self.SM._sub_man_cli.call_count, 2) + + def test_full_registration(self): + ''' + Registration with auto-attach, service-level, adding pools, + and enabling and disabling yum repos + ''' + call_lists = [] + call_lists.append(['attach', '--pool=pool1', '--pool=pool3']) + call_lists.append(['repos', '--enable=repo2', '--enable=repo3', + '--disable=repo5']) + call_lists.append(['attach', '--auto', '--servicelevel=self-support']) + self.SM.log_success = mock.MagicMock() + reg = "The system has been registered with ID:" \ + " 12345678-abde-abcde-1234-1234567890abc" + self.SM._sub_man_cli = mock.MagicMock( + side_effect=[util.ProcessExecutionError, (reg, 'bar'), + ('Service level set to: self-support', ''), + ('pool1\npool3\n', ''), ('pool2\n', ''), ('', ''), + ('Repo ID: repo1\nRepo ID: repo5\n', ''), + ('Repo ID: repo2\nRepo ID: repo3\nRepo ID: ' + 'repo4', ''), + ('', '')]) + self.handle(self.name, self.config_full, self.cloud_init, + self.log, self.args) + for call in call_lists: + self.assertIn(mock.call(call), self.SM._sub_man_cli.call_args_list) + self.assertEqual(self.SM.log_success.call_count, 1) + self.assertEqual(self.SM._sub_man_cli.call_count, 9) + + +class TestBadInput(unittest.TestCase): + name = "cc_rh_subscription" + cloud_init = None + log = logging.getLogger("bad_tests") + args = [] + SM = cc_rh_subscription.SubscriptionManager + reg = "The system has been registered with ID:" \ + " 12345678-abde-abcde-1234-1234567890abc" + + config_no_password = {'rh_subscription': + {'username': 'scooby@do.com' + }} + + config_no_key = {'rh_subscription': + {'activation-key': '1234abcde', + }} + + config_service = {'rh_subscription': + {'username': 'scooby@do.com', + 'password': 'scooby-snacks', + 'service-level': 'self-support' + }} + + config_badpool = {'rh_subscription': + {'username': 'scooby@do.com', + 'password': 'scooby-snacks', + 'add-pool': 'not_a_list' + }} + config_badrepo = {'rh_subscription': + {'username': 'scooby@do.com', + 'password': 'scooby-snacks', + 'enable-repo': 'not_a_list' + }} + config_badkey = {'rh_subscription': + {'activation_key': 'abcdef1234', + 'org': '123', + }} + + def setUp(self): + super(TestBadInput, self).setUp() + self.handle = cc_rh_subscription.handle + + def test_no_password(self): + ''' + Attempt to register without the password key/value + ''' + self.input_is_missing_data(self.config_no_password) + + def test_no_org(self): + ''' + Attempt to register without the org key/value + ''' + self.input_is_missing_data(self.config_no_key) + + def test_service_level_without_auto(self): + ''' + Attempt to register using service-level without the auto-attach key + ''' + self.SM.log_warn = mock.MagicMock() + self.SM._sub_man_cli = mock.MagicMock( + side_effect=[util.ProcessExecutionError, (self.reg, 'bar')]) + self.handle(self.name, self.config_service, self.cloud_init, + self.log, self.args) + self.assertEqual(self.SM._sub_man_cli.call_count, 1) + self.assertEqual(self.SM.log_warn.call_count, 2) + + def test_pool_not_a_list(self): + ''' + Register with pools that are not in the format of a list + ''' + self.SM.log_warn = mock.MagicMock() + self.SM._sub_man_cli = mock.MagicMock( + side_effect=[util.ProcessExecutionError, (self.reg, 'bar')]) + self.handle(self.name, self.config_badpool, self.cloud_init, + self.log, self.args) + self.assertEqual(self.SM._sub_man_cli.call_count, 2) + self.assertEqual(self.SM.log_warn.call_count, 2) + + def test_repo_not_a_list(self): + ''' + Register with repos that are not in the format of a list + ''' + self.SM.log_warn = mock.MagicMock() + self.SM._sub_man_cli = mock.MagicMock( + side_effect=[util.ProcessExecutionError, (self.reg, 'bar')]) + self.handle(self.name, self.config_badrepo, self.cloud_init, + self.log, self.args) + self.assertEqual(self.SM.log_warn.call_count, 3) + self.assertEqual(self.SM._sub_man_cli.call_count, 2) + + def test_bad_key_value(self): + ''' + Attempt to register with a key that we don't know + ''' + self.SM.log_warn = mock.MagicMock() + self.SM._sub_man_cli = mock.MagicMock( + side_effect=[util.ProcessExecutionError, (self.reg, 'bar')]) + self.handle(self.name, self.config_badkey, self.cloud_init, + self.log, self.args) + self.assertEqual(self.SM.log_warn.call_count, 2) + self.assertEqual(self.SM._sub_man_cli.call_count, 1) + + def input_is_missing_data(self, config): + ''' + Helper def for tests that having missing information + ''' + self.SM.log_warn = mock.MagicMock() + self.SM._sub_man_cli = mock.MagicMock( + side_effect=[util.ProcessExecutionError]) + self.handle(self.name, config, self.cloud_init, + self.log, self.args) + self.SM._sub_man_cli.assert_called_with(['identity']) + self.assertEqual(self.SM.log_warn.call_count, 4) + self.assertEqual(self.SM._sub_man_cli.call_count, 1) diff --git a/tests/unittests/test_runs/test_merge_run.py b/tests/unittests/test_runs/test_merge_run.py index 977adb34..d0ec36a9 100644 --- a/tests/unittests/test_runs/test_merge_run.py +++ b/tests/unittests/test_runs/test_merge_run.py @@ -1,20 +1,22 @@ import os +import shutil +import tempfile from .. import helpers -from cloudinit.settings import (PER_INSTANCE) +from cloudinit.settings import PER_INSTANCE from cloudinit import stages from cloudinit import util class TestMergeRun(helpers.FilesystemMockingTestCase): def _patchIn(self, root): - self.restore() self.patchOS(root) self.patchUtils(root) def test_none_ds(self): - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self.replicateTestRoot('simple_ubuntu', new_root) cfg = { 'datasource_list': ['None'], diff --git a/tests/unittests/test_runs/test_simple_run.py b/tests/unittests/test_runs/test_simple_run.py index c9ba949e..e19e65cd 100644 --- a/tests/unittests/test_runs/test_simple_run.py +++ b/tests/unittests/test_runs/test_simple_run.py @@ -1,20 +1,20 @@ import os +import shutil +import tempfile from .. import helpers -from cloudinit.settings import (PER_INSTANCE) +from cloudinit.settings import PER_INSTANCE from cloudinit import stages from cloudinit import util class TestSimpleRun(helpers.FilesystemMockingTestCase): def _patchIn(self, root): - self.restore() self.patchOS(root) self.patchUtils(root) def _pp_root(self, root, repatch=True): - self.restore() for (dirpath, dirnames, filenames) in os.walk(root): print(dirpath) for f in filenames: @@ -33,7 +33,8 @@ class TestSimpleRun(helpers.FilesystemMockingTestCase): self._patchIn(root) def test_none_ds(self): - new_root = self.makeDir() + new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, new_root) self.replicateTestRoot('simple_ubuntu', new_root) cfg = { 'datasource_list': ['None'], @@ -41,7 +42,7 @@ class TestSimpleRun(helpers.FilesystemMockingTestCase): { 'path': '/etc/blah.ini', 'content': 'blah', - 'permissions': 0755, + 'permissions': 0o755, }, ], 'cloud_init_modules': ['write-files'], diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py index 3b317121..9aeb1cde 100644 --- a/tests/unittests/test_sshutil.py +++ b/tests/unittests/test_sshutil.py @@ -32,7 +32,8 @@ VALID_CONTENT = { ), } -TEST_OPTIONS = ("no-port-forwarding,no-agent-forwarding,no-X11-forwarding," +TEST_OPTIONS = ( + "no-port-forwarding,no-agent-forwarding,no-X11-forwarding," 'command="echo \'Please login as the user \"ubuntu\" rather than the' 'user \"root\".\';echo;sleep 10"') diff --git a/tests/unittests/test_templating.py b/tests/unittests/test_templating.py index 3ba4ed8a..b9863650 100644 --- a/tests/unittests/test_templating.py +++ b/tests/unittests/test_templating.py @@ -16,11 +16,20 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +from __future__ import print_function + from . import helpers as test_helpers import textwrap from cloudinit import templater +try: + import Cheetah + HAS_CHEETAH = True + Cheetah # make pyflakes happy, as Cheetah is not used here +except ImportError: + HAS_CHEETAH = False + class TestTemplates(test_helpers.TestCase): def test_render_basic(self): @@ -38,6 +47,7 @@ class TestTemplates(test_helpers.TestCase): out_data = templater.basic_render(in_data, {'b': 2}) self.assertEqual(expected_data.strip(), out_data) + @test_helpers.skipIf(not HAS_CHEETAH, 'cheetah renderer not available') def test_detection(self): blob = "## template:cheetah" @@ -104,5 +114,6 @@ $a,$b''' codename) out_data = templater.basic_render(in_data, - {'mirror': mirror, 'codename': codename}) + {'mirror': mirror, + 'codename': codename}) self.assertEqual(ex_data, out_data) diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 35e92445..37a984ac 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -1,13 +1,21 @@ +from __future__ import print_function + +import logging import os +import shutil import stat +import tempfile + +import six import yaml -from mocker import MockerTestCase +from cloudinit import importer, util from . import helpers -import unittest -from cloudinit import importer -from cloudinit import util +try: + from unittest import mock +except ImportError: + import mock class FakeSelinux(object): @@ -29,7 +37,7 @@ class FakeSelinux(object): self.restored.append(path) -class TestGetCfgOptionListOrStr(unittest.TestCase): +class TestGetCfgOptionListOrStr(helpers.TestCase): def test_not_found_no_default(self): """None is returned if key is not found and no default given.""" config = {} @@ -61,10 +69,11 @@ class TestGetCfgOptionListOrStr(unittest.TestCase): self.assertEqual([], result) -class TestWriteFile(MockerTestCase): +class TestWriteFile(helpers.TestCase): def setUp(self): super(TestWriteFile, self).setUp() - self.tmp = self.makeDir(prefix="unittest_") + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) def test_basic_usage(self): """Verify basic usage with default args.""" @@ -79,7 +88,7 @@ class TestWriteFile(MockerTestCase): create_contents = f.read() self.assertEqual(contents, create_contents) file_stat = os.stat(path) - self.assertEqual(0644, stat.S_IMODE(file_stat.st_mode)) + self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode)) def test_dir_is_created_if_required(self): """Verifiy that directories are created is required.""" @@ -97,12 +106,12 @@ class TestWriteFile(MockerTestCase): path = os.path.join(self.tmp, "NewFile.txt") contents = "Hey there" - util.write_file(path, contents, mode=0666) + util.write_file(path, contents, mode=0o666) self.assertTrue(os.path.exists(path)) self.assertTrue(os.path.isfile(path)) file_stat = os.stat(path) - self.assertEqual(0666, stat.S_IMODE(file_stat.st_mode)) + self.assertEqual(0o666, stat.S_IMODE(file_stat.st_mode)) def test_custom_omode(self): """Verify custom omode works properly.""" @@ -111,7 +120,7 @@ class TestWriteFile(MockerTestCase): # Create file first with basic content with open(path, "wb") as f: - f.write("LINE1\n") + f.write(b"LINE1\n") util.write_file(path, contents, omode="a") self.assertTrue(os.path.exists(path)) @@ -126,23 +135,24 @@ class TestWriteFile(MockerTestCase): with open(my_file, "w") as fp: fp.write("My Content") - import_mock = self.mocker.replace(importer.import_module, - passthrough=False) - import_mock('selinux') - fake_se = FakeSelinux(my_file) - self.mocker.result(fake_se) - self.mocker.replay() - with util.SeLinuxGuard(my_file) as is_on: - self.assertTrue(is_on) + + with mock.patch.object(importer, 'import_module', + return_value=fake_se) as mockobj: + with util.SeLinuxGuard(my_file) as is_on: + self.assertTrue(is_on) + self.assertEqual(1, len(fake_se.restored)) self.assertEqual(my_file, fake_se.restored[0]) + mockobj.assert_called_once_with('selinux') + -class TestDeleteDirContents(MockerTestCase): +class TestDeleteDirContents(helpers.TestCase): def setUp(self): super(TestDeleteDirContents, self).setUp() - self.tmp = self.makeDir(prefix="unittest_") + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) def assertDirEmpty(self, dirname): self.assertEqual([], os.listdir(dirname)) @@ -157,7 +167,7 @@ class TestDeleteDirContents(MockerTestCase): def test_deletes_files(self): """Single file should be deleted.""" with open(os.path.join(self.tmp, "new_file.txt"), "wb") as f: - f.write("DELETE ME") + f.write(b"DELETE ME") util.delete_dir_contents(self.tmp) @@ -185,7 +195,7 @@ class TestDeleteDirContents(MockerTestCase): os.mkdir(os.path.join(self.tmp, "new_dir")) f_name = os.path.join(self.tmp, "new_dir", "new_file.txt") with open(f_name, "wb") as f: - f.write("DELETE ME") + f.write(b"DELETE ME") util.delete_dir_contents(self.tmp) @@ -196,7 +206,7 @@ class TestDeleteDirContents(MockerTestCase): file_name = os.path.join(self.tmp, "new_file.txt") link_name = os.path.join(self.tmp, "new_file_link.txt") with open(file_name, "wb") as f: - f.write("DELETE ME") + f.write(b"DELETE ME") os.symlink(file_name, link_name) util.delete_dir_contents(self.tmp) @@ -204,20 +214,20 @@ class TestDeleteDirContents(MockerTestCase): self.assertDirEmpty(self.tmp) -class TestKeyValStrings(unittest.TestCase): +class TestKeyValStrings(helpers.TestCase): def test_keyval_str_to_dict(self): expected = {'1': 'one', '2': 'one+one', 'ro': True} cmdline = "1=one ro 2=one+one" self.assertEqual(expected, util.keyval_str_to_dict(cmdline)) -class TestGetCmdline(unittest.TestCase): +class TestGetCmdline(helpers.TestCase): def test_cmdline_reads_debug_env(self): os.environ['DEBUG_PROC_CMDLINE'] = 'abcd 123' self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], util.get_cmdline()) -class TestLoadYaml(unittest.TestCase): +class TestLoadYaml(helpers.TestCase): mydefault = "7b03a8ebace993d806255121073fed52" def test_simple(self): @@ -246,8 +256,8 @@ class TestLoadYaml(unittest.TestCase): self.mydefault) def test_python_unicode(self): - # complex type of python/unicde is explicitly allowed - myobj = {'1': unicode("FOOBAR")} + # complex type of python/unicode is explicitly allowed + myobj = {'1': six.text_type("FOOBAR")} safe_yaml = yaml.dump(myobj) self.assertEqual(util.load_yaml(blob=safe_yaml, default=self.mydefault), @@ -310,4 +320,170 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): expected = ('none', 'tmpfs', '/run/lock') self.assertEqual(expected, util.parse_mount_info('/run/lock', lines)) + +class TestReadDMIData(helpers.FilesystemMockingTestCase): + + def setUp(self): + super(TestReadDMIData, self).setUp() + self.new_root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.new_root) + self.patchOS(self.new_root) + self.patchUtils(self.new_root) + + def _create_sysfs_parent_directory(self): + util.ensure_dir(os.path.join('sys', 'class', 'dmi', 'id')) + + def _create_sysfs_file(self, key, content): + """Mocks the sys path found on Linux systems.""" + self._create_sysfs_parent_directory() + dmi_key = "/sys/class/dmi/id/{0}".format(key) + util.write_file(dmi_key, content) + + def _configure_dmidecode_return(self, key, content, error=None): + """ + In order to test a missing sys path and call outs to dmidecode, this + function fakes the results of dmidecode to test the results. + """ + def _dmidecode_subp(cmd): + if cmd[-1] != key: + raise util.ProcessExecutionError() + return (content, error) + + self.patched_funcs.enter_context( + mock.patch.object(util, 'which', lambda _: True)) + self.patched_funcs.enter_context( + mock.patch.object(util, 'subp', _dmidecode_subp)) + + def patch_mapping(self, new_mapping): + self.patched_funcs.enter_context( + mock.patch('cloudinit.util.DMIDECODE_TO_DMI_SYS_MAPPING', + new_mapping)) + + def test_sysfs_used_with_key_in_mapping_and_file_on_disk(self): + self.patch_mapping({'mapped-key': 'mapped-value'}) + expected_dmi_value = 'sys-used-correctly' + self._create_sysfs_file('mapped-value', expected_dmi_value) + self._configure_dmidecode_return('mapped-key', 'wrong-wrong-wrong') + self.assertEqual(expected_dmi_value, util.read_dmi_data('mapped-key')) + + def test_dmidecode_used_if_no_sysfs_file_on_disk(self): + self.patch_mapping({}) + self._create_sysfs_parent_directory() + expected_dmi_value = 'dmidecode-used' + self._configure_dmidecode_return('use-dmidecode', expected_dmi_value) + self.assertEqual(expected_dmi_value, + util.read_dmi_data('use-dmidecode')) + + def test_none_returned_if_neither_source_has_data(self): + self.patch_mapping({}) + self._configure_dmidecode_return('key', 'value') + self.assertEqual(None, util.read_dmi_data('expect-fail')) + + def test_none_returned_if_dmidecode_not_in_path(self): + self.patched_funcs.enter_context( + mock.patch.object(util, 'which', lambda _: False)) + self.patch_mapping({}) + self.assertEqual(None, util.read_dmi_data('expect-fail')) + + def test_dots_returned_instead_of_foxfox(self): + # uninitialized dmi values show as \xff, return those as . + my_len = 32 + dmi_value = b'\xff' * my_len + b'\n' + expected = "" + dmi_key = 'system-product-name' + sysfs_key = 'product_name' + self._create_sysfs_file(sysfs_key, dmi_value) + self.assertEqual(expected, util.read_dmi_data(dmi_key)) + + +class TestMultiLog(helpers.FilesystemMockingTestCase): + + def _createConsole(self, root): + os.mkdir(os.path.join(root, 'dev')) + open(os.path.join(root, 'dev', 'console'), 'a').close() + + def setUp(self): + super(TestMultiLog, self).setUp() + self.root = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.root) + self.patchOS(self.root) + self.patchUtils(self.root) + self.patchOpen(self.root) + self.stdout = six.StringIO() + self.stderr = six.StringIO() + self.patchStdoutAndStderr(self.stdout, self.stderr) + + def test_stderr_used_by_default(self): + logged_string = 'test stderr output' + util.multi_log(logged_string) + self.assertEqual(logged_string, self.stderr.getvalue()) + + def test_stderr_not_used_if_false(self): + util.multi_log('should not see this', stderr=False) + self.assertEqual('', self.stderr.getvalue()) + + def test_logs_go_to_console_by_default(self): + self._createConsole(self.root) + logged_string = 'something very important' + util.multi_log(logged_string) + self.assertEqual(logged_string, open('/dev/console').read()) + + def test_logs_dont_go_to_stdout_if_console_exists(self): + self._createConsole(self.root) + util.multi_log('something') + self.assertEqual('', self.stdout.getvalue()) + + def test_logs_go_to_stdout_if_console_does_not_exist(self): + logged_string = 'something very important' + util.multi_log(logged_string) + self.assertEqual(logged_string, self.stdout.getvalue()) + + def test_logs_go_to_log_if_given(self): + log = mock.MagicMock() + logged_string = 'something very important' + util.multi_log(logged_string, log=log) + self.assertEqual([((mock.ANY, logged_string), {})], + log.log.call_args_list) + + def test_newlines_stripped_from_log_call(self): + log = mock.MagicMock() + expected_string = 'something very important' + util.multi_log('{0}\n'.format(expected_string), log=log) + self.assertEqual((mock.ANY, expected_string), log.log.call_args[0]) + + def test_log_level_defaults_to_debug(self): + log = mock.MagicMock() + util.multi_log('message', log=log) + self.assertEqual((logging.DEBUG, mock.ANY), log.log.call_args[0]) + + def test_given_log_level_used(self): + log = mock.MagicMock() + log_level = mock.Mock() + util.multi_log('message', log=log, log_level=log_level) + self.assertEqual((log_level, mock.ANY), log.log.call_args[0]) + + +class TestMessageFromString(helpers.TestCase): + + def test_unicode_not_messed_up(self): + roundtripped = util.message_from_string(u'\n').as_string() + self.assertNotIn('\x00', roundtripped) + + +class TestReadSeeded(helpers.TestCase): + def setUp(self): + super(TestReadSeeded, self).setUp() + self.tmp = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp) + + def test_unicode_not_messed_up(self): + ud = b"userdatablob" + helpers.populate_dir( + self.tmp, {'meta-data': "key1: val1", 'user-data': ud}) + sdir = self.tmp + os.path.sep + (found_md, found_ud) = util.read_seeded(sdir) + + self.assertEqual(found_md, {'key1': 'val1'}) + self.assertEqual(found_ud, ud) + # vi: ts=4 expandtab diff --git a/tests/unittests/test_vmware_config_file.py b/tests/unittests/test_vmware_config_file.py new file mode 100644 index 00000000..d5c7367b --- /dev/null +++ b/tests/unittests/test_vmware_config_file.py @@ -0,0 +1,103 @@ +# vi: ts=4 expandtab +# +# Copyright (C) 2015 Canonical Ltd. +# Copyright (C) 2016 VMware INC. +# +# Author: Sankar Tanguturi <stanguturi@vmware.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import logging +import sys +import unittest + +from cloudinit.sources.helpers.vmware.imc.boot_proto import BootProtoEnum +from cloudinit.sources.helpers.vmware.imc.config import Config +from cloudinit.sources.helpers.vmware.imc.config_file import ConfigFile + +logging.basicConfig(level=logging.DEBUG, stream=sys.stdout) +logger = logging.getLogger(__name__) + + +class TestVmwareConfigFile(unittest.TestCase): + + def test_utility_methods(self): + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + + cf.clear() + + self.assertEqual(0, len(cf), "clear size") + + cf._insertKey(" PASSWORD|-PASS ", " foo ") + cf._insertKey("BAR", " ") + + self.assertEqual(2, len(cf), "insert size") + self.assertEqual('foo', cf["PASSWORD|-PASS"], "password") + self.assertTrue("PASSWORD|-PASS" in cf, "hasPassword") + self.assertFalse(cf.should_keep_current_value("PASSWORD|-PASS"), + "keepPassword") + self.assertFalse(cf.should_remove_current_value("PASSWORD|-PASS"), + "removePassword") + self.assertFalse("FOO" in cf, "hasFoo") + self.assertTrue(cf.should_keep_current_value("FOO"), "keepFoo") + self.assertFalse(cf.should_remove_current_value("FOO"), "removeFoo") + self.assertTrue("BAR" in cf, "hasBar") + self.assertFalse(cf.should_keep_current_value("BAR"), "keepBar") + self.assertTrue(cf.should_remove_current_value("BAR"), "removeBar") + + def test_configfile_static_2nics(self): + cf = ConfigFile("tests/data/vmware/cust-static-2nic.cfg") + + conf = Config(cf) + + self.assertEqual('myhost1', conf.host_name, "hostName") + self.assertEqual('Africa/Abidjan', conf.timezone, "tz") + self.assertTrue(conf.utc, "utc") + + self.assertEqual(['10.20.145.1', '10.20.145.2'], + conf.name_servers, + "dns") + self.assertEqual(['eng.vmware.com', 'proxy.vmware.com'], + conf.dns_suffixes, + "suffixes") + + nics = conf.nics + ipv40 = nics[0].staticIpv4 + + self.assertEqual(2, len(nics), "nics") + self.assertEqual('NIC1', nics[0].name, "nic0") + self.assertEqual('00:50:56:a6:8c:08', nics[0].mac, "mac0") + self.assertEqual(BootProtoEnum.STATIC, nics[0].bootProto, "bootproto0") + self.assertEqual('10.20.87.154', ipv40[0].ip, "ipv4Addr0") + self.assertEqual('255.255.252.0', ipv40[0].netmask, "ipv4Mask0") + self.assertEqual(2, len(ipv40[0].gateways), "ipv4Gw0") + self.assertEqual('10.20.87.253', ipv40[0].gateways[0], "ipv4Gw0_0") + self.assertEqual('10.20.87.105', ipv40[0].gateways[1], "ipv4Gw0_1") + + self.assertEqual(1, len(nics[0].staticIpv6), "ipv6Cnt0") + self.assertEqual('fc00:10:20:87::154', + nics[0].staticIpv6[0].ip, + "ipv6Addr0") + + self.assertEqual('NIC2', nics[1].name, "nic1") + self.assertTrue(not nics[1].staticIpv6, "ipv61 dhcp") + + def test_config_file_dhcp_2nics(self): + cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") + + conf = Config(cf) + nics = conf.nics + self.assertEqual(2, len(nics), "nics") + self.assertEqual('NIC1', nics[0].name, "nic0") + self.assertEqual('00:50:56:a6:8c:08', nics[0].mac, "mac0") + self.assertEqual(BootProtoEnum.DHCP, nics[0].bootProto, "bootproto0") |