diff options
-rw-r--r-- | cloudinit/cmd/main.py | 4 | ||||
-rw-r--r-- | cloudinit/reporting/__init__.py | 6 | ||||
-rw-r--r-- | cloudinit/reporting/handlers.py | 49 |
3 files changed, 29 insertions, 30 deletions
diff --git a/cloudinit/cmd/main.py b/cloudinit/cmd/main.py index c0edee18..4ea4fe7f 100644 --- a/cloudinit/cmd/main.py +++ b/cloudinit/cmd/main.py @@ -877,9 +877,11 @@ def main(sysv_args=None): rname, rdesc, reporting_enabled=report_on) with args.reporter: - return util.log_time( + retval = util.log_time( logfunc=LOG.debug, msg="cloud-init mode '%s'" % name, get_uptime=True, func=functor, args=(name, args)) + reporting.flush_events() + return retval if __name__ == '__main__': diff --git a/cloudinit/reporting/__init__.py b/cloudinit/reporting/__init__.py index e047767e..ed5c7038 100644 --- a/cloudinit/reporting/__init__.py +++ b/cloudinit/reporting/__init__.py @@ -37,6 +37,12 @@ def update_configuration(config): instantiated_handler_registry.register_item(handler_name, instance) +def flush_events(): + for _, handler in instantiated_handler_registry.registered_items.items(): + if hasattr(handler, 'flush'): + handler.flush() + + instantiated_handler_registry = DictRegistry() update_configuration(DEFAULT_CONFIG) diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py index 4b4bb396..6d23558e 100644 --- a/cloudinit/reporting/handlers.py +++ b/cloudinit/reporting/handlers.py @@ -16,10 +16,8 @@ from cloudinit import (url_helper, util) from datetime import datetime if six.PY2: - import multiprocessing.queues as queue from multiprocessing.queues import JoinableQueue as JQueue else: - import queue from queue import Queue as JQueue LOG = logging.getLogger(__name__) @@ -41,6 +39,10 @@ class ReportingHandler(object): def publish_event(self, event): """Publish an event.""" + def flush(self): + """Ensure ReportingHandler has published all events""" + pass + class LogHandler(ReportingHandler): """Publishes events to the cloud-init log at the ``DEBUG`` log level.""" @@ -134,15 +136,16 @@ class HyperVKvpReportingHandler(ReportingHandler): super(HyperVKvpReportingHandler, self).__init__() self._kvp_file_path = kvp_file_path self._event_types = event_types - self.running = False - self.queue_lock = threading.Lock() - self.running_lock = threading.Lock() self.q = JQueue() self.kvp_file = None self.incarnation_no = self._get_incarnation_no() self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX, self.incarnation_no) self._current_offset = 0 + self.publish_thread = threading.Thread( + target=self._publish_event_routine) + self.publish_thread.daemon = True + self.publish_thread.start() def _get_incarnation_no(self): """ @@ -276,10 +279,8 @@ class HyperVKvpReportingHandler(ReportingHandler): def _publish_event_routine(self): while True: - event = None try: - # acquire the lock. - event = self.q.get_nowait() + event = self.q.get(block=True) need_append = True try: if not os.path.exists(self._kvp_file_path): @@ -302,41 +303,31 @@ class HyperVKvpReportingHandler(ReportingHandler): if int(match_groups[0]) < self.incarnation_no: need_append = False self._update_kvp_item(encoded_data) - break + continue if need_append: self._append_kvp_item(encoded_data) except IOError as e: LOG.warning( "failed posting event to kvp: %s e:%s", event.as_string(), e) - self.running = False - break finally: self.q.task_done() - except queue.Empty: - with self.queue_lock: - # double check the queue is empty - if self.q.empty(): - self.running = False - break - - def trigger_publish_event(self): - if not self.running: - with self.running_lock: - if not self.running: - self.running = True - thread = threading.Thread( - target=self._publish_event_routine) - thread.start() + + # when main process exits, q.get() will through EOFError + # indicating we should exit this thread. + except EOFError: + return # since the saving to the kvp pool can be a time costing task # if the kvp pool already contains a chunk of data, # so defer it to another thread. def publish_event(self, event): if (not self._event_types or event.event_type in self._event_types): - with self.queue_lock: - self.q.put(event) - self.trigger_publish_event() + self.q.put(event) + + def flush(self): + LOG.debug('HyperVReportingHandler flushing remaining events') + self.q.join() available_handlers = DictRegistry() |