From 86674f013dfcea3c075ab41373ffb475881066f6 Mon Sep 17 00:00:00 2001 From: Anh Vo Date: Mon, 29 Apr 2019 20:22:16 +0000 Subject: Azure: Changes to the Hyper-V KVP Reporter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit  + Truncate KVP Pool file to prevent stale entries from being processed by the Hyper-V KVP reporter.  + Drop filtering of KVPs as it is no longer needed.  + Batch appending of existing KVP entries. --- cloudinit/reporting/handlers.py | 117 +++++++++++++++---------------- tests/unittests/test_reporting_hyperv.py | 104 +++++++++++++-------------- 2 files changed, 106 insertions(+), 115 deletions(-) mode change 100644 => 100755 cloudinit/reporting/handlers.py mode change 100644 => 100755 tests/unittests/test_reporting_hyperv.py diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py old mode 100644 new mode 100755 index 6d23558e..10165aec --- a/cloudinit/reporting/handlers.py +++ b/cloudinit/reporting/handlers.py @@ -5,7 +5,6 @@ import fcntl import json import six import os -import re import struct import threading import time @@ -14,6 +13,7 @@ from cloudinit import log as logging from cloudinit.registry import DictRegistry from cloudinit import (url_helper, util) from datetime import datetime +from six.moves.queue import Empty as QueueEmptyError if six.PY2: from multiprocessing.queues import JoinableQueue as JQueue @@ -129,24 +129,50 @@ class HyperVKvpReportingHandler(ReportingHandler): DESC_IDX_KEY = 'msg_i' JSON_SEPARATORS = (',', ':') KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1' + _already_truncated_pool_file = False def __init__(self, kvp_file_path=KVP_POOL_FILE_GUEST, event_types=None): super(HyperVKvpReportingHandler, self).__init__() self._kvp_file_path = kvp_file_path + HyperVKvpReportingHandler._truncate_guest_pool_file( + self._kvp_file_path) + self._event_types = event_types self.q = JQueue() - self.kvp_file = None self.incarnation_no = self._get_incarnation_no() self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX, self.incarnation_no) - self._current_offset = 0 self.publish_thread = threading.Thread( target=self._publish_event_routine) self.publish_thread.daemon = True self.publish_thread.start() + @classmethod + def _truncate_guest_pool_file(cls, kvp_file): + """ + Truncate the pool file if it has not been truncated since boot. + This should be done exactly once for the file indicated by + KVP_POOL_FILE_GUEST constant above. This method takes a filename + so that we can use an arbitrary file during unit testing. + Since KVP is a best-effort telemetry channel we only attempt to + truncate the file once and only if the file has not been modified + since boot. Additional truncation can lead to loss of existing + KVPs. + """ + if cls._already_truncated_pool_file: + return + boot_time = time.time() - float(util.uptime()) + try: + if os.path.getmtime(kvp_file) < boot_time: + with open(kvp_file, "w"): + pass + except (OSError, IOError) as e: + LOG.warning("failed to truncate kvp pool file, %s", e) + finally: + cls._already_truncated_pool_file = True + def _get_incarnation_no(self): """ use the time passed as the incarnation number. @@ -162,20 +188,15 @@ class HyperVKvpReportingHandler(ReportingHandler): def _iterate_kvps(self, offset): """iterate the kvp file from the current offset.""" - try: - with open(self._kvp_file_path, 'rb+') as f: - self.kvp_file = f - fcntl.flock(f, fcntl.LOCK_EX) - f.seek(offset) + with open(self._kvp_file_path, 'rb') as f: + fcntl.flock(f, fcntl.LOCK_EX) + f.seek(offset) + record_data = f.read(self.HV_KVP_RECORD_SIZE) + while len(record_data) == self.HV_KVP_RECORD_SIZE: + kvp_item = self._decode_kvp_item(record_data) + yield kvp_item record_data = f.read(self.HV_KVP_RECORD_SIZE) - while len(record_data) == self.HV_KVP_RECORD_SIZE: - self._current_offset += self.HV_KVP_RECORD_SIZE - kvp_item = self._decode_kvp_item(record_data) - yield kvp_item - record_data = f.read(self.HV_KVP_RECORD_SIZE) - fcntl.flock(f, fcntl.LOCK_UN) - finally: - self.kvp_file = None + fcntl.flock(f, fcntl.LOCK_UN) def _event_key(self, event): """ @@ -207,23 +228,13 @@ class HyperVKvpReportingHandler(ReportingHandler): return {'key': k, 'value': v} - def _update_kvp_item(self, record_data): - if self.kvp_file is None: - raise ReportException( - "kvp file '{0}' not opened." - .format(self._kvp_file_path)) - self.kvp_file.seek(-self.HV_KVP_RECORD_SIZE, 1) - self.kvp_file.write(record_data) - def _append_kvp_item(self, record_data): - with open(self._kvp_file_path, 'rb+') as f: + with open(self._kvp_file_path, 'ab') as f: fcntl.flock(f, fcntl.LOCK_EX) - # seek to end of the file - f.seek(0, 2) - f.write(record_data) + for data in record_data: + f.write(data) f.flush() fcntl.flock(f, fcntl.LOCK_UN) - self._current_offset = f.tell() def _break_down(self, key, meta_data, description): del meta_data[self.MSG_KEY] @@ -279,40 +290,26 @@ class HyperVKvpReportingHandler(ReportingHandler): def _publish_event_routine(self): while True: + items_from_queue = 0 try: event = self.q.get(block=True) - need_append = True + items_from_queue += 1 + encoded_data = [] + while event is not None: + encoded_data += self._encode_event(event) + try: + # get all the rest of the events in the queue + event = self.q.get(block=False) + items_from_queue += 1 + except QueueEmptyError: + event = None try: - if not os.path.exists(self._kvp_file_path): - LOG.warning( - "skip writing events %s to %s. file not present.", - event.as_string(), - self._kvp_file_path) - encoded_event = self._encode_event(event) - # for each encoded_event - for encoded_data in (encoded_event): - for kvp in self._iterate_kvps(self._current_offset): - match = ( - re.match( - r"^{0}\|(\d+)\|.+" - .format(self.EVENT_PREFIX), - kvp['key'] - )) - if match: - match_groups = match.groups(0) - if int(match_groups[0]) < self.incarnation_no: - need_append = False - self._update_kvp_item(encoded_data) - continue - if need_append: - self._append_kvp_item(encoded_data) - except IOError as e: - LOG.warning( - "failed posting event to kvp: %s e:%s", - event.as_string(), e) + self._append_kvp_item(encoded_data) + except (OSError, IOError) as e: + LOG.warning("failed posting events to kvp, %s", e) finally: - self.q.task_done() - + for _ in range(items_from_queue): + self.q.task_done() # when main process exits, q.get() will through EOFError # indicating we should exit this thread. except EOFError: @@ -322,7 +319,7 @@ class HyperVKvpReportingHandler(ReportingHandler): # if the kvp pool already contains a chunk of data, # so defer it to another thread. def publish_event(self, event): - if (not self._event_types or event.event_type in self._event_types): + if not self._event_types or event.event_type in self._event_types: self.q.put(event) def flush(self): diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py old mode 100644 new mode 100755 index 2e64c6c7..d01ed5b3 --- a/tests/unittests/test_reporting_hyperv.py +++ b/tests/unittests/test_reporting_hyperv.py @@ -1,10 +1,12 @@ # This file is part of cloud-init. See LICENSE file for license information. from cloudinit.reporting import events -from cloudinit.reporting import handlers +from cloudinit.reporting.handlers import HyperVKvpReportingHandler import json import os +import struct +import time from cloudinit import util from cloudinit.tests.helpers import CiTestCase @@ -13,7 +15,7 @@ from cloudinit.tests.helpers import CiTestCase class TestKvpEncoding(CiTestCase): def test_encode_decode(self): kvp = {'key': 'key1', 'value': 'value1'} - kvp_reporting = handlers.HyperVKvpReportingHandler() + kvp_reporting = HyperVKvpReportingHandler() data = kvp_reporting._encode_kvp_item(kvp['key'], kvp['value']) self.assertEqual(len(data), kvp_reporting.HV_KVP_RECORD_SIZE) decoded_kvp = kvp_reporting._decode_kvp_item(data) @@ -26,57 +28,9 @@ class TextKvpReporter(CiTestCase): self.tmp_file_path = self.tmp_path('kvp_pool_file') util.ensure_file(self.tmp_file_path) - def test_event_type_can_be_filtered(self): - reporter = handlers.HyperVKvpReportingHandler( - kvp_file_path=self.tmp_file_path, - event_types=['foo', 'bar']) - - reporter.publish_event( - events.ReportingEvent('foo', 'name', 'description')) - reporter.publish_event( - events.ReportingEvent('some_other', 'name', 'description3')) - reporter.q.join() - - kvps = list(reporter._iterate_kvps(0)) - self.assertEqual(1, len(kvps)) - - reporter.publish_event( - events.ReportingEvent('bar', 'name', 'description2')) - reporter.q.join() - kvps = list(reporter._iterate_kvps(0)) - self.assertEqual(2, len(kvps)) - - self.assertIn('foo', kvps[0]['key']) - self.assertIn('bar', kvps[1]['key']) - self.assertNotIn('some_other', kvps[0]['key']) - self.assertNotIn('some_other', kvps[1]['key']) - - def test_events_are_over_written(self): - reporter = handlers.HyperVKvpReportingHandler( - kvp_file_path=self.tmp_file_path) - - self.assertEqual(0, len(list(reporter._iterate_kvps(0)))) - - reporter.publish_event( - events.ReportingEvent('foo', 'name1', 'description')) - reporter.publish_event( - events.ReportingEvent('foo', 'name2', 'description')) - reporter.q.join() - self.assertEqual(2, len(list(reporter._iterate_kvps(0)))) - - reporter2 = handlers.HyperVKvpReportingHandler( - kvp_file_path=self.tmp_file_path) - reporter2.incarnation_no = reporter.incarnation_no + 1 - reporter2.publish_event( - events.ReportingEvent('foo', 'name3', 'description')) - reporter2.q.join() - - self.assertEqual(2, len(list(reporter2._iterate_kvps(0)))) - def test_events_with_higher_incarnation_not_over_written(self): - reporter = handlers.HyperVKvpReportingHandler( + reporter = HyperVKvpReportingHandler( kvp_file_path=self.tmp_file_path) - self.assertEqual(0, len(list(reporter._iterate_kvps(0)))) reporter.publish_event( @@ -86,7 +40,7 @@ class TextKvpReporter(CiTestCase): reporter.q.join() self.assertEqual(2, len(list(reporter._iterate_kvps(0)))) - reporter3 = handlers.HyperVKvpReportingHandler( + reporter3 = HyperVKvpReportingHandler( kvp_file_path=self.tmp_file_path) reporter3.incarnation_no = reporter.incarnation_no - 1 reporter3.publish_event( @@ -95,7 +49,7 @@ class TextKvpReporter(CiTestCase): self.assertEqual(3, len(list(reporter3._iterate_kvps(0)))) def test_finish_event_result_is_logged(self): - reporter = handlers.HyperVKvpReportingHandler( + reporter = HyperVKvpReportingHandler( kvp_file_path=self.tmp_file_path) reporter.publish_event( events.FinishReportingEvent('name2', 'description1', @@ -105,7 +59,7 @@ class TextKvpReporter(CiTestCase): def test_file_operation_issue(self): os.remove(self.tmp_file_path) - reporter = handlers.HyperVKvpReportingHandler( + reporter = HyperVKvpReportingHandler( kvp_file_path=self.tmp_file_path) reporter.publish_event( events.FinishReportingEvent('name2', 'description1', @@ -113,7 +67,7 @@ class TextKvpReporter(CiTestCase): reporter.q.join() def test_event_very_long(self): - reporter = handlers.HyperVKvpReportingHandler( + reporter = HyperVKvpReportingHandler( kvp_file_path=self.tmp_file_path) description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE long_event = events.FinishReportingEvent( @@ -132,3 +86,43 @@ class TextKvpReporter(CiTestCase): self.assertEqual(msg_slice['msg_i'], i) full_description += msg_slice['msg'] self.assertEqual(description, full_description) + + def test_not_truncate_kvp_file_modified_after_boot(self): + with open(self.tmp_file_path, "wb+") as f: + kvp = {'key': 'key1', 'value': 'value1'} + data = (struct.pack("%ds%ds" % ( + HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_KEY_SIZE, + HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_VALUE_SIZE), + kvp['key'].encode('utf-8'), kvp['value'].encode('utf-8'))) + f.write(data) + cur_time = time.time() + os.utime(self.tmp_file_path, (cur_time, cur_time)) + + # reset this because the unit test framework + # has already polluted the class variable + HyperVKvpReportingHandler._already_truncated_pool_file = False + + reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path) + kvps = list(reporter._iterate_kvps(0)) + self.assertEqual(1, len(kvps)) + + def test_truncate_stale_kvp_file(self): + with open(self.tmp_file_path, "wb+") as f: + kvp = {'key': 'key1', 'value': 'value1'} + data = (struct.pack("%ds%ds" % ( + HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_KEY_SIZE, + HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_VALUE_SIZE), + kvp['key'].encode('utf-8'), kvp['value'].encode('utf-8'))) + f.write(data) + + # set the time ways back to make it look like + # we had an old kvp file + os.utime(self.tmp_file_path, (1000000, 1000000)) + + # reset this because the unit test framework + # has already polluted the class variable + HyperVKvpReportingHandler._already_truncated_pool_file = False + + reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path) + kvps = list(reporter._iterate_kvps(0)) + self.assertEqual(0, len(kvps)) -- cgit v1.2.3