diff options
Diffstat (limited to 'tests/unittests')
| -rw-r--r-- | tests/unittests/test_datasource/test_maas.py | 2 | ||||
| -rw-r--r-- | tests/unittests/test_registry.py | 28 | ||||
| -rw-r--r-- | tests/unittests/test_reporting.py | 359 | 
3 files changed, 388 insertions, 1 deletions
| diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py index f109bb04..eb97b692 100644 --- a/tests/unittests/test_datasource/test_maas.py +++ b/tests/unittests/test_datasource/test_maas.py @@ -141,7 +141,7 @@ class TestMAASDataSource(TestCase):          with mock.patch.object(url_helper, 'readurl',                                 side_effect=side_effect()) as mockobj:              userdata, metadata = DataSourceMAAS.read_maas_seed_url( -                my_seed, header_cb=my_headers_cb, version=my_ver) +                my_seed, version=my_ver)              self.assertEqual(b"foodata", userdata)              self.assertEqual(metadata['instance-id'], diff --git a/tests/unittests/test_registry.py b/tests/unittests/test_registry.py new file mode 100644 index 00000000..bcf01475 --- /dev/null +++ b/tests/unittests/test_registry.py @@ -0,0 +1,28 @@ +from cloudinit.registry import DictRegistry + +from .helpers import (mock, TestCase) + + +class TestDictRegistry(TestCase): + +    def test_added_item_included_in_output(self): +        registry = DictRegistry() +        item_key, item_to_register = 'test_key', mock.Mock() +        registry.register_item(item_key, item_to_register) +        self.assertEqual({item_key: item_to_register}, +                         registry.registered_items) + +    def test_registry_starts_out_empty(self): +        self.assertEqual({}, DictRegistry().registered_items) + +    def test_modifying_registered_items_isnt_exposed_to_other_callers(self): +        registry = DictRegistry() +        registry.registered_items['test_item'] = mock.Mock() +        self.assertEqual({}, registry.registered_items) + +    def test_keys_cannot_be_replaced(self): +        registry = DictRegistry() +        item_key = 'test_key' +        registry.register_item(item_key, mock.Mock()) +        self.assertRaises(ValueError, +                          registry.register_item, item_key, mock.Mock()) diff --git a/tests/unittests/test_reporting.py b/tests/unittests/test_reporting.py new file mode 100644 index 00000000..1a4ee8c4 --- /dev/null +++ b/tests/unittests/test_reporting.py @@ -0,0 +1,359 @@ +# Copyright 2015 Canonical Ltd. +# This file is part of cloud-init.  See LICENCE file for license information. +# +# vi: ts=4 expandtab + +from cloudinit import reporting +from cloudinit.reporting import handlers + +from .helpers import (mock, TestCase) + + +def _fake_registry(): +    return mock.Mock(registered_items={'a': mock.MagicMock(), +                                       'b': mock.MagicMock()}) + + +class TestReportStartEvent(TestCase): + +    @mock.patch('cloudinit.reporting.instantiated_handler_registry', +                new_callable=_fake_registry) +    def test_report_start_event_passes_something_with_as_string_to_handlers( +            self, instantiated_handler_registry): +        event_name, event_description = 'my_test_event', 'my description' +        reporting.report_start_event(event_name, event_description) +        expected_string_representation = ': '.join( +            ['start', event_name, event_description]) +        for _, handler in ( +                instantiated_handler_registry.registered_items.items()): +            self.assertEqual(1, handler.publish_event.call_count) +            event = handler.publish_event.call_args[0][0] +            self.assertEqual(expected_string_representation, event.as_string()) + + +class TestReportFinishEvent(TestCase): + +    def _report_finish_event(self, result=reporting.status.SUCCESS): +        event_name, event_description = 'my_test_event', 'my description' +        reporting.report_finish_event( +            event_name, event_description, result=result) +        return event_name, event_description + +    def assertHandlersPassedObjectWithAsString( +            self, handlers, expected_as_string): +        for _, handler in handlers.items(): +            self.assertEqual(1, handler.publish_event.call_count) +            event = handler.publish_event.call_args[0][0] +            self.assertEqual(expected_as_string, event.as_string()) + +    @mock.patch('cloudinit.reporting.instantiated_handler_registry', +                new_callable=_fake_registry) +    def test_report_finish_event_passes_something_with_as_string_to_handlers( +            self, instantiated_handler_registry): +        event_name, event_description = self._report_finish_event() +        expected_string_representation = ': '.join( +            ['finish', event_name, reporting.status.SUCCESS, +             event_description]) +        self.assertHandlersPassedObjectWithAsString( +            instantiated_handler_registry.registered_items, +            expected_string_representation) + +    @mock.patch('cloudinit.reporting.instantiated_handler_registry', +                new_callable=_fake_registry) +    def test_reporting_successful_finish_has_sensible_string_repr( +            self, instantiated_handler_registry): +        event_name, event_description = self._report_finish_event( +            result=reporting.status.SUCCESS) +        expected_string_representation = ': '.join( +            ['finish', event_name, reporting.status.SUCCESS, +             event_description]) +        self.assertHandlersPassedObjectWithAsString( +            instantiated_handler_registry.registered_items, +            expected_string_representation) + +    @mock.patch('cloudinit.reporting.instantiated_handler_registry', +                new_callable=_fake_registry) +    def test_reporting_unsuccessful_finish_has_sensible_string_repr( +            self, instantiated_handler_registry): +        event_name, event_description = self._report_finish_event( +            result=reporting.status.FAIL) +        expected_string_representation = ': '.join( +            ['finish', event_name, reporting.status.FAIL, event_description]) +        self.assertHandlersPassedObjectWithAsString( +            instantiated_handler_registry.registered_items, +            expected_string_representation) + +    def test_invalid_result_raises_attribute_error(self): +        self.assertRaises(ValueError, self._report_finish_event, ("BOGUS",)) + + +class TestReportingEvent(TestCase): + +    def test_as_string(self): +        event_type, name, description = 'test_type', 'test_name', 'test_desc' +        event = reporting.ReportingEvent(event_type, name, description) +        expected_string_representation = ': '.join( +            [event_type, name, description]) +        self.assertEqual(expected_string_representation, event.as_string()) + +    def test_as_dict(self): +        event_type, name, desc = 'test_type', 'test_name', 'test_desc' +        event = reporting.ReportingEvent(event_type, name, desc) +        self.assertEqual( +            {'event_type': event_type, 'name': name, 'description': desc}, +            event.as_dict()) + + +class TestFinishReportingEvent(TestCase): +    def test_as_has_result(self): +        result = reporting.status.SUCCESS +        name, desc = 'test_name', 'test_desc' +        event = reporting.FinishReportingEvent(name, desc, result) +        ret = event.as_dict() +        self.assertTrue('result' in ret) +        self.assertEqual(ret['result'], result) + + +class TestBaseReportingHandler(TestCase): + +    def test_base_reporting_handler_is_abstract(self): +        regexp = r".*abstract.*publish_event.*" +        self.assertRaisesRegexp(TypeError, regexp, handlers.ReportingHandler) + + +class TestLogHandler(TestCase): + +    @mock.patch.object(reporting.handlers.logging, 'getLogger') +    def test_appropriate_logger_used(self, getLogger): +        event_type, event_name = 'test_type', 'test_name' +        event = reporting.ReportingEvent(event_type, event_name, 'description') +        reporting.handlers.LogHandler().publish_event(event) +        self.assertEqual( +            [mock.call( +                'cloudinit.reporting.{0}.{1}'.format(event_type, event_name))], +            getLogger.call_args_list) + +    @mock.patch.object(reporting.handlers.logging, 'getLogger') +    def test_single_log_message_at_info_published(self, getLogger): +        event = reporting.ReportingEvent('type', 'name', 'description') +        reporting.handlers.LogHandler().publish_event(event) +        self.assertEqual(1, getLogger.return_value.info.call_count) + +    @mock.patch.object(reporting.handlers.logging, 'getLogger') +    def test_log_message_uses_event_as_string(self, getLogger): +        event = reporting.ReportingEvent('type', 'name', 'description') +        reporting.handlers.LogHandler().publish_event(event) +        self.assertIn(event.as_string(), +                      getLogger.return_value.info.call_args[0][0]) + + +class TestDefaultRegisteredHandler(TestCase): + +    def test_log_handler_registered_by_default(self): +        registered_items = ( +            reporting.instantiated_handler_registry.registered_items) +        for _, item in registered_items.items(): +            if isinstance(item, reporting.handlers.LogHandler): +                break +        else: +            self.fail('No reporting LogHandler registered by default.') + + +class TestReportingConfiguration(TestCase): + +    @mock.patch.object(reporting, 'instantiated_handler_registry') +    def test_empty_configuration_doesnt_add_handlers( +            self, instantiated_handler_registry): +        reporting.update_configuration({}) +        self.assertEqual( +            0, instantiated_handler_registry.register_item.call_count) + +    @mock.patch.object( +        reporting, 'instantiated_handler_registry', reporting.DictRegistry()) +    @mock.patch.object(reporting, 'available_handlers') +    def test_looks_up_handler_by_type_and_adds_it(self, available_handlers): +        handler_type_name = 'test_handler' +        handler_cls = mock.Mock() +        available_handlers.registered_items = {handler_type_name: handler_cls} +        handler_name = 'my_test_handler' +        reporting.update_configuration( +            {handler_name: {'type': handler_type_name}}) +        self.assertEqual( +            {handler_name: handler_cls.return_value}, +            reporting.instantiated_handler_registry.registered_items) + +    @mock.patch.object( +        reporting, 'instantiated_handler_registry', reporting.DictRegistry()) +    @mock.patch.object(reporting, 'available_handlers') +    def test_uses_non_type_parts_of_config_dict_as_kwargs( +            self, available_handlers): +        handler_type_name = 'test_handler' +        handler_cls = mock.Mock() +        available_handlers.registered_items = {handler_type_name: handler_cls} +        extra_kwargs = {'foo': 'bar', 'bar': 'baz'} +        handler_config = extra_kwargs.copy() +        handler_config.update({'type': handler_type_name}) +        handler_name = 'my_test_handler' +        reporting.update_configuration({handler_name: handler_config}) +        self.assertEqual( +            handler_cls.return_value, +            reporting.instantiated_handler_registry.registered_items[ +                handler_name]) +        self.assertEqual([mock.call(**extra_kwargs)], +                         handler_cls.call_args_list) + +    @mock.patch.object( +        reporting, 'instantiated_handler_registry', reporting.DictRegistry()) +    @mock.patch.object(reporting, 'available_handlers') +    def test_handler_config_not_modified(self, available_handlers): +        handler_type_name = 'test_handler' +        handler_cls = mock.Mock() +        available_handlers.registered_items = {handler_type_name: handler_cls} +        handler_config = {'type': handler_type_name, 'foo': 'bar'} +        expected_handler_config = handler_config.copy() +        reporting.update_configuration({'my_test_handler': handler_config}) +        self.assertEqual(expected_handler_config, handler_config) + +    @mock.patch.object( +        reporting, 'instantiated_handler_registry', reporting.DictRegistry()) +    @mock.patch.object(reporting, 'available_handlers') +    def test_handlers_removed_if_falseish_specified(self, available_handlers): +        handler_type_name = 'test_handler' +        handler_cls = mock.Mock() +        available_handlers.registered_items = {handler_type_name: handler_cls} +        handler_name = 'my_test_handler' +        reporting.update_configuration( +            {handler_name: {'type': handler_type_name}}) +        self.assertEqual( +            1, len(reporting.instantiated_handler_registry.registered_items)) +        reporting.update_configuration({handler_name: None}) +        self.assertEqual( +            0, len(reporting.instantiated_handler_registry.registered_items)) + + +class TestReportingEventStack(TestCase): +    @mock.patch('cloudinit.reporting.report_finish_event') +    @mock.patch('cloudinit.reporting.report_start_event') +    def test_start_and_finish_success(self, report_start, report_finish): +        with reporting.ReportEventStack(name="myname", description="mydesc"): +            pass +        self.assertEqual( +            [mock.call('myname', 'mydesc')], report_start.call_args_list) +        self.assertEqual( +            [mock.call('myname', 'mydesc', reporting.status.SUCCESS)], +            report_finish.call_args_list) + +    @mock.patch('cloudinit.reporting.report_finish_event') +    @mock.patch('cloudinit.reporting.report_start_event') +    def test_finish_exception_defaults_fail(self, report_start, report_finish): +        name = "myname" +        desc = "mydesc" +        try: +            with reporting.ReportEventStack(name, description=desc): +                raise ValueError("This didnt work") +        except ValueError: +            pass +        self.assertEqual([mock.call(name, desc)], report_start.call_args_list) +        self.assertEqual( +            [mock.call(name, desc, reporting.status.FAIL)], +            report_finish.call_args_list) + +    @mock.patch('cloudinit.reporting.report_finish_event') +    @mock.patch('cloudinit.reporting.report_start_event') +    def test_result_on_exception_used(self, report_start, report_finish): +        name = "myname" +        desc = "mydesc" +        try: +            with reporting.ReportEventStack( +                    name, desc, result_on_exception=reporting.status.WARN): +                raise ValueError("This didnt work") +        except ValueError: +            pass +        self.assertEqual([mock.call(name, desc)], report_start.call_args_list) +        self.assertEqual( +            [mock.call(name, desc, reporting.status.WARN)], +            report_finish.call_args_list) + +    @mock.patch('cloudinit.reporting.report_start_event') +    def test_child_fullname_respects_parent(self, report_start): +        parent_name = "topname" +        c1_name = "c1name" +        c2_name = "c2name" +        c2_expected_fullname = '/'.join([parent_name, c1_name, c2_name]) +        c1_expected_fullname = '/'.join([parent_name, c1_name]) + +        parent = reporting.ReportEventStack(parent_name, "topdesc") +        c1 = reporting.ReportEventStack(c1_name, "c1desc", parent=parent) +        c2 = reporting.ReportEventStack(c2_name, "c2desc", parent=c1) +        with c1: +            report_start.assert_called_with(c1_expected_fullname, "c1desc") +            with c2: +                report_start.assert_called_with(c2_expected_fullname, "c2desc") + +    @mock.patch('cloudinit.reporting.report_finish_event') +    @mock.patch('cloudinit.reporting.report_start_event') +    def test_child_result_bubbles_up(self, report_start, report_finish): +        parent = reporting.ReportEventStack("topname", "topdesc") +        child = reporting.ReportEventStack("c_name", "c_desc", parent=parent) +        with parent: +            with child: +                child.result = reporting.status.WARN + +        report_finish.assert_called_with( +            "topname", "topdesc", reporting.status.WARN) + +    @mock.patch('cloudinit.reporting.report_finish_event') +    def test_message_used_in_finish(self, report_finish): +        with reporting.ReportEventStack("myname", "mydesc", +                                        message="mymessage"): +            pass +        self.assertEqual( +            [mock.call("myname", "mymessage", reporting.status.SUCCESS)], +            report_finish.call_args_list) + +    @mock.patch('cloudinit.reporting.report_finish_event') +    def test_message_updatable(self, report_finish): +        with reporting.ReportEventStack("myname", "mydesc") as c: +            c.message = "all good" +        self.assertEqual( +            [mock.call("myname", "all good", reporting.status.SUCCESS)], +            report_finish.call_args_list) + +    @mock.patch('cloudinit.reporting.report_start_event') +    @mock.patch('cloudinit.reporting.report_finish_event') +    def test_reporting_disabled_does_not_report_events( +            self, report_start, report_finish): +        with reporting.ReportEventStack("a", "b", reporting_enabled=False): +            pass +        self.assertEqual(report_start.call_count, 0) +        self.assertEqual(report_finish.call_count, 0) + +    @mock.patch('cloudinit.reporting.report_start_event') +    @mock.patch('cloudinit.reporting.report_finish_event') +    def test_reporting_child_default_to_parent( +            self, report_start, report_finish): +        parent = reporting.ReportEventStack( +            "pname", "pdesc", reporting_enabled=False) +        child = reporting.ReportEventStack("cname", "cdesc", parent=parent) +        with parent: +            with child: +                pass +            pass +        self.assertEqual(report_start.call_count, 0) +        self.assertEqual(report_finish.call_count, 0) + +    def test_reporting_event_has_sane_repr(self): +        myrep = reporting.ReportEventStack("fooname", "foodesc", +                                           reporting_enabled=True).__repr__() +        self.assertIn("fooname", myrep) +        self.assertIn("foodesc", myrep) +        self.assertIn("True", myrep) + +    def test_set_invalid_result_raises_value_error(self): +        f = reporting.ReportEventStack("myname", "mydesc") +        self.assertRaises(ValueError, setattr, f, "result", "BOGUS") + + +class TestStatusAccess(TestCase): +    def test_invalid_status_access_raises_value_error(self): +        self.assertRaises(AttributeError, getattr, reporting.status, "BOGUS") | 
