summaryrefslogtreecommitdiff
path: root/tests/unittests/test_url_helper.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_url_helper.py')
-rw-r--r--tests/unittests/test_url_helper.py134
1 files changed, 78 insertions, 56 deletions
diff --git a/tests/unittests/test_url_helper.py b/tests/unittests/test_url_helper.py
index 501d9533..85810e00 100644
--- a/tests/unittests/test_url_helper.py
+++ b/tests/unittests/test_url_helper.py
@@ -1,54 +1,63 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from cloudinit.url_helper import (
- NOT_FOUND, UrlError, REDACTED, oauth_headers, read_file_or_url,
- retry_on_url_exc)
-from tests.unittests.helpers import CiTestCase, mock, skipIf
-from cloudinit import util
-from cloudinit import version
+import logging
import httpretty
-import logging
import requests
+from cloudinit import util, version
+from cloudinit.url_helper import (
+ NOT_FOUND,
+ REDACTED,
+ UrlError,
+ oauth_headers,
+ read_file_or_url,
+ retry_on_url_exc,
+)
+from tests.unittests.helpers import CiTestCase, mock, skipIf
try:
import oauthlib
+
assert oauthlib # avoid pyflakes error F401: import unused
_missing_oauthlib_dep = False
except ImportError:
_missing_oauthlib_dep = True
-M_PATH = 'cloudinit.url_helper.'
+M_PATH = "cloudinit.url_helper."
class TestOAuthHeaders(CiTestCase):
-
def test_oauth_headers_raises_not_implemented_when_oathlib_missing(self):
"""oauth_headers raises a NotImplemented error when oauth absent."""
- with mock.patch.dict('sys.modules', {'oauthlib': None}):
+ with mock.patch.dict("sys.modules", {"oauthlib": None}):
with self.assertRaises(NotImplementedError) as context_manager:
oauth_headers(1, 2, 3, 4, 5)
self.assertEqual(
- 'oauth support is not available',
- str(context_manager.exception))
+ "oauth support is not available", str(context_manager.exception)
+ )
@skipIf(_missing_oauthlib_dep, "No python-oauthlib dependency")
- @mock.patch('oauthlib.oauth1.Client')
+ @mock.patch("oauthlib.oauth1.Client")
def test_oauth_headers_calls_oathlibclient_when_available(self, m_client):
"""oauth_headers calls oaut1.hClient.sign with the provided url."""
+
class fakeclient(object):
def sign(self, url):
# The first and 3rd item of the client.sign tuple are ignored
- return ('junk', url, 'junk2')
+ return ("junk", url, "junk2")
m_client.return_value = fakeclient()
return_value = oauth_headers(
- 'url', 'consumer_key', 'token_key', 'token_secret',
- 'consumer_secret')
- self.assertEqual('url', return_value)
+ "url",
+ "consumer_key",
+ "token_key",
+ "token_secret",
+ "consumer_secret",
+ )
+ self.assertEqual("url", return_value)
class TestReadFileOrUrl(CiTestCase):
@@ -59,45 +68,45 @@ class TestReadFileOrUrl(CiTestCase):
"""Test that str(result.contents) on file is text version of contents.
It should not be "b'data'", but just "'data'" """
tmpf = self.tmp_path("myfile1")
- data = b'This is my file content\n'
+ data = b"This is my file content\n"
util.write_file(tmpf, data, omode="wb")
result = read_file_or_url("file://%s" % tmpf)
self.assertEqual(result.contents, data)
- self.assertEqual(str(result), data.decode('utf-8'))
+ self.assertEqual(str(result), data.decode("utf-8"))
@httpretty.activate
def test_read_file_or_url_str_from_url(self):
"""Test that str(result.contents) on url is text version of contents.
It should not be "b'data'", but just "'data'" """
- url = 'http://hostname/path'
- data = b'This is my url content\n'
+ url = "http://hostname/path"
+ data = b"This is my url content\n"
httpretty.register_uri(httpretty.GET, url, data)
result = read_file_or_url(url)
self.assertEqual(result.contents, data)
- self.assertEqual(str(result), data.decode('utf-8'))
+ self.assertEqual(str(result), data.decode("utf-8"))
@httpretty.activate
def test_read_file_or_url_str_from_url_redacting_headers_from_logs(self):
"""Headers are redacted from logs but unredacted in requests."""
- url = 'http://hostname/path'
- headers = {'sensitive': 'sekret', 'server': 'blah'}
+ url = "http://hostname/path"
+ headers = {"sensitive": "sekret", "server": "blah"}
httpretty.register_uri(httpretty.GET, url)
# By default, httpretty will log our request along with the header,
# so if we don't change this the secret will show up in the logs
- logging.getLogger('httpretty.core').setLevel(logging.CRITICAL)
+ logging.getLogger("httpretty.core").setLevel(logging.CRITICAL)
- read_file_or_url(url, headers=headers, headers_redact=['sensitive'])
+ read_file_or_url(url, headers=headers, headers_redact=["sensitive"])
logs = self.logs.getvalue()
for k in headers.keys():
self.assertEqual(headers[k], httpretty.last_request().headers[k])
self.assertIn(REDACTED, logs)
- self.assertNotIn('sekret', logs)
+ self.assertNotIn("sekret", logs)
@httpretty.activate
def test_read_file_or_url_str_from_url_redacts_noheaders(self):
"""When no headers_redact, header values are in logs and requests."""
- url = 'http://hostname/path'
- headers = {'sensitive': 'sekret', 'server': 'blah'}
+ url = "http://hostname/path"
+ headers = {"sensitive": "sekret", "server": "blah"}
httpretty.register_uri(httpretty.GET, url)
read_file_or_url(url, headers=headers)
@@ -105,21 +114,27 @@ class TestReadFileOrUrl(CiTestCase):
self.assertEqual(headers[k], httpretty.last_request().headers[k])
logs = self.logs.getvalue()
self.assertNotIn(REDACTED, logs)
- self.assertIn('sekret', logs)
+ self.assertIn("sekret", logs)
- @mock.patch(M_PATH + 'readurl')
+ @mock.patch(M_PATH + "readurl")
def test_read_file_or_url_passes_params_to_readurl(self, m_readurl):
"""read_file_or_url passes all params through to readurl."""
- url = 'http://hostname/path'
- response = 'This is my url content\n'
+ url = "http://hostname/path"
+ response = "This is my url content\n"
m_readurl.return_value = response
- params = {'url': url, 'timeout': 1, 'retries': 2,
- 'headers': {'somehdr': 'val'},
- 'data': 'data', 'sec_between': 1,
- 'ssl_details': {'cert_file': '/path/cert.pem'},
- 'headers_cb': 'headers_cb', 'exception_cb': 'exception_cb'}
+ params = {
+ "url": url,
+ "timeout": 1,
+ "retries": 2,
+ "headers": {"somehdr": "val"},
+ "data": "data",
+ "sec_between": 1,
+ "ssl_details": {"cert_file": "/path/cert.pem"},
+ "headers_cb": "headers_cb",
+ "exception_cb": "exception_cb",
+ }
self.assertEqual(response, read_file_or_url(**params))
- params.pop('url') # url is passed in as a positional arg
+ params.pop("url") # url is passed in as a positional arg
self.assertEqual([mock.call(url, **params)], m_readurl.call_args_list)
def test_wb_read_url_defaults_honored_by_read_file_or_url_callers(self):
@@ -129,7 +144,7 @@ class TestReadFileOrUrl(CiTestCase):
retries: 0, additional headers None beyond default, method: GET,
data: None, check_status: True and allow_redirects: True
"""
- url = 'http://hostname/path'
+ url = "http://hostname/path"
m_response = mock.MagicMock()
@@ -137,20 +152,26 @@ class TestReadFileOrUrl(CiTestCase):
@classmethod
def request(cls, **kwargs):
self.assertEqual(
- {'url': url, 'allow_redirects': True, 'method': 'GET',
- 'headers': {
- 'User-Agent': 'Cloud-Init/%s' % (
- version.version_string())}},
- kwargs)
+ {
+ "url": url,
+ "allow_redirects": True,
+ "method": "GET",
+ "headers": {
+ "User-Agent": "Cloud-Init/%s"
+ % (version.version_string())
+ },
+ },
+ kwargs,
+ )
return m_response
- with mock.patch(M_PATH + 'requests.Session') as m_session:
- error = requests.exceptions.HTTPError('broke')
+ with mock.patch(M_PATH + "requests.Session") as m_session:
+ error = requests.exceptions.HTTPError("broke")
m_session.side_effect = [error, FakeSession()]
# assert no retries and check_status == True
with self.assertRaises(UrlError) as context_manager:
response = read_file_or_url(url)
- self.assertEqual('broke', str(context_manager.exception))
+ self.assertEqual("broke", str(context_manager.exception))
# assert default headers, method, url and allow_redirects True
# Success on 2nd call with FakeSession
response = read_file_or_url(url)
@@ -158,21 +179,22 @@ class TestReadFileOrUrl(CiTestCase):
class TestRetryOnUrlExc(CiTestCase):
-
def test_do_not_retry_non_urlerror(self):
"""When exception is not UrlError return False."""
- myerror = IOError('something unexcpected')
- self.assertFalse(retry_on_url_exc(msg='', exc=myerror))
+ myerror = IOError("something unexcpected")
+ self.assertFalse(retry_on_url_exc(msg="", exc=myerror))
def test_perform_retries_on_not_found(self):
"""When exception is UrlError with a 404 status code return True."""
- myerror = UrlError(cause=RuntimeError(
- 'something was not found'), code=NOT_FOUND)
- self.assertTrue(retry_on_url_exc(msg='', exc=myerror))
+ myerror = UrlError(
+ cause=RuntimeError("something was not found"), code=NOT_FOUND
+ )
+ self.assertTrue(retry_on_url_exc(msg="", exc=myerror))
def test_perform_retries_on_timeout(self):
"""When exception is a requests.Timout return True."""
- myerror = UrlError(cause=requests.Timeout('something timed out'))
- self.assertTrue(retry_on_url_exc(msg='', exc=myerror))
+ myerror = UrlError(cause=requests.Timeout("something timed out"))
+ self.assertTrue(retry_on_url_exc(msg="", exc=myerror))
+
# vi: ts=4 expandtab