diff options
Diffstat (limited to 'tests/unittests/test__init__.py')
| -rw-r--r-- | tests/unittests/test__init__.py | 238 | 
1 files changed, 114 insertions, 124 deletions
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py index 17965488..1a307e56 100644 --- a/tests/unittests/test__init__.py +++ b/tests/unittests/test__init__.py @@ -1,14 +1,25 @@  import os - -from mocker import MockerTestCase, ARGS, KWARGS +import shutil +import tempfile +import unittest + +try: +    from unittest import mock +except ImportError: +    import mock +try: +    from contextlib import ExitStack +except ImportError: +    from contextlib2 import ExitStack  from cloudinit import handlers  from cloudinit import helpers -from cloudinit import importer  from cloudinit import settings  from cloudinit import url_helper  from cloudinit import util +from .helpers import TestCase +  class FakeModule(handlers.Handler):      def __init__(self): @@ -22,76 +33,73 @@ class FakeModule(handlers.Handler):          pass -class TestWalkerHandleHandler(MockerTestCase): +class TestWalkerHandleHandler(TestCase):      def setUp(self): - -        MockerTestCase.setUp(self) +        super(TestWalkerHandleHandler, self).setUp() +        tmpdir = tempfile.mkdtemp() +        self.addCleanup(shutil.rmtree, tmpdir)          self.data = {              "handlercount": 0,              "frequency": "", -            "handlerdir": self.makeDir(), +            "handlerdir": tmpdir,              "handlers": helpers.ContentHandlers(),              "data": None}          self.expected_module_name = "part-handler-%03d" % (              self.data["handlercount"],)          expected_file_name = "%s.py" % self.expected_module_name -        expected_file_fullname = os.path.join(self.data["handlerdir"], -                                              expected_file_name) +        self.expected_file_fullname = os.path.join( +            self.data["handlerdir"], expected_file_name)          self.module_fake = FakeModule()          self.ctype = None          self.filename = None          self.payload = "dummy payload" -        # Mock the write_file function -        write_file_mock = self.mocker.replace(util.write_file, -                                              passthrough=False) -        write_file_mock(expected_file_fullname, self.payload, 0600) +        # Mock the write_file() function.  We'll assert that it got called as +        # expected in each of the individual tests. +        resources = ExitStack() +        self.addCleanup(resources.close) +        self.write_file_mock = resources.enter_context( +            mock.patch('cloudinit.util.write_file'))      def test_no_errors(self):          """Payload gets written to file and added to C{pdata}.""" -        import_mock = self.mocker.replace(importer.import_module, -                                          passthrough=False) -        import_mock(self.expected_module_name) -        self.mocker.result(self.module_fake) -        self.mocker.replay() - -        handlers.walker_handle_handler(self.data, self.ctype, self.filename, -                                       self.payload) - -        self.assertEqual(1, self.data["handlercount"]) +        with mock.patch('cloudinit.importer.import_module', +                        return_value=self.module_fake) as mockobj: +            handlers.walker_handle_handler(self.data, self.ctype, +                                           self.filename, self.payload) +            mockobj.assert_called_with_once(self.expected_module_name) +        self.write_file_mock.assert_called_with_once( +            self.expected_file_fullname, self.payload, 0o600) +        self.assertEqual(self.data['handlercount'], 1)      def test_import_error(self):          """Module import errors are logged. No handler added to C{pdata}.""" -        import_mock = self.mocker.replace(importer.import_module, -                                          passthrough=False) -        import_mock(self.expected_module_name) -        self.mocker.throw(ImportError()) -        self.mocker.replay() - -        handlers.walker_handle_handler(self.data, self.ctype, self.filename, -                                       self.payload) - -        self.assertEqual(0, self.data["handlercount"]) +        with mock.patch('cloudinit.importer.import_module', +                        side_effect=ImportError) as mockobj: +            handlers.walker_handle_handler(self.data, self.ctype, +                                           self.filename, self.payload) +            mockobj.assert_called_with_once(self.expected_module_name) +        self.write_file_mock.assert_called_with_once( +            self.expected_file_fullname, self.payload, 0o600) +        self.assertEqual(self.data['handlercount'], 0)      def test_attribute_error(self):          """Attribute errors are logged. No handler added to C{pdata}.""" -        import_mock = self.mocker.replace(importer.import_module, -                                          passthrough=False) -        import_mock(self.expected_module_name) -        self.mocker.result(self.module_fake) -        self.mocker.throw(AttributeError()) -        self.mocker.replay() - -        handlers.walker_handle_handler(self.data, self.ctype, self.filename, -                                       self.payload) +        with mock.patch('cloudinit.importer.import_module', +                        side_effect=AttributeError, +                        return_value=self.module_fake) as mockobj: +            handlers.walker_handle_handler(self.data, self.ctype, +                                           self.filename, self.payload) +            mockobj.assert_called_with_once(self.expected_module_name) +        self.write_file_mock.assert_called_with_once( +            self.expected_file_fullname, self.payload, 0o600) +        self.assertEqual(self.data['handlercount'], 0) -        self.assertEqual(0, self.data["handlercount"]) - -class TestHandlerHandlePart(MockerTestCase): +class TestHandlerHandlePart(unittest.TestCase):      def setUp(self):          self.data = "fake data" @@ -108,95 +116,80 @@ class TestHandlerHandlePart(MockerTestCase):          C{handle_part} is called without C{frequency} for          C{handler_version} == 1.          """ -        mod_mock = self.mocker.mock() -        getattr(mod_mock, "frequency") -        self.mocker.result(settings.PER_INSTANCE) -        getattr(mod_mock, "handler_version") -        self.mocker.result(1) -        mod_mock.handle_part(self.data, self.ctype, self.filename, -                             self.payload) -        self.mocker.replay() - -        handlers.run_part(mod_mock, self.data, self.filename, -                          self.payload, self.frequency, self.headers) +        mod_mock = mock.Mock(frequency=settings.PER_INSTANCE, +                             handler_version=1) +        handlers.run_part(mod_mock, self.data, self.filename, self.payload, +                          self.frequency, self.headers) +        # Assert that the handle_part() method of the mock object got +        # called with the expected arguments. +        mod_mock.handle_part.assert_called_with_once( +            self.data, self.ctype, self.filename, self.payload)      def test_normal_version_2(self):          """          C{handle_part} is called with C{frequency} for          C{handler_version} == 2.          """ -        mod_mock = self.mocker.mock() -        getattr(mod_mock, "frequency") -        self.mocker.result(settings.PER_INSTANCE) -        getattr(mod_mock, "handler_version") -        self.mocker.result(2) -        mod_mock.handle_part(self.data, self.ctype, self.filename, -                             self.payload, self.frequency) -        self.mocker.replay() - -        handlers.run_part(mod_mock, self.data, self.filename, -                          self.payload, self.frequency, self.headers) +        mod_mock = mock.Mock(frequency=settings.PER_INSTANCE, +                             handler_version=2) +        handlers.run_part(mod_mock, self.data, self.filename, self.payload, +                          self.frequency, self.headers) +        # Assert that the handle_part() method of the mock object got +        # called with the expected arguments. +        mod_mock.handle_part.assert_called_with_once( +            self.data, self.ctype, self.filename, self.payload)      def test_modfreq_per_always(self):          """          C{handle_part} is called regardless of frequency if nofreq is always.          """          self.frequency = "once" -        mod_mock = self.mocker.mock() -        getattr(mod_mock, "frequency") -        self.mocker.result(settings.PER_ALWAYS) -        getattr(mod_mock, "handler_version") -        self.mocker.result(1) -        mod_mock.handle_part(self.data, self.ctype, self.filename, -                             self.payload) -        self.mocker.replay() - -        handlers.run_part(mod_mock, self.data, self.filename, -                          self.payload, self.frequency, self.headers) +        mod_mock = mock.Mock(frequency=settings.PER_ALWAYS, +                             handler_version=1) +        handlers.run_part(mod_mock, self.data, self.filename, self.payload, +                          self.frequency, self.headers) +        # Assert that the handle_part() method of the mock object got +        # called with the expected arguments. +        mod_mock.handle_part.assert_called_with_once( +            self.data, self.ctype, self.filename, self.payload)      def test_no_handle_when_modfreq_once(self):          """C{handle_part} is not called if frequency is once."""          self.frequency = "once" -        mod_mock = self.mocker.mock() -        getattr(mod_mock, "frequency") -        self.mocker.result(settings.PER_ONCE) -        self.mocker.replay() - -        handlers.run_part(mod_mock, self.data, self.filename, -                          self.payload, self.frequency, self.headers) +        mod_mock = mock.Mock(frequency=settings.PER_ONCE) +        handlers.run_part(mod_mock, self.data, self.filename, self.payload, +                          self.frequency, self.headers) +        # Assert that the handle_part() method of the mock object got +        # called with the expected arguments. +        mod_mock.handle_part.assert_called_with_once( +            self.data, self.ctype, self.filename, self.payload)      def test_exception_is_caught(self):          """Exceptions within C{handle_part} are caught and logged.""" -        mod_mock = self.mocker.mock() -        getattr(mod_mock, "frequency") -        self.mocker.result(settings.PER_INSTANCE) -        getattr(mod_mock, "handler_version") -        self.mocker.result(1) -        mod_mock.handle_part(self.data, self.ctype, self.filename, -                             self.payload) -        self.mocker.throw(Exception()) -        self.mocker.replay() - -        handlers.run_part(mod_mock, self.data, self.filename, -                          self.payload, self.frequency, self.headers) - - -class TestCmdlineUrl(MockerTestCase): +        mod_mock = mock.Mock(frequency=settings.PER_INSTANCE, +                             handler_version=1) +        handlers.run_part(mod_mock, self.data, self.filename, self.payload, +                          self.frequency, self.headers) +        mod_mock.handle_part.side_effect = Exception +        handlers.run_part(mod_mock, self.data, self.filename, self.payload, +                          self.frequency, self.headers) +        mod_mock.handle_part.assert_called_with_once( +            self.data, self.ctype, self.filename, self.payload) + + +class TestCmdlineUrl(unittest.TestCase):      def test_invalid_content(self):          url = "http://example.com/foo"          key = "mykey"          payload = "0"          cmdline = "ro %s=%s bar=1" % (key, url) -        mock_readurl = self.mocker.replace(url_helper.readurl, -                                           passthrough=False) -        mock_readurl(url, ARGS, KWARGS) -        self.mocker.result(url_helper.StringResponse(payload)) -        self.mocker.replay() - -        self.assertEqual((key, url, None), -            util.get_cmdline_url(names=[key], starts="xxxxxx", -                                 cmdline=cmdline)) +        with mock.patch('cloudinit.url_helper.readurl', +                        return_value=url_helper.StringResponse(payload)): +            self.assertEqual( +                util.get_cmdline_url(names=[key], starts="xxxxxx", +                                     cmdline=cmdline), +                (key, url, None))      def test_valid_content(self):          url = "http://example.com/foo" @@ -204,27 +197,24 @@ class TestCmdlineUrl(MockerTestCase):          payload = "xcloud-config\nmydata: foo\nbar: wark\n"          cmdline = "ro %s=%s bar=1" % (key, url) -        mock_readurl = self.mocker.replace(url_helper.readurl, -                                           passthrough=False) -        mock_readurl(url, ARGS, KWARGS) -        self.mocker.result(url_helper.StringResponse(payload)) -        self.mocker.replay() - -        self.assertEqual((key, url, payload), -            util.get_cmdline_url(names=[key], starts="xcloud-config", -                            cmdline=cmdline)) +        with mock.patch('cloudinit.url_helper.readurl', +                        return_value=url_helper.StringResponse(payload)): +            self.assertEqual( +                util.get_cmdline_url(names=[key], starts="xcloud-config", +                                     cmdline=cmdline), +                (key, url, payload))      def test_no_key_found(self):          url = "http://example.com/foo"          key = "mykey"          cmdline = "ro %s=%s bar=1" % (key, url) -        self.mocker.replace(url_helper.readurl, passthrough=False) -        self.mocker.result(url_helper.StringResponse("")) -        self.mocker.replay() +        with mock.patch('cloudinit.url_helper.readurl', +                        return_value=url_helper.StringResponse('')): +            self.assertEqual( +                util.get_cmdline_url(names=["does-not-appear"], +                                     starts="#cloud-config", cmdline=cmdline), +                (None, None, None)) -        self.assertEqual((None, None, None), -            util.get_cmdline_url(names=["does-not-appear"], -                starts="#cloud-config", cmdline=cmdline))  # vi: ts=4 expandtab  | 
