diff options
author | Ryan Harper <ryan.harper@canonical.com> | 2018-08-31 21:47:18 +0000 |
---|---|---|
committer | Server Team CI Bot <josh.powers+server-team-bot@canonical.com> | 2018-08-31 21:47:18 +0000 |
commit | 43e51a04515686a15c410d1a16dd5ff06fd1afd4 (patch) | |
tree | 4ad8fb33fc749669f7a1ef583cfe82979b207bb2 /cloudinit | |
parent | 9c35f9762028b8bf15cdcd6b42c0fafc233ddda3 (diff) | |
download | vyos-cloud-init-43e51a04515686a15c410d1a16dd5ff06fd1afd4.tar.gz vyos-cloud-init-43e51a04515686a15c410d1a16dd5ff06fd1afd4.zip |
hyperv_reporting_handler: simplify threaded publisher
Switch the implementation to a daemon thread which uses a
blocking get from the Queue. No additional locking or flag checking
is needed since the Queue itself handles acquiring the lock as needed.
cloud-init only has a single producer (the main thread calling publish)
and the consumer will read all events in the queue and write them out.
Using the daemon mode of the thread handles flushing the queue on
main exit in python3; in python2.7 we handle the EOFError that results
when the publish thread calls to get() fails indicating the main thread
has exited.
The result is that the handler is no longer spawing a thread on each
publish event but rather creates a single thread when we start up
the reporter and we remove any additional use of separate locks and
flags as we only have a single Queue object and we're only calling
queue.put() from main thread and queue.get() from consuming thread.
Diffstat (limited to 'cloudinit')
-rw-r--r-- | cloudinit/cmd/main.py | 4 | ||||
-rw-r--r-- | cloudinit/reporting/__init__.py | 6 | ||||
-rw-r--r-- | cloudinit/reporting/handlers.py | 49 |
3 files changed, 29 insertions, 30 deletions
diff --git a/cloudinit/cmd/main.py b/cloudinit/cmd/main.py index c0edee18..4ea4fe7f 100644 --- a/cloudinit/cmd/main.py +++ b/cloudinit/cmd/main.py @@ -877,9 +877,11 @@ def main(sysv_args=None): rname, rdesc, reporting_enabled=report_on) with args.reporter: - return util.log_time( + retval = util.log_time( logfunc=LOG.debug, msg="cloud-init mode '%s'" % name, get_uptime=True, func=functor, args=(name, args)) + reporting.flush_events() + return retval if __name__ == '__main__': diff --git a/cloudinit/reporting/__init__.py b/cloudinit/reporting/__init__.py index e047767e..ed5c7038 100644 --- a/cloudinit/reporting/__init__.py +++ b/cloudinit/reporting/__init__.py @@ -37,6 +37,12 @@ def update_configuration(config): instantiated_handler_registry.register_item(handler_name, instance) +def flush_events(): + for _, handler in instantiated_handler_registry.registered_items.items(): + if hasattr(handler, 'flush'): + handler.flush() + + instantiated_handler_registry = DictRegistry() update_configuration(DEFAULT_CONFIG) diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py index 4b4bb396..6d23558e 100644 --- a/cloudinit/reporting/handlers.py +++ b/cloudinit/reporting/handlers.py @@ -16,10 +16,8 @@ from cloudinit import (url_helper, util) from datetime import datetime if six.PY2: - import multiprocessing.queues as queue from multiprocessing.queues import JoinableQueue as JQueue else: - import queue from queue import Queue as JQueue LOG = logging.getLogger(__name__) @@ -41,6 +39,10 @@ class ReportingHandler(object): def publish_event(self, event): """Publish an event.""" + def flush(self): + """Ensure ReportingHandler has published all events""" + pass + class LogHandler(ReportingHandler): """Publishes events to the cloud-init log at the ``DEBUG`` log level.""" @@ -134,15 +136,16 @@ class HyperVKvpReportingHandler(ReportingHandler): super(HyperVKvpReportingHandler, self).__init__() self._kvp_file_path = kvp_file_path self._event_types = event_types - self.running = False - self.queue_lock = threading.Lock() - self.running_lock = threading.Lock() self.q = JQueue() self.kvp_file = None self.incarnation_no = self._get_incarnation_no() self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX, self.incarnation_no) self._current_offset = 0 + self.publish_thread = threading.Thread( + target=self._publish_event_routine) + self.publish_thread.daemon = True + self.publish_thread.start() def _get_incarnation_no(self): """ @@ -276,10 +279,8 @@ class HyperVKvpReportingHandler(ReportingHandler): def _publish_event_routine(self): while True: - event = None try: - # acquire the lock. - event = self.q.get_nowait() + event = self.q.get(block=True) need_append = True try: if not os.path.exists(self._kvp_file_path): @@ -302,41 +303,31 @@ class HyperVKvpReportingHandler(ReportingHandler): if int(match_groups[0]) < self.incarnation_no: need_append = False self._update_kvp_item(encoded_data) - break + continue if need_append: self._append_kvp_item(encoded_data) except IOError as e: LOG.warning( "failed posting event to kvp: %s e:%s", event.as_string(), e) - self.running = False - break finally: self.q.task_done() - except queue.Empty: - with self.queue_lock: - # double check the queue is empty - if self.q.empty(): - self.running = False - break - - def trigger_publish_event(self): - if not self.running: - with self.running_lock: - if not self.running: - self.running = True - thread = threading.Thread( - target=self._publish_event_routine) - thread.start() + + # when main process exits, q.get() will through EOFError + # indicating we should exit this thread. + except EOFError: + return # since the saving to the kvp pool can be a time costing task # if the kvp pool already contains a chunk of data, # so defer it to another thread. def publish_event(self, event): if (not self._event_types or event.event_type in self._event_types): - with self.queue_lock: - self.q.put(event) - self.trigger_publish_event() + self.q.put(event) + + def flush(self): + LOG.debug('HyperVReportingHandler flushing remaining events') + self.q.join() available_handlers = DictRegistry() |