summaryrefslogtreecommitdiff
path: root/tests/unittests/test_url_helper.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_url_helper.py')
-rw-r--r--tests/unittests/test_url_helper.py178
1 files changed, 178 insertions, 0 deletions
diff --git a/tests/unittests/test_url_helper.py b/tests/unittests/test_url_helper.py
new file mode 100644
index 00000000..501d9533
--- /dev/null
+++ b/tests/unittests/test_url_helper.py
@@ -0,0 +1,178 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.url_helper import (
+ NOT_FOUND, UrlError, REDACTED, oauth_headers, read_file_or_url,
+ retry_on_url_exc)
+from tests.unittests.helpers import CiTestCase, mock, skipIf
+from cloudinit import util
+from cloudinit import version
+
+import httpretty
+import logging
+import requests
+
+
+try:
+ import oauthlib
+ assert oauthlib # avoid pyflakes error F401: import unused
+ _missing_oauthlib_dep = False
+except ImportError:
+ _missing_oauthlib_dep = True
+
+
+M_PATH = 'cloudinit.url_helper.'
+
+
+class TestOAuthHeaders(CiTestCase):
+
+ def test_oauth_headers_raises_not_implemented_when_oathlib_missing(self):
+ """oauth_headers raises a NotImplemented error when oauth absent."""
+ with mock.patch.dict('sys.modules', {'oauthlib': None}):
+ with self.assertRaises(NotImplementedError) as context_manager:
+ oauth_headers(1, 2, 3, 4, 5)
+ self.assertEqual(
+ 'oauth support is not available',
+ str(context_manager.exception))
+
+ @skipIf(_missing_oauthlib_dep, "No python-oauthlib dependency")
+ @mock.patch('oauthlib.oauth1.Client')
+ def test_oauth_headers_calls_oathlibclient_when_available(self, m_client):
+ """oauth_headers calls oaut1.hClient.sign with the provided url."""
+ class fakeclient(object):
+ def sign(self, url):
+ # The first and 3rd item of the client.sign tuple are ignored
+ return ('junk', url, 'junk2')
+
+ m_client.return_value = fakeclient()
+
+ return_value = oauth_headers(
+ 'url', 'consumer_key', 'token_key', 'token_secret',
+ 'consumer_secret')
+ self.assertEqual('url', return_value)
+
+
+class TestReadFileOrUrl(CiTestCase):
+
+ with_logs = True
+
+ def test_read_file_or_url_str_from_file(self):
+ """Test that str(result.contents) on file is text version of contents.
+ It should not be "b'data'", but just "'data'" """
+ tmpf = self.tmp_path("myfile1")
+ data = b'This is my file content\n'
+ util.write_file(tmpf, data, omode="wb")
+ result = read_file_or_url("file://%s" % tmpf)
+ self.assertEqual(result.contents, data)
+ self.assertEqual(str(result), data.decode('utf-8'))
+
+ @httpretty.activate
+ def test_read_file_or_url_str_from_url(self):
+ """Test that str(result.contents) on url is text version of contents.
+ It should not be "b'data'", but just "'data'" """
+ url = 'http://hostname/path'
+ data = b'This is my url content\n'
+ httpretty.register_uri(httpretty.GET, url, data)
+ result = read_file_or_url(url)
+ self.assertEqual(result.contents, data)
+ self.assertEqual(str(result), data.decode('utf-8'))
+
+ @httpretty.activate
+ def test_read_file_or_url_str_from_url_redacting_headers_from_logs(self):
+ """Headers are redacted from logs but unredacted in requests."""
+ url = 'http://hostname/path'
+ headers = {'sensitive': 'sekret', 'server': 'blah'}
+ httpretty.register_uri(httpretty.GET, url)
+ # By default, httpretty will log our request along with the header,
+ # so if we don't change this the secret will show up in the logs
+ logging.getLogger('httpretty.core').setLevel(logging.CRITICAL)
+
+ read_file_or_url(url, headers=headers, headers_redact=['sensitive'])
+ logs = self.logs.getvalue()
+ for k in headers.keys():
+ self.assertEqual(headers[k], httpretty.last_request().headers[k])
+ self.assertIn(REDACTED, logs)
+ self.assertNotIn('sekret', logs)
+
+ @httpretty.activate
+ def test_read_file_or_url_str_from_url_redacts_noheaders(self):
+ """When no headers_redact, header values are in logs and requests."""
+ url = 'http://hostname/path'
+ headers = {'sensitive': 'sekret', 'server': 'blah'}
+ httpretty.register_uri(httpretty.GET, url)
+
+ read_file_or_url(url, headers=headers)
+ for k in headers.keys():
+ self.assertEqual(headers[k], httpretty.last_request().headers[k])
+ logs = self.logs.getvalue()
+ self.assertNotIn(REDACTED, logs)
+ self.assertIn('sekret', logs)
+
+ @mock.patch(M_PATH + 'readurl')
+ def test_read_file_or_url_passes_params_to_readurl(self, m_readurl):
+ """read_file_or_url passes all params through to readurl."""
+ url = 'http://hostname/path'
+ response = 'This is my url content\n'
+ m_readurl.return_value = response
+ params = {'url': url, 'timeout': 1, 'retries': 2,
+ 'headers': {'somehdr': 'val'},
+ 'data': 'data', 'sec_between': 1,
+ 'ssl_details': {'cert_file': '/path/cert.pem'},
+ 'headers_cb': 'headers_cb', 'exception_cb': 'exception_cb'}
+ self.assertEqual(response, read_file_or_url(**params))
+ params.pop('url') # url is passed in as a positional arg
+ self.assertEqual([mock.call(url, **params)], m_readurl.call_args_list)
+
+ def test_wb_read_url_defaults_honored_by_read_file_or_url_callers(self):
+ """Readurl param defaults used when unspecified by read_file_or_url
+
+ Param defaults tested are as follows:
+ retries: 0, additional headers None beyond default, method: GET,
+ data: None, check_status: True and allow_redirects: True
+ """
+ url = 'http://hostname/path'
+
+ m_response = mock.MagicMock()
+
+ class FakeSession(requests.Session):
+ @classmethod
+ def request(cls, **kwargs):
+ self.assertEqual(
+ {'url': url, 'allow_redirects': True, 'method': 'GET',
+ 'headers': {
+ 'User-Agent': 'Cloud-Init/%s' % (
+ version.version_string())}},
+ kwargs)
+ return m_response
+
+ with mock.patch(M_PATH + 'requests.Session') as m_session:
+ error = requests.exceptions.HTTPError('broke')
+ m_session.side_effect = [error, FakeSession()]
+ # assert no retries and check_status == True
+ with self.assertRaises(UrlError) as context_manager:
+ response = read_file_or_url(url)
+ self.assertEqual('broke', str(context_manager.exception))
+ # assert default headers, method, url and allow_redirects True
+ # Success on 2nd call with FakeSession
+ response = read_file_or_url(url)
+ self.assertEqual(m_response, response._response)
+
+
+class TestRetryOnUrlExc(CiTestCase):
+
+ def test_do_not_retry_non_urlerror(self):
+ """When exception is not UrlError return False."""
+ myerror = IOError('something unexcpected')
+ self.assertFalse(retry_on_url_exc(msg='', exc=myerror))
+
+ def test_perform_retries_on_not_found(self):
+ """When exception is UrlError with a 404 status code return True."""
+ myerror = UrlError(cause=RuntimeError(
+ 'something was not found'), code=NOT_FOUND)
+ self.assertTrue(retry_on_url_exc(msg='', exc=myerror))
+
+ def test_perform_retries_on_timeout(self):
+ """When exception is a requests.Timout return True."""
+ myerror = UrlError(cause=requests.Timeout('something timed out'))
+ self.assertTrue(retry_on_url_exc(msg='', exc=myerror))
+
+# vi: ts=4 expandtab