From 3e61cb3f8d58f64c8023e95bf74341e6bc61560e Mon Sep 17 00:00:00 2001 From: Dmitry Kozlov Date: Wed, 22 Sep 2010 22:53:59 +0400 Subject: log: log_file rewrited to use aio log: simplified engine various bug fixes --- accel-pptpd/triton/md.c | 26 ++++----- accel-pptpd/triton/timer.c | 10 +--- accel-pptpd/triton/triton.c | 128 ++++++++++++++++++++++++++++++------------ accel-pptpd/triton/triton.h | 6 +- accel-pptpd/triton/triton_p.h | 1 + 5 files changed, 109 insertions(+), 62 deletions(-) (limited to 'accel-pptpd/triton') diff --git a/accel-pptpd/triton/md.c b/accel-pptpd/triton/md.c index 06b2b30b..133abe80 100644 --- a/accel-pptpd/triton/md.c +++ b/accel-pptpd/triton/md.c @@ -59,24 +59,16 @@ static void *md_thread(void *arg) sigset_t set; sigfillset(&set); + sigdelset(&set, SIGKILL); + sigdelset(&set, SIGSTOP); pthread_sigmask(SIG_BLOCK, &set, NULL); - sigemptyset(&set); - sigaddset(&set, SIGQUIT); - sigaddset(&set, SIGSEGV); - sigaddset(&set, SIGFPE); - sigaddset(&set, SIGILL); - sigaddset(&set, SIGBUS); - sigdelset(&set, 35); - sigdelset(&set, 36); - pthread_sigmask(SIG_UNBLOCK, &set, NULL); - while(1) { n = epoll_wait(epoll_fd, epoll_events, max_events, -1); if (n < 0) { if (errno == EINTR) continue; - triton_log_error("md:epoll_wait: %s", strerror(errno)); + triton_log_error("md:epoll_wait: %s\n", strerror(errno)); _exit(-1); } @@ -152,8 +144,10 @@ int __export triton_md_enable_handler(struct triton_md_handler_t *ud, int mode) else r = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, h->ud->fd, &h->epoll_event); - if (r) - triton_log_error("md:epoll_ctl: %s",strerror(errno)); + if (r) { + triton_log_error("md:epoll_ctl: %s\n",strerror(errno)); + abort(); + } return r; } @@ -177,8 +171,10 @@ int __export triton_md_disable_handler(struct triton_md_handler_t *ud,int mode) r = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, h->ud->fd, NULL); } - if (r) - triton_log_error("md:epoll_ctl: %s",strerror(errno)); + if (r) { + triton_log_error("md:epoll_ctl: %s\n",strerror(errno)); + abort(); + } return r; } diff --git a/accel-pptpd/triton/timer.c b/accel-pptpd/triton/timer.c index 53abd3b9..cbf2b13d 100644 --- a/accel-pptpd/triton/timer.c +++ b/accel-pptpd/triton/timer.c @@ -60,16 +60,10 @@ void *timer_thread(void *arg) sigset_t set; sigfillset(&set); + sigdelset(&set, SIGKILL); + sigdelset(&set, SIGSTOP); pthread_sigmask(SIG_BLOCK, &set, NULL); - sigemptyset(&set); - sigaddset(&set, SIGQUIT); - sigaddset(&set, SIGSEGV); - sigaddset(&set, SIGFPE); - sigaddset(&set, SIGILL); - sigaddset(&set, SIGBUS); - pthread_sigmask(SIG_UNBLOCK, &set, NULL); - while(1) { n = epoll_wait(epoll_fd, epoll_events, max_events, -1); if (n < 0) { diff --git a/accel-pptpd/triton/triton.c b/accel-pptpd/triton/triton.c index 95930647..2de2e67b 100644 --- a/accel-pptpd/triton/triton.c +++ b/accel-pptpd/triton/triton.c @@ -8,7 +8,7 @@ #include "triton_p.h" #include "memdebug.h" -int thread_count = 1; +int thread_count = 2; int max_events = 64; static spinlock_t threads_lock = SPINLOCK_INITIALIZER; @@ -25,46 +25,75 @@ static int terminate; static mempool_t *ctx_pool; static mempool_t *call_pool; +static mempool_t *ctx_stack_pool; __export struct triton_stat_t triton_stat; void triton_thread_wakeup(struct _triton_thread_t *thread) { + //printf("wake up thread %p\n", thread); pthread_kill(thread->thread, SIGUSR1); } static void* triton_thread(struct _triton_thread_t *thread) { sigset_t set; + int sig; sigfillset(&set); - sigdelset(&set, SIGSEGV); - sigdelset(&set, SIGFPE); - sigdelset(&set, SIGILL); - sigdelset(&set, SIGBUS); - pthread_sigmask(SIG_SETMASK, &set, NULL); + sigdelset(&set, SIGKILL); + sigdelset(&set, SIGSTOP); + pthread_sigmask(SIG_BLOCK, &set, NULL); - sigdelset(&set, SIGUSR1); - sigdelset(&set, SIGQUIT); + sigemptyset(&set); + sigaddset(&set, SIGUSR1); + sigaddset(&set, SIGQUIT); while (1) { __sync_fetch_and_sub(&triton_stat.thread_active, 1); - sigsuspend(&set); + //printf("thread %p: enter sigwait\n", thread); + sigwait(&set, &sig); + //printf("thread %p: exit sigwait\n", thread); __sync_fetch_and_add(&triton_stat.thread_active, 1); cont: + //printf("thread %p: ctx=%p %p\n", thread, thread->ctx, thread->ctx ? thread->ctx->thread : NULL); if (thread->ctx->ud->before_switch) thread->ctx->ud->before_switch(thread->ctx->ud, thread->ctx->bf_arg); - if (swapcontext(&thread->uctx, &thread->ctx->uctx)) - triton_log_error("swapcontext: %s\n", strerror(errno)); - - if (thread->ctx->need_free) - mempool_free(thread->ctx); + + //printf("thread %p: switch to %p\n", thread, thread->ctx); + while (1) { + if (swapcontext(&thread->uctx, &thread->ctx->uctx)) { + if (errno == EINTR) + continue; + triton_log_error("swapcontext: %s\n", strerror(errno)); + } else + break; + } + //printf("thread %p: switch from %p %p\n", thread, thread->ctx, thread->ctx->thread); + + if (thread->ctx->thread) { + spin_lock(&thread->ctx->lock); + if (thread->ctx->pending) { + spin_unlock(&thread->ctx->lock); + goto cont; + } + thread->ctx->thread = NULL; + spin_unlock(&thread->ctx->lock); + + if (thread->ctx->need_free) { + //printf("- context %p removed\n", thread->ctx); + mempool_free(thread->ctx->uctx.uc_stack.ss_sp); + mempool_free(thread->ctx); + } + } + thread->ctx = NULL; spin_lock(&threads_lock); if (!list_empty(&ctx_queue)) { thread->ctx = list_entry(ctx_queue.next, typeof(*thread->ctx), entry2); + //printf("thread: %p: dequeued ctx %p\n", thread, thread->ctx); list_del(&thread->ctx->entry2); spin_unlock(&threads_lock); spin_lock(&thread->ctx->lock); @@ -74,6 +103,7 @@ cont: __sync_fetch_and_sub(&triton_stat.context_pending, 1); goto cont; } else { + //printf("thread: %p: sleeping\n", thread); if (!terminate) list_add(&thread->entry2, &sleep_threads); spin_unlock(&threads_lock); @@ -89,10 +119,9 @@ static void ctx_thread(struct _triton_context_t *ctx) struct _triton_timer_t *t; struct _triton_ctx_call_t *call; uint64_t tt; - ucontext_t *uctx; while (1) { - uctx = &ctx->thread->uctx; + //printf("ctx %p %p: enter\n", ctx, ctx->thread); if (ctx->need_close) { if (ctx->ud->close) ctx->ud->close(ctx->ud); @@ -131,11 +160,19 @@ static void ctx_thread(struct _triton_context_t *ctx) call->func(call->arg); mempool_free(call); } - ctx->thread = NULL; + ctx->pending = 0; spin_unlock(&ctx->lock); - - if (swapcontext(&ctx->uctx, uctx)) + break; + } + + //printf("ctx %p %p: exit\n", ctx, ctx->thread); + while (1) { + if (swapcontext(&ctx->uctx, &ctx->thread->uctx)) { + if (errno == EINTR) + continue; triton_log_error("swapcontext: %s\n", strerror(errno)); + } else + break; } } } @@ -160,6 +197,7 @@ struct _triton_thread_t *create_thread() int triton_queue_ctx(struct _triton_context_t *ctx) { + ctx->pending = 1; if (ctx->thread || ctx->queued || ctx->sleeping) return 0; @@ -168,12 +206,14 @@ int triton_queue_ctx(struct _triton_context_t *ctx) list_add_tail(&ctx->entry2, &ctx_queue); spin_unlock(&threads_lock); ctx->queued = 1; + //printf("ctx %p: queued\n", ctx); __sync_fetch_and_add(&triton_stat.context_pending, 1); return 0; } ctx->thread = list_entry(sleep_threads.next, typeof(*ctx->thread), entry2); ctx->thread->ctx = ctx; + //printf("ctx %p: assigned to thread %p\n", ctx, ctx->thread); list_del(&ctx->thread->entry2); spin_unlock(&threads_lock); @@ -204,12 +244,13 @@ int __export triton_context_register(struct triton_context_t *ud, void *bf_arg) } ctx->uctx.uc_stack.ss_size = CTX_STACK_SIZE; - ctx->uctx.uc_stack.ss_sp = _malloc(CTX_STACK_SIZE); + ctx->uctx.uc_stack.ss_sp = mempool_alloc(ctx_stack_pool); if (!ctx->uctx.uc_stack.ss_sp) { triton_log_error("out of memory\n"); _free(ctx); return -1; } + sigfillset(&ctx->uctx.uc_sigmask); makecontext(&ctx->uctx, (void (*)())ctx_thread, 1, ctx); ud->tpd = ctx; @@ -223,6 +264,15 @@ int __export triton_context_register(struct triton_context_t *ud, void *bf_arg) return 0; } +int __export triton_context_print() +{ + struct _triton_context_t *ctx; + + list_for_each_entry(ctx, &ctx_list, entry) + if (ctx->ud) + printf("%s:%i\n", ctx->ud->fname, ctx->ud->line); +} + void __export triton_context_unregister(struct triton_context_t *ud) { struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; @@ -244,14 +294,6 @@ void __export triton_context_unregister(struct triton_context_t *ud) } if (!list_empty(&ctx->timers)) { triton_log_error("BUG:ctx:triton_unregister_ctx: timers is not empty"); - { - struct _triton_timer_t *t; - while(!list_empty(&ctx->timers)) { - t = list_entry(ctx->timers.next, typeof(*t), entry); - t->ud->expire(t->ud); - list_del(&t->entry); - } - } abort(); } if (!list_empty(&ctx->pending_timers)) { @@ -259,8 +301,6 @@ void __export triton_context_unregister(struct triton_context_t *ud) abort(); } - _free(ctx->uctx.uc_stack.ss_sp); - ctx->need_free = 1; spin_lock(&ctx_list_lock); list_del(&ctx->entry); @@ -278,18 +318,28 @@ void __export triton_context_schedule(struct triton_context_t *ud) ctx->thread = NULL; spin_unlock(&ctx->lock); - if (swapcontext(&ctx->uctx, uctx)) - triton_log_error("swaswpntext: %s\n", strerror(errno)); + while (1) { + if (swapcontext(&ctx->uctx, uctx)) { + if (errno == EINTR) + continue; + triton_log_error("swaswpntext: %s\n", strerror(errno)); + } else + break; + } __sync_fetch_and_add(&triton_stat.context_sleeping, 1); } -void __export triton_context_wakeup(struct triton_context_t *ud) +int __export triton_context_wakeup(struct triton_context_t *ud) { struct _triton_context_t *ctx = (struct _triton_context_t *)ud->tpd; int r; spin_lock(&ctx->lock); + if (!ctx->sleeping) { + spin_unlock(&ctx->lock); + return -1; + } ctx->sleeping = 0; r = triton_queue_ctx(ctx); spin_unlock(&ctx->lock); @@ -298,6 +348,8 @@ void __export triton_context_wakeup(struct triton_context_t *ud) triton_thread_wakeup(ctx->thread); __sync_fetch_and_sub(&triton_stat.context_sleeping, 1); + + return 0; } int __export triton_context_call(struct triton_context_t *ud, void (*func)(void *), void *arg) @@ -327,12 +379,9 @@ int __export triton_init(const char *conf_file) { ctx_pool = mempool_create(sizeof(struct _triton_context_t)); call_pool = mempool_create(sizeof(struct _triton_ctx_call_t)); + ctx_stack_pool = mempool_create(CTX_STACK_SIZE); default_ctx = _malloc(sizeof(*default_ctx)); - if (!default_ctx) { - fprintf(stderr,"cann't allocate memory\n"); - return -1; - } triton_context_register(default_ctx, NULL); if (conf_load(conf_file)) @@ -365,6 +414,11 @@ void __export triton_run() { struct _triton_thread_t *t; int i; + char *opt; + + opt = conf_get_opt("core", "thread-count"); + if (opt && atoi(opt) > 0) + thread_count = atoi(opt); for(i = 0; i < thread_count; i++) { t = create_thread(); diff --git a/accel-pptpd/triton/triton.h b/accel-pptpd/triton/triton.h index c3c9d2a9..6f702759 100644 --- a/accel-pptpd/triton/triton.h +++ b/accel-pptpd/triton/triton.h @@ -8,7 +8,9 @@ struct triton_context_t { - const void *tpd; // triton private data, don't touch! + const void *tpd; // triton private data, don't touch + const char *fname; + int line; void (*close)(struct triton_context_t*); void (*free)(struct triton_context_t*); void (*before_switch)(struct triton_context_t *ctx, void *arg); @@ -69,7 +71,7 @@ extern struct triton_stat_t triton_stat; int triton_context_register(struct triton_context_t *, void *arg); void triton_context_unregister(struct triton_context_t *); void triton_context_schedule(struct triton_context_t *); -void triton_context_wakeup(struct triton_context_t *); +int triton_context_wakeup(struct triton_context_t *); int triton_context_call(struct triton_context_t *, void (*func)(void *), void *arg); #define MD_MODE_READ 1 diff --git a/accel-pptpd/triton/triton_p.h b/accel-pptpd/triton/triton_p.h index ec2eb840..7c0615f7 100644 --- a/accel-pptpd/triton/triton_p.h +++ b/accel-pptpd/triton/triton_p.h @@ -42,6 +42,7 @@ struct _triton_context_t int sleeping:1; int need_close:1; int need_free:1; + int pending:1; struct triton_context_t *ud; void *bf_arg; -- cgit v1.2.3