summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cloudinit/reporting/events.py23
-rwxr-xr-xcloudinit/reporting/handlers.py12
-rwxr-xr-xcloudinit/sources/DataSourceAzure.py12
-rwxr-xr-xcloudinit/sources/helpers/azure.py58
-rw-r--r--tests/unittests/test_reporting_hyperv.py74
5 files changed, 162 insertions, 17 deletions
diff --git a/cloudinit/reporting/events.py b/cloudinit/reporting/events.py
index e5dfab33..b8677c8b 100644
--- a/cloudinit/reporting/events.py
+++ b/cloudinit/reporting/events.py
@@ -12,7 +12,7 @@ import base64
import os.path
import time
-from . import instantiated_handler_registry
+from . import instantiated_handler_registry, available_handlers
FINISH_EVENT_TYPE = 'finish'
START_EVENT_TYPE = 'start'
@@ -81,17 +81,32 @@ class FinishReportingEvent(ReportingEvent):
return data
-def report_event(event):
- """Report an event to all registered event handlers.
+def report_event(event, excluded_handler_types=None):
+ """Report an event to all registered event handlers
+ except those whose type is in excluded_handler_types.
This should generally be called via one of the other functions in
the reporting module.
+ :param excluded_handler_types:
+ List of handlers types to exclude from reporting the event to.
:param event_type:
The type of the event; this should be a constant from the
reporting module.
"""
- for _, handler in instantiated_handler_registry.registered_items.items():
+
+ if not excluded_handler_types:
+ excluded_handler_types = {}
+ excluded_handler_classes = {
+ hndl_cls
+ for hndl_type, hndl_cls in available_handlers.registered_items.items()
+ if hndl_type in excluded_handler_types
+ }
+
+ handlers = instantiated_handler_registry.registered_items.items()
+ for _, handler in handlers:
+ if type(handler) in excluded_handler_classes:
+ continue # skip this excluded handler
handler.publish_event(event)
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index 1986ebdd..0a8c7af3 100755
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -113,6 +113,8 @@ class HyperVKvpReportingHandler(ReportingHandler):
https://technet.microsoft.com/en-us/library/dn798287.aspx#Linux%20guests
"""
HV_KVP_EXCHANGE_MAX_VALUE_SIZE = 2048
+ # The maximum value size expected in Azure
+ HV_KVP_AZURE_MAX_VALUE_SIZE = 1024
HV_KVP_EXCHANGE_MAX_KEY_SIZE = 512
HV_KVP_RECORD_SIZE = (HV_KVP_EXCHANGE_MAX_KEY_SIZE +
HV_KVP_EXCHANGE_MAX_VALUE_SIZE)
@@ -195,7 +197,8 @@ class HyperVKvpReportingHandler(ReportingHandler):
def _event_key(self, event):
"""
the event key format is:
- CLOUD_INIT|<incarnation number>|<event_type>|<event_name>|<time>
+ CLOUD_INIT|<incarnation number>|<event_type>|<event_name>|<uuid>
+ [|subevent_index]
"""
return u"{0}|{1}|{2}|{3}".format(self.event_key_prefix,
event.event_type, event.name,
@@ -249,13 +252,14 @@ class HyperVKvpReportingHandler(ReportingHandler):
data_without_desc = json.dumps(meta_data,
separators=self.JSON_SEPARATORS)
room_for_desc = (
- self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE -
+ self.HV_KVP_AZURE_MAX_VALUE_SIZE -
len(data_without_desc) - 8)
value = data_without_desc.replace(
message_place_holder,
'"{key}":"{desc}"'.format(
key=self.MSG_KEY, desc=des_in_json[:room_for_desc]))
- result_array.append(self._encode_kvp_item(key, value))
+ subkey = "{}|{}".format(key, i)
+ result_array.append(self._encode_kvp_item(subkey, value))
i += 1
des_in_json = des_in_json[room_for_desc:]
if len(des_in_json) == 0:
@@ -282,7 +286,7 @@ class HyperVKvpReportingHandler(ReportingHandler):
# if it reaches the maximum length of kvp value,
# break it down to slices.
# this should be very corner case.
- if len(value) > self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE:
+ if len(value) > self.HV_KVP_AZURE_MAX_VALUE_SIZE:
return self._break_down(key, meta_data, event.description)
else:
data = self._encode_kvp_item(key, value)
diff --git a/cloudinit/sources/DataSourceAzure.py b/cloudinit/sources/DataSourceAzure.py
index a3810ca8..f572a4ef 100755
--- a/cloudinit/sources/DataSourceAzure.py
+++ b/cloudinit/sources/DataSourceAzure.py
@@ -35,7 +35,8 @@ from cloudinit.sources.helpers.azure import (
report_diagnostic_event,
EphemeralDHCPv4WithReporting,
is_byte_swapped,
- dhcp_log_cb)
+ dhcp_log_cb,
+ push_log_to_kvp)
LOG = logging.getLogger(__name__)
@@ -789,9 +790,12 @@ class DataSourceAzure(sources.DataSource):
@azure_ds_telemetry_reporter
def activate(self, cfg, is_new_instance):
- address_ephemeral_resize(is_new_instance=is_new_instance,
- preserve_ntfs=self.ds_cfg.get(
- DS_CFG_KEY_PRESERVE_NTFS, False))
+ try:
+ address_ephemeral_resize(is_new_instance=is_new_instance,
+ preserve_ntfs=self.ds_cfg.get(
+ DS_CFG_KEY_PRESERVE_NTFS, False))
+ finally:
+ push_log_to_kvp(self.sys_cfg['def_log_file'])
return
@property
diff --git a/cloudinit/sources/helpers/azure.py b/cloudinit/sources/helpers/azure.py
index 6df28ccf..6156c75b 100755
--- a/cloudinit/sources/helpers/azure.py
+++ b/cloudinit/sources/helpers/azure.py
@@ -1,5 +1,5 @@
# This file is part of cloud-init. See LICENSE file for license information.
-
+import base64
import json
import logging
import os
@@ -8,7 +8,9 @@ import socket
import struct
import time
import textwrap
+import zlib
+from cloudinit.settings import CFG_BUILTIN
from cloudinit.net import dhcp
from cloudinit import stages
from cloudinit import temp_utils
@@ -33,7 +35,14 @@ DEFAULT_WIRESERVER_ENDPOINT = "a8:3f:81:10"
BOOT_EVENT_TYPE = 'boot-telemetry'
SYSTEMINFO_EVENT_TYPE = 'system-info'
DIAGNOSTIC_EVENT_TYPE = 'diagnostic'
-
+COMPRESSED_EVENT_TYPE = 'compressed'
+# Maximum number of bytes of the cloud-init.log file that can be dumped to KVP
+# at once. This number is based on the analysis done on a large sample of
+# cloud-init.log files where the P95 of the file sizes was 537KB and the time
+# consumed to dump 500KB file was (P95:76, P99:233, P99.9:1170) in ms
+MAX_LOG_TO_KVP_LENGTH = 512000
+# Marker file to indicate whether cloud-init.log is pushed to KVP
+LOG_PUSHED_TO_KVP_MARKER_FILE = '/var/lib/cloud/data/log_pushed_to_kvp'
azure_ds_reporter = events.ReportEventStack(
name="azure-ds",
description="initialize reporter for azure ds",
@@ -177,6 +186,49 @@ def report_diagnostic_event(str):
return evt
+def report_compressed_event(event_name, event_content):
+ """Report a compressed event"""
+ compressed_data = base64.encodebytes(zlib.compress(event_content))
+ event_data = {"encoding": "gz+b64",
+ "data": compressed_data.decode('ascii')}
+ evt = events.ReportingEvent(
+ COMPRESSED_EVENT_TYPE, event_name,
+ json.dumps(event_data),
+ events.DEFAULT_EVENT_ORIGIN)
+ events.report_event(evt,
+ excluded_handler_types={"log", "print", "webhook"})
+
+ # return the event for unit testing purpose
+ return evt
+
+
+@azure_ds_telemetry_reporter
+def push_log_to_kvp(file_name=CFG_BUILTIN['def_log_file']):
+ """Push a portion of cloud-init.log file or the whole file to KVP
+ based on the file size.
+ If called more than once, it skips pushing the log file to KVP again."""
+
+ log_pushed_to_kvp = bool(os.path.isfile(LOG_PUSHED_TO_KVP_MARKER_FILE))
+ if log_pushed_to_kvp:
+ report_diagnostic_event("cloud-init.log is already pushed to KVP")
+ return
+
+ LOG.debug("Dumping cloud-init.log file to KVP")
+ try:
+ with open(file_name, "rb") as f:
+ f.seek(0, os.SEEK_END)
+ seek_index = max(f.tell() - MAX_LOG_TO_KVP_LENGTH, 0)
+ report_diagnostic_event(
+ "Dumping last {} bytes of cloud-init.log file to KVP".format(
+ f.tell() - seek_index))
+ f.seek(seek_index, os.SEEK_SET)
+ report_compressed_event("cloud-init.log", f.read())
+ util.write_file(LOG_PUSHED_TO_KVP_MARKER_FILE, '')
+ except Exception as ex:
+ report_diagnostic_event("Exception when dumping log file: %s" %
+ repr(ex))
+
+
@contextmanager
def cd(newdir):
prevdir = os.getcwd()
@@ -474,6 +526,8 @@ class GoalStateHealthReporter:
@azure_ds_telemetry_reporter
def _post_health_report(self, document):
+ push_log_to_kvp()
+
# Whenever report_diagnostic_event(diagnostic_msg) is invoked in code,
# the diagnostic messages are written to special files
# (/var/opt/hyperv/.kvp_pool_*) as Hyper-V KVP messages.
diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py
index bacf5da9..47ede670 100644
--- a/tests/unittests/test_reporting_hyperv.py
+++ b/tests/unittests/test_reporting_hyperv.py
@@ -1,7 +1,9 @@
# This file is part of cloud-init. See LICENSE file for license information.
+import base64
+import zlib
-from cloudinit.reporting import events
-from cloudinit.reporting.handlers import HyperVKvpReportingHandler
+from cloudinit.reporting import events, instantiated_handler_registry
+from cloudinit.reporting.handlers import HyperVKvpReportingHandler, LogHandler
import json
import os
@@ -72,7 +74,7 @@ class TextKvpReporter(CiTestCase):
def test_event_very_long(self):
reporter = HyperVKvpReportingHandler(
kvp_file_path=self.tmp_file_path)
- description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE
+ description = 'ab' * reporter.HV_KVP_AZURE_MAX_VALUE_SIZE
long_event = events.FinishReportingEvent(
'event_name',
description,
@@ -199,6 +201,72 @@ class TextKvpReporter(CiTestCase):
if "test_diagnostic" not in evt_msg:
raise AssertionError("missing expected diagnostic message")
+ def test_report_compressed_event(self):
+ reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
+ try:
+ instantiated_handler_registry.register_item("telemetry", reporter)
+ event_desc = b'test_compressed'
+ azure.report_compressed_event(
+ "compressed event", event_desc)
+
+ self.validate_compressed_kvps(reporter, 1, [event_desc])
+ finally:
+ instantiated_handler_registry.unregister_item("telemetry",
+ force=False)
+
+ @mock.patch.object(LogHandler, 'publish_event')
+ def test_push_log_to_kvp(self, publish_event):
+ reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
+ try:
+ instantiated_handler_registry.register_item("telemetry", reporter)
+ log_file = self.tmp_path("cloud-init.log")
+ azure.MAX_LOG_TO_KVP_LENGTH = 100
+ azure.LOG_PUSHED_TO_KVP_MARKER_FILE = self.tmp_path(
+ 'log_pushed_to_kvp')
+ with open(log_file, "w") as f:
+ log_content = "A" * 50 + "B" * 100
+ f.write(log_content)
+ azure.push_log_to_kvp(log_file)
+
+ with open(log_file, "a") as f:
+ extra_content = "C" * 10
+ f.write(extra_content)
+ azure.push_log_to_kvp(log_file)
+
+ for call_arg in publish_event.call_args_list:
+ event = call_arg[0][0]
+ self.assertNotEqual(
+ event.event_type, azure.COMPRESSED_EVENT_TYPE)
+ self.validate_compressed_kvps(
+ reporter, 1,
+ [log_content[-azure.MAX_LOG_TO_KVP_LENGTH:].encode()])
+ finally:
+ instantiated_handler_registry.unregister_item("telemetry",
+ force=False)
+
+ def validate_compressed_kvps(self, reporter, count, values):
+ reporter.q.join()
+ kvps = list(reporter._iterate_kvps(0))
+ compressed_count = 0
+ for i in range(len(kvps)):
+ kvp = kvps[i]
+ kvp_value = kvp['value']
+ kvp_value_json = json.loads(kvp_value)
+ evt_msg = kvp_value_json["msg"]
+ evt_type = kvp_value_json["type"]
+ if evt_type != azure.COMPRESSED_EVENT_TYPE:
+ continue
+ evt_msg_json = json.loads(evt_msg)
+ evt_encoding = evt_msg_json["encoding"]
+ evt_data = zlib.decompress(
+ base64.decodebytes(evt_msg_json["data"].encode("ascii")))
+
+ self.assertLess(compressed_count, len(values))
+ self.assertEqual(evt_data, values[compressed_count])
+ self.assertEqual(evt_encoding, "gz+b64")
+ compressed_count += 1
+ self.assertEqual(compressed_count, count)
+
def test_unique_kvp_key(self):
reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
evt1 = events.ReportingEvent(