summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPablo Neira Ayuso <pablo@netfilter.org>2009-08-23 12:11:20 +0200
committerPablo Neira Ayuso <pablo@netfilter.org>2009-08-23 12:11:20 +0200
commitcf3be894fcb95adb360425c8482954522e9110d2 (patch)
tree9a6f2a95cd36218bcf6e852ecc300074ba7fef16 /src
parent9d99a7699d7021a1c219d6553e037ac7ba4a5a37 (diff)
downloadconntrack-tools-cf3be894fcb95adb360425c8482954522e9110d2.tar.gz
conntrack-tools-cf3be894fcb95adb360425c8482954522e9110d2.zip
conntrackd: add support state-replication based on TCP
This patch adds support for TCP as protocol to replicate state-changes between two daemons. Note that this only makes sense with the notrack mode. Signed-off-by: Pablo Neira Ayuso <pablo@netfilter.org>
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am1
-rw-r--r--src/channel.c17
-rw-r--r--src/channel_mcast.c15
-rw-r--r--src/channel_tcp.c149
-rw-r--r--src/channel_udp.c15
-rw-r--r--src/mcast.c5
-rw-r--r--src/read_config_lex.l1
-rw-r--r--src/read_config_yy.y158
-rw-r--r--src/sync-mode.c65
-rw-r--r--src/tcp.c440
-rw-r--r--src/udp.c5
11 files changed, 857 insertions, 14 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index e969f4d..8b36642 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -20,6 +20,7 @@ conntrackd_SOURCES = alarm.c main.c run.c hash.c queue.c rbtree.c \
network.c cidr.c \
build.c parse.c \
channel.c multichannel.c channel_mcast.c channel_udp.c \
+ tcp.c channel_tcp.c \
external_cache.c external_inject.c \
read_config_yy.y read_config_lex.l
diff --git a/src/channel.c b/src/channel.c
index 9d74b7f..76fb057 100644
--- a/src/channel.c
+++ b/src/channel.c
@@ -20,11 +20,13 @@
static struct channel_ops *ops[CHANNEL_MAX];
extern struct channel_ops channel_mcast;
extern struct channel_ops channel_udp;
+extern struct channel_ops channel_tcp;
void channel_init(void)
{
ops[CHANNEL_MCAST] = &channel_mcast;
ops[CHANNEL_UDP] = &channel_udp;
+ ops[CHANNEL_TCP] = &channel_tcp;
}
#define HEADERSIZ 28 /* IP header (20 bytes) + UDP header 8 (bytes) */
@@ -183,3 +185,18 @@ void channel_stats_extended(struct channel *c, int active,
{
return c->ops->stats_extended(c, active, h, fd);
}
+
+int channel_accept_isset(struct channel *c, fd_set *readfds)
+{
+ return c->ops->accept_isset(c, readfds);
+}
+
+int channel_isset(struct channel *c, fd_set *readfds)
+{
+ return c->ops->isset(c, readfds);
+}
+
+int channel_accept(struct channel *c)
+{
+ return c->ops->accept(c);
+}
diff --git a/src/channel_mcast.c b/src/channel_mcast.c
index 898b194..9fcacac 100644
--- a/src/channel_mcast.c
+++ b/src/channel_mcast.c
@@ -112,12 +112,27 @@ channel_mcast_stats_extended(struct channel *c, int active,
send(fd, buf, size, 0);
}
+static int
+channel_mcast_isset(struct channel *c, fd_set *readfds)
+{
+ struct mcast_channel *m = c->data;
+ return mcast_isset(m->server, readfds);
+}
+
+static int
+channel_mcast_accept_isset(struct channel *c, fd_set *readfds)
+{
+ return 0;
+}
+
struct channel_ops channel_mcast = {
.open = channel_mcast_open,
.close = channel_mcast_close,
.send = channel_mcast_send,
.recv = channel_mcast_recv,
.get_fd = channel_mcast_get_fd,
+ .isset = channel_mcast_isset,
+ .accept_isset = channel_mcast_accept_isset,
.stats = channel_mcast_stats,
.stats_extended = channel_mcast_stats_extended,
};
diff --git a/src/channel_tcp.c b/src/channel_tcp.c
new file mode 100644
index 0000000..9fb4b07
--- /dev/null
+++ b/src/channel_tcp.c
@@ -0,0 +1,149 @@
+/*
+ * (C) 2009 by Pablo Neira Ayuso <pablo@netfilter.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * TCP support has been sponsored by 6WIND <www.6wind.com>.
+ */
+
+#include <stdlib.h>
+#include <libnfnetlink/libnfnetlink.h>
+
+#include "channel.h"
+#include "tcp.h"
+
+static void
+*channel_tcp_open(void *conf)
+{
+ struct tcp_channel *m;
+ struct tcp_conf *c = conf;
+
+ m = calloc(sizeof(struct tcp_channel), 1);
+ if (m == NULL)
+ return NULL;
+
+ m->client = tcp_client_create(c);
+ if (m->client == NULL) {
+ free(m);
+ return NULL;
+ }
+
+ m->server = tcp_server_create(c);
+ if (m->server == NULL) {
+ tcp_client_destroy(m->client);
+ free(m);
+ return NULL;
+ }
+ return m;
+}
+
+static int
+channel_tcp_send(void *channel, const void *data, int len)
+{
+ struct tcp_channel *m = channel;
+ return tcp_send(m->client, data, len);
+}
+
+static int
+channel_tcp_recv(void *channel, char *buf, int size)
+{
+ struct tcp_channel *m = channel;
+ return tcp_recv(m->server, buf, size);
+}
+
+static void
+channel_tcp_close(void *channel)
+{
+ struct tcp_channel *m = channel;
+ tcp_client_destroy(m->client);
+ tcp_server_destroy(m->server);
+ free(m);
+}
+
+static int
+channel_tcp_get_fd(void *channel)
+{
+ struct tcp_channel *m = channel;
+ return tcp_get_fd(m->server);
+}
+
+static void
+channel_tcp_stats(struct channel *c, int fd)
+{
+ struct tcp_channel *m = c->data;
+ char ifname[IFNAMSIZ], buf[512];
+ int size;
+
+ if_indextoname(c->channel_ifindex, ifname);
+ size = tcp_snprintf_stats(buf, sizeof(buf), ifname,
+ m->client, m->server);
+ send(fd, buf, size, 0);
+}
+
+static void
+channel_tcp_stats_extended(struct channel *c, int active,
+ struct nlif_handle *h, int fd)
+{
+ struct tcp_channel *m = c->data;
+ char ifname[IFNAMSIZ], buf[512];
+ const char *status;
+ unsigned int flags;
+ int size;
+
+ if_indextoname(c->channel_ifindex, ifname);
+ nlif_get_ifflags(h, c->channel_ifindex, &flags);
+ /*
+ * IFF_UP shows administrative status
+ * IFF_RUNNING shows carrier status
+ */
+ if (flags & IFF_UP) {
+ if (!(flags & IFF_RUNNING))
+ status = "NO-CARRIER";
+ else
+ status = "RUNNING";
+ } else {
+ status = "DOWN";
+ }
+ size = tcp_snprintf_stats2(buf, sizeof(buf),
+ ifname, status, active,
+ &m->client->stats,
+ &m->server->stats);
+ send(fd, buf, size, 0);
+}
+
+static int
+channel_tcp_isset(struct channel *c, fd_set *readfds)
+{
+ struct tcp_channel *m = c->data;
+ return tcp_isset(m->server, readfds);
+}
+
+static int
+channel_tcp_accept_isset(struct channel *c, fd_set *readfds)
+{
+ struct tcp_channel *m = c->data;
+ return tcp_accept_isset(m->server, readfds);
+}
+
+static int
+channel_tcp_accept(struct channel *c)
+{
+ struct tcp_channel *m = c->data;
+ return tcp_accept(m->server);
+}
+
+struct channel_ops channel_tcp = {
+ .open = channel_tcp_open,
+ .close = channel_tcp_close,
+ .send = channel_tcp_send,
+ .recv = channel_tcp_recv,
+ .accept = channel_tcp_accept,
+ .get_fd = channel_tcp_get_fd,
+ .isset = channel_tcp_isset,
+ .accept_isset = channel_tcp_accept_isset,
+ .stats = channel_tcp_stats,
+ .stats_extended = channel_tcp_stats_extended,
+};
diff --git a/src/channel_udp.c b/src/channel_udp.c
index 1c15b47..5c88647 100644
--- a/src/channel_udp.c
+++ b/src/channel_udp.c
@@ -112,12 +112,27 @@ channel_udp_stats_extended(struct channel *c, int active,
send(fd, buf, size, 0);
}
+static int
+channel_udp_isset(struct channel *c, fd_set *readfds)
+{
+ struct udp_channel *m = c->data;
+ return udp_isset(m->server, readfds);
+}
+
+static int
+channel_udp_accept_isset(struct channel *c, fd_set *readfds)
+{
+ return 0;
+}
+
struct channel_ops channel_udp = {
.open = channel_udp_open,
.close = channel_udp_close,
.send = channel_udp_send,
.recv = channel_udp_recv,
.get_fd = channel_udp_get_fd,
+ .isset = channel_udp_isset,
+ .accept_isset = channel_udp_accept_isset,
.stats = channel_udp_stats,
.stats_extended = channel_udp_stats_extended,
};
diff --git a/src/mcast.c b/src/mcast.c
index ec11100..4107d5d 100644
--- a/src/mcast.c
+++ b/src/mcast.c
@@ -304,6 +304,11 @@ int mcast_get_fd(struct mcast_sock *m)
return m->fd;
}
+int mcast_isset(struct mcast_sock *m, fd_set *readfds)
+{
+ return FD_ISSET(m->fd, readfds);
+}
+
int
mcast_snprintf_stats(char *buf, size_t buflen, char *ifname,
struct mcast_stats *s, struct mcast_stats *r)
diff --git a/src/read_config_lex.l b/src/read_config_lex.l
index d3f83aa..9c53c6c 100644
--- a/src/read_config_lex.l
+++ b/src/read_config_lex.l
@@ -65,6 +65,7 @@ notrack [N|n][O|o][T|t][R|r][A|a][C|c][K|k]
"Interface" { return T_IFACE; }
"Multicast" { return T_MULTICAST; }
"UDP" { return T_UDP; }
+"TCP" { return T_TCP; }
"HashSize" { return T_HASHSIZE; }
"RefreshTime" { return T_REFRESH; }
"CacheTimeout" { return T_EXPIRE; }
diff --git a/src/read_config_yy.y b/src/read_config_yy.y
index 38c5929..0804689 100644
--- a/src/read_config_yy.y
+++ b/src/read_config_yy.y
@@ -58,7 +58,7 @@ static void __max_dedicated_links_reached(void);
%token T_IPV4_ADDR T_IPV4_IFACE T_PORT T_HASHSIZE T_HASHLIMIT T_MULTICAST
%token T_PATH T_UNIX T_REFRESH T_IPV6_ADDR T_IPV6_IFACE
%token T_IGNORE_UDP T_IGNORE_ICMP T_IGNORE_TRAFFIC T_BACKLOG T_GROUP
-%token T_LOG T_UDP T_ICMP T_IGMP T_VRRP T_IGNORE_PROTOCOL
+%token T_LOG T_UDP T_ICMP T_IGMP T_VRRP T_TCP T_IGNORE_PROTOCOL
%token T_LOCK T_STRIP_NAT T_BUFFER_SIZE_MAX_GROWN T_EXPIRE T_TIMEOUT
%token T_GENERAL T_SYNC T_STATS T_RELAX_TRANSITIONS T_BUFFER_SIZE T_DELAY
%token T_SYNC_MODE T_LISTEN_TO T_FAMILY T_RESEND_BUFFER_SIZE
@@ -573,6 +573,142 @@ udp_option: T_CHECKSUM T_OFF
conf.channel[conf.channel_num].u.udp.checksum = 1;
};
+tcp_line : T_TCP '{' tcp_options '}'
+{
+ if (conf.channel_type_global != CHANNEL_NONE &&
+ conf.channel_type_global != CHANNEL_TCP) {
+ print_err(CTD_CFG_ERROR, "cannot use `TCP' with other "
+ "dedicated link protocols!");
+ exit(EXIT_FAILURE);
+ }
+ conf.channel_type_global = CHANNEL_TCP;
+ conf.channel[conf.channel_num].channel_type = CHANNEL_TCP;
+ conf.channel[conf.channel_num].channel_flags = CHANNEL_F_BUFFERED |
+ CHANNEL_F_STREAM;
+ conf.channel_num++;
+};
+
+tcp_line : T_TCP T_DEFAULT '{' tcp_options '}'
+{
+ if (conf.channel_type_global != CHANNEL_NONE &&
+ conf.channel_type_global != CHANNEL_TCP) {
+ print_err(CTD_CFG_ERROR, "cannot use `TCP' with other "
+ "dedicated link protocols!");
+ exit(EXIT_FAILURE);
+ }
+ conf.channel_type_global = CHANNEL_TCP;
+ conf.channel[conf.channel_num].channel_type = CHANNEL_TCP;
+ conf.channel[conf.channel_num].channel_flags = CHANNEL_F_DEFAULT |
+ CHANNEL_F_BUFFERED |
+ CHANNEL_F_STREAM;
+ conf.channel_default = conf.channel_num;
+ conf.channel_num++;
+};
+
+tcp_options :
+ | tcp_options tcp_option;
+
+tcp_option : T_IPV4_ADDR T_IP
+{
+ __max_dedicated_links_reached();
+
+ if (!inet_aton($2, &conf.channel[conf.channel_num].u.tcp.server.ipv4)) {
+ print_err(CTD_CFG_WARN, "%s is not a valid IPv4 address", $2);
+ break;
+ }
+ conf.channel[conf.channel_num].u.tcp.ipproto = AF_INET;
+};
+
+tcp_option : T_IPV6_ADDR T_IP
+{
+ __max_dedicated_links_reached();
+
+#ifdef HAVE_INET_PTON_IPV6
+ if (inet_pton(AF_INET6, $2,
+ &conf.channel[conf.channel_num].u.tcp.server.ipv6) <= 0) {
+ print_err(CTD_CFG_WARN, "%s is not a valid IPv6 address", $2);
+ break;
+ }
+#else
+ print_err(CTD_CFG_WARN, "cannot find inet_pton(), IPv6 unsupported!");
+ break;
+#endif
+ conf.channel[conf.channel_num].u.tcp.ipproto = AF_INET6;
+};
+
+tcp_option : T_IPV4_DEST_ADDR T_IP
+{
+ __max_dedicated_links_reached();
+
+ if (!inet_aton($2, &conf.channel[conf.channel_num].u.tcp.client)) {
+ print_err(CTD_CFG_WARN, "%s is not a valid IPv4 address", $2);
+ break;
+ }
+ conf.channel[conf.channel_num].u.tcp.ipproto = AF_INET;
+};
+
+tcp_option : T_IPV6_DEST_ADDR T_IP
+{
+ __max_dedicated_links_reached();
+
+#ifdef HAVE_INET_PTON_IPV6
+ if (inet_pton(AF_INET6, $2,
+ &conf.channel[conf.channel_num].u.tcp.client) <= 0) {
+ print_err(CTD_CFG_WARN, "%s is not a valid IPv6 address", $2);
+ break;
+ }
+#else
+ print_err(CTD_CFG_WARN, "cannot find inet_pton(), IPv6 unsupported!");
+ break;
+#endif
+ conf.channel[conf.channel_num].u.tcp.ipproto = AF_INET6;
+};
+
+tcp_option : T_IFACE T_STRING
+{
+ int idx;
+
+ __max_dedicated_links_reached();
+ strncpy(conf.channel[conf.channel_num].channel_ifname, $2, IFNAMSIZ);
+
+ idx = if_nametoindex($2);
+ if (!idx) {
+ print_err(CTD_CFG_WARN, "%s is an invalid interface", $2);
+ break;
+ }
+ conf.channel[conf.channel_num].u.tcp.server.ipv6.scope_id = idx;
+};
+
+tcp_option : T_PORT T_NUMBER
+{
+ __max_dedicated_links_reached();
+ conf.channel[conf.channel_num].u.tcp.port = $2;
+};
+
+tcp_option: T_SNDBUFF T_NUMBER
+{
+ __max_dedicated_links_reached();
+ conf.channel[conf.channel_num].u.tcp.sndbuf = $2;
+};
+
+tcp_option: T_RCVBUFF T_NUMBER
+{
+ __max_dedicated_links_reached();
+ conf.channel[conf.channel_num].u.tcp.rcvbuf = $2;
+};
+
+tcp_option: T_CHECKSUM T_ON
+{
+ __max_dedicated_links_reached();
+ conf.channel[conf.channel_num].u.tcp.checksum = 0;
+};
+
+tcp_option: T_CHECKSUM T_OFF
+{
+ __max_dedicated_links_reached();
+ conf.channel[conf.channel_num].u.tcp.checksum = 1;
+};
+
hashsize : T_HASHSIZE T_NUMBER
{
conf.hashsize = $2;
@@ -654,6 +790,7 @@ sync_line: refreshtime
| checksum
| multicast_line
| udp_line
+ | tcp_line
| relax_transitions
| delay_destroy_msgs
| sync_mode_alarm
@@ -1043,6 +1180,25 @@ filter_protocol_item : T_STRING
pent->p_proto);
};
+filter_protocol_item : T_TCP
+{
+ struct protoent *pent;
+
+ pent = getprotobyname("tcp");
+ if (pent == NULL) {
+ print_err(CTD_CFG_WARN, "getprotobyname() cannot find "
+ "protocol `tcp' in /etc/protocols");
+ break;
+ }
+ ct_filter_add_proto(STATE(us_filter), pent->p_proto);
+
+ __kernel_filter_start();
+
+ nfct_filter_add_attr_u32(STATE(filter),
+ NFCT_FILTER_L4PROTO,
+ pent->p_proto);
+};
+
filter_item : T_ADDRESS T_ACCEPT '{' filter_address_list '}'
{
ct_filter_set_logic(STATE(us_filter),
diff --git a/src/sync-mode.c b/src/sync-mode.c
index 174df80..6781f10 100644
--- a/src/sync-mode.c
+++ b/src/sync-mode.c
@@ -98,39 +98,70 @@ do_channel_handler_step(int i, struct nethdr *net, size_t remain)
}
}
+static char __net[65536]; /* XXX: maximum MTU for IPv4 */
+static char *cur = __net;
+
+static int channel_stream(struct channel *m, const char *ptr, ssize_t remain)
+{
+ if (m->channel_flags & CHANNEL_F_STREAM) {
+ /* truncated data. */
+ memcpy(__net, ptr, remain);
+ cur = __net + remain;
+ return 1;
+ }
+ return 0;
+}
+
/* handler for messages received */
static int channel_handler_routine(struct channel *m, int i)
{
ssize_t numbytes;
- ssize_t remain;
- char __net[65536], *ptr = __net; /* XXX: maximum MTU for IPv4 */
+ ssize_t remain, pending = cur - __net;
+ char *ptr = __net;
- numbytes = channel_recv(m, __net, sizeof(__net));
+ numbytes = channel_recv(m, cur, sizeof(__net) - pending);
if (numbytes <= 0)
return -1;
remain = numbytes;
+ if (pending) {
+ remain += pending;
+ cur = __net;
+ }
+
while (remain > 0) {
struct nethdr *net = (struct nethdr *) ptr;
int len;
if (remain < NETHDR_SIZ) {
- STATE_SYNC(error).msg_rcv_malformed++;
- STATE_SYNC(error).msg_rcv_truncated++;
+ if (!channel_stream(m, ptr, remain)) {
+ STATE_SYNC(error).msg_rcv_malformed++;
+ STATE_SYNC(error).msg_rcv_truncated++;
+ }
break;
}
len = ntohs(net->len);
- if (len > remain || len <= 0) {
+ if (len <= 0) {
STATE_SYNC(error).msg_rcv_malformed++;
STATE_SYNC(error).msg_rcv_bad_size++;
break;
}
+ if (len > remain) {
+ if (!channel_stream(m, ptr, remain)) {
+ STATE_SYNC(error).msg_rcv_malformed++;
+ STATE_SYNC(error).msg_rcv_bad_size++;
+ }
+ break;
+ }
+
if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) {
if (remain < NETHDR_ACK_SIZ) {
- STATE_SYNC(error).msg_rcv_malformed++;
- STATE_SYNC(error).msg_rcv_truncated++;
+ if (!channel_stream(m, ptr, remain)) {
+ STATE_SYNC(error).msg_rcv_malformed++;
+ STATE_SYNC(error).msg_rcv_truncated++;
+ }
break;
}
@@ -322,15 +353,23 @@ static int init_sync(void)
return 0;
}
+static void channel_check(struct channel *c, int i, fd_set *readfds)
+{
+ /* In case that this channel is connection-oriented. */
+ if (channel_accept_isset(c, readfds))
+ channel_accept(c);
+
+ /* For data handling. */
+ if (channel_isset(c, readfds))
+ channel_handler(c, i);
+}
+
static void run_sync(fd_set *readfds)
{
int i;
- for (i=0; i<STATE_SYNC(channel)->channel_num; i++) {
- int fd = channel_get_fd(STATE_SYNC(channel)->channel[i]);
- if (FD_ISSET(fd, readfds))
- channel_handler(STATE_SYNC(channel)->channel[i], i);
- }
+ for (i=0; i<STATE_SYNC(channel)->channel_num; i++)
+ channel_check(STATE_SYNC(channel)->channel[i], i, readfds);
if (FD_ISSET(queue_get_eventfd(STATE_SYNC(tx_queue)), readfds))
STATE_SYNC(sync)->xmit();
diff --git a/src/tcp.c b/src/tcp.c
new file mode 100644
index 0000000..f99c1cb
--- /dev/null
+++ b/src/tcp.c
@@ -0,0 +1,440 @@
+/*
+ * (C) 2009 by Pablo Neira Ayuso <pablo@netfilter.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * TCP support has been sponsored by 6WIND <www.6wind.com>.
+ */
+
+#include "tcp.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <net/if.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+
+#include "conntrackd.h"
+#include "fds.h"
+
+struct tcp_sock *tcp_server_create(struct tcp_conf *c)
+{
+ int yes = 1, ret;
+ struct tcp_sock *m;
+ socklen_t socklen = sizeof(int);
+
+ m = calloc(sizeof(struct tcp_sock), 1);
+ if (m == NULL)
+ return NULL;
+
+ switch(c->ipproto) {
+ case AF_INET:
+ m->addr.ipv4.sin_family = AF_INET;
+ m->addr.ipv4.sin_port = htons(c->port);
+ m->addr.ipv4.sin_addr = c->server.ipv4.inet_addr;
+ m->sockaddr_len = sizeof(struct sockaddr_in);
+ break;
+
+ case AF_INET6:
+ m->addr.ipv6.sin6_family = AF_INET6;
+ m->addr.ipv6.sin6_port = htons(c->port);
+ m->addr.ipv6.sin6_addr = c->server.ipv6.inet_addr6;
+ m->addr.ipv6.sin6_scope_id = c->server.ipv6.scope_id;
+ m->sockaddr_len = sizeof(struct sockaddr_in6);
+ break;
+ }
+
+ m->fd = socket(c->ipproto, SOCK_STREAM, 0);
+ if (m->fd == -1) {
+ free(m);
+ return NULL;
+ }
+
+ if (setsockopt(m->fd, SOL_SOCKET, SO_REUSEADDR, &yes,
+ sizeof(int)) == -1) {
+ close(m->fd);
+ free(m);
+ return NULL;
+ }
+
+ if (setsockopt(m->fd, SOL_SOCKET, SO_KEEPALIVE, &yes,
+ sizeof(int)) == -1) {
+ close(m->fd);
+ free(m);
+ return NULL;
+ }
+
+#ifndef SO_RCVBUFFORCE
+#define SO_RCVBUFFORCE 33
+#endif
+
+ if (c->rcvbuf &&
+ setsockopt(m->fd, SOL_SOCKET, SO_RCVBUFFORCE, &c->rcvbuf,
+ sizeof(int)) == -1) {
+ /* not supported in linux kernel < 2.6.14 */
+ if (errno != ENOPROTOOPT) {
+ close(m->fd);
+ free(m);
+ return NULL;
+ }
+ }
+
+ getsockopt(m->fd, SOL_SOCKET, SO_RCVBUF, &c->rcvbuf, &socklen);
+
+ if (bind(m->fd, (struct sockaddr *) &m->addr, m->sockaddr_len) == -1) {
+ close(m->fd);
+ free(m);
+ return NULL;
+ }
+
+ if (listen(m->fd, 1) == -1) {
+ close(m->fd);
+ free(m);
+ return NULL;
+ }
+
+ if (fcntl(m->fd, F_SETFL, O_NONBLOCK) == -1) {
+ close(m->fd);
+ free(m);
+ return NULL;
+ }
+
+ /* now we accept new connections ... */
+ ret = accept(m->fd, NULL, NULL);
+ if (ret == -1) {
+ if (errno != EAGAIN) {
+ /* unexpected error, give up. */
+ close(m->fd);
+ free(m);
+ m = NULL;
+ } else {
+ /* still in progress ... we'll do it in tcp_recv() */
+ m->state = TCP_SERVER_ACCEPTING;
+ }
+ } else {
+ /* very unlikely at this stage. */
+ if (fcntl(ret, F_SETFL, O_NONBLOCK) == -1) {
+ /* unexpected error, give up. */
+ close(m->fd);
+ free(m);
+ return NULL;
+ }
+ m->client_fd = ret;
+ m->state = TCP_SERVER_CONNECTED;
+ register_fd(m->client_fd, STATE(fds));
+ }
+
+ return m;
+}
+
+void tcp_server_destroy(struct tcp_sock *m)
+{
+ close(m->fd);
+ free(m);
+}
+
+static int
+tcp_client_init(struct tcp_sock *m, struct tcp_conf *c)
+{
+ int ret = 0;
+ socklen_t socklen = sizeof(int);
+
+ m->fd = socket(c->ipproto, SOCK_STREAM, 0);
+ if (m->fd == -1)
+ return -1;
+
+ if (setsockopt(m->fd, SOL_SOCKET, SO_NO_CHECK, &c->checksum,
+ sizeof(int)) == -1) {
+ close(m->fd);
+ return -1;
+ }
+
+#ifndef SO_SNDBUFFORCE
+#define SO_SNDBUFFORCE 32
+#endif
+
+ if (c->sndbuf &&
+ setsockopt(m->fd, SOL_SOCKET, SO_SNDBUFFORCE, &c->sndbuf,
+ sizeof(int)) == -1) {
+ /* not supported in linux kernel < 2.6.14 */
+ if (errno != ENOPROTOOPT) {
+ close(m->fd);
+ return -1;
+ }
+ }
+
+ getsockopt(m->fd, SOL_SOCKET, SO_SNDBUF, &c->sndbuf, &socklen);
+
+ switch(c->ipproto) {
+ case AF_INET:
+ m->addr.ipv4.sin_family = AF_INET;
+ m->addr.ipv4.sin_port = htons(c->port);
+ m->addr.ipv4.sin_addr = c->client.inet_addr;
+ m->sockaddr_len = sizeof(struct sockaddr_in);
+ break;
+ case AF_INET6:
+ m->addr.ipv6.sin6_family = AF_INET6;
+ m->addr.ipv6.sin6_port = htons(c->port);
+ memcpy(&m->addr.ipv6.sin6_addr, &c->client.inet_addr6,
+ sizeof(struct in6_addr));
+ m->sockaddr_len = sizeof(struct sockaddr_in6);
+ break;
+ default:
+ ret = -1;
+ break;
+ }
+
+ if (ret == -1) {
+ close(m->fd);
+ return -1;
+ }
+
+ if (fcntl(m->fd, F_SETFL, O_NONBLOCK) == -1) {
+ close(m->fd);
+ return -1;
+ }
+
+ ret = connect(m->fd, (struct sockaddr *)&m->addr, m->sockaddr_len);
+ if (ret == -1) {
+ if (errno == EINPROGRESS) {
+ /* connection in progress ... */
+ m->state = TCP_CLIENT_DISCONNECTED;
+ } else if (errno == ECONNREFUSED) {
+ /* connection refused. */
+ m->state = TCP_CLIENT_DISCONNECTED;
+ } else {
+ /* unexpected error, give up. */
+ close(m->fd);
+ return -1;
+ }
+ } else {
+ /* very unlikely at this stage. */
+ m->state = TCP_CLIENT_CONNECTED;
+ }
+ return 0;
+}
+
+static struct tcp_conf *tcp_client_conf; /* XXX: need this to re-connect. */
+
+struct tcp_sock *tcp_client_create(struct tcp_conf *c)
+{
+ struct tcp_sock *m;
+
+ tcp_client_conf = c;
+
+ m = calloc(sizeof(struct tcp_sock), 1);
+ if (m == NULL)
+ return NULL;
+
+ if (tcp_client_init(m, c) == -1) {
+ free(m);
+ return NULL;
+ }
+
+ return m;
+}
+
+void tcp_client_destroy(struct tcp_sock *m)
+{
+ close(m->fd);
+ free(m);
+}
+
+int tcp_accept(struct tcp_sock *m)
+{
+ int ret;
+
+ /* we got an attempt to connect but we already have a client? */
+ if (m->state != TCP_SERVER_ACCEPTING) {
+ /* clear the session and restart ... */
+ unregister_fd(m->client_fd, STATE(fds));
+ close(m->client_fd);
+ m->client_fd = -1;
+ m->state = TCP_SERVER_ACCEPTING;
+ }
+
+ /* the other peer wants to connect ... */
+ ret = accept(m->fd, NULL, NULL);
+ if (ret == -1) {
+ if (errno != EAGAIN) {
+ /* unexpected error. Give us another try. */
+ m->state = TCP_SERVER_ACCEPTING;
+ } else {
+ /* waiting for new connections. */
+ m->state = TCP_SERVER_ACCEPTING;
+ }
+ } else {
+ /* the peer finally got connected. */
+ if (fcntl(ret, F_SETFL, O_NONBLOCK) == -1) {
+ /* close the connection and give us another chance. */
+ close(ret);
+ return -1;
+ }
+
+ m->client_fd = ret;
+ m->state = TCP_SERVER_CONNECTED;
+ register_fd(m->client_fd, STATE(fds));
+ }
+ return m->client_fd;
+}
+
+ssize_t tcp_send(struct tcp_sock *m, const void *data, int size)
+{
+ ssize_t ret = 0;
+
+ switch(m->state) {
+ case TCP_CLIENT_DISCONNECTED:
+ ret = connect(m->fd, (struct sockaddr *)&m->addr,
+ m->sockaddr_len);
+ if (ret == -1) {
+ if (errno == EINPROGRESS || errno == EALREADY) {
+ /* connection in progress or already trying. */
+ m->state = TCP_CLIENT_DISCONNECTED;
+ } else if (errno == ECONNREFUSED) {
+ /* connection refused. */
+ m->state = TCP_CLIENT_DISCONNECTED;
+ } else {
+ /* unexpected error, give up. */
+ m->state = TCP_CLIENT_DISCONNECTED;
+ }
+ break;
+ } else {
+ /* we got connected :) */
+ m->state = TCP_CLIENT_CONNECTED;
+ }
+ case TCP_CLIENT_CONNECTED:
+ ret = sendto(m->fd, data, size, 0,
+ (struct sockaddr *) &m->addr, m->sockaddr_len);
+ if (ret == -1) {
+ if (errno == EPIPE || errno == ECONNRESET) {
+ close(m->fd);
+ tcp_client_init(m, tcp_client_conf);
+ m->state = TCP_CLIENT_DISCONNECTED;
+ } else {
+ m->stats.error++;
+ return 0;
+ }
+ }
+ }
+
+ if (ret >= 0) {
+ m->stats.bytes += ret;
+ m->stats.messages++;
+ }
+ return ret;
+}
+
+ssize_t tcp_recv(struct tcp_sock *m, void *data, int size)
+{
+ ssize_t ret = 0;
+ socklen_t sin_size = sizeof(struct sockaddr_in);
+
+ /* we are not connected, skip. */
+ if (m->state != TCP_SERVER_CONNECTED)
+ return 0;
+
+ ret = recvfrom(m->client_fd, data, size, 0,
+ (struct sockaddr *)&m->addr, &sin_size);
+ if (ret == -1) {
+ /* the other peer has disconnected... */
+ if (errno == ENOTCONN) {
+ unregister_fd(m->client_fd, STATE(fds));
+ close(m->client_fd);
+ m->client_fd = -1;
+ m->state = TCP_SERVER_ACCEPTING;
+ tcp_accept(m);
+ } else if (errno != EAGAIN) {
+ m->stats.error++;
+ }
+ } else if (ret == 0) {
+ /* the other peer has closed the connection... */
+ unregister_fd(m->client_fd, STATE(fds));
+ close(m->client_fd);
+ m->client_fd = -1;
+ m->state = TCP_SERVER_ACCEPTING;
+ tcp_accept(m);
+ }
+
+ if (ret >= 0) {
+ m->stats.bytes += ret;
+ m->stats.messages++;
+ }
+ return ret;
+}
+
+int tcp_get_fd(struct tcp_sock *m)
+{
+ return m->fd;
+}
+
+int tcp_isset(struct tcp_sock *m, fd_set *readfds)
+{
+ return m->client_fd >= 0 ? FD_ISSET(m->client_fd, readfds) : 0;
+}
+
+int tcp_accept_isset(struct tcp_sock *m, fd_set *readfds)
+{
+ return FD_ISSET(m->fd, readfds);
+}
+
+int
+tcp_snprintf_stats(char *buf, size_t buflen, char *ifname,
+ struct tcp_sock *client, struct tcp_sock *server)
+{
+ size_t size;
+ struct tcp_stats *s = &client->stats, *r = &server->stats;
+
+ size = snprintf(buf, buflen, "TCP traffic (active device=%s) "
+ "server=%s client=%s:\n"
+ "%20llu Bytes sent "
+ "%20llu Bytes recv\n"
+ "%20llu Pckts sent "
+ "%20llu Pckts recv\n"
+ "%20llu Error send "
+ "%20llu Error recv\n\n",
+ ifname,
+ server->state == TCP_SERVER_CONNECTED ?
+ "connected" : "disconnected",
+ client->state == TCP_CLIENT_CONNECTED ?
+ "connected" : "disconnected",
+ (unsigned long long)s->bytes,
+ (unsigned long long)r->bytes,
+ (unsigned long long)s->messages,
+ (unsigned long long)r->messages,
+ (unsigned long long)s->error,
+ (unsigned long long)r->error);
+ return size;
+}
+
+int
+tcp_snprintf_stats2(char *buf, size_t buflen, const char *ifname,
+ const char *status, int active,
+ struct tcp_stats *s, struct tcp_stats *r)
+{
+ size_t size;
+
+ size = snprintf(buf, buflen,
+ "TCP traffic device=%s status=%s role=%s:\n"
+ "%20llu Bytes sent "
+ "%20llu Bytes recv\n"
+ "%20llu Pckts sent "
+ "%20llu Pckts recv\n"
+ "%20llu Error send "
+ "%20llu Error recv\n\n",
+ ifname, status, active ? "ACTIVE" : "BACKUP",
+ (unsigned long long)s->bytes,
+ (unsigned long long)r->bytes,
+ (unsigned long long)s->messages,
+ (unsigned long long)r->messages,
+ (unsigned long long)s->error,
+ (unsigned long long)r->error);
+ return size;
+}
diff --git a/src/udp.c b/src/udp.c
index 4b9eb80..ecaa46e 100644
--- a/src/udp.c
+++ b/src/udp.c
@@ -214,6 +214,11 @@ int udp_get_fd(struct udp_sock *m)
return m->fd;
}
+int udp_isset(struct udp_sock *m, fd_set *readfds)
+{
+ return FD_ISSET(m->fd, readfds);
+}
+
int
udp_snprintf_stats(char *buf, size_t buflen, char *ifname,
struct udp_stats *s, struct udp_stats *r)