# Copyright 2015 Canonical Ltd. # # This file is part of cloud-init. See LICENSE file for license information. from unittest import mock from cloudinit import reporting from cloudinit.reporting import events from cloudinit.reporting import handlers from tests.unittests.helpers import TestCase def _fake_registry(): return mock.Mock(registered_items={'a': mock.MagicMock(), 'b': mock.MagicMock()}) class TestReportStartEvent(TestCase): @mock.patch('cloudinit.reporting.events.instantiated_handler_registry', new_callable=_fake_registry) def test_report_start_event_passes_something_with_as_string_to_handlers( self, instantiated_handler_registry): event_name, event_description = 'my_test_event', 'my description' events.report_start_event(event_name, event_description) expected_string_representation = ': '.join( ['start', event_name, event_description]) for _, handler in ( instantiated_handler_registry.registered_items.items()): self.assertEqual(1, handler.publish_event.call_count) event = handler.publish_event.call_args[0][0] self.assertEqual(expected_string_representation, event.as_string()) class TestReportFinishEvent(TestCase): def _report_finish_event(self, result=events.status.SUCCESS): event_name, event_description = 'my_test_event', 'my description' events.report_finish_event( event_name, event_description, result=result) return event_name, event_description def assertHandlersPassedObjectWithAsString( self, handlers, expected_as_string): for _, handler in handlers.items(): self.assertEqual(1, handler.publish_event.call_count) event = handler.publish_event.call_args[0][0] self.assertEqual(expected_as_string, event.as_string()) @mock.patch('cloudinit.reporting.events.instantiated_handler_registry', new_callable=_fake_registry) def test_report_finish_event_passes_something_with_as_string_to_handlers( self, instantiated_handler_registry): event_name, event_description = self._report_finish_event() expected_string_representation = ': '.join( ['finish', event_name, events.status.SUCCESS, event_description]) self.assertHandlersPassedObjectWithAsString( instantiated_handler_registry.registered_items, expected_string_representation) @mock.patch('cloudinit.reporting.events.instantiated_handler_registry', new_callable=_fake_registry) def test_reporting_successful_finish_has_sensible_string_repr( self, instantiated_handler_registry): event_name, event_description = self._report_finish_event( result=events.status.SUCCESS) expected_string_representation = ': '.join( ['finish', event_name, events.status.SUCCESS, event_description]) self.assertHandlersPassedObjectWithAsString( instantiated_handler_registry.registered_items, expected_string_representation) @mock.patch('cloudinit.reporting.events.instantiated_handler_registry', new_callable=_fake_registry) def test_reporting_unsuccessful_finish_has_sensible_string_repr( self, instantiated_handler_registry): event_name, event_description = self._report_finish_event( result=events.status.FAIL) expected_string_representation = ': '.join( ['finish', event_name, events.status.FAIL, event_description]) self.assertHandlersPassedObjectWithAsString( instantiated_handler_registry.registered_items, expected_string_representation) def test_invalid_result_raises_attribute_error(self): self.assertRaises(ValueError, self._report_finish_event, ("BOGUS",)) class TestReportingEvent(TestCase): def test_as_string(self): event_type, name, description = 'test_type', 'test_name', 'test_desc' event = events.ReportingEvent(event_type, name, description) expected_string_representation = ': '.join( [event_type, name, description]) self.assertEqual(expected_string_representation, event.as_string()) def test_as_dict(self): event_type, name, desc = 'test_type', 'test_name', 'test_desc' event = events.ReportingEvent(event_type, name, desc) expected = {'event_type': event_type, 'name': name, 'description': desc, 'origin': 'cloudinit'} # allow for timestamp to differ, but must be present as_dict = event.as_dict() self.assertIn('timestamp', as_dict) del as_dict['timestamp'] self.assertEqual(expected, as_dict) class TestFinishReportingEvent(TestCase): def test_as_has_result(self): result = events.status.SUCCESS name, desc = 'test_name', 'test_desc' event = events.FinishReportingEvent(name, desc, result) ret = event.as_dict() self.assertTrue('result' in ret) self.assertEqual(ret['result'], result) def test_has_result_with_optional_post_files(self): result = events.status.SUCCESS name, desc, files = 'test_name', 'test_desc', [ '/really/fake/path/install.log'] event = events.FinishReportingEvent( name, desc, result, post_files=files) ret = event.as_dict() self.assertTrue('result' in ret) self.assertTrue('files' in ret) self.assertEqual(ret['result'], result) posted_install_log = ret['files'][0] self.assertTrue('path' in posted_install_log) self.assertTrue('content' in posted_install_log) self.assertTrue('encoding' in posted_install_log) self.assertEqual(posted_install_log['path'], files[0]) self.assertEqual(posted_install_log['encoding'], 'base64') class TestBaseReportingHandler(TestCase): def test_base_reporting_handler_is_abstract(self): regexp = r".*abstract.*publish_event.*" self.assertRaisesRegex(TypeError, regexp, handlers.ReportingHandler) class TestLogHandler(TestCase): @mock.patch.object(reporting.handlers.logging, 'getLogger') def test_appropriate_logger_used(self, getLogger): event_type, event_name = 'test_type', 'test_name' event = events.ReportingEvent(event_type, event_name, 'description') reporting.handlers.LogHandler().publish_event(event) self.assertEqual( [mock.call( 'cloudinit.reporting.{0}.{1}'.format(event_type, event_name))], getLogger.call_args_list) @mock.patch.object(reporting.handlers.logging, 'getLogger') def test_single_log_message_at_info_published(self, getLogger): event = events.ReportingEvent('type', 'name', 'description') reporting.handlers.LogHandler().publish_event(event) self.assertEqual(1, getLogger.return_value.log.call_count) @mock.patch.object(reporting.handlers.logging, 'getLogger') def test_log_message_uses_event_as_string(self, getLogger): event = events.ReportingEvent('type', 'name', 'description') reporting.handlers.LogHandler(level="INFO").publish_event(event) self.assertIn(event.as_string(), getLogger.return_value.log.call_args[0][1]) class TestDefaultRegisteredHandler(TestCase): def test_log_handler_registered_by_default(self): registered_items = ( reporting.instantiated_handler_registry.registered_items) for _, item in registered_items.items(): if isinstance(item, reporting.handlers.LogHandler): break else: self.fail('No reporting LogHandler registered by default.') class TestReportingConfiguration(TestCase): @mock.patch.object(reporting, 'instantiated_handler_registry') def test_empty_configuration_doesnt_add_handlers( self, instantiated_handler_registry): reporting.update_configuration({}) self.assertEqual( 0, instantiated_handler_registry.register_item.call_count) @mock.patch.object( reporting, 'instantiated_handler_registry', reporting.DictRegistry()) @mock.patch.object(reporting, 'available_handlers') def test_looks_up_handler_by_type_and_adds_it(self, available_handlers): handler_type_name = 'test_handler' handler_cls = mock.Mock() available_handlers.registered_items = {handler_type_name: handler_cls} handler_name = 'my_test_handler' reporting.update_configuration( {handler_name: {'type': handler_type_name}}) self.assertEqual( {handler_name: handler_cls.return_value}, reporting.instantiated_handler_registry.registered_items) @mock.patch.object( reporting, 'instantiated_handler_registry', reporting.DictRegistry()) @mock.patch.object(reporting, 'available_handlers') def test_uses_non_type_parts_of_config_dict_as_kwargs( self, available_handlers): handler_type_name = 'test_handler' handler_cls = mock.Mock() available_handlers.registered_items = {handler_type_name: handler_cls} extra_kwargs = {'foo': 'bar', 'bar': 'baz'} handler_config = extra_kwargs.copy() handler_config.update({'type': handler_type_name}) handler_name = 'my_test_handler' reporting.update_configuration({handler_name: handler_config}) self.assertEqual( handler_cls.return_value, reporting.instantiated_handler_registry.registered_items[ handler_name]) self.assertEqual([mock.call(**extra_kwargs)], handler_cls.call_args_list) @mock.patch.object( reporting, 'instantiated_handler_registry', reporting.DictRegistry()) @mock.patch.object(reporting, 'available_handlers') def test_handler_config_not_modified(self, available_handlers): handler_type_name = 'test_handler' handler_cls = mock.Mock() available_handlers.registered_items = {handler_type_name: handler_cls} handler_config = {'type': handler_type_name, 'foo': 'bar'} expected_handler_config = handler_config.copy() reporting.update_configuration({'my_test_handler': handler_config}) self.assertEqual(expected_handler_config, handler_config) @mock.patch.object( reporting, 'instantiated_handler_registry', reporting.DictRegistry()) @mock.patch.object(reporting, 'available_handlers') def test_handlers_removed_if_falseish_specified(self, available_handlers): handler_type_name = 'test_handler' handler_cls = mock.Mock() available_handlers.registered_items = {handler_type_name: handler_cls} handler_name = 'my_test_handler' reporting.update_configuration( {handler_name: {'type': handler_type_name}}) self.assertEqual( 1, len(reporting.instantiated_handler_registry.registered_items)) reporting.update_configuration({handler_name: None}) self.assertEqual( 0, len(reporting.instantiated_handler_registry.registered_items)) class TestReportingEventStack(TestCase): @mock.patch('cloudinit.reporting.events.report_finish_event') @mock.patch('cloudinit.reporting.events.report_start_event') def test_start_and_finish_success(self, report_start, report_finish): with events.ReportEventStack(name="myname", description="mydesc"): pass self.assertEqual( [mock.call('myname', 'mydesc')], report_start.call_args_list) self.assertEqual( [mock.call('myname', 'mydesc', events.status.SUCCESS, post_files=[])], report_finish.call_args_list) @mock.patch('cloudinit.reporting.events.report_finish_event') @mock.patch('cloudinit.reporting.events.report_start_event') def test_finish_exception_defaults_fail(self, report_start, report_finish): name = "myname" desc = "mydesc" try: with events.ReportEventStack(name, description=desc): raise ValueError("This didnt work") except ValueError: pass self.assertEqual([mock.call(name, desc)], report_start.call_args_list) self.assertEqual( [mock.call(name, desc, events.status.FAIL, post_files=[])], report_finish.call_args_list) @mock.patch('cloudinit.reporting.events.report_finish_event') @mock.patch('cloudinit.reporting.events.report_start_event') def test_result_on_exception_used(self, report_start, report_finish): name = "myname" desc = "mydesc" try: with events.ReportEventStack( name, desc, result_on_exception=events.status.WARN): raise ValueError("This didnt work") except ValueError: pass self.assertEqual([mock.call(name, desc)], report_start.call_args_list) self.assertEqual( [mock.call(name, desc, events.status.WARN, post_files=[])], report_finish.call_args_list) @mock.patch('cloudinit.reporting.events.report_start_event') def test_child_fullname_respects_parent(self, report_start): parent_name = "topname" c1_name = "c1name" c2_name = "c2name" c2_expected_fullname = '/'.join([parent_name, c1_name, c2_name]) c1_expected_fullname = '/'.join([parent_name, c1_name]) parent = events.ReportEventStack(parent_name, "topdesc") c1 = events.ReportEventStack(c1_name, "c1desc", parent=parent) c2 = events.ReportEventStack(c2_name, "c2desc", parent=c1) with c1: report_start.assert_called_with(c1_expected_fullname, "c1desc") with c2: report_start.assert_called_with(c2_expected_fullname, "c2desc") @mock.patch('cloudinit.reporting.events.report_finish_event') @mock.patch('cloudinit.reporting.events.report_start_event') def test_child_result_bubbles_up(self, report_start, report_finish): parent = events.ReportEventStack("topname", "topdesc") child = events.ReportEventStack("c_name", "c_desc", parent=parent) with parent: with child: child.result = events.status.WARN report_finish.assert_called_with( "topname", "topdesc", events.status.WARN, post_files=[]) @mock.patch('cloudinit.reporting.events.report_finish_event') def test_message_used_in_finish(self, report_finish): with events.ReportEventStack("myname", "mydesc", message="mymessage"): pass self.assertEqual( [mock.call("myname", "mymessage", events.status.SUCCESS, post_files=[])], report_finish.call_args_list) @mock.patch('cloudinit.reporting.events.report_finish_event') def test_message_updatable(self, report_finish): with events.ReportEventStack("myname", "mydesc") as c: c.message = "all good" self.assertEqual( [mock.call("myname", "all good", events.status.SUCCESS, post_files=[])], report_finish.call_args_list) @mock.patch('cloudinit.reporting.events.report_start_event') @mock.patch('cloudinit.reporting.events.report_finish_event') def test_reporting_disabled_does_not_report_events( self, report_start, report_finish): with events.ReportEventStack("a", "b", reporting_enabled=False): pass self.assertEqual(report_start.call_count, 0) self.assertEqual(report_finish.call_count, 0) @mock.patch('cloudinit.reporting.events.report_start_event') @mock.patch('cloudinit.reporting.events.report_finish_event') def test_reporting_child_default_to_parent( self, report_start, report_finish): parent = events.ReportEventStack( "pname", "pdesc", reporting_enabled=False) child = events.ReportEventStack("cname", "cdesc", parent=parent) with parent: with child: pass self.assertEqual(report_start.call_count, 0) self.assertEqual(report_finish.call_count, 0) def test_reporting_event_has_sane_repr(self): myrep = events.ReportEventStack("fooname", "foodesc", reporting_enabled=True).__repr__() self.assertIn("fooname", myrep) self.assertIn("foodesc", myrep) self.assertIn("True", myrep) def test_set_invalid_result_raises_value_error(self): f = events.ReportEventStack("myname", "mydesc") self.assertRaises(ValueError, setattr, f, "result", "BOGUS") class TestStatusAccess(TestCase): def test_invalid_status_access_raises_value_error(self): self.assertRaises(AttributeError, getattr, events.status, "BOGUS") # vi: ts=4 expandtab