#include <stdio.h> #include <stdlib.h> #include <string.h> #include <postgresql/libpq-fe.h> #include "triton.h" #include "spinlock.h" #include "log.h" #include "list.h" #include "ppp.h" #include "memdebug.h" static char *conf_conninfo; static int conf_queue_max = 1000; static char *conf_query; #define QUERY_TEMPLATE "insert into %s (timestamp, username, sessionid, msg) values ($1, $2, $3, $4)" static void start_connect(void); static void start_connect_timer(struct triton_timer_t *); static void pgsql_close(struct triton_context_t *ctx); static struct triton_context_t pgsql_ctx = { .close = pgsql_close, .before_switch = log_switch, }; static struct triton_md_handler_t pgsql_hnd; static struct triton_timer_t connect_timer = { .period = 5000, .expire = start_connect_timer, }; static PGconn *conn; static LIST_HEAD(msg_queue); static int queue_size; static int sleeping = 0; static spinlock_t queue_lock = SPINLOCK_INITIALIZER; static char *log_buf; static int need_close; static void unpack_msg(struct log_msg_t *msg) { struct log_chunk_t *chunk; int pos = 0; list_for_each_entry(chunk, msg->chunks, entry) { memcpy(log_buf + pos, chunk->msg, chunk->len); pos += chunk->len; } if (pos > 1) log_buf[pos - 1] = 0; else log_buf[0] = 0; } static void set_hdr(struct log_msg_t *msg, struct ppp_t *ppp) { struct tm tm; localtime_r(&msg->timestamp.tv_sec, &tm); strftime(msg->hdr->msg, LOG_CHUNK_SIZE, "%Y-%m-%d %H:%M:%S", &tm); msg->hdr->len = strlen(msg->hdr->msg) + 1; if (ppp && ppp->username) { strcpy(msg->hdr->msg + msg->hdr->len, ppp->username); msg->hdr->len += strlen(ppp->username) + 1; strcpy(msg->hdr->msg + msg->hdr->len, ppp->sessionid); msg->hdr->len += strlen(ppp->sessionid) + 1; } else memset(msg->hdr->msg + msg->hdr->len, 0, 2); } static void write_next_msg(void) { struct log_msg_t *msg; const char *paramValues[4]; int paramFormats[4] = {0, 0, 0, 0}; char *ptr1, *ptr2; int r; spin_lock(&queue_lock); if (list_empty(&msg_queue)) { sleeping = 1; spin_unlock(&queue_lock); if (need_close) { triton_md_unregister_handler(&pgsql_hnd); PQfinish(conn); conn = NULL; triton_context_unregister(&pgsql_ctx); } return; } msg = list_entry(msg_queue.next, typeof(*msg), entry); list_del(&msg->entry); --queue_size; spin_unlock(&queue_lock); unpack_msg(msg); ptr1 = strchr(msg->hdr->msg, 0); ptr2 = strchr(ptr1 + 1, 0); paramValues[1] = ptr1[1] ? ptr1 + 1 : NULL; paramValues[2] = ptr2[1] ? ptr2 + 1 : NULL; paramValues[0] = msg->hdr->msg; paramValues[3] = log_buf; if (!PQsendQueryParams(conn, conf_query, 4, NULL, paramValues, NULL, paramFormats, 0)) log_emerg("log_pgsql: %s\n", PQerrorMessage(conn)); log_free_msg(msg); r = PQflush(conn); if (r == -1) log_emerg("log_pgsql: %s\n", PQerrorMessage(conn)); if (r == 0) triton_md_enable_handler(&pgsql_hnd, MD_MODE_WRITE); } static int pgsql_check_ready(struct triton_md_handler_t *h) { PGresult *res; if (!PQconsumeInput(conn)) { log_emerg("log_pgsql: %s\n", PQerrorMessage(conn)); if (PQstatus(conn) == CONNECTION_BAD) { PQfinish(conn); start_connect(); } } if (PQisBusy(conn)) return 0; while (1) { res = PQgetResult(conn); if (!res) break; if (PQresultStatus(res) != PGRES_COMMAND_OK) log_emerg("log_pgsql: %s\n", PQerrorMessage(conn)); PQclear(res); } write_next_msg(); return 0; } static int pgsql_flush(struct triton_md_handler_t *h) { int r; r = PQflush(conn); if (r == -1) log_emerg("log_pgsql: %s\n", PQerrorMessage(conn)); if (r == 1) return 0; triton_md_disable_handler(&pgsql_hnd, MD_MODE_WRITE); return 0; } static void wakeup_log(void) { write_next_msg(); } static void queue_log(struct log_msg_t *msg) { int r = 0, f = 0; spin_lock(&queue_lock); if (!conn) { log_free_msg(msg); spin_unlock(&queue_lock); return; } if (queue_size < conf_queue_max) { list_add_tail(&msg->entry, &msg_queue); ++queue_size; r = sleeping; sleeping = 0; } else f = 1; spin_unlock(&queue_lock); if (r) triton_context_call(&pgsql_ctx, (void (*)(void*))wakeup_log, NULL); else if (f) log_free_msg(msg); } static void general_log(struct log_target_t *t, struct log_msg_t *msg, struct ppp_t *ppp) { set_hdr(msg, ppp); queue_log(msg); } static int wait_connect(struct triton_md_handler_t *h) { PostgresPollingStatusType status = PQconnectPoll(conn); char *err_msg; switch(status) { case PGRES_POLLING_READING: triton_md_enable_handler(h, MD_MODE_READ); triton_md_disable_handler(h, MD_MODE_WRITE); break; case PGRES_POLLING_WRITING: triton_md_enable_handler(h, MD_MODE_WRITE); triton_md_disable_handler(h, MD_MODE_READ); break; case PGRES_POLLING_FAILED: err_msg = PQerrorMessage(conn); log_emerg("log_pgsql: %s\n", err_msg); triton_md_disable_handler(h, MD_MODE_READ | MD_MODE_WRITE); PQfinish(conn); h->read = NULL; h->write = NULL; if (!connect_timer.tpd) triton_timer_add(&pgsql_ctx, &connect_timer, 0); break; case PGRES_POLLING_OK: //triton_md_disable_handler(h, MD_MODE_READ | MD_MODE_WRITE); PQsetnonblocking(conn, 1); h->write = pgsql_flush; h->read = pgsql_check_ready; triton_md_enable_handler(&pgsql_hnd, MD_MODE_READ); wakeup_log(); break; default: break; } return 0; } static void start_connect(void) { conn = PQconnectStart(conf_conninfo); if (!conn) { log_emerg("log_pgsql: out of memory\n"); return; } if (PQstatus(conn) == CONNECTION_BAD) { log_emerg("log_pgsql: PQconnectStart failed\n"); } pgsql_hnd.fd = PQsocket(conn); pgsql_hnd.read = wait_connect; pgsql_hnd.write = wait_connect; wait_connect(&pgsql_hnd); } static void start_connect_timer(struct triton_timer_t *t) { triton_timer_del(t); start_connect(); } static void pgsql_close(struct triton_context_t *ctx) { spin_lock(&queue_lock); if (sleeping) { triton_md_unregister_handler(&pgsql_hnd); PQfinish(conn); conn = NULL; triton_context_unregister(&pgsql_ctx); } else need_close = 1; spin_unlock(&queue_lock); } static struct log_target_t target = { .log = general_log, }; static void init(void) { char *opt; opt = conf_get_opt("log-pgsql", "conninfo"); if (!opt) return; conf_conninfo = _strdup(opt); opt = conf_get_opt("log-pgsql", "connect-inteval"); if (opt && atoi(opt) > 0) connect_timer.period = atoi(opt) * 1000; opt = conf_get_opt("log-pgsql", "log-query"); if (opt) conf_query = _strdup(opt); else { opt = conf_get_opt("log-pgsql", "log-table"); if (!opt || strlen(opt) > 32) opt = "log"; conf_query = _malloc(sizeof(QUERY_TEMPLATE) + strlen(opt)); sprintf(conf_query, QUERY_TEMPLATE, opt); } log_buf = _malloc(LOG_MAX_SIZE + 1); if (!log_buf) { log_emerg("log_pgsql: out of memory\n"); return; } triton_context_register(&pgsql_ctx, NULL); triton_md_register_handler(&pgsql_ctx, &pgsql_hnd); triton_md_set_trig(&pgsql_hnd, MD_TRIG_LEVEL); triton_context_wakeup(&pgsql_ctx); start_connect(); log_register_target(&target); } DEFINE_INIT(1, init);