summaryrefslogtreecommitdiff
path: root/src/charon/processing
diff options
context:
space:
mode:
Diffstat (limited to 'src/charon/processing')
-rw-r--r--src/charon/processing/jobs/acquire_job.c31
-rw-r--r--src/charon/processing/jobs/acquire_job.h9
-rw-r--r--src/charon/processing/jobs/callback_job.c30
-rw-r--r--src/charon/processing/jobs/initiate_mediation_job.c82
-rw-r--r--src/charon/processing/jobs/migrate_job.c152
-rw-r--r--src/charon/processing/jobs/migrate_job.h65
-rw-r--r--src/charon/processing/processor.c68
7 files changed, 349 insertions, 88 deletions
diff --git a/src/charon/processing/jobs/acquire_job.c b/src/charon/processing/jobs/acquire_job.c
index b39e8e680..50cebd88a 100644
--- a/src/charon/processing/jobs/acquire_job.c
+++ b/src/charon/processing/jobs/acquire_job.c
@@ -12,7 +12,7 @@
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*
- * $Id: acquire_job.c 3589 2008-03-13 14:14:44Z martin $
+ * $Id: acquire_job.c 4535 2008-10-31 01:43:23Z andreas $
*/
#include "acquire_job.h"
@@ -35,6 +35,16 @@ struct private_acquire_job_t {
* reqid of the child to rekey
*/
u_int32_t reqid;
+
+ /**
+ * acquired source traffic selector
+ */
+ traffic_selector_t *src_ts;
+
+ /**
+ * acquired destination traffic selector
+ */
+ traffic_selector_t *dst_ts;
};
/**
@@ -42,6 +52,8 @@ struct private_acquire_job_t {
*/
static void destroy(private_acquire_job_t *this)
{
+ DESTROY_IF(this->src_ts);
+ DESTROY_IF(this->dst_ts);
free(this);
}
@@ -50,13 +62,16 @@ static void destroy(private_acquire_job_t *this)
*/
static void execute(private_acquire_job_t *this)
{
- ike_sa_t *ike_sa;
+ ike_sa_t *ike_sa = NULL;
- ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager,
- this->reqid, TRUE);
+ if (this->reqid)
+ {
+ ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager,
+ this->reqid, TRUE);
+ }
if (ike_sa == NULL)
{
- DBG2(DBG_JOB, "CHILD_SA with reqid %d not found for acquiring",
+ DBG1(DBG_JOB, "acquire job found no CHILD_SA with reqid {%d}",
this->reqid);
}
else
@@ -71,7 +86,9 @@ static void execute(private_acquire_job_t *this)
/*
* Described in header
*/
-acquire_job_t *acquire_job_create(u_int32_t reqid)
+acquire_job_t *acquire_job_create(u_int32_t reqid,
+ traffic_selector_t *src_ts,
+ traffic_selector_t *dst_ts)
{
private_acquire_job_t *this = malloc_thing(private_acquire_job_t);
@@ -81,6 +98,8 @@ acquire_job_t *acquire_job_create(u_int32_t reqid)
/* private variables */
this->reqid = reqid;
+ this->src_ts = src_ts;
+ this->dst_ts = dst_ts;
return &this->public;
}
diff --git a/src/charon/processing/jobs/acquire_job.h b/src/charon/processing/jobs/acquire_job.h
index 17c993d8e..7459ccc21 100644
--- a/src/charon/processing/jobs/acquire_job.h
+++ b/src/charon/processing/jobs/acquire_job.h
@@ -12,7 +12,7 @@
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*
- * $Id: acquire_job.h 3589 2008-03-13 14:14:44Z martin $
+ * $Id: acquire_job.h 4535 2008-10-31 01:43:23Z andreas $
*/
/**
@@ -26,6 +26,7 @@
typedef struct acquire_job_t acquire_job_t;
#include <library.h>
+#include <config/traffic_selector.h>
#include <processing/jobs/job.h>
/**
@@ -46,8 +47,12 @@ struct acquire_job_t {
* We use the reqid to find the routed CHILD_SA.
*
* @param reqid reqid of the CHILD_SA to acquire
+ * @param src_ts source traffic selector
+ * @param dst_ts destination traffic selector
* @return acquire_job_t object
*/
-acquire_job_t *acquire_job_create(u_int32_t reqid);
+acquire_job_t *acquire_job_create(u_int32_t reqid,
+ traffic_selector_t *src_ts,
+ traffic_selector_t *dst_ts);
#endif /* REKEY_CHILD_SA_JOB_H_ @} */
diff --git a/src/charon/processing/jobs/callback_job.c b/src/charon/processing/jobs/callback_job.c
index e8892ee82..f0cebd473 100644
--- a/src/charon/processing/jobs/callback_job.c
+++ b/src/charon/processing/jobs/callback_job.c
@@ -12,12 +12,15 @@
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*
- * $Id: callback_job.c 3742 2008-04-03 09:19:12Z tobias $
+ * $Id: callback_job.c 4579 2008-11-05 11:29:56Z martin $
*/
#include "callback_job.h"
+#include <pthread.h>
+
#include <daemon.h>
+#include <utils/mutex.h>
typedef struct private_callback_job_t private_callback_job_t;
@@ -49,12 +52,12 @@ struct private_callback_job_t {
* thread ID of the job, if running
*/
pthread_t thread;
-
+
/**
* mutex to access jobs interna
*/
- pthread_mutex_t mutex;
-
+ mutex_t *mutex;
+
/**
* list of asociated child jobs
*/
@@ -76,6 +79,7 @@ static void destroy(private_callback_job_t *this)
this->cleanup(this->data);
}
this->children->destroy(this->children);
+ this->mutex->destroy(this->mutex);
free(this);
}
@@ -89,7 +93,7 @@ static void unregister(private_callback_job_t *this)
iterator_t *iterator;
private_callback_job_t *child;
- pthread_mutex_lock(&this->parent->mutex);
+ this->parent->mutex->lock(this->parent->mutex);
iterator = this->parent->children->create_iterator(this->parent->children, TRUE);
while (iterator->iterate(iterator, (void**)&child))
{
@@ -100,7 +104,7 @@ static void unregister(private_callback_job_t *this)
}
}
iterator->destroy(iterator);
- pthread_mutex_unlock(&this->parent->mutex);
+ this->parent->mutex->unlock(this->parent->mutex);
}
}
@@ -111,12 +115,12 @@ static void cancel(private_callback_job_t *this)
{
pthread_t thread;
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
thread = this->thread;
/* terminate its children */
this->children->invoke_offset(this->children, offsetof(callback_job_t, cancel));
- pthread_mutex_unlock(&this->mutex);
+ this->mutex->unlock(this->mutex);
/* terminate thread */
if (thread)
@@ -133,9 +137,9 @@ static void execute(private_callback_job_t *this)
{
bool cleanup = FALSE;
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
this->thread = pthread_self();
- pthread_mutex_unlock(&this->mutex);
+ this->mutex->unlock(this->mutex);
pthread_cleanup_push((void*)destroy, this);
while (TRUE)
@@ -180,7 +184,7 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
this->public.cancel = (void(*)(callback_job_t*))cancel;
/* private variables */
- pthread_mutex_init(&this->mutex, NULL);
+ this->mutex = mutex_create(MUTEX_DEFAULT);
this->callback = cb;
this->data = data;
this->cleanup = cleanup;
@@ -191,9 +195,9 @@ callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
/* register us at parent */
if (parent)
{
- pthread_mutex_lock(&this->parent->mutex);
+ this->parent->mutex->lock(this->parent->mutex);
this->parent->children->insert_last(this->parent->children, this);
- pthread_mutex_unlock(&this->parent->mutex);
+ this->parent->mutex->unlock(this->parent->mutex);
}
return &this->public;
diff --git a/src/charon/processing/jobs/initiate_mediation_job.c b/src/charon/processing/jobs/initiate_mediation_job.c
index ee9644045..4d4fd8dc6 100644
--- a/src/charon/processing/jobs/initiate_mediation_job.c
+++ b/src/charon/processing/jobs/initiate_mediation_job.c
@@ -12,7 +12,7 @@
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*
- * $Id: initiate_mediation_job.c 4192 2008-07-18 15:51:40Z martin $
+ * $Id: initiate_mediation_job.c 4625 2008-11-11 13:12:05Z tobias $
*/
#include "initiate_mediation_job.h"
@@ -57,15 +57,13 @@ static void destroy(private_initiate_mediation_job_t *this)
* Callback to handle initiation of mediation connection
*/
static bool initiate_callback(private_initiate_mediation_job_t *this,
- signal_t signal, level_t level, ike_sa_t *ike_sa,
- void *data, char *format, va_list args)
+ debug_t group, level_t level, ike_sa_t *ike_sa,
+ char *format, va_list args)
{
- if (signal == CHD_UP_SUCCESS)
+ if (ike_sa && !this->mediation_sa_id)
{
- /* mediation connection is up */
this->mediation_sa_id = ike_sa->get_id(ike_sa);
this->mediation_sa_id = this->mediation_sa_id->clone(this->mediation_sa_id);
- return FALSE;
}
return TRUE;
}
@@ -74,16 +72,15 @@ static bool initiate_callback(private_initiate_mediation_job_t *this,
* Implementation of job_t.execute.
*/
static void initiate(private_initiate_mediation_job_t *this)
-{ /* FIXME: check the logging */
+{
ike_sa_t *mediated_sa, *mediation_sa;
peer_cfg_t *mediated_cfg, *mediation_cfg;
mediated_sa = charon->ike_sa_manager->checkout(charon->ike_sa_manager,
- this->mediated_sa_id);
+ this->mediated_sa_id);
if (mediated_sa)
{
mediated_cfg = mediated_sa->get_peer_cfg(mediated_sa);
- /* get_peer_cfg returns an internal object */
mediated_cfg->get_ref(mediated_cfg);
charon->ike_sa_manager->checkin(charon->ike_sa_manager, mediated_sa);
@@ -98,29 +95,35 @@ static void initiate(private_initiate_mediation_job_t *this)
{
mediated_cfg->destroy(mediated_cfg);
mediation_cfg->destroy(mediation_cfg);
- /* this pointer should still be valid */
- charon->bus->set_sa(charon->bus, mediated_sa);
- DBG1(DBG_IKE, "mediation with the same peer is already in progress, queued");
+
+ mediated_sa = charon->ike_sa_manager->checkout(
+ charon->ike_sa_manager, this->mediated_sa_id);
+ if (mediated_sa)
+ {
+ DBG1(DBG_IKE, "mediation with the same peer is already in "
+ "progress, queued");
+ charon->ike_sa_manager->checkin(
+ charon->ike_sa_manager, mediated_sa);
+ }
destroy(this);
return;
}
/* we need an additional reference because initiate consumes one */
mediation_cfg->get_ref(mediation_cfg);
- /* this function call blocks until the connection is up or failed
- * we do not check the status, but NEED_MORE would be returned on success
- * because the registered callback returns FALSE then
- * this->mediation_sa_id is set in the callback */
- charon->controller->initiate(charon->controller,
- mediation_cfg, NULL, (controller_cb_t)initiate_callback, this);
- if (!this->mediation_sa_id)
+ if (charon->controller->initiate(charon->controller, mediation_cfg,
+ NULL, (controller_cb_t)initiate_callback, this) != SUCCESS)
{
- DBG1(DBG_JOB, "initiating mediation connection '%s' failed",
- mediation_cfg->get_name(mediation_cfg));
mediation_cfg->destroy(mediation_cfg);
mediated_cfg->destroy(mediated_cfg);
- charon->bus->set_sa(charon->bus, mediated_sa);
- SIG_IKE(UP_FAILED, "mediation failed");
+ mediated_sa = charon->ike_sa_manager->checkout(
+ charon->ike_sa_manager, this->mediated_sa_id);
+ if (mediated_sa)
+ {
+ DBG1(DBG_IKE, "initiating mediation connection failed");
+ charon->ike_sa_manager->checkin_and_destroy(
+ charon->ike_sa_manager, mediated_sa);
+ }
destroy(this);
return;
}
@@ -131,15 +134,20 @@ static void initiate(private_initiate_mediation_job_t *this)
if (mediation_sa)
{
- if (mediation_sa->initiate_mediation(mediation_sa, mediated_cfg) != SUCCESS)
+ if (mediation_sa->initiate_mediation(mediation_sa,
+ mediated_cfg) != SUCCESS)
{
- DBG1(DBG_JOB, "initiating mediated connection '%s' failed",
- mediated_cfg->get_name(mediated_cfg));
mediated_cfg->destroy(mediated_cfg);
- charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, mediation_sa);
-
- charon->bus->set_sa(charon->bus, mediated_sa);
- SIG_IKE(UP_FAILED, "mediation failed");
+ charon->ike_sa_manager->checkin_and_destroy(
+ charon->ike_sa_manager, mediation_sa);
+ mediated_sa = charon->ike_sa_manager->checkout(
+ charon->ike_sa_manager, this->mediated_sa_id);
+ if (mediated_sa)
+ {
+ DBG1(DBG_IKE, "establishing mediation connection failed");
+ charon->ike_sa_manager->checkin_and_destroy(
+ charon->ike_sa_manager, mediated_sa);
+ }
destroy(this);
return;
}
@@ -156,7 +164,7 @@ static void initiate(private_initiate_mediation_job_t *this)
* Implementation of job_t.execute.
*/
static void reinitiate(private_initiate_mediation_job_t *this)
-{ /* FIXME: check the logging */
+{
ike_sa_t *mediated_sa, *mediation_sa;
peer_cfg_t *mediated_cfg;
@@ -178,13 +186,17 @@ static void reinitiate(private_initiate_mediation_job_t *this)
mediated_cfg->get_name(mediated_cfg));
mediated_cfg->destroy(mediated_cfg);
charon->ike_sa_manager->checkin_and_destroy(charon->ike_sa_manager, mediation_sa);
-
- charon->bus->set_sa(charon->bus, mediated_sa);
- SIG_IKE(UP_FAILED, "mediation failed");
+ mediated_sa = charon->ike_sa_manager->checkout(
+ charon->ike_sa_manager, this->mediated_sa_id);
+ if (mediated_sa)
+ {
+ DBG1(DBG_IKE, "establishing mediation connection failed");
+ charon->ike_sa_manager->checkin_and_destroy(
+ charon->ike_sa_manager, mediated_sa);
+ }
destroy(this);
return;
}
-
charon->ike_sa_manager->checkin(charon->ike_sa_manager, mediation_sa);
}
diff --git a/src/charon/processing/jobs/migrate_job.c b/src/charon/processing/jobs/migrate_job.c
new file mode 100644
index 000000000..ec0a76fb9
--- /dev/null
+++ b/src/charon/processing/jobs/migrate_job.c
@@ -0,0 +1,152 @@
+/*
+ * Copyright (C) 2008 Andreas Steffen
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ *
+ * $Id: migrate_job.c 4662 2008-11-16 21:19:58Z andreas $
+ */
+
+#include "migrate_job.h"
+
+#include <daemon.h>
+
+#include <config/child_cfg.h>
+
+
+typedef struct private_migrate_job_t private_migrate_job_t;
+
+/**
+ * Private data of a migrate_job_t object.
+ */
+struct private_migrate_job_t {
+ /**
+ * Public migrate_job_t interface.
+ */
+ migrate_job_t public;
+
+ /**
+ * reqid of the CHILD_SA if it already exists
+ */
+ u_int32_t reqid;
+
+ /**
+ * source traffic selector
+ */
+ traffic_selector_t *src_ts;
+
+ /**
+ * destination traffic selector
+ */
+ traffic_selector_t *dst_ts;
+
+ /**
+ * local host address to be used for IKE
+ */
+ host_t *local;
+
+ /**
+ * remote host address to be used for IKE
+ */
+ host_t *remote;
+};
+
+/**
+ * Implementation of job_t.destroy.
+ */
+static void destroy(private_migrate_job_t *this)
+{
+ DESTROY_IF(this->src_ts);
+ DESTROY_IF(this->dst_ts);
+ DESTROY_IF(this->local);
+ DESTROY_IF(this->remote);
+ free(this);
+}
+
+/**
+ * Implementation of job_t.execute.
+ */
+static void execute(private_migrate_job_t *this)
+{
+ ike_sa_t *ike_sa = NULL;
+
+ if (this->reqid)
+ {
+ ike_sa = charon->ike_sa_manager->checkout_by_id(charon->ike_sa_manager,
+ this->reqid, TRUE);
+ }
+ if (ike_sa)
+ {
+ iterator_t *children;
+ child_sa_t *child_sa;
+ host_t *host;
+
+ children = ike_sa->create_child_sa_iterator(ike_sa);
+ while (children->iterate(children, (void**)&child_sa))
+ {
+ if (child_sa->get_reqid(child_sa) == this->reqid)
+ {
+ break;
+ }
+ }
+ children->destroy(children);
+ DBG2(DBG_JOB, "found CHILD_SA with reqid {%d}", this->reqid);
+
+ ike_sa->set_kmaddress(ike_sa, this->local, this->remote);
+
+ host = this->local->clone(this->local);
+ host->set_port(host, IKEV2_UDP_PORT);
+ ike_sa->set_my_host(ike_sa, host);
+
+ host = this->remote->clone(this->remote);
+ host->set_port(host, IKEV2_UDP_PORT);
+ ike_sa->set_other_host(ike_sa, host);
+
+ if (child_sa->update_hosts(child_sa, this->local, this->remote,
+ ike_sa->get_virtual_ip(ike_sa, TRUE),
+ ike_sa->has_condition(ike_sa, COND_NAT_ANY)) == NOT_SUPPORTED)
+ {
+ ike_sa->rekey_child_sa(ike_sa, child_sa->get_protocol(child_sa),
+ child_sa->get_spi(child_sa, TRUE));
+ }
+ charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
+ }
+ else
+ {
+ DBG1(DBG_JOB, "no CHILD_SA found with reqid {%d}", this->reqid);
+ }
+ destroy(this);
+}
+
+/*
+ * Described in header
+ */
+migrate_job_t *migrate_job_create(u_int32_t reqid,
+ traffic_selector_t *src_ts,
+ traffic_selector_t *dst_ts,
+ policy_dir_t dir,
+ host_t *local, host_t *remote)
+{
+ private_migrate_job_t *this = malloc_thing(private_migrate_job_t);
+
+ /* interface functions */
+ this->public.job_interface.execute = (void (*) (job_t *)) execute;
+ this->public.job_interface.destroy = (void (*)(job_t*)) destroy;
+
+ /* private variables */
+ this->reqid = reqid;
+ this->src_ts = (dir == POLICY_OUT) ? src_ts : dst_ts;
+ this->dst_ts = (dir == POLICY_OUT) ? dst_ts : src_ts;
+ this->local = local;
+ this->remote = remote;
+
+ return &this->public;
+}
diff --git a/src/charon/processing/jobs/migrate_job.h b/src/charon/processing/jobs/migrate_job.h
new file mode 100644
index 000000000..a99ffbb0c
--- /dev/null
+++ b/src/charon/processing/jobs/migrate_job.h
@@ -0,0 +1,65 @@
+/*
+ * Copyright (C) 2008 Andreas Steffen
+ * Hochschule fuer Technik Rapperswil
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ *
+ * $Id: migrate_job.h 4662 2008-11-16 21:19:58Z andreas $
+ */
+
+/**
+ * @defgroup migrate_job migrate_job
+ * @{ @ingroup jobs
+ */
+
+#ifndef MIGRATE_JOB_H_
+#define MIGRATE_JOB_H_
+
+typedef struct migrate_job_t migrate_job_t;
+
+#include <library.h>
+#include <utils/host.h>
+#include <config/traffic_selector.h>
+#include <kernel/kernel_ipsec.h>
+#include <processing/jobs/job.h>
+
+/**
+ * Class representing a MIGRATE Job.
+ *
+ * This job sets a routed CHILD_SA for an existing IPsec policy.
+ */
+struct migrate_job_t {
+ /**
+ * The job_t interface.
+ */
+ job_t job_interface;
+};
+
+/**
+ * Creates a job of type MIGRATE.
+ *
+ * We use the reqid or the traffic selectors to find a matching CHILD_SA.
+ *
+ * @param reqid reqid of the CHILD_SA to acquire
+ * @param src_ts source traffic selector to be used in the policy
+ * @param dst_ts destination traffic selector to be used in the policy
+ * @param dir direction of the policy (in|out)
+ * @param local local host address to be used in the IKE_SA
+ * @param remote remote host address to be used in the IKE_SA
+ * @return migrate_job_t object
+ */
+migrate_job_t *migrate_job_create(u_int32_t reqid,
+ traffic_selector_t *src_ts,
+ traffic_selector_t *dst_ts,
+ policy_dir_t dir,
+ host_t *local, host_t *remote);
+
+#endif /* MIGRATE_JOB_H_ @} */
diff --git a/src/charon/processing/processor.c b/src/charon/processing/processor.c
index 010f6624f..9cff090bf 100644
--- a/src/charon/processing/processor.c
+++ b/src/charon/processing/processor.c
@@ -13,7 +13,7 @@
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*
- * $Id: processor.c 3742 2008-04-03 09:19:12Z tobias $
+ * $Id: processor.c 4579 2008-11-05 11:29:56Z martin $
*/
#include <stdlib.h>
@@ -24,6 +24,7 @@
#include "processor.h"
#include <daemon.h>
+#include <utils/mutex.h>
#include <utils/linked_list.h>
@@ -61,17 +62,17 @@ struct private_processor_t {
/**
* access to linked_list is locked through this mutex
*/
- pthread_mutex_t mutex;
+ mutex_t *mutex;
/**
* Condvar to wait for new jobs
*/
- pthread_cond_t jobadded;
+ condvar_t *job_added;
/**
* Condvar to wait for terminated threads
*/
- pthread_cond_t threadterminated;
+ condvar_t *thread_terminated;
};
static void process_jobs(private_processor_t *this);
@@ -85,10 +86,10 @@ static void restart(private_processor_t *this)
if (pthread_create(&thread, NULL, (void*)process_jobs, this) != 0)
{
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
this->total_threads--;
- pthread_cond_broadcast(&this->threadterminated);
- pthread_mutex_unlock(&this->mutex);
+ this->thread_terminated->broadcast(this->thread_terminated);
+ this->mutex->unlock(this->mutex);
}
}
@@ -103,7 +104,7 @@ static void process_jobs(private_processor_t *this)
DBG2(DBG_JOB, "started worker thread, thread_ID: %06u", (int)pthread_self());
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
while (this->desired_threads >= this->total_threads)
{
job_t *job;
@@ -111,21 +112,21 @@ static void process_jobs(private_processor_t *this)
if (this->list->get_count(this->list) == 0)
{
this->idle_threads++;
- pthread_cond_wait(&this->jobadded, &this->mutex);
+ this->job_added->wait(this->job_added, this->mutex);
this->idle_threads--;
continue;
}
this->list->remove_first(this->list, (void**)&job);
- pthread_mutex_unlock(&this->mutex);
+ this->mutex->unlock(this->mutex);
/* terminated threads are restarted, so we have a constant pool */
pthread_cleanup_push((void*)restart, this);
job->execute(job);
pthread_cleanup_pop(0);
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
}
this->total_threads--;
- pthread_cond_signal(&this->threadterminated);
- pthread_mutex_unlock(&this->mutex);
+ this->thread_terminated->signal(this->thread_terminated);
+ this->mutex->unlock(this->mutex);
}
/**
@@ -134,9 +135,9 @@ static void process_jobs(private_processor_t *this)
static u_int get_total_threads(private_processor_t *this)
{
u_int count;
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
count = this->total_threads;
- pthread_mutex_unlock(&this->mutex);
+ this->mutex->unlock(this->mutex);
return count;
}
@@ -146,9 +147,9 @@ static u_int get_total_threads(private_processor_t *this)
static u_int get_idle_threads(private_processor_t *this)
{
u_int count;
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
count = this->idle_threads;
- pthread_mutex_unlock(&this->mutex);
+ this->mutex->unlock(this->mutex);
return count;
}
@@ -158,9 +159,9 @@ static u_int get_idle_threads(private_processor_t *this)
static u_int get_job_load(private_processor_t *this)
{
u_int load;
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
load = this->list->get_count(this->list);
- pthread_mutex_unlock(&this->mutex);
+ this->mutex->unlock(this->mutex);
return load;
}
@@ -169,10 +170,10 @@ static u_int get_job_load(private_processor_t *this)
*/
static void queue_job(private_processor_t *this, job_t *job)
{
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
this->list->insert_last(this->list, job);
- pthread_cond_signal(&this->jobadded);
- pthread_mutex_unlock(&this->mutex);
+ this->job_added->signal(this->job_added);
+ this->mutex->unlock(this->mutex);
}
/**
@@ -180,7 +181,7 @@ static void queue_job(private_processor_t *this, job_t *job)
*/
static void set_threads(private_processor_t *this, u_int count)
{
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
if (count > this->total_threads)
{ /* increase thread count */
int i;
@@ -200,8 +201,8 @@ static void set_threads(private_processor_t *this, u_int count)
{ /* decrease thread count */
this->desired_threads = count;
}
- pthread_cond_broadcast(&this->jobadded);
- pthread_mutex_unlock(&this->mutex);
+ this->job_added->broadcast(this->job_added);
+ this->mutex->unlock(this->mutex);
}
/**
@@ -210,13 +211,16 @@ static void set_threads(private_processor_t *this, u_int count)
static void destroy(private_processor_t *this)
{
set_threads(this, 0);
- pthread_mutex_lock(&this->mutex);
+ this->mutex->lock(this->mutex);
while (this->total_threads > 0)
{
- pthread_cond_broadcast(&this->jobadded);
- pthread_cond_wait(&this->threadterminated, &this->mutex);
+ this->job_added->broadcast(this->job_added);
+ this->thread_terminated->wait(this->thread_terminated, this->mutex);
}
- pthread_mutex_unlock(&this->mutex);
+ this->mutex->unlock(this->mutex);
+ this->thread_terminated->destroy(this->thread_terminated);
+ this->job_added->destroy(this->job_added);
+ this->mutex->destroy(this->mutex);
this->list->destroy_offset(this->list, offsetof(job_t, destroy));
free(this);
}
@@ -236,9 +240,9 @@ processor_t *processor_create(size_t pool_size)
this->public.destroy = (void(*)(processor_t*))destroy;
this->list = linked_list_create();
- pthread_mutex_init(&this->mutex, NULL);
- pthread_cond_init(&this->jobadded, NULL);
- pthread_cond_init(&this->threadterminated, NULL);
+ this->mutex = mutex_create(MUTEX_DEFAULT);
+ this->job_added = condvar_create(CONDVAR_DEFAULT);
+ this->thread_terminated = condvar_create(CONDVAR_DEFAULT);
this->total_threads = 0;
this->desired_threads = 0;
this->idle_threads = 0;