diff options
Diffstat (limited to 'accel-pptpd/triton/triton.c')
-rw-r--r-- | accel-pptpd/triton/triton.c | 472 |
1 files changed, 472 insertions, 0 deletions
diff --git a/accel-pptpd/triton/triton.c b/accel-pptpd/triton/triton.c new file mode 100644 index 0000000..b2aaa7b --- /dev/null +++ b/accel-pptpd/triton/triton.c @@ -0,0 +1,472 @@ +#include <signal.h> +#include <errno.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> + +#include "triton_p.h" +#include "memdebug.h" + +int thread_count = 2; +int max_events = 64; + +static spinlock_t threads_lock = SPINLOCK_INITIALIZER; +static LIST_HEAD(threads); +static LIST_HEAD(sleep_threads); + +static LIST_HEAD(ctx_queue); + +static spinlock_t ctx_list_lock = SPINLOCK_INITIALIZER; +static LIST_HEAD(ctx_list); + +static int terminate; +static int need_terminate; + +static mempool_t *ctx_pool; +static mempool_t *call_pool; +static mempool_t *ctx_stack_pool; + +__export struct triton_stat_t triton_stat; + +void triton_thread_wakeup(struct _triton_thread_t *thread) +{ + //printf("wake up thread %p\n", thread); + pthread_kill(thread->thread, SIGUSR1); +} + +static void* triton_thread(struct _triton_thread_t *thread) +{ + sigset_t set; + int sig; + + sigfillset(&set); + sigdelset(&set, SIGKILL); + sigdelset(&set, SIGSTOP); + pthread_sigmask(SIG_BLOCK, &set, NULL); + + sigemptyset(&set); + sigaddset(&set, SIGUSR1); + sigaddset(&set, SIGQUIT); + + while (1) { + spin_lock(&threads_lock); + if (!list_empty(&ctx_queue)) { + thread->ctx = list_entry(ctx_queue.next, typeof(*thread->ctx), entry2); + //printf("thread: %p: dequeued ctx %p\n", thread, thread->ctx); + list_del(&thread->ctx->entry2); + spin_unlock(&threads_lock); + spin_lock(&thread->ctx->lock); + thread->ctx->thread = thread; + thread->ctx->queued = 0; + spin_unlock(&thread->ctx->lock); + __sync_fetch_and_sub(&triton_stat.context_pending, 1); + } else { + //printf("thread: %p: sleeping\n", thread); + if (!terminate) + list_add(&thread->entry2, &sleep_threads); + spin_unlock(&threads_lock); + if (terminate) + return NULL; + + __sync_fetch_and_sub(&triton_stat.thread_active, 1); + //printf("thread %p: enter sigwait\n", thread); + sigwait(&set, &sig); + //printf("thread %p: exit sigwait\n", thread); + __sync_fetch_and_add(&triton_stat.thread_active, 1); + + if (!thread->ctx) + continue; + } + +cont: + //printf("thread %p: ctx=%p %p\n", thread, thread->ctx, thread->ctx ? thread->ctx->thread : NULL); + if (thread->ctx->ud->before_switch) + thread->ctx->ud->before_switch(thread->ctx->ud, thread->ctx->bf_arg); + + //printf("thread %p: switch to %p\n", thread, thread->ctx); + while (1) { + if (swapcontext(&thread->uctx, &thread->ctx->uctx)) { + if (errno == EINTR) + continue; + triton_log_error("swapcontext: %s\n", strerror(errno)); + } else + break; + } + //printf("thread %p: switch from %p %p\n", thread, thread->ctx, thread->ctx->thread); + + if (thread->ctx->thread) { + spin_lock(&thread->ctx->lock); + if (thread->ctx->pending) { + spin_unlock(&thread->ctx->lock); + goto cont; + } + thread->ctx->thread = NULL; + spin_unlock(&thread->ctx->lock); + + if (thread->ctx->need_free) { + //printf("- context %p removed\n", thread->ctx); + mempool_free(thread->ctx->uctx.uc_stack.ss_sp); + mempool_free(thread->ctx); + } + } + + thread->ctx = NULL; + } +} + +static void ctx_thread(struct _triton_context_t *ctx) +{ + struct _triton_md_handler_t *h; + struct _triton_timer_t *t; + struct _triton_ctx_call_t *call; + uint64_t tt; + + while (1) { + //printf("ctx %p %p: enter\n", ctx, ctx->thread); + if (ctx->need_close) { + if (ctx->ud->close) + ctx->ud->close(ctx->ud); + ctx->need_close = 0; + } + + while (1) { + spin_lock(&ctx->lock); + if (!list_empty(&ctx->pending_timers)) { + t = list_entry(ctx->pending_timers.next, typeof(*t), entry2); + list_del(&t->entry2); + t->pending = 0; + spin_unlock(&ctx->lock); + read(t->fd, &tt, sizeof(tt)); + t->ud->expire(t->ud); + continue; + } + if (!list_empty(&ctx->pending_handlers)) { + h = list_entry(ctx->pending_handlers.next, typeof(*h), entry2); + list_del(&h->entry2); + h->pending = 0; + spin_unlock(&ctx->lock); + if (h->trig_epoll_events & (EPOLLIN | EPOLLERR | EPOLLHUP)) + if (h->ud && h->ud->read) + h->ud->read(h->ud); + if (h->trig_epoll_events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) + if (h->ud && h->ud->write) + h->ud->write(h->ud); + h->trig_epoll_events = 0; + continue; + } + if (!list_empty(&ctx->pending_calls)) { + call = list_entry(ctx->pending_calls.next, typeof(*call), entry); + list_del(&call->entry); + spin_unlock(&ctx->lock); + call->func(call->arg); + mempool_free(call); + } + ctx->pending = 0; + spin_unlock(&ctx->lock); + break; + } + + //printf("ctx %p %p: exit\n", ctx, ctx->thread); + while (1) { + if (swapcontext(&ctx->uctx, &ctx->thread->uctx)) { + if (errno == EINTR) + continue; + triton_log_error("swapcontext: %s\n", strerror(errno)); + } else + break; + } + } +} + +struct _triton_thread_t *create_thread() +{ + struct _triton_thread_t *thread = malloc(sizeof(*thread)); + if (!thread) + return NULL; + + memset(thread, 0, sizeof(*thread)); + if (pthread_create(&thread->thread, NULL, (void*(*)(void*))triton_thread, thread)) { + triton_log_error("pthread_create: %s", strerror(errno)); + return NULL; + } + + triton_stat.thread_count++; + triton_stat.thread_active++; + + return thread; +} + +int triton_queue_ctx(struct _triton_context_t *ctx) +{ + ctx->pending = 1; + if (ctx->thread || ctx->queued || ctx->sleeping) + return 0; + + spin_lock(&threads_lock); + if (list_empty(&sleep_threads)) { + list_add_tail(&ctx->entry2, &ctx_queue); + spin_unlock(&threads_lock); + ctx->queued = 1; + //printf("ctx %p: queued\n", ctx); + __sync_fetch_and_add(&triton_stat.context_pending, 1); + return 0; + } + + ctx->thread = list_entry(sleep_threads.next, typeof(*ctx->thread), entry2); + ctx->thread->ctx = ctx; + //printf("ctx %p: assigned to thread %p\n", ctx, ctx->thread); + list_del(&ctx->thread->entry2); + spin_unlock(&threads_lock); + + return 1; +} + +int __export triton_context_register(struct triton_context_t *ud, void *bf_arg) +{ + struct _triton_context_t *ctx = mempool_alloc(ctx_pool); + + if (!ctx) + return -1; + + memset(ctx, 0, sizeof(*ctx)); + ctx->ud = ud; + ctx->bf_arg = bf_arg; + ctx->sleeping = 1; + spinlock_init(&ctx->lock); + INIT_LIST_HEAD(&ctx->handlers); + INIT_LIST_HEAD(&ctx->timers); + INIT_LIST_HEAD(&ctx->pending_handlers); + INIT_LIST_HEAD(&ctx->pending_timers); + INIT_LIST_HEAD(&ctx->pending_calls); + + if (getcontext(&ctx->uctx)) { + triton_log_error("getcontext: %s\n", strerror(errno)); + _free(ctx); + return -1; + } + + ctx->uctx.uc_stack.ss_size = CTX_STACK_SIZE; + ctx->uctx.uc_stack.ss_sp = mempool_alloc(ctx_stack_pool); + if (!ctx->uctx.uc_stack.ss_sp) { + triton_log_error("out of memory\n"); + _free(ctx); + return -1; + } + sigfillset(&ctx->uctx.uc_sigmask); + makecontext(&ctx->uctx, (void (*)())ctx_thread, 1, ctx); + + ud->tpd = ctx; + + spin_lock(&ctx_list_lock); + list_add_tail(&ctx->entry, &ctx_list); + spin_unlock(&ctx_list_lock); + + __sync_fetch_and_add(&triton_stat.context_sleeping, 1); + __sync_fetch_and_add(&triton_stat.context_count, 1); + + return 0; +} + +void __export triton_context_unregister(struct triton_context_t *ud) +{ + struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; + struct _triton_ctx_call_t *call; + struct _triton_thread_t *t; + + while (!list_empty(&ctx->pending_calls)) { + call = list_entry(ctx->pending_calls.next, typeof(*call), entry); + list_del(&call->entry); + mempool_free(call); + } + + if (!list_empty(&ctx->handlers)) { + triton_log_error("BUG:ctx:triton_unregister_ctx: handlers is not empty"); + { + struct _triton_md_handler_t *h; + list_for_each_entry(h, &ctx->handlers, entry) + if (h->ud) + printf("%p\n", h->ud); + } + abort(); + } + if (!list_empty(&ctx->pending_handlers)) { + triton_log_error("BUG:ctx:triton_unregister_ctx: pending_handlers is not empty"); + abort(); + } + if (!list_empty(&ctx->timers)) { + triton_log_error("BUG:ctx:triton_unregister_ctx: timers is not empty"); + abort(); + } + if (!list_empty(&ctx->pending_timers)) { + triton_log_error("BUG:ctx:triton_unregister_ctx: pending_timers is not empty"); + abort(); + } + + ctx->need_free = 1; + spin_lock(&ctx_list_lock); + list_del(&ctx->entry); + if (need_terminate && list_empty(&ctx_list)) + terminate = 1; + spin_unlock(&ctx_list_lock); + + __sync_fetch_and_sub(&triton_stat.context_count, 1); + + if (terminate) { + list_for_each_entry(t, &threads, entry) + triton_thread_wakeup(t); + } +} +void __export triton_context_schedule(struct triton_context_t *ud) +{ + struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; + ucontext_t *uctx = &ctx->thread->uctx; + + spin_lock(&ctx->lock); + if (ctx->wakeup) { + ctx->wakeup = 0; + spin_unlock(&ctx->lock); + return; + } + ctx->sleeping = 1; + ctx->thread = NULL; + spin_unlock(&ctx->lock); + + while (1) { + if (swapcontext(&ctx->uctx, uctx)) { + if (errno == EINTR) + continue; + triton_log_error("swaswpntext: %s\n", strerror(errno)); + } else + break; + } + + __sync_fetch_and_add(&triton_stat.context_sleeping, 1); +} + +int __export triton_context_wakeup(struct triton_context_t *ud) +{ + struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; + int r; + + spin_lock(&ctx->lock); + if (!ctx->sleeping) { + ctx->wakeup = 1; + spin_unlock(&ctx->lock); + return -1; + } + ctx->sleeping = 0; + r = triton_queue_ctx(ctx); + spin_unlock(&ctx->lock); + + if (r) + triton_thread_wakeup(ctx->thread); + + __sync_fetch_and_sub(&triton_stat.context_sleeping, 1); + + return 0; +} + +int __export triton_context_call(struct triton_context_t *ud, void (*func)(void *), void *arg) +{ + struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; + struct _triton_ctx_call_t *call = mempool_alloc(call_pool); + int r; + + if (!call) + return -1; + + call->func = func; + call->arg = arg; + + spin_lock(&ctx->lock); + list_add_tail(&call->entry, &ctx->pending_calls); + r = triton_queue_ctx(ctx); + spin_unlock(&ctx->lock); + + if (r) + triton_thread_wakeup(ctx->thread); + + return 0; +} + +int __export triton_init(const char *conf_file) +{ + ctx_pool = mempool_create(sizeof(struct _triton_context_t)); + call_pool = mempool_create(sizeof(struct _triton_ctx_call_t)); + ctx_stack_pool = mempool_create(CTX_STACK_SIZE); + + if (conf_load(conf_file)) + return -1; + + if (log_init()) + return -1; + + if (md_init()) + return -1; + + if (timer_init()) + return -1; + + if (event_init()) + return -1; + + return 0; +} + +int __export triton_load_modules(const char *mod_sect) +{ + if (load_modules(mod_sect)) + return -1; + + return 0; +} + +void __export triton_run() +{ + struct _triton_thread_t *t; + int i; + char *opt; + + opt = conf_get_opt("core", "thread-count"); + if (opt && atoi(opt) > 0) + thread_count = atoi(opt); + + for(i = 0; i < thread_count; i++) { + t = create_thread(); + if (!t) + _exit(-1); + + list_add_tail(&t->entry, &threads); + } + + md_run(); + timer_run(); +} + +void __export triton_terminate() +{ + struct _triton_context_t *ctx; + struct _triton_thread_t *t; + int r; + + need_terminate = 1; + + spin_lock(&ctx_list_lock); + list_for_each_entry(ctx, &ctx_list, entry) { + spin_lock(&ctx->lock); + ctx->need_close = 1; + r = triton_queue_ctx(ctx); + if (r) + triton_thread_wakeup(ctx->thread); + spin_unlock(&ctx->lock); + } + spin_unlock(&ctx_list_lock); + + list_for_each_entry(t, &threads, entry) + pthread_join(t->thread, NULL); + + md_terminate(); + timer_terminate(); +} + |