diff options
Diffstat (limited to 'accel-pppd/triton/triton.c')
-rw-r--r-- | accel-pppd/triton/triton.c | 610 |
1 files changed, 610 insertions, 0 deletions
diff --git a/accel-pppd/triton/triton.c b/accel-pppd/triton/triton.c new file mode 100644 index 00000000..00dfcf6e --- /dev/null +++ b/accel-pppd/triton/triton.c @@ -0,0 +1,610 @@ +#include <signal.h> +#include <errno.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <sys/resource.h> + +#include "triton_p.h" +#include "memdebug.h" + +int thread_count = 2; +int max_events = 64; + +static spinlock_t threads_lock = SPINLOCK_INITIALIZER; +static LIST_HEAD(threads); +static LIST_HEAD(sleep_threads); + +static LIST_HEAD(ctx_queue); + +static spinlock_t ctx_list_lock = SPINLOCK_INITIALIZER; +static LIST_HEAD(ctx_list); + +static int terminate; +static int need_terminate; + +static int need_config_reload; +static void (*config_reload_notify)(int); + +static mempool_t *ctx_pool; +static mempool_t *call_pool; + +struct triton_stat_t __export triton_stat; + +static struct timeval ru_utime; +static struct timeval ru_stime; +static struct timespec ru_timestamp; +static int ru_refs; +static void ru_update(struct triton_timer_t *); +static struct triton_timer_t ru_timer = { + .period = 1000, + .expire = ru_update, +}; +struct triton_context_t default_ctx; + +static struct triton_context_t __thread *this_ctx; + +#define log_debug2(fmt, ...) + +void triton_thread_wakeup(struct _triton_thread_t *thread) +{ + log_debug2("wake up thread %p\n", thread); + pthread_kill(thread->thread, SIGUSR1); +} + +static void __config_reload(void (*notify)(int)) +{ + struct _triton_thread_t *t; + int r; + + log_debug2("config_reload: enter\n"); + r = conf_reload(NULL); + notify(r); + + spin_lock(&threads_lock); + need_config_reload = 0; + list_for_each_entry(t, &threads, entry) + triton_thread_wakeup(t); + spin_unlock(&threads_lock); + log_debug2("config_reload: exit\n"); +} + +static void ctx_thread(struct _triton_context_t *ctx); +static void* triton_thread(struct _triton_thread_t *thread) +{ + sigset_t set; + int sig; + + sigfillset(&set); + sigdelset(&set, SIGKILL); + sigdelset(&set, SIGSTOP); + pthread_sigmask(SIG_BLOCK, &set, NULL); + + sigemptyset(&set); + sigaddset(&set, SIGUSR1); + sigaddset(&set, SIGQUIT); + + pthread_mutex_lock(&thread->sleep_lock); + pthread_mutex_unlock(&thread->sleep_lock); + + while (1) { + spin_lock(&threads_lock); + if (!list_empty(&ctx_queue) && !need_config_reload) { + thread->ctx = list_entry(ctx_queue.next, typeof(*thread->ctx), entry2); + log_debug2("thread: %p: dequeued ctx %p\n", thread, thread->ctx); + list_del(&thread->ctx->entry2); + spin_unlock(&threads_lock); + spin_lock(&thread->ctx->lock); + thread->ctx->thread = thread; + thread->ctx->queued = 0; + spin_unlock(&thread->ctx->lock); + __sync_sub_and_fetch(&triton_stat.context_pending, 1); + } else { + if (triton_stat.thread_count > thread_count + triton_stat.context_sleeping) { + __sync_sub_and_fetch(&triton_stat.thread_active, 1); + __sync_sub_and_fetch(&triton_stat.thread_count, 1); + list_del(&thread->entry); + spin_unlock(&threads_lock); + pthread_detach(pthread_self()); + log_debug2("thread: %p: exit\n", thread); + _free(thread); + return NULL; + } + log_debug2("thread: %p: sleeping\n", thread); + if (!terminate) + list_add(&thread->entry2, &sleep_threads); + + if (__sync_sub_and_fetch(&triton_stat.thread_active, 1) == 0 && need_config_reload) { + spin_unlock(&threads_lock); + __config_reload(config_reload_notify); + } else + spin_unlock(&threads_lock); + + if (terminate) + return NULL; + + //printf("thread %p: enter sigwait\n", thread); + sigwait(&set, &sig); + //printf("thread %p: exit sigwait\n", thread); + + spin_lock(&threads_lock); + __sync_add_and_fetch(&triton_stat.thread_active, 1); + if (!thread->ctx) { + list_del(&thread->entry2); + spin_unlock(&threads_lock); + continue; + } + spin_unlock(&threads_lock); + } + +cont: + log_debug2("thread %p: ctx=%p %p\n", thread, thread->ctx, thread->ctx ? thread->ctx->thread : NULL); + this_ctx = thread->ctx->ud; + if (thread->ctx->ud->before_switch) + thread->ctx->ud->before_switch(thread->ctx->ud, thread->ctx->bf_arg); + + log_debug2("thread %p: switch to %p\n", thread, thread->ctx); + ctx_thread(thread->ctx); + log_debug2("thread %p: switch from %p %p\n", thread, thread->ctx, thread->ctx->thread); + + spin_lock(&thread->ctx->lock); + if (thread->ctx->pending) { + spin_unlock(&thread->ctx->lock); + goto cont; + } + thread->ctx->thread = NULL; + spin_unlock(&thread->ctx->lock); + + if (thread->ctx->need_free) { + log_debug2("- context %p removed\n", thread->ctx); + mempool_free(thread->ctx); + } + + thread->ctx = NULL; + } +} + +static void ctx_thread(struct _triton_context_t *ctx) +{ + struct _triton_md_handler_t *h; + struct _triton_timer_t *t; + struct _triton_ctx_call_t *call; + uint64_t tt; + + log_debug2("ctx %p %p: enter\n", ctx, ctx->thread); + if (ctx->need_close) { + if (ctx->ud->close) + ctx->ud->close(ctx->ud); + ctx->need_close = 0; + } + + while (1) { + spin_lock(&ctx->lock); + if (!list_empty(&ctx->pending_timers)) { + t = list_entry(ctx->pending_timers.next, typeof(*t), entry2); + list_del(&t->entry2); + t->pending = 0; + spin_unlock(&ctx->lock); + __sync_sub_and_fetch(&triton_stat.timer_pending, 1); + read(t->fd, &tt, sizeof(tt)); + t->ud->expire(t->ud); + continue; + } + if (!list_empty(&ctx->pending_handlers)) { + h = list_entry(ctx->pending_handlers.next, typeof(*h), entry2); + list_del(&h->entry2); + h->pending = 0; + spin_unlock(&ctx->lock); + __sync_sub_and_fetch(&triton_stat.md_handler_pending, 1); + if (h->trig_epoll_events & (EPOLLIN | EPOLLERR | EPOLLHUP)) + if (h->ud && h->ud->read) + if (h->ud->read(h->ud)) + continue; + if (h->trig_epoll_events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) + if (h->ud && h->ud->write) + if (h->ud->write(h->ud)) + continue; + h->trig_epoll_events = 0; + continue; + } + if (!list_empty(&ctx->pending_calls)) { + call = list_entry(ctx->pending_calls.next, typeof(*call), entry); + list_del(&call->entry); + spin_unlock(&ctx->lock); + call->func(call->arg); + mempool_free(call); + continue; + } + ctx->pending = 0; + spin_unlock(&ctx->lock); + break; + } + + log_debug2("ctx %p %p: exit\n", ctx, ctx->thread); +} + +struct _triton_thread_t *create_thread() +{ + struct _triton_thread_t *thread = _malloc(sizeof(*thread)); + if (!thread) + return NULL; + + memset(thread, 0, sizeof(*thread)); + pthread_mutex_init(&thread->sleep_lock, NULL); + pthread_cond_init(&thread->sleep_cond, NULL); + pthread_mutex_lock(&thread->sleep_lock); + if (pthread_create(&thread->thread, NULL, (void*(*)(void*))triton_thread, thread)) { + triton_log_error("pthread_create: %s", strerror(errno)); + return NULL; + } + + __sync_add_and_fetch(&triton_stat.thread_count, 1); + __sync_add_and_fetch(&triton_stat.thread_active, 1); + + return thread; +} + +int triton_queue_ctx(struct _triton_context_t *ctx) +{ + ctx->pending = 1; + if (ctx->thread || ctx->queued || ctx->init) + return 0; + + spin_lock(&threads_lock); + if (list_empty(&sleep_threads) || need_config_reload) { + if (ctx->priority) + list_add(&ctx->entry2, &ctx_queue); + else + list_add_tail(&ctx->entry2, &ctx_queue); + spin_unlock(&threads_lock); + ctx->queued = 1; + log_debug2("ctx %p: queued\n", ctx); + __sync_add_and_fetch(&triton_stat.context_pending, 1); + return 0; + } + + ctx->thread = list_entry(sleep_threads.next, typeof(*ctx->thread), entry2); + ctx->thread->ctx = ctx; + log_debug2("ctx %p: assigned to thread %p\n", ctx, ctx->thread); + list_del(&ctx->thread->entry2); + spin_unlock(&threads_lock); + + return 1; +} + +int __export triton_context_register(struct triton_context_t *ud, void *bf_arg) +{ + struct _triton_context_t *ctx = mempool_alloc(ctx_pool); + + log_debug2("ctx %p: register\n", ctx); + if (!ctx) + return -1; + + memset(ctx, 0, sizeof(*ctx)); + ctx->ud = ud; + ctx->bf_arg = bf_arg; + ctx->init = 1; + spinlock_init(&ctx->lock); + INIT_LIST_HEAD(&ctx->handlers); + INIT_LIST_HEAD(&ctx->timers); + INIT_LIST_HEAD(&ctx->pending_handlers); + INIT_LIST_HEAD(&ctx->pending_timers); + INIT_LIST_HEAD(&ctx->pending_calls); + + ud->tpd = ctx; + + spin_lock(&ctx_list_lock); + list_add_tail(&ctx->entry, &ctx_list); + spin_unlock(&ctx_list_lock); + + __sync_add_and_fetch(&triton_stat.context_sleeping, 1); + __sync_add_and_fetch(&triton_stat.context_count, 1); + + return 0; +} + +void __export triton_context_unregister(struct triton_context_t *ud) +{ + struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; + struct _triton_ctx_call_t *call; + struct _triton_thread_t *t; + + log_debug2("ctx %p: unregister\n", ctx); + + while (!list_empty(&ctx->pending_calls)) { + call = list_entry(ctx->pending_calls.next, typeof(*call), entry); + list_del(&call->entry); + mempool_free(call); + } + + if (!list_empty(&ctx->handlers)) { + triton_log_error("BUG:ctx:triton_unregister_ctx: handlers is not empty"); + { + struct _triton_md_handler_t *h; + list_for_each_entry(h, &ctx->handlers, entry) + if (h->ud) + printf("%p\n", h->ud); + } + abort(); + } + if (!list_empty(&ctx->pending_handlers)) { + triton_log_error("BUG:ctx:triton_unregister_ctx: pending_handlers is not empty"); + abort(); + } + if (!list_empty(&ctx->timers)) { + triton_log_error("BUG:ctx:triton_unregister_ctx: timers is not empty"); + abort(); + } + if (!list_empty(&ctx->pending_timers)) { + triton_log_error("BUG:ctx:triton_unregister_ctx: pending_timers is not empty"); + abort(); + } + + ctx->need_free = 1; + spin_lock(&ctx_list_lock); + list_del(&ctx->entry); + if (__sync_sub_and_fetch(&triton_stat.context_count, 1) == 1) { + if (need_terminate) + terminate = 1; + } + spin_unlock(&ctx_list_lock); + + if (terminate) { + list_for_each_entry(t, &threads, entry) + triton_thread_wakeup(t); + } +} + +void __export triton_context_set_priority(struct triton_context_t *ud, int prio) +{ + struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; + + ctx->priority = prio > 0; +} + +void __export triton_context_schedule() +{ + struct _triton_context_t *ctx = (struct _triton_context_t *)this_ctx->tpd; + struct _triton_thread_t *t = NULL; + + log_debug2("ctx %p: enter schedule\n", ctx); + __sync_add_and_fetch(&triton_stat.context_sleeping, 1); + __sync_sub_and_fetch(&triton_stat.thread_active, 1); + pthread_mutex_lock(&ctx->thread->sleep_lock); + while (1) { + if (ctx->wakeup) { + ctx->wakeup = 0; + break; + } else { + if (!t && triton_stat.thread_count <= thread_count + triton_stat.context_sleeping) { + t = create_thread(); + spin_lock(&threads_lock); + list_add_tail(&t->entry, &threads); + spin_unlock(&threads_lock); + pthread_mutex_unlock(&t->sleep_lock); + } + pthread_cond_wait(&ctx->thread->sleep_cond, &ctx->thread->sleep_lock); + } + } + pthread_mutex_unlock(&ctx->thread->sleep_lock); + __sync_sub_and_fetch(&triton_stat.context_sleeping, 1); + __sync_add_and_fetch(&triton_stat.thread_active, 1); + log_debug2("ctx %p: exit schedule\n", ctx); +} + +struct triton_context_t __export *triton_context_self(void) +{ + return this_ctx; +} + +void triton_context_print(void) +{ + struct _triton_context_t *ctx; + + list_for_each_entry(ctx, &ctx_list, entry) + printf("%p\n", ctx); +} + +void __export triton_context_wakeup(struct triton_context_t *ud) +{ + struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; + int r = 0; + + log_debug2("ctx %p: wakeup\n", ctx); + + if (ctx->init) { + __sync_sub_and_fetch(&triton_stat.context_sleeping, 1); + spin_lock(&ctx->lock); + ctx->init = 0; + if (ctx->pending) + r = triton_queue_ctx(ctx); + spin_unlock(&ctx->lock); + if (r) + triton_thread_wakeup(ctx->thread); + return; + } + + pthread_mutex_lock(&ctx->thread->sleep_lock); + ctx->wakeup = 1; + pthread_cond_signal(&ctx->thread->sleep_cond); + pthread_mutex_unlock(&ctx->thread->sleep_lock); +} + +int __export triton_context_call(struct triton_context_t *ud, void (*func)(void *), void *arg) +{ + struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; + struct _triton_ctx_call_t *call = mempool_alloc(call_pool); + int r; + + if (!call) + return -1; + + call->func = func; + call->arg = arg; + + spin_lock(&ctx->lock); + list_add_tail(&call->entry, &ctx->pending_calls); + r = triton_queue_ctx(ctx); + spin_unlock(&ctx->lock); + + if (r) + triton_thread_wakeup(ctx->thread); + + return 0; +} + +void __export triton_cancel_call(struct triton_context_t *ud, void (*func)(void *)) +{ + struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; + struct list_head *pos, *n; + struct _triton_ctx_call_t *call; + + list_for_each_safe(pos, n, &ctx->pending_calls) { + call = list_entry(pos, typeof(*call), entry); + if (call->func != func) + continue; + list_del(&call->entry); + mempool_free(call); + } +} + +void __export triton_collect_cpu_usage(void) +{ + struct rusage rusage; + + if (__sync_fetch_and_add(&ru_refs, 1) == 0) { + triton_timer_add(NULL, &ru_timer, 0); + getrusage(RUSAGE_SELF, &rusage); + clock_gettime(CLOCK_MONOTONIC, &ru_timestamp); + ru_utime = rusage.ru_utime; + ru_stime = rusage.ru_stime; + triton_stat.cpu = 0; + } +} + +void __export triton_stop_collect_cpu_usage(void) +{ + if (__sync_sub_and_fetch(&ru_refs, 1) == 0) + triton_timer_del(&ru_timer); +} + +static void ru_update(struct triton_timer_t *t) +{ + struct timespec ts; + struct rusage rusage; + unsigned int dt; + unsigned int val; + + getrusage(RUSAGE_SELF, &rusage); + clock_gettime(CLOCK_MONOTONIC, &ts); + + dt = (ts.tv_sec - ru_timestamp.tv_sec) * 1000000 + (ts.tv_nsec - ru_timestamp.tv_nsec) / 1000000; + val = (double)((rusage.ru_utime.tv_sec - ru_utime.tv_sec) * 1000000 + (rusage.ru_utime.tv_usec - ru_utime.tv_usec) + + (rusage.ru_stime.tv_sec - ru_stime.tv_sec) * 1000000 + (rusage.ru_stime.tv_usec - ru_stime.tv_usec)) / dt * 100; + + if (val <= 100) + triton_stat.cpu = val; + + ru_timestamp = ts; + ru_utime = rusage.ru_utime; + ru_stime = rusage.ru_stime; +} + +int __export triton_init(const char *conf_file) +{ + ctx_pool = mempool_create2(sizeof(struct _triton_context_t)); + call_pool = mempool_create(sizeof(struct _triton_ctx_call_t)); + + if (conf_load(conf_file)) + return -1; + + if (log_init()) + return -1; + + if (md_init()) + return -1; + + if (timer_init()) + return -1; + + if (event_init()) + return -1; + + return 0; +} + +int __export triton_load_modules(const char *mod_sect) +{ + if (load_modules(mod_sect)) + return -1; + + return 0; +} + +void __export triton_conf_reload(void (*notify)(int)) +{ + spin_lock(&threads_lock); + need_config_reload = 1; + config_reload_notify = notify; + if (triton_stat.thread_active == 0) { + spin_unlock(&threads_lock); + __config_reload(notify); + } else + spin_unlock(&threads_lock); +} + +void __export triton_run() +{ + struct _triton_thread_t *t; + int i; + char *opt; + + opt = conf_get_opt("core", "thread-count"); + if (opt && atoi(opt) > 0) + thread_count = atoi(opt); + + for(i = 0; i < thread_count; i++) { + t = create_thread(); + if (!t) + _exit(-1); + + list_add_tail(&t->entry, &threads); + pthread_mutex_unlock(&t->sleep_lock); + } + + time(&triton_stat.start_time); + + md_run(); + timer_run(); + + triton_context_register(&default_ctx, NULL); + triton_context_wakeup(&default_ctx); +} + +void __export triton_terminate() +{ + struct _triton_context_t *ctx; + struct _triton_thread_t *t; + int r; + + need_terminate = 1; + + spin_lock(&ctx_list_lock); + list_for_each_entry(ctx, &ctx_list, entry) { + spin_lock(&ctx->lock); + ctx->need_close = 1; + r = triton_queue_ctx(ctx); + if (r) + triton_thread_wakeup(ctx->thread); + spin_unlock(&ctx->lock); + } + spin_unlock(&ctx_list_lock); + + list_for_each_entry(t, &threads, entry) + pthread_join(t->thread, NULL); + + md_terminate(); + timer_terminate(); +} + |