diff options
Diffstat (limited to 'src/charon/bus')
-rw-r--r-- | src/charon/bus/bus.c | 306 | ||||
-rw-r--r-- | src/charon/bus/bus.h | 52 |
2 files changed, 115 insertions, 243 deletions
diff --git a/src/charon/bus/bus.c b/src/charon/bus/bus.c index 5fda36925..e53ac43ce 100644 --- a/src/charon/bus/bus.c +++ b/src/charon/bus/bus.c @@ -24,6 +24,8 @@ #include <pthread.h> +#include <daemon.h> + ENUM(signal_names, SIG_ANY, SIG_MAX, /** should not get printed */ "SIG_ANY", @@ -53,104 +55,74 @@ ENUM(signal_names, SIG_ANY, SIG_MAX, "SIG_MAX", ); -typedef struct active_listener_t active_listener_t; +typedef struct private_bus_t private_bus_t; /** - * information for a active listener + * Private data of a bus_t object. */ -struct active_listener_t { - - /** - * associated thread - */ - pthread_t id; - - /** - * condvar to wait for a signal - */ - pthread_cond_t cond; - - /** - * state of the thread - */ - enum { - /** not registered, do not wait for thread */ - UNREGISTERED, - /** registered, if a signal occurs, wait until it is LISTENING */ - REGISTERED, - /** listening, deliver signal */ - LISTENING, - } state; - - /** - * currently processed signals type - */ - signal_t signal; - +struct private_bus_t { /** - * verbosity level of the signal + * Public part of a bus_t object. */ - level_t level; + bus_t public; /** - * current processed signals thread number + * List of registered listeners as entry_t's */ - int thread; + linked_list_t *listeners; /** - * currently processed signals ike_sa + * mutex to synchronize active listeners */ - ike_sa_t *ike_sa; + pthread_mutex_t mutex; /** - * currently processed signals format string + * Thread local storage for a unique, simple thread ID */ - char *format; + pthread_key_t thread_id; /** - * currently processed signals format varargs + * Thread local storage the threads IKE_SA */ - va_list args; - + pthread_key_t thread_sa; }; -typedef struct private_bus_t private_bus_t; +typedef struct entry_t entry_t; /** - * Private data of a bus_t object. + * a listener entry, either active or passive */ -struct private_bus_t { - /** - * Public part of a bus_t object. - */ - bus_t public; - - /** - * List of registered listeners implementing the bus_t interface - */ - linked_list_t *listeners; - +struct entry_t { + /** - * List of active listeners with listener_state TRUE + * registered listener interface */ - linked_list_t *active_listeners; + bus_listener_t *listener; /** - * mutex to synchronize active listeners + * is this a active listen() call with a blocking thread */ - pthread_mutex_t mutex; + bool blocker; /** - * Thread local storage for a unique, simple thread ID + * condvar where active listeners wait */ - pthread_key_t thread_id; + pthread_cond_t cond; +}; + +/** + * create a listener entry + */ +static entry_t *entry_create(bus_listener_t *listener, bool blocker) +{ + entry_t *this = malloc_thing(entry_t); - /** - * Thread local storage the threads IKE_SA - */ - pthread_key_t thread_sa; + this->listener = listener; + this->blocker = blocker; + pthread_cond_init(&this->cond, NULL); -}; + return this; +} /** * Get a unique thread number for a calling thread. Since @@ -160,7 +132,7 @@ struct private_bus_t { static int get_thread_number(private_bus_t *this) { static long current_num = 0; - static long stored_num; + long stored_num; stored_num = (long)pthread_getspecific(this->thread_id); if (stored_num == 0) @@ -180,7 +152,7 @@ static int get_thread_number(private_bus_t *this) static void add_listener(private_bus_t *this, bus_listener_t *listener) { pthread_mutex_lock(&this->mutex); - this->listeners->insert_last(this->listeners, listener); + this->listeners->insert_last(this->listeners, entry_create(listener, FALSE)); pthread_mutex_unlock(&this->mutex); } @@ -190,15 +162,16 @@ static void add_listener(private_bus_t *this, bus_listener_t *listener) static void remove_listener(private_bus_t *this, bus_listener_t *listener) { iterator_t *iterator; - bus_listener_t *current; + entry_t *entry; pthread_mutex_lock(&this->mutex); iterator = this->listeners->create_iterator(this->listeners, TRUE); - while (iterator->iterate(iterator, (void**)¤t)) + while (iterator->iterate(iterator, (void**)&entry)) { - if (current == listener) + if (entry->listener == listener) { iterator->remove(iterator); + free(entry); break; } } @@ -206,109 +179,67 @@ static void remove_listener(private_bus_t *this, bus_listener_t *listener) pthread_mutex_unlock(&this->mutex); } +typedef struct cleanup_data_t cleanup_data_t; + +/** + * data to remove a listener using pthread_cleanup handler + */ +struct cleanup_data_t { + /** bus instance */ + private_bus_t *this; + /** listener entry */ + entry_t *entry; +}; + /** - * Get the listener object for the calling thread + * pthread_cleanup handler to remove a listener */ -static active_listener_t *get_active_listener(private_bus_t *this) +static void listener_cleanup(cleanup_data_t *data) { - active_listener_t *current, *found = NULL; iterator_t *iterator; - - /* if the thread was here once before, we have a active_listener record */ - iterator = this->active_listeners->create_iterator(this->active_listeners, TRUE); - while (iterator->iterate(iterator, (void**)¤t)) + entry_t *entry; + + iterator = data->this->listeners->create_iterator(data->this->listeners, TRUE); + while (iterator->iterate(iterator, (void**)&entry)) { - if (current->id == pthread_self()) + if (entry == data->entry) { - found = current; + iterator->remove(iterator); + free(entry); break; } } iterator->destroy(iterator); - - if (found == NULL) - { - /* create a new object for a never-seen thread */ - found = malloc_thing(active_listener_t); - found->id = pthread_self(); - pthread_cond_init(&found->cond, NULL); - this->active_listeners->insert_last(this->active_listeners, found); - } - - return found; -} - -/** - * disable a listener to cleanly clean up - */ -static void unregister(active_listener_t *listener) -{ - listener->state = UNREGISTERED; - pthread_cond_broadcast(&listener->cond); } /** * Implementation of bus_t.listen. */ -static signal_t listen_(private_bus_t *this, level_t *level, int *thread, - ike_sa_t **ike_sa, char** format, va_list* args) +static void listen_(private_bus_t *this, bus_listener_t *listener, job_t *job) { - active_listener_t *listener; - int oldstate; - - pthread_mutex_lock(&this->mutex); - listener = get_active_listener(this); - /* go "listening", say hello to a thread which have a signal for us */ - listener->state = LISTENING; - pthread_cond_broadcast(&listener->cond); - /* wait until it has us delivered a signal, and go back to "registered". - * we allow cancellation here, but must cleanly disable the listener. */ - pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex); - pthread_cleanup_push((void*)unregister, listener); - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); - pthread_cond_wait(&listener->cond, &this->mutex); - pthread_setcancelstate(oldstate, NULL); - pthread_cleanup_pop(0); - pthread_cleanup_pop(0); - - pthread_mutex_unlock(&this->mutex); - - /* return signal values */ - *level = listener->level; - *thread = listener->thread; - *ike_sa = listener->ike_sa; - *format = listener->format; - va_copy(*args, listener->args); - va_end(listener->args); + int old; + cleanup_data_t data; - return listener->signal; -} + data.this = this; + data.entry = entry_create(listener, TRUE); -/** - * Implementation of bus_t.set_listen_state. - */ -static void set_listen_state(private_bus_t *this, bool active) -{ - active_listener_t *listener; - pthread_mutex_lock(&this->mutex); - - listener = get_active_listener(this); - if (active) - { - listener->state = REGISTERED; - } - else + this->listeners->insert_last(this->listeners, data.entry); + charon->processor->queue_job(charon->processor, job); + pthread_cleanup_push((void*)pthread_mutex_unlock, &this->mutex); + pthread_cleanup_push((void*)listener_cleanup, &data); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old); + while (data.entry->blocker) { - listener->state = UNREGISTERED; - /* say hello to signal emitter; we are finished processing the signal */ - pthread_cond_broadcast(&listener->cond); + pthread_cond_wait(&data.entry->cond, &this->mutex); } - - pthread_mutex_unlock(&this->mutex); + pthread_setcancelstate(old, NULL); + pthread_cleanup_pop(FALSE); + /* unlock mutex */ + pthread_cleanup_pop(TRUE); + free(data.entry); } - /** * Implementation of bus_t.set_sa. */ @@ -324,72 +255,37 @@ static void vsignal(private_bus_t *this, signal_t signal, level_t level, char* format, va_list args) { iterator_t *iterator; - bus_listener_t *listener; - active_listener_t *active_listener; + entry_t *entry; ike_sa_t *ike_sa; long thread; + pthread_mutex_lock(&this->mutex); ike_sa = pthread_getspecific(this->thread_sa); thread = get_thread_number(this); - pthread_mutex_lock(&this->mutex); - - /* do the job for all passive bus_listeners */ iterator = this->listeners->create_iterator(this->listeners, TRUE); - while (iterator->iterate(iterator, (void**)&listener)) + while (iterator->iterate(iterator, (void**)&entry)) { va_list args_copy; va_copy(args_copy, args); - if (!listener->signal(listener, signal, level, thread, - ike_sa, format, args_copy)) + if (!entry->listener->signal(entry->listener, signal, level, thread, + ike_sa, format, args_copy)) { - /* unregister listener if requested */ iterator->remove(iterator); + if (entry->blocker) + { + entry->blocker = FALSE; + pthread_cond_signal(&entry->cond); + } + else + { + free(entry); + } } va_end(args_copy); } iterator->destroy(iterator); - /* wake up all active listeners */ - iterator = this->active_listeners->create_iterator(this->active_listeners, TRUE); - while (iterator->iterate(iterator, (void**)&active_listener)) - { - /* wait until all threads are registered. But if the thread raising - * the signal is the same as the one that listens, we skip it. - * Otherwise we would deadlock. */ - while (active_listener->id != pthread_self() && - active_listener->state == REGISTERED) - { - pthread_cond_wait(&active_listener->cond, &this->mutex); - } - /* if thread is listening now, give it the signal to process */ - if (active_listener->state == LISTENING) - { - active_listener->level = level; - active_listener->thread = thread; - active_listener->ike_sa = ike_sa; - active_listener->signal = signal; - active_listener->format = format; - va_copy(active_listener->args, args); - active_listener->state = REGISTERED; - pthread_cond_broadcast(&active_listener->cond); - } - } - - /* we must wait now until all are not in state REGISTERED, - * as they may still use our arguments */ - iterator->reset(iterator); - while (iterator->iterate(iterator, (void**)&active_listener)) - { - /* do not wait for ourself, it won't happen (see above) */ - while (active_listener->id != pthread_self() && - active_listener->state == REGISTERED) - { - pthread_cond_wait(&active_listener->cond, &this->mutex); - } - } - iterator->destroy(iterator); - pthread_mutex_unlock(&this->mutex); } @@ -411,8 +307,7 @@ static void signal_(private_bus_t *this, signal_t signal, level_t level, */ static void destroy(private_bus_t *this) { - this->active_listeners->destroy_function(this->active_listeners, free); - this->listeners->destroy(this->listeners); + this->listeners->destroy_function(this->listeners, free); free(this); } @@ -425,18 +320,17 @@ bus_t *bus_create() this->public.add_listener = (void(*)(bus_t*,bus_listener_t*))add_listener; this->public.remove_listener = (void(*)(bus_t*,bus_listener_t*))remove_listener; - this->public.listen = (signal_t(*)(bus_t*,level_t*,int*,ike_sa_t**,char**,va_list*))listen_; - this->public.set_listen_state = (void(*)(bus_t*,bool))set_listen_state; + this->public.listen = (void(*)(bus_t*, bus_listener_t *listener, job_t *job))listen_; this->public.set_sa = (void(*)(bus_t*,ike_sa_t*))set_sa; this->public.signal = (void(*)(bus_t*,signal_t,level_t,char*,...))signal_; this->public.vsignal = (void(*)(bus_t*,signal_t,level_t,char*,va_list))vsignal; this->public.destroy = (void(*)(bus_t*)) destroy; this->listeners = linked_list_create(); - this->active_listeners = linked_list_create(); pthread_mutex_init(&this->mutex, NULL); pthread_key_create(&this->thread_id, NULL); pthread_key_create(&this->thread_sa, NULL); - return &(this->public); + return &this->public; } + diff --git a/src/charon/bus/bus.h b/src/charon/bus/bus.h index 00f1ab7ac..f71018444 100644 --- a/src/charon/bus/bus.h +++ b/src/charon/bus/bus.h @@ -32,6 +32,7 @@ typedef struct bus_t bus_t; #include <sa/ike_sa.h> #include <sa/child_sa.h> +#include <processing/jobs/job.h> /** @@ -251,9 +252,7 @@ struct bus_listener_t { * in receiving event signals registers at the bus. Any signals sent to * are delivered to all registered listeners. * To deliver signals to threads, the blocking listen() call may be used - * to wait for a signal. However, passive listeners should be preferred, - * as listening actively requires some synchronization overhead as data - * must be passed from the raising thread to the listening thread. + * to wait for a signal. * * @ingroup bus */ @@ -280,44 +279,19 @@ struct bus_t { void (*remove_listener) (bus_t *this, bus_listener_t *listener); /** - * @brief Listen actively on the bus. + * @brief Register a listener and block the calling thread. * - * As we are fully multithreaded, we must provide a mechanism - * for active threads to listen to the bus. With the listen() method, - * a thread waits until a signal occurs, and then processes it. - * To prevent the listen() calling thread to miss signals emitted while - * it processes a signal, registration is required. This is done through - * the set_listen_state() method, see below. - * - * The listen() function is (has) a thread cancellation point, so you might - * want to register cleanup handlers. + * This call registers a listener and blocks the calling thread until + * its listeners function returns FALSE. This allows to wait for certain + * events. The associated job is executed after the listener has been + * registered, this allows to listen on events we initiate with the job + * without missing any signals. * * @param this bus - * @param level verbosity level of the signal - * @param thread receives thread number emitted the signal - * @param ike_sa receives the IKE_SA involved in the signal, or NULL - * @param format receives the format string supplied with the signal - * @param va_list receives the variable argument list for format - * @return the emitted signal type + * @param listener listener to register + * @param job job to execute asynchronously when registered, or NULL */ - signal_t (*listen) (bus_t *this, level_t* level, int *thread, - ike_sa_t **ike_sa, char** format, va_list* args); - - /** - * @brief Set the listening state of the calling thread. - * - * To prevent message loss for active listeners using listen(), threads - * must register themself to the bus before starting to listen(). When - * a signal occurs, the emitter waits until all threads with listen_state - * TRUE are waiting in the listen() method to process the signal. - * It is important that a thread with listen_state TRUE calls listen() - * periodically, or sets it's listening state to FALSE; otherwise - * all signal emitting threads get blocked on the bus. - * - * @param this bus - * @param active TRUE to set to listening - */ - void (*set_listen_state) (bus_t *this, bool active); + void (*listen)(bus_t *this, bus_listener_t *listener, job_t *job); /** * @brief Set the IKE_SA the calling thread is using. @@ -355,6 +329,10 @@ struct bus_t { * * Same as bus_t.signal(), but uses va_list argument list. * + * @todo Improve performace of vsignal implementation. This method is + * called extensively and therefore shouldn't allocate heap memory or + * do other expensive tasks! + * * @param this bus * @param singal kind of the signal (up, down, rekeyed, ...) * @param level verbosity level of the signal |