# This file is part of cloud-init. See LICENSE file for license information. from cloudinit.url_helper import ( NOT_FOUND, UrlError, REDACTED, oauth_headers, read_file_or_url, retry_on_url_exc) from cloudinit.tests.helpers import CiTestCase, mock, skipIf from cloudinit import util from cloudinit import version import httpretty import logging import requests try: import oauthlib assert oauthlib # avoid pyflakes error F401: import unused _missing_oauthlib_dep = False except ImportError: _missing_oauthlib_dep = True M_PATH = 'cloudinit.url_helper.' class TestOAuthHeaders(CiTestCase): def test_oauth_headers_raises_not_implemented_when_oathlib_missing(self): """oauth_headers raises a NotImplemented error when oauth absent.""" with mock.patch.dict('sys.modules', {'oauthlib': None}): with self.assertRaises(NotImplementedError) as context_manager: oauth_headers(1, 2, 3, 4, 5) self.assertEqual( 'oauth support is not available', str(context_manager.exception)) @skipIf(_missing_oauthlib_dep, "No python-oauthlib dependency") @mock.patch('oauthlib.oauth1.Client') def test_oauth_headers_calls_oathlibclient_when_available(self, m_client): """oauth_headers calls oaut1.hClient.sign with the provided url.""" class fakeclient(object): def sign(self, url): # The first and 3rd item of the client.sign tuple are ignored return ('junk', url, 'junk2') m_client.return_value = fakeclient() return_value = oauth_headers( 'url', 'consumer_key', 'token_key', 'token_secret', 'consumer_secret') self.assertEqual('url', return_value) class TestReadFileOrUrl(CiTestCase): with_logs = True def test_read_file_or_url_str_from_file(self): """Test that str(result.contents) on file is text version of contents. It should not be "b'data'", but just "'data'" """ tmpf = self.tmp_path("myfile1") data = b'This is my file content\n' util.write_file(tmpf, data, omode="wb") result = read_file_or_url("file://%s" % tmpf) self.assertEqual(result.contents, data) self.assertEqual(str(result), data.decode('utf-8')) @httpretty.activate def test_read_file_or_url_str_from_url(self): """Test that str(result.contents) on url is text version of contents. It should not be "b'data'", but just "'data'" """ url = 'http://hostname/path' data = b'This is my url content\n' httpretty.register_uri(httpretty.GET, url, data) result = read_file_or_url(url) self.assertEqual(result.contents, data) self.assertEqual(str(result), data.decode('utf-8')) @httpretty.activate def test_read_file_or_url_str_from_url_redacting_headers_from_logs(self): """Headers are redacted from logs but unredacted in requests.""" url = 'http://hostname/path' headers = {'sensitive': 'sekret', 'server': 'blah'} httpretty.register_uri(httpretty.GET, url) # By default, httpretty will log our request along with the header, # so if we don't change this the secret will show up in the logs logging.getLogger('httpretty.core').setLevel(logging.CRITICAL) read_file_or_url(url, headers=headers, headers_redact=['sensitive']) logs = self.logs.getvalue() for k in headers.keys(): self.assertEqual(headers[k], httpretty.last_request().headers[k]) self.assertIn(REDACTED, logs) self.assertNotIn('sekret', logs) @httpretty.activate def test_read_file_or_url_str_from_url_redacts_noheaders(self): """When no headers_redact, header values are in logs and requests.""" url = 'http://hostname/path' headers = {'sensitive': 'sekret', 'server': 'blah'} httpretty.register_uri(httpretty.GET, url) read_file_or_url(url, headers=headers) for k in headers.keys(): self.assertEqual(headers[k], httpretty.last_request().headers[k]) logs = self.logs.getvalue() self.assertNotIn(REDACTED, logs) self.assertIn('sekret', logs) @mock.patch(M_PATH + 'readurl') def test_read_file_or_url_passes_params_to_readurl(self, m_readurl): """read_file_or_url passes all params through to readurl.""" url = 'http://hostname/path' response = 'This is my url content\n' m_readurl.return_value = response params = {'url': url, 'timeout': 1, 'retries': 2, 'headers': {'somehdr': 'val'}, 'data': 'data', 'sec_between': 1, 'ssl_details': {'cert_file': '/path/cert.pem'}, 'headers_cb': 'headers_cb', 'exception_cb': 'exception_cb'} self.assertEqual(response, read_file_or_url(**params)) params.pop('url') # url is passed in as a positional arg self.assertEqual([mock.call(url, **params)], m_readurl.call_args_list) def test_wb_read_url_defaults_honored_by_read_file_or_url_callers(self): """Readurl param defaults used when unspecified by read_file_or_url Param defaults tested are as follows: retries: 0, additional headers None beyond default, method: GET, data: None, check_status: True and allow_redirects: True """ url = 'http://hostname/path' m_response = mock.MagicMock() class FakeSession(requests.Session): @classmethod def request(cls, **kwargs): self.assertEqual( {'url': url, 'allow_redirects': True, 'method': 'GET', 'headers': { 'User-Agent': 'Cloud-Init/%s' % ( version.version_string())}}, kwargs) return m_response with mock.patch(M_PATH + 'requests.Session') as m_session: error = requests.exceptions.HTTPError('broke') m_session.side_effect = [error, FakeSession()] # assert no retries and check_status == True with self.assertRaises(UrlError) as context_manager: response = read_file_or_url(url) self.assertEqual('broke', str(context_manager.exception)) # assert default headers, method, url and allow_redirects True # Success on 2nd call with FakeSession response = read_file_or_url(url) self.assertEqual(m_response, response._response) class TestRetryOnUrlExc(CiTestCase): def test_do_not_retry_non_urlerror(self): """When exception is not UrlError return False.""" myerror = IOError('something unexcpected') self.assertFalse(retry_on_url_exc(msg='', exc=myerror)) def test_perform_retries_on_not_found(self): """When exception is UrlError with a 404 status code return True.""" myerror = UrlError(cause=RuntimeError( 'something was not found'), code=NOT_FOUND) self.assertTrue(retry_on_url_exc(msg='', exc=myerror)) def test_perform_retries_on_timeout(self): """When exception is a requests.Timout return True.""" myerror = UrlError(cause=requests.Timeout('something timed out')) self.assertTrue(retry_on_url_exc(msg='', exc=myerror)) # vi: ts=4 expandtab