diff options
Diffstat (limited to 'tests')
40 files changed, 1121 insertions, 874 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py index 52305397..7516bd02 100644 --- a/tests/unittests/helpers.py +++ b/tests/unittests/helpers.py @@ -1,17 +1,25 @@ +from __future__ import print_function +  import os  import sys +import shutil +import tempfile  import unittest -from contextlib import contextmanager +import six -from mocker import Mocker -from mocker import MockerTestCase +try: +    from unittest import mock +except ImportError: +    import mock +try: +    from contextlib import ExitStack +except ImportError: +    from contextlib2 import ExitStack  from cloudinit import helpers as ch  from cloudinit import util -import shutil -  # Used for detecting different python versions  PY2 = False  PY26 = False @@ -33,8 +41,20 @@ else:          PY3 = True  if PY26: -    # For now add these on, taken from python 2.7 + slightly adjusted +    # For now add these on, taken from python 2.7 + slightly adjusted.  Drop +    # all this once Python 2.6 is dropped as a minimum requirement.      class TestCase(unittest.TestCase): +        def setUp(self): +            super(TestCase, self).setUp() +            self.__all_cleanups = ExitStack() + +        def tearDown(self): +            self.__all_cleanups.close() +            unittest.TestCase.tearDown(self) + +        def addCleanup(self, function, *args, **kws): +            self.__all_cleanups.callback(function, *args, **kws) +          def assertIs(self, expr1, expr2, msg=None):              if expr1 is not expr2:                  standardMsg = '%r is not %r' % (expr1, expr2) @@ -57,10 +77,17 @@ if PY26:                  standardMsg = standardMsg % (value)                  self.fail(self._formatMessage(msg, standardMsg)) +        def assertIsInstance(self, obj, cls, msg=None): +            """Same as self.assertTrue(isinstance(obj, cls)), with a nicer +            default message.""" +            if not isinstance(obj, cls): +                standardMsg = '%s is not an instance of %r' % (repr(obj), cls) +                self.fail(self._formatMessage(msg, standardMsg)) +          def assertDictContainsSubset(self, expected, actual, msg=None):              missing = []              mismatched = [] -            for k, v in expected.iteritems(): +            for k, v in expected.items():                  if k not in actual:                      missing.append(k)                  elif actual[k] != v: @@ -86,17 +113,6 @@ else:          pass -@contextmanager -def mocker(verify_calls=True): -    m = Mocker() -    try: -        yield m -    finally: -        m.restore() -        if verify_calls: -            m.verify() - -  # Makes the old path start  # with new base instead of whatever  # it previously had @@ -121,14 +137,19 @@ def retarget_many_wrapper(new_base, am, old_func):              nam = len(n_args)          for i in range(0, nam):              path = args[i] -            n_args[i] = rebase_path(path, new_base) +            # patchOS() wraps various os and os.path functions, however in +            # Python 3 some of these now accept file-descriptors (integers). +            # That breaks rebase_path() so in lieu of a better solution, just +            # don't rebase if we get a fd. +            if isinstance(path, six.string_types): +                n_args[i] = rebase_path(path, new_base)          return old_func(*n_args, **kwds)      return wrapper -class ResourceUsingTestCase(MockerTestCase): -    def __init__(self, methodName="runTest"): -        MockerTestCase.__init__(self, methodName) +class ResourceUsingTestCase(TestCase): +    def setUp(self): +        super(ResourceUsingTestCase, self).setUp()          self.resource_path = None      def resourceLocation(self, subname=None): @@ -156,17 +177,23 @@ class ResourceUsingTestCase(MockerTestCase):              return fh.read()      def getCloudPaths(self): +        tmpdir = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, tmpdir)          cp = ch.Paths({ -            'cloud_dir': self.makeDir(), +            'cloud_dir': tmpdir,              'templates_dir': self.resourceLocation(),          })          return cp  class FilesystemMockingTestCase(ResourceUsingTestCase): -    def __init__(self, methodName="runTest"): -        ResourceUsingTestCase.__init__(self, methodName) -        self.patched_funcs = [] +    def setUp(self): +        super(FilesystemMockingTestCase, self).setUp() +        self.patched_funcs = ExitStack() + +    def tearDown(self): +        self.patched_funcs.close() +        ResourceUsingTestCase.tearDown(self)      def replicateTestRoot(self, example_root, target_root):          real_root = self.resourceLocation() @@ -180,15 +207,6 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):                  make_path = util.abs_join(make_path, f)                  shutil.copy(real_path, make_path) -    def tearDown(self): -        self.restore() -        ResourceUsingTestCase.tearDown(self) - -    def restore(self): -        for (mod, f, func) in self.patched_funcs: -            setattr(mod, f, func) -        self.patched_funcs = [] -      def patchUtils(self, new_root):          patch_funcs = {              util: [('write_file', 1), @@ -205,8 +223,8 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):              for (f, am) in funcs:                  func = getattr(mod, f)                  trap_func = retarget_many_wrapper(new_root, am, func) -                setattr(mod, f, trap_func) -                self.patched_funcs.append((mod, f, func)) +                self.patched_funcs.enter_context( +                    mock.patch.object(mod, f, trap_func))          # Handle subprocess calls          func = getattr(util, 'subp') @@ -214,16 +232,15 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):          def nsubp(*_args, **_kwargs):              return ('', '') -        setattr(util, 'subp', nsubp) -        self.patched_funcs.append((util, 'subp', func)) +        self.patched_funcs.enter_context( +            mock.patch.object(util, 'subp', nsubp))          def null_func(*_args, **_kwargs):              return None          for f in ['chownbyid', 'chownbyname']: -            func = getattr(util, f) -            setattr(util, f, null_func) -            self.patched_funcs.append((util, f, func)) +            self.patched_funcs.enter_context( +                mock.patch.object(util, f, null_func))      def patchOS(self, new_root):          patch_funcs = { @@ -234,8 +251,21 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):              for f in funcs:                  func = getattr(mod, f)                  trap_func = retarget_many_wrapper(new_root, 1, func) -                setattr(mod, f, trap_func) -                self.patched_funcs.append((mod, f, func)) +                self.patched_funcs.enter_context( +                    mock.patch.object(mod, f, trap_func)) + +    def patchOpen(self, new_root): +        trap_func = retarget_many_wrapper(new_root, 1, open) +        name = 'builtins.open' if PY3 else '__builtin__.open' +        self.patched_funcs.enter_context(mock.patch(name, trap_func)) + +    def patchStdoutAndStderr(self, stdout=None, stderr=None): +        if stdout is not None: +            self.patched_funcs.enter_context( +                mock.patch.object(sys, 'stdout', stdout)) +        if stderr is not None: +            self.patched_funcs.enter_context( +                mock.patch.object(sys, 'stderr', stderr))  class HttprettyTestCase(TestCase): @@ -256,7 +286,22 @@ class HttprettyTestCase(TestCase):  def populate_dir(path, files):      if not os.path.exists(path):          os.makedirs(path) -    for (name, content) in files.iteritems(): -        with open(os.path.join(path, name), "w") as fp: -            fp.write(content) +    for (name, content) in files.items(): +        with open(os.path.join(path, name), "wb") as fp: +            fp.write(content.encode('utf-8'))              fp.close() + + +try: +    skipIf = unittest.skipIf +except AttributeError: +    # Python 2.6.  Doesn't have to be high fidelity. +    def skipIf(condition, reason): +        def decorator(func): +            def wrapper(*args, **kws): +                if condition: +                    return func(*args, **kws) +                else: +                    print(reason, file=sys.stderr) +            return wrapper +        return decorator diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py index 17965488..1a307e56 100644 --- a/tests/unittests/test__init__.py +++ b/tests/unittests/test__init__.py @@ -1,14 +1,25 @@  import os - -from mocker import MockerTestCase, ARGS, KWARGS +import shutil +import tempfile +import unittest + +try: +    from unittest import mock +except ImportError: +    import mock +try: +    from contextlib import ExitStack +except ImportError: +    from contextlib2 import ExitStack  from cloudinit import handlers  from cloudinit import helpers -from cloudinit import importer  from cloudinit import settings  from cloudinit import url_helper  from cloudinit import util +from .helpers import TestCase +  class FakeModule(handlers.Handler):      def __init__(self): @@ -22,76 +33,73 @@ class FakeModule(handlers.Handler):          pass -class TestWalkerHandleHandler(MockerTestCase): +class TestWalkerHandleHandler(TestCase):      def setUp(self): - -        MockerTestCase.setUp(self) +        super(TestWalkerHandleHandler, self).setUp() +        tmpdir = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, tmpdir)          self.data = {              "handlercount": 0,              "frequency": "", -            "handlerdir": self.makeDir(), +            "handlerdir": tmpdir,              "handlers": helpers.ContentHandlers(),              "data": None}          self.expected_module_name = "part-handler-%03d" % (              self.data["handlercount"],)          expected_file_name = "%s.py" % self.expected_module_name -        expected_file_fullname = os.path.join(self.data["handlerdir"], -                                              expected_file_name) +        self.expected_file_fullname = os.path.join( +            self.data["handlerdir"], expected_file_name)          self.module_fake = FakeModule()          self.ctype = None          self.filename = None          self.payload = "dummy payload" -        # Mock the write_file function -        write_file_mock = self.mocker.replace(util.write_file, -                                              passthrough=False) -        write_file_mock(expected_file_fullname, self.payload, 0600) +        # Mock the write_file() function.  We'll assert that it got called as +        # expected in each of the individual tests. +        resources = ExitStack() +        self.addCleanup(resources.close) +        self.write_file_mock = resources.enter_context( +            mock.patch('cloudinit.util.write_file'))      def test_no_errors(self):          """Payload gets written to file and added to C{pdata}.""" -        import_mock = self.mocker.replace(importer.import_module, -                                          passthrough=False) -        import_mock(self.expected_module_name) -        self.mocker.result(self.module_fake) -        self.mocker.replay() - -        handlers.walker_handle_handler(self.data, self.ctype, self.filename, -                                       self.payload) - -        self.assertEqual(1, self.data["handlercount"]) +        with mock.patch('cloudinit.importer.import_module', +                        return_value=self.module_fake) as mockobj: +            handlers.walker_handle_handler(self.data, self.ctype, +                                           self.filename, self.payload) +            mockobj.assert_called_with_once(self.expected_module_name) +        self.write_file_mock.assert_called_with_once( +            self.expected_file_fullname, self.payload, 0o600) +        self.assertEqual(self.data['handlercount'], 1)      def test_import_error(self):          """Module import errors are logged. No handler added to C{pdata}.""" -        import_mock = self.mocker.replace(importer.import_module, -                                          passthrough=False) -        import_mock(self.expected_module_name) -        self.mocker.throw(ImportError()) -        self.mocker.replay() - -        handlers.walker_handle_handler(self.data, self.ctype, self.filename, -                                       self.payload) - -        self.assertEqual(0, self.data["handlercount"]) +        with mock.patch('cloudinit.importer.import_module', +                        side_effect=ImportError) as mockobj: +            handlers.walker_handle_handler(self.data, self.ctype, +                                           self.filename, self.payload) +            mockobj.assert_called_with_once(self.expected_module_name) +        self.write_file_mock.assert_called_with_once( +            self.expected_file_fullname, self.payload, 0o600) +        self.assertEqual(self.data['handlercount'], 0)      def test_attribute_error(self):          """Attribute errors are logged. No handler added to C{pdata}.""" -        import_mock = self.mocker.replace(importer.import_module, -                                          passthrough=False) -        import_mock(self.expected_module_name) -        self.mocker.result(self.module_fake) -        self.mocker.throw(AttributeError()) -        self.mocker.replay() - -        handlers.walker_handle_handler(self.data, self.ctype, self.filename, -                                       self.payload) +        with mock.patch('cloudinit.importer.import_module', +                        side_effect=AttributeError, +                        return_value=self.module_fake) as mockobj: +            handlers.walker_handle_handler(self.data, self.ctype, +                                           self.filename, self.payload) +            mockobj.assert_called_with_once(self.expected_module_name) +        self.write_file_mock.assert_called_with_once( +            self.expected_file_fullname, self.payload, 0o600) +        self.assertEqual(self.data['handlercount'], 0) -        self.assertEqual(0, self.data["handlercount"]) - -class TestHandlerHandlePart(MockerTestCase): +class TestHandlerHandlePart(unittest.TestCase):      def setUp(self):          self.data = "fake data" @@ -108,95 +116,80 @@ class TestHandlerHandlePart(MockerTestCase):          C{handle_part} is called without C{frequency} for          C{handler_version} == 1.          """ -        mod_mock = self.mocker.mock() -        getattr(mod_mock, "frequency") -        self.mocker.result(settings.PER_INSTANCE) -        getattr(mod_mock, "handler_version") -        self.mocker.result(1) -        mod_mock.handle_part(self.data, self.ctype, self.filename, -                             self.payload) -        self.mocker.replay() - -        handlers.run_part(mod_mock, self.data, self.filename, -                          self.payload, self.frequency, self.headers) +        mod_mock = mock.Mock(frequency=settings.PER_INSTANCE, +                             handler_version=1) +        handlers.run_part(mod_mock, self.data, self.filename, self.payload, +                          self.frequency, self.headers) +        # Assert that the handle_part() method of the mock object got +        # called with the expected arguments. +        mod_mock.handle_part.assert_called_with_once( +            self.data, self.ctype, self.filename, self.payload)      def test_normal_version_2(self):          """          C{handle_part} is called with C{frequency} for          C{handler_version} == 2.          """ -        mod_mock = self.mocker.mock() -        getattr(mod_mock, "frequency") -        self.mocker.result(settings.PER_INSTANCE) -        getattr(mod_mock, "handler_version") -        self.mocker.result(2) -        mod_mock.handle_part(self.data, self.ctype, self.filename, -                             self.payload, self.frequency) -        self.mocker.replay() - -        handlers.run_part(mod_mock, self.data, self.filename, -                          self.payload, self.frequency, self.headers) +        mod_mock = mock.Mock(frequency=settings.PER_INSTANCE, +                             handler_version=2) +        handlers.run_part(mod_mock, self.data, self.filename, self.payload, +                          self.frequency, self.headers) +        # Assert that the handle_part() method of the mock object got +        # called with the expected arguments. +        mod_mock.handle_part.assert_called_with_once( +            self.data, self.ctype, self.filename, self.payload)      def test_modfreq_per_always(self):          """          C{handle_part} is called regardless of frequency if nofreq is always.          """          self.frequency = "once" -        mod_mock = self.mocker.mock() -        getattr(mod_mock, "frequency") -        self.mocker.result(settings.PER_ALWAYS) -        getattr(mod_mock, "handler_version") -        self.mocker.result(1) -        mod_mock.handle_part(self.data, self.ctype, self.filename, -                             self.payload) -        self.mocker.replay() - -        handlers.run_part(mod_mock, self.data, self.filename, -                          self.payload, self.frequency, self.headers) +        mod_mock = mock.Mock(frequency=settings.PER_ALWAYS, +                             handler_version=1) +        handlers.run_part(mod_mock, self.data, self.filename, self.payload, +                          self.frequency, self.headers) +        # Assert that the handle_part() method of the mock object got +        # called with the expected arguments. +        mod_mock.handle_part.assert_called_with_once( +            self.data, self.ctype, self.filename, self.payload)      def test_no_handle_when_modfreq_once(self):          """C{handle_part} is not called if frequency is once."""          self.frequency = "once" -        mod_mock = self.mocker.mock() -        getattr(mod_mock, "frequency") -        self.mocker.result(settings.PER_ONCE) -        self.mocker.replay() - -        handlers.run_part(mod_mock, self.data, self.filename, -                          self.payload, self.frequency, self.headers) +        mod_mock = mock.Mock(frequency=settings.PER_ONCE) +        handlers.run_part(mod_mock, self.data, self.filename, self.payload, +                          self.frequency, self.headers) +        # Assert that the handle_part() method of the mock object got +        # called with the expected arguments. +        mod_mock.handle_part.assert_called_with_once( +            self.data, self.ctype, self.filename, self.payload)      def test_exception_is_caught(self):          """Exceptions within C{handle_part} are caught and logged.""" -        mod_mock = self.mocker.mock() -        getattr(mod_mock, "frequency") -        self.mocker.result(settings.PER_INSTANCE) -        getattr(mod_mock, "handler_version") -        self.mocker.result(1) -        mod_mock.handle_part(self.data, self.ctype, self.filename, -                             self.payload) -        self.mocker.throw(Exception()) -        self.mocker.replay() - -        handlers.run_part(mod_mock, self.data, self.filename, -                          self.payload, self.frequency, self.headers) - - -class TestCmdlineUrl(MockerTestCase): +        mod_mock = mock.Mock(frequency=settings.PER_INSTANCE, +                             handler_version=1) +        handlers.run_part(mod_mock, self.data, self.filename, self.payload, +                          self.frequency, self.headers) +        mod_mock.handle_part.side_effect = Exception +        handlers.run_part(mod_mock, self.data, self.filename, self.payload, +                          self.frequency, self.headers) +        mod_mock.handle_part.assert_called_with_once( +            self.data, self.ctype, self.filename, self.payload) + + +class TestCmdlineUrl(unittest.TestCase):      def test_invalid_content(self):          url = "http://example.com/foo"          key = "mykey"          payload = "0"          cmdline = "ro %s=%s bar=1" % (key, url) -        mock_readurl = self.mocker.replace(url_helper.readurl, -                                           passthrough=False) -        mock_readurl(url, ARGS, KWARGS) -        self.mocker.result(url_helper.StringResponse(payload)) -        self.mocker.replay() - -        self.assertEqual((key, url, None), -            util.get_cmdline_url(names=[key], starts="xxxxxx", -                                 cmdline=cmdline)) +        with mock.patch('cloudinit.url_helper.readurl', +                        return_value=url_helper.StringResponse(payload)): +            self.assertEqual( +                util.get_cmdline_url(names=[key], starts="xxxxxx", +                                     cmdline=cmdline), +                (key, url, None))      def test_valid_content(self):          url = "http://example.com/foo" @@ -204,27 +197,24 @@ class TestCmdlineUrl(MockerTestCase):          payload = "xcloud-config\nmydata: foo\nbar: wark\n"          cmdline = "ro %s=%s bar=1" % (key, url) -        mock_readurl = self.mocker.replace(url_helper.readurl, -                                           passthrough=False) -        mock_readurl(url, ARGS, KWARGS) -        self.mocker.result(url_helper.StringResponse(payload)) -        self.mocker.replay() - -        self.assertEqual((key, url, payload), -            util.get_cmdline_url(names=[key], starts="xcloud-config", -                            cmdline=cmdline)) +        with mock.patch('cloudinit.url_helper.readurl', +                        return_value=url_helper.StringResponse(payload)): +            self.assertEqual( +                util.get_cmdline_url(names=[key], starts="xcloud-config", +                                     cmdline=cmdline), +                (key, url, payload))      def test_no_key_found(self):          url = "http://example.com/foo"          key = "mykey"          cmdline = "ro %s=%s bar=1" % (key, url) -        self.mocker.replace(url_helper.readurl, passthrough=False) -        self.mocker.result(url_helper.StringResponse("")) -        self.mocker.replay() +        with mock.patch('cloudinit.url_helper.readurl', +                        return_value=url_helper.StringResponse('')): +            self.assertEqual( +                util.get_cmdline_url(names=["does-not-appear"], +                                     starts="#cloud-config", cmdline=cmdline), +                (None, None, None)) -        self.assertEqual((None, None, None), -            util.get_cmdline_url(names=["does-not-appear"], -                starts="#cloud-config", cmdline=cmdline))  # vi: ts=4 expandtab diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py index af7f442e..ad32d0b2 100644 --- a/tests/unittests/test_builtin_handlers.py +++ b/tests/unittests/test_builtin_handlers.py @@ -1,6 +1,13 @@  """Tests of the built-in user data handlers."""  import os +import shutil +import tempfile + +try: +    from unittest import mock +except ImportError: +    import mock  from . import helpers as test_helpers @@ -14,10 +21,11 @@ from cloudinit.settings import (PER_ALWAYS, PER_INSTANCE)  class TestBuiltins(test_helpers.FilesystemMockingTestCase): -      def test_upstart_frequency_no_out(self): -        c_root = self.makeDir() -        up_root = self.makeDir() +        c_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, c_root) +        up_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, up_root)          paths = helpers.Paths({              'cloud_dir': c_root,              'upstart_dir': up_root, @@ -36,7 +44,8 @@ class TestBuiltins(test_helpers.FilesystemMockingTestCase):      def test_upstart_frequency_single(self):          # files should be written out when frequency is ! per-instance -        new_root = self.makeDir() +        new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, new_root)          freq = PER_INSTANCE          self.patchOS(new_root) @@ -49,16 +58,16 @@ class TestBuiltins(test_helpers.FilesystemMockingTestCase):          util.ensure_dir("/run")          util.ensure_dir("/etc/upstart") -        mock_subp = self.mocker.replace(util.subp, passthrough=False) -        mock_subp(["initctl", "reload-configuration"], capture=False) -        self.mocker.replay() +        with mock.patch.object(util, 'subp') as mockobj: +            h = upstart_job.UpstartJobPartHandler(paths) +            h.handle_part('', handlers.CONTENT_START, +                          None, None, None) +            h.handle_part('blah', 'text/upstart-job', +                          'test.conf', 'blah', freq) +            h.handle_part('', handlers.CONTENT_END, +                          None, None, None) -        h = upstart_job.UpstartJobPartHandler(paths) -        h.handle_part('', handlers.CONTENT_START, -                      None, None, None) -        h.handle_part('blah', 'text/upstart-job', -                      'test.conf', 'blah', freq) -        h.handle_part('', handlers.CONTENT_END, -                      None, None, None) +            self.assertEquals(len(os.listdir('/etc/upstart')), 1) -        self.assertEquals(1, len(os.listdir('/etc/upstart'))) +        mockobj.assert_called_once_with( +            ['initctl', 'reload-configuration'], capture=False) diff --git a/tests/unittests/test_cs_util.py b/tests/unittests/test_cs_util.py index 7d59222b..d7273035 100644 --- a/tests/unittests/test_cs_util.py +++ b/tests/unittests/test_cs_util.py @@ -1,7 +1,21 @@ -from mocker import MockerTestCase +from __future__ import print_function + +import sys +import unittest  from cloudinit.cs_utils import Cepko +try: +    skip = unittest.skip +except AttributeError: +    # Python 2.6.  Doesn't have to be high fidelity. +    def skip(reason): +        def decorator(func): +            def wrapper(*args, **kws): +                print(reason, file=sys.stderr) +            return wrapper +        return decorator +  SERVER_CONTEXT = {      "cpu": 1000, @@ -26,16 +40,21 @@ class CepkoMock(Cepko):          return SERVER_CONTEXT['tags'] -class CepkoResultTests(MockerTestCase): +# 2015-01-22 BAW: This test is completely useless because it only ever tests +# the CepkoMock object.  Even in its original form, I don't think it ever +# touched the underlying Cepko class methods. +@skip('This test is completely useless') +class CepkoResultTests(unittest.TestCase):      def setUp(self): -        self.mocked = self.mocker.replace("cloudinit.cs_utils.Cepko", -                            spec=CepkoMock, -                            count=False, -                            passthrough=False) -        self.mocked() -        self.mocker.result(CepkoMock()) -        self.mocker.replay() -        self.c = Cepko() +        pass +        # self.mocked = self.mocker.replace("cloudinit.cs_utils.Cepko", +        #                     spec=CepkoMock, +        #                     count=False, +        #                     passthrough=False) +        # self.mocked() +        # self.mocker.result(CepkoMock()) +        # self.mocker.replay() +        # self.c = Cepko()      def test_getitem(self):          result = self.c.all() diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py index fd6bd8a1..e5b227f8 100644 --- a/tests/unittests/test_data.py +++ b/tests/unittests/test_data.py @@ -1,10 +1,17 @@  """Tests for handling of userdata within cloud init.""" -import StringIO -  import gzip  import logging  import os +import shutil +import tempfile + +try: +    from unittest import mock +except ImportError: +    import mock + +from six import BytesIO, StringIO  from email.mime.application import MIMEApplication  from email.mime.base import MIMEBase @@ -37,23 +44,22 @@ class FakeDataSource(sources.DataSource):  class TestConsumeUserData(helpers.FilesystemMockingTestCase):      def setUp(self): -        helpers.FilesystemMockingTestCase.setUp(self) +        super(TestConsumeUserData, self).setUp()          self._log = None          self._log_file = None          self._log_handler = None      def tearDown(self): -        helpers.FilesystemMockingTestCase.tearDown(self)          if self._log_handler and self._log:              self._log.removeHandler(self._log_handler) +        helpers.FilesystemMockingTestCase.tearDown(self)      def _patchIn(self, root): -        self.restore()          self.patchOS(root)          self.patchUtils(root)      def capture_log(self, lvl=logging.DEBUG): -        log_file = StringIO.StringIO() +        log_file = StringIO()          self._log_handler = logging.StreamHandler(log_file)          self._log_handler.setLevel(lvl)          self._log = log.getLogger() @@ -71,7 +77,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):          ci = stages.Init()          ci.datasource = FakeDataSource(blob) -        new_root = self.makeDir() +        new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, new_root)          self.patchUtils(new_root)          self.patchOS(new_root)          ci.fetch() @@ -99,7 +106,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):       { "op": "add", "path": "/foo", "value": "quxC" }  ]  ''' -        new_root = self.makeDir() +        new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, new_root)          self._patchIn(new_root)          initer = stages.Init()          initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -138,7 +146,8 @@ class TestConsumeUserData(helpers.FilesystemMockingTestCase):       { "op": "add", "path": "/foo", "value": "quxC" }  ]  ''' -        new_root = self.makeDir() +        new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, new_root)          self._patchIn(new_root)          initer = stages.Init()          initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -184,7 +193,8 @@ c: d          ci = stages.Init()          ci.datasource = FakeDataSource(str(message)) -        new_root = self.makeDir() +        new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, new_root)          self.patchUtils(new_root)          self.patchOS(new_root)          ci.fetch() @@ -214,7 +224,8 @@ name: user  run:   - z  ''' -        new_root = self.makeDir() +        new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, new_root)          self._patchIn(new_root)          initer = stages.Init()          initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -249,7 +260,8 @@ vendor_data:    enabled: True    prefix: /bin/true  ''' -        new_root = self.makeDir() +        new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, new_root)          self._patchIn(new_root)          initer = stages.Init()          initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob) @@ -309,7 +321,8 @@ p: 1          paths = c_helpers.Paths({}, ds=FakeDataSource(''))          cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths) -        new_root = self.makeDir() +        new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, new_root)          self.patchUtils(new_root)          self.patchOS(new_root)          cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None, @@ -335,25 +348,24 @@ p: 1          data = "arbitrary text\n"          ci.datasource = FakeDataSource(data) -        mock_write = self.mocker.replace("cloudinit.util.write_file", -                                         passthrough=False) -        mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) -        self.mocker.replay() +        with mock.patch('cloudinit.util.write_file') as mockobj: +            log_file = self.capture_log(logging.WARNING) +            ci.fetch() +            ci.consume_data() +            self.assertIn( +                "Unhandled non-multipart (text/x-not-multipart) userdata:", +                log_file.getvalue()) -        log_file = self.capture_log(logging.WARNING) -        ci.fetch() -        ci.consume_data() -        self.assertIn( -            "Unhandled non-multipart (text/x-not-multipart) userdata:", -            log_file.getvalue()) +        mockobj.assert_called_once_with( +            ci.paths.get_ipath("cloud_config"), "", 0o600)      def test_mime_gzip_compressed(self):          """Tests that individual message gzip encoding works."""          def gzip_part(text): -            contents = StringIO.StringIO() -            f = gzip.GzipFile(fileobj=contents, mode='w') -            f.write(str(text)) +            contents = BytesIO() +            f = gzip.GzipFile(fileobj=contents, mode='wb') +            f.write(util.encode_text(text))              f.flush()              f.close()              return MIMEApplication(contents.getvalue(), 'gzip') @@ -374,7 +386,8 @@ c: 4          message.attach(gzip_part(base_content2))          ci = stages.Init()          ci.datasource = FakeDataSource(str(message)) -        new_root = self.makeDir() +        new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, new_root)          self.patchUtils(new_root)          self.patchOS(new_root)          ci.fetch() @@ -394,17 +407,15 @@ c: 4          message.set_payload("Just text")          ci.datasource = FakeDataSource(message.as_string()) -        mock_write = self.mocker.replace("cloudinit.util.write_file", -                                         passthrough=False) -        mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) -        self.mocker.replay() - -        log_file = self.capture_log(logging.WARNING) -        ci.fetch() -        ci.consume_data() -        self.assertIn( -            "Unhandled unknown content-type (text/plain)", -            log_file.getvalue()) +        with mock.patch('cloudinit.util.write_file') as mockobj: +            log_file = self.capture_log(logging.WARNING) +            ci.fetch() +            ci.consume_data() +            self.assertIn( +                "Unhandled unknown content-type (text/plain)", +                log_file.getvalue()) +        mockobj.assert_called_once_with( +            ci.paths.get_ipath("cloud_config"), "", 0o600)      def test_shellscript(self):          """Raw text starting #!/bin/sh is treated as script.""" @@ -413,16 +424,17 @@ c: 4          ci.datasource = FakeDataSource(script)          outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") -        mock_write = self.mocker.replace("cloudinit.util.write_file", -                                         passthrough=False) -        mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) -        mock_write(outpath, script, 0700) -        self.mocker.replay() -        log_file = self.capture_log(logging.WARNING) -        ci.fetch() -        ci.consume_data() -        self.assertEqual("", log_file.getvalue()) +        with mock.patch('cloudinit.util.write_file') as mockobj: +            log_file = self.capture_log(logging.WARNING) +            ci.fetch() +            ci.consume_data() +            self.assertEqual("", log_file.getvalue()) + +        mockobj.assert_has_calls([ +            mock.call(outpath, script, 0o700), +            mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), +            ])      def test_mime_text_x_shellscript(self):          """Mime message of type text/x-shellscript is treated as script.""" @@ -433,16 +445,17 @@ c: 4          ci.datasource = FakeDataSource(message.as_string())          outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") -        mock_write = self.mocker.replace("cloudinit.util.write_file", -                                         passthrough=False) -        mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) -        mock_write(outpath, script, 0700) -        self.mocker.replay() -        log_file = self.capture_log(logging.WARNING) -        ci.fetch() -        ci.consume_data() -        self.assertEqual("", log_file.getvalue()) +        with mock.patch('cloudinit.util.write_file') as mockobj: +            log_file = self.capture_log(logging.WARNING) +            ci.fetch() +            ci.consume_data() +            self.assertEqual("", log_file.getvalue()) + +        mockobj.assert_has_calls([ +            mock.call(outpath, script, 0o700), +            mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), +            ])      def test_mime_text_plain_shell(self):          """Mime type text/plain starting #!/bin/sh is treated as script.""" @@ -453,13 +466,14 @@ c: 4          ci.datasource = FakeDataSource(message.as_string())          outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001") -        mock_write = self.mocker.replace("cloudinit.util.write_file", -                                         passthrough=False) -        mock_write(outpath, script, 0700) -        mock_write(ci.paths.get_ipath("cloud_config"), "", 0600) -        self.mocker.replay() -        log_file = self.capture_log(logging.WARNING) -        ci.fetch() -        ci.consume_data() -        self.assertEqual("", log_file.getvalue()) +        with mock.patch('cloudinit.util.write_file') as mockobj: +            log_file = self.capture_log(logging.WARNING) +            ci.fetch() +            ci.consume_data() +            self.assertEqual("", log_file.getvalue()) + +        mockobj.assert_has_calls([ +            mock.call(outpath, script, 0o700), +            mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600), +            ]) diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py index 1a48ee5f..e9cd2fa5 100644 --- a/tests/unittests/test_datasource/test_altcloud.py +++ b/tests/unittests/test_datasource/test_altcloud.py @@ -46,7 +46,7 @@ def _write_cloud_info_file(value):      cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w')      cifile.write(value)      cifile.close() -    os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664) +    os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0o664)  def _remove_cloud_info_file(): @@ -67,12 +67,12 @@ def _write_user_data_files(mount_dir, value):      udfile = open(deltacloud_user_data_file, 'w')      udfile.write(value)      udfile.close() -    os.chmod(deltacloud_user_data_file, 0664) +    os.chmod(deltacloud_user_data_file, 0o664)      udfile = open(user_data_file, 'w')      udfile.write(value)      udfile.close() -    os.chmod(user_data_file, 0664) +    os.chmod(user_data_file, 0o664)  def _remove_user_data_files(mount_dir, diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py index e992a006..38d70fcd 100644 --- a/tests/unittests/test_datasource/test_azure.py +++ b/tests/unittests/test_datasource/test_azure.py @@ -1,14 +1,24 @@  from cloudinit import helpers -from cloudinit.util import load_file +from cloudinit.util import b64e, load_file  from cloudinit.sources import DataSourceAzure -from ..helpers import populate_dir +from ..helpers import TestCase, populate_dir + +try: +    from unittest import mock +except ImportError: +    import mock +try: +    from contextlib import ExitStack +except ImportError: +    from contextlib2 import ExitStack -import base64  import crypt -from mocker import MockerTestCase  import os  import stat  import yaml +import shutil +import tempfile +import unittest  def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None): @@ -40,7 +50,7 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):          content += "<%s%s>%s</%s>\n" % (key, attrs, val, key)      if userdata: -        content += "<UserData>%s</UserData>\n" % (base64.b64encode(userdata)) +        content += "<UserData>%s</UserData>\n" % (b64e(userdata))      if pubkeys:          content += "<SSH><PublicKeys>\n" @@ -66,26 +76,25 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):      return content -class TestAzureDataSource(MockerTestCase): +class TestAzureDataSource(TestCase):      def setUp(self): -        # makeDir comes from MockerTestCase -        self.tmp = self.makeDir() +        super(TestAzureDataSource, self).setUp() +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp)          # patch cloud_dir, so our 'seed_dir' is guaranteed empty          self.paths = helpers.Paths({'cloud_dir': self.tmp})          self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent') -        self.unapply = [] -        super(TestAzureDataSource, self).setUp() +        self.patches = ExitStack() +        self.addCleanup(self.patches.close) -    def tearDown(self): -        apply_patches([i for i in reversed(self.unapply)]) -        super(TestAzureDataSource, self).tearDown() +        super(TestAzureDataSource, self).setUp()      def apply_patches(self, patches): -        ret = apply_patches(patches) -        self.unapply += ret +        for module, name, new in patches: +            self.patches.enter_context(mock.patch.object(module, name, new))      def _get_ds(self, data): @@ -117,16 +126,14 @@ class TestAzureDataSource(MockerTestCase):          mod = DataSourceAzure          mod.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d -        self.apply_patches([(mod, 'list_possible_azure_ds_devs', dsdevs)]) - -        self.apply_patches([(mod, 'invoke_agent', _invoke_agent), -                            (mod, 'wait_for_files', _wait_for_files), -                            (mod, 'pubkeys_from_crt_files', -                             _pubkeys_from_crt_files), -                            (mod, 'iid_from_shared_config', -                             _iid_from_shared_config), -                            (mod, 'apply_hostname_bounce', -                             _apply_hostname_bounce), ]) +        self.apply_patches([ +            (mod, 'list_possible_azure_ds_devs', dsdevs), +            (mod, 'invoke_agent', _invoke_agent), +            (mod, 'wait_for_files', _wait_for_files), +            (mod, 'pubkeys_from_crt_files', _pubkeys_from_crt_files), +            (mod, 'iid_from_shared_config', _iid_from_shared_config), +            (mod, 'apply_hostname_bounce', _apply_hostname_bounce), +            ])          dsrc = mod.DataSourceAzureNet(              data.get('sys_cfg', {}), distro=None, paths=self.paths) @@ -153,7 +160,7 @@ class TestAzureDataSource(MockerTestCase):          ret = dsrc.get_data()          self.assertTrue(ret)          self.assertTrue(os.path.isdir(self.waagent_d)) -        self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0700) +        self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0o700)      def test_user_cfg_set_agent_command_plain(self):          # set dscfg in via plaintext @@ -174,7 +181,7 @@ class TestAzureDataSource(MockerTestCase):          # set dscfg in via base64 encoded yaml          cfg = {'agent_command': "my_command"}          odata = {'HostName': "myhost", 'UserName': "myuser", -                'dscfg': {'text': base64.b64encode(yaml.dump(cfg)), +                'dscfg': {'text': b64e(yaml.dump(cfg)),                            'encoding': 'base64'}}          data = {'ovfcontent': construct_valid_ovf_env(data=odata)} @@ -226,13 +233,13 @@ class TestAzureDataSource(MockerTestCase):      def test_userdata_found(self):          mydata = "FOOBAR" -        odata = {'UserData': base64.b64encode(mydata)} +        odata = {'UserData': b64e(mydata)}          data = {'ovfcontent': construct_valid_ovf_env(data=odata)}          dsrc = self._get_ds(data)          ret = dsrc.get_data()          self.assertTrue(ret) -        self.assertEqual(dsrc.userdata_raw, mydata) +        self.assertEqual(dsrc.userdata_raw, mydata.encode('utf-8'))      def test_no_datasource_expected(self):          # no source should be found if no seed_dir and no devs @@ -274,7 +281,7 @@ class TestAzureDataSource(MockerTestCase):                                     'command': 'my-bounce-command',                                     'hostname_command': 'my-hostname-command'}}          odata = {'HostName': "xhost", -                'dscfg': {'text': base64.b64encode(yaml.dump(cfg)), +                'dscfg': {'text': b64e(yaml.dump(cfg)),                            'encoding': 'base64'}}          data = {'ovfcontent': construct_valid_ovf_env(data=odata)}          self._get_ds(data).get_data() @@ -289,7 +296,7 @@ class TestAzureDataSource(MockerTestCase):          # config specifying set_hostname off should not bounce          cfg = {'set_hostname': False}          odata = {'HostName': "xhost", -                'dscfg': {'text': base64.b64encode(yaml.dump(cfg)), +                'dscfg': {'text': b64e(yaml.dump(cfg)),                            'encoding': 'base64'}}          data = {'ovfcontent': construct_valid_ovf_env(data=odata)}          self._get_ds(data).get_data() @@ -318,7 +325,7 @@ class TestAzureDataSource(MockerTestCase):          # Make sure that user can affect disk aliases          dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}}          odata = {'HostName': "myhost", 'UserName': "myuser", -                'dscfg': {'text': base64.b64encode(yaml.dump(dscfg)), +                'dscfg': {'text': b64e(yaml.dump(dscfg)),                            'encoding': 'base64'}}          usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'},                                    'ephemeral0': False}} @@ -340,7 +347,7 @@ class TestAzureDataSource(MockerTestCase):          dsrc = self._get_ds(data)          dsrc.get_data() -        self.assertEqual(userdata, dsrc.userdata_raw) +        self.assertEqual(userdata.encode('us-ascii'), dsrc.userdata_raw)      def test_ovf_env_arrives_in_waagent_dir(self):          xml = construct_valid_ovf_env(data={}, userdata="FOODATA") @@ -353,9 +360,15 @@ class TestAzureDataSource(MockerTestCase):          self.assertTrue(os.path.exists(ovf_env_path))          self.assertEqual(xml, load_file(ovf_env_path)) +    def test_ovf_can_include_unicode(self): +        xml = construct_valid_ovf_env(data={}) +        xml = u'\ufeff{0}'.format(xml) +        dsrc = self._get_ds({'ovfcontent': xml}) +        dsrc.get_data() +      def test_existing_ovf_same(self):          # waagent/SharedConfig left alone if found ovf-env.xml same as cached -        odata = {'UserData': base64.b64encode("SOMEUSERDATA")} +        odata = {'UserData': b64e("SOMEUSERDATA")}          data = {'ovfcontent': construct_valid_ovf_env(data=odata)}          populate_dir(self.waagent_d, @@ -379,9 +392,9 @@ class TestAzureDataSource(MockerTestCase):          # 'get_data' should remove SharedConfig.xml in /var/lib/waagent          # if ovf-env.xml differs.          cached_ovfenv = construct_valid_ovf_env( -            {'userdata': base64.b64encode("FOO_USERDATA")}) +            {'userdata': b64e("FOO_USERDATA")})          new_ovfenv = construct_valid_ovf_env( -            {'userdata': base64.b64encode("NEW_USERDATA")}) +            {'userdata': b64e("NEW_USERDATA")})          populate_dir(self.waagent_d,              {'ovf-env.xml': cached_ovfenv, @@ -391,7 +404,7 @@ class TestAzureDataSource(MockerTestCase):          dsrc = self._get_ds({'ovfcontent': new_ovfenv})          ret = dsrc.get_data()          self.assertTrue(ret) -        self.assertEqual(dsrc.userdata_raw, "NEW_USERDATA") +        self.assertEqual(dsrc.userdata_raw, b"NEW_USERDATA")          self.assertTrue(os.path.exists(              os.path.join(self.waagent_d, 'otherfile')))          self.assertFalse( @@ -402,7 +415,7 @@ class TestAzureDataSource(MockerTestCase):              load_file(os.path.join(self.waagent_d, 'ovf-env.xml'))) -class TestReadAzureOvf(MockerTestCase): +class TestReadAzureOvf(TestCase):      def test_invalid_xml_raises_non_azure_ds(self):          invalid_xml = "<foo>" + construct_valid_ovf_env(data={})          self.assertRaises(DataSourceAzure.BrokenAzureDataSource, @@ -417,7 +430,7 @@ class TestReadAzureOvf(MockerTestCase):              self.assertIn(mypk, cfg['_pubkeys']) -class TestReadAzureSharedConfig(MockerTestCase): +class TestReadAzureSharedConfig(unittest.TestCase):      def test_valid_content(self):          xml = """<?xml version="1.0" encoding="utf-8"?>              <SharedConfig> @@ -429,14 +442,3 @@ class TestReadAzureSharedConfig(MockerTestCase):              </SharedConfig>"""          ret = DataSourceAzure.iid_from_shared_config_content(xml)          self.assertEqual("MY_INSTANCE_ID", ret) - - -def apply_patches(patches): -    ret = [] -    for (ref, name, replace) in patches: -        if replace is None: -            continue -        orig = getattr(ref, name) -        setattr(ref, name, replace) -        ret.append((ref, name, orig)) -    return ret diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py index 306ac7d8..772d189a 100644 --- a/tests/unittests/test_datasource/test_cloudsigma.py +++ b/tests/unittests/test_datasource/test_cloudsigma.py @@ -39,6 +39,7 @@ class CepkoMock(Cepko):  class DataSourceCloudSigmaTest(test_helpers.TestCase):      def setUp(self): +        super(DataSourceCloudSigmaTest, self).setUp()          self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")          self.datasource.is_running_in_cloudsigma = lambda: True          self.datasource.cepko = CepkoMock(SERVER_CONTEXT) diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py index d88066e5..e28bdd84 100644 --- a/tests/unittests/test_datasource/test_configdrive.py +++ b/tests/unittests/test_datasource/test_configdrive.py @@ -1,10 +1,17 @@  from copy import copy  import json  import os -import os.path - -import mocker -from mocker import MockerTestCase +import shutil +import tempfile + +try: +    from unittest import mock +except ImportError: +    import mock +try: +    from contextlib import ExitStack +except ImportError: +    from contextlib2 import ExitStack  from cloudinit import helpers  from cloudinit import settings @@ -12,7 +19,8 @@ from cloudinit.sources import DataSourceConfigDrive as ds  from cloudinit.sources.helpers import openstack  from cloudinit import util -from .. import helpers as unit_helpers +from ..helpers import TestCase +  PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'  EC2_META = { @@ -64,11 +72,12 @@ CFG_DRIVE_FILES_V2 = {    'openstack/latest/user_data': USER_DATA} -class TestConfigDriveDataSource(MockerTestCase): +class TestConfigDriveDataSource(TestCase):      def setUp(self):          super(TestConfigDriveDataSource, self).setUp() -        self.tmp = self.makeDir() +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp)      def test_ec2_metadata(self):          populate_dir(self.tmp, CFG_DRIVE_FILES_V2) @@ -91,23 +100,29 @@ class TestConfigDriveDataSource(MockerTestCase):              'swap': '/dev/vda3',          }          for name, dev_name in name_tests.items(): -            with unit_helpers.mocker() as my_mock: -                find_mock = my_mock.replace(util.find_devs_with, -                                            spec=False, passthrough=False) +            with ExitStack() as mocks:                  provided_name = dev_name[len('/dev/'):]                  provided_name = "s" + provided_name[1:] -                find_mock(mocker.ARGS) -                my_mock.result([provided_name]) -                exists_mock = my_mock.replace(os.path.exists, -                                              spec=False, passthrough=False) -                exists_mock(mocker.ARGS) -                my_mock.result(False) -                exists_mock(mocker.ARGS) -                my_mock.result(True) -                my_mock.replay() +                find_mock = mocks.enter_context( +                    mock.patch.object(util, 'find_devs_with', +                                      return_value=[provided_name])) +                # We want os.path.exists() to return False on its first call, +                # and True on its second call.  We use a handy generator as +                # the mock side effect for this.  The mocked function returns +                # what the side effect returns. + +                def exists_side_effect(): +                    yield False +                    yield True +                exists_mock = mocks.enter_context( +                    mock.patch.object(os.path, 'exists', +                                      side_effect=exists_side_effect()))                  device = cfg_ds.device_name_to_device(name)                  self.assertEquals(dev_name, device) +                find_mock.assert_called_once_with(mock.ANY) +                self.assertEqual(exists_mock.call_count, 2) +      def test_dev_os_map(self):          populate_dir(self.tmp, CFG_DRIVE_FILES_V2)          cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, @@ -123,19 +138,19 @@ class TestConfigDriveDataSource(MockerTestCase):              'swap': '/dev/vda3',          }          for name, dev_name in name_tests.items(): -            with unit_helpers.mocker() as my_mock: -                find_mock = my_mock.replace(util.find_devs_with, -                                            spec=False, passthrough=False) -                find_mock(mocker.ARGS) -                my_mock.result([dev_name]) -                exists_mock = my_mock.replace(os.path.exists, -                                              spec=False, passthrough=False) -                exists_mock(mocker.ARGS) -                my_mock.result(True) -                my_mock.replay() +            with ExitStack() as mocks: +                find_mock = mocks.enter_context( +                    mock.patch.object(util, 'find_devs_with', +                                      return_value=[dev_name])) +                exists_mock = mocks.enter_context( +                    mock.patch.object(os.path, 'exists', +                                      return_value=True))                  device = cfg_ds.device_name_to_device(name)                  self.assertEquals(dev_name, device) +                find_mock.assert_called_once_with(mock.ANY) +                exists_mock.assert_called_once_with(mock.ANY) +      def test_dev_ec2_remap(self):          populate_dir(self.tmp, CFG_DRIVE_FILES_V2)          cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, @@ -156,16 +171,21 @@ class TestConfigDriveDataSource(MockerTestCase):              'root2k': None,          }          for name, dev_name in name_tests.items(): -            with unit_helpers.mocker(verify_calls=False) as my_mock: -                exists_mock = my_mock.replace(os.path.exists, -                                              spec=False, passthrough=False) -                exists_mock(mocker.ARGS) -                my_mock.result(False) -                exists_mock(mocker.ARGS) -                my_mock.result(True) -                my_mock.replay() +            # We want os.path.exists() to return False on its first call, +            # and True on its second call.  We use a handy generator as +            # the mock side effect for this.  The mocked function returns +            # what the side effect returns. +            def exists_side_effect(): +                yield False +                yield True +            with mock.patch.object(os.path, 'exists', +                                   side_effect=exists_side_effect()):                  device = cfg_ds.device_name_to_device(name)                  self.assertEquals(dev_name, device) +                # We don't assert the call count for os.path.exists() because +                # not all of the entries in name_tests results in two calls to +                # that function.  Specifically, 'root2k' doesn't seem to call +                # it at all.      def test_dev_ec2_map(self):          populate_dir(self.tmp, CFG_DRIVE_FILES_V2) @@ -173,12 +193,6 @@ class TestConfigDriveDataSource(MockerTestCase):                                            None,                                            helpers.Paths({}))          found = ds.read_config_drive(self.tmp) -        exists_mock = self.mocker.replace(os.path.exists, -                                          spec=False, passthrough=False) -        exists_mock(mocker.ARGS) -        self.mocker.count(0, None) -        self.mocker.result(True) -        self.mocker.replay()          ec2_md = found['ec2-metadata']          os_md = found['metadata']          cfg_ds.ec2_metadata = ec2_md @@ -193,8 +207,9 @@ class TestConfigDriveDataSource(MockerTestCase):              'root2k': None,          }          for name, dev_name in name_tests.items(): -            device = cfg_ds.device_name_to_device(name) -            self.assertEquals(dev_name, device) +            with mock.patch.object(os.path, 'exists', return_value=True): +                device = cfg_ds.device_name_to_device(name) +                self.assertEquals(dev_name, device)      def test_dir_valid(self):          """Verify a dir is read as such.""" @@ -326,7 +341,7 @@ def populate_ds_from_read_config(cfg_ds, source, results):  def populate_dir(seed_dir, files): -    for (name, content) in files.iteritems(): +    for (name, content) in files.items():          path = os.path.join(seed_dir, name)          dirname = os.path.dirname(path)          if not os.path.isdir(dirname): diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py index d1270fc2..98f9cfac 100644 --- a/tests/unittests/test_datasource/test_digitalocean.py +++ b/tests/unittests/test_datasource/test_digitalocean.py @@ -18,8 +18,7 @@  import httpretty  import re -from types import ListType -from urlparse import urlparse +from six.moves.urllib_parse import urlparse  from cloudinit import settings  from cloudinit import helpers @@ -110,7 +109,7 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):          self.assertEqual([DO_META.get('public-keys')],                           self.ds.get_public_ssh_keys()) -        self.assertIs(type(self.ds.get_public_ssh_keys()), ListType) +        self.assertIsInstance(self.ds.get_public_ssh_keys(), list)      @httpretty.activate      def test_multiple_ssh_keys(self): @@ -124,4 +123,4 @@ class TestDataSourceDigitalOcean(test_helpers.HttprettyTestCase):          self.assertEqual(DO_META.get('public-keys').splitlines(),                           self.ds.get_public_ssh_keys()) -        self.assertIs(type(self.ds.get_public_ssh_keys()), ListType) +        self.assertIsInstance(self.ds.get_public_ssh_keys(), list) diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py index 06050bb1..6dd4b5ed 100644 --- a/tests/unittests/test_datasource/test_gce.py +++ b/tests/unittests/test_datasource/test_gce.py @@ -19,7 +19,7 @@ import httpretty  import re  from base64 import b64encode, b64decode -from urlparse import urlparse +from six.moves.urllib_parse import urlparse  from cloudinit import settings  from cloudinit import helpers @@ -45,7 +45,7 @@ GCE_META_ENCODING = {      'instance/id': '12345',      'instance/hostname': 'server.project-baz.local',      'instance/zone': 'baz/bang', -    'instance/attributes/user-data': b64encode('/bin/echo baz\n'), +    'instance/attributes/user-data': b64encode(b'/bin/echo baz\n'),      'instance/attributes/user-data-encoding': 'base64',  } diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py index c157beb8..d25e1adc 100644 --- a/tests/unittests/test_datasource/test_maas.py +++ b/tests/unittests/test_datasource/test_maas.py @@ -1,19 +1,25 @@  from copy import copy  import os +import shutil +import tempfile  from cloudinit.sources import DataSourceMAAS  from cloudinit import url_helper -from ..helpers import populate_dir +from ..helpers import TestCase, populate_dir -import mocker +try: +    from unittest import mock +except ImportError: +    import mock -class TestMAASDataSource(mocker.MockerTestCase): +class TestMAASDataSource(TestCase):      def setUp(self):          super(TestMAASDataSource, self).setUp()          # Make a temp directoy for tests to use. -        self.tmp = self.makeDir() +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp)      def test_seed_dir_valid(self):          """Verify a valid seeddir is read as such.""" @@ -93,16 +99,18 @@ class TestMAASDataSource(mocker.MockerTestCase):      def test_seed_url_valid(self):          """Verify that valid seed_url is read as such.""" -        valid = {'meta-data/instance-id': 'i-instanceid', +        valid = { +            'meta-data/instance-id': 'i-instanceid',              'meta-data/local-hostname': 'test-hostname',              'meta-data/public-keys': 'test-hostname', -            'user-data': 'foodata'} +            'user-data': 'foodata', +            }          valid_order = [              'meta-data/local-hostname',              'meta-data/instance-id',              'meta-data/public-keys',              'user-data', -        ] +            ]          my_seed = "http://example.com/xmeta"          my_ver = "1999-99-99"          my_headers = {'header1': 'value1', 'header2': 'value2'} @@ -110,28 +118,38 @@ class TestMAASDataSource(mocker.MockerTestCase):          def my_headers_cb(url):              return my_headers -        mock_request = self.mocker.replace(url_helper.readurl, -            passthrough=False) - -        for key in valid_order: -            url = "%s/%s/%s" % (my_seed, my_ver, key) -            mock_request(url, headers=None, timeout=mocker.ANY, -                         data=mocker.ANY, sec_between=mocker.ANY, -                         ssl_details=mocker.ANY, retries=mocker.ANY, -                         headers_cb=my_headers_cb, -                         exception_cb=mocker.ANY) -            resp = valid.get(key) -            self.mocker.result(url_helper.StringResponse(resp)) -        self.mocker.replay() - -        (userdata, metadata) = DataSourceMAAS.read_maas_seed_url(my_seed, -            header_cb=my_headers_cb, version=my_ver) - -        self.assertEqual("foodata", userdata) -        self.assertEqual(metadata['instance-id'], -            valid['meta-data/instance-id']) -        self.assertEqual(metadata['local-hostname'], -            valid['meta-data/local-hostname']) +        # Each time url_helper.readurl() is called, something different is +        # returned based on the canned data above.  We need to build up a list +        # of side effect return values, which the mock will return.  At the +        # same time, we'll build up a list of expected call arguments for +        # asserting after the code under test is run. +        calls = [] + +        def side_effect(): +            for key in valid_order: +                resp = valid.get(key) +                url = "%s/%s/%s" % (my_seed, my_ver, key) +                calls.append( +                    mock.call(url, headers=None, timeout=mock.ANY, +                              data=mock.ANY, sec_between=mock.ANY, +                              ssl_details=mock.ANY, retries=mock.ANY, +                              headers_cb=my_headers_cb, +                              exception_cb=mock.ANY)) +                yield url_helper.StringResponse(resp) + +        # Now do the actual call of the code under test. +        with mock.patch.object(url_helper, 'readurl', +                               side_effect=side_effect()) as mockobj: +            userdata, metadata = DataSourceMAAS.read_maas_seed_url( +                my_seed, header_cb=my_headers_cb, version=my_ver) + +            self.assertEqual("foodata", userdata) +            self.assertEqual(metadata['instance-id'], +                             valid['meta-data/instance-id']) +            self.assertEqual(metadata['local-hostname'], +                             valid['meta-data/local-hostname']) + +            mockobj.has_calls(calls)      def test_seed_url_invalid(self):          """Verify that invalid seed_url raises MAASSeedDirMalformed.""" diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py index e9235951..4f967f58 100644 --- a/tests/unittests/test_datasource/test_nocloud.py +++ b/tests/unittests/test_datasource/test_nocloud.py @@ -1,35 +1,39 @@  from cloudinit import helpers  from cloudinit.sources import DataSourceNoCloud  from cloudinit import util -from ..helpers import populate_dir +from ..helpers import TestCase, populate_dir -from mocker import MockerTestCase  import os  import yaml +import shutil +import tempfile +import unittest +try: +    from unittest import mock +except ImportError: +    import mock +try: +    from contextlib import ExitStack +except ImportError: +    from contextlib2 import ExitStack -class TestNoCloudDataSource(MockerTestCase): + +class TestNoCloudDataSource(TestCase):      def setUp(self): -        self.tmp = self.makeDir() +        super(TestNoCloudDataSource, self).setUp() +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp)          self.paths = helpers.Paths({'cloud_dir': self.tmp})          self.cmdline = "root=TESTCMDLINE" -        self.unapply = [] -        self.apply_patches([(util, 'get_cmdline', self._getcmdline)]) -        super(TestNoCloudDataSource, self).setUp() - -    def tearDown(self): -        apply_patches([i for i in reversed(self.unapply)]) -        super(TestNoCloudDataSource, self).tearDown() +        self.mocks = ExitStack() +        self.addCleanup(self.mocks.close) -    def apply_patches(self, patches): -        ret = apply_patches(patches) -        self.unapply += ret - -    def _getcmdline(self): -        return self.cmdline +        self.mocks.enter_context( +            mock.patch.object(util, 'get_cmdline', return_value=self.cmdline))      def test_nocloud_seed_dir(self):          md = {'instance-id': 'IID', 'dsmode': 'local'} @@ -59,7 +63,9 @@ class TestNoCloudDataSource(MockerTestCase):          def my_find_devs_with(*args, **kwargs):              raise PsuedoException -        self.apply_patches([(util, 'find_devs_with', my_find_devs_with)]) +        self.mocks.enter_context( +            mock.patch.object(util, 'find_devs_with', +                              side_effect=PsuedoException))          # by default, NoCloud should search for filesystems by label          sys_cfg = {'datasource': {'NoCloud': {}}} @@ -85,7 +91,7 @@ class TestNoCloudDataSource(MockerTestCase):          data = {              'fs_label': None, -            'meta-data': {'instance-id': 'IID'}, +            'meta-data': yaml.safe_dump({'instance-id': 'IID'}),              'user-data': "USER_DATA_RAW",          } @@ -133,7 +139,7 @@ class TestNoCloudDataSource(MockerTestCase):          self.assertTrue(ret) -class TestParseCommandLineData(MockerTestCase): +class TestParseCommandLineData(unittest.TestCase):      def test_parse_cmdline_data_valid(self):          ds_id = "ds=nocloud" @@ -178,15 +184,4 @@ class TestParseCommandLineData(MockerTestCase):              self.assertFalse(ret) -def apply_patches(patches): -    ret = [] -    for (ref, name, replace) in patches: -        if replace is None: -            continue -        orig = getattr(ref, name) -        setattr(ref, name, replace) -        ret.append((ref, name, orig)) -    return ret - -  # vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py index b4fd1f4d..27adf21b 100644 --- a/tests/unittests/test_datasource/test_opennebula.py +++ b/tests/unittests/test_datasource/test_opennebula.py @@ -1,12 +1,14 @@  from cloudinit import helpers  from cloudinit.sources import DataSourceOpenNebula as ds  from cloudinit import util -from mocker import MockerTestCase -from ..helpers import populate_dir +from ..helpers import TestCase, populate_dir -from base64 import b64encode  import os  import pwd +import shutil +import tempfile +import unittest +  TEST_VARS = {      'VAR1': 'single', @@ -37,12 +39,13 @@ CMD_IP_OUT = '''\  ''' -class TestOpenNebulaDataSource(MockerTestCase): +class TestOpenNebulaDataSource(TestCase):      parsed_user = None      def setUp(self):          super(TestOpenNebulaDataSource, self).setUp() -        self.tmp = self.makeDir() +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp)          self.paths = helpers.Paths({'cloud_dir': self.tmp})          # defaults for few tests @@ -176,7 +179,7 @@ class TestOpenNebulaDataSource(MockerTestCase):              self.assertEqual(USER_DATA, results['userdata'])      def test_user_data_encoding_required_for_decode(self): -        b64userdata = b64encode(USER_DATA) +        b64userdata = util.b64e(USER_DATA)          for k in ('USER_DATA', 'USERDATA'):              my_d = os.path.join(self.tmp, k)              populate_context_dir(my_d, {k: b64userdata}) @@ -188,7 +191,7 @@ class TestOpenNebulaDataSource(MockerTestCase):      def test_user_data_base64_encoding(self):          for k in ('USER_DATA', 'USERDATA'):              my_d = os.path.join(self.tmp, k) -            populate_context_dir(my_d, {k: b64encode(USER_DATA), +            populate_context_dir(my_d, {k: util.b64e(USER_DATA),                                          'USERDATA_ENCODING': 'base64'})              results = ds.read_context_disk_dir(my_d) @@ -228,7 +231,7 @@ class TestOpenNebulaDataSource(MockerTestCase):              util.find_devs_with = orig_find_devs_with -class TestOpenNebulaNetwork(MockerTestCase): +class TestOpenNebulaNetwork(unittest.TestCase):      def setUp(self):          super(TestOpenNebulaNetwork, self).setUp() @@ -280,7 +283,7 @@ iface eth0 inet static  ''') -class TestParseShellConfig(MockerTestCase): +class TestParseShellConfig(unittest.TestCase):      def test_no_seconds(self):          cfg = '\n'.join(["foo=bar", "SECONDS=2", "xx=foo"])          # we could test 'sleep 2', but that would make the test run slower. @@ -290,7 +293,7 @@ class TestParseShellConfig(MockerTestCase):  def populate_context_dir(path, variables):      data = "# Context variables generated by OpenNebula\n" -    for (k, v) in variables.iteritems(): +    for k, v in variables.items():          data += ("%s='%s'\n" % (k.upper(), v.replace(r"'", r"'\''")))      populate_dir(path, {'context.sh': data}) diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py index 49894e51..81ef1546 100644 --- a/tests/unittests/test_datasource/test_openstack.py +++ b/tests/unittests/test_datasource/test_openstack.py @@ -20,12 +20,11 @@ import copy  import json  import re -from StringIO import StringIO - -from urlparse import urlparse -  from .. import helpers as test_helpers +from six import StringIO +from six.moves.urllib.parse import urlparse +  from cloudinit import helpers  from cloudinit import settings  from cloudinit.sources import DataSourceOpenStack as ds diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py index 65675106..8b62b1b1 100644 --- a/tests/unittests/test_datasource/test_smartos.py +++ b/tests/unittests/test_datasource/test_smartos.py @@ -22,16 +22,21 @@  #   return responses.  # -import base64 +from __future__ import print_function +  from cloudinit import helpers as c_helpers  from cloudinit.sources import DataSourceSmartOS +from cloudinit.util import b64e  from .. import helpers  import os  import os.path  import re +import shutil +import tempfile  import stat  import uuid +  MOCK_RETURNS = {      'hostname': 'test-host',      'root_authorized_keys': 'ssh-rsa AAAAB3Nz...aC1yc2E= keyname', @@ -107,11 +112,12 @@ class MockSerial(object):  class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):      def setUp(self): -        helpers.FilesystemMockingTestCase.setUp(self) +        super(TestSmartOSDataSource, self).setUp() -        # makeDir comes from MockerTestCase -        self.tmp = self.makeDir() -        self.legacy_user_d = self.makeDir() +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp) +        self.legacy_user_d = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.legacy_user_d)          # If you should want to watch the logs...          self._log = None @@ -227,7 +233,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          my_returns = MOCK_RETURNS.copy()          my_returns['base64_all'] = "true"          for k in ('hostname', 'cloud-init:user-data'): -            my_returns[k] = base64.b64encode(my_returns[k]) +            my_returns[k] = b64e(my_returns[k])          dsrc = self._get_ds(mockdata=my_returns)          ret = dsrc.get_data() @@ -248,7 +254,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          my_returns['b64-cloud-init:user-data'] = "true"          my_returns['b64-hostname'] = "true"          for k in ('hostname', 'cloud-init:user-data'): -            my_returns[k] = base64.b64encode(my_returns[k]) +            my_returns[k] = b64e(my_returns[k])          dsrc = self._get_ds(mockdata=my_returns)          ret = dsrc.get_data() @@ -264,7 +270,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):          my_returns = MOCK_RETURNS.copy()          my_returns['base64_keys'] = 'hostname,ignored'          for k in ('hostname',): -            my_returns[k] = base64.b64encode(my_returns[k]) +            my_returns[k] = b64e(my_returns[k])          dsrc = self._get_ds(mockdata=my_returns)          ret = dsrc.get_data() @@ -365,7 +371,7 @@ class TestSmartOSDataSource(helpers.FilesystemMockingTestCase):                  permissions = oct(os.stat(name_f)[stat.ST_MODE])[-3:]                  if re.match(r'.*\/mdata-user-data$', name_f):                      found_new = True -                    print name_f +                    print(name_f)                      self.assertEquals(permissions, '400')          self.assertFalse(found_new) diff --git a/tests/unittests/test_distros/test_generic.py b/tests/unittests/test_distros/test_generic.py index db6aa0e8..35153f0d 100644 --- a/tests/unittests/test_distros/test_generic.py +++ b/tests/unittests/test_distros/test_generic.py @@ -4,6 +4,8 @@ from cloudinit import util  from .. import helpers  import os +import shutil +import tempfile  unknown_arch_info = {      'arches': ['default'], @@ -53,7 +55,8 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase):      def setUp(self):          super(TestGenericDistro, self).setUp()          # Make a temp directoy for tests to use. -        self.tmp = self.makeDir() +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp)      def _write_load_sudoers(self, _user, rules):          cls = distros.fetch("ubuntu") @@ -64,7 +67,6 @@ class TestGenericDistro(helpers.FilesystemMockingTestCase):          self.patchUtils(self.tmp)          d.write_sudo_rules("harlowja", rules)          contents = util.load_file(d.ci_sudoers_fn) -        self.restore()          return contents      def _count_in(self, lines_look_for, text_content): diff --git a/tests/unittests/test_distros/test_hostname.py b/tests/unittests/test_distros/test_hostname.py index 8e644f4d..143e29a9 100644 --- a/tests/unittests/test_distros/test_hostname.py +++ b/tests/unittests/test_distros/test_hostname.py @@ -1,4 +1,4 @@ -from mocker import MockerTestCase +import unittest  from cloudinit.distros.parsers import hostname @@ -12,7 +12,7 @@ blahblah  BASE_HOSTNAME = BASE_HOSTNAME.strip() -class TestHostnameHelper(MockerTestCase): +class TestHostnameHelper(unittest.TestCase):      def test_parse_same(self):          hn = hostname.HostnameConf(BASE_HOSTNAME)          self.assertEquals(str(hn).strip(), BASE_HOSTNAME) diff --git a/tests/unittests/test_distros/test_hosts.py b/tests/unittests/test_distros/test_hosts.py index 687a0dab..fc701eaa 100644 --- a/tests/unittests/test_distros/test_hosts.py +++ b/tests/unittests/test_distros/test_hosts.py @@ -1,4 +1,4 @@ -from mocker import MockerTestCase +import unittest  from cloudinit.distros.parsers import hosts @@ -14,7 +14,7 @@ BASE_ETC = '''  BASE_ETC = BASE_ETC.strip() -class TestHostsHelper(MockerTestCase): +class TestHostsHelper(unittest.TestCase):      def test_parse(self):          eh = hosts.HostsConf(BASE_ETC)          self.assertEquals(eh.get_entry('127.0.0.1'), [['localhost']]) diff --git a/tests/unittests/test_distros/test_netconfig.py b/tests/unittests/test_distros/test_netconfig.py index 193338e8..6d30c5b8 100644 --- a/tests/unittests/test_distros/test_netconfig.py +++ b/tests/unittests/test_distros/test_netconfig.py @@ -1,8 +1,16 @@ -from mocker import MockerTestCase +import os -import mocker +try: +    from unittest import mock +except ImportError: +    import mock +try: +    from contextlib import ExitStack +except ImportError: +    from contextlib2 import ExitStack -import os +from six import StringIO +from ..helpers import TestCase  from cloudinit import distros  from cloudinit import helpers @@ -11,8 +19,6 @@ from cloudinit import util  from cloudinit.distros.parsers.sys_conf import SysConf -from StringIO import StringIO -  BASE_NET_CFG = '''  auto lo @@ -74,7 +80,7 @@ class WriteBuffer(object):          return self.buffer.getvalue() -class TestNetCfgDistro(MockerTestCase): +class TestNetCfgDistro(TestCase):      def _get_distro(self, dname):          cls = distros.fetch(dname) @@ -85,34 +91,28 @@ class TestNetCfgDistro(MockerTestCase):      def test_simple_write_ub(self):          ub_distro = self._get_distro('ubuntu') -        util_mock = self.mocker.replace(util.write_file, -                                        spec=False, passthrough=False) -        exists_mock = self.mocker.replace(os.path.isfile, -                                          spec=False, passthrough=False) +        with ExitStack() as mocks: +            write_bufs = {} -        exists_mock(mocker.ARGS) -        self.mocker.count(0, None) -        self.mocker.result(False) +            def replace_write(filename, content, mode=0o644, omode="wb"): +                buf = WriteBuffer() +                buf.mode = mode +                buf.omode = omode +                buf.write(content) +                write_bufs[filename] = buf -        write_bufs = {} - -        def replace_write(filename, content, mode=0644, omode="wb"): -            buf = WriteBuffer() -            buf.mode = mode -            buf.omode = omode -            buf.write(content) -            write_bufs[filename] = buf +            mocks.enter_context( +                mock.patch.object(util, 'write_file', replace_write)) +            mocks.enter_context( +                mock.patch.object(os.path, 'isfile', return_value=False)) -        util_mock(mocker.ARGS) -        self.mocker.call(replace_write) -        self.mocker.replay() -        ub_distro.apply_network(BASE_NET_CFG, False) +            ub_distro.apply_network(BASE_NET_CFG, False) -        self.assertEquals(len(write_bufs), 1) -        self.assertIn('/etc/network/interfaces', write_bufs) -        write_buf = write_bufs['/etc/network/interfaces'] -        self.assertEquals(str(write_buf).strip(), BASE_NET_CFG.strip()) -        self.assertEquals(write_buf.mode, 0644) +            self.assertEquals(len(write_bufs), 1) +            self.assertIn('/etc/network/interfaces', write_bufs) +            write_buf = write_bufs['/etc/network/interfaces'] +            self.assertEquals(str(write_buf).strip(), BASE_NET_CFG.strip()) +            self.assertEquals(write_buf.mode, 0o644)      def assertCfgEquals(self, blob1, blob2):          b1 = dict(SysConf(blob1.strip().splitlines())) @@ -127,53 +127,41 @@ class TestNetCfgDistro(MockerTestCase):      def test_simple_write_rh(self):          rh_distro = self._get_distro('rhel') -        write_mock = self.mocker.replace(util.write_file, -                                         spec=False, passthrough=False) -        load_mock = self.mocker.replace(util.load_file, -                                        spec=False, passthrough=False) -        exists_mock = self.mocker.replace(os.path.isfile, -                                          spec=False, passthrough=False)          write_bufs = {} -        def replace_write(filename, content, mode=0644, omode="wb"): +        def replace_write(filename, content, mode=0o644, omode="wb"):              buf = WriteBuffer()              buf.mode = mode              buf.omode = omode              buf.write(content)              write_bufs[filename] = buf -        exists_mock(mocker.ARGS) -        self.mocker.count(0, None) -        self.mocker.result(False) - -        load_mock(mocker.ARGS) -        self.mocker.count(0, None) -        self.mocker.result('') - -        for _i in range(0, 3): -            write_mock(mocker.ARGS) -            self.mocker.call(replace_write) - -        write_mock(mocker.ARGS) -        self.mocker.call(replace_write) - -        self.mocker.replay() -        rh_distro.apply_network(BASE_NET_CFG, False) - -        self.assertEquals(len(write_bufs), 4) -        self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', write_bufs) -        write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo'] -        expected_buf = ''' +        with ExitStack() as mocks: +            mocks.enter_context( +                mock.patch.object(util, 'write_file', replace_write)) +            mocks.enter_context( +                mock.patch.object(util, 'load_file', return_value='')) +            mocks.enter_context( +                mock.patch.object(os.path, 'isfile', return_value=False)) + +            rh_distro.apply_network(BASE_NET_CFG, False) + +            self.assertEquals(len(write_bufs), 4) +            self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', +                          write_bufs) +            write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo'] +            expected_buf = '''  DEVICE="lo"  ONBOOT=yes  ''' -        self.assertCfgEquals(expected_buf, str(write_buf)) -        self.assertEquals(write_buf.mode, 0644) +            self.assertCfgEquals(expected_buf, str(write_buf)) +            self.assertEquals(write_buf.mode, 0o644) -        self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', write_bufs) -        write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0'] -        expected_buf = ''' +            self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', +                          write_bufs) +            write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0'] +            expected_buf = '''  DEVICE="eth0"  BOOTPROTO="static"  NETMASK="255.255.255.0" @@ -182,77 +170,66 @@ ONBOOT=yes  GATEWAY="192.168.1.254"  BROADCAST="192.168.1.0"  ''' -        self.assertCfgEquals(expected_buf, str(write_buf)) -        self.assertEquals(write_buf.mode, 0644) +            self.assertCfgEquals(expected_buf, str(write_buf)) +            self.assertEquals(write_buf.mode, 0o644) -        self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', write_bufs) -        write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1'] -        expected_buf = ''' +            self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', +                          write_bufs) +            write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1'] +            expected_buf = '''  DEVICE="eth1"  BOOTPROTO="dhcp"  ONBOOT=yes  ''' -        self.assertCfgEquals(expected_buf, str(write_buf)) -        self.assertEquals(write_buf.mode, 0644) +            self.assertCfgEquals(expected_buf, str(write_buf)) +            self.assertEquals(write_buf.mode, 0o644) -        self.assertIn('/etc/sysconfig/network', write_bufs) -        write_buf = write_bufs['/etc/sysconfig/network'] -        expected_buf = ''' +            self.assertIn('/etc/sysconfig/network', write_bufs) +            write_buf = write_bufs['/etc/sysconfig/network'] +            expected_buf = '''  # Created by cloud-init v. 0.7  NETWORKING=yes  ''' -        self.assertCfgEquals(expected_buf, str(write_buf)) -        self.assertEquals(write_buf.mode, 0644) +            self.assertCfgEquals(expected_buf, str(write_buf)) +            self.assertEquals(write_buf.mode, 0o644)      def test_write_ipv6_rhel(self):          rh_distro = self._get_distro('rhel') -        write_mock = self.mocker.replace(util.write_file, -                                         spec=False, passthrough=False) -        load_mock = self.mocker.replace(util.load_file, -                                        spec=False, passthrough=False) -        exists_mock = self.mocker.replace(os.path.isfile, -                                          spec=False, passthrough=False)          write_bufs = {} -        def replace_write(filename, content, mode=0644, omode="wb"): +        def replace_write(filename, content, mode=0o644, omode="wb"):              buf = WriteBuffer()              buf.mode = mode              buf.omode = omode              buf.write(content)              write_bufs[filename] = buf -        exists_mock(mocker.ARGS) -        self.mocker.count(0, None) -        self.mocker.result(False) - -        load_mock(mocker.ARGS) -        self.mocker.count(0, None) -        self.mocker.result('') - -        for _i in range(0, 3): -            write_mock(mocker.ARGS) -            self.mocker.call(replace_write) - -        write_mock(mocker.ARGS) -        self.mocker.call(replace_write) - -        self.mocker.replay() -        rh_distro.apply_network(BASE_NET_CFG_IPV6, False) - -        self.assertEquals(len(write_bufs), 4) -        self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', write_bufs) -        write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo'] -        expected_buf = ''' +        with ExitStack() as mocks: +            mocks.enter_context( +                mock.patch.object(util, 'write_file', replace_write)) +            mocks.enter_context( +                mock.patch.object(util, 'load_file', return_value='')) +            mocks.enter_context( +                mock.patch.object(os.path, 'isfile', return_value=False)) + +            rh_distro.apply_network(BASE_NET_CFG_IPV6, False) + +            self.assertEquals(len(write_bufs), 4) +            self.assertIn('/etc/sysconfig/network-scripts/ifcfg-lo', +                          write_bufs) +            write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-lo'] +            expected_buf = '''  DEVICE="lo"  ONBOOT=yes  ''' -        self.assertCfgEquals(expected_buf, str(write_buf)) -        self.assertEquals(write_buf.mode, 0644) +            self.assertCfgEquals(expected_buf, str(write_buf)) +            self.assertEquals(write_buf.mode, 0o644) -        self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', write_bufs) -        write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0'] -        expected_buf = ''' +            self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth0', +                          write_bufs) +            write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth0'] +            expected_buf = '''  DEVICE="eth0"  BOOTPROTO="static"  NETMASK="255.255.255.0" @@ -264,11 +241,12 @@ IPV6INIT=yes  IPV6ADDR="2607:f0d0:1002:0011::2"  IPV6_DEFAULTGW="2607:f0d0:1002:0011::1"  ''' -        self.assertCfgEquals(expected_buf, str(write_buf)) -        self.assertEquals(write_buf.mode, 0644) -        self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', write_bufs) -        write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1'] -        expected_buf = ''' +            self.assertCfgEquals(expected_buf, str(write_buf)) +            self.assertEquals(write_buf.mode, 0o644) +            self.assertIn('/etc/sysconfig/network-scripts/ifcfg-eth1', +                          write_bufs) +            write_buf = write_bufs['/etc/sysconfig/network-scripts/ifcfg-eth1'] +            expected_buf = '''  DEVICE="eth1"  BOOTPROTO="static"  NETMASK="255.255.255.0" @@ -280,38 +258,22 @@ IPV6INIT=yes  IPV6ADDR="2607:f0d0:1002:0011::3"  IPV6_DEFAULTGW="2607:f0d0:1002:0011::1"  ''' -        self.assertCfgEquals(expected_buf, str(write_buf)) -        self.assertEquals(write_buf.mode, 0644) +            self.assertCfgEquals(expected_buf, str(write_buf)) +            self.assertEquals(write_buf.mode, 0o644) -        self.assertIn('/etc/sysconfig/network', write_bufs) -        write_buf = write_bufs['/etc/sysconfig/network'] -        expected_buf = ''' +            self.assertIn('/etc/sysconfig/network', write_bufs) +            write_buf = write_bufs['/etc/sysconfig/network'] +            expected_buf = '''  # Created by cloud-init v. 0.7  NETWORKING=yes  NETWORKING_IPV6=yes  IPV6_AUTOCONF=no  ''' -        self.assertCfgEquals(expected_buf, str(write_buf)) -        self.assertEquals(write_buf.mode, 0644) +            self.assertCfgEquals(expected_buf, str(write_buf)) +            self.assertEquals(write_buf.mode, 0o644)      def test_simple_write_freebsd(self):          fbsd_distro = self._get_distro('freebsd') -        util_mock = self.mocker.replace(util.write_file, -                                        spec=False, passthrough=False) -        exists_mock = self.mocker.replace(os.path.isfile, -                                          spec=False, passthrough=False) -        load_mock = self.mocker.replace(util.load_file, -                                        spec=False, passthrough=False) -        subp_mock = self.mocker.replace(util.subp, -                                        spec=False, passthrough=False) - -        subp_mock(['ifconfig', '-a']) -        self.mocker.count(0, None) -        self.mocker.result(('vtnet0', '')) - -        exists_mock(mocker.ARGS) -        self.mocker.count(0, None) -        self.mocker.result(False)          write_bufs = {}          read_bufs = { @@ -319,7 +281,7 @@ IPV6_AUTOCONF=no              '/etc/resolv.conf': '',          } -        def replace_write(filename, content, mode=0644, omode="wb"): +        def replace_write(filename, content, mode=0o644, omode="wb"):              buf = WriteBuffer()              buf.mode = mode              buf.omode = omode @@ -336,23 +298,24 @@ IPV6_AUTOCONF=no                      return str(write_bufs[fname])                  return read_bufs[fname] -        util_mock(mocker.ARGS) -        self.mocker.call(replace_write) -        self.mocker.count(0, None) - -        load_mock(mocker.ARGS) -        self.mocker.call(replace_read) -        self.mocker.count(0, None) - -        self.mocker.replay() -        fbsd_distro.apply_network(BASE_NET_CFG, False) - -        self.assertIn('/etc/rc.conf', write_bufs) -        write_buf = write_bufs['/etc/rc.conf'] -        expected_buf = ''' +        with ExitStack() as mocks: +            mocks.enter_context( +                mock.patch.object(util, 'subp', return_value=('vtnet0', ''))) +            mocks.enter_context( +                mock.patch.object(os.path, 'exists', return_value=False)) +            mocks.enter_context( +                mock.patch.object(util, 'write_file', replace_write)) +            mocks.enter_context( +                mock.patch.object(util, 'load_file', replace_read)) + +            fbsd_distro.apply_network(BASE_NET_CFG, False) + +            self.assertIn('/etc/rc.conf', write_bufs) +            write_buf = write_bufs['/etc/rc.conf'] +            expected_buf = '''  ifconfig_vtnet0="192.168.1.5 netmask 255.255.255.0"  ifconfig_vtnet1="DHCP"  defaultrouter="192.168.1.254"  ''' -        self.assertCfgEquals(expected_buf, str(write_buf)) -        self.assertEquals(write_buf.mode, 0644) +            self.assertCfgEquals(expected_buf, str(write_buf)) +            self.assertEquals(write_buf.mode, 0o644) diff --git a/tests/unittests/test_distros/test_resolv.py b/tests/unittests/test_distros/test_resolv.py index 6b6ff6aa..faaf5b7f 100644 --- a/tests/unittests/test_distros/test_resolv.py +++ b/tests/unittests/test_distros/test_resolv.py @@ -1,8 +1,7 @@ -from mocker import MockerTestCase -  from cloudinit.distros.parsers import resolv_conf  import re +from ..helpers import TestCase  BASE_RESOLVE = ''' @@ -14,7 +13,7 @@ nameserver 10.15.30.92  BASE_RESOLVE = BASE_RESOLVE.strip() -class TestResolvHelper(MockerTestCase): +class TestResolvHelper(TestCase):      def test_parse_same(self):          rp = resolv_conf.ResolvConf(BASE_RESOLVE)          rp_r = str(rp).strip() diff --git a/tests/unittests/test_distros/test_sysconfig.py b/tests/unittests/test_distros/test_sysconfig.py index 0c651407..03d89a10 100644 --- a/tests/unittests/test_distros/test_sysconfig.py +++ b/tests/unittests/test_distros/test_sysconfig.py @@ -1,14 +1,13 @@ -from mocker import MockerTestCase -  import re  from cloudinit.distros.parsers.sys_conf import SysConf +from ..helpers import TestCase  # Lots of good examples @  # http://content.hccfl.edu/pollock/AUnix1/SysconfigFilesDesc.txt -class TestSysConfHelper(MockerTestCase): +class TestSysConfHelper(TestCase):      # This function was added in 2.7, make it work for 2.6      def assertRegMatches(self, text, regexp):          regexp = re.compile(regexp) diff --git a/tests/unittests/test_distros/test_user_data_normalize.py b/tests/unittests/test_distros/test_user_data_normalize.py index 50398c74..e4488e2a 100644 --- a/tests/unittests/test_distros/test_user_data_normalize.py +++ b/tests/unittests/test_distros/test_user_data_normalize.py @@ -1,9 +1,10 @@ -from mocker import MockerTestCase -  from cloudinit import distros  from cloudinit import helpers  from cloudinit import settings +from ..helpers import TestCase + +  bcfg = {     'name': 'bob',     'plain_text_passwd': 'ubuntu', @@ -15,7 +16,7 @@ bcfg = {  } -class TestUGNormalize(MockerTestCase): +class TestUGNormalize(TestCase):      def _make_distro(self, dtype, def_user=None):          cfg = dict(settings.CFG_BUILTIN) diff --git a/tests/unittests/test_filters/test_launch_index.py b/tests/unittests/test_filters/test_launch_index.py index 2f4c2fda..95d24b9b 100644 --- a/tests/unittests/test_filters/test_launch_index.py +++ b/tests/unittests/test_filters/test_launch_index.py @@ -2,7 +2,7 @@ import copy  from .. import helpers -import itertools +from six.moves import filterfalse  from cloudinit.filters import launch_index  from cloudinit import user_data as ud @@ -36,11 +36,9 @@ class TestLaunchFilter(helpers.ResourceUsingTestCase):              return False          # Do some basic payload checking          msg1_msgs = [m for m in msg1.walk()] -        msg1_msgs = [m for m in -                     itertools.ifilterfalse(ud.is_skippable, msg1_msgs)] +        msg1_msgs = [m for m in filterfalse(ud.is_skippable, msg1_msgs)]          msg2_msgs = [m for m in msg2.walk()] -        msg2_msgs = [m for m in -                     itertools.ifilterfalse(ud.is_skippable, msg2_msgs)] +        msg2_msgs = [m for m in filterfalse(ud.is_skippable, msg2_msgs)]          for i in range(0, len(msg2_msgs)):              m1_msg = msg1_msgs[i]              m2_msg = msg2_msgs[i] diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_configure.py index 203dd2aa..d8fe9a4f 100644 --- a/tests/unittests/test_handler/test_handler_apt_configure.py +++ b/tests/unittests/test_handler/test_handler_apt_configure.py @@ -1,27 +1,27 @@ -from mocker import MockerTestCase -  from cloudinit import util  from cloudinit.config import cc_apt_configure +from ..helpers import TestCase  import os  import re +import shutil +import tempfile +import unittest -class TestAptProxyConfig(MockerTestCase): +class TestAptProxyConfig(TestCase):      def setUp(self):          super(TestAptProxyConfig, self).setUp() -        self.tmp = self.makeDir() +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp)          self.pfile = os.path.join(self.tmp, "proxy.cfg")          self.cfile = os.path.join(self.tmp, "config.cfg")      def _search_apt_config(self, contents, ptype, value): -        print( -            r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value), -            contents, "flags=re.IGNORECASE") -        return(re.search( +        return re.search(              r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value), -            contents, flags=re.IGNORECASE)) +            contents, flags=re.IGNORECASE)      def test_apt_proxy_written(self):          cfg = {'apt_proxy': 'myproxy'} @@ -60,7 +60,7 @@ class TestAptProxyConfig(MockerTestCase):          contents = str(util.read_file_or_url(self.pfile)) -        for ptype, pval in values.iteritems(): +        for ptype, pval in values.items():              self.assertTrue(self._search_apt_config(contents, ptype, pval))      def test_proxy_deleted(self): diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py index 0558023a..a6b9c0fd 100644 --- a/tests/unittests/test_handler/test_handler_ca_certs.py +++ b/tests/unittests/test_handler/test_handler_ca_certs.py @@ -1,15 +1,26 @@ -from mocker import MockerTestCase -  from cloudinit import cloud  from cloudinit import helpers  from cloudinit import util  from cloudinit.config import cc_ca_certs +from ..helpers import TestCase  import logging +import shutil +import tempfile +import unittest + +try: +    from unittest import mock +except ImportError: +    import mock +try: +    from contextlib import ExitStack +except ImportError: +    from contextlib2 import ExitStack -class TestNoConfig(MockerTestCase): +class TestNoConfig(unittest.TestCase):      def setUp(self):          super(TestNoConfig, self).setUp()          self.name = "ca-certs" @@ -22,15 +33,20 @@ class TestNoConfig(MockerTestCase):          Test that nothing is done if no ca-certs configuration is provided.          """          config = util.get_builtin_cfg() -        self.mocker.replace(util.write_file, passthrough=False) -        self.mocker.replace(cc_ca_certs.update_ca_certs, passthrough=False) -        self.mocker.replay() +        with ExitStack() as mocks: +            util_mock = mocks.enter_context( +                mock.patch.object(util, 'write_file')) +            certs_mock = mocks.enter_context( +                mock.patch.object(cc_ca_certs, 'update_ca_certs')) -        cc_ca_certs.handle(self.name, config, self.cloud_init, self.log, -                           self.args) +            cc_ca_certs.handle(self.name, config, self.cloud_init, self.log, +                               self.args) +            self.assertEqual(util_mock.call_count, 0) +            self.assertEqual(certs_mock.call_count, 0) -class TestConfig(MockerTestCase): + +class TestConfig(TestCase):      def setUp(self):          super(TestConfig, self).setUp()          self.name = "ca-certs" @@ -39,16 +55,16 @@ class TestConfig(MockerTestCase):          self.log = logging.getLogger("TestNoConfig")          self.args = [] -        # Mock out the functions that actually modify the system -        self.mock_add = self.mocker.replace(cc_ca_certs.add_ca_certs, -                                            passthrough=False) -        self.mock_update = self.mocker.replace(cc_ca_certs.update_ca_certs, -                                               passthrough=False) -        self.mock_remove = self.mocker.replace( -            cc_ca_certs.remove_default_ca_certs, passthrough=False) +        self.mocks = ExitStack() +        self.addCleanup(self.mocks.close) -        # Order must be correct -        self.mocker.order() +        # Mock out the functions that actually modify the system +        self.mock_add = self.mocks.enter_context( +            mock.patch.object(cc_ca_certs, 'add_ca_certs')) +        self.mock_update = self.mocks.enter_context( +            mock.patch.object(cc_ca_certs, 'update_ca_certs')) +        self.mock_remove = self.mocks.enter_context( +            mock.patch.object(cc_ca_certs, 'remove_default_ca_certs'))      def test_no_trusted_list(self):          """ @@ -57,86 +73,88 @@ class TestConfig(MockerTestCase):          """          config = {"ca-certs": {}} -        # No functions should be called -        self.mock_update() -        self.mocker.replay() -          cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) +        self.assertEqual(self.mock_add.call_count, 0) +        self.assertEqual(self.mock_update.call_count, 1) +        self.assertEqual(self.mock_remove.call_count, 0) +      def test_empty_trusted_list(self):          """Test that no certificate are written if 'trusted' list is empty."""          config = {"ca-certs": {"trusted": []}} -        # No functions should be called -        self.mock_update() -        self.mocker.replay() -          cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) +        self.assertEqual(self.mock_add.call_count, 0) +        self.assertEqual(self.mock_update.call_count, 1) +        self.assertEqual(self.mock_remove.call_count, 0) +      def test_single_trusted(self):          """Test that a single cert gets passed to add_ca_certs."""          config = {"ca-certs": {"trusted": ["CERT1"]}} -        self.mock_add(["CERT1"]) -        self.mock_update() -        self.mocker.replay() -          cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) +        self.mock_add.assert_called_once_with(['CERT1']) +        self.assertEqual(self.mock_update.call_count, 1) +        self.assertEqual(self.mock_remove.call_count, 0) +      def test_multiple_trusted(self):          """Test that multiple certs get passed to add_ca_certs."""          config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}} -        self.mock_add(["CERT1", "CERT2"]) -        self.mock_update() -        self.mocker.replay() -          cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) +        self.mock_add.assert_called_once_with(['CERT1', 'CERT2']) +        self.assertEqual(self.mock_update.call_count, 1) +        self.assertEqual(self.mock_remove.call_count, 0) +      def test_remove_default_ca_certs(self):          """Test remove_defaults works as expected."""          config = {"ca-certs": {"remove-defaults": True}} -        self.mock_remove() -        self.mock_update() -        self.mocker.replay() -          cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) +        self.assertEqual(self.mock_add.call_count, 0) +        self.assertEqual(self.mock_update.call_count, 1) +        self.assertEqual(self.mock_remove.call_count, 1) +      def test_no_remove_defaults_if_false(self):          """Test remove_defaults is not called when config value is False."""          config = {"ca-certs": {"remove-defaults": False}} -        self.mock_update() -        self.mocker.replay() -          cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) +        self.assertEqual(self.mock_add.call_count, 0) +        self.assertEqual(self.mock_update.call_count, 1) +        self.assertEqual(self.mock_remove.call_count, 0) +      def test_correct_order_for_remove_then_add(self):          """Test remove_defaults is not called when config value is False."""          config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}} -        self.mock_remove() -        self.mock_add(["CERT1"]) -        self.mock_update() -        self.mocker.replay() -          cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args) +        self.mock_add.assert_called_once_with(['CERT1']) +        self.assertEqual(self.mock_update.call_count, 1) +        self.assertEqual(self.mock_remove.call_count, 1) -class TestAddCaCerts(MockerTestCase): + +class TestAddCaCerts(TestCase):      def setUp(self):          super(TestAddCaCerts, self).setUp() +        tmpdir = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, tmpdir)          self.paths = helpers.Paths({ -            'cloud_dir': self.makeDir() +            'cloud_dir': tmpdir,          })      def test_no_certs_in_list(self):          """Test that no certificate are written if not provided.""" -        self.mocker.replace(util.write_file, passthrough=False) -        self.mocker.replay() -        cc_ca_certs.add_ca_certs([]) +        with mock.patch.object(util, 'write_file') as mockobj: +            cc_ca_certs.add_ca_certs([]) +        self.assertEqual(mockobj.call_count, 0)      def test_single_cert_trailing_cr(self):          """Test adding a single certificate to the trusted CAs @@ -146,19 +164,21 @@ class TestAddCaCerts(MockerTestCase):          ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n"          expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n" -        mock_write = self.mocker.replace(util.write_file, passthrough=False) -        mock_load = self.mocker.replace(util.load_file, passthrough=False) - -        mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt", -                   cert, mode=0644) - -        mock_load("/etc/ca-certificates.conf") -        self.mocker.result(ca_certs_content) +        with ExitStack() as mocks: +            mock_write = mocks.enter_context( +                mock.patch.object(util, 'write_file')) +            mock_load = mocks.enter_context( +                mock.patch.object(util, 'load_file', +                                  return_value=ca_certs_content)) -        mock_write("/etc/ca-certificates.conf", expected, omode="wb") -        self.mocker.replay() +            cc_ca_certs.add_ca_certs([cert]) -        cc_ca_certs.add_ca_certs([cert]) +            mock_write.assert_has_calls([ +                mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", +                          cert, mode=0o644), +                mock.call("/etc/ca-certificates.conf", expected, omode="wb"), +                ]) +            mock_load.assert_called_once_with("/etc/ca-certificates.conf")      def test_single_cert_no_trailing_cr(self):          """Test adding a single certificate to the trusted CAs @@ -167,75 +187,89 @@ class TestAddCaCerts(MockerTestCase):          ca_certs_content = "line1\nline2\nline3" -        mock_write = self.mocker.replace(util.write_file, passthrough=False) -        mock_load = self.mocker.replace(util.load_file, passthrough=False) +        with ExitStack() as mocks: +            mock_write = mocks.enter_context( +                mock.patch.object(util, 'write_file')) +            mock_load = mocks.enter_context( +                mock.patch.object(util, 'load_file', +                                  return_value=ca_certs_content)) -        mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt", -                   cert, mode=0644) +            cc_ca_certs.add_ca_certs([cert]) -        mock_load("/etc/ca-certificates.conf") -        self.mocker.result(ca_certs_content) +            mock_write.assert_has_calls([ +                mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", +                          cert, mode=0o644), +                mock.call("/etc/ca-certificates.conf", +                          "%s\n%s\n" % (ca_certs_content, +                                        "cloud-init-ca-certs.crt"), +                          omode="wb"), +                ]) -        mock_write("/etc/ca-certificates.conf", -                   "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt"), -                   omode="wb") -        self.mocker.replay() - -        cc_ca_certs.add_ca_certs([cert]) +            mock_load.assert_called_once_with("/etc/ca-certificates.conf")      def test_multiple_certs(self):          """Test adding multiple certificates to the trusted CAs."""          certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"]          expected_cert_file = "\n".join(certs) - -        mock_write = self.mocker.replace(util.write_file, passthrough=False) -        mock_load = self.mocker.replace(util.load_file, passthrough=False) - -        mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt", -                   expected_cert_file, mode=0644) -          ca_certs_content = "line1\nline2\nline3" -        mock_load("/etc/ca-certificates.conf") -        self.mocker.result(ca_certs_content) -        out = "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt") -        mock_write("/etc/ca-certificates.conf", out, omode="wb") +        with ExitStack() as mocks: +            mock_write = mocks.enter_context( +                mock.patch.object(util, 'write_file')) +            mock_load = mocks.enter_context( +                mock.patch.object(util, 'load_file', +                                  return_value=ca_certs_content)) -        self.mocker.replay() +            cc_ca_certs.add_ca_certs(certs) -        cc_ca_certs.add_ca_certs(certs) +            mock_write.assert_has_calls([ +                mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt", +                          expected_cert_file, mode=0o644), +                mock.call("/etc/ca-certificates.conf", +                          "%s\n%s\n" % (ca_certs_content, +                                        "cloud-init-ca-certs.crt"), +                          omode='wb'), +                ]) +            mock_load.assert_called_once_with("/etc/ca-certificates.conf") -class TestUpdateCaCerts(MockerTestCase): -    def test_commands(self): -        mock_check_call = self.mocker.replace(util.subp, -                                              passthrough=False) -        mock_check_call(["update-ca-certificates"], capture=False) -        self.mocker.replay() -        cc_ca_certs.update_ca_certs() +class TestUpdateCaCerts(unittest.TestCase): +    def test_commands(self): +        with mock.patch.object(util, 'subp') as mockobj: +            cc_ca_certs.update_ca_certs() +            mockobj.assert_called_once_with( +                ["update-ca-certificates"], capture=False) -class TestRemoveDefaultCaCerts(MockerTestCase): +class TestRemoveDefaultCaCerts(TestCase):      def setUp(self):          super(TestRemoveDefaultCaCerts, self).setUp() +        tmpdir = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, tmpdir)          self.paths = helpers.Paths({ -            'cloud_dir': self.makeDir() +            'cloud_dir': tmpdir,          })      def test_commands(self): -        mock_delete_dir_contents = self.mocker.replace( -            util.delete_dir_contents, passthrough=False) -        mock_write = self.mocker.replace(util.write_file, passthrough=False) -        mock_subp = self.mocker.replace(util.subp, -                                        passthrough=False) - -        mock_delete_dir_contents("/usr/share/ca-certificates/") -        mock_delete_dir_contents("/etc/ssl/certs/") -        mock_write("/etc/ca-certificates.conf", "", mode=0644) -        mock_subp(('debconf-set-selections', '-'), -                  "ca-certificates ca-certificates/trust_new_crts select no") -        self.mocker.replay() - -        cc_ca_certs.remove_default_ca_certs() +        with ExitStack() as mocks: +            mock_delete = mocks.enter_context( +                mock.patch.object(util, 'delete_dir_contents')) +            mock_write = mocks.enter_context( +                mock.patch.object(util, 'write_file')) +            mock_subp = mocks.enter_context(mock.patch.object(util, 'subp')) + +            cc_ca_certs.remove_default_ca_certs() + +            mock_delete.assert_has_calls([ +                mock.call("/usr/share/ca-certificates/"), +                mock.call("/etc/ssl/certs/"), +                ]) + +            mock_write.assert_called_once_with( +                "/etc/ca-certificates.conf", "", mode=0o644) + +            mock_subp.assert_called_once_with( +                ('debconf-set-selections', '-'), +                "ca-certificates ca-certificates/trust_new_crts select no") diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py index ef1aa208..edad88cb 100644 --- a/tests/unittests/test_handler/test_handler_chef.py +++ b/tests/unittests/test_handler/test_handler_chef.py @@ -11,15 +11,21 @@ from cloudinit.sources import DataSourceNone  from .. import helpers as t_help +import six  import logging +import shutil +import tempfile  LOG = logging.getLogger(__name__) +CLIENT_TEMPL = os.path.sep.join(["templates", "chef_client.rb.tmpl"]) +  class TestChef(t_help.FilesystemMockingTestCase):      def setUp(self):          super(TestChef, self).setUp() -        self.tmp = self.makeDir(prefix="unittest_") +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp)      def fetch_cloud(self, distro_kind):          cls = distros.fetch(distro_kind) @@ -37,9 +43,13 @@ class TestChef(t_help.FilesystemMockingTestCase):          for d in cc_chef.CHEF_DIRS:              self.assertFalse(os.path.isdir(d)) +    @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL), +                   CLIENT_TEMPL + " is not available")      def test_basic_config(self): -        # This should create a file of the format...          """ +        test basic config looks sane + +        # This should create a file of the format...          # Created by cloud-init v. 0.7.6 on Sat, 11 Oct 2014 23:57:21 +0000          log_level              :info          ssl_verify_mode        :verify_none @@ -74,7 +84,7 @@ class TestChef(t_help.FilesystemMockingTestCase):          for k, v in cfg['chef'].items():              self.assertIn(v, c)          for k, v in cc_chef.CHEF_RB_TPL_DEFAULTS.items(): -            if isinstance(v, basestring): +            if isinstance(v, six.string_types):                  self.assertIn(v, c)          c = util.load_file(cc_chef.CHEF_FB_PATH)          self.assertEqual({}, json.loads(c)) @@ -101,6 +111,8 @@ class TestChef(t_help.FilesystemMockingTestCase):                  'c': 'd',              }, json.loads(c)) +    @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL), +                   CLIENT_TEMPL + " is not available")      def test_template_deletes(self):          tpl_file = util.load_file('templates/chef_client.rb.tmpl')          self.patchUtils(self.tmp) diff --git a/tests/unittests/test_handler/test_handler_debug.py b/tests/unittests/test_handler/test_handler_debug.py index 8891ca04..80708d7b 100644 --- a/tests/unittests/test_handler/test_handler_debug.py +++ b/tests/unittests/test_handler/test_handler_debug.py @@ -26,6 +26,8 @@ from cloudinit.sources import DataSourceNone  from .. import helpers as t_help  import logging +import shutil +import tempfile  LOG = logging.getLogger(__name__) @@ -33,7 +35,8 @@ LOG = logging.getLogger(__name__)  class TestDebug(t_help.FilesystemMockingTestCase):      def setUp(self):          super(TestDebug, self).setUp() -        self.new_root = self.makeDir(prefix="unittest_") +        self.new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.new_root)      def _get_cloud(self, distro, metadata=None):          self.patchUtils(self.new_root) diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py index 5d0636d1..bef0d80d 100644 --- a/tests/unittests/test_handler/test_handler_growpart.py +++ b/tests/unittests/test_handler/test_handler_growpart.py @@ -1,14 +1,23 @@ -from mocker import MockerTestCase -  from cloudinit import cloud  from cloudinit import util  from cloudinit.config import cc_growpart +from ..helpers import TestCase  import errno  import logging  import os  import re +import unittest + +try: +    from unittest import mock +except ImportError: +    import mock +try: +    from contextlib import ExitStack +except ImportError: +    from contextlib2 import ExitStack  # growpart:  #   mode: auto  # off, on, auto, 'growpart' @@ -42,7 +51,7 @@ growpart disk partition  """ -class TestDisabled(MockerTestCase): +class TestDisabled(unittest.TestCase):      def setUp(self):          super(TestDisabled, self).setUp()          self.name = "growpart" @@ -57,14 +66,14 @@ class TestDisabled(MockerTestCase):          # this really only verifies that resizer_factory isn't called          config = {'growpart': {'mode': 'off'}} -        self.mocker.replace(cc_growpart.resizer_factory, -                            passthrough=False) -        self.mocker.replay() -        self.handle(self.name, config, self.cloud_init, self.log, self.args) +        with mock.patch.object(cc_growpart, 'resizer_factory') as mockobj: +            self.handle(self.name, config, self.cloud_init, self.log, +                        self.args) +            self.assertEqual(mockobj.call_count, 0) -class TestConfig(MockerTestCase): +class TestConfig(TestCase):      def setUp(self):          super(TestConfig, self).setUp()          self.name = "growpart" @@ -77,75 +86,76 @@ class TestConfig(MockerTestCase):          self.cloud_init = None          self.handle = cc_growpart.handle -        # Order must be correct -        self.mocker.order() -      def test_no_resizers_auto_is_fine(self): -        subp = self.mocker.replace(util.subp, passthrough=False) -        subp(['growpart', '--help'], env={'LANG': 'C'}) -        self.mocker.result((HELP_GROWPART_NO_RESIZE, "")) -        self.mocker.replay() +        with mock.patch.object( +                util, 'subp', +                return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj: -        config = {'growpart': {'mode': 'auto'}} -        self.handle(self.name, config, self.cloud_init, self.log, self.args) +            config = {'growpart': {'mode': 'auto'}} +            self.handle(self.name, config, self.cloud_init, self.log, +                        self.args) + +            mockobj.assert_called_once_with( +                ['growpart', '--help'], env={'LANG': 'C'})      def test_no_resizers_mode_growpart_is_exception(self): -        subp = self.mocker.replace(util.subp, passthrough=False) -        subp(['growpart', '--help'], env={'LANG': 'C'}) -        self.mocker.result((HELP_GROWPART_NO_RESIZE, "")) -        self.mocker.replay() +        with mock.patch.object( +                util, 'subp', +                return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj: +            config = {'growpart': {'mode': "growpart"}} +            self.assertRaises( +                ValueError, self.handle, self.name, config, +                self.cloud_init, self.log, self.args) -        config = {'growpart': {'mode': "growpart"}} -        self.assertRaises(ValueError, self.handle, self.name, config, -                          self.cloud_init, self.log, self.args) +            mockobj.assert_called_once_with( +                ['growpart', '--help'], env={'LANG': 'C'})      def test_mode_auto_prefers_growpart(self): -        subp = self.mocker.replace(util.subp, passthrough=False) -        subp(['growpart', '--help'], env={'LANG': 'C'}) -        self.mocker.result((HELP_GROWPART_RESIZE, "")) -        self.mocker.replay() +        with mock.patch.object( +                util, 'subp', +                return_value=(HELP_GROWPART_RESIZE, "")) as mockobj: +            ret = cc_growpart.resizer_factory(mode="auto") +            self.assertIsInstance(ret, cc_growpart.ResizeGrowPart) -        ret = cc_growpart.resizer_factory(mode="auto") -        self.assertTrue(isinstance(ret, cc_growpart.ResizeGrowPart)) +            mockobj.assert_called_once_with( +                ['growpart', '--help'], env={'LANG': 'C'})      def test_handle_with_no_growpart_entry(self):          # if no 'growpart' entry in config, then mode=auto should be used          myresizer = object() +        retval = (("/", cc_growpart.RESIZE.CHANGED, "my-message",),) + +        with ExitStack() as mocks: +            factory = mocks.enter_context( +                mock.patch.object(cc_growpart, 'resizer_factory', +                                  return_value=myresizer)) +            rsdevs = mocks.enter_context( +                mock.patch.object(cc_growpart, 'resize_devices', +                                  return_value=retval)) +            mocks.enter_context( +                mock.patch.object(cc_growpart, 'RESIZERS', +                                  (('mysizer', object),) +                                  )) -        factory = self.mocker.replace(cc_growpart.resizer_factory, -                                      passthrough=False) -        rsdevs = self.mocker.replace(cc_growpart.resize_devices, -                                     passthrough=False) -        factory("auto") -        self.mocker.result(myresizer) -        rsdevs(myresizer, ["/"]) -        self.mocker.result((("/", cc_growpart.RESIZE.CHANGED, "my-message",),)) -        self.mocker.replay() - -        try: -            orig_resizers = cc_growpart.RESIZERS -            cc_growpart.RESIZERS = (('mysizer', object),)              self.handle(self.name, {}, self.cloud_init, self.log, self.args) -        finally: -            cc_growpart.RESIZERS = orig_resizers +            factory.assert_called_once_with('auto') +            rsdevs.assert_called_once_with(myresizer, ['/']) -class TestResize(MockerTestCase): + +class TestResize(unittest.TestCase):      def setUp(self):          super(TestResize, self).setUp()          self.name = "growpart"          self.log = logging.getLogger("TestResize") -        # Order must be correct -        self.mocker.order() -      def test_simple_devices(self):          # test simple device list          # this patches out devent2dev, os.stat, and device_part_info          # so in the end, doesn't test a lot          devs = ["/dev/XXda1", "/dev/YYda2"] -        devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5L, +        devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5,                              st_nlink=1, st_uid=0, st_gid=6, st_size=0,                              st_atime=0, st_mtime=0, st_ctime=0)          enoent = ["/dev/NOENT"] diff --git a/tests/unittests/test_handler/test_handler_locale.py b/tests/unittests/test_handler/test_handler_locale.py index eb251636..de85eff6 100644 --- a/tests/unittests/test_handler/test_handler_locale.py +++ b/tests/unittests/test_handler/test_handler_locale.py @@ -29,9 +29,11 @@ from .. import helpers as t_help  from configobj import ConfigObj -from StringIO import StringIO +from six import BytesIO  import logging +import shutil +import tempfile  LOG = logging.getLogger(__name__) @@ -39,7 +41,8 @@ LOG = logging.getLogger(__name__)  class TestLocale(t_help.FilesystemMockingTestCase):      def setUp(self):          super(TestLocale, self).setUp() -        self.new_root = self.makeDir(prefix="unittest_") +        self.new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.new_root)      def _get_cloud(self, distro):          self.patchUtils(self.new_root) @@ -59,6 +62,6 @@ class TestLocale(t_help.FilesystemMockingTestCase):          cc = self._get_cloud('sles')          cc_locale.handle('cc_locale', cfg, cc, LOG, []) -        contents = util.load_file('/etc/sysconfig/language') -        n_cfg = ConfigObj(StringIO(contents)) +        contents = util.load_file('/etc/sysconfig/language', decode=False) +        n_cfg = ConfigObj(BytesIO(contents))          self.assertEquals({'RC_LANG': cfg['locale']}, dict(n_cfg)) diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py index 40481f16..0bcdcb31 100644 --- a/tests/unittests/test_handler/test_handler_seed_random.py +++ b/tests/unittests/test_handler/test_handler_seed_random.py @@ -18,11 +18,10 @@  from cloudinit.config import cc_seed_random -import base64  import gzip  import tempfile -from StringIO import StringIO +from six import BytesIO  from cloudinit import cloud  from cloudinit import distros @@ -69,7 +68,7 @@ class TestRandomSeed(t_help.TestCase):          return      def _compress(self, text): -        contents = StringIO() +        contents = BytesIO()          gz_fh = gzip.GzipFile(mode='wb', fileobj=contents)          gz_fh.write(text)          gz_fh.close() @@ -96,7 +95,7 @@ class TestRandomSeed(t_help.TestCase):          self.assertEquals("tiny-tim-was-here", contents)      def test_append_random_unknown_encoding(self): -        data = self._compress("tiny-toe") +        data = self._compress(b"tiny-toe")          cfg = {              'random_seed': {                  'file': self._seed_file, @@ -108,7 +107,7 @@ class TestRandomSeed(t_help.TestCase):                            self._get_cloud('ubuntu'), LOG, [])      def test_append_random_gzip(self): -        data = self._compress("tiny-toe") +        data = self._compress(b"tiny-toe")          cfg = {              'random_seed': {                  'file': self._seed_file, @@ -121,7 +120,7 @@ class TestRandomSeed(t_help.TestCase):          self.assertEquals("tiny-toe", contents)      def test_append_random_gz(self): -        data = self._compress("big-toe") +        data = self._compress(b"big-toe")          cfg = {              'random_seed': {                  'file': self._seed_file, @@ -134,7 +133,7 @@ class TestRandomSeed(t_help.TestCase):          self.assertEquals("big-toe", contents)      def test_append_random_base64(self): -        data = base64.b64encode('bubbles') +        data = util.b64e('bubbles')          cfg = {              'random_seed': {                  'file': self._seed_file, @@ -147,7 +146,7 @@ class TestRandomSeed(t_help.TestCase):          self.assertEquals("bubbles", contents)      def test_append_random_b64(self): -        data = base64.b64encode('kit-kat') +        data = util.b64e('kit-kat')          cfg = {              'random_seed': {                  'file': self._seed_file, diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py index e1530e30..d358b069 100644 --- a/tests/unittests/test_handler/test_handler_set_hostname.py +++ b/tests/unittests/test_handler/test_handler_set_hostname.py @@ -7,9 +7,11 @@ from cloudinit import util  from .. import helpers as t_help +import shutil +import tempfile  import logging -from StringIO import StringIO +from six import BytesIO  from configobj import ConfigObj @@ -19,7 +21,8 @@ LOG = logging.getLogger(__name__)  class TestHostname(t_help.FilesystemMockingTestCase):      def setUp(self):          super(TestHostname, self).setUp() -        self.tmp = self.makeDir(prefix="unittest_") +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp)      def _fetch_distro(self, kind):          cls = distros.fetch(kind) @@ -38,8 +41,8 @@ class TestHostname(t_help.FilesystemMockingTestCase):          cc_set_hostname.handle('cc_set_hostname',                                 cfg, cc, LOG, [])          if not distro.uses_systemd(): -            contents = util.load_file("/etc/sysconfig/network") -            n_cfg = ConfigObj(StringIO(contents)) +            contents = util.load_file("/etc/sysconfig/network", decode=False) +            n_cfg = ConfigObj(BytesIO(contents))              self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'},                                dict(n_cfg)) diff --git a/tests/unittests/test_handler/test_handler_timezone.py b/tests/unittests/test_handler/test_handler_timezone.py index 874db340..e3df8759 100644 --- a/tests/unittests/test_handler/test_handler_timezone.py +++ b/tests/unittests/test_handler/test_handler_timezone.py @@ -29,8 +29,10 @@ from .. import helpers as t_help  from configobj import ConfigObj -from StringIO import StringIO +from six import BytesIO +import shutil +import tempfile  import logging  LOG = logging.getLogger(__name__) @@ -39,7 +41,8 @@ LOG = logging.getLogger(__name__)  class TestTimezone(t_help.FilesystemMockingTestCase):      def setUp(self):          super(TestTimezone, self).setUp() -        self.new_root = self.makeDir(prefix="unittest_") +        self.new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.new_root)      def _get_cloud(self, distro):          self.patchUtils(self.new_root) @@ -67,8 +70,8 @@ class TestTimezone(t_help.FilesystemMockingTestCase):          cc_timezone.handle('cc_timezone', cfg, cc, LOG, []) -        contents = util.load_file('/etc/sysconfig/clock') -        n_cfg = ConfigObj(StringIO(contents)) +        contents = util.load_file('/etc/sysconfig/clock', decode=False) +        n_cfg = ConfigObj(BytesIO(contents))          self.assertEquals({'TIMEZONE': cfg['timezone']}, dict(n_cfg))          contents = util.load_file('/etc/localtime') diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py index 435c9787..3a8aa7c1 100644 --- a/tests/unittests/test_handler/test_handler_yum_add_repo.py +++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py @@ -4,9 +4,11 @@ from cloudinit.config import cc_yum_add_repo  from .. import helpers +import shutil +import tempfile  import logging -from StringIO import StringIO +from six import BytesIO  import configobj @@ -16,7 +18,8 @@ LOG = logging.getLogger(__name__)  class TestConfig(helpers.FilesystemMockingTestCase):      def setUp(self):          super(TestConfig, self).setUp() -        self.tmp = self.makeDir(prefix="unittest_") +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp)      def test_bad_config(self):          cfg = { @@ -52,8 +55,9 @@ class TestConfig(helpers.FilesystemMockingTestCase):          }          self.patchUtils(self.tmp)          cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, []) -        contents = util.load_file("/etc/yum.repos.d/epel_testing.repo") -        contents = configobj.ConfigObj(StringIO(contents)) +        contents = util.load_file("/etc/yum.repos.d/epel_testing.repo", +                                  decode=False) +        contents = configobj.ConfigObj(BytesIO(contents))          expected = {              'epel_testing': {                  'name': 'Extra Packages for Enterprise Linux 5 - Testing', diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py index 07b610f7..976d8283 100644 --- a/tests/unittests/test_merging.py +++ b/tests/unittests/test_merging.py @@ -11,11 +11,13 @@ import glob  import os  import random  import re +import six  import string  SOURCE_PAT = "source*.*yaml"  EXPECTED_PAT = "expected%s.yaml" -TYPES = [long, int, dict, str, list, tuple, None] +TYPES = [dict, str, list, tuple, None] +TYPES.extend(six.integer_types)  def _old_mergedict(src, cand): @@ -25,7 +27,7 @@ def _old_mergedict(src, cand):      Nested dictionaries are merged recursively.      """      if isinstance(src, dict) and isinstance(cand, dict): -        for (k, v) in cand.iteritems(): +        for (k, v) in cand.items():              if k not in src:                  src[k] = v              else: @@ -42,8 +44,8 @@ def _old_mergemanydict(*args):  def _random_str(rand):      base = '' -    for _i in xrange(rand.randint(1, 2 ** 8)): -        base += rand.choice(string.letters + string.digits) +    for _i in range(rand.randint(1, 2 ** 8)): +        base += rand.choice(string.ascii_letters + string.digits)      return base @@ -64,7 +66,7 @@ def _make_dict(current_depth, max_depth, rand):      if t in [dict, list, tuple]:          if t in [dict]:              amount = rand.randint(0, 5) -            keys = [_random_str(rand) for _i in xrange(0, amount)] +            keys = [_random_str(rand) for _i in range(0, amount)]              base = {}              for k in keys:                  try: @@ -74,14 +76,14 @@ def _make_dict(current_depth, max_depth, rand):          elif t in [list, tuple]:              base = []              amount = rand.randint(0, 5) -            for _i in xrange(0, amount): +            for _i in range(0, amount):                  try:                      base.append(_make_dict(current_depth + 1, max_depth, rand))                  except _NoMoreException:                      pass              if t in [tuple]:                  base = tuple(base) -    elif t in [long, int]: +    elif t in six.integer_types:          base = rand.randint(0, 2 ** 8)      elif t in [str]:          base = _random_str(rand) diff --git a/tests/unittests/test_pathprefix2dict.py b/tests/unittests/test_pathprefix2dict.py index 590c4b82..7089bde6 100644 --- a/tests/unittests/test_pathprefix2dict.py +++ b/tests/unittests/test_pathprefix2dict.py @@ -1,13 +1,17 @@  from cloudinit import util -from mocker import MockerTestCase -from .helpers import populate_dir +from .helpers import TestCase, populate_dir +import shutil +import tempfile -class TestPathPrefix2Dict(MockerTestCase): + +class TestPathPrefix2Dict(TestCase):      def setUp(self): -        self.tmp = self.makeDir() +        super(TestPathPrefix2Dict, self).setUp() +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp)      def test_required_only(self):          dirdata = {'f1': 'f1content', 'f2': 'f2content'} diff --git a/tests/unittests/test_runs/test_merge_run.py b/tests/unittests/test_runs/test_merge_run.py index 977adb34..d0ec36a9 100644 --- a/tests/unittests/test_runs/test_merge_run.py +++ b/tests/unittests/test_runs/test_merge_run.py @@ -1,20 +1,22 @@  import os +import shutil +import tempfile  from .. import helpers -from cloudinit.settings import (PER_INSTANCE) +from cloudinit.settings import PER_INSTANCE  from cloudinit import stages  from cloudinit import util  class TestMergeRun(helpers.FilesystemMockingTestCase):      def _patchIn(self, root): -        self.restore()          self.patchOS(root)          self.patchUtils(root)      def test_none_ds(self): -        new_root = self.makeDir() +        new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, new_root)          self.replicateTestRoot('simple_ubuntu', new_root)          cfg = {              'datasource_list': ['None'], diff --git a/tests/unittests/test_runs/test_simple_run.py b/tests/unittests/test_runs/test_simple_run.py index c9ba949e..e19e65cd 100644 --- a/tests/unittests/test_runs/test_simple_run.py +++ b/tests/unittests/test_runs/test_simple_run.py @@ -1,20 +1,20 @@  import os +import shutil +import tempfile  from .. import helpers -from cloudinit.settings import (PER_INSTANCE) +from cloudinit.settings import PER_INSTANCE  from cloudinit import stages  from cloudinit import util  class TestSimpleRun(helpers.FilesystemMockingTestCase):      def _patchIn(self, root): -        self.restore()          self.patchOS(root)          self.patchUtils(root)      def _pp_root(self, root, repatch=True): -        self.restore()          for (dirpath, dirnames, filenames) in os.walk(root):              print(dirpath)              for f in filenames: @@ -33,7 +33,8 @@ class TestSimpleRun(helpers.FilesystemMockingTestCase):              self._patchIn(root)      def test_none_ds(self): -        new_root = self.makeDir() +        new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, new_root)          self.replicateTestRoot('simple_ubuntu', new_root)          cfg = {              'datasource_list': ['None'], @@ -41,7 +42,7 @@ class TestSimpleRun(helpers.FilesystemMockingTestCase):                  {                      'path': '/etc/blah.ini',                      'content': 'blah', -                    'permissions': 0755, +                    'permissions': 0o755,                  },              ],              'cloud_init_modules': ['write-files'], diff --git a/tests/unittests/test_templating.py b/tests/unittests/test_templating.py index 3ba4ed8a..cf7c03b0 100644 --- a/tests/unittests/test_templating.py +++ b/tests/unittests/test_templating.py @@ -16,11 +16,23 @@  #    You should have received a copy of the GNU General Public License  #    along with this program.  If not, see <http://www.gnu.org/licenses/>. +from __future__ import print_function + +import sys +import six +import unittest +  from . import helpers as test_helpers  import textwrap  from cloudinit import templater +try: +    import Cheetah +    HAS_CHEETAH = True +except ImportError: +    HAS_CHEETAH = False +  class TestTemplates(test_helpers.TestCase):      def test_render_basic(self): @@ -38,6 +50,7 @@ class TestTemplates(test_helpers.TestCase):          out_data = templater.basic_render(in_data, {'b': 2})          self.assertEqual(expected_data.strip(), out_data) +    @test_helpers.skipIf(not HAS_CHEETAH, 'cheetah renderer not available')      def test_detection(self):          blob = "## template:cheetah" diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 3e079131..33c191a9 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -1,13 +1,21 @@ +from __future__ import print_function + +import logging  import os +import shutil  import stat +import tempfile + +import six  import yaml -from mocker import MockerTestCase +from cloudinit import importer, util  from . import helpers -import unittest -from cloudinit import importer -from cloudinit import util +try: +    from unittest import mock +except ImportError: +    import mock  class FakeSelinux(object): @@ -29,7 +37,7 @@ class FakeSelinux(object):          self.restored.append(path) -class TestGetCfgOptionListOrStr(unittest.TestCase): +class TestGetCfgOptionListOrStr(helpers.TestCase):      def test_not_found_no_default(self):          """None is returned if key is not found and no default given."""          config = {} @@ -61,10 +69,11 @@ class TestGetCfgOptionListOrStr(unittest.TestCase):          self.assertEqual([], result) -class TestWriteFile(MockerTestCase): +class TestWriteFile(helpers.TestCase):      def setUp(self):          super(TestWriteFile, self).setUp() -        self.tmp = self.makeDir(prefix="unittest_") +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp)      def test_basic_usage(self):          """Verify basic usage with default args.""" @@ -79,7 +88,7 @@ class TestWriteFile(MockerTestCase):              create_contents = f.read()              self.assertEqual(contents, create_contents)          file_stat = os.stat(path) -        self.assertEqual(0644, stat.S_IMODE(file_stat.st_mode)) +        self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))      def test_dir_is_created_if_required(self):          """Verifiy that directories are created is required.""" @@ -97,12 +106,12 @@ class TestWriteFile(MockerTestCase):          path = os.path.join(self.tmp, "NewFile.txt")          contents = "Hey there" -        util.write_file(path, contents, mode=0666) +        util.write_file(path, contents, mode=0o666)          self.assertTrue(os.path.exists(path))          self.assertTrue(os.path.isfile(path))          file_stat = os.stat(path) -        self.assertEqual(0666, stat.S_IMODE(file_stat.st_mode)) +        self.assertEqual(0o666, stat.S_IMODE(file_stat.st_mode))      def test_custom_omode(self):          """Verify custom omode works properly.""" @@ -111,7 +120,7 @@ class TestWriteFile(MockerTestCase):          # Create file first with basic content          with open(path, "wb") as f: -            f.write("LINE1\n") +            f.write(b"LINE1\n")          util.write_file(path, contents, omode="a")          self.assertTrue(os.path.exists(path)) @@ -126,23 +135,24 @@ class TestWriteFile(MockerTestCase):          with open(my_file, "w") as fp:              fp.write("My Content") -        import_mock = self.mocker.replace(importer.import_module, -                                          passthrough=False) -        import_mock('selinux') -          fake_se = FakeSelinux(my_file) -        self.mocker.result(fake_se) -        self.mocker.replay() -        with util.SeLinuxGuard(my_file) as is_on: -            self.assertTrue(is_on) + +        with mock.patch.object(importer, 'import_module', +                               return_value=fake_se) as mockobj: +            with util.SeLinuxGuard(my_file) as is_on: +                self.assertTrue(is_on) +          self.assertEqual(1, len(fake_se.restored))          self.assertEqual(my_file, fake_se.restored[0]) +        mockobj.assert_called_once_with('selinux') -class TestDeleteDirContents(MockerTestCase): + +class TestDeleteDirContents(helpers.TestCase):      def setUp(self):          super(TestDeleteDirContents, self).setUp() -        self.tmp = self.makeDir(prefix="unittest_") +        self.tmp = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.tmp)      def assertDirEmpty(self, dirname):          self.assertEqual([], os.listdir(dirname)) @@ -157,7 +167,7 @@ class TestDeleteDirContents(MockerTestCase):      def test_deletes_files(self):          """Single file should be deleted."""          with open(os.path.join(self.tmp, "new_file.txt"), "wb") as f: -            f.write("DELETE ME") +            f.write(b"DELETE ME")          util.delete_dir_contents(self.tmp) @@ -185,7 +195,7 @@ class TestDeleteDirContents(MockerTestCase):          os.mkdir(os.path.join(self.tmp, "new_dir"))          f_name = os.path.join(self.tmp, "new_dir", "new_file.txt")          with open(f_name, "wb") as f: -            f.write("DELETE ME") +            f.write(b"DELETE ME")          util.delete_dir_contents(self.tmp) @@ -196,7 +206,7 @@ class TestDeleteDirContents(MockerTestCase):          file_name = os.path.join(self.tmp, "new_file.txt")          link_name = os.path.join(self.tmp, "new_file_link.txt")          with open(file_name, "wb") as f: -            f.write("DELETE ME") +            f.write(b"DELETE ME")          os.symlink(file_name, link_name)          util.delete_dir_contents(self.tmp) @@ -204,20 +214,20 @@ class TestDeleteDirContents(MockerTestCase):          self.assertDirEmpty(self.tmp) -class TestKeyValStrings(unittest.TestCase): +class TestKeyValStrings(helpers.TestCase):      def test_keyval_str_to_dict(self):          expected = {'1': 'one', '2': 'one+one', 'ro': True}          cmdline = "1=one ro 2=one+one"          self.assertEqual(expected, util.keyval_str_to_dict(cmdline)) -class TestGetCmdline(unittest.TestCase): +class TestGetCmdline(helpers.TestCase):      def test_cmdline_reads_debug_env(self):          os.environ['DEBUG_PROC_CMDLINE'] = 'abcd 123'          self.assertEqual(os.environ['DEBUG_PROC_CMDLINE'], util.get_cmdline()) -class TestLoadYaml(unittest.TestCase): +class TestLoadYaml(helpers.TestCase):      mydefault = "7b03a8ebace993d806255121073fed52"      def test_simple(self): @@ -246,8 +256,8 @@ class TestLoadYaml(unittest.TestCase):                           self.mydefault)      def test_python_unicode(self): -        # complex type of python/unicde is explicitly allowed -        myobj = {'1': unicode("FOOBAR")} +        # complex type of python/unicode is explicitly allowed +        myobj = {'1': six.text_type("FOOBAR")}          safe_yaml = yaml.dump(myobj)          self.assertEqual(util.load_yaml(blob=safe_yaml,                                          default=self.mydefault), @@ -314,17 +324,17 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):  class TestReadDMIData(helpers.FilesystemMockingTestCase):      def _patchIn(self, root): -        self.restore()          self.patchOS(root)          self.patchUtils(root)      def _write_key(self, key, content):          """Mocks the sys path found on Linux systems.""" -        new_root = self.makeDir() +        new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, new_root)          self._patchIn(new_root)          util.ensure_dir(os.path.join('sys', 'class', 'dmi', 'id')) -        dmi_key = "/sys/class/dmi/id/{}".format(key) +        dmi_key = "/sys/class/dmi/id/{0}".format(key)          util.write_file(dmi_key, content)      def _no_syspath(self, key, content): @@ -332,7 +342,8 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):          In order to test a missing sys path and call outs to dmidecode, this          function fakes the results of dmidecode to test the results.          """ -        new_root = self.makeDir() +        new_root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, new_root)          self._patchIn(new_root)          self.real_which = util.which          self.real_subp = util.subp @@ -366,4 +377,70 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):          self.assertFalse(None, util.read_dmi_data("key")) +class TestMultiLog(helpers.FilesystemMockingTestCase): + +    def _createConsole(self, root): +        os.mkdir(os.path.join(root, 'dev')) +        open(os.path.join(root, 'dev', 'console'), 'a').close() + +    def setUp(self): +        super(TestMultiLog, self).setUp() +        self.root = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, self.root) +        self.patchOS(self.root) +        self.patchUtils(self.root) +        self.patchOpen(self.root) +        self.stdout = six.StringIO() +        self.stderr = six.StringIO() +        self.patchStdoutAndStderr(self.stdout, self.stderr) + +    def test_stderr_used_by_default(self): +        logged_string = 'test stderr output' +        util.multi_log(logged_string) +        self.assertEqual(logged_string, self.stderr.getvalue()) + +    def test_stderr_not_used_if_false(self): +        util.multi_log('should not see this', stderr=False) +        self.assertEqual('', self.stderr.getvalue()) + +    def test_logs_go_to_console_by_default(self): +        self._createConsole(self.root) +        logged_string = 'something very important' +        util.multi_log(logged_string) +        self.assertEqual(logged_string, open('/dev/console').read()) + +    def test_logs_dont_go_to_stdout_if_console_exists(self): +        self._createConsole(self.root) +        util.multi_log('something') +        self.assertEqual('', self.stdout.getvalue()) + +    def test_logs_go_to_stdout_if_console_does_not_exist(self): +        logged_string = 'something very important' +        util.multi_log(logged_string) +        self.assertEqual(logged_string, self.stdout.getvalue()) + +    def test_logs_go_to_log_if_given(self): +        log = mock.MagicMock() +        logged_string = 'something very important' +        util.multi_log(logged_string, log=log) +        self.assertEqual([((mock.ANY, logged_string), {})], +                         log.log.call_args_list) + +    def test_newlines_stripped_from_log_call(self): +        log = mock.MagicMock() +        expected_string = 'something very important' +        util.multi_log('{0}\n'.format(expected_string), log=log) +        self.assertEqual((mock.ANY, expected_string), log.log.call_args[0]) + +    def test_log_level_defaults_to_debug(self): +        log = mock.MagicMock() +        util.multi_log('message', log=log) +        self.assertEqual((logging.DEBUG, mock.ANY), log.log.call_args[0]) + +    def test_given_log_level_used(self): +        log = mock.MagicMock() +        log_level = mock.Mock() +        util.multi_log('message', log=log, log_level=log_level) +        self.assertEqual((log_level, mock.ANY), log.log.call_args[0]) +  # vi: ts=4 expandtab  | 
