summaryrefslogtreecommitdiff
path: root/cloudinit/tests/test_url_helper.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/tests/test_url_helper.py')
-rw-r--r--cloudinit/tests/test_url_helper.py178
1 files changed, 0 insertions, 178 deletions
diff --git a/cloudinit/tests/test_url_helper.py b/cloudinit/tests/test_url_helper.py
deleted file mode 100644
index c3918f80..00000000
--- a/cloudinit/tests/test_url_helper.py
+++ /dev/null
@@ -1,178 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-from cloudinit.url_helper import (
- NOT_FOUND, UrlError, REDACTED, oauth_headers, read_file_or_url,
- retry_on_url_exc)
-from cloudinit.tests.helpers import CiTestCase, mock, skipIf
-from cloudinit import util
-from cloudinit import version
-
-import httpretty
-import logging
-import requests
-
-
-try:
- import oauthlib
- assert oauthlib # avoid pyflakes error F401: import unused
- _missing_oauthlib_dep = False
-except ImportError:
- _missing_oauthlib_dep = True
-
-
-M_PATH = 'cloudinit.url_helper.'
-
-
-class TestOAuthHeaders(CiTestCase):
-
- def test_oauth_headers_raises_not_implemented_when_oathlib_missing(self):
- """oauth_headers raises a NotImplemented error when oauth absent."""
- with mock.patch.dict('sys.modules', {'oauthlib': None}):
- with self.assertRaises(NotImplementedError) as context_manager:
- oauth_headers(1, 2, 3, 4, 5)
- self.assertEqual(
- 'oauth support is not available',
- str(context_manager.exception))
-
- @skipIf(_missing_oauthlib_dep, "No python-oauthlib dependency")
- @mock.patch('oauthlib.oauth1.Client')
- def test_oauth_headers_calls_oathlibclient_when_available(self, m_client):
- """oauth_headers calls oaut1.hClient.sign with the provided url."""
- class fakeclient(object):
- def sign(self, url):
- # The first and 3rd item of the client.sign tuple are ignored
- return ('junk', url, 'junk2')
-
- m_client.return_value = fakeclient()
-
- return_value = oauth_headers(
- 'url', 'consumer_key', 'token_key', 'token_secret',
- 'consumer_secret')
- self.assertEqual('url', return_value)
-
-
-class TestReadFileOrUrl(CiTestCase):
-
- with_logs = True
-
- def test_read_file_or_url_str_from_file(self):
- """Test that str(result.contents) on file is text version of contents.
- It should not be "b'data'", but just "'data'" """
- tmpf = self.tmp_path("myfile1")
- data = b'This is my file content\n'
- util.write_file(tmpf, data, omode="wb")
- result = read_file_or_url("file://%s" % tmpf)
- self.assertEqual(result.contents, data)
- self.assertEqual(str(result), data.decode('utf-8'))
-
- @httpretty.activate
- def test_read_file_or_url_str_from_url(self):
- """Test that str(result.contents) on url is text version of contents.
- It should not be "b'data'", but just "'data'" """
- url = 'http://hostname/path'
- data = b'This is my url content\n'
- httpretty.register_uri(httpretty.GET, url, data)
- result = read_file_or_url(url)
- self.assertEqual(result.contents, data)
- self.assertEqual(str(result), data.decode('utf-8'))
-
- @httpretty.activate
- def test_read_file_or_url_str_from_url_redacting_headers_from_logs(self):
- """Headers are redacted from logs but unredacted in requests."""
- url = 'http://hostname/path'
- headers = {'sensitive': 'sekret', 'server': 'blah'}
- httpretty.register_uri(httpretty.GET, url)
- # By default, httpretty will log our request along with the header,
- # so if we don't change this the secret will show up in the logs
- logging.getLogger('httpretty.core').setLevel(logging.CRITICAL)
-
- read_file_or_url(url, headers=headers, headers_redact=['sensitive'])
- logs = self.logs.getvalue()
- for k in headers.keys():
- self.assertEqual(headers[k], httpretty.last_request().headers[k])
- self.assertIn(REDACTED, logs)
- self.assertNotIn('sekret', logs)
-
- @httpretty.activate
- def test_read_file_or_url_str_from_url_redacts_noheaders(self):
- """When no headers_redact, header values are in logs and requests."""
- url = 'http://hostname/path'
- headers = {'sensitive': 'sekret', 'server': 'blah'}
- httpretty.register_uri(httpretty.GET, url)
-
- read_file_or_url(url, headers=headers)
- for k in headers.keys():
- self.assertEqual(headers[k], httpretty.last_request().headers[k])
- logs = self.logs.getvalue()
- self.assertNotIn(REDACTED, logs)
- self.assertIn('sekret', logs)
-
- @mock.patch(M_PATH + 'readurl')
- def test_read_file_or_url_passes_params_to_readurl(self, m_readurl):
- """read_file_or_url passes all params through to readurl."""
- url = 'http://hostname/path'
- response = 'This is my url content\n'
- m_readurl.return_value = response
- params = {'url': url, 'timeout': 1, 'retries': 2,
- 'headers': {'somehdr': 'val'},
- 'data': 'data', 'sec_between': 1,
- 'ssl_details': {'cert_file': '/path/cert.pem'},
- 'headers_cb': 'headers_cb', 'exception_cb': 'exception_cb'}
- self.assertEqual(response, read_file_or_url(**params))
- params.pop('url') # url is passed in as a positional arg
- self.assertEqual([mock.call(url, **params)], m_readurl.call_args_list)
-
- def test_wb_read_url_defaults_honored_by_read_file_or_url_callers(self):
- """Readurl param defaults used when unspecified by read_file_or_url
-
- Param defaults tested are as follows:
- retries: 0, additional headers None beyond default, method: GET,
- data: None, check_status: True and allow_redirects: True
- """
- url = 'http://hostname/path'
-
- m_response = mock.MagicMock()
-
- class FakeSession(requests.Session):
- @classmethod
- def request(cls, **kwargs):
- self.assertEqual(
- {'url': url, 'allow_redirects': True, 'method': 'GET',
- 'headers': {
- 'User-Agent': 'Cloud-Init/%s' % (
- version.version_string())}},
- kwargs)
- return m_response
-
- with mock.patch(M_PATH + 'requests.Session') as m_session:
- error = requests.exceptions.HTTPError('broke')
- m_session.side_effect = [error, FakeSession()]
- # assert no retries and check_status == True
- with self.assertRaises(UrlError) as context_manager:
- response = read_file_or_url(url)
- self.assertEqual('broke', str(context_manager.exception))
- # assert default headers, method, url and allow_redirects True
- # Success on 2nd call with FakeSession
- response = read_file_or_url(url)
- self.assertEqual(m_response, response._response)
-
-
-class TestRetryOnUrlExc(CiTestCase):
-
- def test_do_not_retry_non_urlerror(self):
- """When exception is not UrlError return False."""
- myerror = IOError('something unexcpected')
- self.assertFalse(retry_on_url_exc(msg='', exc=myerror))
-
- def test_perform_retries_on_not_found(self):
- """When exception is UrlError with a 404 status code return True."""
- myerror = UrlError(cause=RuntimeError(
- 'something was not found'), code=NOT_FOUND)
- self.assertTrue(retry_on_url_exc(msg='', exc=myerror))
-
- def test_perform_retries_on_timeout(self):
- """When exception is a requests.Timout return True."""
- myerror = UrlError(cause=requests.Timeout('something timed out'))
- self.assertTrue(retry_on_url_exc(msg='', exc=myerror))
-
-# vi: ts=4 expandtab