diff options
| author | Ben Murray <ben@duosecurity.com> | 2013-11-21 12:05:44 -0500 |
|---|---|---|
| committer | Ben Murray <ben@duosecurity.com> | 2013-11-22 16:21:38 -0500 |
| commit | 438f391a1a2fac8a0aec3f6f25bbf4305232aac6 (patch) | |
| tree | e272fdda1a9c39fb7956e00d64b61a92c680e421 | |
| parent | e7fd788d6a11a9d8a13d7e249bac6c8bb610a16c (diff) | |
| download | openvpn-duo-plugin-438f391a1a2fac8a0aec3f6f25bbf4305232aac6.tar.gz openvpn-duo-plugin-438f391a1a2fac8a0aec3f6f25bbf4305232aac6.zip | |
Fixes #3 Add HTTPS proxy support (if using the Python helper script)
| -rw-r--r-- | duo_openvpn.c | 37 | ||||
| -rwxr-xr-x | duo_openvpn.py | 376 | ||||
| -rw-r--r-- | https_wrapper.py | 17 | ||||
| -rw-r--r-- | test_duo_openvpn.py | 479 |
4 files changed, 816 insertions, 93 deletions
diff --git a/duo_openvpn.c b/duo_openvpn.c index 6dc2c03..afe36b5 100644 --- a/duo_openvpn.c +++ b/duo_openvpn.c @@ -20,6 +20,8 @@ struct context { char *ikey; char *skey; char *host; + char *proxy_host; + char *proxy_port; }; static const char * @@ -74,6 +76,18 @@ auth_user_pass_verify(struct context *ctx, const char *args[], const char *envp[ setenv("ikey", ctx->ikey, 1); setenv("skey", ctx->skey, 1); setenv("host", ctx->host, 1); + if (ctx->proxy_host) { + setenv("proxy_host", ctx->proxy_host, 1); + } + else { + unsetenv("proxy_host"); + } + if (ctx->proxy_port) { + setenv("proxy_port", ctx->proxy_port, 1); + } + else { + unsetenv("proxy_port"); + } } setenv("control", control, 1); @@ -110,6 +124,23 @@ openvpn_plugin_open_v2(unsigned int *type_mask, const char *argv[], const char * ctx->host = strdup(argv[3]); } + /* Passing proxy_host even if proxy_port is not present + * generates a more informative log message. + */ + if (argv[4]) { + ctx->proxy_host = strdup(argv[4]); + if (argv[5]) { + ctx->proxy_port = strdup(argv[5]); + } + else { + ctx->proxy_port = NULL; + } + } + else { + ctx->proxy_host = NULL; + ctx->proxy_port = NULL; + } + *type_mask = OPENVPN_PLUGIN_MASK(OPENVPN_PLUGIN_AUTH_USER_PASS_VERIFY); return (openvpn_plugin_handle_t) ctx; @@ -123,5 +154,11 @@ openvpn_plugin_close_v1(openvpn_plugin_handle_t handle) free(ctx->ikey); free(ctx->skey); free(ctx->host); + if (ctx->proxy_host) { + free(ctx->proxy_host); + } + if (ctx->proxy_port) { + free(ctx->proxy_port); + } free(ctx); } diff --git a/duo_openvpn.py b/duo_openvpn.py index b5f890a..4d91db4 100755 --- a/duo_openvpn.py +++ b/duo_openvpn.py @@ -1,18 +1,28 @@ #!/usr/bin/env python # # duo_openvpn.py -# Duo OpenVPN v1 -# Copyright 2011 Duo Security, Inc. -# +# Duo OpenVPN +# Copyright 2013 Duo Security, Inc. + +__version__ = '2.0' -import syslog, sys +import base64 +import email.utils +import httplib +import os +import socket +import sys +import syslog +import urllib def log(msg): msg = 'Duo OpenVPN: %s' % msg syslog.syslog(msg) try: - import os, urllib, hashlib, hmac, base64, json + import hashlib + import hmac + import json from https_wrapper import CertValidatingHTTPSConnection except ImportError, e: log('ImportError: %s' % e) @@ -24,61 +34,255 @@ API_RESULT_ALLOW = 'allow' API_RESULT_DENY = 'deny' API_RESULT_ENROLL = 'enroll' -ca_certs = os.path.join(os.path.dirname(__file__), 'ca_certs.pem') - -def canonicalize(method, host, uri, params): - canon = [method.upper(), host.lower(), uri] +DEFAULT_CA_CERTS = os.path.join(os.path.dirname(__file__), 'ca_certs.pem') +def canon_params(params): + """ + Return a canonical string version of the given request parameters. + """ + # this is normalized the same as for OAuth 1.0, + # http://tools.ietf.org/html/rfc5849#section-3.4.1.3.2 args = [] - for key in sorted(params.keys()): - val = params[key] - arg = '%s=%s' % (urllib.quote(key, '~'), urllib.quote(val, '~')) - args.append(arg) - canon.append('&'.join(args)) - + for (key, vals) in sorted( + (urllib.quote(key, '~'), vals) for (key, vals) in params.items()): + for val in sorted(urllib.quote(val, '~') for val in vals): + args.append('%s=%s' % (key, val)) + return '&'.join(args) + +def canonicalize(method, host, uri, params, date, sig_version): + """ + Return a canonical string version of the given request attributes. + """ + if sig_version == 1: + canon = [] + elif sig_version == 2: + canon = [date] + else: + raise NotImplementedError(sig_version) + + canon += [ + method.upper(), + host.lower(), + uri, + canon_params(params), + ] return '\n'.join(canon) -def sign(ikey, skey, method, host, uri, params): - sig = hmac.new(skey, canonicalize(method, host, uri, params), hashlib.sha1) +def sign(ikey, skey, method, host, uri, date, sig_version, params): + """ + Return basic authorization header line with a Duo Web API signature. + """ + canonical = canonicalize(method, host, uri, params, date, sig_version) + if isinstance(skey, unicode): + skey = skey.encode('utf-8') + sig = hmac.new(skey, canonical, hashlib.sha1) auth = '%s:%s' % (ikey, sig.hexdigest()) return 'Basic %s' % base64.b64encode(auth) -def call(ikey, skey, host, method, path, **kwargs): - sig = sign(ikey, skey, method, host, path, kwargs) - - headers = { - 'Authorization': sig, - 'User-agent': 'duo_openvpn/1.0', - } - - if method in [ 'POST', 'PUT' ]: - headers['Content-type'] = 'application/x-www-form-urlencoded' - body = urllib.urlencode(kwargs, doseq=True) - uri = path - else: - body = None - uri = path + '?' + urllib.urlencode(kwargs, doseq=True) - - conn = CertValidatingHTTPSConnection(host, 443, ca_certs=ca_certs) - conn.request(method, uri, body, headers) - response = conn.getresponse() - data = response.read() - conn.close() - - return (response.status, response.reason, data) - -def api(ikey, skey, host, method, path, **kwargs): - (status, reason, data) = call(ikey, skey, host, method, path, **kwargs) - if status != 200: - raise RuntimeError('Received %s %s: %s' % (status, reason, data)) - - try: - data = json.loads(data) - if data['stat'] != 'OK': - raise RuntimeError('Received error response: %s' % data) - return data['response'] - except (ValueError, KeyError): - raise RuntimeError('Received bad response: %s' % data) +def normalize_params(params): + """ + Return copy of params with strings listified + and unicode strings utf-8 encoded. + """ + # urllib cannot handle unicode strings properly. quote() excepts, + # and urlencode() replaces them with '?'. + def encode(value): + if isinstance(value, unicode): + return value.encode("utf-8") + return value + def to_list(value): + if value is None or isinstance(value, basestring): + return [value] + return value + return dict( + (encode(key), [encode(v) for v in to_list(value)]) + for (key, value) in params.items()) + +class Client(object): + sig_version = 1 + + def __init__(self, ikey, skey, host, + ca_certs=DEFAULT_CA_CERTS, + sig_timezone='UTC', user_agent=None): + """ + ca_certs - Path to CA pem file. + """ + self.ikey = ikey + self.skey = skey + self.host = host + self.port = None + self.sig_timezone = sig_timezone + if ca_certs is None: + ca_certs = DEFAULT_CA_CERTS + self.ca_certs = ca_certs + self.user_agent = user_agent + self.set_proxy(host=None, proxy_type=None) + self.timeout = socket._GLOBAL_DEFAULT_TIMEOUT + + def set_proxy(self, host, port=None, headers=None, + proxy_type='CONNECT'): + """ + Configure proxy for API calls. Supported proxy_type values: + + 'CONNECT' - HTTP proxy with CONNECT. + None - Disable proxy. + """ + if proxy_type not in ('CONNECT', None): + raise NotImplementedError('proxy_type=%s' % (proxy_type,)) + self.proxy_headers = headers + self.proxy_host = host + self.proxy_port = port + self.proxy_type = proxy_type + + def api_call(self, method, path, params): + """ + Call a Duo API method. Return a (status, reason, data) tuple. + + * method: HTTP request method. E.g. "GET", "POST", or "DELETE". + * path: Full path of the API endpoint. E.g. "/auth/v2/ping". + * params: dict mapping from parameter name to stringified value. + """ + params = normalize_params(params) + now = email.utils.formatdate() + auth = sign(self.ikey, + self.skey, + method, + self.host, + path, + now, + self.sig_version, + params) + headers = { + 'Authorization': auth, + 'Date': now, + 'Host': self.host, + } + + if self.user_agent: + headers['User-Agent'] = self.user_agent + + if method in ['POST', 'PUT']: + headers['Content-type'] = 'application/x-www-form-urlencoded' + body = urllib.urlencode(params, doseq=True) + uri = path + else: + body = None + uri = path + '?' + urllib.urlencode(params, doseq=True) + + return self._make_request(method, uri, body, headers) + + def _connect(self): + # Host and port for the HTTP(S) connection to the API server. + if self.ca_certs == 'HTTP': + api_port = 80 + else: + api_port = 443 + if self.port is not None: + api_port = self.port + + # Host and port for outer HTTP(S) connection if proxied. + if self.proxy_type is None: + host = self.host + port = api_port + elif self.proxy_type == 'CONNECT': + host = self.proxy_host + port = self.proxy_port + else: + raise NotImplementedError('proxy_type=%s' % (self.proxy_type,)) + + # Create outer HTTP(S) connection. + if self.ca_certs == 'HTTP': + conn = httplib.HTTPConnection(host, port) + elif self.ca_certs == 'DISABLE': + conn = httplib.HTTPSConnection(host, port) + else: + conn = CertValidatingHTTPSConnection(host, + port, + ca_certs=self.ca_certs) + + # Override default socket timeout if requested. + conn.timeout = self.timeout + + # Configure CONNECT proxy tunnel, if any. + if self.proxy_type == 'CONNECT': + if hasattr(conn, 'set_tunnel'): # 2.7+ + conn.set_tunnel(self.host, + api_port, + self.proxy_headers) + elif hasattr(conn, '_set_tunnel'): # 2.6.3+ + # pylint: disable=E1103 + conn._set_tunnel(self.host, + api_port, + self.proxy_headers) + # pylint: enable=E1103 + + return conn + + def _make_request(self, method, uri, body, headers): + conn = self._connect() + if self.proxy_type == 'CONNECT': + # Ensure the request uses the correct protocol and Host. + if self.ca_certs == 'HTTP': + api_proto = 'http' + else: + api_proto = 'https' + uri = ''.join((api_proto, '://', self.host, uri)) + conn.request(method, uri, body, headers) + response = conn.getresponse() + data = response.read() + self._disconnect(conn) + return (response, data) + + def _disconnect(self, conn): + conn.close() + + def json_api_call(self, method, path, params): + """ + Call a Duo API method which is expected to return a JSON body + with a 200 status. Return the response data structure or raise + RuntimeError. + """ + (response, data) = self.api_call(method, path, params) + return self.parse_json_response(response, data) + + def parse_json_response(self, response, data): + """ + Return the parsed data structure or raise RuntimeError. + """ + def raise_error(msg): + error = RuntimeError(msg) + error.status = response.status + error.reason = response.reason + error.data = data + raise error + if response.status != 200: + try: + data = json.loads(data) + if data['stat'] == 'FAIL': + if 'message_detail' in data: + raise_error('Received %s %s (%s)' % ( + response.status, + data['message'], + data['message_detail'], + )) + else: + raise_error('Received %s %s' % ( + response.status, + data['message'], + )) + except (ValueError, KeyError, TypeError): + pass + raise_error('Received %s %s' % ( + response.status, + response.reason, + )) + try: + data = json.loads(data) + if data['stat'] != 'OK': + raise_error('Received error response: %s' % data) + return data['response'] + except (ValueError, KeyError, TypeError): + raise_error('Received bad response: %s' % data) def success(control): log('writing success code to %s' % control) @@ -98,26 +302,18 @@ def failure(control): sys.exit(1) -def preauth(ikey, skey, host, control, username): +def preauth(client, control, username): log('pre-authentication for %s' % username) - args = { - 'user': username, - } - - response = api(ikey, skey, host, 'POST', '/rest/v1/preauth', **args) + response = client.json_api_call('POST', '/rest/v1/preauth', { + 'user': username, + }) result = response.get('result') - - if not result: - log('invalid API response: %s' % response) - failure(control) - if result == API_RESULT_AUTH: return status = response.get('status') - if not status: log('invalid API response: %s' % response) failure(control) @@ -135,17 +331,15 @@ def preauth(ikey, skey, host, control, username): log('unknown preauth result: %s' % result) failure(control) -def auth(ikey, skey, host, control, username, password, ipaddr): +def auth(client, control, username, password, ipaddr): log('authentication for %s' % username) - args = { + response = client.json_api_call('POST', '/rest/v1/auth', { 'user': username, 'factor': 'auto', 'auto': password, - 'ipaddr': ipaddr - } - - response = api(ikey, skey, host, 'POST', '/rest/v1/auth', **args) + 'ipaddr': ipaddr, + }) result = response.get('result') status = response.get('status') @@ -164,32 +358,44 @@ def auth(ikey, skey, host, control, username, password, ipaddr): log('unknown auth result: %s' % result) failure(control) -def main(): - control = os.environ.get('control') - username = os.environ.get('username') - password = os.environ.get('password') - ipaddr = os.environ.get('ipaddr', '0.0.0.0') +def main(Client=Client, environ=os.environ): + control = environ.get('control') + username = environ.get('username') + password = environ.get('password') + ipaddr = environ.get('ipaddr', '0.0.0.0') if not control or not username or not password: log('required environment variables not found') sys.exit(1) - ikey = os.environ.get('ikey') - skey = os.environ.get('skey') - host = os.environ.get('host') - - if not ikey or not skey or not host: - log('required ikey/skey/host configuration parameters not found') - failure(control) + def get_config(k): + v = environ.get(k) + if v: + return v + else: + log('required configuration parameter "{0:s}" not found'.format(k)) + failure(control) + + client = Client( + ikey=get_config('ikey'), + skey=get_config('skey'), + host=get_config('host'), + user_agent='duo_openvpn/' + __version__, + ) + if environ.get('proxy_host'): + client.set_proxy( + host=get_config('proxy_host'), + port=get_config('proxy_port'), + ) try: - preauth(ikey, skey, host, control, username) + preauth(client, control, username) except Exception, e: log(str(e)) failure(control) try: - auth(ikey, skey, host, control, username, password, ipaddr) + auth(client, control, username, password, ipaddr) except Exception, e: log(str(e)) failure(control) diff --git a/https_wrapper.py b/https_wrapper.py index c3a9ce3..94c0c8a 100644 --- a/https_wrapper.py +++ b/https_wrapper.py @@ -1,8 +1,6 @@ -#!/usr/bin/env python -# -# Adapted from: -# https://googleappengine.googlecode.com/svn-history/r136/trunk/python/google/appengine/tools/https_wrapper.py -# +### The following code was adapted from: +### https://googleappengine.googlecode.com/svn-history/r136/trunk/python/google/appengine/tools/https_wrapper.py + # Copyright 2007 Google Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -49,6 +47,7 @@ class InvalidCertificateException(httplib.HTTPException): 'http://code.google.com/appengine/kb/general.html#rpcssl' % (self.host, self.reason, self.cert)) + class CertValidatingHTTPSConnection(httplib.HTTPConnection): """An HTTPConnection that connects over SSL and validates certificates.""" @@ -109,9 +108,11 @@ class CertValidatingHTTPSConnection(httplib.HTTPConnection): def connect(self): "Connect to a host on a given (SSL) port." - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((self.host, self.port)) - self.sock = ssl.wrap_socket(sock, keyfile=self.key_file, + self.sock = socket.create_connection((self.host, self.port), + self.timeout) + if self._tunnel_host: + self._tunnel() + self.sock = ssl.wrap_socket(self.sock, keyfile=self.key_file, certfile=self.cert_file, cert_reqs=self.cert_reqs, ca_certs=self.ca_certs) diff --git a/test_duo_openvpn.py b/test_duo_openvpn.py new file mode 100644 index 0000000..87d20d1 --- /dev/null +++ b/test_duo_openvpn.py @@ -0,0 +1,479 @@ +import StringIO +import email.utils +import json +import os +import tempfile +import unittest + +import mox + +import duo_openvpn + +def mock_client_factory(mock): + """ + Return a Client-alike that uses a mock instead of an HTTP + connection. Special case: Client.__init__() and set_proxy() + arguments are verified by calling mock.duo_client_init() and + mock.duo_client_set_proxy(), respectively. + """ + class MockClient(duo_openvpn.Client): + def __init__(self, *args, **kwargs): + mock.duo_client_init(*args, **kwargs) + return super(MockClient, self).__init__(*args, **kwargs) + + def set_proxy(self, *args, **kwargs): + mock.duo_client_set_proxy(*args, **kwargs) + return super(MockClient, self).set_proxy(*args, **kwargs) + + def _connect(self): + return mock + + return MockClient + +class MockResponse(StringIO.StringIO, object): + def __init__(self, status, body, reason='some reason'): + self.status = status + self.reason = reason + super(MockResponse, self).__init__(body) + +class TestIntegration(unittest.TestCase): + IKEY = 'expected ikey' + SKEY = 'expected skey' + HOST = 'expected hostname' + USERNAME = 'expected username' + PASSCODE = 'expected passcode' + IPADDR = 'expected_ipaddr' + PROXY_HOST = 'expected proxy host' + PROXY_PORT = 'expected proxy port' + EXPECTED_USER_AGENT = 'duo_openvpn/' + duo_openvpn.__version__ + EXPECTED_PREAUTH_PARAMS = 'user=expected+username' + EXPECTED_AUTH_PATH = '/rest/v1/auth' + EXPECTED_PREAUTH_PATH = '/rest/v1/preauth' + EXPECTED_AUTH_PARAMS = ( + 'auto=expected+passcode' + '&ipaddr=expected_ipaddr' + '&user=expected+username' + '&factor=auto' + ) + + def setUp(self): + self.mox = mox.Mox() + self.expected_calls = self.mox.CreateMockAnything() + + def assert_auth(self, environ, expected_control, + send_control=True): + self.mox.ReplayAll() + + with tempfile.NamedTemporaryFile() as control: + if send_control: + environ['control'] = control.name + + with self.assertRaises(SystemExit) as cm: + duo_openvpn.main( + environ=environ, + Client=mock_client_factory(self.expected_calls), + ) + self.mox.VerifyAll() + + control.seek(0, os.SEEK_SET) + output = control.read() + self.assertEqual(expected_control, output) + if expected_control == '1': + self.assertEqual(0, cm.exception.args[0]) + else: + self.assertEqual(1, cm.exception.args[0]) + + def normal_environ(self): + environ = { + 'ikey': self.IKEY, + 'skey': self.SKEY, + 'host': self.HOST, + 'username': self.USERNAME, + 'password': self.PASSCODE, + 'ipaddr': self.IPADDR, + } + self.expected_calls.duo_client_init( + ikey=self.IKEY, + skey=self.SKEY, + host=self.HOST, + user_agent=('duo_openvpn/' + duo_openvpn.__version__), + ) + self.expected_calls.duo_client_set_proxy( + host=None, + proxy_type=None, + ) + return environ + + def expect_request(self, method, path, params, response=None, raises=None): + self.expected_calls.request(method, path, params, { + 'User-Agent': self.EXPECTED_USER_AGENT, + 'Host': self.HOST, + 'Content-type': 'application/x-www-form-urlencoded', + 'Authorization': mox.Func((lambda s: s.startswith('Basic '))), + 'Date': mox.Func((lambda s: bool(email.utils.parsedate_tz(s)))) + }, + ) + meth = self.expected_calls.getresponse() + if raises is not None: + meth.AndRaise(raises) + else: + meth.AndReturn(response) + self.expected_calls.close() + + def expect_preauth(self, result, path=EXPECTED_PREAUTH_PATH): + self.expect_request( + method='POST', + path=path, + params=self.EXPECTED_PREAUTH_PARAMS, + response=MockResponse( + status=200, + body=json.dumps({ + 'stat': 'OK', + 'response': { + 'result': result, + 'status': 'expected status', + }, + }), + ), + ) + + def expect_auth(self, result, path=EXPECTED_AUTH_PATH): + self.expect_request( + method='POST', + path=path, + params=self.EXPECTED_AUTH_PARAMS, + response=MockResponse( + status=200, + body=json.dumps({ + 'stat': 'OK', + 'response': { + 'result': result, + 'status': 'expected status', + }, + }), + ), + ) + + def test_preauth_allow(self): + environ = self.normal_environ() + self.expect_preauth('allow') + self.assert_auth( + environ=environ, + expected_control='1', + ) + + def test_preauth_deny(self): + environ = self.normal_environ() + self.expect_preauth('deny') + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_preauth_enroll(self): + environ = self.normal_environ() + self.expect_preauth('enroll') + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_preauth_bogus(self): + environ = self.normal_environ() + self.expect_preauth('bogus') + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_preauth_missing_result(self): + environ = self.normal_environ() + self.expect_request( + method='POST', + path=self.EXPECTED_PREAUTH_PATH, + params=self.EXPECTED_PREAUTH_PARAMS, + response=MockResponse( + status=200, + body=json.dumps({ + 'stat': 'OK', + 'response': { + 'status': 'expected status', + }, + }), + ), + ) + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_preauth_missing_status(self): + environ = self.normal_environ() + self.expect_request( + method='POST', + path=self.EXPECTED_PREAUTH_PATH, + params=self.EXPECTED_PREAUTH_PARAMS, + response=MockResponse( + status=200, + body=json.dumps({ + 'stat': 'OK', + 'response': { + 'result': 'deny', + }, + }), + ), + ) + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_preauth_exception(self): + environ = self.normal_environ() + self.expect_request( + method='POST', + path=self.EXPECTED_PREAUTH_PATH, + params=self.EXPECTED_PREAUTH_PARAMS, + raises=Exception('whoops'), + ) + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_auth_allow(self): + environ = self.normal_environ() + self.expect_preauth('auth') + self.expect_auth('allow') + self.assert_auth( + environ=environ, + expected_control='1', + ) + + def test_auth_deny(self): + environ = self.normal_environ() + self.expect_preauth('auth') + self.expect_auth('deny') + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_auth_bogus(self): + environ = self.normal_environ() + self.expect_preauth('auth') + self.expect_auth('bogus') + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_auth_missing_reason(self): + environ = self.normal_environ() + self.expect_preauth('auth') + self.expect_request( + method='POST', + path=self.EXPECTED_AUTH_PATH, + params=self.EXPECTED_AUTH_PARAMS, + response=MockResponse( + status=200, + body=json.dumps({ + 'stat': 'OK', + 'response': { + 'status': 'expected status', + }, + }), + ), + ) + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_auth_missing_status(self): + environ = self.normal_environ() + self.expect_preauth('auth') + self.expect_request( + method='POST', + path=self.EXPECTED_AUTH_PATH, + params=self.EXPECTED_AUTH_PARAMS, + response=MockResponse( + status=200, + body=json.dumps({ + 'stat': 'OK', + 'response': { + 'result': 'allow', + }, + }), + ), + ) + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_auth_exception(self): + environ = self.normal_environ() + self.expect_preauth('auth') + self.expect_request( + method='POST', + path=self.EXPECTED_AUTH_PATH, + params=self.EXPECTED_AUTH_PARAMS, + raises=Exception('whoops'), + ) + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_auth_no_ipaddr(self): + environ = self.normal_environ() + environ.pop('ipaddr') + self.expect_preauth('auth') + self.expect_request( + method='POST', + path=self.EXPECTED_AUTH_PATH, + params=( + 'auto=expected+passcode' + '&ipaddr=0.0.0.0' + '&user=expected+username' + '&factor=auto' + ), + response=MockResponse( + status=200, + body=json.dumps({ + 'stat': 'OK', + 'response': { + 'result': 'allow', + 'status': 'expected status', + }, + }), + ), + ) + self.assert_auth( + environ=environ, + expected_control='1', + ) + + def test_missing_control(self): + environ = { + 'ikey': self.IKEY, + 'skey': self.SKEY, + 'host': self.HOST, + 'password': self.PASSCODE, + 'username': self.USERNAME, + 'ipaddr': self.IPADDR, + } + self.assert_auth( + environ=environ, + send_control=False, + expected_control='', + ) + + def test_missing_username(self): + environ = { + 'ikey': self.IKEY, + 'skey': self.SKEY, + 'host': self.HOST, + 'password': self.PASSCODE, + 'ipaddr': self.IPADDR, + } + self.assert_auth( + environ=environ, + expected_control='', + ) + + def test_missing_password(self): + environ = { + 'ikey': self.IKEY, + 'skey': self.SKEY, + 'host': self.HOST, + 'username': self.USERNAME, + 'ipaddr': self.IPADDR, + } + self.assert_auth( + environ=environ, + expected_control='', + ) + + def test_missing_ikey(self): + environ = { + 'skey': self.SKEY, + 'host': self.HOST, + 'password': self.PASSCODE, + 'username': self.USERNAME, + 'ipaddr': self.IPADDR, + } + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_missing_skey(self): + environ = { + 'ikey': self.IKEY, + 'host': self.HOST, + 'password': self.PASSCODE, + 'username': self.USERNAME, + 'ipaddr': self.IPADDR, + } + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_missing_host(self): + environ = { + 'ikey': self.IKEY, + 'skey': self.SKEY, + 'password': self.PASSCODE, + 'username': self.USERNAME, + 'ipaddr': self.IPADDR, + } + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_proxy_success(self): + environ = self.normal_environ() + environ['proxy_host'] = self.PROXY_HOST + environ['proxy_port'] = self.PROXY_PORT + self.expected_calls.duo_client_set_proxy( + host=self.PROXY_HOST, + port=self.PROXY_PORT, + ) + self.expect_preauth( + result='auth', + path=('https://' + self.HOST + self.EXPECTED_PREAUTH_PATH), + ) + self.expect_auth( + result='allow', + path=('https://' + self.HOST + self.EXPECTED_AUTH_PATH), + ) + self.assert_auth( + environ=environ, + expected_control='1', + ) + + def test_proxy_missing_port(self): + environ = self.normal_environ() + environ['proxy_host'] = self.PROXY_HOST + self.assert_auth( + environ=environ, + expected_control='0', + ) + + def test_proxy_missing_host(self): + environ = self.normal_environ() + # proxy_port is ignored if proxy_host isn't present. + environ['proxy_port'] = self.PROXY_PORT + self.expect_preauth('auth') + self.expect_auth('allow') + self.assert_auth( + environ=environ, + expected_control='1', + ) + +if __name__ == '__main__': + unittest.main() |
