#include #include #include #include #include #include #include #include "triton_p.h" #include "memdebug.h" int thread_count = 2; int max_events = 64; static spinlock_t threads_lock = SPINLOCK_INITIALIZER; static LIST_HEAD(threads); static LIST_HEAD(sleep_threads); static LIST_HEAD(ctx_queue); static spinlock_t ctx_list_lock = SPINLOCK_INITIALIZER; static LIST_HEAD(ctx_list); static int terminate; static int need_terminate; static int need_config_reload; static void (*config_reload_notify)(int); static mempool_t *ctx_pool; static mempool_t *call_pool; struct triton_stat_t __export triton_stat; static struct timeval ru_utime; static struct timeval ru_stime; static struct timespec ru_timestamp; static int ru_refs; static void ru_update(struct triton_timer_t *); static struct triton_timer_t ru_timer = { .period = 1000, .expire = ru_update, }; struct triton_context_t default_ctx; static struct triton_context_t __thread *this_ctx; #define log_debug2(fmt, ...) void triton_thread_wakeup(struct _triton_thread_t *thread) { log_debug2("wake up thread %p\n", thread); pthread_kill(thread->thread, SIGUSR1); } static void __config_reload(void (*notify)(int)) { struct _triton_thread_t *t; int r; log_debug2("config_reload: enter\n"); r = conf_reload(NULL); notify(r); spin_lock(&threads_lock); need_config_reload = 0; list_for_each_entry(t, &threads, entry) triton_thread_wakeup(t); spin_unlock(&threads_lock); log_debug2("config_reload: exit\n"); } static void ctx_thread(struct _triton_context_t *ctx); static void* triton_thread(struct _triton_thread_t *thread) { sigset_t set; int sig; sigfillset(&set); sigdelset(&set, SIGKILL); sigdelset(&set, SIGSTOP); pthread_sigmask(SIG_BLOCK, &set, NULL); sigemptyset(&set); sigaddset(&set, SIGUSR1); sigaddset(&set, SIGQUIT); pthread_mutex_lock(&thread->sleep_lock); pthread_mutex_unlock(&thread->sleep_lock); while (1) { spin_lock(&threads_lock); if (!list_empty(&ctx_queue) && !need_config_reload) { thread->ctx = list_entry(ctx_queue.next, typeof(*thread->ctx), entry2); log_debug2("thread: %p: dequeued ctx %p\n", thread, thread->ctx); list_del(&thread->ctx->entry2); spin_unlock(&threads_lock); spin_lock(&thread->ctx->lock); thread->ctx->thread = thread; thread->ctx->queued = 0; spin_unlock(&thread->ctx->lock); __sync_sub_and_fetch(&triton_stat.context_pending, 1); } else { if (triton_stat.thread_count > thread_count + triton_stat.context_sleeping) { __sync_sub_and_fetch(&triton_stat.thread_active, 1); __sync_sub_and_fetch(&triton_stat.thread_count, 1); list_del(&thread->entry); spin_unlock(&threads_lock); pthread_detach(pthread_self()); log_debug2("thread: %p: exit\n", thread); _free(thread); return NULL; } log_debug2("thread: %p: sleeping\n", thread); if (!terminate) list_add(&thread->entry2, &sleep_threads); if (__sync_sub_and_fetch(&triton_stat.thread_active, 1) == 0 && need_config_reload) { spin_unlock(&threads_lock); __config_reload(config_reload_notify); } else spin_unlock(&threads_lock); if (terminate) return NULL; //printf("thread %p: enter sigwait\n", thread); sigwait(&set, &sig); //printf("thread %p: exit sigwait\n", thread); spin_lock(&threads_lock); __sync_add_and_fetch(&triton_stat.thread_active, 1); if (!thread->ctx) { list_del(&thread->entry2); spin_unlock(&threads_lock); continue; } spin_unlock(&threads_lock); } cont: log_debug2("thread %p: ctx=%p %p\n", thread, thread->ctx, thread->ctx ? thread->ctx->thread : NULL); this_ctx = thread->ctx->ud; if (thread->ctx->ud->before_switch) thread->ctx->ud->before_switch(thread->ctx->ud, thread->ctx->bf_arg); log_debug2("thread %p: switch to %p\n", thread, thread->ctx); ctx_thread(thread->ctx); log_debug2("thread %p: switch from %p %p\n", thread, thread->ctx, thread->ctx->thread); spin_lock(&thread->ctx->lock); if (thread->ctx->pending) { spin_unlock(&thread->ctx->lock); goto cont; } thread->ctx->thread = NULL; spin_unlock(&thread->ctx->lock); if (thread->ctx->need_free) { log_debug2("- context %p removed\n", thread->ctx); mempool_free(thread->ctx); } thread->ctx = NULL; } } static void ctx_thread(struct _triton_context_t *ctx) { struct _triton_md_handler_t *h; struct _triton_timer_t *t; struct _triton_ctx_call_t *call; uint64_t tt; log_debug2("ctx %p %p: enter\n", ctx, ctx->thread); if (ctx->need_close) { if (ctx->ud->close) ctx->ud->close(ctx->ud); ctx->need_close = 0; } while (1) { spin_lock(&ctx->lock); if (!list_empty(&ctx->pending_timers)) { t = list_entry(ctx->pending_timers.next, typeof(*t), entry2); list_del(&t->entry2); t->pending = 0; spin_unlock(&ctx->lock); __sync_sub_and_fetch(&triton_stat.timer_pending, 1); read(t->fd, &tt, sizeof(tt)); t->ud->expire(t->ud); continue; } if (!list_empty(&ctx->pending_handlers)) { h = list_entry(ctx->pending_handlers.next, typeof(*h), entry2); list_del(&h->entry2); h->pending = 0; spin_unlock(&ctx->lock); __sync_sub_and_fetch(&triton_stat.md_handler_pending, 1); if (h->trig_epoll_events & (EPOLLIN | EPOLLERR | EPOLLHUP)) if (h->ud && h->ud->read) if (h->ud->read(h->ud)) continue; if (h->trig_epoll_events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) if (h->ud && h->ud->write) if (h->ud->write(h->ud)) continue; h->trig_epoll_events = 0; continue; } if (!list_empty(&ctx->pending_calls)) { call = list_entry(ctx->pending_calls.next, typeof(*call), entry); list_del(&call->entry); spin_unlock(&ctx->lock); call->func(call->arg); mempool_free(call); continue; } ctx->pending = 0; spin_unlock(&ctx->lock); break; } log_debug2("ctx %p %p: exit\n", ctx, ctx->thread); } struct _triton_thread_t *create_thread() { struct _triton_thread_t *thread = _malloc(sizeof(*thread)); if (!thread) return NULL; memset(thread, 0, sizeof(*thread)); pthread_mutex_init(&thread->sleep_lock, NULL); pthread_cond_init(&thread->sleep_cond, NULL); pthread_mutex_lock(&thread->sleep_lock); if (pthread_create(&thread->thread, NULL, (void*(*)(void*))triton_thread, thread)) { triton_log_error("pthread_create: %s", strerror(errno)); return NULL; } __sync_add_and_fetch(&triton_stat.thread_count, 1); __sync_add_and_fetch(&triton_stat.thread_active, 1); return thread; } int triton_queue_ctx(struct _triton_context_t *ctx) { ctx->pending = 1; if (ctx->thread || ctx->queued || ctx->init) return 0; spin_lock(&threads_lock); if (list_empty(&sleep_threads) || need_config_reload) { if (ctx->priority) list_add(&ctx->entry2, &ctx_queue); else list_add_tail(&ctx->entry2, &ctx_queue); spin_unlock(&threads_lock); ctx->queued = 1; log_debug2("ctx %p: queued\n", ctx); __sync_add_and_fetch(&triton_stat.context_pending, 1); return 0; } ctx->thread = list_entry(sleep_threads.next, typeof(*ctx->thread), entry2); ctx->thread->ctx = ctx; log_debug2("ctx %p: assigned to thread %p\n", ctx, ctx->thread); list_del(&ctx->thread->entry2); spin_unlock(&threads_lock); return 1; } int __export triton_context_register(struct triton_context_t *ud, void *bf_arg) { struct _triton_context_t *ctx = mempool_alloc(ctx_pool); log_debug2("ctx %p: register\n", ctx); if (!ctx) return -1; memset(ctx, 0, sizeof(*ctx)); ctx->ud = ud; ctx->bf_arg = bf_arg; ctx->init = 1; spinlock_init(&ctx->lock); INIT_LIST_HEAD(&ctx->handlers); INIT_LIST_HEAD(&ctx->timers); INIT_LIST_HEAD(&ctx->pending_handlers); INIT_LIST_HEAD(&ctx->pending_timers); INIT_LIST_HEAD(&ctx->pending_calls); ud->tpd = ctx; spin_lock(&ctx_list_lock); list_add_tail(&ctx->entry, &ctx_list); spin_unlock(&ctx_list_lock); __sync_add_and_fetch(&triton_stat.context_sleeping, 1); __sync_add_and_fetch(&triton_stat.context_count, 1); return 0; } void __export triton_context_unregister(struct triton_context_t *ud) { struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; struct _triton_ctx_call_t *call; struct _triton_thread_t *t; log_debug2("ctx %p: unregister\n", ctx); while (!list_empty(&ctx->pending_calls)) { call = list_entry(ctx->pending_calls.next, typeof(*call), entry); list_del(&call->entry); mempool_free(call); } if (!list_empty(&ctx->handlers)) { triton_log_error("BUG:ctx:triton_unregister_ctx: handlers is not empty"); { struct _triton_md_handler_t *h; list_for_each_entry(h, &ctx->handlers, entry) if (h->ud) printf("%p\n", h->ud); } abort(); } if (!list_empty(&ctx->pending_handlers)) { triton_log_error("BUG:ctx:triton_unregister_ctx: pending_handlers is not empty"); abort(); } if (!list_empty(&ctx->timers)) { triton_log_error("BUG:ctx:triton_unregister_ctx: timers is not empty"); abort(); } if (!list_empty(&ctx->pending_timers)) { triton_log_error("BUG:ctx:triton_unregister_ctx: pending_timers is not empty"); abort(); } ctx->need_free = 1; spin_lock(&ctx_list_lock); list_del(&ctx->entry); if (__sync_sub_and_fetch(&triton_stat.context_count, 1) == 1) { if (need_terminate) terminate = 1; } spin_unlock(&ctx_list_lock); if (terminate) { list_for_each_entry(t, &threads, entry) triton_thread_wakeup(t); } } void __export triton_context_set_priority(struct triton_context_t *ud, int prio) { struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; ctx->priority = prio > 0; } void __export triton_context_schedule() { struct _triton_context_t *ctx = (struct _triton_context_t *)this_ctx->tpd; struct _triton_thread_t *t = NULL; log_debug2("ctx %p: enter schedule\n", ctx); __sync_add_and_fetch(&triton_stat.context_sleeping, 1); __sync_sub_and_fetch(&triton_stat.thread_active, 1); pthread_mutex_lock(&ctx->thread->sleep_lock); while (1) { if (ctx->wakeup) { ctx->wakeup = 0; break; } else { if (!t && triton_stat.thread_count <= thread_count + triton_stat.context_sleeping) { t = create_thread(); spin_lock(&threads_lock); list_add_tail(&t->entry, &threads); spin_unlock(&threads_lock); pthread_mutex_unlock(&t->sleep_lock); } pthread_cond_wait(&ctx->thread->sleep_cond, &ctx->thread->sleep_lock); } } pthread_mutex_unlock(&ctx->thread->sleep_lock); __sync_sub_and_fetch(&triton_stat.context_sleeping, 1); __sync_add_and_fetch(&triton_stat.thread_active, 1); log_debug2("ctx %p: exit schedule\n", ctx); } struct triton_context_t __export *triton_context_self(void) { return this_ctx; } void triton_context_print(void) { struct _triton_context_t *ctx; list_for_each_entry(ctx, &ctx_list, entry) printf("%p\n", ctx); } void __export triton_context_wakeup(struct triton_context_t *ud) { struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; int r = 0; log_debug2("ctx %p: wakeup\n", ctx); if (ctx->init) { __sync_sub_and_fetch(&triton_stat.context_sleeping, 1); spin_lock(&ctx->lock); ctx->init = 0; if (ctx->pending) r = triton_queue_ctx(ctx); spin_unlock(&ctx->lock); if (r) triton_thread_wakeup(ctx->thread); return; } pthread_mutex_lock(&ctx->thread->sleep_lock); ctx->wakeup = 1; pthread_cond_signal(&ctx->thread->sleep_cond); pthread_mutex_unlock(&ctx->thread->sleep_lock); } int __export triton_context_call(struct triton_context_t *ud, void (*func)(void *), void *arg) { struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; struct _triton_ctx_call_t *call = mempool_alloc(call_pool); int r; if (!call) return -1; call->func = func; call->arg = arg; spin_lock(&ctx->lock); list_add_tail(&call->entry, &ctx->pending_calls); r = triton_queue_ctx(ctx); spin_unlock(&ctx->lock); if (r) triton_thread_wakeup(ctx->thread); return 0; } void __export triton_cancel_call(struct triton_context_t *ud, void (*func)(void *)) { struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; struct list_head *pos, *n; struct _triton_ctx_call_t *call; list_for_each_safe(pos, n, &ctx->pending_calls) { call = list_entry(pos, typeof(*call), entry); if (call->func != func) continue; list_del(&call->entry); mempool_free(call); } } void __export triton_collect_cpu_usage(void) { struct rusage rusage; if (__sync_fetch_and_add(&ru_refs, 1) == 0) { triton_timer_add(NULL, &ru_timer, 0); getrusage(RUSAGE_SELF, &rusage); clock_gettime(CLOCK_MONOTONIC, &ru_timestamp); ru_utime = rusage.ru_utime; ru_stime = rusage.ru_stime; triton_stat.cpu = 0; } } void __export triton_stop_collect_cpu_usage(void) { if (__sync_sub_and_fetch(&ru_refs, 1) == 0) triton_timer_del(&ru_timer); } static void ru_update(struct triton_timer_t *t) { struct timespec ts; struct rusage rusage; unsigned int dt; unsigned int val; getrusage(RUSAGE_SELF, &rusage); clock_gettime(CLOCK_MONOTONIC, &ts); dt = (ts.tv_sec - ru_timestamp.tv_sec) * 1000000 + (ts.tv_nsec - ru_timestamp.tv_nsec) / 1000000; val = (double)((rusage.ru_utime.tv_sec - ru_utime.tv_sec) * 1000000 + (rusage.ru_utime.tv_usec - ru_utime.tv_usec) + (rusage.ru_stime.tv_sec - ru_stime.tv_sec) * 1000000 + (rusage.ru_stime.tv_usec - ru_stime.tv_usec)) / dt * 100; if (val <= 100) triton_stat.cpu = val; ru_timestamp = ts; ru_utime = rusage.ru_utime; ru_stime = rusage.ru_stime; } int __export triton_init(const char *conf_file) { ctx_pool = mempool_create2(sizeof(struct _triton_context_t)); call_pool = mempool_create(sizeof(struct _triton_ctx_call_t)); if (conf_load(conf_file)) return -1; if (log_init()) return -1; if (md_init()) return -1; if (timer_init()) return -1; if (event_init()) return -1; return 0; } int __export triton_load_modules(const char *mod_sect) { if (load_modules(mod_sect)) return -1; return 0; } void __export triton_conf_reload(void (*notify)(int)) { spin_lock(&threads_lock); need_config_reload = 1; config_reload_notify = notify; if (triton_stat.thread_active == 0) { spin_unlock(&threads_lock); __config_reload(notify); } else spin_unlock(&threads_lock); } void __export triton_run() { struct _triton_thread_t *t; int i; char *opt; opt = conf_get_opt("core", "thread-count"); if (opt && atoi(opt) > 0) thread_count = atoi(opt); for(i = 0; i < thread_count; i++) { t = create_thread(); if (!t) _exit(-1); list_add_tail(&t->entry, &threads); pthread_mutex_unlock(&t->sleep_lock); } time(&triton_stat.start_time); md_run(); timer_run(); triton_context_register(&default_ctx, NULL); triton_context_wakeup(&default_ctx); } void __export triton_terminate() { struct _triton_context_t *ctx; struct _triton_thread_t *t; int r; need_terminate = 1; spin_lock(&ctx_list_lock); list_for_each_entry(ctx, &ctx_list, entry) { spin_lock(&ctx->lock); ctx->need_close = 1; r = triton_queue_ctx(ctx); if (r) triton_thread_wakeup(ctx->thread); spin_unlock(&ctx->lock); } spin_unlock(&ctx_list_lock); list_for_each_entry(t, &threads, entry) pthread_join(t->thread, NULL); md_terminate(); timer_terminate(); }