diff options
Diffstat (limited to 'src/charon/processing')
-rw-r--r-- | src/charon/processing/jobs/acquire_job.c | 31 | ||||
-rw-r--r-- | src/charon/processing/jobs/acquire_job.h | 9 | ||||
-rw-r--r-- | src/charon/processing/jobs/callback_job.c | 30 | ||||
-rw-r--r-- | src/charon/processing/jobs/initiate_mediation_job.c | 82 | ||||
-rw-r--r-- | src/charon/processing/jobs/migrate_job.c | 152 | ||||
-rw-r--r-- | src/charon/processing/jobs/migrate_job.h | 65 | ||||
-rw-r--r-- | src/charon/processing/processor.c | 68 |
7 files changed, 349 insertions, 88 deletions
diff --git a/src/charon/processing/jobs/acquire_job.c b/src/charon/processing/jobs/acquire_job.c index b39e8e680..50cebd88a 100644 --- a/src/charon/processing/jobs/acquire_job.c +++ b/src/charon/processing/jobs/acquire_job.c @@ -12,7 +12,7 @@ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * for more details. * - * $Id: acquire_job.c 3589 2008-03-13 14:14:44Z martin $ + * $Id: acquire_job.c 4535 2008-10-31 01:43:23Z andreas $ */ #include "acquire_job.h" @@ -35,6 +35,16 @@ struct private_acquire_job_t { * reqid of the child to rekey */ u_int32_t reqid; + + /** + * acquired source traffic selector + */ + traffic_selector_t *src_ts; + + /** + * acquired destination traffic selector + */ + traffic_selector_t *dst_ts; }; /** @@ -42,6 +52,8 @@ struct private_acquire_job_t { */ static void destroy(private_acquire_job_t *this) { + DESTROY_IF(this->src_ts); + DESTROY_IF(this->dst_ts); free(this); } @@ -50,13 +62,16 @@ static void destroy(private_acquire_job_t *this) */ static void execute(private_acquire_job_t *this) { - ike_sa_t *ike_sa; + ike_sa_t *ike_sa = NULL; - ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager, - this->reqid, TRUE); + if (this->reqid) + { + ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager, + this->reqid, TRUE); + } if (ike_sa == NULL) { - DBG2(DBG_JOB, "CHILD_SA with reqid %d not found for acquiring", + DBG1(DBG_JOB, "acquire job found no CHILD_SA with reqid {%d}", this->reqid); } else @@ -71,7 +86,9 @@ static void execute(private_acquire_job_t *this) /* * Described in header */ -acquire_job_t *acquire_job_create(u_int32_t reqid) +acquire_job_t *acquire_job_create(u_int32_t reqid, + traffic_selector_t *src_ts, + traffic_selector_t *dst_ts) { private_acquire_job_t *this = malloc_thing(private_acquire_job_t); @@ -81,6 +98,8 @@ acquire_job_t *acquire_job_create(u_int32_t reqid) /* private variables */ this->reqid = reqid; + this->src_ts = src_ts; + this->dst_ts = dst_ts; return &this->public; } diff --git a/src/charon/processing/jobs/acquire_job.h b/src/charon/processing/jobs/acquire_job.h index 17c993d8e..7459ccc21 100644 --- a/src/charon/processing/jobs/acquire_job.h +++ b/src/charon/processing/jobs/acquire_job.h @@ -12,7 +12,7 @@ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * for more details. * - * $Id: acquire_job.h 3589 2008-03-13 14:14:44Z martin $ + * $Id: acquire_job.h 4535 2008-10-31 01:43:23Z andreas $ */ /** @@ -26,6 +26,7 @@ typedef struct acquire_job_t acquire_job_t; #include <library.h> +#include <config/traffic_selector.h> #include <processing/jobs/job.h> /** @@ -46,8 +47,12 @@ struct acquire_job_t { * We use the reqid to find the routed CHILD_SA. * * @param reqid reqid of the CHILD_SA to acquire + * @param src_ts source traffic selector + * @param dst_ts destination traffic selector * @return acquire_job_t object */ -acquire_job_t *acquire_job_create(u_int32_t reqid); +acquire_job_t *acquire_job_create(u_int32_t reqid, + traffic_selector_t *src_ts, + traffic_selector_t *dst_ts); #endif /* REKEY_CHILD_SA_JOB_H_ @} */ diff --git a/src/charon/processing/jobs/callback_job.c b/src/charon/processing/jobs/callback_job.c index e8892ee82..f0cebd473 100644 --- a/src/charon/processing/jobs/callback_job.c +++ b/src/charon/processing/jobs/callback_job.c @@ -12,12 +12,15 @@ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * for more details. * - * $Id: callback_job.c 3742 2008-04-03 09:19:12Z tobias $ + * $Id: callback_job.c 4579 2008-11-05 11:29:56Z martin $ */ #include "callback_job.h" +#include <pthread.h> + #include <daemon.h> +#include <utils/mutex.h> typedef struct private_callback_job_t private_callback_job_t; @@ -49,12 +52,12 @@ struct private_callback_job_t { * thread ID of the job, if running */ pthread_t thread; - + /** * mutex to access jobs interna */ - pthread_mutex_t mutex; - + mutex_t *mutex; + /** * list of asociated child jobs */ @@ -76,6 +79,7 @@ static void destroy(private_callback_job_t *this) this->cleanup(this->data); } this->children->destroy(this->children); + this->mutex->destroy(this->mutex); free(this); } @@ -89,7 +93,7 @@ static void unregister(private_callback_job_t *this) iterator_t *iterator; private_callback_job_t *child; - pthread_mutex_lock(&this->parent->mutex); + this->parent->mutex->lock(this->parent->mutex); iterator = this->parent->children->create_iterator(this->parent->children, TRUE); while (iterator->iterate(iterator, (void**)&child)) { @@ -100,7 +104,7 @@ static void unregister(private_callback_job_t *this) } } iterator->destroy(iterator); - pthread_mutex_unlock(&this->parent->mutex); + this->parent->mutex->unlock(this->parent->mutex); } } @@ -111,12 +115,12 @@ static void cancel(private_callback_job_t *this) { pthread_t thread; - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); thread = this->thread; /* terminate its children */ this->children->invoke_offset(this->children, offsetof(callback_job_t, cancel)); - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); /* terminate thread */ if (thread) @@ -133,9 +137,9 @@ static void execute(private_callback_job_t *this) { bool cleanup = FALSE; - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); this->thread = pthread_self(); - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); pthread_cleanup_push((void*)destroy, this); while (TRUE) @@ -180,7 +184,7 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, this->public.cancel = (void(*)(callback_job_t*))cancel; /* private variables */ - pthread_mutex_init(&this->mutex, NULL); + this->mutex = mutex_create(MUTEX_DEFAULT); this->callback = cb; this->data = data; this->cleanup = cleanup; @@ -191,9 +195,9 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data, /* register us at parent */ if (parent) { - pthread_mutex_lock(&this->parent->mutex); + this->parent->mutex->lock(this->parent->mutex); this->parent->children->insert_last(this->parent->children, this); - pthread_mutex_unlock(&this->parent->mutex); + this->parent->mutex->unlock(this->parent->mutex); } return &this->public; diff --git a/src/charon/processing/jobs/initiate_mediation_job.c b/src/charon/processing/jobs/initiate_mediation_job.c index ee9644045..4d4fd8dc6 100644 --- a/src/charon/processing/jobs/initiate_mediation_job.c +++ b/src/charon/processing/jobs/initiate_mediation_job.c @@ -12,7 +12,7 @@ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * for more details. * - * $Id: initiate_mediation_job.c 4192 2008-07-18 15:51:40Z martin $ + * $Id: initiate_mediation_job.c 4625 2008-11-11 13:12:05Z tobias $ */ #include "initiate_mediation_job.h" @@ -57,15 +57,13 @@ static void destroy(private_initiate_mediation_job_t *this) * Callback to handle initiation of mediation connection */ static bool initiate_callback(private_initiate_mediation_job_t *this, - signal_t signal, level_t level, ike_sa_t *ike_sa, - void *data, char *format, va_list args) + debug_t group, level_t level, ike_sa_t *ike_sa, + char *format, va_list args) { - if (signal == CHD_UP_SUCCESS) + if (ike_sa && !this->mediation_sa_id) { - /* mediation connection is up */ this->mediation_sa_id = ike_sa->get_id(ike_sa); this->mediation_sa_id = this->mediation_sa_id->clone(this->mediation_sa_id); - return FALSE; } return TRUE; } @@ -74,16 +72,15 @@ static bool initiate_callback(private_initiate_mediation_job_t *this, * Implementation of job_t.execute. */ static void initiate(private_initiate_mediation_job_t *this) -{ /* FIXME: check the logging */ +{ ike_sa_t *mediated_sa, *mediation_sa; peer_cfg_t *mediated_cfg, *mediation_cfg; mediated_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager, - this->mediated_sa_id); + this->mediated_sa_id); if (mediated_sa) { mediated_cfg = mediated_sa->get_peer_cfg(mediated_sa); - /* get_peer_cfg returns an internal object */ mediated_cfg->get_ref(mediated_cfg); charon->ike_sa_manager->checkin(charon->ike_sa_manager, mediated_sa); @@ -98,29 +95,35 @@ static void initiate(private_initiate_mediation_job_t *this) { mediated_cfg->destroy(mediated_cfg); mediation_cfg->destroy(mediation_cfg); - /* this pointer should still be valid */ - charon->bus->set_sa(charon->bus, mediated_sa); - DBG1(DBG_IKE, "mediation with the same peer is already in progress, queued"); + + mediated_sa = charon->ike_sa_manager->checkout( + charon->ike_sa_manager, this->mediated_sa_id); + if (mediated_sa) + { + DBG1(DBG_IKE, "mediation with the same peer is already in " + "progress, queued"); + charon->ike_sa_manager->checkin( + charon->ike_sa_manager, mediated_sa); + } destroy(this); return; } /* we need an additional reference because initiate consumes one */ mediation_cfg->get_ref(mediation_cfg); - /* this function call blocks until the connection is up or failed - * we do not check the status, but NEED_MORE would be returned on success - * because the registered callback returns FALSE then - * this->mediation_sa_id is set in the callback */ - charon->controller->initiate(charon->controller, - mediation_cfg, NULL, (controller_cb_t)initiate_callback, this); - if (!this->mediation_sa_id) + if (charon->controller->initiate(charon->controller, mediation_cfg, + NULL, (controller_cb_t)initiate_callback, this) != SUCCESS) { - DBG1(DBG_JOB, "initiating mediation connection '%s' failed", - mediation_cfg->get_name(mediation_cfg)); mediation_cfg->destroy(mediation_cfg); mediated_cfg->destroy(mediated_cfg); - charon->bus->set_sa(charon->bus, mediated_sa); - SIG_IKE(UP_FAILED, "mediation failed"); + mediated_sa = charon->ike_sa_manager->checkout( + charon->ike_sa_manager, this->mediated_sa_id); + if (mediated_sa) + { + DBG1(DBG_IKE, "initiating mediation connection failed"); + charon->ike_sa_manager->checkin_and_destroy( + charon->ike_sa_manager, mediated_sa); + } destroy(this); return; } @@ -131,15 +134,20 @@ static void initiate(private_initiate_mediation_job_t *this) if (mediation_sa) { - if (mediation_sa->initiate_mediation(mediation_sa, mediated_cfg) != SUCCESS) + if (mediation_sa->initiate_mediation(mediation_sa, + mediated_cfg) != SUCCESS) { - DBG1(DBG_JOB, "initiating mediated connection '%s' failed", - mediated_cfg->get_name(mediated_cfg)); mediated_cfg->destroy(mediated_cfg); - charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, mediation_sa); - - charon->bus->set_sa(charon->bus, mediated_sa); - SIG_IKE(UP_FAILED, "mediation failed"); + charon->ike_sa_manager->checkin_and_destroy( + charon->ike_sa_manager, mediation_sa); + mediated_sa = charon->ike_sa_manager->checkout( + charon->ike_sa_manager, this->mediated_sa_id); + if (mediated_sa) + { + DBG1(DBG_IKE, "establishing mediation connection failed"); + charon->ike_sa_manager->checkin_and_destroy( + charon->ike_sa_manager, mediated_sa); + } destroy(this); return; } @@ -156,7 +164,7 @@ static void initiate(private_initiate_mediation_job_t *this) * Implementation of job_t.execute. */ static void reinitiate(private_initiate_mediation_job_t *this) -{ /* FIXME: check the logging */ +{ ike_sa_t *mediated_sa, *mediation_sa; peer_cfg_t *mediated_cfg; @@ -178,13 +186,17 @@ static void reinitiate(private_initiate_mediation_job_t *this) mediated_cfg->get_name(mediated_cfg)); mediated_cfg->destroy(mediated_cfg); charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, mediation_sa); - - charon->bus->set_sa(charon->bus, mediated_sa); - SIG_IKE(UP_FAILED, "mediation failed"); + mediated_sa = charon->ike_sa_manager->checkout( + charon->ike_sa_manager, this->mediated_sa_id); + if (mediated_sa) + { + DBG1(DBG_IKE, "establishing mediation connection failed"); + charon->ike_sa_manager->checkin_and_destroy( + charon->ike_sa_manager, mediated_sa); + } destroy(this); return; } - charon->ike_sa_manager->checkin(charon->ike_sa_manager, mediation_sa); } diff --git a/src/charon/processing/jobs/migrate_job.c b/src/charon/processing/jobs/migrate_job.c new file mode 100644 index 000000000..ec0a76fb9 --- /dev/null +++ b/src/charon/processing/jobs/migrate_job.c @@ -0,0 +1,152 @@ +/* + * Copyright (C) 2008 Andreas Steffen + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + * + * $Id: migrate_job.c 4662 2008-11-16 21:19:58Z andreas $ + */ + +#include "migrate_job.h" + +#include <daemon.h> + +#include <config/child_cfg.h> + + +typedef struct private_migrate_job_t private_migrate_job_t; + +/** + * Private data of a migrate_job_t object. + */ +struct private_migrate_job_t { + /** + * Public migrate_job_t interface. + */ + migrate_job_t public; + + /** + * reqid of the CHILD_SA if it already exists + */ + u_int32_t reqid; + + /** + * source traffic selector + */ + traffic_selector_t *src_ts; + + /** + * destination traffic selector + */ + traffic_selector_t *dst_ts; + + /** + * local host address to be used for IKE + */ + host_t *local; + + /** + * remote host address to be used for IKE + */ + host_t *remote; +}; + +/** + * Implementation of job_t.destroy. + */ +static void destroy(private_migrate_job_t *this) +{ + DESTROY_IF(this->src_ts); + DESTROY_IF(this->dst_ts); + DESTROY_IF(this->local); + DESTROY_IF(this->remote); + free(this); +} + +/** + * Implementation of job_t.execute. + */ +static void execute(private_migrate_job_t *this) +{ + ike_sa_t *ike_sa = NULL; + + if (this->reqid) + { + ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager, + this->reqid, TRUE); + } + if (ike_sa) + { + iterator_t *children; + child_sa_t *child_sa; + host_t *host; + + children = ike_sa->create_child_sa_iterator(ike_sa); + while (children->iterate(children, (void**)&child_sa)) + { + if (child_sa->get_reqid(child_sa) == this->reqid) + { + break; + } + } + children->destroy(children); + DBG2(DBG_JOB, "found CHILD_SA with reqid {%d}", this->reqid); + + ike_sa->set_kmaddress(ike_sa, this->local, this->remote); + + host = this->local->clone(this->local); + host->set_port(host, IKEV2_UDP_PORT); + ike_sa->set_my_host(ike_sa, host); + + host = this->remote->clone(this->remote); + host->set_port(host, IKEV2_UDP_PORT); + ike_sa->set_other_host(ike_sa, host); + + if (child_sa->update_hosts(child_sa, this->local, this->remote, + ike_sa->get_virtual_ip(ike_sa, TRUE), + ike_sa->has_condition(ike_sa, COND_NAT_ANY)) == NOT_SUPPORTED) + { + ike_sa->rekey_child_sa(ike_sa, child_sa->get_protocol(child_sa), + child_sa->get_spi(child_sa, TRUE)); + } + charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa); + } + else + { + DBG1(DBG_JOB, "no CHILD_SA found with reqid {%d}", this->reqid); + } + destroy(this); +} + +/* + * Described in header + */ +migrate_job_t *migrate_job_create(u_int32_t reqid, + traffic_selector_t *src_ts, + traffic_selector_t *dst_ts, + policy_dir_t dir, + host_t *local, host_t *remote) +{ + private_migrate_job_t *this = malloc_thing(private_migrate_job_t); + + /* interface functions */ + this->public.job_interface.execute = (void (*) (job_t *)) execute; + this->public.job_interface.destroy = (void (*)(job_t*)) destroy; + + /* private variables */ + this->reqid = reqid; + this->src_ts = (dir == POLICY_OUT) ? src_ts : dst_ts; + this->dst_ts = (dir == POLICY_OUT) ? dst_ts : src_ts; + this->local = local; + this->remote = remote; + + return &this->public; +} diff --git a/src/charon/processing/jobs/migrate_job.h b/src/charon/processing/jobs/migrate_job.h new file mode 100644 index 000000000..a99ffbb0c --- /dev/null +++ b/src/charon/processing/jobs/migrate_job.h @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2008 Andreas Steffen + * Hochschule fuer Technik Rapperswil + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + * + * $Id: migrate_job.h 4662 2008-11-16 21:19:58Z andreas $ + */ + +/** + * @defgroup migrate_job migrate_job + * @{ @ingroup jobs + */ + +#ifndef MIGRATE_JOB_H_ +#define MIGRATE_JOB_H_ + +typedef struct migrate_job_t migrate_job_t; + +#include <library.h> +#include <utils/host.h> +#include <config/traffic_selector.h> +#include <kernel/kernel_ipsec.h> +#include <processing/jobs/job.h> + +/** + * Class representing a MIGRATE Job. + * + * This job sets a routed CHILD_SA for an existing IPsec policy. + */ +struct migrate_job_t { + /** + * The job_t interface. + */ + job_t job_interface; +}; + +/** + * Creates a job of type MIGRATE. + * + * We use the reqid or the traffic selectors to find a matching CHILD_SA. + * + * @param reqid reqid of the CHILD_SA to acquire + * @param src_ts source traffic selector to be used in the policy + * @param dst_ts destination traffic selector to be used in the policy + * @param dir direction of the policy (in|out) + * @param local local host address to be used in the IKE_SA + * @param remote remote host address to be used in the IKE_SA + * @return migrate_job_t object + */ +migrate_job_t *migrate_job_create(u_int32_t reqid, + traffic_selector_t *src_ts, + traffic_selector_t *dst_ts, + policy_dir_t dir, + host_t *local, host_t *remote); + +#endif /* MIGRATE_JOB_H_ @} */ diff --git a/src/charon/processing/processor.c b/src/charon/processing/processor.c index 010f6624f..9cff090bf 100644 --- a/src/charon/processing/processor.c +++ b/src/charon/processing/processor.c @@ -13,7 +13,7 @@ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * for more details. * - * $Id: processor.c 3742 2008-04-03 09:19:12Z tobias $ + * $Id: processor.c 4579 2008-11-05 11:29:56Z martin $ */ #include <stdlib.h> @@ -24,6 +24,7 @@ #include "processor.h" #include <daemon.h> +#include <utils/mutex.h> #include <utils/linked_list.h> @@ -61,17 +62,17 @@ struct private_processor_t { /** * access to linked_list is locked through this mutex */ - pthread_mutex_t mutex; + mutex_t *mutex; /** * Condvar to wait for new jobs */ - pthread_cond_t jobadded; + condvar_t *job_added; /** * Condvar to wait for terminated threads */ - pthread_cond_t threadterminated; + condvar_t *thread_terminated; }; static void process_jobs(private_processor_t *this); @@ -85,10 +86,10 @@ static void restart(private_processor_t *this) if (pthread_create(&thread, NULL, (void*)process_jobs, this) != 0) { - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); this->total_threads--; - pthread_cond_broadcast(&this->threadterminated); - pthread_mutex_unlock(&this->mutex); + this->thread_terminated->broadcast(this->thread_terminated); + this->mutex->unlock(this->mutex); } } @@ -103,7 +104,7 @@ static void process_jobs(private_processor_t *this) DBG2(DBG_JOB, "started worker thread, thread_ID: %06u", (int)pthread_self()); - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); while (this->desired_threads >= this->total_threads) { job_t *job; @@ -111,21 +112,21 @@ static void process_jobs(private_processor_t *this) if (this->list->get_count(this->list) == 0) { this->idle_threads++; - pthread_cond_wait(&this->jobadded, &this->mutex); + this->job_added->wait(this->job_added, this->mutex); this->idle_threads--; continue; } this->list->remove_first(this->list, (void**)&job); - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); /* terminated threads are restarted, so we have a constant pool */ pthread_cleanup_push((void*)restart, this); job->execute(job); pthread_cleanup_pop(0); - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); } this->total_threads--; - pthread_cond_signal(&this->threadterminated); - pthread_mutex_unlock(&this->mutex); + this->thread_terminated->signal(this->thread_terminated); + this->mutex->unlock(this->mutex); } /** @@ -134,9 +135,9 @@ static void process_jobs(private_processor_t *this) static u_int get_total_threads(private_processor_t *this) { u_int count; - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); count = this->total_threads; - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); return count; } @@ -146,9 +147,9 @@ static u_int get_total_threads(private_processor_t *this) static u_int get_idle_threads(private_processor_t *this) { u_int count; - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); count = this->idle_threads; - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); return count; } @@ -158,9 +159,9 @@ static u_int get_idle_threads(private_processor_t *this) static u_int get_job_load(private_processor_t *this) { u_int load; - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); load = this->list->get_count(this->list); - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); return load; } @@ -169,10 +170,10 @@ static u_int get_job_load(private_processor_t *this) */ static void queue_job(private_processor_t *this, job_t *job) { - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); this->list->insert_last(this->list, job); - pthread_cond_signal(&this->jobadded); - pthread_mutex_unlock(&this->mutex); + this->job_added->signal(this->job_added); + this->mutex->unlock(this->mutex); } /** @@ -180,7 +181,7 @@ static void queue_job(private_processor_t *this, job_t *job) */ static void set_threads(private_processor_t *this, u_int count) { - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); if (count > this->total_threads) { /* increase thread count */ int i; @@ -200,8 +201,8 @@ static void set_threads(private_processor_t *this, u_int count) { /* decrease thread count */ this->desired_threads = count; } - pthread_cond_broadcast(&this->jobadded); - pthread_mutex_unlock(&this->mutex); + this->job_added->broadcast(this->job_added); + this->mutex->unlock(this->mutex); } /** @@ -210,13 +211,16 @@ static void set_threads(private_processor_t *this, u_int count) static void destroy(private_processor_t *this) { set_threads(this, 0); - pthread_mutex_lock(&this->mutex); + this->mutex->lock(this->mutex); while (this->total_threads > 0) { - pthread_cond_broadcast(&this->jobadded); - pthread_cond_wait(&this->threadterminated, &this->mutex); + this->job_added->broadcast(this->job_added); + this->thread_terminated->wait(this->thread_terminated, this->mutex); } - pthread_mutex_unlock(&this->mutex); + this->mutex->unlock(this->mutex); + this->thread_terminated->destroy(this->thread_terminated); + this->job_added->destroy(this->job_added); + this->mutex->destroy(this->mutex); this->list->destroy_offset(this->list, offsetof(job_t, destroy)); free(this); } @@ -236,9 +240,9 @@ processor_t *processor_create(size_t pool_size) this->public.destroy = (void(*)(processor_t*))destroy; this->list = linked_list_create(); - pthread_mutex_init(&this->mutex, NULL); - pthread_cond_init(&this->jobadded, NULL); - pthread_cond_init(&this->threadterminated, NULL); + this->mutex = mutex_create(MUTEX_DEFAULT); + this->job_added = condvar_create(CONDVAR_DEFAULT); + this->thread_terminated = condvar_create(CONDVAR_DEFAULT); this->total_threads = 0; this->desired_threads = 0; this->idle_threads = 0; |