diff options
Diffstat (limited to 'node/Cluster.cpp')
-rw-r--r-- | node/Cluster.cpp | 239 |
1 files changed, 178 insertions, 61 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp index 2a261e51..52e03ffe 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -44,6 +44,7 @@ #include "Packet.hpp" #include "Switch.hpp" #include "Node.hpp" +#include "Network.hpp" #include "Array.hpp" namespace ZeroTier { @@ -254,7 +255,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) // One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard") char polykey[ZT_POLY1305_KEY_LEN]; memset(polykey,0,sizeof(polykey)); - s20.encrypt12(polykey,polykey,sizeof(polykey)); + s20.crypt12(polykey,polykey,sizeof(polykey)); // Compute 16-byte MAC char mac[ZT_POLY1305_MAC_LEN]; @@ -266,7 +267,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) // Decrypt! dmsg.setSize(len - 24); - s20.decrypt12(reinterpret_cast<const char *>(msg) + 24,const_cast<void *>(dmsg.data()),dmsg.size()); + s20.crypt12(reinterpret_cast<const char *>(msg) + 24,const_cast<void *>(dmsg.data()),dmsg.size()); } if (dmsg.size() < 4) @@ -341,17 +342,20 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) Identity id; ptr += id.deserialize(dmsg,ptr); if (id) { - RR->topology->saveIdentity(id); - { Mutex::Lock _l(_remotePeers_m); - _remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)] = RR->node->now(); + _RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)]; + if (!rp.lastHavePeerReceived) { + RR->topology->saveIdentity(id); + RR->identity.agree(id,rp.key,ZT_PEER_SECRET_KEY_LENGTH); + } + rp.lastHavePeerReceived = RR->node->now(); } _ClusterSendQueueEntry *q[16384]; // 16384 is "tons" unsigned int qc = _sendQueue->getByDest(id.address(),q,16384); for(unsigned int i=0;i<qc;++i) - this->sendViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite); + this->relayViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite); _sendQueue->returnToPool(q,qc); TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),qc); @@ -396,7 +400,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) SharedPtr<Peer> localPeer(RR->topology->getPeerNoCache(localPeerAddress)); if ((localPeer)&&(numRemotePeerPaths > 0)) { InetAddress bestLocalV4,bestLocalV6; - localPeer->getBestActiveAddresses(now,bestLocalV4,bestLocalV6); + localPeer->getRendezvousAddresses(now,bestLocalV4,bestLocalV6); InetAddress bestRemoteV4,bestRemoteV6; for(unsigned int i=0;i<numRemotePeerPaths;++i) { @@ -469,6 +473,15 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) RR->sw->send(outp,true); //TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len); } break; + + case CLUSTER_MESSAGE_NETWORK_CONFIG: { + const SharedPtr<Network> network(RR->node->network(dmsg.at<uint64_t>(ptr))); + if (network) { + // Copy into a Packet just to conform to Network API. Eventually + // will want to refactor. + network->handleConfigChunk(0,Address(),Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(dmsg),ptr); + } + } break; } } catch ( ... ) { TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype); @@ -494,7 +507,84 @@ void Cluster::broadcastHavePeer(const Identity &id) } } -void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite) +void Cluster::broadcastNetworkConfigChunk(const void *chunk,unsigned int len) +{ + Mutex::Lock _l(_memberIds_m); + for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + Mutex::Lock _l2(_members[*mid].lock); + _send(*mid,CLUSTER_MESSAGE_NETWORK_CONFIG,chunk,len); + } +} + +int Cluster::checkSendViaCluster(const Address &toPeerAddress,uint64_t &mostRecentTs,void *peerSecret) +{ + const uint64_t now = RR->node->now(); + mostRecentTs = 0; + int mostRecentMemberId = -1; + { + Mutex::Lock _l2(_remotePeers_m); + std::map< std::pair<Address,unsigned int>,_RemotePeer >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0))); + for(;;) { + if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress)) + break; + else if (rpe->second.lastHavePeerReceived > mostRecentTs) { + mostRecentTs = rpe->second.lastHavePeerReceived; + memcpy(peerSecret,rpe->second.key,ZT_PEER_SECRET_KEY_LENGTH); + mostRecentMemberId = (int)rpe->first.second; + } + ++rpe; + } + } + + const uint64_t ageOfMostRecentHavePeerAnnouncement = now - mostRecentTs; + if (ageOfMostRecentHavePeerAnnouncement >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) { + if (ageOfMostRecentHavePeerAnnouncement >= ZT_PEER_ACTIVITY_TIMEOUT) + mostRecentMemberId = -1; + + bool sendWantPeer = true; + { + Mutex::Lock _l(_remotePeers_m); + _RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(toPeerAddress,(unsigned int)_id)]; + if ((now - rp.lastSentWantPeer) >= ZT_CLUSTER_WANT_PEER_EVERY) { + rp.lastSentWantPeer = now; + } else { + sendWantPeer = false; // don't flood WANT_PEER + } + } + if (sendWantPeer) { + char tmp[ZT_ADDRESS_LENGTH]; + toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH); + { + Mutex::Lock _l(_memberIds_m); + for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + Mutex::Lock _l2(_members[*mid].lock); + _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH); + } + } + } + } + + return mostRecentMemberId; +} + +bool Cluster::sendViaCluster(int mostRecentMemberId,const Address &toPeerAddress,const void *data,unsigned int len) +{ + if ((mostRecentMemberId < 0)||(mostRecentMemberId >= ZT_CLUSTER_MAX_MEMBERS)) // sanity check + return false; + Mutex::Lock _l2(_members[mostRecentMemberId].lock); + for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) { + for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) { + if (i1->ss_family == i2->ss_family) { + TRACE("sendViaCluster sending %u bytes to %s by way of %u (%s->%s)",len,toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str()); + RR->node->putPacket(*i1,*i2,data,len); + return true; + } + } + } + return false; +} + +void Cluster::relayViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite) { if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check return; @@ -502,87 +592,101 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee const uint64_t now = RR->node->now(); uint64_t mostRecentTs = 0; - unsigned int mostRecentMemberId = 0xffffffff; + int mostRecentMemberId = -1; { Mutex::Lock _l2(_remotePeers_m); - std::map< std::pair<Address,unsigned int>,uint64_t >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0))); + std::map< std::pair<Address,unsigned int>,_RemotePeer >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0))); for(;;) { if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress)) break; - else if (rpe->second > mostRecentTs) { - mostRecentTs = rpe->second; - mostRecentMemberId = rpe->first.second; + else if (rpe->second.lastHavePeerReceived > mostRecentTs) { + mostRecentTs = rpe->second.lastHavePeerReceived; + mostRecentMemberId = (int)rpe->first.second; } ++rpe; } } - const uint64_t age = now - mostRecentTs; - if (age >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) { - const bool enqueueAndWait = ((age >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId > 0xffff)); + const uint64_t ageOfMostRecentHavePeerAnnouncement = now - mostRecentTs; + if (ageOfMostRecentHavePeerAnnouncement >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) { + // Enqueue and wait if peer seems alive, but do WANT_PEER to refresh homing + const bool enqueueAndWait = ((ageOfMostRecentHavePeerAnnouncement >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId < 0)); // Poll everyone with WANT_PEER if the age of our most recent entry is // approaching expiration (or has expired, or does not exist). - char tmp[ZT_ADDRESS_LENGTH]; - toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH); + bool sendWantPeer = true; { - Mutex::Lock _l(_memberIds_m); - for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { - Mutex::Lock _l2(_members[*mid].lock); - _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH); + Mutex::Lock _l(_remotePeers_m); + _RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(toPeerAddress,(unsigned int)_id)]; + if ((now - rp.lastSentWantPeer) >= ZT_CLUSTER_WANT_PEER_EVERY) { + rp.lastSentWantPeer = now; + } else { + sendWantPeer = false; // don't flood WANT_PEER + } + } + if (sendWantPeer) { + char tmp[ZT_ADDRESS_LENGTH]; + toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH); + { + Mutex::Lock _l(_memberIds_m); + for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + Mutex::Lock _l2(_members[*mid].lock); + _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH); + } } } // If there isn't a good place to send via, then enqueue this for retrying // later and return after having broadcasted a WANT_PEER. if (enqueueAndWait) { - TRACE("sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); + TRACE("relayViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); _sendQueue->enqueue(now,fromPeerAddress,toPeerAddress,data,len,unite); return; } } - Buffer<1024> buf; - if (unite) { - InetAddress v4,v6; - if (fromPeerAddress) { - SharedPtr<Peer> fromPeer(RR->topology->getPeerNoCache(fromPeerAddress)); - if (fromPeer) - fromPeer->getBestActiveAddresses(now,v4,v6); - } - uint8_t addrCount = 0; - if (v4) - ++addrCount; - if (v6) - ++addrCount; - if (addrCount) { - toPeerAddress.appendTo(buf); - fromPeerAddress.appendTo(buf); - buf.append(addrCount); + if (mostRecentMemberId >= 0) { + Buffer<1024> buf; + if (unite) { + InetAddress v4,v6; + if (fromPeerAddress) { + SharedPtr<Peer> fromPeer(RR->topology->getPeerNoCache(fromPeerAddress)); + if (fromPeer) + fromPeer->getRendezvousAddresses(now,v4,v6); + } + uint8_t addrCount = 0; if (v4) - v4.serialize(buf); + ++addrCount; if (v6) - v6.serialize(buf); + ++addrCount; + if (addrCount) { + toPeerAddress.appendTo(buf); + fromPeerAddress.appendTo(buf); + buf.append(addrCount); + if (v4) + v4.serialize(buf); + if (v6) + v6.serialize(buf); + } } - } - { - Mutex::Lock _l2(_members[mostRecentMemberId].lock); - if (buf.size() > 0) - _send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); - - for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) { - for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) { - if (i1->ss_family == i2->ss_family) { - TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u (%s->%s)",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str()); - RR->node->putPacket(*i1,*i2,data,len); - return; + { + Mutex::Lock _l2(_members[mostRecentMemberId].lock); + if (buf.size() > 0) + _send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); + + for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) { + for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) { + if (i1->ss_family == i2->ss_family) { + TRACE("relayViaCluster relaying %u bytes from %s to %s by way of %u (%s->%s)",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str()); + RR->node->putPacket(*i1,*i2,data,len); + return; + } } } - } - TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u failed: no common endpoints with the same address family!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId); - return; + TRACE("relayViaCluster relaying %u bytes from %s to %s by way of %u failed: no common endpoints with the same address family!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId); + } } } @@ -644,8 +748,8 @@ void Cluster::doPeriodicTasks() _lastCleanedRemotePeers = now; Mutex::Lock _l(_remotePeers_m); - for(std::map< std::pair<Address,unsigned int>,uint64_t >::iterator rp(_remotePeers.begin());rp!=_remotePeers.end();) { - if ((now - rp->second) >= ZT_PEER_ACTIVITY_TIMEOUT) + for(std::map< std::pair<Address,unsigned int>,_RemotePeer >::iterator rp(_remotePeers.begin());rp!=_remotePeers.end();) { + if ((now - rp->second.lastHavePeerReceived) >= ZT_PEER_ACTIVITY_TIMEOUT) _remotePeers.erase(rp++); else ++rp; } @@ -758,6 +862,19 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr } } +bool Cluster::isClusterPeerFrontplane(const InetAddress &ip) const +{ + Mutex::Lock _l(_memberIds_m); + for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + Mutex::Lock _l2(_members[*mid].lock); + for(std::vector<InetAddress>::const_iterator i2(_members[*mid].zeroTierPhysicalEndpoints.begin());i2!=_members[*mid].zeroTierPhysicalEndpoints.end();++i2) { + if (ip == *i2) + return true; + } + } + return false; +} + void Cluster::status(ZT_ClusterStatus &status) const { const uint64_t now = RR->node->now(); @@ -837,10 +954,10 @@ void Cluster::_flush(uint16_t memberId) // One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard") char polykey[ZT_POLY1305_KEY_LEN]; memset(polykey,0,sizeof(polykey)); - s20.encrypt12(polykey,polykey,sizeof(polykey)); + s20.crypt12(polykey,polykey,sizeof(polykey)); // Encrypt m.q in place - s20.encrypt12(reinterpret_cast<const char *>(m.q.data()) + 24,const_cast<char *>(reinterpret_cast<const char *>(m.q.data())) + 24,m.q.size() - 24); + s20.crypt12(reinterpret_cast<const char *>(m.q.data()) + 24,const_cast<char *>(reinterpret_cast<const char *>(m.q.data())) + 24,m.q.size() - 24); // Add MAC for authentication (encrypt-then-MAC) char mac[ZT_POLY1305_MAC_LEN]; |