diff options
Diffstat (limited to 'src/libcharon/plugins/stroke/stroke_socket.c')
-rw-r--r-- | src/libcharon/plugins/stroke/stroke_socket.c | 199 |
1 files changed, 167 insertions, 32 deletions
diff --git a/src/libcharon/plugins/stroke/stroke_socket.c b/src/libcharon/plugins/stroke/stroke_socket.c index 88d0270d8..57648feb8 100644 --- a/src/libcharon/plugins/stroke/stroke_socket.c +++ b/src/libcharon/plugins/stroke/stroke_socket.c @@ -1,4 +1,5 @@ /* + * Copyright (C) 2011-2012 Tobias Brunner * Copyright (C) 2008 Martin Willi * Hochschule fuer Technik Rapperswil * @@ -20,13 +21,15 @@ #include <sys/stat.h> #include <sys/socket.h> #include <sys/un.h> -#include <sys/fcntl.h> #include <unistd.h> #include <errno.h> #include <hydra.h> #include <daemon.h> +#include <threading/mutex.h> #include <threading/thread.h> +#include <threading/condvar.h> +#include <utils/linked_list.h> #include <processing/jobs/callback_job.h> #include "stroke_config.h" @@ -36,6 +39,12 @@ #include "stroke_attribute.h" #include "stroke_list.h" +/** + * To avoid clogging the thread pool with (blocking) jobs, we limit the number + * of concurrently handled stroke commands. + */ +#define MAX_CONCURRENT_DEFAULT 4 + typedef struct stroke_job_context_t stroke_job_context_t; typedef struct private_stroke_socket_t private_stroke_socket_t; @@ -57,7 +66,37 @@ struct private_stroke_socket_t { /** * job accepting stroke messages */ - callback_job_t *job; + callback_job_t *receiver; + + /** + * job handling stroke messages + */ + callback_job_t *handler; + + /** + * queued stroke commands + */ + linked_list_t *commands; + + /** + * lock for command list + */ + mutex_t *mutex; + + /** + * condvar to signal the arrival or completion of commands + */ + condvar_t *condvar; + + /** + * the number of currently handled commands + */ + u_int handling; + + /** + * the maximum number of concurrently handled commands + */ + u_int max_concurrent; /** * configuration backend @@ -85,7 +124,7 @@ struct private_stroke_socket_t { stroke_ca_t *ca; /** - * Status information logging + * status information logging */ stroke_list_t *list; }; @@ -146,6 +185,7 @@ static void pop_end(stroke_msg_t *msg, const char* label, stroke_end_t *end) pop_string(msg, &end->auth2); pop_string(msg, &end->id); pop_string(msg, &end->id2); + pop_string(msg, &end->rsakey); pop_string(msg, &end->cert); pop_string(msg, &end->cert2); pop_string(msg, &end->ca); @@ -161,6 +201,7 @@ static void pop_end(stroke_msg_t *msg, const char* label, stroke_end_t *end) DBG2(DBG_CFG, " %sauth2=%s", label, end->auth2); DBG2(DBG_CFG, " %sid=%s", label, end->id); DBG2(DBG_CFG, " %sid2=%s", label, end->id2); + DBG2(DBG_CFG, " %srsakey=%s", label, end->rsakey); DBG2(DBG_CFG, " %scert=%s", label, end->cert); DBG2(DBG_CFG, " %scert2=%s", label, end->cert2); DBG2(DBG_CFG, " %sca=%s", label, end->ca); @@ -190,6 +231,9 @@ static void stroke_add_conn(private_stroke_socket_t *this, stroke_msg_t *msg) DBG2(DBG_CFG, " aaa_identity=%s", msg->add_conn.aaa_identity); DBG2(DBG_CFG, " ike=%s", msg->add_conn.algorithms.ike); DBG2(DBG_CFG, " esp=%s", msg->add_conn.algorithms.esp); + DBG2(DBG_CFG, " dpddelay=%d", msg->add_conn.dpd.delay); + DBG2(DBG_CFG, " dpdaction=%d", msg->add_conn.dpd.action); + DBG2(DBG_CFG, " closeaction=%d", msg->add_conn.close_action); DBG2(DBG_CFG, " mediation=%s", msg->add_conn.ikeme.mediation ? "yes" : "no"); DBG2(DBG_CFG, " mediated_by=%s", msg->add_conn.ikeme.mediated_by); DBG2(DBG_CFG, " me_peerid=%s", msg->add_conn.ikeme.peerid); @@ -322,11 +366,11 @@ static void stroke_del_ca(private_stroke_socket_t *this, * show status of daemon */ static void stroke_status(private_stroke_socket_t *this, - stroke_msg_t *msg, FILE *out, bool all) + stroke_msg_t *msg, FILE *out, bool all, bool wait) { pop_string(msg, &(msg->status.name)); - this->list->status(this->list, msg, out, all); + this->list->status(this->list, msg, out, all, wait); } /** @@ -382,7 +426,7 @@ static void stroke_export(private_stroke_socket_t *this, { pop_string(msg, &msg->export.selector); - if (msg->purge.flags & EXPORT_X509) + if (msg->export.flags & EXPORT_X509) { enumerator_t *enumerator; identification_t *id; @@ -418,6 +462,33 @@ static void stroke_leases(private_stroke_socket_t *this, } /** + * Show memory usage + */ +static void stroke_memusage(private_stroke_socket_t *this, + stroke_msg_t *msg, FILE *out) +{ + if (lib->leak_detective) + { + lib->leak_detective->usage(lib->leak_detective, out); + } +} + +/** + * Set username and password for a connection + */ +static void stroke_user_creds(private_stroke_socket_t *this, + stroke_msg_t *msg, FILE *out) +{ + pop_string(msg, &msg->user_creds.name); + pop_string(msg, &msg->user_creds.username); + pop_string(msg, &msg->user_creds.password); + + DBG1(DBG_CFG, "received stroke: user-creds '%s'", msg->user_creds.name); + + this->config->set_user_credentials(this->config, msg, out); +} + +/** * set the verbosity debug output */ static void stroke_loglevel(private_stroke_socket_t *this, @@ -433,7 +504,7 @@ static void stroke_loglevel(private_stroke_socket_t *this, msg->loglevel.level, msg->loglevel.type); group = enum_from_name(debug_names, msg->loglevel.type); - if (group < 0) + if ((int)group < 0) { fprintf(out, "invalid type (%s)!\n", msg->loglevel.type); return; @@ -475,6 +546,18 @@ static void stroke_job_context_destroy(stroke_job_context_t *this) } /** + * called to signal the completion of a command + */ +static inline job_requeue_t job_processed(private_stroke_socket_t *this) +{ + this->mutex->lock(this->mutex); + this->handling--; + this->condvar->signal(this->condvar); + this->mutex->unlock(this->mutex); + return JOB_REQUEUE_NONE; +} + +/** * process a stroke request from the socket pointed by "fd" */ static job_requeue_t process(stroke_job_context_t *ctx) @@ -492,7 +575,7 @@ static job_requeue_t process(stroke_job_context_t *ctx) { DBG1(DBG_CFG, "reading length of stroke message failed: %s", strerror(errno)); - return JOB_REQUEUE_NONE; + return job_processed(this); } /* read message */ @@ -501,14 +584,14 @@ static job_requeue_t process(stroke_job_context_t *ctx) if (bytes_read != msg_length) { DBG1(DBG_CFG, "reading stroke message failed: %s", strerror(errno)); - return JOB_REQUEUE_NONE; + return job_processed(this); } out = fdopen(strokefd, "w+"); if (out == NULL) { DBG1(DBG_CFG, "opening stroke output channel failed: %s", strerror(errno)); - return JOB_REQUEUE_NONE; + return job_processed(this); } DBG3(DBG_CFG, "stroke message %b", (void*)msg, msg_length); @@ -534,10 +617,13 @@ static job_requeue_t process(stroke_job_context_t *ctx) stroke_rekey(this, msg, out); break; case STR_STATUS: - stroke_status(this, msg, out, FALSE); + stroke_status(this, msg, out, FALSE, TRUE); break; case STR_STATUS_ALL: - stroke_status(this, msg, out, TRUE); + stroke_status(this, msg, out, TRUE, TRUE); + break; + case STR_STATUS_ALL_NOBLK: + stroke_status(this, msg, out, TRUE, FALSE); break; case STR_ADD_CONN: stroke_add_conn(this, msg); @@ -572,6 +658,12 @@ static job_requeue_t process(stroke_job_context_t *ctx) case STR_LEASES: stroke_leases(this, msg, out); break; + case STR_MEMUSAGE: + stroke_memusage(this, msg, out); + break; + case STR_USER_CREDS: + stroke_user_creds(this, msg, out); + break; default: DBG1(DBG_CFG, "received unknown stroke"); break; @@ -579,11 +671,38 @@ static job_requeue_t process(stroke_job_context_t *ctx) fclose(out); /* fclose() closes underlying FD */ ctx->fd = 0; - return JOB_REQUEUE_NONE; + return job_processed(this); +} + +/** + * Handle queued stroke commands + */ +static job_requeue_t handle(private_stroke_socket_t *this) +{ + stroke_job_context_t *ctx; + callback_job_t *job; + bool oldstate; + + this->mutex->lock(this->mutex); + thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex); + oldstate = thread_cancelability(TRUE); + while (this->commands->get_count(this->commands) == 0 || + this->handling >= this->max_concurrent) + { + this->condvar->wait(this->condvar, this->mutex); + } + thread_cancelability(oldstate); + this->commands->remove_first(this->commands, (void**)&ctx); + this->handling++; + thread_cleanup_pop(TRUE); + job = callback_job_create_with_prio((callback_job_cb_t)process, ctx, + (void*)stroke_job_context_destroy, this->handler, JOB_PRIO_HIGH); + lib->processor->queue_job(lib->processor, (job_t*)job); + return JOB_REQUEUE_DIRECT; } /** - * Implementation of private_stroke_socket_t.stroke_receive. + * Accept stroke commands and queue them to be handled */ static job_requeue_t receive(private_stroke_socket_t *this) { @@ -591,7 +710,6 @@ static job_requeue_t receive(private_stroke_socket_t *this) int strokeaddrlen = sizeof(strokeaddr); int strokefd; bool oldstate; - callback_job_t *job; stroke_job_context_t *ctx; oldstate = thread_cancelability(TRUE); @@ -604,17 +722,18 @@ static job_requeue_t receive(private_stroke_socket_t *this) return JOB_REQUEUE_FAIR; } - ctx = malloc_thing(stroke_job_context_t); - ctx->fd = strokefd; - ctx->this = this; - job = callback_job_create((callback_job_cb_t)process, - ctx, (void*)stroke_job_context_destroy, this->job); - lib->processor->queue_job(lib->processor, (job_t*)job); + INIT(ctx, + .fd = strokefd, + .this = this, + ); + this->mutex->lock(this->mutex); + this->commands->insert_last(this->commands, ctx); + this->condvar->signal(this->condvar); + this->mutex->unlock(this->mutex); return JOB_REQUEUE_FAIR; } - /** * initialize and open stroke socket */ @@ -659,12 +778,14 @@ static bool open_socket(private_stroke_socket_t *this) return TRUE; } -/** - * Implementation of stroke_socket_t.destroy - */ -static void destroy(private_stroke_socket_t *this) +METHOD(stroke_socket_t, destroy, void, + private_stroke_socket_t *this) { - this->job->cancel(this->job); + this->handler->cancel(this->handler); + this->receiver->cancel(this->receiver); + this->commands->destroy_function(this->commands, (void*)stroke_job_context_destroy); + this->condvar->destroy(this->condvar); + this->mutex->destroy(this->mutex); lib->credmgr->remove_set(lib->credmgr, &this->ca->set); lib->credmgr->remove_set(lib->credmgr, &this->cred->set); charon->backends->remove_backend(charon->backends, &this->config->backend); @@ -683,9 +804,13 @@ static void destroy(private_stroke_socket_t *this) */ stroke_socket_t *stroke_socket_create() { - private_stroke_socket_t *this = malloc_thing(private_stroke_socket_t); + private_stroke_socket_t *this; - this->public.destroy = (void(*)(stroke_socket_t*))destroy; + INIT(this, + .public = { + .destroy = _destroy, + }, + ); if (!open_socket(this)) { @@ -700,14 +825,24 @@ stroke_socket_t *stroke_socket_create() this->control = stroke_control_create(); this->list = stroke_list_create(this->attribute); + this->mutex = mutex_create(MUTEX_TYPE_DEFAULT); + this->condvar = condvar_create(CONDVAR_TYPE_DEFAULT); + this->commands = linked_list_create(); + this->max_concurrent = lib->settings->get_int(lib->settings, + "charon.plugins.stroke.max_concurrent", MAX_CONCURRENT_DEFAULT); + lib->credmgr->add_set(lib->credmgr, &this->ca->set); lib->credmgr->add_set(lib->credmgr, &this->cred->set); charon->backends->add_backend(charon->backends, &this->config->backend); hydra->attributes->add_provider(hydra->attributes, &this->attribute->provider); - this->job = callback_job_create((callback_job_cb_t)receive, - this, NULL, NULL); - lib->processor->queue_job(lib->processor, (job_t*)this->job); + this->receiver = callback_job_create_with_prio((callback_job_cb_t)receive, + this, NULL, NULL, JOB_PRIO_CRITICAL); + lib->processor->queue_job(lib->processor, (job_t*)this->receiver); + + this->handler = callback_job_create_with_prio((callback_job_cb_t)handle, + this, NULL, NULL, JOB_PRIO_CRITICAL); + lib->processor->queue_job(lib->processor, (job_t*)this->handler); return &this->public; } |