diff options
author | Dmitry Kozlov <xeb@mail.ru> | 2011-01-05 15:18:59 +0300 |
---|---|---|
committer | Dmitry Kozlov <xeb@mail.ru> | 2011-01-05 15:18:59 +0300 |
commit | f28cb1b0a926f1ea98700b7871537ad1793511fd (patch) | |
tree | baf35570bc6b38b6fab5b6524e8f19f58f71e57f /accel-pppd/logs | |
parent | 2fdf3586c13a72c36f9530084962e29d57dc0329 (diff) | |
download | accel-ppp-f28cb1b0a926f1ea98700b7871537ad1793511fd.tar.gz accel-ppp-f28cb1b0a926f1ea98700b7871537ad1793511fd.zip |
rename accel-pptp to accel-ppp
Diffstat (limited to 'accel-pppd/logs')
-rw-r--r-- | accel-pppd/logs/CMakeLists.txt | 17 | ||||
-rw-r--r-- | accel-pppd/logs/log_file.c | 614 | ||||
-rw-r--r-- | accel-pppd/logs/log_pgsql.c | 321 | ||||
-rw-r--r-- | accel-pppd/logs/log_tcp.c | 323 |
4 files changed, 1275 insertions, 0 deletions
diff --git a/accel-pppd/logs/CMakeLists.txt b/accel-pppd/logs/CMakeLists.txt new file mode 100644 index 0000000..ce909b6 --- /dev/null +++ b/accel-pppd/logs/CMakeLists.txt @@ -0,0 +1,17 @@ +ADD_LIBRARY(log_file SHARED log_file.c) +TARGET_LINK_LIBRARIES(log_file rt) + +ADD_LIBRARY(log_tcp SHARED log_tcp.c) + +INSTALL(TARGETS log_file log_tcp + LIBRARY DESTINATION lib/accel-ppp +) + +IF(LOG_PGSQL) + ADD_LIBRARY(log_pgsql SHARED log_pgsql.c) + TARGET_LINK_LIBRARIES(log_pgsql pq) + INSTALL(TARGETS log_pgsql + LIBRARY DESTINATION lib/accel-ppp + ) +ENDIF(LOG_PGSQL) + diff --git a/accel-pppd/logs/log_file.c b/accel-pppd/logs/log_file.c new file mode 100644 index 0000000..e435757 --- /dev/null +++ b/accel-pppd/logs/log_file.c @@ -0,0 +1,614 @@ +#include <stdlib.h> +#include <stdio.h> +#include <errno.h> +#include <string.h> +#include <fcntl.h> +#include <unistd.h> +#include <limits.h> +#include <aio.h> +#include <sys/stat.h> +#include <sys/types.h> + +#include "log.h" +#include "events.h" +#include "ppp.h" +#include "spinlock.h" +#include "mempool.h" + +#include "memdebug.h" + +#define LOG_BUF_SIZE 16*1024 + +#define RED_COLOR "\033[1;31m" +#define GREEN_COLOR "\033[1;32m" +#define YELLOW_COLOR "\033[1;33m" +#define BLUE_COLOR "\033[1;34m" +#define NORMAL_COLOR "\033[0;39m" + +struct log_file_t +{ + struct list_head entry; + struct list_head msgs; + spinlock_t lock; + int need_free:1; + int queued:1; + struct log_file_pd_t *lpd; + + int fd; + int new_fd; + off_t offset; + unsigned long magic; +}; + +struct log_file_pd_t +{ + struct ppp_pd_t pd; + struct log_file_t lf; + unsigned long tmp; +}; + +static int conf_color; +static int conf_per_session; +static char *conf_per_user_dir; +static char *conf_per_session_dir; +static int conf_copy; + +static const char* level_name[]={" msg", "error", " warn", " info", " info", "debug"}; +static const char* level_color[]={NORMAL_COLOR, RED_COLOR, YELLOW_COLOR, GREEN_COLOR, GREEN_COLOR, BLUE_COLOR}; + +static void *pd_key1; +static void *pd_key2; +static struct log_file_t *log_file; + +static mempool_t lpd_pool; +static char *log_buf; + +static struct aiocb aiocb = { + .aio_lio_opcode = LIO_WRITE, + .aio_sigevent.sigev_notify = SIGEV_SIGNAL, + .aio_sigevent.sigev_signo = SIGIO, +}; + +static LIST_HEAD(lf_queue); +static spinlock_t lf_queue_lock = SPINLOCK_INITIALIZER; +static int lf_queue_sleeping = 1; + +static unsigned long temp_seq; + +static void send_next_chunk(); + + +static void log_file_init(struct log_file_t *lf) +{ + spinlock_init(&lf->lock); + INIT_LIST_HEAD(&lf->msgs); + lf->fd = -1; + lf->new_fd = -1; +} + +static int log_file_open(struct log_file_t *lf, const char *fname) +{ + lf->fd = open(fname, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR); + if (lf->fd < 0) { + log_emerg("log_file: open '%s': %s\n", fname, strerror(errno)); + return -1; + } + + lf->offset = lseek(lf->fd, 0, SEEK_END); + + return 0; +} + +static void sigio(int num, siginfo_t *si, void *uc) +{ + struct log_file_t *lf; + int n; + + if (si->si_signo != SIGIO) + return; + + if (si->si_code != SI_ASYNCIO) { + if (aio_write(&aiocb)) + log_emerg("log_file: aio_write: %s\n", strerror(errno)); + return; + } + + lf = (struct log_file_t *)si->si_ptr; + + n = aio_return(&aiocb); + if (n < 0) + log_emerg("log_file: %s\n", strerror(aio_error(&aiocb))); + else if (n != aiocb.aio_nbytes) + log_emerg("log_file: short write %p %i %lu\n", lf, n, aiocb.aio_nbytes); + + spin_lock(&lf->lock); + lf->offset += n; + if (list_empty(&lf->msgs)) { + if (lf->need_free) { + spin_unlock(&lf->lock); + close(lf->fd); + mempool_free(lf->lpd); + } else { + lf->queued = 0; + spin_unlock(&lf->lock); + } + } else { + spin_unlock(&lf->lock); + + spin_lock(&lf_queue_lock); + list_add_tail(&lf->entry, &lf_queue); + spin_unlock(&lf_queue_lock); + } + + send_next_chunk(); +} + +static int dequeue_log(struct log_file_t *lf) +{ + int n, pos = 0; + struct log_msg_t *msg; + struct log_chunk_t *chunk; + + while (1) { + spin_lock(&lf->lock); + if (list_empty(&lf->msgs)) { + spin_unlock(&lf->lock); + return pos; + } + msg = list_entry(lf->msgs.next, typeof(*msg), entry); + list_del(&msg->entry); + spin_unlock(&lf->lock); + + if (pos + msg->hdr->len > LOG_BUF_SIZE) + goto overrun; + memcpy(log_buf + pos, msg->hdr->msg, msg->hdr->len); + n = msg->hdr->len; + + list_for_each_entry(chunk, msg->chunks, entry) { + if (pos + n + chunk->len > LOG_BUF_SIZE) + goto overrun; + memcpy(log_buf + pos + n, chunk->msg, chunk->len); + n += chunk->len; + } + + log_free_msg(msg); + pos += n; + } + +overrun: + spin_lock(&lf->lock); + list_add(&msg->entry, &lf->msgs); + spin_unlock(&lf->lock); + + return pos; +} + +static void send_next_chunk(void) +{ + struct log_file_t *lf; + int n; + + spin_lock(&lf_queue_lock); + if (list_empty(&lf_queue)) { + lf_queue_sleeping = 1; + spin_unlock(&lf_queue_lock); + return; + } + lf = list_entry(lf_queue.next, typeof(*lf), entry); + + n = log_file->entry.next == NULL; + list_del(&lf->entry); + + spin_unlock(&lf_queue_lock); + + if (lf->new_fd != -1) { + close(lf->fd); + lf->fd = lf->new_fd; + lf->new_fd = -1; + lf->offset = 0; + } + + aiocb.aio_fildes = lf->fd; + aiocb.aio_offset = lf->offset; + aiocb.aio_sigevent.sigev_value.sival_ptr = lf; + aiocb.aio_nbytes = dequeue_log(lf); + + if (aio_write(&aiocb)) + log_emerg("log_file: aio_write: %s\n", strerror(errno)); +} + +static void queue_lf(struct log_file_t *lf) +{ + int r; + + spin_lock(&lf_queue_lock); + list_add_tail(&lf->entry, &lf_queue); + r = lf_queue_sleeping; + lf_queue_sleeping = 0; + spin_unlock(&lf_queue_lock); + + if (r) + send_next_chunk(); +} + +static void queue_log(struct log_file_t *lf, struct log_msg_t *msg) +{ + int r; + + spin_lock(&lf->lock); + list_add_tail(&msg->entry, &lf->msgs); + if (lf->fd != -1) { + r = lf->queued; + lf->queued = 1; + } else + r = 1; + spin_unlock(&lf->lock); + + if (!r) + queue_lf(lf); +} + +static void set_hdr(struct log_msg_t *msg, struct ppp_t *ppp) +{ + struct tm tm; + char timestamp[32]; + + localtime_r(&msg->timestamp.tv_sec, &tm); + + strftime(timestamp, sizeof(timestamp), "%Y-%m-%d %H:%M:%S", &tm); + sprintf(msg->hdr->msg, "%s[%s]: %s: %s%s%s", conf_color ? level_color[msg->level] : "", + timestamp, level_name[msg->level], + ppp ? ppp->ifname : "", + ppp ? ": " : "", + conf_color ? NORMAL_COLOR : ""); + msg->hdr->len = strlen(msg->hdr->msg); +} + +static void general_log(struct log_target_t *t, struct log_msg_t *msg, struct ppp_t *ppp) +{ + if (ppp && !conf_copy) { + log_free_msg(msg); + return; + } + + set_hdr(msg, ppp); + queue_log(log_file, msg); +} + +static struct log_file_pd_t *find_pd(struct ppp_t *ppp, void *pd_key) +{ + struct ppp_pd_t *pd; + struct log_file_pd_t *lpd; + + list_for_each_entry(pd, &ppp->pd_list, entry) { + if (pd->key == pd_key) { + lpd = container_of(pd, typeof(*lpd), pd); + return lpd; + } + } + return NULL; +} + +static void per_user_log(struct log_target_t *t, struct log_msg_t *msg, struct ppp_t *ppp) +{ + struct log_file_pd_t *lpd; + + if (!ppp) { + log_free_msg(msg); + return; + } + + lpd = find_pd(ppp, &pd_key1); + + if (!lpd) { + log_free_msg(msg); + return; + } + + set_hdr(msg, ppp); + queue_log(&lpd->lf, msg); +} + +static void per_session_log(struct log_target_t *t, struct log_msg_t *msg, struct ppp_t *ppp) +{ + struct log_file_pd_t *lpd; + + if (!ppp) { + log_free_msg(msg); + return; + } + + lpd = find_pd(ppp, &pd_key2); + + if (!lpd) { + log_free_msg(msg); + return; + } + + set_hdr(msg, ppp); + queue_log(&lpd->lf, msg); +} + +static void general_reopen(void) +{ + char *fname = conf_get_opt("log", "log-file"); + int fd = open(fname, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR); + if (fd < 0) { + log_emerg("log_file: open '%s': %s\n", fname, strerror(errno)); + return; + } + log_file->new_fd = fd; +} + +static void free_lpd(struct log_file_pd_t *lpd) +{ + struct log_msg_t *msg; + + spin_lock(&lpd->lf.lock); + list_del(&lpd->pd.entry); + lpd->lf.need_free = 1; + if (lpd->lf.queued) + spin_unlock(&lpd->lf.lock); + else { + while (!list_empty(&lpd->lf.msgs)) { + msg = list_entry(lpd->lf.msgs.next, typeof(*msg), entry); + list_del(&msg->entry); + log_free_msg(msg); + } + if (lpd->lf.fd != -1) + close(lpd->lf.fd); + spin_unlock(&lpd->lf.lock); + mempool_free(lpd); + } +} + +static void ev_ctrl_started(struct ppp_t *ppp) +{ + struct log_file_pd_t *lpd; + char *fname; + + if (conf_per_user_dir) { + lpd = mempool_alloc(lpd_pool); + if (!lpd) { + log_emerg("log_file: out of memory\n"); + return; + } + memset(lpd, 0, sizeof(*lpd)); + lpd->pd.key = &pd_key1; + log_file_init(&lpd->lf); + lpd->lf.lpd = lpd; + list_add_tail(&lpd->pd.entry, &ppp->pd_list); + } + + if (conf_per_session_dir) { + lpd = mempool_alloc(lpd_pool); + if (!lpd) { + log_emerg("log_file: out of memory\n"); + return; + } + memset(lpd, 0, sizeof(*lpd)); + lpd->pd.key = &pd_key2; + log_file_init(&lpd->lf); + lpd->lf.lpd = lpd; + + fname = _malloc(PATH_MAX); + if (!fname) { + mempool_free(lpd); + log_emerg("log_file: out of memory\n"); + return; + } + + lpd->tmp = temp_seq++; + strcpy(fname, conf_per_session_dir); + strcat(fname, "/tmp"); + sprintf(fname + strlen(fname), "%lu", lpd->tmp); + + if (log_file_open(&lpd->lf, fname)) { + mempool_free(lpd); + _free(fname); + return; + } + + _free(fname); + + list_add_tail(&lpd->pd.entry, &ppp->pd_list); + } +} + +static void ev_ctrl_finished(struct ppp_t *ppp) +{ + struct log_file_pd_t *lpd; + char *fname; + + lpd = find_pd(ppp, &pd_key1); + if (lpd) + free_lpd(lpd); + + + lpd = find_pd(ppp, &pd_key2); + if (lpd) { + if (lpd->tmp) { + fname = _malloc(PATH_MAX); + if (fname) { + strcpy(fname, conf_per_session_dir); + strcat(fname, "/tmp"); + sprintf(fname + strlen(fname), "%lu", lpd->tmp); + if (unlink(fname)) + log_emerg("log_file: unlink '%s': %s\n", fname, strerror(errno)); + _free(fname); + } else + log_emerg("log_file: out of memory\n"); + } + free_lpd(lpd); + } +} + +static void ev_ppp_starting(struct ppp_t *ppp) +{ + struct log_file_pd_t *lpd; + char *fname1, *fname2; + + lpd = find_pd(ppp, &pd_key2); + if (!lpd) + return; + + fname1 = _malloc(PATH_MAX); + if (!fname1) { + log_emerg("log_file: out of memory\n"); + return; + } + + fname2 = _malloc(PATH_MAX); + if (!fname2) { + log_emerg("log_file: out of memory\n"); + _free(fname1); + return; + } + + strcpy(fname1, conf_per_session_dir); + strcat(fname1, "/tmp"); + sprintf(fname1 + strlen(fname1), "%lu", lpd->tmp); + + strcpy(fname2, conf_per_session_dir); + strcat(fname2, "/"); + strcat(fname2, ppp->sessionid); + strcat(fname2, ".log"); + + if (rename(fname1, fname2)) + log_emerg("log_file: rename '%s' to '%s': %s\n", fname1, fname2, strerror(errno)); + + lpd->tmp = 0; + + _free(fname1); + _free(fname2); +} + +static void ev_ppp_authorized(struct ppp_t *ppp) +{ + struct log_file_pd_t *lpd; + char *fname; + + lpd = find_pd(ppp, &pd_key1); + if (!lpd) + return; + + fname = _malloc(PATH_MAX); + if (!fname) { + log_emerg("log_file: out of memory\n"); + return; + } + + strcpy(fname, conf_per_user_dir); + strcat(fname, "/"); + strcat(fname, ppp->username); + if (conf_per_session) { + if (mkdir(fname, S_IRWXU) && errno != EEXIST) { + log_emerg("log_file: mkdir '%s': %s'\n", fname, strerror(errno)); + goto out_err; + } + strcat(fname, "/"); + strcat(fname, ppp->sessionid); + } + strcat(fname, ".log"); + + if (log_file_open(&lpd->lf, fname)) + goto out_err; + + _free(fname); + + if (!list_empty(&lpd->lf.msgs)) { + lpd->lf.queued = 1; + queue_lf(&lpd->lf); + } + + return; + +out_err: + _free(fname); + list_del(&lpd->pd.entry); + free_lpd(lpd); +} + +static struct log_target_t general_target = +{ + .log = general_log, + .reopen = general_reopen, +}; + +static struct log_target_t per_user_target = +{ + .log = per_user_log, +}; + +static struct log_target_t per_session_target = +{ + .log = per_session_log, +}; + +static void __init init(void) +{ + char *opt; + + sigset_t set; + sigemptyset(&set); + sigaddset(&set, SIGIO); + + struct sigaction sa = { + .sa_sigaction = sigio, + .sa_flags = SA_SIGINFO, + .sa_mask = set, + }; + + lpd_pool = mempool_create(sizeof(struct log_file_pd_t)); + log_buf = malloc(LOG_BUF_SIZE); + aiocb.aio_buf = log_buf; + + if (sigaction(SIGIO, &sa, NULL)) { + log_emerg("log_file: sigaction: %s\n", strerror(errno)); + return; + } + + opt = conf_get_opt("log", "log-file"); + if (opt) { + log_file = malloc(sizeof(*log_file)); + memset(log_file, 0, sizeof(*log_file)); + log_file_init(log_file); + if (log_file_open(log_file, opt)) { + free(log_file); + _exit(EXIT_FAILURE); + } + } + + opt = conf_get_opt("log","color"); + if (opt && atoi(opt) > 0) + conf_color = 1; + + opt = conf_get_opt("log", "per-user-dir"); + if (opt) + conf_per_user_dir = _strdup(opt); + + opt = conf_get_opt("log", "per-session-dir"); + if (opt) + conf_per_session_dir = _strdup(opt); + + opt = conf_get_opt("log", "per-session"); + if (opt && atoi(opt) > 0) + conf_per_session = 1; + + opt = conf_get_opt("log", "copy"); + if (opt && atoi(opt) > 0) + conf_copy = 1; + + log_register_target(&general_target); + + if (conf_per_user_dir) + log_register_target(&per_user_target); + + if (conf_per_session_dir) + log_register_target(&per_session_target); + + triton_event_register_handler(EV_CTRL_STARTED, (triton_event_func)ev_ctrl_started); + triton_event_register_handler(EV_CTRL_FINISHED, (triton_event_func)ev_ctrl_finished); + triton_event_register_handler(EV_PPP_STARTING, (triton_event_func)ev_ppp_starting); + triton_event_register_handler(EV_PPP_AUTHORIZED, (triton_event_func)ev_ppp_authorized); +} diff --git a/accel-pppd/logs/log_pgsql.c b/accel-pppd/logs/log_pgsql.c new file mode 100644 index 0000000..af67e0b --- /dev/null +++ b/accel-pppd/logs/log_pgsql.c @@ -0,0 +1,321 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <postgresql/libpq-fe.h> + +#include "triton.h" +#include "spinlock.h" +#include "log.h" +#include "list.h" +#include "ppp.h" + +#include "memdebug.h" + +static char *conf_conninfo; +static int conf_queue_max = 1000; +static char *conf_query; +#define QUERY_TEMPLATE "insert into %s (timestamp, username, sessionid, msg) values ($1, $2, $3, $4)" + +static void start_connect(void); +static void start_connect_timer(struct triton_timer_t *); +static void pgsql_close(struct triton_context_t *ctx); + +static struct triton_context_t pgsql_ctx = { + .close = pgsql_close, + .before_switch = log_switch, +}; +static struct triton_md_handler_t pgsql_hnd; +static struct triton_timer_t connect_timer = { + .period = 5000, + .expire = start_connect_timer, +}; + +static PGconn *conn; + +static LIST_HEAD(msg_queue); +static int queue_size; +static int sleeping = 0; +static spinlock_t queue_lock = SPINLOCK_INITIALIZER; +static char *log_buf; +static int need_close; + +static void unpack_msg(struct log_msg_t *msg) +{ + struct log_chunk_t *chunk; + int pos = 0; + + list_for_each_entry(chunk, msg->chunks, entry) { + memcpy(log_buf + pos, chunk->msg, chunk->len); + pos += chunk->len; + } + if (pos > 1) + log_buf[pos - 1] = 0; + else + log_buf[0] = 0; +} + +static void set_hdr(struct log_msg_t *msg, struct ppp_t *ppp) +{ + struct tm tm; + + localtime_r(&msg->timestamp.tv_sec, &tm); + + strftime(msg->hdr->msg, LOG_CHUNK_SIZE, "%Y-%m-%d %H:%M:%S", &tm); + msg->hdr->len = strlen(msg->hdr->msg) + 1; + if (ppp && ppp->username) { + strcpy(msg->hdr->msg + msg->hdr->len, ppp->username); + msg->hdr->len += strlen(ppp->username) + 1; + strcpy(msg->hdr->msg + msg->hdr->len, ppp->sessionid); + msg->hdr->len += strlen(ppp->sessionid) + 1; + } else + memset(msg->hdr->msg + msg->hdr->len, 0, 2); + +} + +static void write_next_msg(void) +{ + struct log_msg_t *msg; + const char *paramValues[4]; + int paramFormats[4] = {0, 0, 0, 0}; + char *ptr1, *ptr2; + int r; + + spin_lock(&queue_lock); + if (list_empty(&msg_queue)) { + sleeping = 1; + spin_unlock(&queue_lock); + if (need_close) { + triton_md_unregister_handler(&pgsql_hnd); + PQfinish(conn); + conn = NULL; + triton_context_unregister(&pgsql_ctx); + } + return; + } + + msg = list_entry(msg_queue.next, typeof(*msg), entry); + list_del(&msg->entry); + --queue_size; + spin_unlock(&queue_lock); + + unpack_msg(msg); + + ptr1 = strchr(msg->hdr->msg, 0); + ptr2 = strchr(ptr1 + 1, 0); + + paramValues[1] = ptr1[1] ? ptr1 + 1 : NULL; + paramValues[2] = ptr2[1] ? ptr2 + 1 : NULL; + paramValues[0] = msg->hdr->msg; + paramValues[3] = log_buf; + + if (!PQsendQueryParams(conn, conf_query, 4, NULL, paramValues, NULL, paramFormats, 0)) + log_emerg("log_pgsql: %s\n", PQerrorMessage(conn)); + + log_free_msg(msg); + + r = PQflush(conn); + if (r == -1) + log_emerg("log_pgsql: %s\n", PQerrorMessage(conn)); + if (r == 0) + triton_md_enable_handler(&pgsql_hnd, MD_MODE_WRITE); +} + +static int pgsql_check_ready(struct triton_md_handler_t *h) +{ + PGresult *res; + + if (!PQconsumeInput(conn)) { + log_emerg("log_pgsql: %s\n", PQerrorMessage(conn)); + if (PQstatus(conn) == CONNECTION_BAD) { + PQfinish(conn); + start_connect(); + } + } + + if (PQisBusy(conn)) + return 0; + + while (1) { + res = PQgetResult(conn); + if (!res) + break; + if (PQresultStatus(res) != PGRES_COMMAND_OK) + log_emerg("log_pgsql: %s\n", PQerrorMessage(conn)); + PQclear(res); + } + + write_next_msg(); + + return 0; +} + +static int pgsql_flush(struct triton_md_handler_t *h) +{ + int r; + + r = PQflush(conn); + if (r == -1) + log_emerg("log_pgsql: %s\n", PQerrorMessage(conn)); + if (r == 1) + return 0; + + triton_md_disable_handler(&pgsql_hnd, MD_MODE_WRITE); + return 0; +} + +static void wakeup_log(void) +{ + write_next_msg(); +} + +static void queue_log(struct log_msg_t *msg) +{ + int r = 0, f = 0; + spin_lock(&queue_lock); + if (!conn) { + log_free_msg(msg); + spin_unlock(&queue_lock); + return; + } + if (queue_size < conf_queue_max) { + list_add_tail(&msg->entry, &msg_queue); + ++queue_size; + r = sleeping; + sleeping = 0; + } else + f = 1; + spin_unlock(&queue_lock); + + if (r) + triton_context_call(&pgsql_ctx, (void (*)(void*))wakeup_log, NULL); + else if (f) + log_free_msg(msg); +} + + +static void general_log(struct log_target_t *t, struct log_msg_t *msg, struct ppp_t *ppp) +{ + set_hdr(msg, ppp); + queue_log(msg); +} + +static int wait_connect(struct triton_md_handler_t *h) +{ + PostgresPollingStatusType status = PQconnectPoll(conn); + char *err_msg; + + switch(status) { + case PGRES_POLLING_READING: + triton_md_enable_handler(h, MD_MODE_READ); + triton_md_disable_handler(h, MD_MODE_WRITE); + break; + case PGRES_POLLING_WRITING: + triton_md_enable_handler(h, MD_MODE_WRITE); + triton_md_disable_handler(h, MD_MODE_READ); + break; + case PGRES_POLLING_FAILED: + err_msg = PQerrorMessage(conn); + log_emerg("log_pgsql: %s\n", err_msg); + triton_md_disable_handler(h, MD_MODE_READ | MD_MODE_WRITE); + PQfinish(conn); + h->read = NULL; + h->write = NULL; + if (!connect_timer.tpd) + triton_timer_add(&pgsql_ctx, &connect_timer, 0); + break; + case PGRES_POLLING_OK: + //triton_md_disable_handler(h, MD_MODE_READ | MD_MODE_WRITE); + PQsetnonblocking(conn, 1); + h->write = pgsql_flush; + h->read = pgsql_check_ready; + triton_md_enable_handler(&pgsql_hnd, MD_MODE_READ); + wakeup_log(); + break; + default: + break; + } + return 0; +} + +static void start_connect(void) +{ + conn = PQconnectStart(conf_conninfo); + if (!conn) { + log_emerg("log_pgsql: out of memory\n"); + return; + } + + if (PQstatus(conn) == CONNECTION_BAD) { + log_emerg("log_pgsql: PQconnectStart failed\n"); + } + + pgsql_hnd.fd = PQsocket(conn); + pgsql_hnd.read = wait_connect; + pgsql_hnd.write = wait_connect; + + wait_connect(&pgsql_hnd); +} + +static void start_connect_timer(struct triton_timer_t *t) +{ + triton_timer_del(t); + start_connect(); +} + +static void pgsql_close(struct triton_context_t *ctx) +{ + spin_lock(&queue_lock); + if (sleeping) { + triton_md_unregister_handler(&pgsql_hnd); + PQfinish(conn); + conn = NULL; + triton_context_unregister(&pgsql_ctx); + } else + need_close = 1; + spin_unlock(&queue_lock); +} + +static struct log_target_t target = { + .log = general_log, +}; + +static void __init init(void) +{ + char *opt; + + opt = conf_get_opt("log-pgsql", "conninfo"); + if (!opt) + return; + conf_conninfo = _strdup(opt); + + opt = conf_get_opt("log-pgsql", "connect-inteval"); + if (opt && atoi(opt) > 0) + connect_timer.period = atoi(opt) * 1000; + + opt = conf_get_opt("log-pgsql", "log-query"); + if (opt) + conf_query = _strdup(opt); + else { + opt = conf_get_opt("log-pgsql", "log-table"); + if (!opt || strlen(opt) > 32) + opt = "log"; + conf_query = _malloc(sizeof(QUERY_TEMPLATE) + strlen(opt)); + sprintf(conf_query, QUERY_TEMPLATE, opt); + } + + log_buf = _malloc(LOG_MAX_SIZE + 1); + if (!log_buf) { + log_emerg("log_pgsql: out of memory\n"); + return; + } + + triton_context_register(&pgsql_ctx, NULL); + triton_md_register_handler(&pgsql_ctx, &pgsql_hnd); + triton_md_set_trig(&pgsql_hnd, MD_TRIG_LEVEL); + triton_context_wakeup(&pgsql_ctx); + + start_connect(); + + log_register_target(&target); +} diff --git a/accel-pppd/logs/log_tcp.c b/accel-pppd/logs/log_tcp.c new file mode 100644 index 0000000..306c450 --- /dev/null +++ b/accel-pppd/logs/log_tcp.c @@ -0,0 +1,323 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#include <netinet/in.h> + +#include "log.h" +#include "triton.h" +#include "events.h" +#include "ppp.h" +#include "spinlock.h" +#include "mempool.h" + +#include "memdebug.h" + +struct tcp_target_t +{ + struct log_target_t target; + struct list_head entry; + struct triton_md_handler_t hnd; + struct triton_timer_t conn_timer; + struct sockaddr_in addr; + char *buf; + int buf_size; + int buf_pos; + spinlock_t lock; + struct list_head queue; + int queue_len; + int connected:1; + int wait:1; +}; + +static int conf_connect_interval = 5; +static int conf_queue_len = 1000; + +static struct triton_context_t tcp_ctx; + +static const char* level_name[]={" msg", "error", " warn", " info", " info", "debug"}; + +static void start_connect(struct tcp_target_t *t); + +static LIST_HEAD(targets); + +static void disconnect(struct tcp_target_t *t) +{ + triton_md_unregister_handler(&t->hnd); + close(t->hnd.fd); + + start_connect(t); +} + +static void unpack_msg(struct tcp_target_t *t, struct log_msg_t *msg) +{ + struct log_chunk_t *chunk; + int pos = strlen(msg->hdr->msg); + + strcpy(t->buf, msg->hdr->msg); + + list_for_each_entry(chunk, msg->chunks, entry) { + memcpy(t->buf + pos, chunk->msg, chunk->len); + pos += chunk->len; + } + + t->buf_size = pos; + t->buf_pos = 0; +} + +static int send_log(struct tcp_target_t *t) +{ + struct log_msg_t *msg; + int n; + + while (1) { + spin_lock(&t->lock); + if (!t->queue_len) { + t->wait = 0; + spin_unlock(&t->lock); + return 0; + } + msg = list_entry(t->queue.next, typeof(*msg), entry); + list_del(&msg->entry); + t->queue_len--; + spin_unlock(&t->lock); + + unpack_msg(t, msg); + + log_free_msg(msg); + + while (t->buf_pos != t->buf_size) { + n = write(t->hnd.fd, t->buf + t->buf_pos, t->buf_size - t->buf_pos); + if (n < 0) { + if (errno == EAGAIN) + return 1; + if (errno != EPIPE) + log_emerg("log-tcp: write: %s\n", strerror(errno)); + disconnect(t); + return 0; + } + t->buf_pos += n; + } + } +} + +static void queue_log(struct tcp_target_t *t, struct log_msg_t *msg) +{ + int r; + + spin_lock(&t->lock); + if (t->queue_len == conf_queue_len) { + spin_unlock(&t->lock); + log_free_msg(msg); + return; + } + list_add_tail(&msg->entry, &t->queue); + t->queue_len++; + if (t->connected) { + r = t->wait; + t->wait = 1; + } else + r = 1; + spin_unlock(&t->lock); + + if (!r) { + if (send_log(t)) + triton_md_enable_handler(&t->hnd, MD_MODE_WRITE); + } +} + +static void set_hdr(struct log_msg_t *msg, struct ppp_t *ppp) +{ + struct tm tm; + char timestamp[32]; + + localtime_r(&msg->timestamp.tv_sec, &tm); + + strftime(timestamp, sizeof(timestamp), "%Y-%m-%d %H:%M:%S", &tm); + sprintf(msg->hdr->msg, "[%s]: %s: %s: ", timestamp, level_name[msg->level], ppp ? ppp->ifname : ""); + msg->hdr->len = strlen(msg->hdr->msg); +} + +static void general_log(struct log_target_t *lt, struct log_msg_t *msg, struct ppp_t *ppp) +{ + struct tcp_target_t *t = container_of(lt, typeof(*t), target); + + set_hdr(msg, ppp); + queue_log(t, msg); +} + +static int log_tcp_write(struct triton_md_handler_t *h) +{ + struct tcp_target_t *t = container_of(h, typeof(*t), hnd); + + if (!send_log(t)) + triton_md_disable_handler(h, MD_MODE_WRITE); + + return 0; +} + +static int log_tcp_connect(struct triton_md_handler_t *h) +{ + struct tcp_target_t *t = container_of(h, typeof(*t), hnd); + + if (connect(t->hnd.fd, &t->addr, sizeof(t->addr))) { + if (errno == EAGAIN) + return 0; + if (errno == EINPROGRESS) + return 0; + log_emerg("log-tcp: connect: %s\n", strerror(errno)); + triton_md_unregister_handler(&t->hnd); + close(t->hnd.fd); + triton_timer_add(&tcp_ctx, &t->conn_timer, 0); + return 0; + } + + t->hnd.write = log_tcp_write; + + triton_md_disable_handler(&t->hnd, MD_MODE_WRITE); + + spin_lock(&t->lock); + t->connected = 1; + t->wait = 1; + spin_unlock(&t->lock); + + if (send_log(t)) + triton_md_enable_handler(&t->hnd, MD_MODE_WRITE); + + return 0; +} + +static void connect_timer(struct triton_timer_t *timer) +{ + struct tcp_target_t *t = container_of(timer, typeof(*t), conn_timer); + + triton_timer_del(timer); + + start_connect(t); +} + +static void start_connect(struct tcp_target_t *t) +{ + t->hnd.write = log_tcp_connect; + t->hnd.fd = socket(PF_INET, SOCK_STREAM, 0); + + if (!t->hnd.fd) { + log_emerg("log-tcp: socket: %s\n", strerror(errno)); + return; + } + + if (fcntl(t->hnd.fd, F_SETFL, O_NONBLOCK)) { + log_emerg("log-tcp: failed to set nonblocking mode: %s\n", strerror(errno)); + close(t->hnd.fd); + return; + } + + if (connect(t->hnd.fd, &t->addr, sizeof(t->addr))) { + if (errno != EINPROGRESS) { + log_emerg("log-tcp: connect: %s\n", strerror(errno)); + close(t->hnd.fd); + return; + } + } + + triton_md_register_handler(&tcp_ctx, &t->hnd); + triton_md_enable_handler(&t->hnd, MD_MODE_WRITE); +} + +static void log_tcp_close(struct triton_context_t *ctx) +{ + struct tcp_target_t *t; + + while (!list_empty(&targets)) { + t = list_entry(targets.next, typeof(*t), entry); + list_del(&t->entry); + if (t->conn_timer.tpd) + triton_timer_del(&t->conn_timer); + else { + t->connected = 0; + triton_md_unregister_handler(&t->hnd); + close(t->hnd.fd); + } + } + + triton_context_unregister(&tcp_ctx); +} + +static int start_log(const char *_opt) +{ + struct tcp_target_t *t; + char *opt = strdup(_opt); + int port; + char *d; + + d = strchr(opt, ':'); + if (!d) + goto err; + + *d = 0; + + port = atoi(d + 1); + if (port <= 0) + goto err; + + t = _malloc(sizeof(*t)); + memset(t, 0, sizeof(*t)); + + t->buf = _malloc(LOG_MAX_SIZE + 64); + + t->conn_timer.expire_tv.tv_sec = conf_connect_interval; + t->conn_timer.expire = connect_timer; + + t->target.log = general_log; + + memset(&t->addr, 0, sizeof(t->addr)); + t->addr.sin_family = AF_INET; + t->addr.sin_port = htons(port); + t->addr.sin_addr.s_addr = inet_addr(opt); + + INIT_LIST_HEAD(&t->queue); + + spinlock_init(&t->lock); + + start_connect(t); + + log_register_target(&t->target); + + list_add_tail(&t->entry, &targets); + + return 0; + +err: + free(opt); + return -1; +} + +static struct triton_context_t tcp_ctx ={ + .close = log_tcp_close, + .before_switch = log_switch, +}; + +static void __init init(void) +{ + struct conf_sect_t *s = conf_get_section("log"); + struct conf_option_t *opt; + + if (!s) + return; + + triton_context_register(&tcp_ctx, NULL); + + list_for_each_entry(opt, &s->items, entry) { + if (strcmp(opt->name, "log-tcp")) + continue; + if (!opt->val || start_log(opt->val)) + log_emerg("log: log-tcp: invalid format: '%s'\n", opt->val); + } + + triton_context_wakeup(&tcp_ctx); +} + |