summaryrefslogtreecommitdiff
path: root/node/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'node/Cluster.cpp')
-rw-r--r--node/Cluster.cpp239
1 files changed, 178 insertions, 61 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index 2a261e51..52e03ffe 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -44,6 +44,7 @@
#include "Packet.hpp"
#include "Switch.hpp"
#include "Node.hpp"
+#include "Network.hpp"
#include "Array.hpp"
namespace ZeroTier {
@@ -254,7 +255,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
// One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard")
char polykey[ZT_POLY1305_KEY_LEN];
memset(polykey,0,sizeof(polykey));
- s20.encrypt12(polykey,polykey,sizeof(polykey));
+ s20.crypt12(polykey,polykey,sizeof(polykey));
// Compute 16-byte MAC
char mac[ZT_POLY1305_MAC_LEN];
@@ -266,7 +267,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
// Decrypt!
dmsg.setSize(len - 24);
- s20.decrypt12(reinterpret_cast<const char *>(msg) + 24,const_cast<void *>(dmsg.data()),dmsg.size());
+ s20.crypt12(reinterpret_cast<const char *>(msg) + 24,const_cast<void *>(dmsg.data()),dmsg.size());
}
if (dmsg.size() < 4)
@@ -341,17 +342,20 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
Identity id;
ptr += id.deserialize(dmsg,ptr);
if (id) {
- RR->topology->saveIdentity(id);
-
{
Mutex::Lock _l(_remotePeers_m);
- _remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)] = RR->node->now();
+ _RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)];
+ if (!rp.lastHavePeerReceived) {
+ RR->topology->saveIdentity(id);
+ RR->identity.agree(id,rp.key,ZT_PEER_SECRET_KEY_LENGTH);
+ }
+ rp.lastHavePeerReceived = RR->node->now();
}
_ClusterSendQueueEntry *q[16384]; // 16384 is "tons"
unsigned int qc = _sendQueue->getByDest(id.address(),q,16384);
for(unsigned int i=0;i<qc;++i)
- this->sendViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite);
+ this->relayViaCluster(q[i]->fromPeerAddress,q[i]->toPeerAddress,q[i]->data,q[i]->len,q[i]->unite);
_sendQueue->returnToPool(q,qc);
TRACE("[%u] has %s (retried %u queued sends)",(unsigned int)fromMemberId,id.address().toString().c_str(),qc);
@@ -396,7 +400,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
SharedPtr<Peer> localPeer(RR->topology->getPeerNoCache(localPeerAddress));
if ((localPeer)&&(numRemotePeerPaths > 0)) {
InetAddress bestLocalV4,bestLocalV6;
- localPeer->getBestActiveAddresses(now,bestLocalV4,bestLocalV6);
+ localPeer->getRendezvousAddresses(now,bestLocalV4,bestLocalV6);
InetAddress bestRemoteV4,bestRemoteV6;
for(unsigned int i=0;i<numRemotePeerPaths;++i) {
@@ -469,6 +473,15 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
RR->sw->send(outp,true);
//TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
} break;
+
+ case CLUSTER_MESSAGE_NETWORK_CONFIG: {
+ const SharedPtr<Network> network(RR->node->network(dmsg.at<uint64_t>(ptr)));
+ if (network) {
+ // Copy into a Packet just to conform to Network API. Eventually
+ // will want to refactor.
+ network->handleConfigChunk(0,Address(),Buffer<ZT_PROTO_MAX_PACKET_LENGTH>(dmsg),ptr);
+ }
+ } break;
}
} catch ( ... ) {
TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
@@ -494,7 +507,84 @@ void Cluster::broadcastHavePeer(const Identity &id)
}
}
-void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite)
+void Cluster::broadcastNetworkConfigChunk(const void *chunk,unsigned int len)
+{
+ Mutex::Lock _l(_memberIds_m);
+ for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+ Mutex::Lock _l2(_members[*mid].lock);
+ _send(*mid,CLUSTER_MESSAGE_NETWORK_CONFIG,chunk,len);
+ }
+}
+
+int Cluster::checkSendViaCluster(const Address &toPeerAddress,uint64_t &mostRecentTs,void *peerSecret)
+{
+ const uint64_t now = RR->node->now();
+ mostRecentTs = 0;
+ int mostRecentMemberId = -1;
+ {
+ Mutex::Lock _l2(_remotePeers_m);
+ std::map< std::pair<Address,unsigned int>,_RemotePeer >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0)));
+ for(;;) {
+ if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress))
+ break;
+ else if (rpe->second.lastHavePeerReceived > mostRecentTs) {
+ mostRecentTs = rpe->second.lastHavePeerReceived;
+ memcpy(peerSecret,rpe->second.key,ZT_PEER_SECRET_KEY_LENGTH);
+ mostRecentMemberId = (int)rpe->first.second;
+ }
+ ++rpe;
+ }
+ }
+
+ const uint64_t ageOfMostRecentHavePeerAnnouncement = now - mostRecentTs;
+ if (ageOfMostRecentHavePeerAnnouncement >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) {
+ if (ageOfMostRecentHavePeerAnnouncement >= ZT_PEER_ACTIVITY_TIMEOUT)
+ mostRecentMemberId = -1;
+
+ bool sendWantPeer = true;
+ {
+ Mutex::Lock _l(_remotePeers_m);
+ _RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(toPeerAddress,(unsigned int)_id)];
+ if ((now - rp.lastSentWantPeer) >= ZT_CLUSTER_WANT_PEER_EVERY) {
+ rp.lastSentWantPeer = now;
+ } else {
+ sendWantPeer = false; // don't flood WANT_PEER
+ }
+ }
+ if (sendWantPeer) {
+ char tmp[ZT_ADDRESS_LENGTH];
+ toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH);
+ {
+ Mutex::Lock _l(_memberIds_m);
+ for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+ Mutex::Lock _l2(_members[*mid].lock);
+ _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
+ }
+ }
+ }
+ }
+
+ return mostRecentMemberId;
+}
+
+bool Cluster::sendViaCluster(int mostRecentMemberId,const Address &toPeerAddress,const void *data,unsigned int len)
+{
+ if ((mostRecentMemberId < 0)||(mostRecentMemberId >= ZT_CLUSTER_MAX_MEMBERS)) // sanity check
+ return false;
+ Mutex::Lock _l2(_members[mostRecentMemberId].lock);
+ for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) {
+ for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) {
+ if (i1->ss_family == i2->ss_family) {
+ TRACE("sendViaCluster sending %u bytes to %s by way of %u (%s->%s)",len,toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str());
+ RR->node->putPacket(*i1,*i2,data,len);
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+void Cluster::relayViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite)
{
if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check
return;
@@ -502,87 +592,101 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
const uint64_t now = RR->node->now();
uint64_t mostRecentTs = 0;
- unsigned int mostRecentMemberId = 0xffffffff;
+ int mostRecentMemberId = -1;
{
Mutex::Lock _l2(_remotePeers_m);
- std::map< std::pair<Address,unsigned int>,uint64_t >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0)));
+ std::map< std::pair<Address,unsigned int>,_RemotePeer >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(toPeerAddress,0)));
for(;;) {
if ((rpe == _remotePeers.end())||(rpe->first.first != toPeerAddress))
break;
- else if (rpe->second > mostRecentTs) {
- mostRecentTs = rpe->second;
- mostRecentMemberId = rpe->first.second;
+ else if (rpe->second.lastHavePeerReceived > mostRecentTs) {
+ mostRecentTs = rpe->second.lastHavePeerReceived;
+ mostRecentMemberId = (int)rpe->first.second;
}
++rpe;
}
}
- const uint64_t age = now - mostRecentTs;
- if (age >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) {
- const bool enqueueAndWait = ((age >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId > 0xffff));
+ const uint64_t ageOfMostRecentHavePeerAnnouncement = now - mostRecentTs;
+ if (ageOfMostRecentHavePeerAnnouncement >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) {
+ // Enqueue and wait if peer seems alive, but do WANT_PEER to refresh homing
+ const bool enqueueAndWait = ((ageOfMostRecentHavePeerAnnouncement >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId < 0));
// Poll everyone with WANT_PEER if the age of our most recent entry is
// approaching expiration (or has expired, or does not exist).
- char tmp[ZT_ADDRESS_LENGTH];
- toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH);
+ bool sendWantPeer = true;
{
- Mutex::Lock _l(_memberIds_m);
- for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
- Mutex::Lock _l2(_members[*mid].lock);
- _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
+ Mutex::Lock _l(_remotePeers_m);
+ _RemotePeer &rp = _remotePeers[std::pair<Address,unsigned int>(toPeerAddress,(unsigned int)_id)];
+ if ((now - rp.lastSentWantPeer) >= ZT_CLUSTER_WANT_PEER_EVERY) {
+ rp.lastSentWantPeer = now;
+ } else {
+ sendWantPeer = false; // don't flood WANT_PEER
+ }
+ }
+ if (sendWantPeer) {
+ char tmp[ZT_ADDRESS_LENGTH];
+ toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH);
+ {
+ Mutex::Lock _l(_memberIds_m);
+ for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+ Mutex::Lock _l2(_members[*mid].lock);
+ _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
+ }
}
}
// If there isn't a good place to send via, then enqueue this for retrying
// later and return after having broadcasted a WANT_PEER.
if (enqueueAndWait) {
- TRACE("sendViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
+ TRACE("relayViaCluster %s -> %s enqueueing to wait for HAVE_PEER",fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
_sendQueue->enqueue(now,fromPeerAddress,toPeerAddress,data,len,unite);
return;
}
}
- Buffer<1024> buf;
- if (unite) {
- InetAddress v4,v6;
- if (fromPeerAddress) {
- SharedPtr<Peer> fromPeer(RR->topology->getPeerNoCache(fromPeerAddress));
- if (fromPeer)
- fromPeer->getBestActiveAddresses(now,v4,v6);
- }
- uint8_t addrCount = 0;
- if (v4)
- ++addrCount;
- if (v6)
- ++addrCount;
- if (addrCount) {
- toPeerAddress.appendTo(buf);
- fromPeerAddress.appendTo(buf);
- buf.append(addrCount);
+ if (mostRecentMemberId >= 0) {
+ Buffer<1024> buf;
+ if (unite) {
+ InetAddress v4,v6;
+ if (fromPeerAddress) {
+ SharedPtr<Peer> fromPeer(RR->topology->getPeerNoCache(fromPeerAddress));
+ if (fromPeer)
+ fromPeer->getRendezvousAddresses(now,v4,v6);
+ }
+ uint8_t addrCount = 0;
if (v4)
- v4.serialize(buf);
+ ++addrCount;
if (v6)
- v6.serialize(buf);
+ ++addrCount;
+ if (addrCount) {
+ toPeerAddress.appendTo(buf);
+ fromPeerAddress.appendTo(buf);
+ buf.append(addrCount);
+ if (v4)
+ v4.serialize(buf);
+ if (v6)
+ v6.serialize(buf);
+ }
}
- }
- {
- Mutex::Lock _l2(_members[mostRecentMemberId].lock);
- if (buf.size() > 0)
- _send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
-
- for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) {
- for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) {
- if (i1->ss_family == i2->ss_family) {
- TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u (%s->%s)",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str());
- RR->node->putPacket(*i1,*i2,data,len);
- return;
+ {
+ Mutex::Lock _l2(_members[mostRecentMemberId].lock);
+ if (buf.size() > 0)
+ _send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
+
+ for(std::vector<InetAddress>::const_iterator i1(_zeroTierPhysicalEndpoints.begin());i1!=_zeroTierPhysicalEndpoints.end();++i1) {
+ for(std::vector<InetAddress>::const_iterator i2(_members[mostRecentMemberId].zeroTierPhysicalEndpoints.begin());i2!=_members[mostRecentMemberId].zeroTierPhysicalEndpoints.end();++i2) {
+ if (i1->ss_family == i2->ss_family) {
+ TRACE("relayViaCluster relaying %u bytes from %s to %s by way of %u (%s->%s)",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId,i1->toString().c_str(),i2->toString().c_str());
+ RR->node->putPacket(*i1,*i2,data,len);
+ return;
+ }
}
}
- }
- TRACE("sendViaCluster relaying %u bytes from %s to %s by way of %u failed: no common endpoints with the same address family!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
- return;
+ TRACE("relayViaCluster relaying %u bytes from %s to %s by way of %u failed: no common endpoints with the same address family!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
+ }
}
}
@@ -644,8 +748,8 @@ void Cluster::doPeriodicTasks()
_lastCleanedRemotePeers = now;
Mutex::Lock _l(_remotePeers_m);
- for(std::map< std::pair<Address,unsigned int>,uint64_t >::iterator rp(_remotePeers.begin());rp!=_remotePeers.end();) {
- if ((now - rp->second) >= ZT_PEER_ACTIVITY_TIMEOUT)
+ for(std::map< std::pair<Address,unsigned int>,_RemotePeer >::iterator rp(_remotePeers.begin());rp!=_remotePeers.end();) {
+ if ((now - rp->second.lastHavePeerReceived) >= ZT_PEER_ACTIVITY_TIMEOUT)
_remotePeers.erase(rp++);
else ++rp;
}
@@ -758,6 +862,19 @@ bool Cluster::findBetterEndpoint(InetAddress &redirectTo,const Address &peerAddr
}
}
+bool Cluster::isClusterPeerFrontplane(const InetAddress &ip) const
+{
+ Mutex::Lock _l(_memberIds_m);
+ for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+ Mutex::Lock _l2(_members[*mid].lock);
+ for(std::vector<InetAddress>::const_iterator i2(_members[*mid].zeroTierPhysicalEndpoints.begin());i2!=_members[*mid].zeroTierPhysicalEndpoints.end();++i2) {
+ if (ip == *i2)
+ return true;
+ }
+ }
+ return false;
+}
+
void Cluster::status(ZT_ClusterStatus &status) const
{
const uint64_t now = RR->node->now();
@@ -837,10 +954,10 @@ void Cluster::_flush(uint16_t memberId)
// One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard")
char polykey[ZT_POLY1305_KEY_LEN];
memset(polykey,0,sizeof(polykey));
- s20.encrypt12(polykey,polykey,sizeof(polykey));
+ s20.crypt12(polykey,polykey,sizeof(polykey));
// Encrypt m.q in place
- s20.encrypt12(reinterpret_cast<const char *>(m.q.data()) + 24,const_cast<char *>(reinterpret_cast<const char *>(m.q.data())) + 24,m.q.size() - 24);
+ s20.crypt12(reinterpret_cast<const char *>(m.q.data()) + 24,const_cast<char *>(reinterpret_cast<const char *>(m.q.data())) + 24,m.q.size() - 24);
// Add MAC for authentication (encrypt-then-MAC)
char mac[ZT_POLY1305_MAC_LEN];