From 86674f013dfcea3c075ab41373ffb475881066f6 Mon Sep 17 00:00:00 2001 From: Anh Vo Date: Mon, 29 Apr 2019 20:22:16 +0000 Subject: Azure: Changes to the Hyper-V KVP Reporter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit  + Truncate KVP Pool file to prevent stale entries from being processed by the Hyper-V KVP reporter.  + Drop filtering of KVPs as it is no longer needed.  + Batch appending of existing KVP entries. --- cloudinit/reporting/handlers.py | 117 ++++++++++++++++++++-------------------- 1 file changed, 57 insertions(+), 60 deletions(-) mode change 100644 => 100755 cloudinit/reporting/handlers.py (limited to 'cloudinit/reporting') diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py old mode 100644 new mode 100755 index 6d23558e..10165aec --- a/cloudinit/reporting/handlers.py +++ b/cloudinit/reporting/handlers.py @@ -5,7 +5,6 @@ import fcntl import json import six import os -import re import struct import threading import time @@ -14,6 +13,7 @@ from cloudinit import log as logging from cloudinit.registry import DictRegistry from cloudinit import (url_helper, util) from datetime import datetime +from six.moves.queue import Empty as QueueEmptyError if six.PY2: from multiprocessing.queues import JoinableQueue as JQueue @@ -129,24 +129,50 @@ class HyperVKvpReportingHandler(ReportingHandler): DESC_IDX_KEY = 'msg_i' JSON_SEPARATORS = (',', ':') KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1' + _already_truncated_pool_file = False def __init__(self, kvp_file_path=KVP_POOL_FILE_GUEST, event_types=None): super(HyperVKvpReportingHandler, self).__init__() self._kvp_file_path = kvp_file_path + HyperVKvpReportingHandler._truncate_guest_pool_file( + self._kvp_file_path) + self._event_types = event_types self.q = JQueue() - self.kvp_file = None self.incarnation_no = self._get_incarnation_no() self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX, self.incarnation_no) - self._current_offset = 0 
self.publish_thread = threading.Thread( target=self._publish_event_routine) self.publish_thread.daemon = True self.publish_thread.start() + @classmethod + def _truncate_guest_pool_file(cls, kvp_file): + """ + Truncate the pool file if it has not been truncated since boot. + This should be done exactly once for the file indicated by + KVP_POOL_FILE_GUEST constant above. This method takes a filename + so that we can use an arbitrary file during unit testing. + Since KVP is a best-effort telemetry channel we only attempt to + truncate the file once and only if the file has not been modified + since boot. Additional truncation can lead to loss of existing + KVPs. + """ + if cls._already_truncated_pool_file: + return + boot_time = time.time() - float(util.uptime()) + try: + if os.path.getmtime(kvp_file) < boot_time: + with open(kvp_file, "w"): + pass + except (OSError, IOError) as e: + LOG.warning("failed to truncate kvp pool file, %s", e) + finally: + cls._already_truncated_pool_file = True + def _get_incarnation_no(self): """ use the time passed as the incarnation number. 
@@ -162,20 +188,15 @@ class HyperVKvpReportingHandler(ReportingHandler): def _iterate_kvps(self, offset): """iterate the kvp file from the current offset.""" - try: - with open(self._kvp_file_path, 'rb+') as f: - self.kvp_file = f - fcntl.flock(f, fcntl.LOCK_EX) - f.seek(offset) + with open(self._kvp_file_path, 'rb') as f: + fcntl.flock(f, fcntl.LOCK_EX) + f.seek(offset) + record_data = f.read(self.HV_KVP_RECORD_SIZE) + while len(record_data) == self.HV_KVP_RECORD_SIZE: + kvp_item = self._decode_kvp_item(record_data) + yield kvp_item record_data = f.read(self.HV_KVP_RECORD_SIZE) - while len(record_data) == self.HV_KVP_RECORD_SIZE: - self._current_offset += self.HV_KVP_RECORD_SIZE - kvp_item = self._decode_kvp_item(record_data) - yield kvp_item - record_data = f.read(self.HV_KVP_RECORD_SIZE) - fcntl.flock(f, fcntl.LOCK_UN) - finally: - self.kvp_file = None + fcntl.flock(f, fcntl.LOCK_UN) def _event_key(self, event): """ @@ -207,23 +228,13 @@ class HyperVKvpReportingHandler(ReportingHandler): return {'key': k, 'value': v} - def _update_kvp_item(self, record_data): - if self.kvp_file is None: - raise ReportException( - "kvp file '{0}' not opened." 
- .format(self._kvp_file_path)) - self.kvp_file.seek(-self.HV_KVP_RECORD_SIZE, 1) - self.kvp_file.write(record_data) - def _append_kvp_item(self, record_data): - with open(self._kvp_file_path, 'rb+') as f: + with open(self._kvp_file_path, 'ab') as f: fcntl.flock(f, fcntl.LOCK_EX) - # seek to end of the file - f.seek(0, 2) - f.write(record_data) + for data in record_data: + f.write(data) f.flush() fcntl.flock(f, fcntl.LOCK_UN) - self._current_offset = f.tell() def _break_down(self, key, meta_data, description): del meta_data[self.MSG_KEY] @@ -279,40 +290,26 @@ class HyperVKvpReportingHandler(ReportingHandler): def _publish_event_routine(self): while True: + items_from_queue = 0 try: event = self.q.get(block=True) - need_append = True + items_from_queue += 1 + encoded_data = [] + while event is not None: + encoded_data += self._encode_event(event) + try: + # get all the rest of the events in the queue + event = self.q.get(block=False) + items_from_queue += 1 + except QueueEmptyError: + event = None try: - if not os.path.exists(self._kvp_file_path): - LOG.warning( - "skip writing events %s to %s. 
file not present.", - event.as_string(), - self._kvp_file_path) - encoded_event = self._encode_event(event) - # for each encoded_event - for encoded_data in (encoded_event): - for kvp in self._iterate_kvps(self._current_offset): - match = ( - re.match( - r"^{0}\|(\d+)\|.+" - .format(self.EVENT_PREFIX), - kvp['key'] - )) - if match: - match_groups = match.groups(0) - if int(match_groups[0]) < self.incarnation_no: - need_append = False - self._update_kvp_item(encoded_data) - continue - if need_append: - self._append_kvp_item(encoded_data) - except IOError as e: - LOG.warning( - "failed posting event to kvp: %s e:%s", - event.as_string(), e) + self._append_kvp_item(encoded_data) + except (OSError, IOError) as e: + LOG.warning("failed posting events to kvp, %s", e) finally: - self.q.task_done() - + for _ in range(items_from_queue): + self.q.task_done() # when main process exits, q.get() will through EOFError # indicating we should exit this thread. except EOFError: @@ -322,7 +319,7 @@ class HyperVKvpReportingHandler(ReportingHandler): # if the kvp pool already contains a chunk of data, # so defer it to another thread. def publish_event(self, event): - if (not self._event_types or event.event_type in self._event_types): + if not self._event_types or event.event_type in self._event_types: self.q.put(event) def flush(self): -- cgit v1.2.3 From 92cc71dec8da73bf43d345418b73db1a32a6b8b9 Mon Sep 17 00:00:00 2001 From: momousta Date: Fri, 8 Nov 2019 15:02:07 -0600 Subject: reporting: Using a uuid to enforce uniqueness on the KVP keys. The KVPs currently being emitted to the .kvp_pool file can have duplicate keys, which is wrong since these keys should be unique. The situation can occur if, for example, one Azure function is called twice or more and this function is reporting telemetry through the use of KVPs. Any KVP consumer can get confused by the duplicate keys, and a race condition can occur and has occurred.
--- cloudinit/reporting/handlers.py | 8 +++++--- tests/unittests/test_reporting_hyperv.py | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) (limited to 'cloudinit/reporting') diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py index 10165aec..6605e795 100755 --- a/cloudinit/reporting/handlers.py +++ b/cloudinit/reporting/handlers.py @@ -1,6 +1,7 @@ # This file is part of cloud-init. See LICENSE file for license information. import abc +import uuid import fcntl import json import six @@ -201,10 +202,11 @@ class HyperVKvpReportingHandler(ReportingHandler): def _event_key(self, event): """ the event key format is: - CLOUD_INIT||| + CLOUD_INIT||||