summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Murray <ben@duosecurity.com>2013-11-21 12:05:44 -0500
committerBen Murray <ben@duosecurity.com>2013-11-22 16:21:38 -0500
commit438f391a1a2fac8a0aec3f6f25bbf4305232aac6 (patch)
treee272fdda1a9c39fb7956e00d64b61a92c680e421
parente7fd788d6a11a9d8a13d7e249bac6c8bb610a16c (diff)
downloadopenvpn-duo-plugin-438f391a1a2fac8a0aec3f6f25bbf4305232aac6.tar.gz
openvpn-duo-plugin-438f391a1a2fac8a0aec3f6f25bbf4305232aac6.zip
Fixes #3 Add HTTPS proxy support (if using the Python helper script)
-rw-r--r--duo_openvpn.c37
-rwxr-xr-xduo_openvpn.py376
-rw-r--r--https_wrapper.py17
-rw-r--r--test_duo_openvpn.py479
4 files changed, 816 insertions, 93 deletions
diff --git a/duo_openvpn.c b/duo_openvpn.c
index 6dc2c03..afe36b5 100644
--- a/duo_openvpn.c
+++ b/duo_openvpn.c
@@ -20,6 +20,8 @@ struct context {
char *ikey;
char *skey;
char *host;
+ char *proxy_host;
+ char *proxy_port;
};
static const char *
@@ -74,6 +76,18 @@ auth_user_pass_verify(struct context *ctx, const char *args[], const char *envp[
setenv("ikey", ctx->ikey, 1);
setenv("skey", ctx->skey, 1);
setenv("host", ctx->host, 1);
+ if (ctx->proxy_host) {
+ setenv("proxy_host", ctx->proxy_host, 1);
+ }
+ else {
+ unsetenv("proxy_host");
+ }
+ if (ctx->proxy_port) {
+ setenv("proxy_port", ctx->proxy_port, 1);
+ }
+ else {
+ unsetenv("proxy_port");
+ }
}
setenv("control", control, 1);
@@ -110,6 +124,23 @@ openvpn_plugin_open_v2(unsigned int *type_mask, const char *argv[], const char *
ctx->host = strdup(argv[3]);
}
+ /* Passing proxy_host even if proxy_port is not present
+ * generates a more informative log message.
+ */
+ if (argv[4]) {
+ ctx->proxy_host = strdup(argv[4]);
+ if (argv[5]) {
+ ctx->proxy_port = strdup(argv[5]);
+ }
+ else {
+ ctx->proxy_port = NULL;
+ }
+ }
+ else {
+ ctx->proxy_host = NULL;
+ ctx->proxy_port = NULL;
+ }
+
*type_mask = OPENVPN_PLUGIN_MASK(OPENVPN_PLUGIN_AUTH_USER_PASS_VERIFY);
return (openvpn_plugin_handle_t) ctx;
@@ -123,5 +154,11 @@ openvpn_plugin_close_v1(openvpn_plugin_handle_t handle)
free(ctx->ikey);
free(ctx->skey);
free(ctx->host);
+ if (ctx->proxy_host) {
+ free(ctx->proxy_host);
+ }
+ if (ctx->proxy_port) {
+ free(ctx->proxy_port);
+ }
free(ctx);
}
diff --git a/duo_openvpn.py b/duo_openvpn.py
index b5f890a..4d91db4 100755
--- a/duo_openvpn.py
+++ b/duo_openvpn.py
@@ -1,18 +1,28 @@
#!/usr/bin/env python
#
# duo_openvpn.py
-# Duo OpenVPN v1
-# Copyright 2011 Duo Security, Inc.
-#
+# Duo OpenVPN
+# Copyright 2013 Duo Security, Inc.
+
+__version__ = '2.0'
-import syslog, sys
+import base64
+import email.utils
+import httplib
+import os
+import socket
+import sys
+import syslog
+import urllib
def log(msg):
msg = 'Duo OpenVPN: %s' % msg
syslog.syslog(msg)
try:
- import os, urllib, hashlib, hmac, base64, json
+ import hashlib
+ import hmac
+ import json
from https_wrapper import CertValidatingHTTPSConnection
except ImportError, e:
log('ImportError: %s' % e)
@@ -24,61 +34,255 @@ API_RESULT_ALLOW = 'allow'
API_RESULT_DENY = 'deny'
API_RESULT_ENROLL = 'enroll'
-ca_certs = os.path.join(os.path.dirname(__file__), 'ca_certs.pem')
-
-def canonicalize(method, host, uri, params):
- canon = [method.upper(), host.lower(), uri]
+DEFAULT_CA_CERTS = os.path.join(os.path.dirname(__file__), 'ca_certs.pem')
+def canon_params(params):
+ """
+ Return a canonical string version of the given request parameters.
+ """
+ # this is normalized the same as for OAuth 1.0,
+ # http://tools.ietf.org/html/rfc5849#section-3.4.1.3.2
args = []
- for key in sorted(params.keys()):
- val = params[key]
- arg = '%s=%s' % (urllib.quote(key, '~'), urllib.quote(val, '~'))
- args.append(arg)
- canon.append('&'.join(args))
-
+ for (key, vals) in sorted(
+ (urllib.quote(key, '~'), vals) for (key, vals) in params.items()):
+ for val in sorted(urllib.quote(val, '~') for val in vals):
+ args.append('%s=%s' % (key, val))
+ return '&'.join(args)
+
+def canonicalize(method, host, uri, params, date, sig_version):
+ """
+ Return a canonical string version of the given request attributes.
+ """
+ if sig_version == 1:
+ canon = []
+ elif sig_version == 2:
+ canon = [date]
+ else:
+ raise NotImplementedError(sig_version)
+
+ canon += [
+ method.upper(),
+ host.lower(),
+ uri,
+ canon_params(params),
+ ]
return '\n'.join(canon)
-def sign(ikey, skey, method, host, uri, params):
- sig = hmac.new(skey, canonicalize(method, host, uri, params), hashlib.sha1)
+def sign(ikey, skey, method, host, uri, date, sig_version, params):
+ """
+ Return basic authorization header line with a Duo Web API signature.
+ """
+ canonical = canonicalize(method, host, uri, params, date, sig_version)
+ if isinstance(skey, unicode):
+ skey = skey.encode('utf-8')
+ sig = hmac.new(skey, canonical, hashlib.sha1)
auth = '%s:%s' % (ikey, sig.hexdigest())
return 'Basic %s' % base64.b64encode(auth)
-def call(ikey, skey, host, method, path, **kwargs):
- sig = sign(ikey, skey, method, host, path, kwargs)
-
- headers = {
- 'Authorization': sig,
- 'User-agent': 'duo_openvpn/1.0',
- }
-
- if method in [ 'POST', 'PUT' ]:
- headers['Content-type'] = 'application/x-www-form-urlencoded'
- body = urllib.urlencode(kwargs, doseq=True)
- uri = path
- else:
- body = None
- uri = path + '?' + urllib.urlencode(kwargs, doseq=True)
-
- conn = CertValidatingHTTPSConnection(host, 443, ca_certs=ca_certs)
- conn.request(method, uri, body, headers)
- response = conn.getresponse()
- data = response.read()
- conn.close()
-
- return (response.status, response.reason, data)
-
-def api(ikey, skey, host, method, path, **kwargs):
- (status, reason, data) = call(ikey, skey, host, method, path, **kwargs)
- if status != 200:
- raise RuntimeError('Received %s %s: %s' % (status, reason, data))
-
- try:
- data = json.loads(data)
- if data['stat'] != 'OK':
- raise RuntimeError('Received error response: %s' % data)
- return data['response']
- except (ValueError, KeyError):
- raise RuntimeError('Received bad response: %s' % data)
+def normalize_params(params):
+ """
+ Return copy of params with strings listified
+ and unicode strings utf-8 encoded.
+ """
+ # urllib cannot handle unicode strings properly. quote() excepts,
+ # and urlencode() replaces them with '?'.
+ def encode(value):
+ if isinstance(value, unicode):
+ return value.encode("utf-8")
+ return value
+ def to_list(value):
+ if value is None or isinstance(value, basestring):
+ return [value]
+ return value
+ return dict(
+ (encode(key), [encode(v) for v in to_list(value)])
+ for (key, value) in params.items())
+
+class Client(object):
+ sig_version = 1
+
+ def __init__(self, ikey, skey, host,
+ ca_certs=DEFAULT_CA_CERTS,
+ sig_timezone='UTC', user_agent=None):
+ """
+ ca_certs - Path to CA pem file.
+ """
+ self.ikey = ikey
+ self.skey = skey
+ self.host = host
+ self.port = None
+ self.sig_timezone = sig_timezone
+ if ca_certs is None:
+ ca_certs = DEFAULT_CA_CERTS
+ self.ca_certs = ca_certs
+ self.user_agent = user_agent
+ self.set_proxy(host=None, proxy_type=None)
+ self.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
+
+ def set_proxy(self, host, port=None, headers=None,
+ proxy_type='CONNECT'):
+ """
+ Configure proxy for API calls. Supported proxy_type values:
+
+ 'CONNECT' - HTTP proxy with CONNECT.
+ None - Disable proxy.
+ """
+ if proxy_type not in ('CONNECT', None):
+ raise NotImplementedError('proxy_type=%s' % (proxy_type,))
+ self.proxy_headers = headers
+ self.proxy_host = host
+ self.proxy_port = port
+ self.proxy_type = proxy_type
+
+ def api_call(self, method, path, params):
+ """
+ Call a Duo API method. Return a (status, reason, data) tuple.
+
+ * method: HTTP request method. E.g. "GET", "POST", or "DELETE".
+ * path: Full path of the API endpoint. E.g. "/auth/v2/ping".
+ * params: dict mapping from parameter name to stringified value.
+ """
+ params = normalize_params(params)
+ now = email.utils.formatdate()
+ auth = sign(self.ikey,
+ self.skey,
+ method,
+ self.host,
+ path,
+ now,
+ self.sig_version,
+ params)
+ headers = {
+ 'Authorization': auth,
+ 'Date': now,
+ 'Host': self.host,
+ }
+
+ if self.user_agent:
+ headers['User-Agent'] = self.user_agent
+
+ if method in ['POST', 'PUT']:
+ headers['Content-type'] = 'application/x-www-form-urlencoded'
+ body = urllib.urlencode(params, doseq=True)
+ uri = path
+ else:
+ body = None
+ uri = path + '?' + urllib.urlencode(params, doseq=True)
+
+ return self._make_request(method, uri, body, headers)
+
+ def _connect(self):
+ # Host and port for the HTTP(S) connection to the API server.
+ if self.ca_certs == 'HTTP':
+ api_port = 80
+ else:
+ api_port = 443
+ if self.port is not None:
+ api_port = self.port
+
+ # Host and port for outer HTTP(S) connection if proxied.
+ if self.proxy_type is None:
+ host = self.host
+ port = api_port
+ elif self.proxy_type == 'CONNECT':
+ host = self.proxy_host
+ port = self.proxy_port
+ else:
+ raise NotImplementedError('proxy_type=%s' % (self.proxy_type,))
+
+ # Create outer HTTP(S) connection.
+ if self.ca_certs == 'HTTP':
+ conn = httplib.HTTPConnection(host, port)
+ elif self.ca_certs == 'DISABLE':
+ conn = httplib.HTTPSConnection(host, port)
+ else:
+ conn = CertValidatingHTTPSConnection(host,
+ port,
+ ca_certs=self.ca_certs)
+
+ # Override default socket timeout if requested.
+ conn.timeout = self.timeout
+
+ # Configure CONNECT proxy tunnel, if any.
+ if self.proxy_type == 'CONNECT':
+ if hasattr(conn, 'set_tunnel'): # 2.7+
+ conn.set_tunnel(self.host,
+ api_port,
+ self.proxy_headers)
+ elif hasattr(conn, '_set_tunnel'): # 2.6.3+
+ # pylint: disable=E1103
+ conn._set_tunnel(self.host,
+ api_port,
+ self.proxy_headers)
+ # pylint: enable=E1103
+
+ return conn
+
+ def _make_request(self, method, uri, body, headers):
+ conn = self._connect()
+ if self.proxy_type == 'CONNECT':
+ # Ensure the request uses the correct protocol and Host.
+ if self.ca_certs == 'HTTP':
+ api_proto = 'http'
+ else:
+ api_proto = 'https'
+ uri = ''.join((api_proto, '://', self.host, uri))
+ conn.request(method, uri, body, headers)
+ response = conn.getresponse()
+ data = response.read()
+ self._disconnect(conn)
+ return (response, data)
+
+ def _disconnect(self, conn):
+ conn.close()
+
+ def json_api_call(self, method, path, params):
+ """
+ Call a Duo API method which is expected to return a JSON body
+ with a 200 status. Return the response data structure or raise
+ RuntimeError.
+ """
+ (response, data) = self.api_call(method, path, params)
+ return self.parse_json_response(response, data)
+
+ def parse_json_response(self, response, data):
+ """
+ Return the parsed data structure or raise RuntimeError.
+ """
+ def raise_error(msg):
+ error = RuntimeError(msg)
+ error.status = response.status
+ error.reason = response.reason
+ error.data = data
+ raise error
+ if response.status != 200:
+ try:
+ data = json.loads(data)
+ if data['stat'] == 'FAIL':
+ if 'message_detail' in data:
+ raise_error('Received %s %s (%s)' % (
+ response.status,
+ data['message'],
+ data['message_detail'],
+ ))
+ else:
+ raise_error('Received %s %s' % (
+ response.status,
+ data['message'],
+ ))
+ except (ValueError, KeyError, TypeError):
+ pass
+ raise_error('Received %s %s' % (
+ response.status,
+ response.reason,
+ ))
+ try:
+ data = json.loads(data)
+ if data['stat'] != 'OK':
+ raise_error('Received error response: %s' % data)
+ return data['response']
+ except (ValueError, KeyError, TypeError):
+ raise_error('Received bad response: %s' % data)
def success(control):
log('writing success code to %s' % control)
@@ -98,26 +302,18 @@ def failure(control):
sys.exit(1)
-def preauth(ikey, skey, host, control, username):
+def preauth(client, control, username):
log('pre-authentication for %s' % username)
- args = {
- 'user': username,
- }
-
- response = api(ikey, skey, host, 'POST', '/rest/v1/preauth', **args)
+ response = client.json_api_call('POST', '/rest/v1/preauth', {
+ 'user': username,
+ })
result = response.get('result')
-
- if not result:
- log('invalid API response: %s' % response)
- failure(control)
-
if result == API_RESULT_AUTH:
return
status = response.get('status')
-
if not status:
log('invalid API response: %s' % response)
failure(control)
@@ -135,17 +331,15 @@ def preauth(ikey, skey, host, control, username):
log('unknown preauth result: %s' % result)
failure(control)
-def auth(ikey, skey, host, control, username, password, ipaddr):
+def auth(client, control, username, password, ipaddr):
log('authentication for %s' % username)
- args = {
+ response = client.json_api_call('POST', '/rest/v1/auth', {
'user': username,
'factor': 'auto',
'auto': password,
- 'ipaddr': ipaddr
- }
-
- response = api(ikey, skey, host, 'POST', '/rest/v1/auth', **args)
+ 'ipaddr': ipaddr,
+ })
result = response.get('result')
status = response.get('status')
@@ -164,32 +358,44 @@ def auth(ikey, skey, host, control, username, password, ipaddr):
log('unknown auth result: %s' % result)
failure(control)
-def main():
- control = os.environ.get('control')
- username = os.environ.get('username')
- password = os.environ.get('password')
- ipaddr = os.environ.get('ipaddr', '0.0.0.0')
+def main(Client=Client, environ=os.environ):
+ control = environ.get('control')
+ username = environ.get('username')
+ password = environ.get('password')
+ ipaddr = environ.get('ipaddr', '0.0.0.0')
if not control or not username or not password:
log('required environment variables not found')
sys.exit(1)
- ikey = os.environ.get('ikey')
- skey = os.environ.get('skey')
- host = os.environ.get('host')
-
- if not ikey or not skey or not host:
- log('required ikey/skey/host configuration parameters not found')
- failure(control)
+ def get_config(k):
+ v = environ.get(k)
+ if v:
+ return v
+ else:
+ log('required configuration parameter "{0:s}" not found'.format(k))
+ failure(control)
+
+ client = Client(
+ ikey=get_config('ikey'),
+ skey=get_config('skey'),
+ host=get_config('host'),
+ user_agent='duo_openvpn/' + __version__,
+ )
+ if environ.get('proxy_host'):
+ client.set_proxy(
+ host=get_config('proxy_host'),
+ port=get_config('proxy_port'),
+ )
try:
- preauth(ikey, skey, host, control, username)
+ preauth(client, control, username)
except Exception, e:
log(str(e))
failure(control)
try:
- auth(ikey, skey, host, control, username, password, ipaddr)
+ auth(client, control, username, password, ipaddr)
except Exception, e:
log(str(e))
failure(control)
diff --git a/https_wrapper.py b/https_wrapper.py
index c3a9ce3..94c0c8a 100644
--- a/https_wrapper.py
+++ b/https_wrapper.py
@@ -1,8 +1,6 @@
-#!/usr/bin/env python
-#
-# Adapted from:
-# https://googleappengine.googlecode.com/svn-history/r136/trunk/python/google/appengine/tools/https_wrapper.py
-#
+### The following code was adapted from:
+### https://googleappengine.googlecode.com/svn-history/r136/trunk/python/google/appengine/tools/https_wrapper.py
+
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -49,6 +47,7 @@ class InvalidCertificateException(httplib.HTTPException):
'http://code.google.com/appengine/kb/general.html#rpcssl' %
(self.host, self.reason, self.cert))
+
class CertValidatingHTTPSConnection(httplib.HTTPConnection):
"""An HTTPConnection that connects over SSL and validates certificates."""
@@ -109,9 +108,11 @@ class CertValidatingHTTPSConnection(httplib.HTTPConnection):
def connect(self):
"Connect to a host on a given (SSL) port."
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect((self.host, self.port))
- self.sock = ssl.wrap_socket(sock, keyfile=self.key_file,
+ self.sock = socket.create_connection((self.host, self.port),
+ self.timeout)
+ if self._tunnel_host:
+ self._tunnel()
+ self.sock = ssl.wrap_socket(self.sock, keyfile=self.key_file,
certfile=self.cert_file,
cert_reqs=self.cert_reqs,
ca_certs=self.ca_certs)
diff --git a/test_duo_openvpn.py b/test_duo_openvpn.py
new file mode 100644
index 0000000..87d20d1
--- /dev/null
+++ b/test_duo_openvpn.py
@@ -0,0 +1,479 @@
+import StringIO
+import email.utils
+import json
+import os
+import tempfile
+import unittest
+
+import mox
+
+import duo_openvpn
+
+def mock_client_factory(mock):
+ """
+ Return a Client-alike that uses a mock instead of an HTTP
+ connection. Special case: Client.__init__() and set_proxy()
+ arguments are verified by calling mock.duo_client_init() and
+ mock.duo_client_set_proxy(), respectively.
+ """
+ class MockClient(duo_openvpn.Client):
+ def __init__(self, *args, **kwargs):
+ mock.duo_client_init(*args, **kwargs)
+ return super(MockClient, self).__init__(*args, **kwargs)
+
+ def set_proxy(self, *args, **kwargs):
+ mock.duo_client_set_proxy(*args, **kwargs)
+ return super(MockClient, self).set_proxy(*args, **kwargs)
+
+ def _connect(self):
+ return mock
+
+ return MockClient
+
+class MockResponse(StringIO.StringIO, object):
+ def __init__(self, status, body, reason='some reason'):
+ self.status = status
+ self.reason = reason
+ super(MockResponse, self).__init__(body)
+
+class TestIntegration(unittest.TestCase):
+ IKEY = 'expected ikey'
+ SKEY = 'expected skey'
+ HOST = 'expected hostname'
+ USERNAME = 'expected username'
+ PASSCODE = 'expected passcode'
+ IPADDR = 'expected_ipaddr'
+ PROXY_HOST = 'expected proxy host'
+ PROXY_PORT = 'expected proxy port'
+ EXPECTED_USER_AGENT = 'duo_openvpn/' + duo_openvpn.__version__
+ EXPECTED_PREAUTH_PARAMS = 'user=expected+username'
+ EXPECTED_AUTH_PATH = '/rest/v1/auth'
+ EXPECTED_PREAUTH_PATH = '/rest/v1/preauth'
+ EXPECTED_AUTH_PARAMS = (
+ 'auto=expected+passcode'
+ '&ipaddr=expected_ipaddr'
+ '&user=expected+username'
+ '&factor=auto'
+ )
+
+ def setUp(self):
+ self.mox = mox.Mox()
+ self.expected_calls = self.mox.CreateMockAnything()
+
+ def assert_auth(self, environ, expected_control,
+ send_control=True):
+ self.mox.ReplayAll()
+
+ with tempfile.NamedTemporaryFile() as control:
+ if send_control:
+ environ['control'] = control.name
+
+ with self.assertRaises(SystemExit) as cm:
+ duo_openvpn.main(
+ environ=environ,
+ Client=mock_client_factory(self.expected_calls),
+ )
+ self.mox.VerifyAll()
+
+ control.seek(0, os.SEEK_SET)
+ output = control.read()
+ self.assertEqual(expected_control, output)
+ if expected_control == '1':
+ self.assertEqual(0, cm.exception.args[0])
+ else:
+ self.assertEqual(1, cm.exception.args[0])
+
+ def normal_environ(self):
+ environ = {
+ 'ikey': self.IKEY,
+ 'skey': self.SKEY,
+ 'host': self.HOST,
+ 'username': self.USERNAME,
+ 'password': self.PASSCODE,
+ 'ipaddr': self.IPADDR,
+ }
+ self.expected_calls.duo_client_init(
+ ikey=self.IKEY,
+ skey=self.SKEY,
+ host=self.HOST,
+ user_agent=('duo_openvpn/' + duo_openvpn.__version__),
+ )
+ self.expected_calls.duo_client_set_proxy(
+ host=None,
+ proxy_type=None,
+ )
+ return environ
+
+ def expect_request(self, method, path, params, response=None, raises=None):
+ self.expected_calls.request(method, path, params, {
+ 'User-Agent': self.EXPECTED_USER_AGENT,
+ 'Host': self.HOST,
+ 'Content-type': 'application/x-www-form-urlencoded',
+ 'Authorization': mox.Func((lambda s: s.startswith('Basic '))),
+ 'Date': mox.Func((lambda s: bool(email.utils.parsedate_tz(s))))
+ },
+ )
+ meth = self.expected_calls.getresponse()
+ if raises is not None:
+ meth.AndRaise(raises)
+ else:
+ meth.AndReturn(response)
+ self.expected_calls.close()
+
+ def expect_preauth(self, result, path=EXPECTED_PREAUTH_PATH):
+ self.expect_request(
+ method='POST',
+ path=path,
+ params=self.EXPECTED_PREAUTH_PARAMS,
+ response=MockResponse(
+ status=200,
+ body=json.dumps({
+ 'stat': 'OK',
+ 'response': {
+ 'result': result,
+ 'status': 'expected status',
+ },
+ }),
+ ),
+ )
+
+ def expect_auth(self, result, path=EXPECTED_AUTH_PATH):
+ self.expect_request(
+ method='POST',
+ path=path,
+ params=self.EXPECTED_AUTH_PARAMS,
+ response=MockResponse(
+ status=200,
+ body=json.dumps({
+ 'stat': 'OK',
+ 'response': {
+ 'result': result,
+ 'status': 'expected status',
+ },
+ }),
+ ),
+ )
+
+ def test_preauth_allow(self):
+ environ = self.normal_environ()
+ self.expect_preauth('allow')
+ self.assert_auth(
+ environ=environ,
+ expected_control='1',
+ )
+
+ def test_preauth_deny(self):
+ environ = self.normal_environ()
+ self.expect_preauth('deny')
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_preauth_enroll(self):
+ environ = self.normal_environ()
+ self.expect_preauth('enroll')
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_preauth_bogus(self):
+ environ = self.normal_environ()
+ self.expect_preauth('bogus')
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_preauth_missing_result(self):
+ environ = self.normal_environ()
+ self.expect_request(
+ method='POST',
+ path=self.EXPECTED_PREAUTH_PATH,
+ params=self.EXPECTED_PREAUTH_PARAMS,
+ response=MockResponse(
+ status=200,
+ body=json.dumps({
+ 'stat': 'OK',
+ 'response': {
+ 'status': 'expected status',
+ },
+ }),
+ ),
+ )
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_preauth_missing_status(self):
+ environ = self.normal_environ()
+ self.expect_request(
+ method='POST',
+ path=self.EXPECTED_PREAUTH_PATH,
+ params=self.EXPECTED_PREAUTH_PARAMS,
+ response=MockResponse(
+ status=200,
+ body=json.dumps({
+ 'stat': 'OK',
+ 'response': {
+ 'result': 'deny',
+ },
+ }),
+ ),
+ )
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_preauth_exception(self):
+ environ = self.normal_environ()
+ self.expect_request(
+ method='POST',
+ path=self.EXPECTED_PREAUTH_PATH,
+ params=self.EXPECTED_PREAUTH_PARAMS,
+ raises=Exception('whoops'),
+ )
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_auth_allow(self):
+ environ = self.normal_environ()
+ self.expect_preauth('auth')
+ self.expect_auth('allow')
+ self.assert_auth(
+ environ=environ,
+ expected_control='1',
+ )
+
+ def test_auth_deny(self):
+ environ = self.normal_environ()
+ self.expect_preauth('auth')
+ self.expect_auth('deny')
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_auth_bogus(self):
+ environ = self.normal_environ()
+ self.expect_preauth('auth')
+ self.expect_auth('bogus')
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_auth_missing_reason(self):
+ environ = self.normal_environ()
+ self.expect_preauth('auth')
+ self.expect_request(
+ method='POST',
+ path=self.EXPECTED_AUTH_PATH,
+ params=self.EXPECTED_AUTH_PARAMS,
+ response=MockResponse(
+ status=200,
+ body=json.dumps({
+ 'stat': 'OK',
+ 'response': {
+ 'status': 'expected status',
+ },
+ }),
+ ),
+ )
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_auth_missing_status(self):
+ environ = self.normal_environ()
+ self.expect_preauth('auth')
+ self.expect_request(
+ method='POST',
+ path=self.EXPECTED_AUTH_PATH,
+ params=self.EXPECTED_AUTH_PARAMS,
+ response=MockResponse(
+ status=200,
+ body=json.dumps({
+ 'stat': 'OK',
+ 'response': {
+ 'result': 'allow',
+ },
+ }),
+ ),
+ )
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_auth_exception(self):
+ environ = self.normal_environ()
+ self.expect_preauth('auth')
+ self.expect_request(
+ method='POST',
+ path=self.EXPECTED_AUTH_PATH,
+ params=self.EXPECTED_AUTH_PARAMS,
+ raises=Exception('whoops'),
+ )
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_auth_no_ipaddr(self):
+ environ = self.normal_environ()
+ environ.pop('ipaddr')
+ self.expect_preauth('auth')
+ self.expect_request(
+ method='POST',
+ path=self.EXPECTED_AUTH_PATH,
+ params=(
+ 'auto=expected+passcode'
+ '&ipaddr=0.0.0.0'
+ '&user=expected+username'
+ '&factor=auto'
+ ),
+ response=MockResponse(
+ status=200,
+ body=json.dumps({
+ 'stat': 'OK',
+ 'response': {
+ 'result': 'allow',
+ 'status': 'expected status',
+ },
+ }),
+ ),
+ )
+ self.assert_auth(
+ environ=environ,
+ expected_control='1',
+ )
+
+ def test_missing_control(self):
+ environ = {
+ 'ikey': self.IKEY,
+ 'skey': self.SKEY,
+ 'host': self.HOST,
+ 'password': self.PASSCODE,
+ 'username': self.USERNAME,
+ 'ipaddr': self.IPADDR,
+ }
+ self.assert_auth(
+ environ=environ,
+ send_control=False,
+ expected_control='',
+ )
+
+ def test_missing_username(self):
+ environ = {
+ 'ikey': self.IKEY,
+ 'skey': self.SKEY,
+ 'host': self.HOST,
+ 'password': self.PASSCODE,
+ 'ipaddr': self.IPADDR,
+ }
+ self.assert_auth(
+ environ=environ,
+ expected_control='',
+ )
+
+ def test_missing_password(self):
+ environ = {
+ 'ikey': self.IKEY,
+ 'skey': self.SKEY,
+ 'host': self.HOST,
+ 'username': self.USERNAME,
+ 'ipaddr': self.IPADDR,
+ }
+ self.assert_auth(
+ environ=environ,
+ expected_control='',
+ )
+
+ def test_missing_ikey(self):
+ environ = {
+ 'skey': self.SKEY,
+ 'host': self.HOST,
+ 'password': self.PASSCODE,
+ 'username': self.USERNAME,
+ 'ipaddr': self.IPADDR,
+ }
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_missing_skey(self):
+ environ = {
+ 'ikey': self.IKEY,
+ 'host': self.HOST,
+ 'password': self.PASSCODE,
+ 'username': self.USERNAME,
+ 'ipaddr': self.IPADDR,
+ }
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_missing_host(self):
+ environ = {
+ 'ikey': self.IKEY,
+ 'skey': self.SKEY,
+ 'password': self.PASSCODE,
+ 'username': self.USERNAME,
+ 'ipaddr': self.IPADDR,
+ }
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_proxy_success(self):
+ environ = self.normal_environ()
+ environ['proxy_host'] = self.PROXY_HOST
+ environ['proxy_port'] = self.PROXY_PORT
+ self.expected_calls.duo_client_set_proxy(
+ host=self.PROXY_HOST,
+ port=self.PROXY_PORT,
+ )
+ self.expect_preauth(
+ result='auth',
+ path=('https://' + self.HOST + self.EXPECTED_PREAUTH_PATH),
+ )
+ self.expect_auth(
+ result='allow',
+ path=('https://' + self.HOST + self.EXPECTED_AUTH_PATH),
+ )
+ self.assert_auth(
+ environ=environ,
+ expected_control='1',
+ )
+
+ def test_proxy_missing_port(self):
+ environ = self.normal_environ()
+ environ['proxy_host'] = self.PROXY_HOST
+ self.assert_auth(
+ environ=environ,
+ expected_control='0',
+ )
+
+ def test_proxy_missing_host(self):
+ environ = self.normal_environ()
+ # proxy_port is ignored if proxy_host isn't present.
+ environ['proxy_port'] = self.PROXY_PORT
+ self.expect_preauth('auth')
+ self.expect_auth('allow')
+ self.assert_auth(
+ environ=environ,
+ expected_control='1',
+ )
+
+if __name__ == '__main__':
+ unittest.main()