summaryrefslogtreecommitdiff
path: root/tests/unittests/test_reporting_hyperv.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_reporting_hyperv.py')
-rw-r--r--tests/unittests/test_reporting_hyperv.py193
1 files changed, 105 insertions, 88 deletions
diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py
index 24a1dcc7..35ab0c58 100644
--- a/tests/unittests/test_reporting_hyperv.py
+++ b/tests/unittests/test_reporting_hyperv.py
@@ -1,27 +1,25 @@
# This file is part of cloud-init. See LICENSE file for license information.
import base64
-import zlib
-
-from cloudinit.reporting import events, instantiated_handler_registry
-from cloudinit.reporting.handlers import HyperVKvpReportingHandler, LogHandler
-
import json
import os
+import re
import struct
import time
-import re
+import zlib
from unittest import mock
from cloudinit import util
-from tests.unittests.helpers import CiTestCase
+from cloudinit.reporting import events, instantiated_handler_registry
+from cloudinit.reporting.handlers import HyperVKvpReportingHandler, LogHandler
from cloudinit.sources.helpers import azure
+from tests.unittests.helpers import CiTestCase
class TestKvpEncoding(CiTestCase):
def test_encode_decode(self):
- kvp = {'key': 'key1', 'value': 'value1'}
+ kvp = {"key": "key1", "value": "value1"}
kvp_reporting = HyperVKvpReportingHandler()
- data = kvp_reporting._encode_kvp_item(kvp['key'], kvp['value'])
+ data = kvp_reporting._encode_kvp_item(kvp["key"], kvp["value"])
self.assertEqual(len(data), kvp_reporting.HV_KVP_RECORD_SIZE)
decoded_kvp = kvp_reporting._decode_kvp_item(data)
self.assertEqual(kvp, decoded_kvp)
@@ -30,71 +28,72 @@ class TestKvpEncoding(CiTestCase):
class TextKvpReporter(CiTestCase):
def setUp(self):
super(TextKvpReporter, self).setUp()
- self.tmp_file_path = self.tmp_path('kvp_pool_file')
+ self.tmp_file_path = self.tmp_path("kvp_pool_file")
util.ensure_file(self.tmp_file_path)
def test_events_with_higher_incarnation_not_over_written(self):
- reporter = HyperVKvpReportingHandler(
- kvp_file_path=self.tmp_file_path)
+ reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
reporter.publish_event(
- events.ReportingEvent('foo', 'name1', 'description'))
+ events.ReportingEvent("foo", "name1", "description")
+ )
reporter.publish_event(
- events.ReportingEvent('foo', 'name2', 'description'))
+ events.ReportingEvent("foo", "name2", "description")
+ )
reporter.q.join()
self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
- reporter3 = HyperVKvpReportingHandler(
- kvp_file_path=self.tmp_file_path)
+ reporter3 = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
reporter3.incarnation_no = reporter.incarnation_no - 1
reporter3.publish_event(
- events.ReportingEvent('foo', 'name3', 'description'))
+ events.ReportingEvent("foo", "name3", "description")
+ )
reporter3.q.join()
self.assertEqual(3, len(list(reporter3._iterate_kvps(0))))
def test_finish_event_result_is_logged(self):
- reporter = HyperVKvpReportingHandler(
- kvp_file_path=self.tmp_file_path)
+ reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
reporter.publish_event(
- events.FinishReportingEvent('name2', 'description1',
- result=events.status.FAIL))
+ events.FinishReportingEvent(
+ "name2", "description1", result=events.status.FAIL
+ )
+ )
reporter.q.join()
- self.assertIn('FAIL', list(reporter._iterate_kvps(0))[0]['value'])
+ self.assertIn("FAIL", list(reporter._iterate_kvps(0))[0]["value"])
def test_file_operation_issue(self):
os.remove(self.tmp_file_path)
- reporter = HyperVKvpReportingHandler(
- kvp_file_path=self.tmp_file_path)
+ reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
reporter.publish_event(
- events.FinishReportingEvent('name2', 'description1',
- result=events.status.FAIL))
+ events.FinishReportingEvent(
+ "name2", "description1", result=events.status.FAIL
+ )
+ )
reporter.q.join()
def test_event_very_long(self):
- reporter = HyperVKvpReportingHandler(
- kvp_file_path=self.tmp_file_path)
- description = 'ab' * reporter.HV_KVP_AZURE_MAX_VALUE_SIZE
+ reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
+ description = "ab" * reporter.HV_KVP_AZURE_MAX_VALUE_SIZE
long_event = events.FinishReportingEvent(
- 'event_name',
- description,
- result=events.status.FAIL)
+ "event_name", description, result=events.status.FAIL
+ )
reporter.publish_event(long_event)
reporter.q.join()
kvps = list(reporter._iterate_kvps(0))
self.assertEqual(3, len(kvps))
# restore from the kvp to see the content are all there
- full_description = ''
+ full_description = ""
for i in range(len(kvps)):
- msg_slice = json.loads(kvps[i]['value'])
- self.assertEqual(msg_slice['msg_i'], i)
- full_description += msg_slice['msg']
+ msg_slice = json.loads(kvps[i]["value"])
+ self.assertEqual(msg_slice["msg_i"], i)
+ full_description += msg_slice["msg"]
self.assertEqual(description, full_description)
def test_not_truncate_kvp_file_modified_after_boot(self):
with open(self.tmp_file_path, "wb+") as f:
- kvp = {'key': 'key1', 'value': 'value1'}
+ kvp = {"key": "key1", "value": "value1"}
data = struct.pack(
"%ds%ds"
% (
@@ -118,11 +117,16 @@ class TextKvpReporter(CiTestCase):
def test_truncate_stale_kvp_file(self):
with open(self.tmp_file_path, "wb+") as f:
- kvp = {'key': 'key1', 'value': 'value1'}
- data = (struct.pack("%ds%ds" % (
- HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_KEY_SIZE,
- HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_VALUE_SIZE),
- kvp['key'].encode('utf-8'), kvp['value'].encode('utf-8')))
+ kvp = {"key": "key1", "value": "value1"}
+ data = struct.pack(
+ "%ds%ds"
+ % (
+ HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_KEY_SIZE,
+ HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_VALUE_SIZE,
+ ),
+ kvp["key"].encode("utf-8"),
+ kvp["value"].encode("utf-8"),
+ )
f.write(data)
# set the time ways back to make it look like
@@ -137,8 +141,8 @@ class TextKvpReporter(CiTestCase):
kvps = list(reporter._iterate_kvps(0))
self.assertEqual(0, len(kvps))
- @mock.patch('cloudinit.distros.uses_systemd')
- @mock.patch('cloudinit.subp.subp')
+ @mock.patch("cloudinit.distros.uses_systemd")
+ @mock.patch("cloudinit.subp.subp")
def test_get_boot_telemetry(self, m_subp, m_sysd):
reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
datetime_pattern = (
@@ -149,8 +153,9 @@ class TextKvpReporter(CiTestCase):
# get_boot_telemetry makes two subp calls to systemctl. We provide
# a list of values that the subp calls should return
m_subp.side_effect = [
- ('UserspaceTimestampMonotonic=1844838', ''),
- ('InactiveExitTimestampMonotonic=3068203', '')]
+ ("UserspaceTimestampMonotonic=1844838", ""),
+ ("InactiveExitTimestampMonotonic=3068203", ""),
+ ]
m_sysd.return_value = True
reporter.publish_event(azure.get_boot_telemetry())
@@ -158,15 +163,13 @@ class TextKvpReporter(CiTestCase):
kvps = list(reporter._iterate_kvps(0))
self.assertEqual(1, len(kvps))
- evt_msg = kvps[0]['value']
+ evt_msg = kvps[0]["value"]
if not re.search("kernel_start=" + datetime_pattern, evt_msg):
raise AssertionError("missing kernel_start timestamp")
if not re.search("user_start=" + datetime_pattern, evt_msg):
raise AssertionError("missing user_start timestamp")
- if not re.search("cloudinit_activation=" + datetime_pattern,
- evt_msg):
- raise AssertionError(
- "missing cloudinit_activation timestamp")
+ if not re.search("cloudinit_activation=" + datetime_pattern, evt_msg):
+ raise AssertionError("missing cloudinit_activation timestamp")
def test_get_system_info(self):
reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
@@ -176,7 +179,7 @@ class TextKvpReporter(CiTestCase):
reporter.q.join()
kvps = list(reporter._iterate_kvps(0))
self.assertEqual(1, len(kvps))
- evt_msg = kvps[0]['value']
+ evt_msg = kvps[0]["value"]
# the most important information is cloudinit version,
# kernel_version, and the distro variant. It is ok if
@@ -191,12 +194,11 @@ class TextKvpReporter(CiTestCase):
def test_report_diagnostic_event_without_logger_func(self):
reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
diagnostic_msg = "test_diagnostic"
- reporter.publish_event(
- azure.report_diagnostic_event(diagnostic_msg))
+ reporter.publish_event(azure.report_diagnostic_event(diagnostic_msg))
reporter.q.join()
kvps = list(reporter._iterate_kvps(0))
self.assertEqual(1, len(kvps))
- evt_msg = kvps[0]['value']
+ evt_msg = kvps[0]["value"]
if diagnostic_msg not in evt_msg:
raise AssertionError("missing expected diagnostic message")
@@ -206,12 +208,14 @@ class TextKvpReporter(CiTestCase):
logger_func = mock.MagicMock()
diagnostic_msg = "test_diagnostic"
reporter.publish_event(
- azure.report_diagnostic_event(diagnostic_msg,
- logger_func=logger_func))
+ azure.report_diagnostic_event(
+ diagnostic_msg, logger_func=logger_func
+ )
+ )
reporter.q.join()
kvps = list(reporter._iterate_kvps(0))
self.assertEqual(1, len(kvps))
- evt_msg = kvps[0]['value']
+ evt_msg = kvps[0]["value"]
if diagnostic_msg not in evt_msg:
raise AssertionError("missing expected diagnostic message")
@@ -221,18 +225,18 @@ class TextKvpReporter(CiTestCase):
reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
try:
instantiated_handler_registry.register_item("telemetry", reporter)
- event_desc = b'test_compressed'
- azure.report_compressed_event(
- "compressed event", event_desc)
+ event_desc = b"test_compressed"
+ azure.report_compressed_event("compressed event", event_desc)
self.validate_compressed_kvps(reporter, 1, [event_desc])
finally:
- instantiated_handler_registry.unregister_item("telemetry",
- force=False)
+ instantiated_handler_registry.unregister_item(
+ "telemetry", force=False
+ )
- @mock.patch('cloudinit.sources.helpers.azure.report_compressed_event')
- @mock.patch('cloudinit.sources.helpers.azure.report_diagnostic_event')
- @mock.patch('cloudinit.subp.subp')
+ @mock.patch("cloudinit.sources.helpers.azure.report_compressed_event")
+ @mock.patch("cloudinit.sources.helpers.azure.report_diagnostic_event")
+ @mock.patch("cloudinit.subp.subp")
def test_push_log_to_kvp_exception_handling(self, m_subp, m_diag, m_com):
reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
try:
@@ -240,7 +244,8 @@ class TextKvpReporter(CiTestCase):
log_file = self.tmp_path("cloud-init.log")
azure.MAX_LOG_TO_KVP_LENGTH = 100
azure.LOG_PUSHED_TO_KVP_INDEX_FILE = self.tmp_path(
- 'log_pushed_to_kvp')
+ "log_pushed_to_kvp"
+ )
with open(log_file, "w") as f:
log_content = "A" * 50 + "B" * 100
f.write(log_content)
@@ -251,11 +256,12 @@ class TextKvpReporter(CiTestCase):
# exceptions will trigger diagnostic reporting calls
self.assertEqual(m_diag.call_count, 3)
finally:
- instantiated_handler_registry.unregister_item("telemetry",
- force=False)
+ instantiated_handler_registry.unregister_item(
+ "telemetry", force=False
+ )
- @mock.patch('cloudinit.subp.subp')
- @mock.patch.object(LogHandler, 'publish_event')
+ @mock.patch("cloudinit.subp.subp")
+ @mock.patch.object(LogHandler, "publish_event")
def test_push_log_to_kvp(self, publish_event, m_subp):
reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
try:
@@ -263,7 +269,8 @@ class TextKvpReporter(CiTestCase):
log_file = self.tmp_path("cloud-init.log")
azure.MAX_LOG_TO_KVP_LENGTH = 100
azure.LOG_PUSHED_TO_KVP_INDEX_FILE = self.tmp_path(
- 'log_pushed_to_kvp')
+ "log_pushed_to_kvp"
+ )
with open(log_file, "w") as f:
log_content = "A" * 50 + "B" * 100
f.write(log_content)
@@ -275,20 +282,25 @@ class TextKvpReporter(CiTestCase):
azure.push_log_to_kvp(log_file)
# make sure dmesg is called every time
- m_subp.assert_called_with(
- ['dmesg'], capture=True, decode=False)
+ m_subp.assert_called_with(["dmesg"], capture=True, decode=False)
for call_arg in publish_event.call_args_list:
event = call_arg[0][0]
self.assertNotEqual(
- event.event_type, azure.COMPRESSED_EVENT_TYPE)
+ event.event_type, azure.COMPRESSED_EVENT_TYPE
+ )
self.validate_compressed_kvps(
- reporter, 2,
- [log_content[-azure.MAX_LOG_TO_KVP_LENGTH:].encode(),
- extra_content.encode()])
+ reporter,
+ 2,
+ [
+ log_content[-azure.MAX_LOG_TO_KVP_LENGTH :].encode(),
+ extra_content.encode(),
+ ],
+ )
finally:
- instantiated_handler_registry.unregister_item("telemetry",
- force=False)
+ instantiated_handler_registry.unregister_item(
+ "telemetry", force=False
+ )
def validate_compressed_kvps(self, reporter, count, values):
reporter.q.join()
@@ -296,7 +308,7 @@ class TextKvpReporter(CiTestCase):
compressed_count = 0
for i in range(len(kvps)):
kvp = kvps[i]
- kvp_value = kvp['value']
+ kvp_value = kvp["value"]
kvp_value_json = json.loads(kvp_value)
evt_msg = kvp_value_json["msg"]
evt_type = kvp_value_json["type"]
@@ -305,7 +317,8 @@ class TextKvpReporter(CiTestCase):
evt_msg_json = json.loads(evt_msg)
evt_encoding = evt_msg_json["encoding"]
evt_data = zlib.decompress(
- base64.decodebytes(evt_msg_json["data"].encode("ascii")))
+ base64.decodebytes(evt_msg_json["data"].encode("ascii"))
+ )
self.assertLess(compressed_count, len(values))
self.assertEqual(evt_data, values[compressed_count])
@@ -316,17 +329,21 @@ class TextKvpReporter(CiTestCase):
def test_unique_kvp_key(self):
reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
evt1 = events.ReportingEvent(
- "event_type", 'event_message',
- "event_description")
+ "event_type", "event_message", "event_description"
+ )
reporter.publish_event(evt1)
evt2 = events.ReportingEvent(
- "event_type", 'event_message',
- "event_description", timestamp=evt1.timestamp + 1)
+ "event_type",
+ "event_message",
+ "event_description",
+ timestamp=evt1.timestamp + 1,
+ )
reporter.publish_event(evt2)
reporter.q.join()
kvps = list(reporter._iterate_kvps(0))
self.assertEqual(2, len(kvps))
- self.assertNotEqual(kvps[0]["key"], kvps[1]["key"],
- "duplicate keys for KVP entries")
+ self.assertNotEqual(
+ kvps[0]["key"], kvps[1]["key"], "duplicate keys for KVP entries"
+ )