diff options
-rw-r--r-- | cloudinit/cloud.py | 4 | ||||
-rw-r--r-- | cloudinit/cmd/main.py | 4 | ||||
-rwxr-xr-x | cloudinit/distros/__init__.py | 2 | ||||
-rw-r--r-- | cloudinit/reporting/__init__.py | 2 | ||||
-rw-r--r-- | cloudinit/reporting/handlers.py | 255 | ||||
-rw-r--r-- | cloudinit/stages.py | 2 | ||||
-rw-r--r-- | tests/unittests/test_reporting_hyperv.py | 134 |
7 files changed, 396 insertions, 7 deletions
diff --git a/cloudinit/cloud.py b/cloudinit/cloud.py index 6d12c437..7ae98e1c 100644 --- a/cloudinit/cloud.py +++ b/cloudinit/cloud.py @@ -47,7 +47,7 @@ class Cloud(object): @property def cfg(self): - # Ensure that not indirectly modified + # Ensure that cfg is not indirectly modified return copy.deepcopy(self._cfg) def run(self, name, functor, args, freq=None, clear_on_fail=False): @@ -61,7 +61,7 @@ class Cloud(object): return None return fn - # The rest of thes are just useful proxies + # The rest of these are just useful proxies def get_userdata(self, apply_filter=True): return self.datasource.get_userdata(apply_filter) diff --git a/cloudinit/cmd/main.py b/cloudinit/cmd/main.py index d6ba90f4..c0edee18 100644 --- a/cloudinit/cmd/main.py +++ b/cloudinit/cmd/main.py @@ -315,7 +315,7 @@ def main_init(name, args): existing = "trust" init.purge_cache() - # Delete the non-net file as well + # Delete the no-net file as well util.del_file(os.path.join(path_helper.get_cpath("data"), "no-net")) # Stage 5 @@ -339,7 +339,7 @@ def main_init(name, args): " Likely bad things to come!")) if not args.force: init.apply_network_config(bring_up=not args.local) - LOG.debug("[%s] Exiting without datasource in local mode", mode) + LOG.debug("[%s] Exiting without datasource", mode) if mode == sources.DSMODE_LOCAL: return (None, []) else: diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py index ab0b0776..fde054e9 100755 --- a/cloudinit/distros/__init__.py +++ b/cloudinit/distros/__init__.py @@ -157,7 +157,7 @@ class Distro(object): distro) header = '\n'.join([ "# Converted from network_config for distro %s" % distro, - "# Implmentation of _write_network_config is needed." + "# Implementation of _write_network_config is needed." ]) ns = network_state.parse_net_config_data(netconfig) contents = eni.network_state_to_eni( diff --git a/cloudinit/reporting/__init__.py b/cloudinit/reporting/__init__.py index 1ed2b487..e047767e 100644 --- a/cloudinit/reporting/__init__.py +++ b/cloudinit/reporting/__init__.py @@ -18,7 +18,7 @@ DEFAULT_CONFIG = { def update_configuration(config): - """Update the instanciated_handler_registry. + """Update the instantiated_handler_registry. :param config: The dictionary containing changes to apply. If a key is given diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py index 4066076c..4b4bb396 100644 --- a/cloudinit/reporting/handlers.py +++ b/cloudinit/reporting/handlers.py @@ -1,17 +1,34 @@ # This file is part of cloud-init. See LICENSE file for license information. import abc +import fcntl import json import six +import os +import re +import struct +import threading +import time from cloudinit import log as logging from cloudinit.registry import DictRegistry from cloudinit import (url_helper, util) +from datetime import datetime +if six.PY2: + import multiprocessing.queues as queue + from multiprocessing.queues import JoinableQueue as JQueue +else: + import queue + from queue import Queue as JQueue LOG = logging.getLogger(__name__) +class ReportException(Exception): + pass + + @six.add_metaclass(abc.ABCMeta) class ReportingHandler(object): """Base class for report handlers. @@ -85,9 +102,247 @@ class WebHookHandler(ReportingHandler): LOG.warning("failed posting event: %s", event.as_string()) +class HyperVKvpReportingHandler(ReportingHandler): + """ + Reports events to a Hyper-V host using Key-Value-Pair exchange protocol + and can be used to obtain high level diagnostic information from the host. + + To use this facility, the KVP user-space daemon (hv_kvp_daemon) has to be + running. It reads the kvp_file when the host requests the guest to + enumerate the KVP's. + + This reporter collates all events for a module (origin|name) in a single + json string in the dictionary. + + For more information, see + https://technet.microsoft.com/en-us/library/dn798287.aspx#Linux%20guests + """ + HV_KVP_EXCHANGE_MAX_VALUE_SIZE = 2048 + HV_KVP_EXCHANGE_MAX_KEY_SIZE = 512 + HV_KVP_RECORD_SIZE = (HV_KVP_EXCHANGE_MAX_KEY_SIZE + + HV_KVP_EXCHANGE_MAX_VALUE_SIZE) + EVENT_PREFIX = 'CLOUD_INIT' + MSG_KEY = 'msg' + RESULT_KEY = 'result' + DESC_IDX_KEY = 'msg_i' + JSON_SEPARATORS = (',', ':') + KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1' + + def __init__(self, + kvp_file_path=KVP_POOL_FILE_GUEST, + event_types=None): + super(HyperVKvpReportingHandler, self).__init__() + self._kvp_file_path = kvp_file_path + self._event_types = event_types + self.running = False + self.queue_lock = threading.Lock() + self.running_lock = threading.Lock() + self.q = JQueue() + self.kvp_file = None + self.incarnation_no = self._get_incarnation_no() + self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX, + self.incarnation_no) + self._current_offset = 0 + + def _get_incarnation_no(self): + """ + use the time passed as the incarnation number. + the incarnation number is the number which are used to + distinguish the old data stored in kvp and the new data. + """ + uptime_str = util.uptime() + try: + return int(time.time() - float(uptime_str)) + except ValueError: + LOG.warning("uptime '%s' not in correct format.", uptime_str) + return 0 + + def _iterate_kvps(self, offset): + """iterate the kvp file from the current offset.""" + try: + with open(self._kvp_file_path, 'rb+') as f: + self.kvp_file = f + fcntl.flock(f, fcntl.LOCK_EX) + f.seek(offset) + record_data = f.read(self.HV_KVP_RECORD_SIZE) + while len(record_data) == self.HV_KVP_RECORD_SIZE: + self._current_offset += self.HV_KVP_RECORD_SIZE + kvp_item = self._decode_kvp_item(record_data) + yield kvp_item + record_data = f.read(self.HV_KVP_RECORD_SIZE) + fcntl.flock(f, fcntl.LOCK_UN) + finally: + self.kvp_file = None + + def _event_key(self, event): + """ + the event key format is: + CLOUD_INIT|<incarnation number>|<event_type>|<event_name> + """ + return u"{0}|{1}|{2}".format(self.event_key_prefix, + event.event_type, event.name) + + def _encode_kvp_item(self, key, value): + data = (struct.pack("%ds%ds" % ( + self.HV_KVP_EXCHANGE_MAX_KEY_SIZE, + self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE), + key.encode('utf-8'), value.encode('utf-8'))) + return data + + def _decode_kvp_item(self, record_data): + record_data_len = len(record_data) + if record_data_len != self.HV_KVP_RECORD_SIZE: + raise ReportException( + "record_data len not correct {0} {1}." + .format(record_data_len, self.HV_KVP_RECORD_SIZE)) + k = (record_data[0:self.HV_KVP_EXCHANGE_MAX_KEY_SIZE].decode('utf-8') + .strip('\x00')) + v = ( + record_data[ + self.HV_KVP_EXCHANGE_MAX_KEY_SIZE:self.HV_KVP_RECORD_SIZE + ].decode('utf-8').strip('\x00')) + + return {'key': k, 'value': v} + + def _update_kvp_item(self, record_data): + if self.kvp_file is None: + raise ReportException( + "kvp file '{0}' not opened." + .format(self._kvp_file_path)) + self.kvp_file.seek(-self.HV_KVP_RECORD_SIZE, 1) + self.kvp_file.write(record_data) + + def _append_kvp_item(self, record_data): + with open(self._kvp_file_path, 'rb+') as f: + fcntl.flock(f, fcntl.LOCK_EX) + # seek to end of the file + f.seek(0, 2) + f.write(record_data) + f.flush() + fcntl.flock(f, fcntl.LOCK_UN) + self._current_offset = f.tell() + + def _break_down(self, key, meta_data, description): + del meta_data[self.MSG_KEY] + des_in_json = json.dumps(description) + des_in_json = des_in_json[1:(len(des_in_json) - 1)] + i = 0 + result_array = [] + message_place_holder = "\"" + self.MSG_KEY + "\":\"\"" + while True: + meta_data[self.DESC_IDX_KEY] = i + meta_data[self.MSG_KEY] = '' + data_without_desc = json.dumps(meta_data, + separators=self.JSON_SEPARATORS) + room_for_desc = ( + self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE - + len(data_without_desc) - 8) + value = data_without_desc.replace( + message_place_holder, + '"{key}":"{desc}"'.format( + key=self.MSG_KEY, desc=des_in_json[:room_for_desc])) + result_array.append(self._encode_kvp_item(key, value)) + i += 1 + des_in_json = des_in_json[room_for_desc:] + if len(des_in_json) == 0: + break + return result_array + + def _encode_event(self, event): + """ + encode the event into kvp data bytes. + if the event content reaches the maximum length of kvp value. + then it would be cut to multiple slices. + """ + key = self._event_key(event) + meta_data = { + "name": event.name, + "type": event.event_type, + "ts": (datetime.utcfromtimestamp(event.timestamp) + .isoformat() + 'Z'), + } + if hasattr(event, self.RESULT_KEY): + meta_data[self.RESULT_KEY] = event.result + meta_data[self.MSG_KEY] = event.description + value = json.dumps(meta_data, separators=self.JSON_SEPARATORS) + # if it reaches the maximum length of kvp value, + # break it down to slices. + # this should be very corner case. + if len(value) > self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE: + return self._break_down(key, meta_data, event.description) + else: + data = self._encode_kvp_item(key, value) + return [data] + + def _publish_event_routine(self): + while True: + event = None + try: + # acquire the lock. + event = self.q.get_nowait() + need_append = True + try: + if not os.path.exists(self._kvp_file_path): + LOG.warning( + "skip writing events %s to %s. file not present.", + event.as_string(), + self._kvp_file_path) + encoded_event = self._encode_event(event) + # for each encoded_event + for encoded_data in (encoded_event): + for kvp in self._iterate_kvps(self._current_offset): + match = ( + re.match( + r"^{0}\|(\d+)\|.+" + .format(self.EVENT_PREFIX), + kvp['key'] + )) + if match: + match_groups = match.groups(0) + if int(match_groups[0]) < self.incarnation_no: + need_append = False + self._update_kvp_item(encoded_data) + break + if need_append: + self._append_kvp_item(encoded_data) + except IOError as e: + LOG.warning( + "failed posting event to kvp: %s e:%s", + event.as_string(), e) + self.running = False + break + finally: + self.q.task_done() + except queue.Empty: + with self.queue_lock: + # double check the queue is empty + if self.q.empty(): + self.running = False + break + + def trigger_publish_event(self): + if not self.running: + with self.running_lock: + if not self.running: + self.running = True + thread = threading.Thread( + target=self._publish_event_routine) + thread.start() + + # since the saving to the kvp pool can be a time costing task + # if the kvp pool already contains a chunk of data, + # so defer it to another thread. + def publish_event(self, event): + if (not self._event_types or event.event_type in self._event_types): + with self.queue_lock: + self.q.put(event) + self.trigger_publish_event() + + available_handlers = DictRegistry() available_handlers.register_item('log', LogHandler) available_handlers.register_item('print', PrintHandler) available_handlers.register_item('webhook', WebHookHandler) +available_handlers.register_item('hyperv', HyperVKvpReportingHandler) # vi: ts=4 expandtab diff --git a/cloudinit/stages.py b/cloudinit/stages.py index c132b57d..8874d405 100644 --- a/cloudinit/stages.py +++ b/cloudinit/stages.py @@ -510,7 +510,7 @@ class Init(object): # The default frequency if handlers don't have one 'frequency': frequency, # This will be used when new handlers are found - # to help write there contents to files with numbered + # to help write their contents to files with numbered # names... 'handlercount': 0, 'excluded': excluded, diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py new file mode 100644 index 00000000..2e64c6c7 --- /dev/null +++ b/tests/unittests/test_reporting_hyperv.py @@ -0,0 +1,134 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from cloudinit.reporting import events +from cloudinit.reporting import handlers + +import json +import os + +from cloudinit import util +from cloudinit.tests.helpers import CiTestCase + + +class TestKvpEncoding(CiTestCase): + def test_encode_decode(self): + kvp = {'key': 'key1', 'value': 'value1'} + kvp_reporting = handlers.HyperVKvpReportingHandler() + data = kvp_reporting._encode_kvp_item(kvp['key'], kvp['value']) + self.assertEqual(len(data), kvp_reporting.HV_KVP_RECORD_SIZE) + decoded_kvp = kvp_reporting._decode_kvp_item(data) + self.assertEqual(kvp, decoded_kvp) + + +class TextKvpReporter(CiTestCase): + def setUp(self): + super(TextKvpReporter, self).setUp() + self.tmp_file_path = self.tmp_path('kvp_pool_file') + util.ensure_file(self.tmp_file_path) + + def test_event_type_can_be_filtered(self): + reporter = handlers.HyperVKvpReportingHandler( + kvp_file_path=self.tmp_file_path, + event_types=['foo', 'bar']) + + reporter.publish_event( + events.ReportingEvent('foo', 'name', 'description')) + reporter.publish_event( + events.ReportingEvent('some_other', 'name', 'description3')) + reporter.q.join() + + kvps = list(reporter._iterate_kvps(0)) + self.assertEqual(1, len(kvps)) + + reporter.publish_event( + events.ReportingEvent('bar', 'name', 'description2')) + reporter.q.join() + kvps = list(reporter._iterate_kvps(0)) + self.assertEqual(2, len(kvps)) + + self.assertIn('foo', kvps[0]['key']) + self.assertIn('bar', kvps[1]['key']) + self.assertNotIn('some_other', kvps[0]['key']) + self.assertNotIn('some_other', kvps[1]['key']) + + def test_events_are_over_written(self): + reporter = handlers.HyperVKvpReportingHandler( + kvp_file_path=self.tmp_file_path) + + self.assertEqual(0, len(list(reporter._iterate_kvps(0)))) + + reporter.publish_event( + events.ReportingEvent('foo', 'name1', 'description')) + reporter.publish_event( + events.ReportingEvent('foo', 'name2', 'description')) + reporter.q.join() + self.assertEqual(2, len(list(reporter._iterate_kvps(0)))) + + reporter2 = handlers.HyperVKvpReportingHandler( + kvp_file_path=self.tmp_file_path) + reporter2.incarnation_no = reporter.incarnation_no + 1 + reporter2.publish_event( + events.ReportingEvent('foo', 'name3', 'description')) + reporter2.q.join() + + self.assertEqual(2, len(list(reporter2._iterate_kvps(0)))) + + def test_events_with_higher_incarnation_not_over_written(self): + reporter = handlers.HyperVKvpReportingHandler( + kvp_file_path=self.tmp_file_path) + + self.assertEqual(0, len(list(reporter._iterate_kvps(0)))) + + reporter.publish_event( + events.ReportingEvent('foo', 'name1', 'description')) + reporter.publish_event( + events.ReportingEvent('foo', 'name2', 'description')) + reporter.q.join() + self.assertEqual(2, len(list(reporter._iterate_kvps(0)))) + + reporter3 = handlers.HyperVKvpReportingHandler( + kvp_file_path=self.tmp_file_path) + reporter3.incarnation_no = reporter.incarnation_no - 1 + reporter3.publish_event( + events.ReportingEvent('foo', 'name3', 'description')) + reporter3.q.join() + self.assertEqual(3, len(list(reporter3._iterate_kvps(0)))) + + def test_finish_event_result_is_logged(self): + reporter = handlers.HyperVKvpReportingHandler( + kvp_file_path=self.tmp_file_path) + reporter.publish_event( + events.FinishReportingEvent('name2', 'description1', + result=events.status.FAIL)) + reporter.q.join() + self.assertIn('FAIL', list(reporter._iterate_kvps(0))[0]['value']) + + def test_file_operation_issue(self): + os.remove(self.tmp_file_path) + reporter = handlers.HyperVKvpReportingHandler( + kvp_file_path=self.tmp_file_path) + reporter.publish_event( + events.FinishReportingEvent('name2', 'description1', + result=events.status.FAIL)) + reporter.q.join() + + def test_event_very_long(self): + reporter = handlers.HyperVKvpReportingHandler( + kvp_file_path=self.tmp_file_path) + description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE + long_event = events.FinishReportingEvent( + 'event_name', + description, + result=events.status.FAIL) + reporter.publish_event(long_event) + reporter.q.join() + kvps = list(reporter._iterate_kvps(0)) + self.assertEqual(3, len(kvps)) + + # restore from the kvp to see the content are all there + full_description = '' + for i in range(len(kvps)): + msg_slice = json.loads(kvps[i]['value']) + self.assertEqual(msg_slice['msg_i'], i) + full_description += msg_slice['msg'] + self.assertEqual(description, full_description) |