summaryrefslogtreecommitdiff
path: root/cloudinit/reporting
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/reporting')
-rw-r--r--cloudinit/reporting/__init__.py6
-rw-r--r--cloudinit/reporting/handlers.py49
2 files changed, 26 insertions, 29 deletions
diff --git a/cloudinit/reporting/__init__.py b/cloudinit/reporting/__init__.py
index e047767e..ed5c7038 100644
--- a/cloudinit/reporting/__init__.py
+++ b/cloudinit/reporting/__init__.py
@@ -37,6 +37,12 @@ def update_configuration(config):
instantiated_handler_registry.register_item(handler_name, instance)
+def flush_events():
+ for _, handler in instantiated_handler_registry.registered_items.items():
+ if hasattr(handler, 'flush'):
+ handler.flush()
+
+
instantiated_handler_registry = DictRegistry()
update_configuration(DEFAULT_CONFIG)
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index 4b4bb396..6d23558e 100644
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -16,10 +16,8 @@ from cloudinit import (url_helper, util)
from datetime import datetime
if six.PY2:
- import multiprocessing.queues as queue
from multiprocessing.queues import JoinableQueue as JQueue
else:
- import queue
from queue import Queue as JQueue
LOG = logging.getLogger(__name__)
@@ -41,6 +39,10 @@ class ReportingHandler(object):
def publish_event(self, event):
"""Publish an event."""
+ def flush(self):
+ """Ensure ReportingHandler has published all events"""
+ pass
+
class LogHandler(ReportingHandler):
"""Publishes events to the cloud-init log at the ``DEBUG`` log level."""
@@ -134,15 +136,16 @@ class HyperVKvpReportingHandler(ReportingHandler):
super(HyperVKvpReportingHandler, self).__init__()
self._kvp_file_path = kvp_file_path
self._event_types = event_types
- self.running = False
- self.queue_lock = threading.Lock()
- self.running_lock = threading.Lock()
self.q = JQueue()
self.kvp_file = None
self.incarnation_no = self._get_incarnation_no()
self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX,
self.incarnation_no)
self._current_offset = 0
+ self.publish_thread = threading.Thread(
+ target=self._publish_event_routine)
+ self.publish_thread.daemon = True
+ self.publish_thread.start()
def _get_incarnation_no(self):
"""
@@ -276,10 +279,8 @@ class HyperVKvpReportingHandler(ReportingHandler):
def _publish_event_routine(self):
while True:
- event = None
try:
- # acquire the lock.
- event = self.q.get_nowait()
+ event = self.q.get(block=True)
need_append = True
try:
if not os.path.exists(self._kvp_file_path):
@@ -302,41 +303,31 @@ class HyperVKvpReportingHandler(ReportingHandler):
if int(match_groups[0]) < self.incarnation_no:
need_append = False
self._update_kvp_item(encoded_data)
- break
+ continue
if need_append:
self._append_kvp_item(encoded_data)
except IOError as e:
LOG.warning(
"failed posting event to kvp: %s e:%s",
event.as_string(), e)
- self.running = False
- break
finally:
self.q.task_done()
- except queue.Empty:
- with self.queue_lock:
- # double check the queue is empty
- if self.q.empty():
- self.running = False
- break
-
- def trigger_publish_event(self):
- if not self.running:
- with self.running_lock:
- if not self.running:
- self.running = True
- thread = threading.Thread(
- target=self._publish_event_routine)
- thread.start()
+
+ # when main process exits, q.get() will through EOFError
+ # indicating we should exit this thread.
+ except EOFError:
+ return
# since the saving to the kvp pool can be a time costing task
# if the kvp pool already contains a chunk of data,
# so defer it to another thread.
def publish_event(self, event):
if (not self._event_types or event.event_type in self._event_types):
- with self.queue_lock:
- self.q.put(event)
- self.trigger_publish_event()
+ self.q.put(event)
+
+ def flush(self):
+ LOG.debug('HyperVReportingHandler flushing remaining events')
+ self.q.join()
available_handlers = DictRegistry()