diff options
Diffstat (limited to 'src/charon/network')
-rw-r--r-- | src/charon/network/receiver.c | 164 | ||||
-rw-r--r-- | src/charon/network/sender.c | 69 |
2 files changed, 108 insertions, 125 deletions
diff --git a/src/charon/network/receiver.c b/src/charon/network/receiver.c index 9b4bf71ac..1de1dd3d2 100644 --- a/src/charon/network/receiver.c +++ b/src/charon/network/receiver.c @@ -30,9 +30,9 @@ #include <daemon.h> #include <network/socket.h> #include <network/packet.h> -#include <processing/job_queue.h> #include <processing/jobs/job.h> #include <processing/jobs/process_message_job.h> +#include <processing/jobs/callback_job.h> /** length of the full cookie, including time (u_int32_t + SHA1()) */ #define COOKIE_LENGTH 24 @@ -56,12 +56,17 @@ struct private_receiver_t { /** * Public part of a receiver_t object. */ - receiver_t public; + receiver_t public; + + /** + * Threads job receiving packets + */ + callback_job_t *job; - /** - * Assigned thread. - */ - pthread_t assigned_thread; + /** + * Assigned thread. + */ + pthread_t assigned_thread; /** * current secret to use for cookie calculation @@ -245,94 +250,84 @@ static bool peer_to_aggressive(private_receiver_t *this, message_t *message) /** * Implementation of receiver_t.receive_packets. */ -static void receive_packets(private_receiver_t *this) +static job_requeue_t receive_packets(private_receiver_t *this) { packet_t *packet; message_t *message; job_t *job; - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - DBG1(DBG_NET, "receiver thread running, thread_ID: %06u", - (int)pthread_self()); + /* read in a packet */ + if (charon->socket->receive(charon->socket, &packet) != SUCCESS) + { + DBG2(DBG_NET, "receiving from socket failed!"); + return JOB_REQUEUE_FAIR; + } - charon->drop_capabilities(charon, TRUE); + /* parse message header */ + message = message_create_from_packet(packet); + if (message->parse_header(message) != SUCCESS) + { + DBG1(DBG_NET, "received invalid IKE header from %H - ignored", + packet->get_source(packet)); + message->destroy(message); + return JOB_REQUEUE_DIRECT; + } - while (TRUE) + /* check IKE major version */ + if (message->get_major_version(message) != IKE_MAJOR_VERSION) { - /* read in a packet */ - if (charon->socket->receive(charon->socket, &packet) != SUCCESS) - { - DBG2(DBG_NET, "receiving from socket failed!"); - /* try again after a delay */ - sleep(1); - continue; - } - - /* parse message header */ - message = message_create_from_packet(packet); - if (message->parse_header(message) != SUCCESS) + DBG1(DBG_NET, "received unsupported IKE version %d.%d from %H, " + "sending INVALID_MAJOR_VERSION", message->get_major_version(message), + message->get_minor_version(message), packet->get_source(packet)); + send_notify(message, INVALID_MAJOR_VERSION, chunk_empty); + message->destroy(message); + return JOB_REQUEUE_DIRECT; + } + + if (message->get_request(message) && + message->get_exchange_type(message) == IKE_SA_INIT) + { + /* check for cookies */ + if (cookie_required(this, message)) { - DBG1(DBG_NET, "received invalid IKE header from %H - ignored", - packet->get_source(packet)); + u_int32_t now = time(NULL); + chunk_t cookie = cookie_build(this, message, now - this->secret_offset, + chunk_from_thing(this->secret)); + + DBG2(DBG_NET, "received packet from: %#H to %#H", + message->get_source(message), + message->get_destination(message)); + DBG2(DBG_NET, "sending COOKIE notify to %H", + message->get_source(message)); + send_notify(message, COOKIE, cookie); + chunk_free(&cookie); + if (++this->secret_used > COOKIE_REUSE) + { + /* create new cookie */ + DBG1(DBG_NET, "generating new cookie secret after %d uses", + this->secret_used); + memcpy(this->secret_old, this->secret, SECRET_LENGTH); + this->randomizer->get_pseudo_random_bytes(this->randomizer, + SECRET_LENGTH, this->secret); + this->secret_switch = now; + this->secret_used = 0; + } message->destroy(message); - continue; + return JOB_REQUEUE_DIRECT; } - /* check IKE major version */ - if (message->get_major_version(message) != IKE_MAJOR_VERSION) + /* check if peer has not too many IKE_SAs half open */ + if (peer_to_aggressive(this, message)) { - DBG1(DBG_NET, "received unsupported IKE version %d.%d from %H, " - "sending INVALID_MAJOR_VERSION", message->get_major_version(message), - message->get_minor_version(message), packet->get_source(packet)); - send_notify(message, INVALID_MAJOR_VERSION, chunk_empty); + DBG1(DBG_NET, "ignoring IKE_SA setup from %H, " + "peer to aggressive", message->get_source(message)); message->destroy(message); - continue; - } - - if (message->get_request(message) && - message->get_exchange_type(message) == IKE_SA_INIT) - { - /* check for cookies */ - if (cookie_required(this, message)) - { - u_int32_t now = time(NULL); - chunk_t cookie = cookie_build(this, message, now - this->secret_offset, - chunk_from_thing(this->secret)); - - DBG2(DBG_NET, "received packet from: %#H to %#H", - message->get_source(message), - message->get_destination(message)); - DBG2(DBG_NET, "sending COOKIE notify to %H", - message->get_source(message)); - send_notify(message, COOKIE, cookie); - chunk_free(&cookie); - if (++this->secret_used > COOKIE_REUSE) - { - /* create new cookie */ - DBG1(DBG_NET, "generating new cookie secret after %d uses", - this->secret_used); - memcpy(this->secret_old, this->secret, SECRET_LENGTH); - this->randomizer->get_pseudo_random_bytes(this->randomizer, - SECRET_LENGTH, this->secret); - this->secret_switch = now; - this->secret_used = 0; - } - message->destroy(message); - continue; - } - - /* check if peer has not too many IKE_SAs half open */ - if (peer_to_aggressive(this, message)) - { - DBG1(DBG_NET, "ignoring IKE_SA setup from %H, " - "peer to aggressive", message->get_source(message)); - message->destroy(message); - continue; - } + return JOB_REQUEUE_DIRECT; } - job = (job_t *)process_message_job_create(message); - charon->job_queue->add(charon->job_queue, job); } + job = (job_t*)process_message_job_create(message); + charon->processor->queue_job(charon->processor, job); + return JOB_REQUEUE_DIRECT; } /** @@ -340,8 +335,7 @@ static void receive_packets(private_receiver_t *this) */ static void destroy(private_receiver_t *this) { - pthread_cancel(this->assigned_thread); - pthread_join(this->assigned_thread, NULL); + this->job->cancel(this->job); this->randomizer->destroy(this->randomizer); this->hasher->destroy(this->hasher); free(this); @@ -366,12 +360,10 @@ receiver_t *receiver_create() this->secret); memcpy(this->secret_old, this->secret, SECRET_LENGTH); - if (pthread_create(&this->assigned_thread, NULL, - (void*)receive_packets, this) != 0) - { - free(this); - charon->kill(charon, "unable to create receiver thread"); - } + this->job = callback_job_create((callback_job_cb_t)receive_packets, + this, NULL, NULL); + charon->processor->queue_job(charon->processor, (job_t*)this->job); return &this->public; } + diff --git a/src/charon/network/sender.c b/src/charon/network/sender.c index 933b8c192..f934dc509 100644 --- a/src/charon/network/sender.c +++ b/src/charon/network/sender.c @@ -28,6 +28,7 @@ #include <daemon.h> #include <network/socket.h> +#include <processing/jobs/callback_job.h> typedef struct private_sender_t private_sender_t; @@ -39,12 +40,12 @@ struct private_sender_t { /** * Public part of a sender_t object. */ - sender_t public; + sender_t public; - /** - * Assigned thread. - */ - pthread_t assigned_thread; + /** + * Sender threads job. + */ + callback_job_t *job; /** * The packets are stored in a linked list @@ -82,37 +83,29 @@ static void send_(private_sender_t *this, packet_t *packet) /** * Implementation of private_sender_t.send_packets. */ -static void send_packets(private_sender_t * this) +static job_requeue_t send_packets(private_sender_t * this) { - /* cancellation disabled by default */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - DBG1(DBG_NET, "sender thread running, thread_ID: %06u", (int)pthread_self()); + packet_t *packet; + int oldstate; - charon->drop_capabilities(charon, TRUE); - - while (TRUE) + pthread_mutex_lock(&this->mutex); + while (this->list->get_count(this->list) == 0) { - packet_t *packet; - int oldstate; - - pthread_mutex_lock(&this->mutex); - /* go to wait while no packets available */ - while (this->list->get_count(this->list) == 0) - { - /* add cleanup handler, wait for packet, remove cleanup handler */ - pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&this->mutex); - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); - pthread_cond_wait(&this->condvar, &this->mutex); - - pthread_setcancelstate(oldstate, NULL); - pthread_cleanup_pop(0); - } - this->list->remove_first(this->list, (void**)&packet); - pthread_mutex_unlock(&this->mutex); + /* add cleanup handler, wait for packet, remove cleanup handler */ + pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&this->mutex); + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate); + + pthread_cond_wait(&this->condvar, &this->mutex); - charon->socket->send(charon->socket, packet); - packet->destroy(packet); + pthread_setcancelstate(oldstate, NULL); + pthread_cleanup_pop(0); } + this->list->remove_first(this->list, (void**)&packet); + pthread_mutex_unlock(&this->mutex); + + charon->socket->send(charon->socket, packet); + packet->destroy(packet); + return JOB_REQUEUE_DIRECT; } /** @@ -125,8 +118,7 @@ static void destroy(private_sender_t *this) { sched_yield(); } - pthread_cancel(this->assigned_thread); - pthread_join(this->assigned_thread, NULL); + this->job->cancel(this->job); this->list->destroy(this->list); free(this); } @@ -145,11 +137,10 @@ sender_t * sender_create() pthread_mutex_init(&this->mutex, NULL); pthread_cond_init(&this->condvar, NULL); - if (pthread_create(&this->assigned_thread, NULL, - (void*)send_packets, this) != 0) - { - charon->kill(charon, "unable to create sender thread"); - } + this->job = callback_job_create((callback_job_cb_t)send_packets, + this, NULL, NULL); + charon->processor->queue_job(charon->processor, (job_t*)this->job); - return &(this->public); + return &this->public; } + |