summaryrefslogtreecommitdiff
path: root/src/sync-ftfw.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync-ftfw.c')
-rw-r--r--src/sync-ftfw.c37
1 files changed, 11 insertions, 26 deletions
diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c
index 565a4bc..d544a7b 100644
--- a/src/sync-ftfw.c
+++ b/src/sync-ftfw.c
@@ -34,7 +34,6 @@
#define dp(...)
#endif
-struct queue *tx_queue;
struct queue *rs_queue;
static uint32_t exp_seq;
static uint32_t window;
@@ -108,7 +107,7 @@ static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to)
ack->from = from;
ack->to = to;
- queue_add(tx_queue, &qobj->qnode);
+ queue_add(STATE_SYNC(tx_queue), &qobj->qnode);
}
static void tx_queue_add_ctlmsg2(uint32_t flags)
@@ -124,7 +123,7 @@ static void tx_queue_add_ctlmsg2(uint32_t flags)
ctl->type = NET_T_CTL;
ctl->flags = flags;
- queue_add(tx_queue, &qobj->qnode);
+ queue_add(STATE_SYNC(tx_queue), &qobj->qnode);
}
/* this function is called from the alarm framework */
@@ -144,11 +143,6 @@ static void do_alive_alarm(struct alarm_block *a, void *data)
static int ftfw_init(void)
{
- tx_queue = queue_create(INT_MAX, QUEUE_F_EVFD);
- if (tx_queue == NULL) {
- dlog(LOG_ERR, "cannot create tx queue");
- return -1;
- }
rs_queue = queue_create(INT_MAX, 0);
if (rs_queue == NULL) {
dlog(LOG_ERR, "cannot create rs queue");
@@ -167,7 +161,6 @@ static int ftfw_init(void)
static void ftfw_kill(void)
{
queue_destroy(rs_queue);
- queue_destroy(tx_queue);
}
static int do_cache_to_tx(void *data1, void *data2)
@@ -178,7 +171,7 @@ static int do_cache_to_tx(void *data1, void *data2)
if (queue_in(rs_queue, &cn->qnode))
queue_del(&cn->qnode);
- queue_add(tx_queue, &cn->qnode);
+ queue_add(STATE_SYNC(tx_queue), &cn->qnode);
return 0;
}
@@ -259,7 +252,7 @@ static int rs_queue_to_tx(struct queue_node *n, const void *data)
net->seq, net->flags, net->len);
queue_del(n);
- queue_add(tx_queue, n);
+ queue_add(STATE_SYNC(tx_queue), n);
break;
}
case Q_ELEM_OBJ: {
@@ -274,7 +267,7 @@ static int rs_queue_to_tx(struct queue_node *n, const void *data)
dp("resending nack'ed (oldseq=%u)\n", cn->seq);
queue_del(n);
- queue_add(tx_queue, n);
+ queue_add(STATE_SYNC(tx_queue), n);
break;
}
}
@@ -526,19 +519,12 @@ static int tx_queue_xmit(struct queue_node *n, const void *data)
return 0;
}
-static void ftfw_run(fd_set *readfds)
+static void ftfw_xmit(void)
{
- if (FD_ISSET(queue_get_eventfd(tx_queue), readfds)) {
- queue_iterate(tx_queue, NULL, tx_queue_xmit);
- add_alarm(&alive_alarm, 1, 0);
- dp("tx_queue_len:%u rs_queue_len:%u\n",
- queue_len(tx_queue), queue_len(rs_queue));
- }
-}
-
-static int ftfw_register_fds(struct fds *fds)
-{
- return register_fd(queue_get_eventfd(tx_queue), fds);
+ queue_iterate(STATE_SYNC(tx_queue), NULL, tx_queue_xmit);
+ add_alarm(&alive_alarm, ALIVE_INT, 0);
+ dp("tx_queue_len:%u rs_queue_len:%u\n",
+ queue_len(tx_queue), queue_len(rs_queue));
}
struct sync_mode sync_ftfw = {
@@ -550,6 +536,5 @@ struct sync_mode sync_ftfw = {
.local = ftfw_local,
.recv = ftfw_recv,
.send = ftfw_send,
- .run = ftfw_run,
- .register_fds = ftfw_register_fds,
+ .xmit = ftfw_xmit,
};