diff options
-rw-r--r-- | cloudinit/reporting/events.py | 23 | ||||
-rwxr-xr-x | cloudinit/reporting/handlers.py | 12 | ||||
-rwxr-xr-x | cloudinit/sources/DataSourceAzure.py | 12 | ||||
-rwxr-xr-x | cloudinit/sources/helpers/azure.py | 58 | ||||
-rw-r--r-- | tests/unittests/test_reporting_hyperv.py | 74 |
5 files changed, 162 insertions, 17 deletions
diff --git a/cloudinit/reporting/events.py b/cloudinit/reporting/events.py index e5dfab33..b8677c8b 100644 --- a/cloudinit/reporting/events.py +++ b/cloudinit/reporting/events.py @@ -12,7 +12,7 @@ import base64 import os.path import time -from . import instantiated_handler_registry +from . import instantiated_handler_registry, available_handlers FINISH_EVENT_TYPE = 'finish' START_EVENT_TYPE = 'start' @@ -81,17 +81,32 @@ class FinishReportingEvent(ReportingEvent): return data -def report_event(event): - """Report an event to all registered event handlers. +def report_event(event, excluded_handler_types=None): + """Report an event to all registered event handlers + except those whose type is in excluded_handler_types. This should generally be called via one of the other functions in the reporting module. + :param excluded_handler_types: + List of handlers types to exclude from reporting the event to. :param event_type: The type of the event; this should be a constant from the reporting module. """ - for _, handler in instantiated_handler_registry.registered_items.items(): + + if not excluded_handler_types: + excluded_handler_types = {} + excluded_handler_classes = { + hndl_cls + for hndl_type, hndl_cls in available_handlers.registered_items.items() + if hndl_type in excluded_handler_types + } + + handlers = instantiated_handler_registry.registered_items.items() + for _, handler in handlers: + if type(handler) in excluded_handler_classes: + continue # skip this excluded handler handler.publish_event(event) diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py index 1986ebdd..0a8c7af3 100755 --- a/cloudinit/reporting/handlers.py +++ b/cloudinit/reporting/handlers.py @@ -113,6 +113,8 @@ class HyperVKvpReportingHandler(ReportingHandler): https://technet.microsoft.com/en-us/library/dn798287.aspx#Linux%20guests """ HV_KVP_EXCHANGE_MAX_VALUE_SIZE = 2048 + # The maximum value size expected in Azure + HV_KVP_AZURE_MAX_VALUE_SIZE = 1024 HV_KVP_EXCHANGE_MAX_KEY_SIZE = 512 HV_KVP_RECORD_SIZE = (HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE) @@ -195,7 +197,8 @@ class HyperVKvpReportingHandler(ReportingHandler): def _event_key(self, event): """ the event key format is: - CLOUD_INIT|<incarnation number>|<event_type>|<event_name>|<time> + CLOUD_INIT|<incarnation number>|<event_type>|<event_name>|<uuid> + [|subevent_index] """ return u"{0}|{1}|{2}|{3}".format(self.event_key_prefix, event.event_type, event.name, @@ -249,13 +252,14 @@ class HyperVKvpReportingHandler(ReportingHandler): data_without_desc = json.dumps(meta_data, separators=self.JSON_SEPARATORS) room_for_desc = ( - self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE - + self.HV_KVP_AZURE_MAX_VALUE_SIZE - len(data_without_desc) - 8) value = data_without_desc.replace( message_place_holder, '"{key}":"{desc}"'.format( key=self.MSG_KEY, desc=des_in_json[:room_for_desc])) - result_array.append(self._encode_kvp_item(key, value)) + subkey = "{}|{}".format(key, i) + result_array.append(self._encode_kvp_item(subkey, value)) i += 1 des_in_json = des_in_json[room_for_desc:] if len(des_in_json) == 0: @@ -282,7 +286,7 @@ class HyperVKvpReportingHandler(ReportingHandler): # if it reaches the maximum length of kvp value, # break it down to slices. # this should be very corner case. - if len(value) > self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE: + if len(value) > self.HV_KVP_AZURE_MAX_VALUE_SIZE: return self._break_down(key, meta_data, event.description) else: data = self._encode_kvp_item(key, value) diff --git a/cloudinit/sources/DataSourceAzure.py b/cloudinit/sources/DataSourceAzure.py index a3810ca8..f572a4ef 100755 --- a/cloudinit/sources/DataSourceAzure.py +++ b/cloudinit/sources/DataSourceAzure.py @@ -35,7 +35,8 @@ from cloudinit.sources.helpers.azure import ( report_diagnostic_event, EphemeralDHCPv4WithReporting, is_byte_swapped, - dhcp_log_cb) + dhcp_log_cb, + push_log_to_kvp) LOG = logging.getLogger(__name__) @@ -789,9 +790,12 @@ class DataSourceAzure(sources.DataSource): @azure_ds_telemetry_reporter def activate(self, cfg, is_new_instance): - address_ephemeral_resize(is_new_instance=is_new_instance, - preserve_ntfs=self.ds_cfg.get( - DS_CFG_KEY_PRESERVE_NTFS, False)) + try: + address_ephemeral_resize(is_new_instance=is_new_instance, + preserve_ntfs=self.ds_cfg.get( + DS_CFG_KEY_PRESERVE_NTFS, False)) + finally: + push_log_to_kvp(self.sys_cfg['def_log_file']) return @property diff --git a/cloudinit/sources/helpers/azure.py b/cloudinit/sources/helpers/azure.py index 6df28ccf..6156c75b 100755 --- a/cloudinit/sources/helpers/azure.py +++ b/cloudinit/sources/helpers/azure.py @@ -1,5 +1,5 @@ # This file is part of cloud-init. See LICENSE file for license information. - +import base64 import json import logging import os @@ -8,7 +8,9 @@ import socket import struct import time import textwrap +import zlib +from cloudinit.settings import CFG_BUILTIN from cloudinit.net import dhcp from cloudinit import stages from cloudinit import temp_utils @@ -33,7 +35,14 @@ DEFAULT_WIRESERVER_ENDPOINT = "a8:3f:81:10" BOOT_EVENT_TYPE = 'boot-telemetry' SYSTEMINFO_EVENT_TYPE = 'system-info' DIAGNOSTIC_EVENT_TYPE = 'diagnostic' - +COMPRESSED_EVENT_TYPE = 'compressed' +# Maximum number of bytes of the cloud-init.log file that can be dumped to KVP +# at once. This number is based on the analysis done on a large sample of +# cloud-init.log files where the P95 of the file sizes was 537KB and the time +# consumed to dump 500KB file was (P95:76, P99:233, P99.9:1170) in ms +MAX_LOG_TO_KVP_LENGTH = 512000 +# Marker file to indicate whether cloud-init.log is pushed to KVP +LOG_PUSHED_TO_KVP_MARKER_FILE = '/var/lib/cloud/data/log_pushed_to_kvp' azure_ds_reporter = events.ReportEventStack( name="azure-ds", description="initialize reporter for azure ds", @@ -177,6 +186,49 @@ def report_diagnostic_event(str): return evt +def report_compressed_event(event_name, event_content): + """Report a compressed event""" + compressed_data = base64.encodebytes(zlib.compress(event_content)) + event_data = {"encoding": "gz+b64", + "data": compressed_data.decode('ascii')} + evt = events.ReportingEvent( + COMPRESSED_EVENT_TYPE, event_name, + json.dumps(event_data), + events.DEFAULT_EVENT_ORIGIN) + events.report_event(evt, + excluded_handler_types={"log", "print", "webhook"}) + + # return the event for unit testing purpose + return evt + + +@azure_ds_telemetry_reporter +def push_log_to_kvp(file_name=CFG_BUILTIN['def_log_file']): + """Push a portion of cloud-init.log file or the whole file to KVP + based on the file size. + If called more than once, it skips pushing the log file to KVP again.""" + + log_pushed_to_kvp = bool(os.path.isfile(LOG_PUSHED_TO_KVP_MARKER_FILE)) + if log_pushed_to_kvp: + report_diagnostic_event("cloud-init.log is already pushed to KVP") + return + + LOG.debug("Dumping cloud-init.log file to KVP") + try: + with open(file_name, "rb") as f: + f.seek(0, os.SEEK_END) + seek_index = max(f.tell() - MAX_LOG_TO_KVP_LENGTH, 0) + report_diagnostic_event( + "Dumping last {} bytes of cloud-init.log file to KVP".format( + f.tell() - seek_index)) + f.seek(seek_index, os.SEEK_SET) + report_compressed_event("cloud-init.log", f.read()) + util.write_file(LOG_PUSHED_TO_KVP_MARKER_FILE, '') + except Exception as ex: + report_diagnostic_event("Exception when dumping log file: %s" % + repr(ex)) + + @contextmanager def cd(newdir): prevdir = os.getcwd() @@ -474,6 +526,8 @@ class GoalStateHealthReporter: @azure_ds_telemetry_reporter def _post_health_report(self, document): + push_log_to_kvp() + # Whenever report_diagnostic_event(diagnostic_msg) is invoked in code, # the diagnostic messages are written to special files # (/var/opt/hyperv/.kvp_pool_*) as Hyper-V KVP messages. diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py index bacf5da9..47ede670 100644 --- a/tests/unittests/test_reporting_hyperv.py +++ b/tests/unittests/test_reporting_hyperv.py @@ -1,7 +1,9 @@ # This file is part of cloud-init. See LICENSE file for license information. +import base64 +import zlib -from cloudinit.reporting import events -from cloudinit.reporting.handlers import HyperVKvpReportingHandler +from cloudinit.reporting import events, instantiated_handler_registry +from cloudinit.reporting.handlers import HyperVKvpReportingHandler, LogHandler import json import os @@ -72,7 +74,7 @@ class TextKvpReporter(CiTestCase): def test_event_very_long(self): reporter = HyperVKvpReportingHandler( kvp_file_path=self.tmp_file_path) - description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE + description = 'ab' * reporter.HV_KVP_AZURE_MAX_VALUE_SIZE long_event = events.FinishReportingEvent( 'event_name', description, @@ -199,6 +201,72 @@ class TextKvpReporter(CiTestCase): if "test_diagnostic" not in evt_msg: raise AssertionError("missing expected diagnostic message") + def test_report_compressed_event(self): + reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path) + try: + instantiated_handler_registry.register_item("telemetry", reporter) + event_desc = b'test_compressed' + azure.report_compressed_event( + "compressed event", event_desc) + + self.validate_compressed_kvps(reporter, 1, [event_desc]) + finally: + instantiated_handler_registry.unregister_item("telemetry", + force=False) + + @mock.patch.object(LogHandler, 'publish_event') + def test_push_log_to_kvp(self, publish_event): + reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path) + try: + instantiated_handler_registry.register_item("telemetry", reporter) + log_file = self.tmp_path("cloud-init.log") + azure.MAX_LOG_TO_KVP_LENGTH = 100 + azure.LOG_PUSHED_TO_KVP_MARKER_FILE = self.tmp_path( + 'log_pushed_to_kvp') + with open(log_file, "w") as f: + log_content = "A" * 50 + "B" * 100 + f.write(log_content) + azure.push_log_to_kvp(log_file) + + with open(log_file, "a") as f: + extra_content = "C" * 10 + f.write(extra_content) + azure.push_log_to_kvp(log_file) + + for call_arg in publish_event.call_args_list: + event = call_arg[0][0] + self.assertNotEqual( + event.event_type, azure.COMPRESSED_EVENT_TYPE) + self.validate_compressed_kvps( + reporter, 1, + [log_content[-azure.MAX_LOG_TO_KVP_LENGTH:].encode()]) + finally: + instantiated_handler_registry.unregister_item("telemetry", + force=False) + + def validate_compressed_kvps(self, reporter, count, values): + reporter.q.join() + kvps = list(reporter._iterate_kvps(0)) + compressed_count = 0 + for i in range(len(kvps)): + kvp = kvps[i] + kvp_value = kvp['value'] + kvp_value_json = json.loads(kvp_value) + evt_msg = kvp_value_json["msg"] + evt_type = kvp_value_json["type"] + if evt_type != azure.COMPRESSED_EVENT_TYPE: + continue + evt_msg_json = json.loads(evt_msg) + evt_encoding = evt_msg_json["encoding"] + evt_data = zlib.decompress( + base64.decodebytes(evt_msg_json["data"].encode("ascii"))) + + self.assertLess(compressed_count, len(values)) + self.assertEqual(evt_data, values[compressed_count]) + self.assertEqual(evt_encoding, "gz+b64") + compressed_count += 1 + self.assertEqual(compressed_count, count) + def test_unique_kvp_key(self): reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path) evt1 = events.ReportingEvent( |