Diffstat (limited to 'cloudinit/reporting/handlers.py')
-rwxr-xr-x cloudinit/reporting/handlers.py | 128 ++++++++++++++++++----------
 1 file changed, 78 insertions(+), 50 deletions(-)
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index 0a8c7af3..e163e168 100755
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -12,8 +12,8 @@ import uuid
from datetime import datetime
from cloudinit import log as logging
+from cloudinit import url_helper, util
from cloudinit.registry import DictRegistry
-from cloudinit import (url_helper, util)
LOG = logging.getLogger(__name__)
@@ -55,7 +55,8 @@ class LogHandler(ReportingHandler):
def publish_event(self, event):
logger = logging.getLogger(
- '.'.join(['cloudinit', 'reporting', event.event_type, event.name]))
+ ".".join(["cloudinit", "reporting", event.event_type, event.name])
+ )
logger.log(self.level, event.as_string())
@@ -67,15 +68,25 @@ class PrintHandler(ReportingHandler):
class WebHookHandler(ReportingHandler):
- def __init__(self, endpoint, consumer_key=None, token_key=None,
- token_secret=None, consumer_secret=None, timeout=None,
- retries=None):
+ def __init__(
+ self,
+ endpoint,
+ consumer_key=None,
+ token_key=None,
+ token_secret=None,
+ consumer_secret=None,
+ timeout=None,
+ retries=None,
+ ):
super(WebHookHandler, self).__init__()
if any([consumer_key, token_key, token_secret, consumer_secret]):
self.oauth_helper = url_helper.OauthUrlHelper(
- consumer_key=consumer_key, token_key=token_key,
- token_secret=token_secret, consumer_secret=consumer_secret)
+ consumer_key=consumer_key,
+ token_key=token_key,
+ token_secret=token_secret,
+ consumer_secret=consumer_secret,
+ )
else:
self.oauth_helper = None
self.endpoint = endpoint
@@ -90,9 +101,12 @@ class WebHookHandler(ReportingHandler):
readurl = url_helper.readurl
try:
return readurl(
- self.endpoint, data=json.dumps(event.as_dict()),
+ self.endpoint,
+ data=json.dumps(event.as_dict()),
timeout=self.timeout,
- retries=self.retries, ssl_details=self.ssl_details)
+ retries=self.retries,
+ ssl_details=self.ssl_details,
+ )
except Exception:
LOG.warning("failed posting event: %s", event.as_string())
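
Note: a minimal usage sketch of this handler (the endpoint URL is
illustrative, and ReportingEvent is assumed to come from
cloudinit.reporting.events):

    from cloudinit.reporting.events import ReportingEvent
    from cloudinit.reporting.handlers import WebHookHandler

    handler = WebHookHandler("http://127.0.0.1:8000/events", timeout=5, retries=2)
    # publish_event POSTs json.dumps(event.as_dict()) to the endpoint
    handler.publish_event(
        ReportingEvent("start", "azure-ds", "looking for a datasource")
    )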
@@ -112,33 +126,35 @@ class HyperVKvpReportingHandler(ReportingHandler):
For more information, see
https://technet.microsoft.com/en-us/library/dn798287.aspx#Linux%20guests
"""
+
HV_KVP_EXCHANGE_MAX_VALUE_SIZE = 2048
# The maximum value size expected in Azure
HV_KVP_AZURE_MAX_VALUE_SIZE = 1024
HV_KVP_EXCHANGE_MAX_KEY_SIZE = 512
- HV_KVP_RECORD_SIZE = (HV_KVP_EXCHANGE_MAX_KEY_SIZE +
- HV_KVP_EXCHANGE_MAX_VALUE_SIZE)
- EVENT_PREFIX = 'CLOUD_INIT'
- MSG_KEY = 'msg'
- RESULT_KEY = 'result'
- DESC_IDX_KEY = 'msg_i'
- JSON_SEPARATORS = (',', ':')
- KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1'
+ HV_KVP_RECORD_SIZE = (
+ HV_KVP_EXCHANGE_MAX_KEY_SIZE + HV_KVP_EXCHANGE_MAX_VALUE_SIZE
+ )
+ EVENT_PREFIX = "CLOUD_INIT"
+ MSG_KEY = "msg"
+ RESULT_KEY = "result"
+ DESC_IDX_KEY = "msg_i"
+ JSON_SEPARATORS = (",", ":")
+ KVP_POOL_FILE_GUEST = "/var/lib/hyperv/.kvp_pool_1"
_already_truncated_pool_file = False
- def __init__(self,
- kvp_file_path=KVP_POOL_FILE_GUEST,
- event_types=None):
+ def __init__(self, kvp_file_path=KVP_POOL_FILE_GUEST, event_types=None):
super(HyperVKvpReportingHandler, self).__init__()
self._kvp_file_path = kvp_file_path
HyperVKvpReportingHandler._truncate_guest_pool_file(
- self._kvp_file_path)
+ self._kvp_file_path
+ )
self._event_types = event_types
self.q = queue.Queue()
self.incarnation_no = self._get_incarnation_no()
- self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX,
- self.incarnation_no)
+ self.event_key_prefix = "{0}|{1}".format(
+ self.EVENT_PREFIX, self.incarnation_no
+ )
self.publish_thread = threading.Thread(
target=self._publish_event_routine
)
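
Note: each KVP entry in the pool file is one fixed-size record, a NUL-padded
512-byte key field followed by a 2048-byte value field. A standalone sketch of
that layout (constants copied from the class; the helper name is hypothetical):

    import struct

    KEY_SIZE = 512     # HV_KVP_EXCHANGE_MAX_KEY_SIZE
    VALUE_SIZE = 2048  # HV_KVP_EXCHANGE_MAX_VALUE_SIZE

    def encode_kvp_record(key, value):
        # "512s2048s" NUL-pads (or truncates) each field to its fixed width
        return struct.pack(
            "%ds%ds" % (KEY_SIZE, VALUE_SIZE),
            key.encode("utf-8"),
            value.encode("utf-8"),
        )

    record = encode_kvp_record("CLOUD_INIT|5|start|azure-ds", '{"msg":""}')
    assert len(record) == KEY_SIZE + VALUE_SIZE  # 2560 bytes per record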
@@ -184,7 +200,7 @@ class HyperVKvpReportingHandler(ReportingHandler):
def _iterate_kvps(self, offset):
"""iterate the kvp file from the current offset."""
- with open(self._kvp_file_path, 'rb') as f:
+ with open(self._kvp_file_path, "rb") as f:
fcntl.flock(f, fcntl.LOCK_EX)
f.seek(offset)
record_data = f.read(self.HV_KVP_RECORD_SIZE)
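
Note: readers and writers of the pool file take the same exclusive flock and
move in whole-record strides. A simplified reader sketch (error handling
omitted; the function name is hypothetical):

    import fcntl

    def read_kvp_records(path, record_size=512 + 2048):
        records = []
        with open(path, "rb") as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            try:
                while True:
                    data = f.read(record_size)
                    if len(data) < record_size:  # EOF or torn record
                        break
                    records.append(data)
            finally:
                fcntl.flock(f, fcntl.LOCK_UN)
        return records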
@@ -200,9 +216,9 @@ class HyperVKvpReportingHandler(ReportingHandler):
CLOUD_INIT|<incarnation number>|<event_type>|<event_name>|<uuid>
[|subevent_index]
"""
- return u"{0}|{1}|{2}|{3}".format(self.event_key_prefix,
- event.event_type, event.name,
- uuid.uuid4())
+ return "{0}|{1}|{2}|{3}".format(
+ self.event_key_prefix, event.event_type, event.name, uuid.uuid4()
+ )
def _encode_kvp_item(self, key, value):
data = struct.pack(
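
Note: the key layout documented above is easy to reproduce; a sketch (the
helper name is hypothetical):

    import uuid

    def make_event_key(incarnation_no, event_type, name):
        # CLOUD_INIT|<incarnation number>|<event_type>|<event_name>|<uuid>
        return "{0}|{1}|{2}|{3}|{4}".format(
            "CLOUD_INIT", incarnation_no, event_type, name, uuid.uuid4()
        )

    key = make_event_key(5, "finish", "init-network/config-ssh")
    assert len(key.encode("utf-8")) <= 512  # must fit the fixed key field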
@@ -220,19 +236,27 @@ class HyperVKvpReportingHandler(ReportingHandler):
record_data_len = len(record_data)
if record_data_len != self.HV_KVP_RECORD_SIZE:
raise ReportException(
- "record_data len not correct {0} {1}."
- .format(record_data_len, self.HV_KVP_RECORD_SIZE))
- k = (record_data[0:self.HV_KVP_EXCHANGE_MAX_KEY_SIZE].decode('utf-8')
- .strip('\x00'))
+ "record_data len not correct {0} {1}.".format(
+ record_data_len, self.HV_KVP_RECORD_SIZE
+ )
+ )
+ k = (
+ record_data[0 : self.HV_KVP_EXCHANGE_MAX_KEY_SIZE]
+ .decode("utf-8")
+ .strip("\x00")
+ )
v = (
record_data[
- self.HV_KVP_EXCHANGE_MAX_KEY_SIZE:self.HV_KVP_RECORD_SIZE
- ].decode('utf-8').strip('\x00'))
+ self.HV_KVP_EXCHANGE_MAX_KEY_SIZE : self.HV_KVP_RECORD_SIZE
+ ]
+ .decode("utf-8")
+ .strip("\x00")
+ )
- return {'key': k, 'value': v}
+ return {"key": k, "value": v}
def _append_kvp_item(self, record_data):
- with open(self._kvp_file_path, 'ab') as f:
+ with open(self._kvp_file_path, "ab") as f:
fcntl.flock(f, fcntl.LOCK_EX)
for data in record_data:
f.write(data)
@@ -242,22 +266,25 @@ class HyperVKvpReportingHandler(ReportingHandler):
def _break_down(self, key, meta_data, description):
del meta_data[self.MSG_KEY]
des_in_json = json.dumps(description)
- des_in_json = des_in_json[1:(len(des_in_json) - 1)]
+ des_in_json = des_in_json[1 : (len(des_in_json) - 1)]
i = 0
result_array = []
- message_place_holder = "\"" + self.MSG_KEY + "\":\"\""
+ message_place_holder = '"' + self.MSG_KEY + '":""'
while True:
meta_data[self.DESC_IDX_KEY] = i
- meta_data[self.MSG_KEY] = ''
- data_without_desc = json.dumps(meta_data,
- separators=self.JSON_SEPARATORS)
+ meta_data[self.MSG_KEY] = ""
+ data_without_desc = json.dumps(
+ meta_data, separators=self.JSON_SEPARATORS
+ )
room_for_desc = (
- self.HV_KVP_AZURE_MAX_VALUE_SIZE -
- len(data_without_desc) - 8)
+ self.HV_KVP_AZURE_MAX_VALUE_SIZE - len(data_without_desc) - 8
+ )
value = data_without_desc.replace(
message_place_holder,
'"{key}":"{desc}"'.format(
- key=self.MSG_KEY, desc=des_in_json[:room_for_desc]))
+ key=self.MSG_KEY, desc=des_in_json[:room_for_desc]
+ ),
+ )
subkey = "{}|{}".format(key, i)
result_array.append(self._encode_kvp_item(subkey, value))
i += 1
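
Note: _break_down spreads a long description across numbered sub-records
("<key>|0", "<key>|1", ...) so each value stays under the 1024-byte Azure
limit. A simplified sketch of the same splitting (key names mirror MSG_KEY and
DESC_IDX_KEY; it assumes the metadata envelope always leaves positive room):

    import json

    AZURE_MAX = 1024  # HV_KVP_AZURE_MAX_VALUE_SIZE

    def split_description(key, meta_data, description):
        desc = json.dumps(description)[1:-1]  # JSON-escape, drop the quotes
        chunks, i = [], 0
        while True:
            envelope = dict(meta_data, msg_i=i, msg="")
            base = json.dumps(envelope, separators=(",", ":"))
            room = AZURE_MAX - len(base) - 8  # the same 8-byte slack as above
            head, desc = desc[:room], desc[room:]
            chunks.append(
                ("{0}|{1}".format(key, i),
                 base.replace('"msg":""', '"msg":"{0}"'.format(head)))
            )
            i += 1
            if not desc:
                return chunks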
@@ -276,8 +303,9 @@ class HyperVKvpReportingHandler(ReportingHandler):
meta_data = {
"name": event.name,
"type": event.event_type,
- "ts": (datetime.utcfromtimestamp(event.timestamp)
- .isoformat() + 'Z'),
+ "ts": (
+ datetime.utcfromtimestamp(event.timestamp).isoformat() + "Z"
+ ),
}
if hasattr(event, self.RESULT_KEY):
meta_data[self.RESULT_KEY] = event.result
@@ -327,14 +355,14 @@ class HyperVKvpReportingHandler(ReportingHandler):
self.q.put(event)
def flush(self):
- LOG.debug('HyperVReportingHandler flushing remaining events')
+ LOG.debug("HyperVReportingHandler flushing remaining events")
self.q.join()
available_handlers = DictRegistry()
-available_handlers.register_item('log', LogHandler)
-available_handlers.register_item('print', PrintHandler)
-available_handlers.register_item('webhook', WebHookHandler)
-available_handlers.register_item('hyperv', HyperVKvpReportingHandler)
+available_handlers.register_item("log", LogHandler)
+available_handlers.register_item("print", PrintHandler)
+available_handlers.register_item("webhook", WebHookHandler)
+available_handlers.register_item("hyperv", HyperVKvpReportingHandler)
# vi: ts=4 expandtab
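
Note: the registry above is what cloud-init's reporting configuration keys
into; a sketch of that lookup (config values are illustrative):

    from cloudinit.reporting.handlers import available_handlers

    config = {"type": "webhook", "endpoint": "http://127.0.0.1:8000/events"}
    handler_cls = available_handlers.registered_items[config.pop("type")]
    handler = handler_cls(**config)  # -> WebHookHandler(endpoint=...)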