diff options
Diffstat (limited to 'src/libfast/dispatcher.c')
-rw-r--r-- | src/libfast/dispatcher.c | 261 |
1 files changed, 165 insertions, 96 deletions
diff --git a/src/libfast/dispatcher.c b/src/libfast/dispatcher.c index 35ae55814..7690230d3 100644 --- a/src/libfast/dispatcher.c +++ b/src/libfast/dispatcher.c @@ -19,12 +19,18 @@ #include "session.h" #include <fcgiapp.h> -#include <pthread.h> #include <signal.h> #include <unistd.h> #include <debug.h> +#include <threading/thread.h> +#include <threading/condvar.h> +#include <threading/mutex.h> #include <utils/linked_list.h> +#include <utils/hashtable.h> + +/** Intervall to check for expired sessions, in seconds */ +#define CLEANUP_INTERVAL 30 typedef struct private_dispatcher_t private_dispatcher_t; @@ -37,57 +43,62 @@ struct private_dispatcher_t { * public functions */ dispatcher_t public; - + /** * fcgi socket fd */ int fd; - + /** * thread list */ - pthread_t *threads; - + thread_t **threads; + /** * number of threads in "threads" */ int thread_count; - + /** * session locking mutex */ - pthread_mutex_t mutex; - + mutex_t *mutex; + /** - * List of sessions + * Hahstable with active sessions */ - linked_list_t *sessions; - + hashtable_t *sessions; + /** * session timeout */ time_t timeout; - + + /** + * timestamp of last session cleanup round + */ + time_t last_cleanup; + /** * running in debug mode? */ bool debug; - + /** * List of controllers controller_constructor_t */ linked_list_t *controllers; - + /** * List of filters filter_constructor_t */ linked_list_t *filters; - - /** + + /** * constructor function to create session context (in controller_entry_t) */ context_constructor_t context_constructor; - + /** * user param to context constructor */ @@ -112,7 +123,7 @@ typedef struct { /** session instance */ session_t *session; /** condvar to wait for session */ - pthread_cond_t cond; + condvar_t *cond; /** client host address, to prevent session hijacking */ char *host; /** TRUE if session is in use */ @@ -128,36 +139,36 @@ typedef struct { */ static session_t* load_session(private_dispatcher_t *this) { - iterator_t *iterator; + enumerator_t *enumerator; controller_entry_t *centry; filter_entry_t *fentry; session_t *session; context_t *context = NULL; controller_t *controller; filter_t *filter; - + if (this->context_constructor) { context = this->context_constructor(this->param); } session = session_create(context); - - iterator = this->controllers->create_iterator(this->controllers, TRUE); - while (iterator->iterate(iterator, (void**)¢ry)) + + enumerator = this->controllers->create_enumerator(this->controllers); + while (enumerator->enumerate(enumerator, ¢ry)) { controller = centry->constructor(context, centry->param); session->add_controller(session, controller); } - iterator->destroy(iterator); - - iterator = this->filters->create_iterator(this->filters, TRUE); - while (iterator->iterate(iterator, (void**)&fentry)) + enumerator->destroy(enumerator); + + enumerator = this->filters->create_enumerator(this->filters); + while (enumerator->enumerate(enumerator, &fentry)) { filter = fentry->constructor(context, fentry->param); session->add_filter(session, filter); } - iterator->destroy(iterator); - + enumerator->destroy(enumerator); + return session; } @@ -168,21 +179,25 @@ static session_entry_t *session_entry_create(private_dispatcher_t *this, char *host) { session_entry_t *entry; - + entry = malloc_thing(session_entry_t); entry->in_use = FALSE; entry->closed = FALSE; - pthread_cond_init(&entry->cond, NULL); + entry->cond = condvar_create(CONDVAR_TYPE_DEFAULT); entry->session = load_session(this); - entry->used = time(NULL); + entry->used = time_monotonic(NULL); entry->host = strdup(host); - + return entry; } +/** + * destroy a session + */ static void session_entry_destroy(session_entry_t *entry) { entry->session->destroy(entry->session); + entry->cond->destroy(entry->cond); free(entry->host); free(entry); } @@ -194,7 +209,7 @@ static void add_controller(private_dispatcher_t *this, controller_constructor_t constructor, void *param) { controller_entry_t *entry = malloc_thing(controller_entry_t); - + entry->constructor = constructor; entry->param = param; this->controllers->insert_last(this->controllers, entry); @@ -207,90 +222,129 @@ static void add_filter(private_dispatcher_t *this, filter_constructor_t constructor, void *param) { filter_entry_t *entry = malloc_thing(filter_entry_t); - + entry->constructor = constructor; entry->param = param; this->filters->insert_last(this->filters, entry); } /** - * Actual dispatching code + * Hashtable hash function + */ +static u_int session_hash(char *sid) +{ + return chunk_hash(chunk_create(sid, strlen(sid))); +} + +/** + * Hashtable equals function + */ +static bool session_equals(char *sid1, char *sid2) +{ + return streq(sid1, sid2); +} + +/** + * Cleanup unused sessions + */ +static void cleanup_sessions(private_dispatcher_t *this, time_t now) +{ + if (this->last_cleanup < now - CLEANUP_INTERVAL) + { + char *sid; + session_entry_t *entry; + enumerator_t *enumerator; + linked_list_t *remove; + + this->last_cleanup = now; + remove = linked_list_create(); + enumerator = this->sessions->create_enumerator(this->sessions); + while (enumerator->enumerate(enumerator, &sid, &entry)) + { + /* check all sessions for timeout or close flag */ + if (!entry->in_use && + (entry->used < now - this->timeout || entry->closed)) + { + remove->insert_last(remove, sid); + } + } + enumerator->destroy(enumerator); + + while (remove->remove_last(remove, (void**)&sid) == SUCCESS) + { + entry = this->sessions->remove(this->sessions, sid); + if (entry) + { + session_entry_destroy(entry); + } + } + remove->destroy(remove); + } +} + +/** + * Actual dispatching code */ static void dispatch(private_dispatcher_t *this) { - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + thread_cancelability(FALSE); while (TRUE) { request_t *request; - session_entry_t *current, *found = NULL; - iterator_t *iterator; + session_entry_t *found = NULL; time_t now; char *sid; - - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + + thread_cancelability(TRUE); request = request_create(this->fd, this->debug); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); + thread_cancelability(FALSE); if (request == NULL) { continue; } + now = time_monotonic(NULL); sid = request->get_cookie(request, "SID"); - now = time(NULL); - - /* find session */ - pthread_mutex_lock(&this->mutex); - iterator = this->sessions->create_iterator(this->sessions, TRUE); - while (iterator->iterate(iterator, (void**)¤t)) + + this->mutex->lock(this->mutex); + if (sid) { - /* check all sessions for timeout or close flag - * TODO: use a seperate cleanup thread */ - if (!current->in_use && - (current->used < now - this->timeout || current->closed)) - { - iterator->remove(iterator); - session_entry_destroy(current); - continue; - } - /* find by session ID. Prevent session hijacking by host check */ - if (!found && sid && - streq(current->session->get_sid(current->session), sid) && - streq(current->host, request->get_host(request))) - { - found = current; - } + found = this->sessions->get(this->sessions, sid); + } + if (found && !streq(found->host, request->get_host(request))) + { + found = NULL; } - iterator->destroy(iterator); - if (found) { /* wait until session is unused */ while (found->in_use) { - pthread_cond_wait(&found->cond, &this->mutex); + found->cond->wait(found->cond, this->mutex); } } else { /* create a new session if not found */ found = session_entry_create(this, request->get_host(request)); - this->sessions->insert_first(this->sessions, found); + sid = found->session->get_sid(found->session); + this->sessions->put(this->sessions, sid, found); } found->in_use = TRUE; - pthread_mutex_unlock(&this->mutex); - + this->mutex->unlock(this->mutex); + /* start processing */ found->session->process(found->session, request); - found->used = time(NULL); - + found->used = time_monotonic(NULL); + /* release session */ - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); found->in_use = FALSE; found->closed = request->session_closed(request); - pthread_cond_signal(&found->cond); - pthread_mutex_unlock(&this->mutex); - - /* cleanup */ + found->cond->signal(found->cond); + cleanup_sessions(this, now); + this->mutex->unlock(this->mutex); + request->destroy(request); } } @@ -301,11 +355,12 @@ static void dispatch(private_dispatcher_t *this) static void run(private_dispatcher_t *this, int threads) { this->thread_count = threads; - this->threads = malloc(sizeof(pthread_t) * threads); + this->threads = malloc(sizeof(thread_t*) * threads); while (threads) { - if (pthread_create(&this->threads[threads - 1], - NULL, (void*)dispatch, this) == 0) + this->threads[threads - 1] = thread_create((thread_main_t)dispatch, + this); + if (this->threads[threads - 1]) { threads--; } @@ -319,7 +374,7 @@ static void waitsignal(private_dispatcher_t *this) { sigset_t set; int sig; - + sigemptyset(&set); sigaddset(&set, SIGINT); sigaddset(&set, SIGTERM); @@ -333,15 +388,27 @@ static void waitsignal(private_dispatcher_t *this) */ static void destroy(private_dispatcher_t *this) { + char *sid; + session_entry_t *entry; + enumerator_t *enumerator; + FCGX_ShutdownPending(); while (this->thread_count--) { - pthread_cancel(this->threads[this->thread_count]); - pthread_join(this->threads[this->thread_count], NULL); + thread_t *thread = this->threads[this->thread_count]; + thread->cancel(thread); + thread->join(thread); + } + enumerator = this->sessions->create_enumerator(this->sessions); + while (enumerator->enumerate(enumerator, &sid, &entry)) + { + session_entry_destroy(entry); } - this->sessions->destroy_function(this->sessions, (void*)session_entry_destroy); + enumerator->destroy(enumerator); + this->sessions->destroy(this->sessions); this->controllers->destroy_function(this->controllers, free); this->filters->destroy_function(this->filters, free); + this->mutex->destroy(this->mutex); free(this->threads); free(this); } @@ -359,22 +426,24 @@ dispatcher_t *dispatcher_create(char *socket, bool debug, int timeout, this->public.run = (void(*)(dispatcher_t*, int threads))run; this->public.waitsignal = (void(*)(dispatcher_t*))waitsignal; this->public.destroy = (void(*)(dispatcher_t*))destroy; - - this->sessions = linked_list_create(); + + this->sessions = hashtable_create((void*)session_hash, + (void*)session_equals, 4096); this->controllers = linked_list_create(); this->filters = linked_list_create(); this->context_constructor = constructor; - pthread_mutex_init(&this->mutex, NULL); + this->mutex = mutex_create(MUTEX_TYPE_DEFAULT); this->param = param; - this->fd = 0; - this->timeout = timeout; - this->debug = debug; - this->threads = NULL; - - FCGX_Init(); - - if (socket) - { + this->fd = 0; + this->timeout = timeout; + this->last_cleanup = time_monotonic(NULL); + this->debug = debug; + this->threads = NULL; + + FCGX_Init(); + + if (socket) + { unlink(socket); this->fd = FCGX_OpenSocket(socket, 10); } |