From c767ca5d3c09d8f64dbfa05e86fde1fd5d702083 Mon Sep 17 00:00:00 2001 From: Dmitry Kozlov Date: Tue, 26 Dec 2017 20:46:01 +0300 Subject: reworked context priorities Introduced 4 priorities: 0 - management (cli) 1 - starting sessions (default priority) 2 - active sessions 3 - finishing sessions --- accel-pppd/triton/triton.c | 105 +++++++++++++++++++++++-------------------- accel-pppd/triton/triton_p.h | 5 ++- 2 files changed, 59 insertions(+), 51 deletions(-) (limited to 'accel-pppd/triton') diff --git a/accel-pppd/triton/triton.c b/accel-pppd/triton/triton.c index ce054ce1..785d18f3 100644 --- a/accel-pppd/triton/triton.c +++ b/accel-pppd/triton/triton.c @@ -16,14 +16,13 @@ #define WORKER_STACK_SIZE 1024*1024 int thread_count = 2; -int thread_count_max = 200; int max_events = 64; static spinlock_t threads_lock; static LIST_HEAD(threads); static LIST_HEAD(sleep_threads); -static LIST_HEAD(ctx_queue); +static struct list_head ctx_queue[CTX_PRIO_MAX]; static spinlock_t ctx_list_lock; static LIST_HEAD(ctx_list); @@ -83,6 +82,25 @@ static void __config_reload(void (*notify)(int)) static void ctx_thread(struct _triton_context_t *ctx); +static int check_ctx_queue_empty(struct _triton_thread_t *t) +{ + int i; + + for (i = 0; i < CTX_PRIO_MAX; i++) { + if (!list_empty(&t->wakeup_list[i])) { + t->ctx = list_entry(t->wakeup_list[i].next, struct _triton_context_t, entry2); + return 1; + } + + if (!list_empty(&ctx_queue[i])) { + t->ctx = list_entry(ctx_queue[i].next, struct _triton_context_t, entry2); + return 1; + } + } + + return 0; +} + static void* triton_thread(struct _triton_thread_t *thread) { sigset_t set; @@ -105,22 +123,21 @@ static void* triton_thread(struct _triton_thread_t *thread) while (1) { spin_lock(&threads_lock); - if ((!list_empty(&ctx_queue) || !list_empty(&thread->wakeup_list)) && !need_config_reload && triton_stat.thread_active <= thread_count) { - if (!list_empty(&thread->wakeup_list)) { - thread->ctx = list_entry(thread->wakeup_list.next, typeof(*thread->ctx), entry2); + if (!need_config_reload && check_ctx_queue_empty(thread)) { + if (thread->ctx->wakeup) { log_debug2("thread: %p: wakeup ctx %p\n", thread, thread->ctx); list_del(&thread->ctx->entry2); spin_unlock(&threads_lock); this_ctx = thread->ctx->ud; - if (thread->ctx->ud->before_switch) - thread->ctx->ud->before_switch(thread->ctx->ud, thread->ctx->bf_arg); + if (this_ctx->before_switch) + this_ctx->before_switch(this_ctx, thread->ctx->bf_arg); - alloca(thread->ctx->uc->uc_stack.ss_size); + //alloca(thread->ctx->uc->uc_stack.ss_size); memcpy(thread_frame - thread->ctx->uc->uc_stack.ss_size, thread->ctx->uc->uc_stack.ss_sp, thread->ctx->uc->uc_stack.ss_size); setcontext(thread->ctx->uc); + abort(); } else { - thread->ctx = list_entry(ctx_queue.next, typeof(*thread->ctx), entry2); log_debug2("thread: %p: dequeued ctx %p\n", thread, thread->ctx); list_del(&thread->ctx->entry2); spin_unlock(&threads_lock); @@ -131,17 +148,6 @@ static void* triton_thread(struct _triton_thread_t *thread) __sync_sub_and_fetch(&triton_stat.context_pending, 1); } } else { - if (triton_stat.thread_count > thread_count + triton_stat.context_sleeping) { - __sync_sub_and_fetch(&triton_stat.thread_active, 1); - __sync_sub_and_fetch(&triton_stat.thread_count, 1); - list_del(&thread->entry); - spin_unlock(&threads_lock); - pthread_detach(pthread_self()); - log_debug2("thread: %p: exit\n", thread); - _free(thread); - return NULL; - } - log_debug2("thread: %p: sleeping\n", thread); if (!terminate) @@ -177,8 +183,8 @@ static void* triton_thread(struct _triton_thread_t *thread) if (setjmp(jmp_env) == 0) { log_debug2("thread %p: ctx=%p %p\n", thread, thread->ctx, thread->ctx ? thread->ctx->thread : NULL); this_ctx = thread->ctx->ud; - if (thread->ctx->ud->before_switch) - thread->ctx->ud->before_switch(thread->ctx->ud, thread->ctx->bf_arg); + if (this_ctx->before_switch) + this_ctx->before_switch(this_ctx, thread->ctx->bf_arg); while (1) { log_debug2("thread %p: switch to %p\n", thread, thread->ctx); @@ -288,6 +294,8 @@ static void ctx_thread(struct _triton_context_t *ctx) struct _triton_thread_t *create_thread() { + int i; + pthread_attr_t attr; struct _triton_thread_t *thread = _malloc(sizeof(*thread)); if (!thread) { @@ -295,14 +303,18 @@ struct _triton_thread_t *create_thread() return NULL; } - pthread_attr_init(&attr); - pthread_attr_setstacksize(&attr, WORKER_STACK_SIZE); memset(thread, 0, sizeof(*thread)); - INIT_LIST_HEAD(&thread->wakeup_list); + + for (i = 0; i < CTX_PRIO_MAX; i++) + INIT_LIST_HEAD(&thread->wakeup_list[i]); + pthread_mutex_init(&thread->sleep_lock, NULL); - pthread_cond_init(&thread->sleep_cond, NULL); pthread_mutex_lock(&thread->sleep_lock); + + pthread_attr_init(&attr); + pthread_attr_setstacksize(&attr, WORKER_STACK_SIZE); + while (pthread_create(&thread->thread, &attr, (void*(*)(void*))triton_thread, thread)) sleep(1); @@ -321,12 +333,8 @@ int triton_queue_ctx(struct _triton_context_t *ctx) return 0; } - if (list_empty(&sleep_threads) || need_config_reload || triton_stat.thread_active > thread_count || - (ctx->priority == 0 && triton_stat.thread_count > thread_count_max)) { - if (ctx->priority) - list_add(&ctx->entry2, &ctx_queue); - else - list_add_tail(&ctx->entry2, &ctx_queue); + if (list_empty(&sleep_threads) || need_config_reload) { + list_add_tail(&ctx->entry2, &ctx_queue[ctx->priority]); spin_unlock(&threads_lock); ctx->queued = 1; log_debug2("ctx %p: queued\n", ctx); @@ -362,6 +370,7 @@ int __export triton_context_register(struct triton_context_t *ud, void *bf_arg) ctx->bf_arg = bf_arg; ctx->init = 1; ctx->refs = 1; + ctx->priority = 1; spinlock_init(&ctx->lock); INIT_LIST_HEAD(&ctx->handlers); INIT_LIST_HEAD(&ctx->timers); @@ -444,7 +453,9 @@ void __export triton_context_set_priority(struct triton_context_t *ud, int prio) { struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; - ctx->priority = prio > 0; + assert(prio >= 0 && prio < CTX_PRIO_MAX); + + ctx->priority = prio; } struct triton_context_t __export *triton_context_self(void) @@ -476,7 +487,7 @@ static ucontext_t *alloc_context() void __export triton_context_schedule() { - struct _triton_context_t *ctx = (struct _triton_context_t *)this_ctx->tpd; + volatile struct _triton_context_t *ctx = (struct _triton_context_t *)this_ctx->tpd; log_debug2("ctx %p: enter schedule\n", ctx); __sync_add_and_fetch(&triton_stat.context_sleeping, 1); @@ -491,8 +502,8 @@ void __export triton_context_schedule() spin_lock(&threads_lock); if (ctx->wakeup) { - spin_unlock(&threads_lock); ctx->wakeup = 0; + spin_unlock(&threads_lock); free(ctx->uc); ctx->uc = NULL; __sync_sub_and_fetch(&triton_stat.context_sleeping, 1); @@ -518,18 +529,13 @@ void __export triton_context_wakeup(struct triton_context_t *ud) if (ctx->pending) r = triton_queue_ctx(ctx); spin_unlock(&ctx->lock); - if (r) - triton_thread_wakeup(ctx->thread); - return; - } - - spin_lock(&threads_lock); - ctx->wakeup = 1; - if (ctx->thread->ctx != ctx) { - list_add_tail(&ctx->entry2, &ctx->thread->wakeup_list); + } else { + spin_lock(&threads_lock); + ctx->wakeup = 1; + list_add_tail(&ctx->entry2, &ctx->thread->wakeup_list[ctx->priority]); r = ctx->thread->ctx == NULL; + spin_unlock(&threads_lock); } - spin_unlock(&threads_lock); if (r) triton_thread_wakeup(ctx->thread); @@ -641,12 +647,17 @@ void __export triton_register_init(int order, void (*func)(void)) int __export triton_init(const char *conf_file) { + int i; + spinlock_init(&threads_lock); spinlock_init(&ctx_list_lock); ctx_pool = mempool_create(sizeof(struct _triton_context_t)); call_pool = mempool_create(sizeof(struct _triton_ctx_call_t)); + for (i = 0; i < CTX_PRIO_MAX; i++) + INIT_LIST_HEAD(&ctx_queue[i]); + if (conf_load(conf_file)) return -1; @@ -715,10 +726,6 @@ void __export triton_run() } } - opt = conf_get_opt("core", "thread-count-max"); - if (opt && atoi(opt) > 0) - thread_count_max = atoi(opt); - for(i = 0; i < thread_count; i++) { t = create_thread(); if (!t) diff --git a/accel-pppd/triton/triton_p.h b/accel-pppd/triton/triton_p.h index a16ef98e..fe735bf6 100644 --- a/accel-pppd/triton/triton_p.h +++ b/accel-pppd/triton/triton_p.h @@ -10,6 +10,8 @@ #include "spinlock.h" #include "mempool.h" +#define CTX_PRIO_MAX 4 + struct _triton_thread_t { struct list_head entry; @@ -18,8 +20,7 @@ struct _triton_thread_t int terminate; struct _triton_context_t *ctx; pthread_mutex_t sleep_lock; - pthread_cond_t sleep_cond; - struct list_head wakeup_list; + struct list_head wakeup_list[CTX_PRIO_MAX]; }; struct _triton_context_t -- cgit v1.2.3