summaryrefslogtreecommitdiff
path: root/cloudinit
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit')
-rw-r--r--cloudinit/reporting/handlers.py28
-rw-r--r--cloudinit/sources/DataSourceMAAS.py88
-rw-r--r--cloudinit/url_helper.py142
-rw-r--r--cloudinit/util.py3
4 files changed, 186 insertions, 75 deletions
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index 86cbe3c3..d8f69641 100644
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -34,5 +34,33 @@ class LogHandler(ReportingHandler):
logger.info(event.as_string())
+class WebHookHandler(ReportingHandler):
+ def __init__(self, endpoint, consumer_key=None, token_key=None,
+ token_secret=None, consumer_secret=None, timeout=None,
+ retries=None):
+ super(WebHookHandler, self).__init__()
+
+ if any(consumer_key, token_key, token_secret, consumer_secret):
+ self.oauth_helper = url_helper.OauthHelper(
+ consumer_key=consumer_key, token_key=token_key,
+ token_secret=token_secret, consumer_secret=consumer_secret)
+ else:
+ self.oauth_helper = None
+ self.endpoint = endpoint
+ self.timeout = timeout
+ self.retries = retries
+ self.ssl_details = util.fetch_ssl_details()
+
+ def publish_event(self, event):
+ if self.oauth_helper:
+ readurl = self.oauth_helper.readurl
+ else:
+ readurl = url_helper.readurl
+ return readurl(
+ self.endpoint, data=event.as_dict(),
+ timeout=self.timeout,
+ retries=self.retries, ssl_details=self.ssl_details)
+
+
available_handlers = DictRegistry()
available_handlers.register_item('log', LogHandler)
diff --git a/cloudinit/sources/DataSourceMAAS.py b/cloudinit/sources/DataSourceMAAS.py
index c1a0eb61..279da238 100644
--- a/cloudinit/sources/DataSourceMAAS.py
+++ b/cloudinit/sources/DataSourceMAAS.py
@@ -52,7 +52,20 @@ class DataSourceMAAS(sources.DataSource):
sources.DataSource.__init__(self, sys_cfg, distro, paths)
self.base_url = None
self.seed_dir = os.path.join(paths.seed_dir, 'maas')
- self.oauth_clockskew = None
+ self.oauth_helper = self._get_helper()
+
+ def _get_helper(self):
+ mcfg = self.ds_cfg
+ # If we are missing token_key, token_secret or consumer_key
+ # then just do non-authed requests
+ for required in ('token_key', 'token_secret', 'consumer_key'):
+ if required not in mcfg:
+ return url_helper.OauthUrlHelper()
+
+ return url_helper.OauthHelper(
+ consumer_key=mcfg['consumer_key'], token_key=mcfg['token_key'],
+ token_secret=mcfg['token_secret'],
+ consumer_secret=mcfg.get('consumer_secret'))
def __str__(self):
root = sources.DataSource.__str__(self)
@@ -84,9 +97,9 @@ class DataSourceMAAS(sources.DataSource):
self.base_url = url
- (userdata, metadata) = read_maas_seed_url(self.base_url,
- self._md_headers,
- paths=self.paths)
+ (userdata, metadata) = read_maas_seed_url(
+ self.base_url, self.oauth_helper.md_headers,
+ paths=self.paths)
self.userdata_raw = userdata
self.metadata = metadata
return True
@@ -94,31 +107,8 @@ class DataSourceMAAS(sources.DataSource):
util.logexc(LOG, "Failed fetching metadata from url %s", url)
return False
- def _md_headers(self, url):
- mcfg = self.ds_cfg
-
- # If we are missing token_key, token_secret or consumer_key
- # then just do non-authed requests
- for required in ('token_key', 'token_secret', 'consumer_key'):
- if required not in mcfg:
- return {}
-
- consumer_secret = mcfg.get('consumer_secret', "")
-
- timestamp = None
- if self.oauth_clockskew:
- timestamp = int(time.time()) + self.oauth_clockskew
-
- return oauth_headers(url=url,
- consumer_key=mcfg['consumer_key'],
- token_key=mcfg['token_key'],
- token_secret=mcfg['token_secret'],
- consumer_secret=consumer_secret,
- timestamp=timestamp)
-
def wait_for_metadata_service(self, url):
mcfg = self.ds_cfg
-
max_wait = 120
try:
max_wait = int(mcfg.get("max_wait", max_wait))
@@ -138,10 +128,8 @@ class DataSourceMAAS(sources.DataSource):
starttime = time.time()
check_url = "%s/%s/meta-data/instance-id" % (url, MD_VERSION)
urls = [check_url]
- url = url_helper.wait_for_url(urls=urls, max_wait=max_wait,
- timeout=timeout,
- exception_cb=self._except_cb,
- headers_cb=self._md_headers)
+ url = self.oauth_helper.wait_for_url(
+ urls=urls, max_wait=max_wait, timeout=timeout)
if url:
LOG.debug("Using metadata source: '%s'", url)
@@ -151,26 +139,6 @@ class DataSourceMAAS(sources.DataSource):
return bool(url)
- def _except_cb(self, msg, exception):
- if not (isinstance(exception, url_helper.UrlError) and
- (exception.code == 403 or exception.code == 401)):
- return
-
- if 'date' not in exception.headers:
- LOG.warn("Missing header 'date' in %s response", exception.code)
- return
-
- date = exception.headers['date']
- try:
- ret_time = time.mktime(parsedate(date))
- except Exception as e:
- LOG.warn("Failed to convert datetime '%s': %s", date, e)
- return
-
- self.oauth_clockskew = int(ret_time - time.time())
- LOG.warn("Setting oauth clockskew to %d", self.oauth_clockskew)
- return
-
def read_maas_seed_dir(seed_d):
"""
@@ -280,24 +248,6 @@ def check_seed_contents(content, seed):
return (userdata, md)
-def oauth_headers(url, consumer_key, token_key, token_secret, consumer_secret,
- timestamp=None):
- if timestamp:
- timestamp = str(timestamp)
- else:
- timestamp = None
-
- client = oauth1.Client(
- consumer_key,
- client_secret=consumer_secret,
- resource_owner_key=token_key,
- resource_owner_secret=token_secret,
- signature_method=oauth1.SIGNATURE_PLAINTEXT,
- timestamp=timestamp)
- uri, signed_headers, body = client.sign(url)
- return signed_headers
-
-
class MAASSeedDirNone(Exception):
pass
diff --git a/cloudinit/url_helper.py b/cloudinit/url_helper.py
index 0e65f431..2141cdc5 100644
--- a/cloudinit/url_helper.py
+++ b/cloudinit/url_helper.py
@@ -25,6 +25,10 @@ import time
import six
import requests
+import oauthlib.oauth1 as oauth1
+import os
+import json
+from functools import partial
from requests import exceptions
from six.moves.urllib.parse import (
@@ -147,13 +151,14 @@ class UrlResponse(object):
class UrlError(IOError):
- def __init__(self, cause, code=None, headers=None):
+ def __init__(self, cause, code=None, headers=None, url=None):
IOError.__init__(self, str(cause))
self.cause = cause
self.code = code
self.headers = headers
if self.headers is None:
self.headers = {}
+ self.url = url
def _get_ssl_args(url, ssl_details):
@@ -247,9 +252,10 @@ def readurl(url, data=None, timeout=None, retries=0, sec_between=1,
and hasattr(e, 'response') # This appeared in v 0.10.8
and hasattr(e.response, 'status_code')):
excps.append(UrlError(e, code=e.response.status_code,
- headers=e.response.headers))
+ headers=e.response.headers,
+ url=url))
else:
- excps.append(UrlError(e))
+ excps.append(UrlError(e, url=url))
if SSL_ENABLED and isinstance(e, exceptions.SSLError):
# ssl exceptions are not going to get fixed by waiting a
# few seconds
@@ -333,11 +339,11 @@ def wait_for_url(urls, max_wait=None, timeout=None,
if not response.contents:
reason = "empty response [%s]" % (response.code)
url_exc = UrlError(ValueError(reason), code=response.code,
- headers=response.headers)
+ headers=response.headers, url=url)
elif not response.ok():
reason = "bad status code [%s]" % (response.code)
url_exc = UrlError(ValueError(reason), code=response.code,
- headers=response.headers)
+ headers=response.headers, url=url)
else:
return url
except UrlError as e:
@@ -368,3 +374,129 @@ def wait_for_url(urls, max_wait=None, timeout=None,
time.sleep(sleep_time)
return False
+
+
+class OauthUrlHelper(object):
+ def __init__(self, consumer_key=None, token_key=None,
+ token_secret=None, consumer_secret=None,
+ skew_data_file="/run/oauth_skew.json"):
+ self.consumer_key = consumer_key
+ self.consumer_secret = consumer_secret or ""
+ self.token_key = token_key
+ self.token_secret = token_secret
+ self.skew_data_file = skew_data_file
+ self.skew_data = {}
+ self._do_oauth = True
+ self.skew_change_limit = 5
+ required = (self.token_key, self.token_secret, self.consumer_key)
+ if not any(required):
+ self._do_oauth = False
+ elif not all(required):
+ raise ValueError("all or none of token_key, token_secret, or "
+ "consumer_key can be set")
+
+ self.skew_data = self.read_skew_file()
+
+ def read_skew_file(self):
+ if self.skew_data_file and os.path.isfile(self.skew_data_file):
+ with open(self.skew_data_file, mode="r") as fp:
+ return json.load(fp.read())
+ return None
+
+ def update_skew_file(self, host, value):
+ # this is not atomic
+ cur = self.read_skew_file()
+ if cur is None or not self.skew_data_file:
+ return
+ cur[host] = value
+ with open(self.skew_data_file, mode="w") as fp:
+ fp.write(json.dumps(cur))
+
+ def exception_cb(self, msg, exception):
+ if not (isinstance(exception, UrlError) and
+ (exception.code == 403 or exception.code == 401)):
+ return
+
+ if 'date' not in exception.headers:
+ LOG.warn("Missing header 'date' in %s response", exception.code)
+ return
+
+ date = exception.headers['date']
+ try:
+ ret_time = time.mktime(parsedate(date))
+ except Exception as e:
+ LOG.warn("Failed to convert datetime '%s': %s", date, e)
+ return
+
+ host = urlparse(exception.url).netloc
+ skew = int(ret_time - time.time())
+ old_skew = self.skew_data.get(host)
+ if abs(old_skew - skew) > self.skew_change_limit:
+ self.update_skew_file(host, skew)
+ LOG.warn("Setting oauth clockskew for %s to %d",
+ host, skew)
+ skew_data[host] = skew
+
+ return
+
+ def headers_cb(self, url):
+ if not self._do_oauth:
+ return {}
+
+ timestamp = None
+ host = urlparse(url).netloc
+ if host in self.skew_data:
+ timestamp = int(time.time()) + self.skew_data[host]
+
+ return oauth_headers(
+ url=url, consumer_key=self.consumer_key,
+ token_key=self.token_key, token_secret=self.token_secret,
+ consumer_secret=self.consumer_secret, timestamp=timestamp)
+
+ def _wrapped(self, wrapped_func, args, kwargs):
+ kwargs['headers_cb'] = partial(
+ self._headers_cb, kwargs.get('headers_cb'))
+ kwargs['exception_cb'] = partial(
+ self._exception_cb, kwargs.get('exception_cb'))
+ return wrapped_func(*args, **kwargs)
+
+ def wait_for_url(self, *args, **kwargs):
+ return self._wrapped(wait_for_url, args, kwargs)
+
+ def readurl(self, *args, **kwargs):
+ return self._wrapped(readurl, args, kwargs)
+
+ def _exception_cb(self, extra_exception_cb, url, msg, exception):
+ ret = None
+ try:
+ if extra_exception_cb:
+ ret = extra_exception_cb(msg, exception)
+ finally:
+ self.exception_cb(self, msg, exception)
+ return ret
+
+ def _headers_cb(self, extra_headers_cb, url):
+ headers = {}
+ if extra_headers_cb:
+ headers = extra_headers_cb(url)
+ if headers:
+ headers.update(self.headers_cb(url))
+ return headers
+
+
+def oauth_headers(url, consumer_key, token_key, token_secret, consumer_secret,
+ timestamp=None):
+ if timestamp:
+ timestamp = str(timestamp)
+ else:
+ timestamp = None
+
+ client = oauth1.Client(
+ consumer_key,
+ client_secret=consumer_secret,
+ resource_owner_key=token_key,
+ resource_owner_secret=token_secret,
+ signature_method=oauth1.SIGNATURE_PLAINTEXT,
+ timestamp=timestamp)
+ uri, signed_headers, body = client.sign(url)
+ return signed_headers
diff --git a/cloudinit/util.py b/cloudinit/util.py
index 02ba654a..09e583f5 100644
--- a/cloudinit/util.py
+++ b/cloudinit/util.py
@@ -782,7 +782,8 @@ def read_file_or_url(url, timeout=5, retries=10,
code = e.errno
if e.errno == errno.ENOENT:
code = url_helper.NOT_FOUND
- raise url_helper.UrlError(cause=e, code=code, headers=None)
+ raise url_helper.UrlError(cause=e, code=code, headers=None,
+ url=url)
return url_helper.FileResponse(file_path, contents=contents)
else:
return url_helper.readurl(url,