diff options
-rw-r--r-- | src/sync-ftfw.c | 65 |
1 files changed, 22 insertions, 43 deletions
diff --git a/src/sync-ftfw.c b/src/sync-ftfw.c index bb53849..565a4bc 100644 --- a/src/sync-ftfw.c +++ b/src/sync-ftfw.c @@ -77,6 +77,22 @@ static struct cache_extra cache_ftfw_extra = { .destroy = cache_ftfw_del }; +static void nethdr_set_hello(struct nethdr *net) +{ + switch(hello_state) { + case HELLO_INIT: + hello_state = HELLO_SAY; + /* fall through */ + case HELLO_SAY: + net->flags |= NET_F_HELLO; + break; + } + if (say_hello_back) { + net->flags |= NET_F_HELLO_BACK; + say_hello_back = 0; + } +} + static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to) { struct queue_object *qobj; @@ -92,20 +108,6 @@ static void tx_queue_add_ctlmsg(uint32_t flags, uint32_t from, uint32_t to) ack->from = from; ack->to = to; - switch(hello_state) { - case HELLO_INIT: - hello_state = HELLO_SAY; - /* fall through */ - case HELLO_SAY: - ack->flags |= NET_F_HELLO; - break; - } - - if (say_hello_back) { - ack->flags |= NET_F_HELLO_BACK; - say_hello_back = 0; - } - queue_add(tx_queue, &qobj->qnode); } @@ -122,20 +124,6 @@ static void tx_queue_add_ctlmsg2(uint32_t flags) ctl->type = NET_T_CTL; ctl->flags = flags; - switch(hello_state) { - case HELLO_INIT: - hello_state = HELLO_SAY; - /* fall through */ - case HELLO_SAY: - ctl->flags |= NET_F_HELLO; - break; - } - - if (say_hello_back) { - ctl->flags |= NET_F_HELLO_BACK; - say_hello_back = 0; - } - queue_add(tx_queue, &qobj->qnode); } @@ -474,19 +462,7 @@ static void ftfw_send(struct nethdr *net, struct cache_object *obj) if (queue_in(rs_queue, &cn->qnode)) queue_del(&cn->qnode); - switch(hello_state) { - case HELLO_INIT: - hello_state = HELLO_SAY; - /* fall through */ - case HELLO_SAY: - net->flags |= NET_F_HELLO; - break; - } - - if (say_hello_back) { - net->flags |= NET_F_HELLO_BACK; - say_hello_back = 0; - } + nethdr_set_hello(net); cn->seq = ntohl(net->seq); queue_add(rs_queue, &cn->qnode); @@ -496,10 +472,14 @@ static void ftfw_send(struct nethdr *net, struct cache_object *obj) static int tx_queue_xmit(struct queue_node *n, const void *data) { + queue_del(n); + switch(n->type) { case Q_ELEM_CTL: { struct nethdr *net = queue_node_data(n); + nethdr_set_hello(net); + if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) { nethdr_set_ack(net); } else if (IS_ALIVE(net)) { @@ -516,7 +496,6 @@ static int tx_queue_xmit(struct queue_node *n, const void *data) mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); HDR_NETWORK2HOST(net); - queue_del(n); if (IS_ACK(net) || IS_NACK(net) || IS_RESYNC(net)) queue_add(rs_queue, n); else @@ -533,11 +512,11 @@ static int tx_queue_xmit(struct queue_node *n, const void *data) obj = cache_data_get_object(STATE_SYNC(internal), cn); type = object_status_to_network_type(obj->status); net = BUILD_NETMSG(obj->ct, type); + nethdr_set_hello(net); dp("tx_list sq: %u fl:%u len:%u\n", ntohl(net->seq), net->flags, ntohs(net->len)); - queue_del(n); mcast_buffered_send_netmsg(STATE_SYNC(mcast_client), net); ftfw_send(net, obj); break; |