diff options
Diffstat (limited to 'tests/unittests/test_url_helper.py')
-rw-r--r-- | tests/unittests/test_url_helper.py | 200 |
1 files changed, 200 insertions, 0 deletions
diff --git a/tests/unittests/test_url_helper.py b/tests/unittests/test_url_helper.py new file mode 100644 index 00000000..85810e00 --- /dev/null +++ b/tests/unittests/test_url_helper.py @@ -0,0 +1,200 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import logging + +import httpretty +import requests + +from cloudinit import util, version +from cloudinit.url_helper import ( + NOT_FOUND, + REDACTED, + UrlError, + oauth_headers, + read_file_or_url, + retry_on_url_exc, +) +from tests.unittests.helpers import CiTestCase, mock, skipIf + +try: + import oauthlib + + assert oauthlib # avoid pyflakes error F401: import unused + _missing_oauthlib_dep = False +except ImportError: + _missing_oauthlib_dep = True + + +M_PATH = "cloudinit.url_helper." + + +class TestOAuthHeaders(CiTestCase): + def test_oauth_headers_raises_not_implemented_when_oathlib_missing(self): + """oauth_headers raises a NotImplemented error when oauth absent.""" + with mock.patch.dict("sys.modules", {"oauthlib": None}): + with self.assertRaises(NotImplementedError) as context_manager: + oauth_headers(1, 2, 3, 4, 5) + self.assertEqual( + "oauth support is not available", str(context_manager.exception) + ) + + @skipIf(_missing_oauthlib_dep, "No python-oauthlib dependency") + @mock.patch("oauthlib.oauth1.Client") + def test_oauth_headers_calls_oathlibclient_when_available(self, m_client): + """oauth_headers calls oaut1.hClient.sign with the provided url.""" + + class fakeclient(object): + def sign(self, url): + # The first and 3rd item of the client.sign tuple are ignored + return ("junk", url, "junk2") + + m_client.return_value = fakeclient() + + return_value = oauth_headers( + "url", + "consumer_key", + "token_key", + "token_secret", + "consumer_secret", + ) + self.assertEqual("url", return_value) + + +class TestReadFileOrUrl(CiTestCase): + + with_logs = True + + def test_read_file_or_url_str_from_file(self): + """Test that str(result.contents) on file is text version of contents. + It should not be "b'data'", but just "'data'" """ + tmpf = self.tmp_path("myfile1") + data = b"This is my file content\n" + util.write_file(tmpf, data, omode="wb") + result = read_file_or_url("file://%s" % tmpf) + self.assertEqual(result.contents, data) + self.assertEqual(str(result), data.decode("utf-8")) + + @httpretty.activate + def test_read_file_or_url_str_from_url(self): + """Test that str(result.contents) on url is text version of contents. + It should not be "b'data'", but just "'data'" """ + url = "http://hostname/path" + data = b"This is my url content\n" + httpretty.register_uri(httpretty.GET, url, data) + result = read_file_or_url(url) + self.assertEqual(result.contents, data) + self.assertEqual(str(result), data.decode("utf-8")) + + @httpretty.activate + def test_read_file_or_url_str_from_url_redacting_headers_from_logs(self): + """Headers are redacted from logs but unredacted in requests.""" + url = "http://hostname/path" + headers = {"sensitive": "sekret", "server": "blah"} + httpretty.register_uri(httpretty.GET, url) + # By default, httpretty will log our request along with the header, + # so if we don't change this the secret will show up in the logs + logging.getLogger("httpretty.core").setLevel(logging.CRITICAL) + + read_file_or_url(url, headers=headers, headers_redact=["sensitive"]) + logs = self.logs.getvalue() + for k in headers.keys(): + self.assertEqual(headers[k], httpretty.last_request().headers[k]) + self.assertIn(REDACTED, logs) + self.assertNotIn("sekret", logs) + + @httpretty.activate + def test_read_file_or_url_str_from_url_redacts_noheaders(self): + """When no headers_redact, header values are in logs and requests.""" + url = "http://hostname/path" + headers = {"sensitive": "sekret", "server": "blah"} + httpretty.register_uri(httpretty.GET, url) + + read_file_or_url(url, headers=headers) + for k in headers.keys(): + self.assertEqual(headers[k], httpretty.last_request().headers[k]) + logs = self.logs.getvalue() + self.assertNotIn(REDACTED, logs) + self.assertIn("sekret", logs) + + @mock.patch(M_PATH + "readurl") + def test_read_file_or_url_passes_params_to_readurl(self, m_readurl): + """read_file_or_url passes all params through to readurl.""" + url = "http://hostname/path" + response = "This is my url content\n" + m_readurl.return_value = response + params = { + "url": url, + "timeout": 1, + "retries": 2, + "headers": {"somehdr": "val"}, + "data": "data", + "sec_between": 1, + "ssl_details": {"cert_file": "/path/cert.pem"}, + "headers_cb": "headers_cb", + "exception_cb": "exception_cb", + } + self.assertEqual(response, read_file_or_url(**params)) + params.pop("url") # url is passed in as a positional arg + self.assertEqual([mock.call(url, **params)], m_readurl.call_args_list) + + def test_wb_read_url_defaults_honored_by_read_file_or_url_callers(self): + """Readurl param defaults used when unspecified by read_file_or_url + + Param defaults tested are as follows: + retries: 0, additional headers None beyond default, method: GET, + data: None, check_status: True and allow_redirects: True + """ + url = "http://hostname/path" + + m_response = mock.MagicMock() + + class FakeSession(requests.Session): + @classmethod + def request(cls, **kwargs): + self.assertEqual( + { + "url": url, + "allow_redirects": True, + "method": "GET", + "headers": { + "User-Agent": "Cloud-Init/%s" + % (version.version_string()) + }, + }, + kwargs, + ) + return m_response + + with mock.patch(M_PATH + "requests.Session") as m_session: + error = requests.exceptions.HTTPError("broke") + m_session.side_effect = [error, FakeSession()] + # assert no retries and check_status == True + with self.assertRaises(UrlError) as context_manager: + response = read_file_or_url(url) + self.assertEqual("broke", str(context_manager.exception)) + # assert default headers, method, url and allow_redirects True + # Success on 2nd call with FakeSession + response = read_file_or_url(url) + self.assertEqual(m_response, response._response) + + +class TestRetryOnUrlExc(CiTestCase): + def test_do_not_retry_non_urlerror(self): + """When exception is not UrlError return False.""" + myerror = IOError("something unexcpected") + self.assertFalse(retry_on_url_exc(msg="", exc=myerror)) + + def test_perform_retries_on_not_found(self): + """When exception is UrlError with a 404 status code return True.""" + myerror = UrlError( + cause=RuntimeError("something was not found"), code=NOT_FOUND + ) + self.assertTrue(retry_on_url_exc(msg="", exc=myerror)) + + def test_perform_retries_on_timeout(self): + """When exception is a requests.Timout return True.""" + myerror = UrlError(cause=requests.Timeout("something timed out")) + self.assertTrue(retry_on_url_exc(msg="", exc=myerror)) + + +# vi: ts=4 expandtab |