summaryrefslogtreecommitdiff
path: root/src/libfast/dispatcher.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libfast/dispatcher.c')
-rw-r--r--src/libfast/dispatcher.c261
1 files changed, 165 insertions, 96 deletions
diff --git a/src/libfast/dispatcher.c b/src/libfast/dispatcher.c
index 35ae55814..7690230d3 100644
--- a/src/libfast/dispatcher.c
+++ b/src/libfast/dispatcher.c
@@ -19,12 +19,18 @@
#include "session.h"
#include <fcgiapp.h>
-#include <pthread.h>
#include <signal.h>
#include <unistd.h>
#include <debug.h>
+#include <threading/thread.h>
+#include <threading/condvar.h>
+#include <threading/mutex.h>
#include <utils/linked_list.h>
+#include <utils/hashtable.h>
+
+/** Intervall to check for expired sessions, in seconds */
+#define CLEANUP_INTERVAL 30
typedef struct private_dispatcher_t private_dispatcher_t;
@@ -37,57 +43,62 @@ struct private_dispatcher_t {
* public functions
*/
dispatcher_t public;
-
+
/**
* fcgi socket fd
*/
int fd;
-
+
/**
* thread list
*/
- pthread_t *threads;
-
+ thread_t **threads;
+
/**
* number of threads in "threads"
*/
int thread_count;
-
+
/**
* session locking mutex
*/
- pthread_mutex_t mutex;
-
+ mutex_t *mutex;
+
/**
- * List of sessions
+ * Hahstable with active sessions
*/
- linked_list_t *sessions;
-
+ hashtable_t *sessions;
+
/**
* session timeout
*/
time_t timeout;
-
+
+ /**
+ * timestamp of last session cleanup round
+ */
+ time_t last_cleanup;
+
/**
* running in debug mode?
*/
bool debug;
-
+
/**
* List of controllers controller_constructor_t
*/
linked_list_t *controllers;
-
+
/**
* List of filters filter_constructor_t
*/
linked_list_t *filters;
-
- /**
+
+ /**
* constructor function to create session context (in controller_entry_t)
*/
context_constructor_t context_constructor;
-
+
/**
* user param to context constructor
*/
@@ -112,7 +123,7 @@ typedef struct {
/** session instance */
session_t *session;
/** condvar to wait for session */
- pthread_cond_t cond;
+ condvar_t *cond;
/** client host address, to prevent session hijacking */
char *host;
/** TRUE if session is in use */
@@ -128,36 +139,36 @@ typedef struct {
*/
static session_t* load_session(private_dispatcher_t *this)
{
- iterator_t *iterator;
+ enumerator_t *enumerator;
controller_entry_t *centry;
filter_entry_t *fentry;
session_t *session;
context_t *context = NULL;
controller_t *controller;
filter_t *filter;
-
+
if (this->context_constructor)
{
context = this->context_constructor(this->param);
}
session = session_create(context);
-
- iterator = this->controllers->create_iterator(this->controllers, TRUE);
- while (iterator->iterate(iterator, (void**)&centry))
+
+ enumerator = this->controllers->create_enumerator(this->controllers);
+ while (enumerator->enumerate(enumerator, &centry))
{
controller = centry->constructor(context, centry->param);
session->add_controller(session, controller);
}
- iterator->destroy(iterator);
-
- iterator = this->filters->create_iterator(this->filters, TRUE);
- while (iterator->iterate(iterator, (void**)&fentry))
+ enumerator->destroy(enumerator);
+
+ enumerator = this->filters->create_enumerator(this->filters);
+ while (enumerator->enumerate(enumerator, &fentry))
{
filter = fentry->constructor(context, fentry->param);
session->add_filter(session, filter);
}
- iterator->destroy(iterator);
-
+ enumerator->destroy(enumerator);
+
return session;
}
@@ -168,21 +179,25 @@ static session_entry_t *session_entry_create(private_dispatcher_t *this,
char *host)
{
session_entry_t *entry;
-
+
entry = malloc_thing(session_entry_t);
entry->in_use = FALSE;
entry->closed = FALSE;
- pthread_cond_init(&entry->cond, NULL);
+ entry->cond = condvar_create(CONDVAR_TYPE_DEFAULT);
entry->session = load_session(this);
- entry->used = time(NULL);
+ entry->used = time_monotonic(NULL);
entry->host = strdup(host);
-
+
return entry;
}
+/**
+ * destroy a session
+ */
static void session_entry_destroy(session_entry_t *entry)
{
entry->session->destroy(entry->session);
+ entry->cond->destroy(entry->cond);
free(entry->host);
free(entry);
}
@@ -194,7 +209,7 @@ static void add_controller(private_dispatcher_t *this,
controller_constructor_t constructor, void *param)
{
controller_entry_t *entry = malloc_thing(controller_entry_t);
-
+
entry->constructor = constructor;
entry->param = param;
this->controllers->insert_last(this->controllers, entry);
@@ -207,90 +222,129 @@ static void add_filter(private_dispatcher_t *this,
filter_constructor_t constructor, void *param)
{
filter_entry_t *entry = malloc_thing(filter_entry_t);
-
+
entry->constructor = constructor;
entry->param = param;
this->filters->insert_last(this->filters, entry);
}
/**
- * Actual dispatching code
+ * Hashtable hash function
+ */
+static u_int session_hash(char *sid)
+{
+ return chunk_hash(chunk_create(sid, strlen(sid)));
+}
+
+/**
+ * Hashtable equals function
+ */
+static bool session_equals(char *sid1, char *sid2)
+{
+ return streq(sid1, sid2);
+}
+
+/**
+ * Cleanup unused sessions
+ */
+static void cleanup_sessions(private_dispatcher_t *this, time_t now)
+{
+ if (this->last_cleanup < now - CLEANUP_INTERVAL)
+ {
+ char *sid;
+ session_entry_t *entry;
+ enumerator_t *enumerator;
+ linked_list_t *remove;
+
+ this->last_cleanup = now;
+ remove = linked_list_create();
+ enumerator = this->sessions->create_enumerator(this->sessions);
+ while (enumerator->enumerate(enumerator, &sid, &entry))
+ {
+ /* check all sessions for timeout or close flag */
+ if (!entry->in_use &&
+ (entry->used < now - this->timeout || entry->closed))
+ {
+ remove->insert_last(remove, sid);
+ }
+ }
+ enumerator->destroy(enumerator);
+
+ while (remove->remove_last(remove, (void**)&sid) == SUCCESS)
+ {
+ entry = this->sessions->remove(this->sessions, sid);
+ if (entry)
+ {
+ session_entry_destroy(entry);
+ }
+ }
+ remove->destroy(remove);
+ }
+}
+
+/**
+ * Actual dispatching code
*/
static void dispatch(private_dispatcher_t *this)
{
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ thread_cancelability(FALSE);
while (TRUE)
{
request_t *request;
- session_entry_t *current, *found = NULL;
- iterator_t *iterator;
+ session_entry_t *found = NULL;
time_t now;
char *sid;
-
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+
+ thread_cancelability(TRUE);
request = request_create(this->fd, this->debug);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ thread_cancelability(FALSE);
if (request == NULL)
{
continue;
}
+ now = time_monotonic(NULL);
sid = request->get_cookie(request, "SID");
- now = time(NULL);
-
- /* find session */
- pthread_mutex_lock(&this->mutex);
- iterator = this->sessions->create_iterator(this->sessions, TRUE);
- while (iterator->iterate(iterator, (void**)&current))
+
+ this->mutex->lock(this->mutex);
+ if (sid)
{
- /* check all sessions for timeout or close flag
- * TODO: use a seperate cleanup thread */
- if (!current->in_use &&
- (current->used < now - this->timeout || current->closed))
- {
- iterator->remove(iterator);
- session_entry_destroy(current);
- continue;
- }
- /* find by session ID. Prevent session hijacking by host check */
- if (!found && sid &&
- streq(current->session->get_sid(current->session), sid) &&
- streq(current->host, request->get_host(request)))
- {
- found = current;
- }
+ found = this->sessions->get(this->sessions, sid);
+ }
+ if (found && !streq(found->host, request->get_host(request)))
+ {
+ found = NULL;
}
- iterator->destroy(iterator);
-
if (found)
{
/* wait until session is unused */
while (found->in_use)
{
- pthread_cond_wait(&found->cond, &this->mutex);
+ found->cond->wait(found->cond, this->mutex);
}
}
else
{ /* create a new session if not found */
found = session_entry_create(this, request->get_host(request));
- this->sessions->insert_first(this->sessions, found);
+ sid = found->session->get_sid(found->session);
+ this->sessions->put(this->sessions, sid, found);
}
found->in_use = TRUE;
- pthread_mutex_unlock(&this->mutex);
-
+ this->mutex->unlock(this->mutex);
+
/* start processing */
found->session->process(found->session, request);
- found->used = time(NULL);
-
+ found->used = time_monotonic(NULL);
+
/* release session */
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
found->in_use = FALSE;
found->closed = request->session_closed(request);
- pthread_cond_signal(&found->cond);
- pthread_mutex_unlock(&this->mutex);
-
- /* cleanup */
+ found->cond->signal(found->cond);
+ cleanup_sessions(this, now);
+ this->mutex->unlock(this->mutex);
+
request->destroy(request);
}
}
@@ -301,11 +355,12 @@ static void dispatch(private_dispatcher_t *this)
static void run(private_dispatcher_t *this, int threads)
{
this->thread_count = threads;
- this->threads = malloc(sizeof(pthread_t) * threads);
+ this->threads = malloc(sizeof(thread_t*) * threads);
while (threads)
{
- if (pthread_create(&this->threads[threads - 1],
- NULL, (void*)dispatch, this) == 0)
+ this->threads[threads - 1] = thread_create((thread_main_t)dispatch,
+ this);
+ if (this->threads[threads - 1])
{
threads--;
}
@@ -319,7 +374,7 @@ static void waitsignal(private_dispatcher_t *this)
{
sigset_t set;
int sig;
-
+
sigemptyset(&set);
sigaddset(&set, SIGINT);
sigaddset(&set, SIGTERM);
@@ -333,15 +388,27 @@ static void waitsignal(private_dispatcher_t *this)
*/
static void destroy(private_dispatcher_t *this)
{
+ char *sid;
+ session_entry_t *entry;
+ enumerator_t *enumerator;
+
FCGX_ShutdownPending();
while (this->thread_count--)
{
- pthread_cancel(this->threads[this->thread_count]);
- pthread_join(this->threads[this->thread_count], NULL);
+ thread_t *thread = this->threads[this->thread_count];
+ thread->cancel(thread);
+ thread->join(thread);
+ }
+ enumerator = this->sessions->create_enumerator(this->sessions);
+ while (enumerator->enumerate(enumerator, &sid, &entry))
+ {
+ session_entry_destroy(entry);
}
- this->sessions->destroy_function(this->sessions, (void*)session_entry_destroy);
+ enumerator->destroy(enumerator);
+ this->sessions->destroy(this->sessions);
this->controllers->destroy_function(this->controllers, free);
this->filters->destroy_function(this->filters, free);
+ this->mutex->destroy(this->mutex);
free(this->threads);
free(this);
}
@@ -359,22 +426,24 @@ dispatcher_t *dispatcher_create(char *socket, bool debug, int timeout,
this->public.run = (void(*)(dispatcher_t*, int threads))run;
this->public.waitsignal = (void(*)(dispatcher_t*))waitsignal;
this->public.destroy = (void(*)(dispatcher_t*))destroy;
-
- this->sessions = linked_list_create();
+
+ this->sessions = hashtable_create((void*)session_hash,
+ (void*)session_equals, 4096);
this->controllers = linked_list_create();
this->filters = linked_list_create();
this->context_constructor = constructor;
- pthread_mutex_init(&this->mutex, NULL);
+ this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
this->param = param;
- this->fd = 0;
- this->timeout = timeout;
- this->debug = debug;
- this->threads = NULL;
-
- FCGX_Init();
-
- if (socket)
- {
+ this->fd = 0;
+ this->timeout = timeout;
+ this->last_cleanup = time_monotonic(NULL);
+ this->debug = debug;
+ this->threads = NULL;
+
+ FCGX_Init();
+
+ if (socket)
+ {
unlink(socket);
this->fd = FCGX_OpenSocket(socket, 10);
}