summaryrefslogtreecommitdiff
path: root/src/charon/network
diff options
context:
space:
mode:
Diffstat (limited to 'src/charon/network')
-rw-r--r--src/charon/network/receiver.c164
-rw-r--r--src/charon/network/sender.c69
2 files changed, 108 insertions, 125 deletions
diff --git a/src/charon/network/receiver.c b/src/charon/network/receiver.c
index 9b4bf71ac..1de1dd3d2 100644
--- a/src/charon/network/receiver.c
+++ b/src/charon/network/receiver.c
@@ -30,9 +30,9 @@
#include <daemon.h>
#include <network/socket.h>
#include <network/packet.h>
-#include <processing/job_queue.h>
#include <processing/jobs/job.h>
#include <processing/jobs/process_message_job.h>
+#include <processing/jobs/callback_job.h>
/** length of the full cookie, including time (u_int32_t + SHA1()) */
#define COOKIE_LENGTH 24
@@ -56,12 +56,17 @@ struct private_receiver_t {
/**
* Public part of a receiver_t object.
*/
- receiver_t public;
+ receiver_t public;
+
+ /**
+ * Threads job receiving packets
+ */
+ callback_job_t *job;
- /**
- * Assigned thread.
- */
- pthread_t assigned_thread;
+ /**
+ * Assigned thread.
+ */
+ pthread_t assigned_thread;
/**
* current secret to use for cookie calculation
@@ -245,94 +250,84 @@ static bool peer_to_aggressive(private_receiver_t *this, message_t *message)
/**
* Implementation of receiver_t.receive_packets.
*/
-static void receive_packets(private_receiver_t *this)
+static job_requeue_t receive_packets(private_receiver_t *this)
{
packet_t *packet;
message_t *message;
job_t *job;
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
- DBG1(DBG_NET, "receiver thread running, thread_ID: %06u",
- (int)pthread_self());
+ /* read in a packet */
+ if (charon->socket->receive(charon->socket, &packet) != SUCCESS)
+ {
+ DBG2(DBG_NET, "receiving from socket failed!");
+ return JOB_REQUEUE_FAIR;
+ }
- charon->drop_capabilities(charon, TRUE);
+ /* parse message header */
+ message = message_create_from_packet(packet);
+ if (message->parse_header(message) != SUCCESS)
+ {
+ DBG1(DBG_NET, "received invalid IKE header from %H - ignored",
+ packet->get_source(packet));
+ message->destroy(message);
+ return JOB_REQUEUE_DIRECT;
+ }
- while (TRUE)
+ /* check IKE major version */
+ if (message->get_major_version(message) != IKE_MAJOR_VERSION)
{
- /* read in a packet */
- if (charon->socket->receive(charon->socket, &packet) != SUCCESS)
- {
- DBG2(DBG_NET, "receiving from socket failed!");
- /* try again after a delay */
- sleep(1);
- continue;
- }
-
- /* parse message header */
- message = message_create_from_packet(packet);
- if (message->parse_header(message) != SUCCESS)
+ DBG1(DBG_NET, "received unsupported IKE version %d.%d from %H, "
+ "sending INVALID_MAJOR_VERSION", message->get_major_version(message),
+ message->get_minor_version(message), packet->get_source(packet));
+ send_notify(message, INVALID_MAJOR_VERSION, chunk_empty);
+ message->destroy(message);
+ return JOB_REQUEUE_DIRECT;
+ }
+
+ if (message->get_request(message) &&
+ message->get_exchange_type(message) == IKE_SA_INIT)
+ {
+ /* check for cookies */
+ if (cookie_required(this, message))
{
- DBG1(DBG_NET, "received invalid IKE header from %H - ignored",
- packet->get_source(packet));
+ u_int32_t now = time(NULL);
+ chunk_t cookie = cookie_build(this, message, now - this->secret_offset,
+ chunk_from_thing(this->secret));
+
+ DBG2(DBG_NET, "received packet from: %#H to %#H",
+ message->get_source(message),
+ message->get_destination(message));
+ DBG2(DBG_NET, "sending COOKIE notify to %H",
+ message->get_source(message));
+ send_notify(message, COOKIE, cookie);
+ chunk_free(&cookie);
+ if (++this->secret_used > COOKIE_REUSE)
+ {
+ /* create new cookie */
+ DBG1(DBG_NET, "generating new cookie secret after %d uses",
+ this->secret_used);
+ memcpy(this->secret_old, this->secret, SECRET_LENGTH);
+ this->randomizer->get_pseudo_random_bytes(this->randomizer,
+ SECRET_LENGTH, this->secret);
+ this->secret_switch = now;
+ this->secret_used = 0;
+ }
message->destroy(message);
- continue;
+ return JOB_REQUEUE_DIRECT;
}
- /* check IKE major version */
- if (message->get_major_version(message) != IKE_MAJOR_VERSION)
+ /* check if peer has not too many IKE_SAs half open */
+ if (peer_to_aggressive(this, message))
{
- DBG1(DBG_NET, "received unsupported IKE version %d.%d from %H, "
- "sending INVALID_MAJOR_VERSION", message->get_major_version(message),
- message->get_minor_version(message), packet->get_source(packet));
- send_notify(message, INVALID_MAJOR_VERSION, chunk_empty);
+ DBG1(DBG_NET, "ignoring IKE_SA setup from %H, "
+ "peer to aggressive", message->get_source(message));
message->destroy(message);
- continue;
- }
-
- if (message->get_request(message) &&
- message->get_exchange_type(message) == IKE_SA_INIT)
- {
- /* check for cookies */
- if (cookie_required(this, message))
- {
- u_int32_t now = time(NULL);
- chunk_t cookie = cookie_build(this, message, now - this->secret_offset,
- chunk_from_thing(this->secret));
-
- DBG2(DBG_NET, "received packet from: %#H to %#H",
- message->get_source(message),
- message->get_destination(message));
- DBG2(DBG_NET, "sending COOKIE notify to %H",
- message->get_source(message));
- send_notify(message, COOKIE, cookie);
- chunk_free(&cookie);
- if (++this->secret_used > COOKIE_REUSE)
- {
- /* create new cookie */
- DBG1(DBG_NET, "generating new cookie secret after %d uses",
- this->secret_used);
- memcpy(this->secret_old, this->secret, SECRET_LENGTH);
- this->randomizer->get_pseudo_random_bytes(this->randomizer,
- SECRET_LENGTH, this->secret);
- this->secret_switch = now;
- this->secret_used = 0;
- }
- message->destroy(message);
- continue;
- }
-
- /* check if peer has not too many IKE_SAs half open */
- if (peer_to_aggressive(this, message))
- {
- DBG1(DBG_NET, "ignoring IKE_SA setup from %H, "
- "peer to aggressive", message->get_source(message));
- message->destroy(message);
- continue;
- }
+ return JOB_REQUEUE_DIRECT;
}
- job = (job_t *)process_message_job_create(message);
- charon->job_queue->add(charon->job_queue, job);
}
+ job = (job_t*)process_message_job_create(message);
+ charon->processor->queue_job(charon->processor, job);
+ return JOB_REQUEUE_DIRECT;
}
/**
@@ -340,8 +335,7 @@ static void receive_packets(private_receiver_t *this)
*/
static void destroy(private_receiver_t *this)
{
- pthread_cancel(this->assigned_thread);
- pthread_join(this->assigned_thread, NULL);
+ this->job->cancel(this->job);
this->randomizer->destroy(this->randomizer);
this->hasher->destroy(this->hasher);
free(this);
@@ -366,12 +360,10 @@ receiver_t *receiver_create()
this->secret);
memcpy(this->secret_old, this->secret, SECRET_LENGTH);
- if (pthread_create(&this->assigned_thread, NULL,
- (void*)receive_packets, this) != 0)
- {
- free(this);
- charon->kill(charon, "unable to create receiver thread");
- }
+ this->job = callback_job_create((callback_job_cb_t)receive_packets,
+ this, NULL, NULL);
+ charon->processor->queue_job(charon->processor, (job_t*)this->job);
return &this->public;
}
+
diff --git a/src/charon/network/sender.c b/src/charon/network/sender.c
index 933b8c192..f934dc509 100644
--- a/src/charon/network/sender.c
+++ b/src/charon/network/sender.c
@@ -28,6 +28,7 @@
#include <daemon.h>
#include <network/socket.h>
+#include <processing/jobs/callback_job.h>
typedef struct private_sender_t private_sender_t;
@@ -39,12 +40,12 @@ struct private_sender_t {
/**
* Public part of a sender_t object.
*/
- sender_t public;
+ sender_t public;
- /**
- * Assigned thread.
- */
- pthread_t assigned_thread;
+ /**
+ * Sender threads job.
+ */
+ callback_job_t *job;
/**
* The packets are stored in a linked list
@@ -82,37 +83,29 @@ static void send_(private_sender_t *this, packet_t *packet)
/**
* Implementation of private_sender_t.send_packets.
*/
-static void send_packets(private_sender_t * this)
+static job_requeue_t send_packets(private_sender_t * this)
{
- /* cancellation disabled by default */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
- DBG1(DBG_NET, "sender thread running, thread_ID: %06u", (int)pthread_self());
+ packet_t *packet;
+ int oldstate;
- charon->drop_capabilities(charon, TRUE);
-
- while (TRUE)
+ pthread_mutex_lock(&this->mutex);
+ while (this->list->get_count(this->list) == 0)
{
- packet_t *packet;
- int oldstate;
-
- pthread_mutex_lock(&this->mutex);
- /* go to wait while no packets available */
- while (this->list->get_count(this->list) == 0)
- {
- /* add cleanup handler, wait for packet, remove cleanup handler */
- pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&this->mutex);
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
- pthread_cond_wait(&this->condvar, &this->mutex);
-
- pthread_setcancelstate(oldstate, NULL);
- pthread_cleanup_pop(0);
- }
- this->list->remove_first(this->list, (void**)&packet);
- pthread_mutex_unlock(&this->mutex);
+ /* add cleanup handler, wait for packet, remove cleanup handler */
+ pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&this->mutex);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+
+ pthread_cond_wait(&this->condvar, &this->mutex);
- charon->socket->send(charon->socket, packet);
- packet->destroy(packet);
+ pthread_setcancelstate(oldstate, NULL);
+ pthread_cleanup_pop(0);
}
+ this->list->remove_first(this->list, (void**)&packet);
+ pthread_mutex_unlock(&this->mutex);
+
+ charon->socket->send(charon->socket, packet);
+ packet->destroy(packet);
+ return JOB_REQUEUE_DIRECT;
}
/**
@@ -125,8 +118,7 @@ static void destroy(private_sender_t *this)
{
sched_yield();
}
- pthread_cancel(this->assigned_thread);
- pthread_join(this->assigned_thread, NULL);
+ this->job->cancel(this->job);
this->list->destroy(this->list);
free(this);
}
@@ -145,11 +137,10 @@ sender_t * sender_create()
pthread_mutex_init(&this->mutex, NULL);
pthread_cond_init(&this->condvar, NULL);
- if (pthread_create(&this->assigned_thread, NULL,
- (void*)send_packets, this) != 0)
- {
- charon->kill(charon, "unable to create sender thread");
- }
+ this->job = callback_job_create((callback_job_cb_t)send_packets,
+ this, NULL, NULL);
+ charon->processor->queue_job(charon->processor, (job_t*)this->job);
- return &(this->public);
+ return &this->public;
}
+