diff options
Diffstat (limited to 'cloudinit')
-rw-r--r-- | cloudinit/reporting/handlers.py | 28 | ||||
-rw-r--r-- | cloudinit/sources/DataSourceMAAS.py | 88 | ||||
-rw-r--r-- | cloudinit/url_helper.py | 142 | ||||
-rw-r--r-- | cloudinit/util.py | 3 |
4 files changed, 186 insertions, 75 deletions
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py index 86cbe3c3..d8f69641 100644 --- a/cloudinit/reporting/handlers.py +++ b/cloudinit/reporting/handlers.py @@ -34,5 +34,33 @@ class LogHandler(ReportingHandler): logger.info(event.as_string()) +class WebHookHandler(ReportingHandler): + def __init__(self, endpoint, consumer_key=None, token_key=None, + token_secret=None, consumer_secret=None, timeout=None, + retries=None): + super(WebHookHandler, self).__init__() + + if any(consumer_key, token_key, token_secret, consumer_secret): + self.oauth_helper = url_helper.OauthHelper( + consumer_key=consumer_key, token_key=token_key, + token_secret=token_secret, consumer_secret=consumer_secret) + else: + self.oauth_helper = None + self.endpoint = endpoint + self.timeout = timeout + self.retries = retries + self.ssl_details = util.fetch_ssl_details() + + def publish_event(self, event): + if self.oauth_helper: + readurl = self.oauth_helper.readurl + else: + readurl = url_helper.readurl + return readurl( + self.endpoint, data=event.as_dict(), + timeout=self.timeout, + retries=self.retries, ssl_details=self.ssl_details) + + available_handlers = DictRegistry() available_handlers.register_item('log', LogHandler) diff --git a/cloudinit/sources/DataSourceMAAS.py b/cloudinit/sources/DataSourceMAAS.py index c1a0eb61..279da238 100644 --- a/cloudinit/sources/DataSourceMAAS.py +++ b/cloudinit/sources/DataSourceMAAS.py @@ -52,7 +52,20 @@ class DataSourceMAAS(sources.DataSource): sources.DataSource.__init__(self, sys_cfg, distro, paths) self.base_url = None self.seed_dir = os.path.join(paths.seed_dir, 'maas') - self.oauth_clockskew = None + self.oauth_helper = self._get_helper() + + def _get_helper(self): + mcfg = self.ds_cfg + # If we are missing token_key, token_secret or consumer_key + # then just do non-authed requests + for required in ('token_key', 'token_secret', 'consumer_key'): + if required not in mcfg: + return url_helper.OauthUrlHelper() + + return url_helper.OauthHelper( + consumer_key=mcfg['consumer_key'], token_key=mcfg['token_key'], + token_secret=mcfg['token_secret'], + consumer_secret=mcfg.get('consumer_secret')) def __str__(self): root = sources.DataSource.__str__(self) @@ -84,9 +97,9 @@ class DataSourceMAAS(sources.DataSource): self.base_url = url - (userdata, metadata) = read_maas_seed_url(self.base_url, - self._md_headers, - paths=self.paths) + (userdata, metadata) = read_maas_seed_url( + self.base_url, self.oauth_helper.md_headers, + paths=self.paths) self.userdata_raw = userdata self.metadata = metadata return True @@ -94,31 +107,8 @@ class DataSourceMAAS(sources.DataSource): util.logexc(LOG, "Failed fetching metadata from url %s", url) return False - def _md_headers(self, url): - mcfg = self.ds_cfg - - # If we are missing token_key, token_secret or consumer_key - # then just do non-authed requests - for required in ('token_key', 'token_secret', 'consumer_key'): - if required not in mcfg: - return {} - - consumer_secret = mcfg.get('consumer_secret', "") - - timestamp = None - if self.oauth_clockskew: - timestamp = int(time.time()) + self.oauth_clockskew - - return oauth_headers(url=url, - consumer_key=mcfg['consumer_key'], - token_key=mcfg['token_key'], - token_secret=mcfg['token_secret'], - consumer_secret=consumer_secret, - timestamp=timestamp) - def wait_for_metadata_service(self, url): mcfg = self.ds_cfg - max_wait = 120 try: max_wait = int(mcfg.get("max_wait", max_wait)) @@ -138,10 +128,8 @@ class DataSourceMAAS(sources.DataSource): starttime = time.time() check_url = "%s/%s/meta-data/instance-id" % (url, MD_VERSION) urls = [check_url] - url = url_helper.wait_for_url(urls=urls, max_wait=max_wait, - timeout=timeout, - exception_cb=self._except_cb, - headers_cb=self._md_headers) + url = self.oauth_helper.wait_for_url( + urls=urls, max_wait=max_wait, timeout=timeout) if url: LOG.debug("Using metadata source: '%s'", url) @@ -151,26 +139,6 @@ class DataSourceMAAS(sources.DataSource): return bool(url) - def _except_cb(self, msg, exception): - if not (isinstance(exception, url_helper.UrlError) and - (exception.code == 403 or exception.code == 401)): - return - - if 'date' not in exception.headers: - LOG.warn("Missing header 'date' in %s response", exception.code) - return - - date = exception.headers['date'] - try: - ret_time = time.mktime(parsedate(date)) - except Exception as e: - LOG.warn("Failed to convert datetime '%s': %s", date, e) - return - - self.oauth_clockskew = int(ret_time - time.time()) - LOG.warn("Setting oauth clockskew to %d", self.oauth_clockskew) - return - def read_maas_seed_dir(seed_d): """ @@ -280,24 +248,6 @@ def check_seed_contents(content, seed): return (userdata, md) -def oauth_headers(url, consumer_key, token_key, token_secret, consumer_secret, - timestamp=None): - if timestamp: - timestamp = str(timestamp) - else: - timestamp = None - - client = oauth1.Client( - consumer_key, - client_secret=consumer_secret, - resource_owner_key=token_key, - resource_owner_secret=token_secret, - signature_method=oauth1.SIGNATURE_PLAINTEXT, - timestamp=timestamp) - uri, signed_headers, body = client.sign(url) - return signed_headers - - class MAASSeedDirNone(Exception): pass diff --git a/cloudinit/url_helper.py b/cloudinit/url_helper.py index 0e65f431..2141cdc5 100644 --- a/cloudinit/url_helper.py +++ b/cloudinit/url_helper.py @@ -25,6 +25,10 @@ import time import six import requests +import oauthlib.oauth1 as oauth1 +import os +import json +from functools import partial from requests import exceptions from six.moves.urllib.parse import ( @@ -147,13 +151,14 @@ class UrlResponse(object): class UrlError(IOError): - def __init__(self, cause, code=None, headers=None): + def __init__(self, cause, code=None, headers=None, url=None): IOError.__init__(self, str(cause)) self.cause = cause self.code = code self.headers = headers if self.headers is None: self.headers = {} + self.url = url def _get_ssl_args(url, ssl_details): @@ -247,9 +252,10 @@ def readurl(url, data=None, timeout=None, retries=0, sec_between=1, and hasattr(e, 'response') # This appeared in v 0.10.8 and hasattr(e.response, 'status_code')): excps.append(UrlError(e, code=e.response.status_code, - headers=e.response.headers)) + headers=e.response.headers, + url=url)) else: - excps.append(UrlError(e)) + excps.append(UrlError(e, url=url)) if SSL_ENABLED and isinstance(e, exceptions.SSLError): # ssl exceptions are not going to get fixed by waiting a # few seconds @@ -333,11 +339,11 @@ def wait_for_url(urls, max_wait=None, timeout=None, if not response.contents: reason = "empty response [%s]" % (response.code) url_exc = UrlError(ValueError(reason), code=response.code, - headers=response.headers) + headers=response.headers, url=url) elif not response.ok(): reason = "bad status code [%s]" % (response.code) url_exc = UrlError(ValueError(reason), code=response.code, - headers=response.headers) + headers=response.headers, url=url) else: return url except UrlError as e: @@ -368,3 +374,129 @@ def wait_for_url(urls, max_wait=None, timeout=None, time.sleep(sleep_time) return False + + +class OauthUrlHelper(object): + def __init__(self, consumer_key=None, token_key=None, + token_secret=None, consumer_secret=None, + skew_data_file="/run/oauth_skew.json"): + self.consumer_key = consumer_key + self.consumer_secret = consumer_secret or "" + self.token_key = token_key + self.token_secret = token_secret + self.skew_data_file = skew_data_file + self.skew_data = {} + self._do_oauth = True + self.skew_change_limit = 5 + required = (self.token_key, self.token_secret, self.consumer_key) + if not any(required): + self._do_oauth = False + elif not all(required): + raise ValueError("all or none of token_key, token_secret, or " + "consumer_key can be set") + + self.skew_data = self.read_skew_file() + + def read_skew_file(self): + if self.skew_data_file and os.path.isfile(self.skew_data_file): + with open(self.skew_data_file, mode="r") as fp: + return json.load(fp.read()) + return None + + def update_skew_file(self, host, value): + # this is not atomic + cur = self.read_skew_file() + if cur is None or not self.skew_data_file: + return + cur[host] = value + with open(self.skew_data_file, mode="w") as fp: + fp.write(json.dumps(cur)) + + def exception_cb(self, msg, exception): + if not (isinstance(exception, UrlError) and + (exception.code == 403 or exception.code == 401)): + return + + if 'date' not in exception.headers: + LOG.warn("Missing header 'date' in %s response", exception.code) + return + + date = exception.headers['date'] + try: + ret_time = time.mktime(parsedate(date)) + except Exception as e: + LOG.warn("Failed to convert datetime '%s': %s", date, e) + return + + host = urlparse(exception.url).netloc + skew = int(ret_time - time.time()) + old_skew = self.skew_data.get(host) + if abs(old_skew - skew) > self.skew_change_limit: + self.update_skew_file(host, skew) + LOG.warn("Setting oauth clockskew for %s to %d", + host, skew) + skew_data[host] = skew + + return + + def headers_cb(self, url): + if not self._do_oauth: + return {} + + timestamp = None + host = urlparse(url).netloc + if host in self.skew_data: + timestamp = int(time.time()) + self.skew_data[host] + + return oauth_headers( + url=url, consumer_key=self.consumer_key, + token_key=self.token_key, token_secret=self.token_secret, + consumer_secret=self.consumer_secret, timestamp=timestamp) + + def _wrapped(self, wrapped_func, args, kwargs): + kwargs['headers_cb'] = partial( + self._headers_cb, kwargs.get('headers_cb')) + kwargs['exception_cb'] = partial( + self._exception_cb, kwargs.get('exception_cb')) + return wrapped_func(*args, **kwargs) + + def wait_for_url(self, *args, **kwargs): + return self._wrapped(wait_for_url, args, kwargs) + + def readurl(self, *args, **kwargs): + return self._wrapped(readurl, args, kwargs) + + def _exception_cb(self, extra_exception_cb, url, msg, exception): + ret = None + try: + if extra_exception_cb: + ret = extra_exception_cb(msg, exception) + finally: + self.exception_cb(self, msg, exception) + return ret + + def _headers_cb(self, extra_headers_cb, url): + headers = {} + if extra_headers_cb: + headers = extra_headers_cb(url) + if headers: + headers.update(self.headers_cb(url)) + return headers + + +def oauth_headers(url, consumer_key, token_key, token_secret, consumer_secret, + timestamp=None): + if timestamp: + timestamp = str(timestamp) + else: + timestamp = None + + client = oauth1.Client( + consumer_key, + client_secret=consumer_secret, + resource_owner_key=token_key, + resource_owner_secret=token_secret, + signature_method=oauth1.SIGNATURE_PLAINTEXT, + timestamp=timestamp) + uri, signed_headers, body = client.sign(url) + return signed_headers diff --git a/cloudinit/util.py b/cloudinit/util.py index 02ba654a..09e583f5 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -782,7 +782,8 @@ def read_file_or_url(url, timeout=5, retries=10, code = e.errno if e.errno == errno.ENOENT: code = url_helper.NOT_FOUND - raise url_helper.UrlError(cause=e, code=code, headers=None) + raise url_helper.UrlError(cause=e, code=code, headers=None, + url=url) return url_helper.FileResponse(file_path, contents=contents) else: return url_helper.readurl(url, |