aboutsummaryrefslogtreecommitdiffstats
path: root/src/charon/network/sender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/charon/network/sender.c')
-rw-r--r--src/charon/network/sender.c69
1 files changed, 30 insertions, 39 deletions
diff --git a/src/charon/network/sender.c b/src/charon/network/sender.c
index 933b8c192..f934dc509 100644
--- a/src/charon/network/sender.c
+++ b/src/charon/network/sender.c
@@ -28,6 +28,7 @@
#include <daemon.h>
#include <network/socket.h>
+#include <processing/jobs/callback_job.h>
typedef struct private_sender_t private_sender_t;
@@ -39,12 +40,12 @@ struct private_sender_t {
/**
* Public part of a sender_t object.
*/
- sender_t public;
+ sender_t public;
- /**
- * Assigned thread.
- */
- pthread_t assigned_thread;
+ /**
+ * Sender threads job.
+ */
+ callback_job_t *job;
/**
* The packets are stored in a linked list
@@ -82,37 +83,29 @@ static void send_(private_sender_t *this, packet_t *packet)
/**
* Implementation of private_sender_t.send_packets.
*/
-static void send_packets(private_sender_t * this)
+static job_requeue_t send_packets(private_sender_t * this)
{
- /* cancellation disabled by default */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
- DBG1(DBG_NET, "sender thread running, thread_ID: %06u", (int)pthread_self());
+ packet_t *packet;
+ int oldstate;
- charon->drop_capabilities(charon, TRUE);
-
- while (TRUE)
+ pthread_mutex_lock(&this->mutex);
+ while (this->list->get_count(this->list) == 0)
{
- packet_t *packet;
- int oldstate;
-
- pthread_mutex_lock(&this->mutex);
- /* go to wait while no packets available */
- while (this->list->get_count(this->list) == 0)
- {
- /* add cleanup handler, wait for packet, remove cleanup handler */
- pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&this->mutex);
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
- pthread_cond_wait(&this->condvar, &this->mutex);
-
- pthread_setcancelstate(oldstate, NULL);
- pthread_cleanup_pop(0);
- }
- this->list->remove_first(this->list, (void**)&packet);
- pthread_mutex_unlock(&this->mutex);
+ /* add cleanup handler, wait for packet, remove cleanup handler */
+ pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, (void*)&this->mutex);
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
+
+ pthread_cond_wait(&this->condvar, &this->mutex);
- charon->socket->send(charon->socket, packet);
- packet->destroy(packet);
+ pthread_setcancelstate(oldstate, NULL);
+ pthread_cleanup_pop(0);
}
+ this->list->remove_first(this->list, (void**)&packet);
+ pthread_mutex_unlock(&this->mutex);
+
+ charon->socket->send(charon->socket, packet);
+ packet->destroy(packet);
+ return JOB_REQUEUE_DIRECT;
}
/**
@@ -125,8 +118,7 @@ static void destroy(private_sender_t *this)
{
sched_yield();
}
- pthread_cancel(this->assigned_thread);
- pthread_join(this->assigned_thread, NULL);
+ this->job->cancel(this->job);
this->list->destroy(this->list);
free(this);
}
@@ -145,11 +137,10 @@ sender_t * sender_create()
pthread_mutex_init(&this->mutex, NULL);
pthread_cond_init(&this->condvar, NULL);
- if (pthread_create(&this->assigned_thread, NULL,
- (void*)send_packets, this) != 0)
- {
- charon->kill(charon, "unable to create sender thread");
- }
+ this->job = callback_job_create((callback_job_cb_t)send_packets,
+ this, NULL, NULL);
+ charon->processor->queue_job(charon->processor, (job_t*)this->job);
- return &(this->public);
+ return &this->public;
}
+