1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
# vi: ts=4 expandtab
import abc
import logging
import oauthlib.oauth1 as oauth1
import six
from cloudinit.registry import DictRegistry
from cloudinit import url_helper
from cloudinit import util
@six.add_metaclass(abc.ABCMeta)
class ReportingHandler(object):
    """Abstract base class shared by all report handlers.

    Subclasses decide what happens to an event by implementing
    :meth:`~publish_event`.
    """

    @abc.abstractmethod
    def publish_event(self, event):
        """Handle a single reporting event.

        Concrete handlers must override this method; the base class
        provides no behaviour of its own.

        :param event: The event object to publish.
        """
class LogHandler(ReportingHandler):
    """Handler that writes events to the cloud-init log at ``INFO``."""

    def publish_event(self, event):
        """Log the string form of ``event`` at the ``INFO`` level.

        The logger used is named
        ``cloudinit.reporting.<event.event_type>.<event.name>``.

        :param event: Object providing ``event_type``, ``name`` and
            ``as_string()``.
        """
        logger_name = '.'.join(
            ('cloudinit', 'reporting', event.event_type, event.name))
        logging.getLogger(logger_name).info(event.as_string())
class WebHookHandler(ReportingHandler):
    """Handler that sends each event to an HTTP endpoint.

    If at least one OAuth credential is given, requests are made through a
    ``url_helper.OauthHelper``; otherwise ``url_helper.readurl`` is used
    directly.
    """

    def __init__(self, endpoint, consumer_key=None, token_key=None,
                 token_secret=None, consumer_secret=None, timeout=None,
                 retries=None):
        """Record the endpoint settings and set up OAuth when requested.

        :param endpoint: URL handed to ``readurl`` for every event.
        :param consumer_key: Optional OAuth consumer key.
        :param token_key: Optional OAuth token key.
        :param token_secret: Optional OAuth token secret.
        :param consumer_secret: Optional OAuth consumer secret.
        :param timeout: Forwarded to ``readurl`` as ``timeout``.
        :param retries: Forwarded to ``readurl`` as ``retries``.
        """
        super(WebHookHandler, self).__init__()
        # any() accepts exactly one iterable; passing the credentials as
        # separate positional arguments raises TypeError, so bundle them.
        oauth_values = (
            consumer_key, token_key, token_secret, consumer_secret)
        if any(oauth_values):
            self.oauth_helper = url_helper.OauthHelper(
                consumer_key=consumer_key, token_key=token_key,
                token_secret=token_secret, consumer_secret=consumer_secret)
        else:
            self.oauth_helper = None
        self.endpoint = endpoint
        self.timeout = timeout
        self.retries = retries
        self.ssl_details = util.fetch_ssl_details()

    def publish_event(self, event):
        """Send ``event.as_dict()`` to the configured endpoint.

        :param event: Object providing ``as_dict()``.
        :return: Whatever the selected ``readurl`` callable returns.
        """
        # Use the OAuth-aware reader only when credentials were supplied.
        if self.oauth_helper:
            readurl = self.oauth_helper.readurl
        else:
            readurl = url_helper.readurl
        return readurl(
            self.endpoint, data=event.as_dict(),
            timeout=self.timeout,
            retries=self.retries, ssl_details=self.ssl_details)
# Module-level registry of handler classes; LogHandler is registered under
# the key 'log'.
# NOTE(review): WebHookHandler is not registered here — confirm whether that
# is intentional or whether a 'webhook' entry is missing.
available_handlers = DictRegistry()
available_handlers.register_item('log', LogHandler)
|