summaryrefslogtreecommitdiff
path: root/src/charon/bus
diff options
context:
space:
mode:
Diffstat (limited to 'src/charon/bus')
-rw-r--r--src/charon/bus/bus.c306
-rw-r--r--src/charon/bus/bus.h52
2 files changed, 115 insertions, 243 deletions
diff --git a/src/charon/bus/bus.c b/src/charon/bus/bus.c
index 5fda36925..e53ac43ce 100644
--- a/src/charon/bus/bus.c
+++ b/src/charon/bus/bus.c
@@ -24,6 +24,8 @@
#include <pthread.h>
+#include <daemon.h>
+
ENUM(signal_names, SIG_ANY, SIG_MAX,
/** should not get printed */
"SIG_ANY",
@@ -53,104 +55,74 @@ ENUM(signal_names, SIG_ANY, SIG_MAX,
"SIG_MAX",
);
-typedef struct active_listener_t active_listener_t;
+typedef struct private_bus_t private_bus_t;
/**
- * information for a active listener
+ * Private data of a bus_t object.
*/
-struct active_listener_t {
-
- /**
- * associated thread
- */
- pthread_t id;
-
- /**
- * condvar to wait for a signal
- */
- pthread_cond_t cond;
-
- /**
- * state of the thread
- */
- enum {
- /** not registered, do not wait for thread */
- UNREGISTERED,
- /** registered, if a signal occurs, wait until it is LISTENING */
- REGISTERED,
- /** listening, deliver signal */
- LISTENING,
- } state;
-
- /**
- * currently processed signals type
- */
- signal_t signal;
-
+struct private_bus_t {
/**
- * verbosity level of the signal
+ * Public part of a bus_t object.
*/
- level_t level;
+ bus_t public;
/**
- * current processed signals thread number
+ * List of registered listeners as entry_t's
*/
- int thread;
+ linked_list_t *listeners;
/**
- * currently processed signals ike_sa
+ * mutex to synchronize active listeners
*/
- ike_sa_t *ike_sa;
+ pthread_mutex_t mutex;
/**
- * currently processed signals format string
+ * Thread local storage for a unique, simple thread ID
*/
- char *format;
+ pthread_key_t thread_id;
/**
- * currently processed signals format varargs
+ * Thread local storage the threads IKE_SA
*/
- va_list args;
-
+ pthread_key_t thread_sa;
};
-typedef struct private_bus_t private_bus_t;
+typedef struct entry_t entry_t;
/**
- * Private data of a bus_t object.
+ * a listener entry, either active or passive
*/
-struct private_bus_t {
- /**
- * Public part of a bus_t object.
- */
- bus_t public;
-
- /**
- * List of registered listeners implementing the bus_t interface
- */
- linked_list_t *listeners;
-
+struct entry_t {
+
/**
- * List of active listeners with listener_state TRUE
+ * registered listener interface
*/
- linked_list_t *active_listeners;
+ bus_listener_t *listener;
/**
- * mutex to synchronize active listeners
+ * is this a active listen() call with a blocking thread
*/
- pthread_mutex_t mutex;
+ bool blocker;
/**
- * Thread local storage for a unique, simple thread ID
+ * condvar where active listeners wait
*/
- pthread_key_t thread_id;
+ pthread_cond_t cond;
+};
+
+/**
+ * create a listener entry
+ */
+static entry_t *entry_create(bus_listener_t *listener, bool blocker)
+{
+ entry_t *this = malloc_thing(entry_t);
- /**
- * Thread local storage the threads IKE_SA
- */
- pthread_key_t thread_sa;
+ this->listener = listener;
+ this->blocker = blocker;
+ pthread_cond_init(&this->cond, NULL);
-};
+ return this;
+}
/**
* Get a unique thread number for a calling thread. Since
@@ -160,7 +132,7 @@ struct private_bus_t {
static int get_thread_number(private_bus_t *this)
{
static long current_num = 0;
- static long stored_num;
+ long stored_num;
stored_num = (long)pthread_getspecific(this->thread_id);
if (stored_num == 0)
@@ -180,7 +152,7 @@ static int get_thread_number(private_bus_t *this)
static void add_listener(private_bus_t *this, bus_listener_t *listener)
{
pthread_mutex_lock(&this->mutex);
- this->listeners->insert_last(this->listeners, listener);
+ this->listeners->insert_last(this->listeners, entry_create(listener, FALSE));
pthread_mutex_unlock(&this->mutex);
}
@@ -190,15 +162,16 @@ static void add_listener(private_bus_t *this, bus_listener_t *listener)
static void remove_listener(private_bus_t *this, bus_listener_t *listener)
{
iterator_t *iterator;
- bus_listener_t *current;
+ entry_t *entry;
pthread_mutex_lock(&this->mutex);
iterator = this->listeners->create_iterator(this->listeners, TRUE);
- while (iterator->iterate(iterator, (void**)&current))
+ while (iterator->iterate(iterator, (void**)&entry))
{
- if (current == listener)
+ if (entry->listener == listener)
{
iterator->remove(iterator);
+ free(entry);
break;
}
}
@@ -206,109 +179,67 @@ static void remove_listener(private_bus_t *this, bus_listener_t *listener)
pthread_mutex_unlock(&this->mutex);
}
+typedef struct cleanup_data_t cleanup_data_t;
+
+/**
+ * data to remove a listener using pthread_cleanup handler
+ */
+struct cleanup_data_t {
+ /** bus instance */
+ private_bus_t *this;
+ /** listener entry */
+ entry_t *entry;
+};
+
/**
- * Get the listener object for the calling thread
+ * pthread_cleanup handler to remove a listener
*/
-static active_listener_t *get_active_listener(private_bus_t *this)
+static void listener_cleanup(cleanup_data_t *data)
{
- active_listener_t *current, *found = NULL;
iterator_t *iterator;
-
- /* if the thread was here once before, we have a active_listener record */
- iterator = this->active_listeners->create_iterator(this->active_listeners, TRUE);
- while (iterator->iterate(iterator, (void**)&current))
+ entry_t *entry;
+
+ iterator = data->this->listeners->create_iterator(data->this->listeners, TRUE);
+ while (iterator->iterate(iterator, (void**)&entry))
{
- if (current->id == pthread_self())
+ if (entry == data->entry)
{
- found = current;
+ iterator->remove(iterator);
+ free(entry);
break;
}
}
iterator->destroy(iterator);
-
- if (found == NULL)
- {
- /* create a new object for a never-seen thread */
- found = malloc_thing(active_listener_t);
- found->id = pthread_self();
- pthread_cond_init(&found->cond, NULL);
- this->active_listeners->insert_last(this->active_listeners, found);
- }
-
- return found;
-}
-
-/**
- * disable a listener to cleanly clean up
- */
-static void unregister(active_listener_t *listener)
-{
- listener->state = UNREGISTERED;
- pthread_cond_broadcast(&listener->cond);
}
/**
* Implementation of bus_t.listen.
*/
-static signal_t listen_(private_bus_t *this, level_t *level, int *thread,
- ike_sa_t **ike_sa, char** format, va_list* args)
+static void listen_(private_bus_t *this, bus_listener_t *listener, job_t *job)
{
- active_listener_t *listener;
- int oldstate;
-
- pthread_mutex_lock(&this->mutex);
- listener = get_active_listener(this);
- /* go "listening", say hello to a thread which have a signal for us */
- listener->state = LISTENING;
- pthread_cond_broadcast(&listener->cond);
- /* wait until it has us delivered a signal, and go back to "registered".
- * we allow cancellation here, but must cleanly disable the listener. */
- pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex);
- pthread_cleanup_push((void*)unregister, listener);
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
- pthread_cond_wait(&listener->cond, &this->mutex);
- pthread_setcancelstate(oldstate, NULL);
- pthread_cleanup_pop(0);
- pthread_cleanup_pop(0);
-
- pthread_mutex_unlock(&this->mutex);
-
- /* return signal values */
- *level = listener->level;
- *thread = listener->thread;
- *ike_sa = listener->ike_sa;
- *format = listener->format;
- va_copy(*args, listener->args);
- va_end(listener->args);
+ int old;
+ cleanup_data_t data;
- return listener->signal;
-}
+ data.this = this;
+ data.entry = entry_create(listener, TRUE);
-/**
- * Implementation of bus_t.set_listen_state.
- */
-static void set_listen_state(private_bus_t *this, bool active)
-{
- active_listener_t *listener;
-
pthread_mutex_lock(&this->mutex);
-
- listener = get_active_listener(this);
- if (active)
- {
- listener->state = REGISTERED;
- }
- else
+ this->listeners->insert_last(this->listeners, data.entry);
+ charon->processor->queue_job(charon->processor, job);
+ pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex);
+ pthread_cleanup_push((void*)listener_cleanup, &data);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old);
+ while (data.entry->blocker)
{
- listener->state = UNREGISTERED;
- /* say hello to signal emitter; we are finished processing the signal */
- pthread_cond_broadcast(&listener->cond);
+ pthread_cond_wait(&data.entry->cond, &this->mutex);
}
-
- pthread_mutex_unlock(&this->mutex);
+ pthread_setcancelstate(old, NULL);
+ pthread_cleanup_pop(FALSE);
+ /* unlock mutex */
+ pthread_cleanup_pop(TRUE);
+ free(data.entry);
}
-
/**
* Implementation of bus_t.set_sa.
*/
@@ -324,72 +255,37 @@ static void vsignal(private_bus_t *this, signal_t signal, level_t level,
char* format, va_list args)
{
iterator_t *iterator;
- bus_listener_t *listener;
- active_listener_t *active_listener;
+ entry_t *entry;
ike_sa_t *ike_sa;
long thread;
+ pthread_mutex_lock(&this->mutex);
ike_sa = pthread_getspecific(this->thread_sa);
thread = get_thread_number(this);
- pthread_mutex_lock(&this->mutex);
-
- /* do the job for all passive bus_listeners */
iterator = this->listeners->create_iterator(this->listeners, TRUE);
- while (iterator->iterate(iterator, (void**)&listener))
+ while (iterator->iterate(iterator, (void**)&entry))
{
va_list args_copy;
va_copy(args_copy, args);
- if (!listener->signal(listener, signal, level, thread,
- ike_sa, format, args_copy))
+ if (!entry->listener->signal(entry->listener, signal, level, thread,
+ ike_sa, format, args_copy))
{
- /* unregister listener if requested */
iterator->remove(iterator);
+ if (entry->blocker)
+ {
+ entry->blocker = FALSE;
+ pthread_cond_signal(&entry->cond);
+ }
+ else
+ {
+ free(entry);
+ }
}
va_end(args_copy);
}
iterator->destroy(iterator);
- /* wake up all active listeners */
- iterator = this->active_listeners->create_iterator(this->active_listeners, TRUE);
- while (iterator->iterate(iterator, (void**)&active_listener))
- {
- /* wait until all threads are registered. But if the thread raising
- * the signal is the same as the one that listens, we skip it.
- * Otherwise we would deadlock. */
- while (active_listener->id != pthread_self() &&
- active_listener->state == REGISTERED)
- {
- pthread_cond_wait(&active_listener->cond, &this->mutex);
- }
- /* if thread is listening now, give it the signal to process */
- if (active_listener->state == LISTENING)
- {
- active_listener->level = level;
- active_listener->thread = thread;
- active_listener->ike_sa = ike_sa;
- active_listener->signal = signal;
- active_listener->format = format;
- va_copy(active_listener->args, args);
- active_listener->state = REGISTERED;
- pthread_cond_broadcast(&active_listener->cond);
- }
- }
-
- /* we must wait now until all are not in state REGISTERED,
- * as they may still use our arguments */
- iterator->reset(iterator);
- while (iterator->iterate(iterator, (void**)&active_listener))
- {
- /* do not wait for ourself, it won't happen (see above) */
- while (active_listener->id != pthread_self() &&
- active_listener->state == REGISTERED)
- {
- pthread_cond_wait(&active_listener->cond, &this->mutex);
- }
- }
- iterator->destroy(iterator);
-
pthread_mutex_unlock(&this->mutex);
}
@@ -411,8 +307,7 @@ static void signal_(private_bus_t *this, signal_t signal, level_t level,
*/
static void destroy(private_bus_t *this)
{
- this->active_listeners->destroy_function(this->active_listeners, free);
- this->listeners->destroy(this->listeners);
+ this->listeners->destroy_function(this->listeners, free);
free(this);
}
@@ -425,18 +320,17 @@ bus_t *bus_create()
this->public.add_listener = (void(*)(bus_t*,bus_listener_t*))add_listener;
this->public.remove_listener = (void(*)(bus_t*,bus_listener_t*))remove_listener;
- this->public.listen = (signal_t(*)(bus_t*,level_t*,int*,ike_sa_t**,char**,va_list*))listen_;
- this->public.set_listen_state = (void(*)(bus_t*,bool))set_listen_state;
+ this->public.listen = (void(*)(bus_t*, bus_listener_t *listener, job_t *job))listen_;
this->public.set_sa = (void(*)(bus_t*,ike_sa_t*))set_sa;
this->public.signal = (void(*)(bus_t*,signal_t,level_t,char*,...))signal_;
this->public.vsignal = (void(*)(bus_t*,signal_t,level_t,char*,va_list))vsignal;
this->public.destroy = (void(*)(bus_t*)) destroy;
this->listeners = linked_list_create();
- this->active_listeners = linked_list_create();
pthread_mutex_init(&this->mutex, NULL);
pthread_key_create(&this->thread_id, NULL);
pthread_key_create(&this->thread_sa, NULL);
- return &(this->public);
+ return &this->public;
}
+
diff --git a/src/charon/bus/bus.h b/src/charon/bus/bus.h
index 00f1ab7ac..f71018444 100644
--- a/src/charon/bus/bus.h
+++ b/src/charon/bus/bus.h
@@ -32,6 +32,7 @@ typedef struct bus_t bus_t;
#include <sa/ike_sa.h>
#include <sa/child_sa.h>
+#include <processing/jobs/job.h>
/**
@@ -251,9 +252,7 @@ struct bus_listener_t {
* in receiving event signals registers at the bus. Any signals sent to
* are delivered to all registered listeners.
* To deliver signals to threads, the blocking listen() call may be used
- * to wait for a signal. However, passive listeners should be preferred,
- * as listening actively requires some synchronization overhead as data
- * must be passed from the raising thread to the listening thread.
+ * to wait for a signal.
*
* @ingroup bus
*/
@@ -280,44 +279,19 @@ struct bus_t {
void (*remove_listener) (bus_t *this, bus_listener_t *listener);
/**
- * @brief Listen actively on the bus.
+ * @brief Register a listener and block the calling thread.
*
- * As we are fully multithreaded, we must provide a mechanism
- * for active threads to listen to the bus. With the listen() method,
- * a thread waits until a signal occurs, and then processes it.
- * To prevent the listen() calling thread to miss signals emitted while
- * it processes a signal, registration is required. This is done through
- * the set_listen_state() method, see below.
- *
- * The listen() function is (has) a thread cancellation point, so you might
- * want to register cleanup handlers.
+ * This call registers a listener and blocks the calling thread until
+ * its listeners function returns FALSE. This allows to wait for certain
+ * events. The associated job is executed after the listener has been
+ * registered, this allows to listen on events we initiate with the job
+ * without missing any signals.
*
* @param this bus
- * @param level verbosity level of the signal
- * @param thread receives thread number emitted the signal
- * @param ike_sa receives the IKE_SA involved in the signal, or NULL
- * @param format receives the format string supplied with the signal
- * @param va_list receives the variable argument list for format
- * @return the emitted signal type
+ * @param listener listener to register
+ * @param job job to execute asynchronously when registered, or NULL
*/
- signal_t (*listen) (bus_t *this, level_t* level, int *thread,
- ike_sa_t **ike_sa, char** format, va_list* args);
-
- /**
- * @brief Set the listening state of the calling thread.
- *
- * To prevent message loss for active listeners using listen(), threads
- * must register themself to the bus before starting to listen(). When
- * a signal occurs, the emitter waits until all threads with listen_state
- * TRUE are waiting in the listen() method to process the signal.
- * It is important that a thread with listen_state TRUE calls listen()
- * periodically, or sets it's listening state to FALSE; otherwise
- * all signal emitting threads get blocked on the bus.
- *
- * @param this bus
- * @param active TRUE to set to listening
- */
- void (*set_listen_state) (bus_t *this, bool active);
+ void (*listen)(bus_t *this, bus_listener_t *listener, job_t *job);
/**
* @brief Set the IKE_SA the calling thread is using.
@@ -355,6 +329,10 @@ struct bus_t {
*
* Same as bus_t.signal(), but uses va_list argument list.
*
+ * @todo Improve performace of vsignal implementation. This method is
+ * called extensively and therefore shouldn't allocate heap memory or
+ * do other expensive tasks!
+ *
* @param this bus
* @param singal kind of the signal (up, down, rekeyed, ...)
* @param level verbosity level of the signal