diff options
-rw-r--r-- | doc/sync/ftfw/conntrackd.conf | 15 | ||||
-rw-r--r-- | include/queue.h | 1 | ||||
-rw-r--r-- | src/queue.c | 7 | ||||
-rw-r--r-- | src/read_config_lex.l | 1 | ||||
-rw-r--r-- | src/read_config_yy.y | 13 | ||||
-rw-r--r-- | src/sync-ftfw.c | 44 |
6 files changed, 66 insertions, 15 deletions
diff --git a/doc/sync/ftfw/conntrackd.conf b/doc/sync/ftfw/conntrackd.conf index 77ef76c..4fd86d7 100644 --- a/doc/sync/ftfw/conntrackd.conf +++ b/doc/sync/ftfw/conntrackd.conf @@ -4,14 +4,15 @@ Sync { Mode FTFW { # - # Size of the buffer that hold control messages for - # possible resends (in bytes). Under message omission, - # this size determines the length of the history window - # of control message. Control messages are 16 bytes long, - # so that we keep a history of 262144/16 = 16384 control - # messages. + # Size of the resend queue (in objects). This is the maximum + # number of objects that can be stored waiting to be confirmed + # via acknoledgment. If you keep this value low, the daemon + # will have less chances to recover state-changes under message + # omission. On the other hand, if you keep this value high, + # the daemon will consume more memory to store dead objects. + # Default is 131072 objects. # - ResendBufferSize 262144 + # ResendQueueSize 131072 # # Entries committed to the connection tracking table diff --git a/include/queue.h b/include/queue.h index ef56323..9213b3d 100644 --- a/include/queue.h +++ b/include/queue.h @@ -44,6 +44,7 @@ void queue_destroy(struct queue *b); unsigned int queue_len(const struct queue *b); int queue_add(struct queue *b, struct queue_node *n); int queue_del(struct queue_node *n); +struct queue_node *queue_del_head(struct queue *b); int queue_in(struct queue *b, struct queue_node *n); void queue_iterate(struct queue *b, const void *data, diff --git a/src/queue.c b/src/queue.c index cffcc93..7b36dc6 100644 --- a/src/queue.c +++ b/src/queue.c @@ -113,6 +113,13 @@ int queue_del(struct queue_node *n) return 1; } +struct queue_node *queue_del_head(struct queue *b) +{ + struct queue_node *n = (struct queue_node *) b->head.next; + queue_del(n); + return n; +} + int queue_in(struct queue *b, struct queue_node *n) { return b == n->owner; diff --git a/src/read_config_lex.l b/src/read_config_lex.l index 4953974..9bc4c18 100644 --- a/src/read_config_lex.l +++ b/src/read_config_lex.l @@ -88,6 +88,7 @@ notrack [N|n][O|o][T|t][R|r][A|a][C|c][K|k] "ListenTo" { return T_LISTEN_TO; } "Family" { return T_FAMILY; } "ResendBufferSize" { return T_RESEND_BUFFER_SIZE; } +"ResendQueueSize" { return T_RESEND_QUEUE_SIZE; } "Checksum" { return T_CHECKSUM; } "ACKWindowSize" { return T_WINDOWSIZE; } "Replicate" { return T_REPLICATE; } diff --git a/src/read_config_yy.y b/src/read_config_yy.y index ce604d9..97aa178 100644 --- a/src/read_config_yy.y +++ b/src/read_config_yy.y @@ -54,7 +54,7 @@ static void __max_mcast_dedicated_links_reached(void); %token T_GENERAL T_SYNC T_STATS T_RELAX_TRANSITIONS T_BUFFER_SIZE T_DELAY %token T_SYNC_MODE T_LISTEN_TO T_FAMILY T_RESEND_BUFFER_SIZE %token T_ALARM T_FTFW T_CHECKSUM T_WINDOWSIZE T_ON T_OFF -%token T_REPLICATE T_FOR T_IFACE T_PURGE +%token T_REPLICATE T_FOR T_IFACE T_PURGE T_RESEND_QUEUE_SIZE %token T_ESTABLISHED T_SYN_SENT T_SYN_RECV T_FIN_WAIT %token T_CLOSE_WAIT T_LAST_ACK T_TIME_WAIT T_CLOSE T_LISTEN %token T_SYSLOG T_WRITE_THROUGH T_STAT_BUFFER_SIZE T_DESTROY_TIMEOUT @@ -525,6 +525,7 @@ sync_mode_ftfw_list: | sync_mode_ftfw_list sync_mode_ftfw_line; sync_mode_ftfw_line: resend_queue_size + | resend_buffer_size | timeout | purge | window_size @@ -537,7 +538,13 @@ sync_mode_notrack_line: timeout | purge ; -resend_queue_size: T_RESEND_BUFFER_SIZE T_NUMBER +resend_buffer_size: T_RESEND_BUFFER_SIZE T_NUMBER +{ + fprintf(stderr, "WARNING: `ResendBufferSize' is deprecated. " + "Use `ResendQueueSize' instead\n"); +}; + +resend_queue_size: T_RESEND_QUEUE_SIZE T_NUMBER { conf.resend_queue_size = $2; }; @@ -1146,7 +1153,7 @@ init_config(char *filename) CONFIG(refresh) = 60; if (CONFIG(resend_queue_size) == 0) - CONFIG(resend_queue_size) = 262144; + CONFIG(resend_queue_size) = 131072; /* default to a window size of 300 packets */ if (CONFIG(window_size) == 0) diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c index 0d49756..493c15f 100644 --- a/src/sync-ftfw.c +++ b/src/sync-ftfw.c @@ -27,6 +27,7 @@ #include "fds.h" #include <string.h> +#include <errno.h> #if 0 #define dp printf @@ -143,7 +144,7 @@ static void do_alive_alarm(struct alarm_block *a, void *data) static int ftfw_init(void) { - rs_queue = queue_create(INT_MAX, 0); + rs_queue = queue_create(CONFIG(resend_queue_size), 0); if (rs_queue == NULL) { dlog(LOG_ERR, "cannot create rs queue"); return -1; @@ -451,6 +452,29 @@ out: return ret; } +static void rs_queue_purge_full(void) +{ + struct queue_node *n; + + n = queue_del_head(rs_queue); + switch(n->type) { + case Q_ELEM_CTL: { + struct queue_object *qobj = (struct queue_object *)n; + queue_object_free(qobj); + break; + } + case Q_ELEM_OBJ: { + struct cache_ftfw *cn; + struct cache_object *obj; + + cn = (struct cache_ftfw *)n; + obj = cache_data_get_object(STATE_SYNC(internal), cn); + cache_object_put(obj); + break; + } + } +} + static int tx_queue_xmit(struct queue_node *n, const void *data) { queue_del(n); @@ -474,9 +498,14 @@ static int tx_queue_xmit(struct queue_node *n, const void *data) mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); HDR_NETWORK2HOST(net); - if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) - queue_add(rs_queue, n); - else + if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) { + if (queue_add(rs_queue, n) < 0) { + if (errno == ENOSPC) { + rs_queue_purge_full(); + queue_add(rs_queue, n); + } + } + } else queue_object_free((struct queue_object *)n); break; } @@ -497,7 +526,12 @@ static int tx_queue_xmit(struct queue_node *n, const void *data) mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); cn->seq = ntohl(net->seq); - queue_add(rs_queue, &cn->qnode); + if (queue_add(rs_queue, &cn->qnode) < 0) { + if (errno == ENOSPC) { + rs_queue_purge_full(); + queue_add(rs_queue, &cn->qnode); + } + } /* we release the object once we get the acknowlegment */ break; } |