From 5dbebc513ad67bb65c41c42a9c761c085309564f Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 1 Feb 2017 12:00:25 -0800 Subject: Minor send path refactor to make packet I/O work on clusters if they are members of networks. Also fix a crash if compiled in cluster mode but no cluster is enabled. --- node/Cluster.cpp | 201 +++++++++++++++++++++++++++++++++++++++---------------- node/Cluster.hpp | 42 +++++++++++- node/Network.cpp | 4 +- node/Switch.cpp | 126 ++++++++++++++++++++++------------ node/Switch.hpp | 6 +- 5 files changed, 271 insertions(+), 108 deletions(-) diff --git a/node/Cluster.cpp b/node/Cluster.cpp index 56a6a06d..16ef040b 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -342,17 +342,20 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) Identity id; ptr += id.deserialize(dmsg,ptr); if (id) { - RR->topology->saveIdentity(id); - { Mutex::Lock _l(_remotePeers_m); - _remotePeers[std::pair(id.address(),(unsigned int)fromMemberId)] = RR->node->now(); + _RemotePeer &rp = _remotePeers[std::pair(id.address(),(unsigned int)fromMemberId)]; + if (!rp.lastHavePeerReceived) { + RR->topology->saveIdentity(id); + id.agree(RR->identity,rp.key,ZT_PEER_SECRET_KEY_LENGTH); + } + rp.lastHavePeerReceived = RR->node->now(); } _ClusterSendQueueEntry *q[16384]; // 16384 is "tons" unsigned int qc = _sendQueue->getByDest(id.address(),q,16384); for(unsigned int i=0;isendViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite); + this->relayViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite); _sendQueue->returnToPool(q,qc); TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),qc); @@ -513,7 +516,79 @@ void Cluster::broadcastNetworkConfigChunk(const void *chunk,unsigned int len) } } -void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite) +int Cluster::prepSendViaCluster(const Address &toPeerAddress,Packet &outp,bool encrypt) +{ + const uint64_t now = RR->node->now(); + uint64_t mostRecentTs = 0; + int mostRecentMemberId = -1; + uint8_t mostRecentSecretKey[ZT_PEER_SECRET_KEY_LENGTH]; + { + Mutex::Lock _l2(_remotePeers_m); + std::map< std::pair,_RemotePeer >::const_iterator rpe(_remotePeers.lower_bound(std::pair(toPeerAddress,0))); + for(;;) { + if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress)) + break; + else if (rpe->second.lastHavePeerReceived > mostRecentTs) { + mostRecentTs = rpe->second.lastHavePeerReceived; + memcpy(mostRecentSecretKey,rpe->second.key,ZT_PEER_SECRET_KEY_LENGTH); + mostRecentMemberId = (int)rpe->first.second; + } + ++rpe; + } + } + + if (mostRecentMemberId >= 0) { + const uint64_t ageOfMostRecentHavePeerAnnouncement = now - mostRecentTs; + if (ageOfMostRecentHavePeerAnnouncement >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) { + if (ageOfMostRecentHavePeerAnnouncement >= ZT_PEER_ACTIVITY_TIMEOUT) + return -1; + + bool sendWantPeer = true; + { + Mutex::Lock _l(_remotePeers_m); + _RemotePeer &rp = _remotePeers[std::pair(toPeerAddress,(unsigned int)_id)]; + if ((now - rp.lastSentWantPeer) >= ZT_CLUSTER_WANT_PEER_EVERY) { + rp.lastSentWantPeer = now; + } else { + sendWantPeer = false; // don't flood WANT_PEER + } + } + if (sendWantPeer) { + char tmp[ZT_ADDRESS_LENGTH]; + toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH); + { + Mutex::Lock _l(_memberIds_m); + for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + Mutex::Lock _l2(_members[*mid].lock); + _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH); + } + } + } + } + + outp.armor(mostRecentSecretKey,encrypt); + return mostRecentMemberId; + } else return -1; +} + +bool Cluster::sendViaCluster(int mostRecentMemberId,const Address &toPeerAddress,const void *data,unsigned int len) +{ + if ((mostRecentMemberId < 0)||(mostRecentMemberId >= ZT_CLUSTER_MAX_MEMBERS)) // sanity check + return false; + Mutex::Lock _l2(_members[mostRecentMemberId].lock); + for(std::vector::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) { + for(std::vector::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) { + if (i1->ss_family == i2->ss_family) { + TRACE("sendViaCluster sending %u bytes to %s by way of %u (%s->%s)",len,toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str()); + RR->node->putPacket(*i1,*i2,data,len); + return true; + } + } + } + return false; +} + +void Cluster::relayViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite) { if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check return; @@ -521,87 +596,101 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee const uint64_t now = RR->node->now(); uint64_t mostRecentTs = 0; - unsigned int mostRecentMemberId = 0xffffffff; + int mostRecentMemberId = -1; { Mutex::Lock _l2(_remotePeers_m); - std::map< std::pair,uint64_t >::const_iterator rpe(_remotePeers.lower_bound(std::pair(toPeerAddress,0))); + std::map< std::pair,_RemotePeer >::const_iterator rpe(_remotePeers.lower_bound(std::pair(toPeerAddress,0))); for(;;) { if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress)) break; - else if (rpe->second > mostRecentTs) { - mostRecentTs = rpe->second; - mostRecentMemberId = rpe->first.second; + else if (rpe->second.lastHavePeerReceived > mostRecentTs) { + mostRecentTs = rpe->second.lastHavePeerReceived; + mostRecentMemberId = (int)rpe->first.second; } ++rpe; } } - const uint64_t age = now - mostRecentTs; - if (age >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) { - const bool enqueueAndWait = ((age >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId > 0xffff)); + const uint64_t ageOfMostRecentHavePeerAnnouncement = now - mostRecentTs; + if (ageOfMostRecentHavePeerAnnouncement >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) { + // Enqueue and wait if peer seems alive, but do WANT_PEER to refresh homing + const bool enqueueAndWait = ((ageOfMostRecentHavePeerAnnouncement >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId < 0)); // Poll everyone with WANT_PEER if the age of our most recent entry is // approaching expiration (or has expired, or does not exist). - char tmp[ZT_ADDRESS_LENGTH]; - toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH); + bool sendWantPeer = true; { - Mutex::Lock _l(_memberIds_m); - for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { - Mutex::Lock _l2(_members[*mid].lock); - _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH); + Mutex::Lock _l(_remotePeers_m); + _RemotePeer &rp = _remotePeers[std::pair(toPeerAddress,(unsigned int)_id)]; + if ((now - rp.lastSentWantPeer) >= ZT_CLUSTER_WANT_PEER_EVERY) { + rp.lastSentWantPeer = now; + } else { + sendWantPeer = false; // don't flood WANT_PEER + } + } + if (sendWantPeer) { + char tmp[ZT_ADDRESS_LENGTH]; + toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH); + { + Mutex::Lock _l(_memberIds_m); + for(std::vector::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + Mutex::Lock _l2(_members[*mid].lock); + _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH); + } } } // If there isn't a good place to send via, then enqueue this for retrying // later and return after having broadcasted a WANT_PEER. if (enqueueAndWait) { - TRACE("sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); + TRACE("relayViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); _sendQueue->enqueue(now,fromPeerAddress,toPeerAddress,data,len,unite); return; } } - Buffer<1024> buf; - if (unite) { - InetAddress v4,v6; - if (fromPeerAddress) { - SharedPtr fromPeer(RR->topology->getPeerNoCache(fromPeerAddress)); - if (fromPeer) - fromPeer->getBestActiveAddresses(now,v4,v6); - } - uint8_t addrCount = 0; - if (v4) - ++addrCount; - if (v6) - ++addrCount; - if (addrCount) { - toPeerAddress.appendTo(buf); - fromPeerAddress.appendTo(buf); - buf.append(addrCount); + if (mostRecentMemberId >= 0) { + Buffer<1024> buf; + if (unite) { + InetAddress v4,v6; + if (fromPeerAddress) { + SharedPtr fromPeer(RR->topology->getPeerNoCache(fromPeerAddress)); + if (fromPeer) + fromPeer->getBestActiveAddresses(now,v4,v6); + } + uint8_t addrCount = 0; if (v4) - v4.serialize(buf); + ++addrCount; if (v6) - v6.serialize(buf); + ++addrCount; + if (addrCount) { + toPeerAddress.appendTo(buf); + fromPeerAddress.appendTo(buf); + buf.append(addrCount); + if (v4) + v4.serialize(buf); + if (v6) + v6.serialize(buf); + } } - } - { - Mutex::Lock _l2(_members[mostRecentMemberId].lock); - if (buf.size() > 0) - _send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); - - for(std::vector::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) { - for(std::vector::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) { - if (i1->ss_family == i2->ss_family) { - TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u (%s->%s)",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str()); - RR->node->putPacket(*i1,*i2,data,len); - return; + { + Mutex::Lock _l2(_members[mostRecentMemberId].lock); + if (buf.size() > 0) + _send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); + + for(std::vector::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) { + for(std::vector::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) { + if (i1->ss_family == i2->ss_family) { + TRACE("relayViaCluster relaying %u bytes from %s to %s by way of %u (%s->%s)",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str()); + RR->node->putPacket(*i1,*i2,data,len); + return; + } } } - } - TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u failed: no common endpoints with the same address family!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId); - return; + TRACE("relayViaCluster relaying %u bytes from %s to %s by way of %u failed: no common endpoints with the same address family!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId); + } } } @@ -663,8 +752,8 @@ void Cluster::doPeriodicTasks() _lastCleanedRemotePeers = now; Mutex::Lock _l(_remotePeers_m); - for(std::map< std::pair,uint64_t >::iterator rp(_remotePeers.begin());rp!=_remotePeers.end();) { - if ((now - rp->second) >= ZT_PEER_ACTIVITY_TIMEOUT) + for(std::map< std::pair,_RemotePeer >::iterator rp(_remotePeers.begin());rp!=_remotePeers.end();) { + if ((now - rp->second.lastHavePeerReceived) >= ZT_PEER_ACTIVITY_TIMEOUT) _remotePeers.erase(rp++); else ++rp; } diff --git a/node/Cluster.hpp b/node/Cluster.hpp index aba3b8a9..7ebef0c9 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -88,6 +88,11 @@ */ #define ZT_CLUSTER_SEND_QUEUE_DATA_MAX 1500 +/** + * We won't send WANT_PEER to other members more than every (ms) per recipient + */ +#define ZT_CLUSTER_WANT_PEER_EVERY 1000 + namespace ZeroTier { class RuntimeEnvironment; @@ -275,7 +280,30 @@ public: void broadcastNetworkConfigChunk(const void *chunk,unsigned int len); /** - * Send this packet via another node in this cluster if another node has this peer + * If the cluster has this peer, prepare the packet to send via cluster + * + * Note that outp is only armored (or modified at all) if the return value is a member ID. + * + * @param toPeerAddress Value of outp.destination(), simply to save additional lookup + * @param outp Packet to armor with peer key (via cluster knowledge of peer shared secret) + * @param encrypt If true, encrypt packet payload (passed to Packet::armor()) + * @return -1 if cluster does not know this peer, or a member ID to pass to sendViaCluster() + */ + int prepSendViaCluster(const Address &toPeerAddress,Packet &outp,bool encrypt); + + /** + * Send data via cluster front plane (packet head or fragment) + * + * @param haveMemberId Member ID that has this peer as returned by prepSendviaCluster() + * @param toPeerAddress Destination peer address + * @param data Packet or packet fragment data + * @param len Length of packet or fragment + * @return True if packet was sent (and outp was modified via armoring) + */ + bool sendViaCluster(int haveMemberId,const Address &toPeerAddress,const void *data,unsigned int len); + + /** + * Relay a packet via the cluster * * This is used in the outgoing packet and relaying logic in Switch to * relay packets to other cluster members. It isn't PROXY_SEND-- that is @@ -287,7 +315,7 @@ public: * @param len Length of packet or fragment * @param unite If true, also request proxy unite across cluster */ - void sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite); + void relayViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite); /** * Send a distributed query to other cluster members @@ -398,7 +426,15 @@ private: std::vector _memberIds; Mutex _memberIds_m; - std::map< std::pair,uint64_t > _remotePeers; // we need ordered behavior and lower_bound here + struct _RemotePeer + { + _RemotePeer() : lastHavePeerReceived(0),lastSentWantPeer(0) {} + ~_RemotePeer() { Utils::burn(key,ZT_PEER_SECRET_KEY_LENGTH); } + uint64_t lastHavePeerReceived; + uint64_t lastSentWantPeer; + uint8_t key[ZT_PEER_SECRET_KEY_LENGTH]; // secret key from identity agreement + }; + std::map< std::pair,_RemotePeer > _remotePeers; // we need ordered behavior and lower_bound here Mutex _remotePeers_m; uint64_t _lastFlushed; diff --git a/node/Network.cpp b/node/Network.cpp index 320dcf39..c5855418 100644 --- a/node/Network.cpp +++ b/node/Network.cpp @@ -974,7 +974,7 @@ uint64_t Network::handleConfigChunk(const uint64_t packetId,const Address &sourc } #ifdef ZT_ENABLE_CLUSTER - if (source) + if ((source)&&(RR->cluster)) RR->cluster->broadcastNetworkConfigChunk(chunk.field(start,chunk.size() - start),chunk.size() - start); #endif @@ -1007,7 +1007,7 @@ uint64_t Network::handleConfigChunk(const uint64_t packetId,const Address &sourc } #ifdef ZT_ENABLE_CLUSTER - if (source) + if ((source)&&(RR->cluster)) RR->cluster->broadcastNetworkConfigChunk(chunk.field(start,chunk.size() - start),chunk.size() - start); #endif } else { diff --git a/node/Switch.cpp b/node/Switch.cpp index f935b7aa..6df84101 100644 --- a/node/Switch.cpp +++ b/node/Switch.cpp @@ -117,7 +117,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from if ((!relayTo)||(!relayTo->sendDirect(fragment.data(),fragment.size(),now,false))) { #ifdef ZT_ENABLE_CLUSTER if (RR->cluster) { - RR->cluster->sendViaCluster(Address(),destination,fragment.data(),fragment.size(),false); + RR->cluster->relayViaCluster(Address(),destination,fragment.data(),fragment.size(),false); return; } #endif @@ -204,7 +204,6 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from //TRACE("<< %.16llx %s -> %s (size: %u)",(unsigned long long)packet->packetId(),source.toString().c_str(),destination.toString().c_str(),packet->size()); - if (destination != RR->identity.address()) { if ( (!RR->topology->amRoot()) && (!path->trustEstablished(now)) ) return; @@ -233,7 +232,7 @@ void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &from if (shouldUnite) luts = now; } - RR->cluster->sendViaCluster(source,destination,packet.data(),packet.size(),shouldUnite); + RR->cluster->relayViaCluster(source,destination,packet.data(),packet.size(),shouldUnite); return; } #endif @@ -560,7 +559,7 @@ void Switch::onLocalEthernet(const SharedPtr &network,const MAC &from,c } } -void Switch::send(const Packet &packet,bool encrypt) +void Switch::send(Packet &packet,bool encrypt) { if (packet.destination() == RR->identity.address()) { TRACE("BUG: caught attempt to send() to self, ignored"); @@ -687,12 +686,17 @@ Address Switch::_sendWhoisRequest(const Address &addr,const Address *peersAlread return Address(); } -bool Switch::_trySend(const Packet &packet,bool encrypt) +bool Switch::_trySend(Packet &packet,bool encrypt) { - const SharedPtr peer(RR->topology->getPeer(packet.destination())); - if (peer) { - const uint64_t now = RR->node->now(); + SharedPtr viaPath; + const uint64_t now = RR->node->now(); + const Address destination(packet.destination()); +#ifdef ZT_ENABLE_CLUSTER + int clusterMostRecentMemberId = -1; +#endif + const SharedPtr peer(RR->topology->getPeer(destination)); + if (peer) { /* First get the best path, and if it's dead (and this is not a root) * we attempt to re-activate that path but this packet will flow * upstream. If the path comes back alive, it will be used in the future. @@ -700,58 +704,92 @@ bool Switch::_trySend(const Packet &packet,bool encrypt) * to send heartbeats "down" and because we have to at least try to * go somewhere. */ - SharedPtr viaPath(peer->getBestPath(now,false)); + viaPath = peer->getBestPath(now,false); if ( (viaPath) && (!viaPath->alive(now)) && (!RR->topology->isUpstream(peer->identity())) ) { if ((now - viaPath->lastOut()) > std::max((now - viaPath->lastIn()) * 4,(uint64_t)ZT_PATH_MIN_REACTIVATE_INTERVAL)) peer->attemptToContactAt(viaPath->localAddress(),viaPath->address(),now); viaPath.zero(); } + if (!viaPath) { - peer->tryMemorizedPath(now); // periodically attempt memorized or statically defined paths, if any are known - const SharedPtr relay(RR->topology->getUpstreamPeer()); - if ( (!relay) || (!(viaPath = relay->getBestPath(now,false))) ) { - if (!(viaPath = peer->getBestPath(now,true))) - return false; +#ifdef ZT_ENABLE_CLUSTER + if (RR->cluster) + clusterMostRecentMemberId = RR->cluster->prepSendViaCluster(destination,packet,encrypt); + if (clusterMostRecentMemberId < 0) { +#endif + peer->tryMemorizedPath(now); // periodically attempt memorized or statically defined paths, if any are known + const SharedPtr relay(RR->topology->getUpstreamPeer()); + if ( (!relay) || (!(viaPath = relay->getBestPath(now,false))) ) { + if (!(viaPath = peer->getBestPath(now,true))) // last resort: try an expired path... we usually can never get here + return false; + } +#ifdef ZT_ENABLE_CLUSTER } +#endif } + } else { + requestWhois(destination); +#ifndef ZT_ENABLE_CLUSTER + return false; // if we are not in cluster mode, there is no way we can send without knowing the peer directly +#endif + } - Packet tmp(packet); +#ifdef ZT_TRACE +#ifdef ZT_ENABLE_CLUSTER + if ((!viaPath)&&(clusterMostRecentMemberId < 0)) { + TRACE("BUG: both viaPath and clusterMostRecentMemberId ended up invalid in Switch::_trySend()!"); + abort(); + } +#else + if (!viaPath) { + TRACE("BUG: viaPath ended up NULL in Switch::_trySend()!"); + abort(); + } +#endif +#endif - unsigned int chunkSize = std::min(tmp.size(),(unsigned int)ZT_UDP_DEFAULT_PAYLOAD_MTU); - tmp.setFragmented(chunkSize < tmp.size()); + unsigned int chunkSize = std::min(packet.size(),(unsigned int)ZT_UDP_DEFAULT_PAYLOAD_MTU); + packet.setFragmented(chunkSize < packet.size()); - const uint64_t trustedPathId = RR->topology->getOutboundPathTrust(viaPath->address()); - if (trustedPathId) { - tmp.setTrusted(trustedPathId); - } else { - tmp.armor(peer->key(),encrypt); - } + const uint64_t trustedPathId = RR->topology->getOutboundPathTrust(viaPath->address()); + if (trustedPathId) { + packet.setTrusted(trustedPathId); + } else { + packet.armor(peer->key(),encrypt); + } - if (viaPath->send(RR,tmp.data(),chunkSize,now)) { - if (chunkSize < tmp.size()) { - // Too big for one packet, fragment the rest - unsigned int fragStart = chunkSize; - unsigned int remaining = tmp.size() - chunkSize; - unsigned int fragsRemaining = (remaining / (ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH)); - if ((fragsRemaining * (ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining) - ++fragsRemaining; - const unsigned int totalFragments = fragsRemaining + 1; - - for(unsigned int fno=1;fnosend(RR,packet.data(),chunkSize,now))) || ((clusterMostRecentMemberId >= 0)&&(RR->cluster->sendViaCluster(clusterMostRecentMemberId,destination,packet.data(),chunkSize))) ) { +#else + if (viaPath->send(RR,packet.data(),chunkSize,now)) { +#endif + if (chunkSize < packet.size()) { + // Too big for one packet, fragment the rest + unsigned int fragStart = chunkSize; + unsigned int remaining = packet.size() - chunkSize; + unsigned int fragsRemaining = (remaining / (ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH)); + if ((fragsRemaining * (ZT_UDP_DEFAULT_PAYLOAD_MTU - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining) + ++fragsRemaining; + const unsigned int totalFragments = fragsRemaining + 1; + + for(unsigned int fno=1;fnosend(RR,frag.data(),frag.size(),now); - fragStart += chunkSize; - remaining -= chunkSize; - } + else if (clusterMostRecentMemberId >= 0) + RR->cluster->sendViaCluster(clusterMostRecentMemberId,destination,frag.data(),frag.size()); +#else + viaPath->send(RR,frag.data(),frag.size(),now); +#endif + fragStart += chunkSize; + remaining -= chunkSize; } - - return true; } - } else { - requestWhois(packet.destination()); } - return false; + + return true; } bool Switch::_unite(const Address &p1,const Address &p2) diff --git a/node/Switch.hpp b/node/Switch.hpp index f44eef48..422f6c8e 100644 --- a/node/Switch.hpp +++ b/node/Switch.hpp @@ -92,10 +92,10 @@ public: * Needless to say, the packet's source must be this node. Otherwise it * won't be encrypted right. (This is not used for relaying.) * - * @param packet Packet to send + * @param packet Packet to send (buffer may be modified) * @param encrypt Encrypt packet payload? (always true except for HELLO) */ - void send(const Packet &packet,bool encrypt); + void send(Packet &packet,bool encrypt); /** * Request WHOIS on a given address @@ -126,7 +126,7 @@ public: private: Address _sendWhoisRequest(const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted); - bool _trySend(const Packet &packet,bool encrypt); + bool _trySend(Packet &packet,bool encrypt); // packet is modified if return is true bool _unite(const Address &p1,const Address &p2); const RuntimeEnvironment *const RR; -- cgit v1.2.3