diff options
Diffstat (limited to 'src/sync-mode.c')
-rw-r--r-- | src/sync-mode.c | 109 |
1 files changed, 87 insertions, 22 deletions
diff --git a/src/sync-mode.c b/src/sync-mode.c index 00e2f7b..0dbd12d 100644 --- a/src/sync-mode.c +++ b/src/sync-mode.c @@ -33,8 +33,10 @@ #include <time.h> #include <string.h> #include <stdlib.h> +#include <net/if.h> -static void do_mcast_handler_step(struct nethdr *net, size_t remain) +static void +do_mcast_handler_step(int if_idx, struct nethdr *net, size_t remain) { char __ct[nfct_maxsize()]; struct nf_conntrack *ct = (struct nf_conntrack *)(void*) __ct; @@ -47,6 +49,9 @@ static void do_mcast_handler_step(struct nethdr *net, size_t remain) return; } + if (if_idx != mcast_get_current_ifidx(STATE_SYNC(mcast_client))) + mcast_set_current_link(STATE_SYNC(mcast_client), if_idx); + switch (STATE_SYNC(sync)->recv(net)) { case MSG_DATA: break; @@ -111,13 +116,13 @@ retry: } /* handler for multicast messages received */ -static void mcast_handler(void) +static void mcast_handler(struct mcast_sock *m, int if_idx) { ssize_t numbytes; ssize_t remain; char __net[65536], *ptr = __net; /* XXX: maximum MTU for IPv4 */ - numbytes = mcast_recv(STATE_SYNC(mcast_server), __net, sizeof(__net)); + numbytes = mcast_recv(m, __net, sizeof(__net)); if (numbytes <= 0) return; @@ -160,12 +165,46 @@ static void mcast_handler(void) HDR_NETWORK2HOST(net); - do_mcast_handler_step(net, remain); + do_mcast_handler_step(if_idx, net, remain); ptr += net->len; remain -= net->len; } } +/* select a new interface candidate in a round robin basis */ +static void mcast_iface_candidate(void) +{ + int i, idx; + unsigned int flags; + char buf[IFNAMSIZ]; + + for (i=0; i<STATE_SYNC(mcast_client)->num_links; i++) { + idx = mcast_get_ifidx(STATE_SYNC(mcast_client), i); + if (idx == mcast_get_current_ifidx(STATE_SYNC(mcast_client))) + continue; + nlif_get_ifflags(STATE_SYNC(mcast_iface), idx, &flags); + if (flags & (IFF_RUNNING | IFF_UP)) { + mcast_set_current_link(STATE_SYNC(mcast_client), i); + dlog(LOG_NOTICE, "device `%s' becomes multicast " + "dedicated link", + if_indextoname(idx, buf)); + return; + } + } + dlog(LOG_ERR, "no dedicated links available!"); +} + +static void mcast_iface_handler(void) +{ + int idx = mcast_get_current_ifidx(STATE_SYNC(mcast_client)); + unsigned int flags; + + nlif_catch(STATE_SYNC(mcast_iface)); + nlif_get_ifflags(STATE_SYNC(mcast_iface), idx, &flags); + if (!(flags & IFF_RUNNING) || !(flags & IFF_UP)) + mcast_iface_candidate(); +} + static int init_sync(void) { state.sync = malloc(sizeof(struct ct_sync_state)); @@ -216,30 +255,35 @@ static int init_sync(void) } /* multicast server to receive events from the wire */ - STATE_SYNC(mcast_server) = mcast_server_create(&CONFIG(mcast)); + STATE_SYNC(mcast_server) = + mcast_server_create_multi(CONFIG(mcast), CONFIG(mcast_links)); if (STATE_SYNC(mcast_server) == NULL) { dlog(LOG_ERR, "can't open multicast server!"); return -1; } - dlog(LOG_NOTICE, "multicast server socket receiver queue " - "has been set to %d bytes", CONFIG(mcast).rcvbuf); - /* multicast client to send events on the wire */ - STATE_SYNC(mcast_client) = mcast_client_create(&CONFIG(mcast)); + STATE_SYNC(mcast_client) = + mcast_client_create_multi(CONFIG(mcast), CONFIG(mcast_links)); if (STATE_SYNC(mcast_client) == NULL) { dlog(LOG_ERR, "can't open client multicast socket"); - mcast_server_destroy(STATE_SYNC(mcast_server)); + mcast_server_destroy_multi(STATE_SYNC(mcast_server)); return -1; } + /* we only use one link to send events, but all to receive them */ + mcast_set_current_link(STATE_SYNC(mcast_client), + CONFIG(mcast_default_link)); - dlog(LOG_NOTICE, "multicast client socket sender queue " - "has been set to %d bytes", CONFIG(mcast).sndbuf); - - if (mcast_buffered_init(CONFIG(mcast).mtu) == -1) { + if (mcast_buffered_init(STATE_SYNC(mcast_client)->max_mtu) == -1) { dlog(LOG_ERR, "can't init tx buffer!"); - mcast_server_destroy(STATE_SYNC(mcast_server)); - mcast_client_destroy(STATE_SYNC(mcast_client)); + mcast_server_destroy_multi(STATE_SYNC(mcast_server)); + mcast_client_destroy_multi(STATE_SYNC(mcast_client)); + return -1; + } + + STATE_SYNC(mcast_iface) = nl_init_interface_handler(); + if (!STATE_SYNC(mcast_iface)) { + dlog(LOG_ERR, "can't open interface watcher"); return -1; } @@ -257,7 +301,14 @@ static int init_sync(void) static int register_fds_sync(struct fds *fds) { - if (register_fd(STATE_SYNC(mcast_server->fd), fds) == -1) + int i; + + for (i=0; i<STATE_SYNC(mcast_server)->num_links; i++) { + int fd = mcast_get_fd(STATE_SYNC(mcast_server)->multi[i]); + if (register_fd(fd, fds) == -1) + return -1; + } + if (register_fd(nlif_fd(STATE_SYNC(mcast_iface)), fds) == -1) return -1; if (register_fd(queue_get_eventfd(STATE_SYNC(tx_queue)), fds) == -1) @@ -268,13 +319,20 @@ static int register_fds_sync(struct fds *fds) static void run_sync(fd_set *readfds) { - /* multicast packet has been received */ - if (FD_ISSET(STATE_SYNC(mcast_server->fd), readfds)) - mcast_handler(); + int i; + + for (i=0; i<STATE_SYNC(mcast_server)->num_links; i++) { + int fd = mcast_get_fd(STATE_SYNC(mcast_server)->multi[i]); + if (FD_ISSET(fd, readfds)) + mcast_handler(STATE_SYNC(mcast_server)->multi[i], i); + } if (FD_ISSET(queue_get_eventfd(STATE_SYNC(tx_queue)), readfds)) STATE_SYNC(sync)->xmit(); + if (FD_ISSET(nlif_fd(STATE_SYNC(mcast_iface)), readfds)) + mcast_iface_handler(); + /* flush pending messages */ mcast_buffered_pending_netmsg(STATE_SYNC(mcast_client)); } @@ -284,8 +342,10 @@ static void kill_sync(void) cache_destroy(STATE_SYNC(internal)); cache_destroy(STATE_SYNC(external)); - mcast_server_destroy(STATE_SYNC(mcast_server)); - mcast_client_destroy(STATE_SYNC(mcast_client)); + mcast_server_destroy_multi(STATE_SYNC(mcast_server)); + mcast_client_destroy_multi(STATE_SYNC(mcast_client)); + + nlif_close(STATE_SYNC(mcast_iface)); mcast_buffered_destroy(); queue_destroy(STATE_SYNC(tx_queue)); @@ -418,6 +478,11 @@ static int local_handler_sync(int fd, int type, void *data) cache_stats_extended(STATE_SYNC(internal), fd); cache_stats_extended(STATE_SYNC(external), fd); break; + case STATS_MULTICAST: + mcast_dump_stats_extended(fd, STATE_SYNC(mcast_client), + STATE_SYNC(mcast_server), + STATE_SYNC(mcast_iface)); + break; default: if (STATE_SYNC(sync)->local) ret = STATE_SYNC(sync)->local(fd, type, data); |