summaryrefslogtreecommitdiff
path: root/accel-pppd/triton/triton.c
diff options
context:
space:
mode:
Diffstat (limited to 'accel-pppd/triton/triton.c')
-rw-r--r--accel-pppd/triton/triton.c610
1 files changed, 610 insertions, 0 deletions
diff --git a/accel-pppd/triton/triton.c b/accel-pppd/triton/triton.c
new file mode 100644
index 00000000..00dfcf6e
--- /dev/null
+++ b/accel-pppd/triton/triton.c
@@ -0,0 +1,610 @@
+#include <signal.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/resource.h>
+
+#include "triton_p.h"
+#include "memdebug.h"
+
+int thread_count = 2;
+int max_events = 64;
+
+static spinlock_t threads_lock = SPINLOCK_INITIALIZER;
+static LIST_HEAD(threads);
+static LIST_HEAD(sleep_threads);
+
+static LIST_HEAD(ctx_queue);
+
+static spinlock_t ctx_list_lock = SPINLOCK_INITIALIZER;
+static LIST_HEAD(ctx_list);
+
+static int terminate;
+static int need_terminate;
+
+static int need_config_reload;
+static void (*config_reload_notify)(int);
+
+static mempool_t *ctx_pool;
+static mempool_t *call_pool;
+
+struct triton_stat_t __export triton_stat;
+
+static struct timeval ru_utime;
+static struct timeval ru_stime;
+static struct timespec ru_timestamp;
+static int ru_refs;
+static void ru_update(struct triton_timer_t *);
+static struct triton_timer_t ru_timer = {
+ .period = 1000,
+ .expire = ru_update,
+};
+struct triton_context_t default_ctx;
+
+static struct triton_context_t __thread *this_ctx;
+
+#define log_debug2(fmt, ...)
+
+void triton_thread_wakeup(struct _triton_thread_t *thread)
+{
+ log_debug2("wake up thread %p\n", thread);
+ pthread_kill(thread->thread, SIGUSR1);
+}
+
+static void __config_reload(void (*notify)(int))
+{
+ struct _triton_thread_t *t;
+ int r;
+
+ log_debug2("config_reload: enter\n");
+ r = conf_reload(NULL);
+ notify(r);
+
+ spin_lock(&threads_lock);
+ need_config_reload = 0;
+ list_for_each_entry(t, &threads, entry)
+ triton_thread_wakeup(t);
+ spin_unlock(&threads_lock);
+ log_debug2("config_reload: exit\n");
+}
+
+static void ctx_thread(struct _triton_context_t *ctx);
+static void* triton_thread(struct _triton_thread_t *thread)
+{
+ sigset_t set;
+ int sig;
+
+ sigfillset(&set);
+ sigdelset(&set, SIGKILL);
+ sigdelset(&set, SIGSTOP);
+ pthread_sigmask(SIG_BLOCK, &set, NULL);
+
+ sigemptyset(&set);
+ sigaddset(&set, SIGUSR1);
+ sigaddset(&set, SIGQUIT);
+
+ pthread_mutex_lock(&thread->sleep_lock);
+ pthread_mutex_unlock(&thread->sleep_lock);
+
+ while (1) {
+ spin_lock(&threads_lock);
+ if (!list_empty(&ctx_queue) && !need_config_reload) {
+ thread->ctx = list_entry(ctx_queue.next, typeof(*thread->ctx), entry2);
+ log_debug2("thread: %p: dequeued ctx %p\n", thread, thread->ctx);
+ list_del(&thread->ctx->entry2);
+ spin_unlock(&threads_lock);
+ spin_lock(&thread->ctx->lock);
+ thread->ctx->thread = thread;
+ thread->ctx->queued = 0;
+ spin_unlock(&thread->ctx->lock);
+ __sync_sub_and_fetch(&triton_stat.context_pending, 1);
+ } else {
+ if (triton_stat.thread_count > thread_count + triton_stat.context_sleeping) {
+ __sync_sub_and_fetch(&triton_stat.thread_active, 1);
+ __sync_sub_and_fetch(&triton_stat.thread_count, 1);
+ list_del(&thread->entry);
+ spin_unlock(&threads_lock);
+ pthread_detach(pthread_self());
+ log_debug2("thread: %p: exit\n", thread);
+ _free(thread);
+ return NULL;
+ }
+ log_debug2("thread: %p: sleeping\n", thread);
+ if (!terminate)
+ list_add(&thread->entry2, &sleep_threads);
+
+ if (__sync_sub_and_fetch(&triton_stat.thread_active, 1) == 0 && need_config_reload) {
+ spin_unlock(&threads_lock);
+ __config_reload(config_reload_notify);
+ } else
+ spin_unlock(&threads_lock);
+
+ if (terminate)
+ return NULL;
+
+ //printf("thread %p: enter sigwait\n", thread);
+ sigwait(&set, &sig);
+ //printf("thread %p: exit sigwait\n", thread);
+
+ spin_lock(&threads_lock);
+ __sync_add_and_fetch(&triton_stat.thread_active, 1);
+ if (!thread->ctx) {
+ list_del(&thread->entry2);
+ spin_unlock(&threads_lock);
+ continue;
+ }
+ spin_unlock(&threads_lock);
+ }
+
+cont:
+ log_debug2("thread %p: ctx=%p %p\n", thread, thread->ctx, thread->ctx ? thread->ctx->thread : NULL);
+ this_ctx = thread->ctx->ud;
+ if (thread->ctx->ud->before_switch)
+ thread->ctx->ud->before_switch(thread->ctx->ud, thread->ctx->bf_arg);
+
+ log_debug2("thread %p: switch to %p\n", thread, thread->ctx);
+ ctx_thread(thread->ctx);
+ log_debug2("thread %p: switch from %p %p\n", thread, thread->ctx, thread->ctx->thread);
+
+ spin_lock(&thread->ctx->lock);
+ if (thread->ctx->pending) {
+ spin_unlock(&thread->ctx->lock);
+ goto cont;
+ }
+ thread->ctx->thread = NULL;
+ spin_unlock(&thread->ctx->lock);
+
+ if (thread->ctx->need_free) {
+ log_debug2("- context %p removed\n", thread->ctx);
+ mempool_free(thread->ctx);
+ }
+
+ thread->ctx = NULL;
+ }
+}
+
+static void ctx_thread(struct _triton_context_t *ctx)
+{
+ struct _triton_md_handler_t *h;
+ struct _triton_timer_t *t;
+ struct _triton_ctx_call_t *call;
+ uint64_t tt;
+
+ log_debug2("ctx %p %p: enter\n", ctx, ctx->thread);
+ if (ctx->need_close) {
+ if (ctx->ud->close)
+ ctx->ud->close(ctx->ud);
+ ctx->need_close = 0;
+ }
+
+ while (1) {
+ spin_lock(&ctx->lock);
+ if (!list_empty(&ctx->pending_timers)) {
+ t = list_entry(ctx->pending_timers.next, typeof(*t), entry2);
+ list_del(&t->entry2);
+ t->pending = 0;
+ spin_unlock(&ctx->lock);
+ __sync_sub_and_fetch(&triton_stat.timer_pending, 1);
+ read(t->fd, &tt, sizeof(tt));
+ t->ud->expire(t->ud);
+ continue;
+ }
+ if (!list_empty(&ctx->pending_handlers)) {
+ h = list_entry(ctx->pending_handlers.next, typeof(*h), entry2);
+ list_del(&h->entry2);
+ h->pending = 0;
+ spin_unlock(&ctx->lock);
+ __sync_sub_and_fetch(&triton_stat.md_handler_pending, 1);
+ if (h->trig_epoll_events & (EPOLLIN | EPOLLERR | EPOLLHUP))
+ if (h->ud && h->ud->read)
+ if (h->ud->read(h->ud))
+ continue;
+ if (h->trig_epoll_events & (EPOLLOUT | EPOLLERR | EPOLLHUP))
+ if (h->ud && h->ud->write)
+ if (h->ud->write(h->ud))
+ continue;
+ h->trig_epoll_events = 0;
+ continue;
+ }
+ if (!list_empty(&ctx->pending_calls)) {
+ call = list_entry(ctx->pending_calls.next, typeof(*call), entry);
+ list_del(&call->entry);
+ spin_unlock(&ctx->lock);
+ call->func(call->arg);
+ mempool_free(call);
+ continue;
+ }
+ ctx->pending = 0;
+ spin_unlock(&ctx->lock);
+ break;
+ }
+
+ log_debug2("ctx %p %p: exit\n", ctx, ctx->thread);
+}
+
+struct _triton_thread_t *create_thread()
+{
+ struct _triton_thread_t *thread = _malloc(sizeof(*thread));
+ if (!thread)
+ return NULL;
+
+ memset(thread, 0, sizeof(*thread));
+ pthread_mutex_init(&thread->sleep_lock, NULL);
+ pthread_cond_init(&thread->sleep_cond, NULL);
+ pthread_mutex_lock(&thread->sleep_lock);
+ if (pthread_create(&thread->thread, NULL, (void*(*)(void*))triton_thread, thread)) {
+ triton_log_error("pthread_create: %s", strerror(errno));
+ return NULL;
+ }
+
+ __sync_add_and_fetch(&triton_stat.thread_count, 1);
+ __sync_add_and_fetch(&triton_stat.thread_active, 1);
+
+ return thread;
+}
+
+int triton_queue_ctx(struct _triton_context_t *ctx)
+{
+ ctx->pending = 1;
+ if (ctx->thread || ctx->queued || ctx->init)
+ return 0;
+
+ spin_lock(&threads_lock);
+ if (list_empty(&sleep_threads) || need_config_reload) {
+ if (ctx->priority)
+ list_add(&ctx->entry2, &ctx_queue);
+ else
+ list_add_tail(&ctx->entry2, &ctx_queue);
+ spin_unlock(&threads_lock);
+ ctx->queued = 1;
+ log_debug2("ctx %p: queued\n", ctx);
+ __sync_add_and_fetch(&triton_stat.context_pending, 1);
+ return 0;
+ }
+
+ ctx->thread = list_entry(sleep_threads.next, typeof(*ctx->thread), entry2);
+ ctx->thread->ctx = ctx;
+ log_debug2("ctx %p: assigned to thread %p\n", ctx, ctx->thread);
+ list_del(&ctx->thread->entry2);
+ spin_unlock(&threads_lock);
+
+ return 1;
+}
+
+int __export triton_context_register(struct triton_context_t *ud, void *bf_arg)
+{
+ struct _triton_context_t *ctx = mempool_alloc(ctx_pool);
+
+ log_debug2("ctx %p: register\n", ctx);
+ if (!ctx)
+ return -1;
+
+ memset(ctx, 0, sizeof(*ctx));
+ ctx->ud = ud;
+ ctx->bf_arg = bf_arg;
+ ctx->init = 1;
+ spinlock_init(&ctx->lock);
+ INIT_LIST_HEAD(&ctx->handlers);
+ INIT_LIST_HEAD(&ctx->timers);
+ INIT_LIST_HEAD(&ctx->pending_handlers);
+ INIT_LIST_HEAD(&ctx->pending_timers);
+ INIT_LIST_HEAD(&ctx->pending_calls);
+
+ ud->tpd = ctx;
+
+ spin_lock(&ctx_list_lock);
+ list_add_tail(&ctx->entry, &ctx_list);
+ spin_unlock(&ctx_list_lock);
+
+ __sync_add_and_fetch(&triton_stat.context_sleeping, 1);
+ __sync_add_and_fetch(&triton_stat.context_count, 1);
+
+ return 0;
+}
+
+void __export triton_context_unregister(struct triton_context_t *ud)
+{
+ struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd;
+ struct _triton_ctx_call_t *call;
+ struct _triton_thread_t *t;
+
+ log_debug2("ctx %p: unregister\n", ctx);
+
+ while (!list_empty(&ctx->pending_calls)) {
+ call = list_entry(ctx->pending_calls.next, typeof(*call), entry);
+ list_del(&call->entry);
+ mempool_free(call);
+ }
+
+ if (!list_empty(&ctx->handlers)) {
+ triton_log_error("BUG:ctx:triton_unregister_ctx: handlers is not empty");
+ {
+ struct _triton_md_handler_t *h;
+ list_for_each_entry(h, &ctx->handlers, entry)
+ if (h->ud)
+ printf("%p\n", h->ud);
+ }
+ abort();
+ }
+ if (!list_empty(&ctx->pending_handlers)) {
+ triton_log_error("BUG:ctx:triton_unregister_ctx: pending_handlers is not empty");
+ abort();
+ }
+ if (!list_empty(&ctx->timers)) {
+ triton_log_error("BUG:ctx:triton_unregister_ctx: timers is not empty");
+ abort();
+ }
+ if (!list_empty(&ctx->pending_timers)) {
+ triton_log_error("BUG:ctx:triton_unregister_ctx: pending_timers is not empty");
+ abort();
+ }
+
+ ctx->need_free = 1;
+ spin_lock(&ctx_list_lock);
+ list_del(&ctx->entry);
+ if (__sync_sub_and_fetch(&triton_stat.context_count, 1) == 1) {
+ if (need_terminate)
+ terminate = 1;
+ }
+ spin_unlock(&ctx_list_lock);
+
+ if (terminate) {
+ list_for_each_entry(t, &threads, entry)
+ triton_thread_wakeup(t);
+ }
+}
+
+void __export triton_context_set_priority(struct triton_context_t *ud, int prio)
+{
+ struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd;
+
+ ctx->priority = prio > 0;
+}
+
+void __export triton_context_schedule()
+{
+ struct _triton_context_t *ctx = (struct _triton_context_t *)this_ctx->tpd;
+ struct _triton_thread_t *t = NULL;
+
+ log_debug2("ctx %p: enter schedule\n", ctx);
+ __sync_add_and_fetch(&triton_stat.context_sleeping, 1);
+ __sync_sub_and_fetch(&triton_stat.thread_active, 1);
+ pthread_mutex_lock(&ctx->thread->sleep_lock);
+ while (1) {
+ if (ctx->wakeup) {
+ ctx->wakeup = 0;
+ break;
+ } else {
+ if (!t && triton_stat.thread_count <= thread_count + triton_stat.context_sleeping) {
+ t = create_thread();
+ spin_lock(&threads_lock);
+ list_add_tail(&t->entry, &threads);
+ spin_unlock(&threads_lock);
+ pthread_mutex_unlock(&t->sleep_lock);
+ }
+ pthread_cond_wait(&ctx->thread->sleep_cond, &ctx->thread->sleep_lock);
+ }
+ }
+ pthread_mutex_unlock(&ctx->thread->sleep_lock);
+ __sync_sub_and_fetch(&triton_stat.context_sleeping, 1);
+ __sync_add_and_fetch(&triton_stat.thread_active, 1);
+ log_debug2("ctx %p: exit schedule\n", ctx);
+}
+
+struct triton_context_t __export *triton_context_self(void)
+{
+ return this_ctx;
+}
+
+void triton_context_print(void)
+{
+ struct _triton_context_t *ctx;
+
+ list_for_each_entry(ctx, &ctx_list, entry)
+ printf("%p\n", ctx);
+}
+
+void __export triton_context_wakeup(struct triton_context_t *ud)
+{
+ struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd;
+ int r = 0;
+
+ log_debug2("ctx %p: wakeup\n", ctx);
+
+ if (ctx->init) {
+ __sync_sub_and_fetch(&triton_stat.context_sleeping, 1);
+ spin_lock(&ctx->lock);
+ ctx->init = 0;
+ if (ctx->pending)
+ r = triton_queue_ctx(ctx);
+ spin_unlock(&ctx->lock);
+ if (r)
+ triton_thread_wakeup(ctx->thread);
+ return;
+ }
+
+ pthread_mutex_lock(&ctx->thread->sleep_lock);
+ ctx->wakeup = 1;
+ pthread_cond_signal(&ctx->thread->sleep_cond);
+ pthread_mutex_unlock(&ctx->thread->sleep_lock);
+}
+
+int __export triton_context_call(struct triton_context_t *ud, void (*func)(void *), void *arg)
+{
+ struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd;
+ struct _triton_ctx_call_t *call = mempool_alloc(call_pool);
+ int r;
+
+ if (!call)
+ return -1;
+
+ call->func = func;
+ call->arg = arg;
+
+ spin_lock(&ctx->lock);
+ list_add_tail(&call->entry, &ctx->pending_calls);
+ r = triton_queue_ctx(ctx);
+ spin_unlock(&ctx->lock);
+
+ if (r)
+ triton_thread_wakeup(ctx->thread);
+
+ return 0;
+}
+
+void __export triton_cancel_call(struct triton_context_t *ud, void (*func)(void *))
+{
+ struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd;
+ struct list_head *pos, *n;
+ struct _triton_ctx_call_t *call;
+
+ list_for_each_safe(pos, n, &ctx->pending_calls) {
+ call = list_entry(pos, typeof(*call), entry);
+ if (call->func != func)
+ continue;
+ list_del(&call->entry);
+ mempool_free(call);
+ }
+}
+
+void __export triton_collect_cpu_usage(void)
+{
+ struct rusage rusage;
+
+ if (__sync_fetch_and_add(&ru_refs, 1) == 0) {
+ triton_timer_add(NULL, &ru_timer, 0);
+ getrusage(RUSAGE_SELF, &rusage);
+ clock_gettime(CLOCK_MONOTONIC, &ru_timestamp);
+ ru_utime = rusage.ru_utime;
+ ru_stime = rusage.ru_stime;
+ triton_stat.cpu = 0;
+ }
+}
+
+void __export triton_stop_collect_cpu_usage(void)
+{
+ if (__sync_sub_and_fetch(&ru_refs, 1) == 0)
+ triton_timer_del(&ru_timer);
+}
+
+static void ru_update(struct triton_timer_t *t)
+{
+ struct timespec ts;
+ struct rusage rusage;
+ unsigned int dt;
+ unsigned int val;
+
+ getrusage(RUSAGE_SELF, &rusage);
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+
+ dt = (ts.tv_sec - ru_timestamp.tv_sec) * 1000000 + (ts.tv_nsec - ru_timestamp.tv_nsec) / 1000000;
+ val = (double)((rusage.ru_utime.tv_sec - ru_utime.tv_sec) * 1000000 + (rusage.ru_utime.tv_usec - ru_utime.tv_usec) +
+ (rusage.ru_stime.tv_sec - ru_stime.tv_sec) * 1000000 + (rusage.ru_stime.tv_usec - ru_stime.tv_usec)) / dt * 100;
+
+ if (val <= 100)
+ triton_stat.cpu = val;
+
+ ru_timestamp = ts;
+ ru_utime = rusage.ru_utime;
+ ru_stime = rusage.ru_stime;
+}
+
+int __export triton_init(const char *conf_file)
+{
+ ctx_pool = mempool_create2(sizeof(struct _triton_context_t));
+ call_pool = mempool_create(sizeof(struct _triton_ctx_call_t));
+
+ if (conf_load(conf_file))
+ return -1;
+
+ if (log_init())
+ return -1;
+
+ if (md_init())
+ return -1;
+
+ if (timer_init())
+ return -1;
+
+ if (event_init())
+ return -1;
+
+ return 0;
+}
+
+int __export triton_load_modules(const char *mod_sect)
+{
+ if (load_modules(mod_sect))
+ return -1;
+
+ return 0;
+}
+
+void __export triton_conf_reload(void (*notify)(int))
+{
+ spin_lock(&threads_lock);
+ need_config_reload = 1;
+ config_reload_notify = notify;
+ if (triton_stat.thread_active == 0) {
+ spin_unlock(&threads_lock);
+ __config_reload(notify);
+ } else
+ spin_unlock(&threads_lock);
+}
+
+void __export triton_run()
+{
+ struct _triton_thread_t *t;
+ int i;
+ char *opt;
+
+ opt = conf_get_opt("core", "thread-count");
+ if (opt && atoi(opt) > 0)
+ thread_count = atoi(opt);
+
+ for(i = 0; i < thread_count; i++) {
+ t = create_thread();
+ if (!t)
+ _exit(-1);
+
+ list_add_tail(&t->entry, &threads);
+ pthread_mutex_unlock(&t->sleep_lock);
+ }
+
+ time(&triton_stat.start_time);
+
+ md_run();
+ timer_run();
+
+ triton_context_register(&default_ctx, NULL);
+ triton_context_wakeup(&default_ctx);
+}
+
+void __export triton_terminate()
+{
+ struct _triton_context_t *ctx;
+ struct _triton_thread_t *t;
+ int r;
+
+ need_terminate = 1;
+
+ spin_lock(&ctx_list_lock);
+ list_for_each_entry(ctx, &ctx_list, entry) {
+ spin_lock(&ctx->lock);
+ ctx->need_close = 1;
+ r = triton_queue_ctx(ctx);
+ if (r)
+ triton_thread_wakeup(ctx->thread);
+ spin_unlock(&ctx->lock);
+ }
+ spin_unlock(&ctx_list_lock);
+
+ list_for_each_entry(t, &threads, entry)
+ pthread_join(t->thread, NULL);
+
+ md_terminate();
+ timer_terminate();
+}
+