diff options
Diffstat (limited to 'cloudinit/reporting/handlers.py')
-rwxr-xr-x | cloudinit/reporting/handlers.py | 128 |
1 files changed, 78 insertions, 50 deletions
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py index e32739ef..e163e168 100755 --- a/cloudinit/reporting/handlers.py +++ b/cloudinit/reporting/handlers.py @@ -12,8 +12,8 @@ import uuid from datetime import datetime from cloudinit import log as logging +from cloudinit import url_helper, util from cloudinit.registry import DictRegistry -from cloudinit import (url_helper, util) LOG = logging.getLogger(__name__) @@ -55,7 +55,8 @@ class LogHandler(ReportingHandler): def publish_event(self, event): logger = logging.getLogger( - '.'.join(['cloudinit', 'reporting', event.event_type, event.name])) + ".".join(["cloudinit", "reporting", event.event_type, event.name]) + ) logger.log(self.level, event.as_string()) @@ -67,15 +68,25 @@ class PrintHandler(ReportingHandler): class WebHookHandler(ReportingHandler): - def __init__(self, endpoint, consumer_key=None, token_key=None, - token_secret=None, consumer_secret=None, timeout=None, - retries=None): + def __init__( + self, + endpoint, + consumer_key=None, + token_key=None, + token_secret=None, + consumer_secret=None, + timeout=None, + retries=None, + ): super(WebHookHandler, self).__init__() if any([consumer_key, token_key, token_secret, consumer_secret]): self.oauth_helper = url_helper.OauthUrlHelper( - consumer_key=consumer_key, token_key=token_key, - token_secret=token_secret, consumer_secret=consumer_secret) + consumer_key=consumer_key, + token_key=token_key, + token_secret=token_secret, + consumer_secret=consumer_secret, + ) else: self.oauth_helper = None self.endpoint = endpoint @@ -90,9 +101,12 @@ class WebHookHandler(ReportingHandler): readurl = url_helper.readurl try: return readurl( - self.endpoint, data=json.dumps(event.as_dict()), + self.endpoint, + data=json.dumps(event.as_dict()), timeout=self.timeout, - retries=self.retries, ssl_details=self.ssl_details) + retries=self.retries, + ssl_details=self.ssl_details, + ) except Exception: LOG.warning("failed posting event: %s", event.as_string()) @@ -112,33 +126,35 @@ class HyperVKvpReportingHandler(ReportingHandler): For more information, see https://technet.microsoft.com/en-us/library/dn798287.aspx#Linux%20guests """ + HV_KVP_EXCHANGE_MAX_VALUE_SIZE = 2048 # The maximum value size expected in Azure HV_KVP_AZURE_MAX_VALUE_SIZE = 1024 HV_KVP_EXCHANGE_MAX_KEY_SIZE = 512 - HV_KVP_RECORD_SIZE = (HV_KVP_EXCHANGE_MAX_KEY_SIZE + - HV_KVP_EXCHANGE_MAX_VALUE_SIZE) - EVENT_PREFIX = 'CLOUD_INIT' - MSG_KEY = 'msg' - RESULT_KEY = 'result' - DESC_IDX_KEY = 'msg_i' - JSON_SEPARATORS = (',', ':') - KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1' + HV_KVP_RECORD_SIZE = ( + HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE + ) + EVENT_PREFIX = "CLOUD_INIT" + MSG_KEY = "msg" + RESULT_KEY = "result" + DESC_IDX_KEY = "msg_i" + JSON_SEPARATORS = (",", ":") + KVP_POOL_FILE_GUEST = "/var/lib/hyperv/.kvp_pool_1" _already_truncated_pool_file = False - def __init__(self, - kvp_file_path=KVP_POOL_FILE_GUEST, - event_types=None): + def __init__(self, kvp_file_path=KVP_POOL_FILE_GUEST, event_types=None): super(HyperVKvpReportingHandler, self).__init__() self._kvp_file_path = kvp_file_path HyperVKvpReportingHandler._truncate_guest_pool_file( - self._kvp_file_path) + self._kvp_file_path + ) self._event_types = event_types self.q = queue.Queue() self.incarnation_no = self._get_incarnation_no() - self.event_key_prefix = "{0}|{1}".format(self.EVENT_PREFIX, - self.incarnation_no) + self.event_key_prefix = "{0}|{1}".format( + self.EVENT_PREFIX, self.incarnation_no + ) self.publish_thread = threading.Thread( target=self._publish_event_routine ) @@ -184,7 +200,7 @@ class HyperVKvpReportingHandler(ReportingHandler): def _iterate_kvps(self, offset): """iterate the kvp file from the current offset.""" - with open(self._kvp_file_path, 'rb') as f: + with open(self._kvp_file_path, "rb") as f: fcntl.flock(f, fcntl.LOCK_EX) f.seek(offset) record_data = f.read(self.HV_KVP_RECORD_SIZE) @@ -200,9 +216,9 @@ class HyperVKvpReportingHandler(ReportingHandler): CLOUD_INIT|<incarnation number>|<event_type>|<event_name>|<uuid> [|subevent_index] """ - return "{0}|{1}|{2}|{3}".format(self.event_key_prefix, - event.event_type, event.name, - uuid.uuid4()) + return "{0}|{1}|{2}|{3}".format( + self.event_key_prefix, event.event_type, event.name, uuid.uuid4() + ) def _encode_kvp_item(self, key, value): data = struct.pack( @@ -220,19 +236,27 @@ class HyperVKvpReportingHandler(ReportingHandler): record_data_len = len(record_data) if record_data_len != self.HV_KVP_RECORD_SIZE: raise ReportException( - "record_data len not correct {0} {1}." - .format(record_data_len, self.HV_KVP_RECORD_SIZE)) - k = (record_data[0:self.HV_KVP_EXCHANGE_MAX_KEY_SIZE].decode('utf-8') - .strip('\x00')) + "record_data len not correct {0} {1}.".format( + record_data_len, self.HV_KVP_RECORD_SIZE + ) + ) + k = ( + record_data[0 : self.HV_KVP_EXCHANGE_MAX_KEY_SIZE] + .decode("utf-8") + .strip("\x00") + ) v = ( record_data[ - self.HV_KVP_EXCHANGE_MAX_KEY_SIZE:self.HV_KVP_RECORD_SIZE - ].decode('utf-8').strip('\x00')) + self.HV_KVP_EXCHANGE_MAX_KEY_SIZE : self.HV_KVP_RECORD_SIZE + ] + .decode("utf-8") + .strip("\x00") + ) - return {'key': k, 'value': v} + return {"key": k, "value": v} def _append_kvp_item(self, record_data): - with open(self._kvp_file_path, 'ab') as f: + with open(self._kvp_file_path, "ab") as f: fcntl.flock(f, fcntl.LOCK_EX) for data in record_data: f.write(data) @@ -242,22 +266,25 @@ class HyperVKvpReportingHandler(ReportingHandler): def _break_down(self, key, meta_data, description): del meta_data[self.MSG_KEY] des_in_json = json.dumps(description) - des_in_json = des_in_json[1:(len(des_in_json) - 1)] + des_in_json = des_in_json[1 : (len(des_in_json) - 1)] i = 0 result_array = [] - message_place_holder = "\"" + self.MSG_KEY + "\":\"\"" + message_place_holder = '"' + self.MSG_KEY + '":""' while True: meta_data[self.DESC_IDX_KEY] = i - meta_data[self.MSG_KEY] = '' - data_without_desc = json.dumps(meta_data, - separators=self.JSON_SEPARATORS) + meta_data[self.MSG_KEY] = "" + data_without_desc = json.dumps( + meta_data, separators=self.JSON_SEPARATORS + ) room_for_desc = ( - self.HV_KVP_AZURE_MAX_VALUE_SIZE - - len(data_without_desc) - 8) + self.HV_KVP_AZURE_MAX_VALUE_SIZE - len(data_without_desc) - 8 + ) value = data_without_desc.replace( message_place_holder, '"{key}":"{desc}"'.format( - key=self.MSG_KEY, desc=des_in_json[:room_for_desc])) + key=self.MSG_KEY, desc=des_in_json[:room_for_desc] + ), + ) subkey = "{}|{}".format(key, i) result_array.append(self._encode_kvp_item(subkey, value)) i += 1 @@ -276,8 +303,9 @@ class HyperVKvpReportingHandler(ReportingHandler): meta_data = { "name": event.name, "type": event.event_type, - "ts": (datetime.utcfromtimestamp(event.timestamp) - .isoformat() + 'Z'), + "ts": ( + datetime.utcfromtimestamp(event.timestamp).isoformat() + "Z" + ), } if hasattr(event, self.RESULT_KEY): meta_data[self.RESULT_KEY] = event.result @@ -327,14 +355,14 @@ class HyperVKvpReportingHandler(ReportingHandler): self.q.put(event) def flush(self): - LOG.debug('HyperVReportingHandler flushing remaining events') + LOG.debug("HyperVReportingHandler flushing remaining events") self.q.join() available_handlers = DictRegistry() -available_handlers.register_item('log', LogHandler) -available_handlers.register_item('print', PrintHandler) -available_handlers.register_item('webhook', WebHookHandler) -available_handlers.register_item('hyperv', HyperVKvpReportingHandler) +available_handlers.register_item("log", LogHandler) +available_handlers.register_item("print", PrintHandler) +available_handlers.register_item("webhook", WebHookHandler) +available_handlers.register_item("hyperv", HyperVKvpReportingHandler) # vi: ts=4 expandtab |