From f692cec763d67caae54a4f47446657c390563319 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 27 Oct 2015 14:04:12 -0700 Subject: Change how cluster relays packets -- just PROXY_UNITE and then send packet via normal ZeroTier front plane -- more efficient and eliminates fragmentation issues. --- node/Cluster.cpp | 195 +++++++++++++++++++++++++++---------------------------- 1 file changed, 96 insertions(+), 99 deletions(-) (limited to 'node/Cluster.cpp') diff --git a/node/Cluster.cpp b/node/Cluster.cpp index eef02bc7..c18663bc 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -250,102 +250,87 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) } } break; - case STATE_MESSAGE_RELAY: { + case STATE_MESSAGE_PROXY_UNITE: { + const Address localPeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; + const Address remotePeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; const unsigned int numRemotePeerPaths = dmsg[ptr++]; InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max for(unsigned int i=0;i(ptr); ptr += 2; - const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen; - - if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address - const Address destinationAddress(reinterpret_cast(packet) + 8,ZT_ADDRESS_LENGTH); - TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths); - - SharedPtr destinationPeer(RR->topology->getPeer(destinationAddress)); - if (destinationPeer) { - if ( - (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&& - (numRemotePeerPaths > 0)&& - (packetLen >= 18)&& - (reinterpret_cast(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) - ) { - // If remote peer paths were sent with this relayed packet, we do - // RENDEZVOUS. It's handled here for cluster-relayed packets since - // we don't have both Peer records so this is a different path. - - const Address remotePeerAddress(reinterpret_cast(packet) + 13,ZT_ADDRESS_LENGTH); - - InetAddress bestDestV4,bestDestV6; - destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6); - InetAddress bestRemoteV4,bestRemoteV6; - for(unsigned int i=0;iidentity.address(),Packet::VERB_RENDEZVOUS); - rendezvousForDest.append((uint8_t)0); - remotePeerAddress.appendTo(rendezvousForDest); - - Buffer<2048> rendezvousForOtherEnd; - remotePeerAddress.appendTo(rendezvousForOtherEnd); - rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS); - const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size(); - rendezvousForOtherEnd.addSize(2); // space for actual packet payload length - rendezvousForOtherEnd.append((uint8_t)0); // flags == 0 - destinationAddress.appendTo(rendezvousForOtherEnd); - - bool haveMatch = false; - if ((bestDestV6)&&(bestRemoteV6)) { - haveMatch = true; - - rendezvousForDest.append((uint16_t)bestRemoteV6.port()); - rendezvousForDest.append((uint8_t)16); - rendezvousForDest.append(bestRemoteV6.rawIpData(),16); - - rendezvousForOtherEnd.append((uint16_t)bestDestV6.port()); - rendezvousForOtherEnd.append((uint8_t)16); - rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16); - rendezvousForOtherEnd.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16)); - } else if ((bestDestV4)&&(bestRemoteV4)) { - haveMatch = true; - - rendezvousForDest.append((uint16_t)bestRemoteV4.port()); - rendezvousForDest.append((uint8_t)4); - rendezvousForDest.append(bestRemoteV4.rawIpData(),4); - - rendezvousForOtherEnd.append((uint16_t)bestDestV4.port()); - rendezvousForOtherEnd.append((uint8_t)4); - rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4); - rendezvousForOtherEnd.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4)); - } - if (haveMatch) { - _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size()); - RR->sw->send(rendezvousForDest,true,0); - } + TRACE("[%u] requested proxy unite between local peer %s and remote peer %s",(unsigned int)fromMemberId,localPeerAddress.toString().c_str(),remotePeerAddress.toString().c_str()); + + SharedPtr localPeer(RR->topology->getPeer(localPeerAddress)); + if ((localPeer)&&(numRemotePeerPaths > 0)) { + InetAddress bestLocalV4,bestLocalV6; + localPeer->getBestActiveAddresses(RR->node->now(),bestLocalV4,bestLocalV6); + + InetAddress bestRemoteV4,bestRemoteV6; + for(unsigned int i=0;iidentity.address(),Packet::VERB_RENDEZVOUS); + rendezvousForLocal.append((uint8_t)0); + remotePeerAddress.appendTo(rendezvousForLocal); + + Buffer<2048> rendezvousForRemote; + remotePeerAddress.appendTo(rendezvousForRemote); + rendezvousForRemote.append((uint8_t)Packet::VERB_RENDEZVOUS); + const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForRemote.size(); + rendezvousForRemote.addSize(2); // space for actual packet payload length + rendezvousForRemote.append((uint8_t)0); // flags == 0 + localPeerAddress.appendTo(rendezvousForRemote); + + bool haveMatch = false; + if ((bestLocalV6)&&(bestRemoteV6)) { + haveMatch = true; + + rendezvousForLocal.append((uint16_t)bestRemoteV6.port()); + rendezvousForLocal.append((uint8_t)16); + rendezvousForLocal.append(bestRemoteV6.rawIpData(),16); + + rendezvousForRemote.append((uint16_t)bestLocalV6.port()); + rendezvousForRemote.append((uint8_t)16); + rendezvousForRemote.append(bestLocalV6.rawIpData(),16); + rendezvousForRemote.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16)); + } else if ((bestLocalV4)&&(bestRemoteV4)) { + haveMatch = true; + + rendezvousForLocal.append((uint16_t)bestRemoteV4.port()); + rendezvousForLocal.append((uint8_t)4); + rendezvousForLocal.append(bestRemoteV4.rawIpData(),4); + + rendezvousForRemote.append((uint16_t)bestLocalV4.port()); + rendezvousForRemote.append((uint8_t)4); + rendezvousForRemote.append(bestLocalV4.rawIpData(),4); + rendezvousForRemote.setAt(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4)); + } + + if (haveMatch) { + _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size()); + RR->sw->send(rendezvousForLocal,true,0); + } } } break; case STATE_MESSAGE_PROXY_SEND: { - const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); + const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; const Packet::Verb verb = (Packet::Verb)dmsg[ptr++]; const unsigned int len = dmsg.at(ptr); ptr += 2; Packet outp(rcpt,RR->identity.address(),verb); - outp.append(dmsg.field(ptr,len),len); + outp.append(dmsg.field(ptr,len),len); ptr += len; RR->sw->send(outp,true,0); TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len); } break; @@ -364,13 +349,13 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) } } -bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len) +bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite) { if (len > 16384) // sanity check return false; uint64_t mostRecentTimestamp = 0; - uint16_t canHasPeer = 0; + unsigned int canHasPeer = 0; { // Anyone got this peer? Mutex::Lock _l2(_peerAffinities_m); @@ -387,25 +372,37 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee const uint64_t now = RR->node->now(); if ((now - mostRecentTimestamp) < ZT_PEER_ACTIVITY_TIMEOUT) { - Buffer<16384> buf; - - InetAddress v4,v6; - if (fromPeerAddress) { - SharedPtr fromPeer(RR->topology->getPeer(fromPeerAddress)); - if (fromPeer) - fromPeer->getBestActiveAddresses(now,v4,v6); + Buffer<2048> buf; + + if (unite) { + InetAddress v4,v6; + if (fromPeerAddress) { + SharedPtr fromPeer(RR->topology->getPeer(fromPeerAddress)); + if (fromPeer) + fromPeer->getBestActiveAddresses(now,v4,v6); + } + uint8_t addrCount = 0; + if (v4) + ++addrCount; + if (v6) + ++addrCount; + if (addrCount) { + toPeerAddress.appendTo(buf); + fromPeerAddress.appendTo(buf); + buf.append(addrCount); + if (v4) + v4.serialize(buf); + if (v6) + v6.serialize(buf); + } } - buf.append((uint8_t)( (v4) ? ((v6) ? 2 : 1) : ((v6) ? 1 : 0) )); - if (v4) - v4.serialize(buf); - if (v6) - v6.serialize(buf); - buf.append((uint16_t)len); - buf.append(data,len); { Mutex::Lock _l2(_members[canHasPeer].lock); - _send(canHasPeer,STATE_MESSAGE_RELAY,buf.data(),buf.size()); + if (buf.size() > 0) + _send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); + if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0) + RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len); } TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer); -- cgit v1.2.3