Diffstat (limited to 'cloudinit/reporting/handlers.py')
-rwxr-xr-x [-rw-r--r--]  cloudinit/reporting/handlers.py | 117
1 file changed, 57 insertions(+), 60 deletions(-)
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index 6d23558e..10165aec 100644..100755
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -5,7 +5,6 @@ import fcntl
import json
import six
import os
-import re
import struct
import threading
import time
@@ -14,6 +13,7 @@ from cloudinit import log as logging
from cloudinit.registry import DictRegistry
from cloudinit import (url_helper, util)
from datetime import datetime
+from six.moves.queue import Empty as QueueEmptyError
if six.PY2:
from multiprocessing.queues import JoinableQueue as JQueue
@@ -129,24 +129,50 @@ class HyperVKvpReportingHandler(ReportingHandler):
DESC_IDX_KEY = 'msg_i'
JSON_SEPARATORS = (',', ':')
KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1'
+ _already_truncated_pool_file = False
def __init__(self,
kvp_file_path=KVP_POOL_FILE_GUEST,
event_types=None):
super(HyperVKvpReportingHandler, self).__init__()
self._kvp_file_path = kvp_file_path
+ HyperVKvpReportingHandler._truncate_guest_pool_file(
+ self._kvp_file_path)
+
self._event_types = event_types
self.q = JQueue()
- self.kvp_file = None
self.incarnation_no = self._get_incarnation_no()
self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX,
self.incarnation_no)
- self._current_offset = 0
self.publish_thread = threading.Thread(
target=self._publish_event_routine)
self.publish_thread.daemon = True
self.publish_thread.start()
+ @classmethod
+ def _truncate_guest_pool_file(cls, kvp_file):
+ """
+ Truncate the pool file if it has not been truncated since boot.
+ This should be done exactly once for the file indicated by the
+ KVP_POOL_FILE_GUEST constant above. This method takes a filename
+ so that we can use an arbitrary file during unit testing.
+ Since KVP is a best-effort telemetry channel, we only attempt to
+ truncate the file once, and only if the file has not been modified
+ since boot. Additional truncation can lead to loss of existing
+ KVPs.
+ """
+ if cls._already_truncated_pool_file:
+ return
+ boot_time = time.time() - float(util.uptime())
+ try:
+ if os.path.getmtime(kvp_file) < boot_time:
+ with open(kvp_file, "w"):
+ pass
+ except (OSError, IOError) as e:
+ LOG.warning("failed to truncate kvp pool file, %s", e)
+ finally:
+ cls._already_truncated_pool_file = True
+
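As a side note on the truncation added above: the check boils down to comparing the pool file's mtime against a boot timestamp derived from uptime. A minimal standalone sketch of that logic, assuming a Linux /proc/uptime; the function name and print-based logging are placeholders, not part of the patch:

import os
import time


def truncate_if_untouched_since_boot(path):
    """Truncate `path` once if it has not been modified since boot.

    Standalone sketch: reads uptime from /proc/uptime instead of
    cloud-init's util.uptime() helper.
    """
    with open('/proc/uptime') as f:
        uptime = float(f.read().split()[0])
    boot_time = time.time() - uptime
    try:
        if os.path.getmtime(path) < boot_time:
            # opening with mode "w" truncates the file to zero length
            with open(path, 'w'):
                pass
    except (OSError, IOError) as e:
        print('failed to truncate %s: %s' % (path, e))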
def _get_incarnation_no(self):
"""
use the time passed as the incarnation number.
@@ -162,20 +188,15 @@ class HyperVKvpReportingHandler(ReportingHandler):
def _iterate_kvps(self, offset):
"""iterate the kvp file from the current offset."""
- try:
- with open(self._kvp_file_path, 'rb+') as f:
- self.kvp_file = f
- fcntl.flock(f, fcntl.LOCK_EX)
- f.seek(offset)
+ with open(self._kvp_file_path, 'rb') as f:
+ fcntl.flock(f, fcntl.LOCK_EX)
+ f.seek(offset)
+ record_data = f.read(self.HV_KVP_RECORD_SIZE)
+ while len(record_data) == self.HV_KVP_RECORD_SIZE:
+ kvp_item = self._decode_kvp_item(record_data)
+ yield kvp_item
record_data = f.read(self.HV_KVP_RECORD_SIZE)
- while len(record_data) == self.HV_KVP_RECORD_SIZE:
- self._current_offset += self.HV_KVP_RECORD_SIZE
- kvp_item = self._decode_kvp_item(record_data)
- yield kvp_item
- record_data = f.read(self.HV_KVP_RECORD_SIZE)
- fcntl.flock(f, fcntl.LOCK_UN)
- finally:
- self.kvp_file = None
+ fcntl.flock(f, fcntl.LOCK_UN)
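The rewritten _iterate_kvps reads the pool file as fixed-size records under an exclusive fcntl lock and no longer keeps a shared writable handle around. A simplified, self-contained sketch of the same pattern; the record size shown is illustrative and stands in for HV_KVP_RECORD_SIZE (512-byte key plus 2048-byte value in the Hyper-V KVP exchange format):

import fcntl

RECORD_SIZE = 512 + 2048  # illustrative; mirrors HV_KVP_RECORD_SIZE


def iterate_records(path, offset=0, record_size=RECORD_SIZE):
    """Yield raw fixed-size records from `path`, starting at `offset`."""
    with open(path, 'rb') as f:
        fcntl.flock(f, fcntl.LOCK_EX)   # serialize against concurrent writers
        try:
            f.seek(offset)
            data = f.read(record_size)
            while len(data) == record_size:   # a short read means end of file
                yield data
                data = f.read(record_size)
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)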
def _event_key(self, event):
"""
@@ -207,23 +228,13 @@ class HyperVKvpReportingHandler(ReportingHandler):
return {'key': k, 'value': v}
- def _update_kvp_item(self, record_data):
- if self.kvp_file is None:
- raise ReportException(
- "kvp file '{0}' not opened."
- .format(self._kvp_file_path))
- self.kvp_file.seek(-self.HV_KVP_RECORD_SIZE, 1)
- self.kvp_file.write(record_data)
-
def _append_kvp_item(self, record_data):
- with open(self._kvp_file_path, 'rb+') as f:
+ with open(self._kvp_file_path, 'ab') as f:
fcntl.flock(f, fcntl.LOCK_EX)
- # seek to end of the file
- f.seek(0, 2)
- f.write(record_data)
+ for data in record_data:
+ f.write(data)
f.flush()
fcntl.flock(f, fcntl.LOCK_UN)
- self._current_offset = f.tell()
def _break_down(self, key, meta_data, description):
del meta_data[self.MSG_KEY]
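_append_kvp_item now takes a list of pre-encoded records and writes them in a single locked pass in append mode, so the writer no longer tracks its own offset. A rough sketch of a matching encoder and writer, assuming the usual Hyper-V KVP field sizes; the helper names are ours, not the module's:

import fcntl
import struct

KEY_SIZE = 512     # assumed: HV_KVP_EXCHANGE_MAX_KEY_SIZE
VALUE_SIZE = 2048  # assumed: HV_KVP_EXCHANGE_MAX_VALUE_SIZE


def encode_record(key, value):
    """Pack one key/value pair into a fixed-size, NUL-padded KVP record."""
    return struct.pack('%ds%ds' % (KEY_SIZE, VALUE_SIZE),
                       key.encode('utf-8'), value.encode('utf-8'))


def append_records(path, records):
    """Append pre-encoded records to the pool file in one locked pass."""
    with open(path, 'ab') as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        for record in records:
            f.write(record)
        f.flush()
        fcntl.flock(f, fcntl.LOCK_UN)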
@@ -279,40 +290,26 @@ class HyperVKvpReportingHandler(ReportingHandler):
def _publish_event_routine(self):
while True:
+ items_from_queue = 0
try:
event = self.q.get(block=True)
- need_append = True
+ items_from_queue += 1
+ encoded_data = []
+ while event is not None:
+ encoded_data += self._encode_event(event)
+ try:
+ # get all the rest of the events in the queue
+ event = self.q.get(block=False)
+ items_from_queue += 1
+ except QueueEmptyError:
+ event = None
try:
- if not os.path.exists(self._kvp_file_path):
- LOG.warning(
- "skip writing events %s to %s. file not present.",
- event.as_string(),
- self._kvp_file_path)
- encoded_event = self._encode_event(event)
- # for each encoded_event
- for encoded_data in (encoded_event):
- for kvp in self._iterate_kvps(self._current_offset):
- match = (
- re.match(
- r"^{0}\|(\d+)\|.+"
- .format(self.EVENT_PREFIX),
- kvp['key']
- ))
- if match:
- match_groups = match.groups(0)
- if int(match_groups[0]) < self.incarnation_no:
- need_append = False
- self._update_kvp_item(encoded_data)
- continue
- if need_append:
- self._append_kvp_item(encoded_data)
- except IOError as e:
- LOG.warning(
- "failed posting event to kvp: %s e:%s",
- event.as_string(), e)
+ self._append_kvp_item(encoded_data)
+ except (OSError, IOError) as e:
+ LOG.warning("failed posting events to kvp, %s", e)
finally:
- self.q.task_done()
-
+ for _ in range(items_from_queue):
+ self.q.task_done()
# when the main process exits, q.get() will raise EOFError
# indicating we should exit this thread.
except EOFError:
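The publishing loop now drains the queue in batches: one blocking get() for the first event, then non-blocking get() calls until queue.Empty, with a matching number of task_done() calls in the finally block so that q.join() still balances. The same pattern in isolation (a generic helper, not the handler's actual code):

from six.moves.queue import Empty as QueueEmptyError


def drain_batch(q):
    """Block for the first item, then grab whatever else is already queued.

    Returns (items, count); the caller should call q.task_done() `count`
    times once the batch has been processed, so that q.join() still works.
    """
    items = [q.get(block=True)]
    while True:
        try:
            items.append(q.get(block=False))
        except QueueEmptyError:
            return items, len(items)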
@@ -322,7 +319,7 @@ class HyperVKvpReportingHandler(ReportingHandler):
# saving to the kvp pool can be slow if it already contains data,
# so defer it to another thread.
def publish_event(self, event):
- if (not self._event_types or event.event_type in self._event_types):
+ if not self._event_types or event.event_type in self._event_types:
self.q.put(event)
def flush(self):
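Since publish_event() only enqueues the event and returns, callers never block on pool-file I/O; the daemon thread does the writing. A hypothetical usage sketch, assuming ReportingEvent from cloudinit.reporting.events and a scratch pool path chosen purely for illustration:

from cloudinit.reporting import events
from cloudinit.reporting.handlers import HyperVKvpReportingHandler

# Use a scratch pool file so nothing touches the real
# /var/lib/hyperv/.kvp_pool_1 (path below is made up for this example).
handler = HyperVKvpReportingHandler(kvp_file_path='/tmp/example_kvp_pool')

event = events.ReportingEvent('finish', 'example/module',
                              'example description')
handler.publish_event(event)   # returns immediately; the worker thread writes
handler.flush()                # wait until the queue has been drained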