diff options
| author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2015-11-08 13:57:02 -0800 |
|---|---|---|
| committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2015-11-08 13:57:02 -0800 |
| commit | 57b71bfff0dc50505495e64b75988227d4b85905 (patch) | |
| tree | 018360d5330a96c10c83224a33216bb1693fca3f /node/Cluster.cpp | |
| parent | 6bc8c9d8efd05c7f85ac269be29c781fcc40672b (diff) | |
| download | infinitytier-57b71bfff0dc50505495e64b75988227d4b85905.tar.gz infinitytier-57b71bfff0dc50505495e64b75988227d4b85905.zip | |
Cluster simplification and refactor work in progress...
Diffstat (limited to 'node/Cluster.cpp')
| -rw-r--r-- | node/Cluster.cpp | 505 |
1 files changed, 272 insertions, 233 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp index 9c954fa3..c40fbd55 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -86,7 +86,8 @@ Cluster::Cluster( _peerAffinities(65536), _lastCleanedPeerAffinities(0), _lastCheckedPeersForAnnounce(0), - _lastFlushed(0) + _lastFlushed(0), + _lastCleanedRemotePeers(0) { uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)]; @@ -107,6 +108,9 @@ Cluster::~Cluster() Utils::burn(_masterSecret,sizeof(_masterSecret)); Utils::burn(_key,sizeof(_key)); delete [] _members; + + for(std::multimap<Address,_SQE *>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) + delete qi->second; } void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) @@ -160,222 +164,263 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) return; } - { - _Member &m = _members[fromMemberId]; - Mutex::Lock mlck(m.lock); - - try { - while (ptr < dmsg.size()) { - const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2; - const unsigned int nextPtr = ptr + mlen; - if (nextPtr > dmsg.size()) - break; + _Member &m = _members[fromMemberId]; - int mtype = -1; - try { - switch((StateMessageType)(mtype = (int)dmsg[ptr++])) { - default: - break; - - case STATE_MESSAGE_ALIVE: { - ptr += 7; // skip version stuff, not used yet - m.x = dmsg.at<int32_t>(ptr); ptr += 4; - m.y = dmsg.at<int32_t>(ptr); ptr += 4; - m.z = dmsg.at<int32_t>(ptr); ptr += 4; - ptr += 8; // skip local clock, not used - m.load = dmsg.at<uint64_t>(ptr); ptr += 8; - m.peers = dmsg.at<uint64_t>(ptr); ptr += 8; - ptr += 8; // skip flags, unused -#ifdef ZT_TRACE - std::string addrs; -#endif - unsigned int physicalAddressCount = dmsg[ptr++]; - m.zeroTierPhysicalEndpoints.clear(); - for(unsigned int i=0;i<physicalAddressCount;++i) { - m.zeroTierPhysicalEndpoints.push_back(InetAddress()); - ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr); - if (!(m.zeroTierPhysicalEndpoints.back())) { - m.zeroTierPhysicalEndpoints.pop_back(); - } + try { + while (ptr < dmsg.size()) { + const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2; + const unsigned int nextPtr = ptr + mlen; + if (nextPtr > dmsg.size()) + break; + + int mtype = -1; + try { + switch((StateMessageType)(mtype = (int)dmsg[ptr++])) { + default: + break; + + case CLUSTER_MESSAGE_ALIVE: { + Mutex::Lock mlck(m.lock); + ptr += 7; // skip version stuff, not used yet + m.x = dmsg.at<int32_t>(ptr); ptr += 4; + m.y = dmsg.at<int32_t>(ptr); ptr += 4; + m.z = dmsg.at<int32_t>(ptr); ptr += 4; + ptr += 8; // skip local clock, not used + m.load = dmsg.at<uint64_t>(ptr); ptr += 8; + m.peers = dmsg.at<uint64_t>(ptr); ptr += 8; + ptr += 8; // skip flags, unused #ifdef ZT_TRACE - else { - if (addrs.length() > 0) - addrs.push_back(','); - addrs.append(m.zeroTierPhysicalEndpoints.back().toString()); - } + std::string addrs; #endif + unsigned int physicalAddressCount = dmsg[ptr++]; + m.zeroTierPhysicalEndpoints.clear(); + for(unsigned int i=0;i<physicalAddressCount;++i) { + m.zeroTierPhysicalEndpoints.push_back(InetAddress()); + ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr); + if (!(m.zeroTierPhysicalEndpoints.back())) { + m.zeroTierPhysicalEndpoints.pop_back(); } #ifdef ZT_TRACE - if ((RR->node->now() - m.lastReceivedAliveAnnouncement) >= ZT_CLUSTER_TIMEOUT) { - TRACE("[%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str()); + else { + if (addrs.length() > 0) + addrs.push_back(','); + addrs.append(m.zeroTierPhysicalEndpoints.back().toString()); } #endif - m.lastReceivedAliveAnnouncement = RR->node->now(); - } break; - - case STATE_MESSAGE_HAVE_PEER: { - const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; - Mutex::Lock _l2(_peerAffinities_m); - _peerAffinities.set(zeroTierAddress,fromMemberId); - TRACE("[%u] has %s @ %s",(unsigned int)fromMemberId,id.address().toString().c_str(),physicalAddress.toString().c_str()); - } break; - - case STATE_MESSAGE_WANT_PEER: { - const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; - SharedPtr<Peer> peer(RR->topology->getPeerNoCache(zeroTierAddress)); - if ((peer)&&(peer->hasActiveDirectPath(RR->node->now()))) { - char buf[ZT_ADDRESS_LENGTH]; - peer->address().copyTo(buf,ZT_ADDRESS_LENGTH); + } +#ifdef ZT_TRACE + if ((RR->node->now() - m.lastReceivedAliveAnnouncement) >= ZT_CLUSTER_TIMEOUT) { + TRACE("[%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str()); + } +#endif + m.lastReceivedAliveAnnouncement = RR->node->now(); + } break; + + case CLUSTER_MESSAGE_HAVE_PEER: { + Identity id; + ptr += id.deserialize(dmsg,ptr); + if (id) { + RR->topology->saveIdentity(id); + + { + Mutex::Lock _l(_remotePeers_m); + _remotePeers[std::pair<Address,unsigned int>(id.address(),(unsigned int)fromMemberId)] = RR->node->now(); + } + + std::pair<Address,_SQE *> q[ZT_CLUSTER_MAX_QUEUE_PER_SENDER]; + unsigned int qc = 0; + { + Mutex::Lock _l(_sendViaClusterQueue_m); + std::pair< std::multimap<Address,_SQE *>::iterator,std::multimap<Address,_SQE>::iterator > er(_sendViaClusterQueue.equal_range(id.address())); + for(std::multimap<Address,_SQE *>::iterator qi(er.first);qi!=er.second;) { + if (qc >= ZT_CLUSTER_MAX_QUEUE_PER_SENDER) // sanity check + break; + q[qc++] = *qi; + _sendViaClusterQueue.erase(qi++); + } + } + for(unsigned int i=0;i<qc;++i) { + this->sendViaCluster(q[i].first,q[i].second->toPeerAddress,q[i].second->data,q[i].second->len,q[i].second->unite); + delete q[i].second; + } + + TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str()); + } + } break; + + case CLUSTER_MESSAGE_WANT_PEER: { + const Address zeroTierAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; + SharedPtr<Peer> peer(RR->topology->getPeerNoCache(zeroTierAddress)); + if ( (peer) && (peer->hasActiveDirectPath(RR->node->now())) ) { + Buffer<1024> buf; + peer->identity().serialize(buf); Mutex::Lock _l2(_members[fromMemberId].lock); - _send(fromMemberId,STATE_MESSAGE_HAVE_PEER,buf,ZT_ADDRESS_LENGTH); - _flush(fromMemberId); + _send(fromMemberId,CLUSTER_MESSAGE_HAVE_PEER,buf.data(),buf.size()); + _flush(fromMemberId); // lookups are latency sensitive } - } break; - - case STATE_MESSAGE_MULTICAST_LIKE: { - const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8; - const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; - const MAC mac(dmsg.field(ptr,6),6); ptr += 6; - const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4; - RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address); - TRACE("[%u] %s likes %s/%.8x on %.16llx",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid); - } break; - - case STATE_MESSAGE_COM: { - /* not currently used so not decoded yet - CertificateOfMembership com; - ptr += com.deserialize(dmsg,ptr); - if (com) { - TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision()); + } + } break; + + case CLUSTER_MESSAGE_REMOTE_PACKET: { + const unsigned int plen = dmsg.at<uint16_t>(ptr); ptr += 2; + if (plen) { + Packet remotep(dmsg.field(ptr,plen),plen); ptr += plen; + TRACE("remote %s from %s via %u (%u bytes)",Packet::verbString(remotep.verb()),remotep.source().toString().c_str(),fromMemberId,plen); + switch(remotep.verb()) { + case Packet::VERB_WHOIS: _doREMOTE_WHOIS(fromMemberId,remotep); break; + case Packet::VERB_MULTICAST_GATHER: _doREMOTE_MULTICAST_GATHER(fromMemberId,remotep); break; + default: break; // ignore things we don't care about across cluster } - */ - } break; - - case STATE_MESSAGE_PROXY_UNITE: { - const Address localPeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; - const Address remotePeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; - const unsigned int numRemotePeerPaths = dmsg[ptr++]; - InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max - for(unsigned int i=0;i<numRemotePeerPaths;++i) - ptr += remotePeerPaths[i].deserialize(dmsg,ptr); - - TRACE("[%u] requested that we unite local %s with remote %s",(unsigned int)fromMemberId,localPeerAddress.toString().c_str(),remotePeerAddress.toString().c_str()); - - const uint64_t now = RR->node->now(); - SharedPtr<Peer> localPeer(RR->topology->getPeerNoCache(localPeerAddress)); - if ((localPeer)&&(numRemotePeerPaths > 0)) { - InetAddress bestLocalV4,bestLocalV6; - localPeer->getBestActiveAddresses(now,bestLocalV4,bestLocalV6); - - InetAddress bestRemoteV4,bestRemoteV6; - for(unsigned int i=0;i<numRemotePeerPaths;++i) { - if ((bestRemoteV4)&&(bestRemoteV6)) + } + } break; + + case CLUSTER_MESSAGE_PROXY_UNITE: { + const Address localPeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; + const Address remotePeerAddress(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; + const unsigned int numRemotePeerPaths = dmsg[ptr++]; + InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max + for(unsigned int i=0;i<numRemotePeerPaths;++i) + ptr += remotePeerPaths[i].deserialize(dmsg,ptr); + + TRACE("[%u] requested that we unite local %s with remote %s",(unsigned int)fromMemberId,localPeerAddress.toString().c_str(),remotePeerAddress.toString().c_str()); + + const uint64_t now = RR->node->now(); + SharedPtr<Peer> localPeer(RR->topology->getPeerNoCache(localPeerAddress)); + if ((localPeer)&&(numRemotePeerPaths > 0)) { + InetAddress bestLocalV4,bestLocalV6; + localPeer->getBestActiveAddresses(now,bestLocalV4,bestLocalV6); + + InetAddress bestRemoteV4,bestRemoteV6; + for(unsigned int i=0;i<numRemotePeerPaths;++i) { + if ((bestRemoteV4)&&(bestRemoteV6)) + break; + switch(remotePeerPaths[i].ss_family) { + case AF_INET: + if (!bestRemoteV4) + bestRemoteV4 = remotePeerPaths[i]; + break; + case AF_INET6: + if (!bestRemoteV6) + bestRemoteV6 = remotePeerPaths[i]; break; - switch(remotePeerPaths[i].ss_family) { - case AF_INET: - if (!bestRemoteV4) - bestRemoteV4 = remotePeerPaths[i]; - break; - case AF_INET6: - if (!bestRemoteV6) - bestRemoteV6 = remotePeerPaths[i]; - break; - } } + } - Packet rendezvousForLocal(localPeerAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS); - rendezvousForLocal.append((uint8_t)0); - remotePeerAddress.appendTo(rendezvousForLocal); - - Buffer<2048> rendezvousForRemote; - remotePeerAddress.appendTo(rendezvousForRemote); - rendezvousForRemote.append((uint8_t)Packet::VERB_RENDEZVOUS); - const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForRemote.size(); - rendezvousForRemote.addSize(2); // space for actual packet payload length - rendezvousForRemote.append((uint8_t)0); // flags == 0 - localPeerAddress.appendTo(rendezvousForRemote); - - bool haveMatch = false; - if ((bestLocalV6)&&(bestRemoteV6)) { - haveMatch = true; - - rendezvousForLocal.append((uint16_t)bestRemoteV6.port()); - rendezvousForLocal.append((uint8_t)16); - rendezvousForLocal.append(bestRemoteV6.rawIpData(),16); - - rendezvousForRemote.append((uint16_t)bestLocalV6.port()); - rendezvousForRemote.append((uint8_t)16); - rendezvousForRemote.append(bestLocalV6.rawIpData(),16); - rendezvousForRemote.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16)); - } else if ((bestLocalV4)&&(bestRemoteV4)) { - haveMatch = true; - - rendezvousForLocal.append((uint16_t)bestRemoteV4.port()); - rendezvousForLocal.append((uint8_t)4); - rendezvousForLocal.append(bestRemoteV4.rawIpData(),4); - - rendezvousForRemote.append((uint16_t)bestLocalV4.port()); - rendezvousForRemote.append((uint8_t)4); - rendezvousForRemote.append(bestLocalV4.rawIpData(),4); - rendezvousForRemote.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4)); - } + Packet rendezvousForLocal(localPeerAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS); + rendezvousForLocal.append((uint8_t)0); + remotePeerAddress.appendTo(rendezvousForLocal); + + Buffer<2048> rendezvousForRemote; + remotePeerAddress.appendTo(rendezvousForRemote); + rendezvousForRemote.append((uint8_t)Packet::VERB_RENDEZVOUS); + const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForRemote.size(); + rendezvousForRemote.addSize(2); // space for actual packet payload length + rendezvousForRemote.append((uint8_t)0); // flags == 0 + localPeerAddress.appendTo(rendezvousForRemote); + + bool haveMatch = false; + if ((bestLocalV6)&&(bestRemoteV6)) { + haveMatch = true; + + rendezvousForLocal.append((uint16_t)bestRemoteV6.port()); + rendezvousForLocal.append((uint8_t)16); + rendezvousForLocal.append(bestRemoteV6.rawIpData(),16); + + rendezvousForRemote.append((uint16_t)bestLocalV6.port()); + rendezvousForRemote.append((uint8_t)16); + rendezvousForRemote.append(bestLocalV6.rawIpData(),16); + rendezvousForRemote.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16)); + } else if ((bestLocalV4)&&(bestRemoteV4)) { + haveMatch = true; + + rendezvousForLocal.append((uint16_t)bestRemoteV4.port()); + rendezvousForLocal.append((uint8_t)4); + rendezvousForLocal.append(bestRemoteV4.rawIpData(),4); + + rendezvousForRemote.append((uint16_t)bestLocalV4.port()); + rendezvousForRemote.append((uint8_t)4); + rendezvousForRemote.append(bestLocalV4.rawIpData(),4); + rendezvousForRemote.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4)); + } - if (haveMatch) { - _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size()); - _flush(fromMemberId); - RR->sw->send(rendezvousForLocal,true,0); - } + if (haveMatch) { + _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size()); + _flush(fromMemberId); + RR->sw->send(rendezvousForLocal,true,0); } - } break; - - case STATE_MESSAGE_PROXY_SEND: { - const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; - const Packet::Verb verb = (Packet::Verb)dmsg[ptr++]; - const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2; - Packet outp(rcpt,RR->identity.address(),verb); - outp.append(dmsg.field(ptr,len),len); ptr += len; - RR->sw->send(outp,true,0); - TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len); - } break; - } - } catch ( ... ) { - TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype); - // drop invalids + } + } break; + + case CLUSTER_MESSAGE_PROXY_SEND: { + const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; + const Packet::Verb verb = (Packet::Verb)dmsg[ptr++]; + const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2; + Packet outp(rcpt,RR->identity.address(),verb); + outp.append(dmsg.field(ptr,len),len); ptr += len; + RR->sw->send(outp,true,0); + //TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len); + } break; } - - ptr = nextPtr; + } catch ( ... ) { + TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype); + // drop invalids } - } catch ( ... ) { - TRACE("invalid message (outer loop), discarding"); - // drop invalids + + ptr = nextPtr; } + } catch ( ... ) { + TRACE("invalid message (outer loop), discarding"); + // drop invalids } } bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPeerAddress,const void *data,unsigned int len,bool unite) { - if (len > 16384) // sanity check + if (len > ZT_PROTO_MAX_PACKET_LENGTH) // sanity check return false; const uint64_t now = RR->node->now(); unsigned int canHasPeer = 0; + uint64_t mostRecentTs = 0; + unsigned int mostRecentMemberId = 0xffffffff; { - Mutex::Lock _l2(_peerAffinities_m); - const unsigned int *pa = _peerAffinities.get(toPeerAddress); - if (!pa) { - char buf[ZT_ADDRESS_LENGTH]; - peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH); - { - Mutex::Lock _l(_memberIds_m); - for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { - Mutex::Lock _l2(_members[*mid].lock); - _send(*mid,STATE_MESSAGE_WANT_PEER,buf,ZT_ADDRESS_LENGTH); - } + Mutex::Lock _l2(_remotePeers_m); + std::map< std::pair<Address,unsigned int>,uint64_t >::const_iterator rpe(_remotePeers.lower_bound(std::pair<Address,unsigned int>(fromPeerAddress,0))); + for(;;) { + if ((rpe == _remotePeers.end())||(rpe->first.first != fromPeerAddress)) + break; + else if (rpe->second > mostRecentTs) { + mostRecentTs = rpe->second; + mostRecentMemberId = rpe->first.second; } - return false; } - canHasPeer = *pa; + } + + const uint64_t age = now - mostRecentTs; + if (age >= (ZT_PEER_ACTIVITY_TIMEOUT / 3)) { + // Poll everyone with WANT_PEER if the age of our most recent entry is + // approaching expiration (or has expired, or does not exist). + char tmp[ZT_ADDRESS_LENGTH]; + toPeerAddress.copyTo(tmp,ZT_ADDRESS_LENGTH); + { + Mutex::Lock _l(_memberIds_m); + for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + Mutex::Lock _l2(_members[*mid].lock); + _send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH); + if (mostRecentMemberId > 0xffff) + _flush(*mid); // latency sensitive if we don't have one + } + } + + // If there isn't a good place to send via, then enqueue this for retrying + // later and return after having broadcasted a WANT_PEER. + if ((age >= ZT_PEER_ACTIVITY_TIMEOUT)||(mostRecentMemberId > 0xffff)) { + Mutex::Lock _l(_sendViaClusterQueue_m); + if (_sendViaClusterQueue.count(fromPeerAddress) < ZT_CLUSTER_MAX_QUEUE_PER_SENDER) + _sendViaClusterQueue.insert(std::pair<Address,_SQE *>(fromPeerAddress,new _SQE(now,toPeerAddress,data,len,unite))); + return true; + } } Buffer<1024> buf; @@ -401,11 +446,14 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee v6.serialize(buf); } } + { - Mutex::Lock _l2(_members[canHasPeer].lock); - if (buf.size() > 0) - _send(canHasPeer,STATE_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); - if (_members[canHasPeer].zeroTierPhysicalEndpoints.size() > 0) + Mutex::Lock _l2(_members[mostRecentMemberId].lock); + if (buf.size() > 0) { + _send(canHasPeer,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size()); + _flush(canHasPeer); // latency sensitive + } + if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0) RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len); } @@ -414,62 +462,43 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee return true; } -void Cluster::replicateHavePeer(const Identity &peerId) -{ - char buf[ZT_ADDRESS_LENGTH]; - peerId.address().copyTo(buf,ZT_ADDRESS_LENGTH); - { - Mutex::Lock _l(_memberIds_m); - for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { - Mutex::Lock _l2(_members[*mid].lock); - _send(*mid,STATE_MESSAGE_HAVE_PEER,buf,ZT_ADDRESS_LENGTH); - } - } -} - -void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group) -{ - Buffer<1024> buf; - buf.append((uint64_t)nwid); - peerAddress.appendTo(buf); - group.mac().appendTo(buf); - buf.append((uint32_t)group.adi()); - TRACE("replicating %s MULTICAST_LIKE %.16llx/%s/%u to all members",peerAddress.toString().c_str(),nwid,group.mac().toString().c_str(),(unsigned int)group.adi()); - { - Mutex::Lock _l(_memberIds_m); - for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { - Mutex::Lock _l2(_members[*mid].lock); - _send(*mid,STATE_MESSAGE_MULTICAST_LIKE,buf.data(),buf.size()); - } - } -} - -void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com) +void Cluster::sendDistributedQuery(const Packet &pkt) { - /* not used yet, so don't do this yet Buffer<4096> buf; - com.serialize(buf); - TRACE("replicating %s COM for %.16llx to all members",com.issuedTo().toString().c_str(),com.networkId()); - { - Mutex::Lock _l(_memberIds_m); - for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { - Mutex::Lock _l2(_members[*mid].lock); - _send(*mid,STATE_MESSAGE_COM,buf.data(),buf.size()); - } + buf.append((uint16_t)pkt.size()); + buf.append(pkt.data(),pkt.size()); + Mutex::Lock _l(_memberIds_m); + for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + Mutex::Lock _l2(_members[*mid].lock); + _send(*mid,CLUSTER_MESSAGE_REMOTE_PACKET,buf.data(),buf.size()); + _flush(*mid); // these tend to be latency-sensitive } - */ } void Cluster::doPeriodicTasks() { const uint64_t now = RR->node->now(); + if ((now - _lastFlushed) >= ZT_CLUSTER_FLUSH_PERIOD) { _lastFlushed = now; + + { + Mutex::Lock _l2(_sendViaClusterQueue_m); + for(std::multimap<Address,_SQE *>::iterator qi(_sendViaClusterQueue.begin());qi!=_sendViaClusterQueue.end();) { + if ((now - qi->second->timestamp) >= ZT_CLUSTER_QUEUE_EXPIRATION) { + delete qi->second; + _sendViaClusterQueue.erase(qi++); + } else ++qi; + } + } + Mutex::Lock _l(_memberIds_m); for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { Mutex::Lock _l2(_members[*mid].lock); if ((now - _members[*mid].lastAnnouncedAliveTo) >= ((ZT_CLUSTER_TIMEOUT / 2) - 1000)) { + _members[*mid].lastAnnouncedAliveTo = now; + Buffer<2048> alive; alive.append((uint16_t)ZEROTIER_ONE_VERSION_MAJOR); alive.append((uint16_t)ZEROTIER_ONE_VERSION_MINOR); @@ -491,13 +520,23 @@ void Cluster::doPeriodicTasks() alive.append((uint8_t)_zeroTierPhysicalEndpoints.size()); for(std::vector<InetAddress>::const_iterator pe(_zeroTierPhysicalEndpoints.begin());pe!=_zeroTierPhysicalEndpoints.end();++pe) pe->serialize(alive); - _send(*mid,STATE_MESSAGE_ALIVE,alive.data(),alive.size()); - _members[*mid].lastAnnouncedAliveTo = now; + _send(*mid,CLUSTER_MESSAGE_ALIVE,alive.data(),alive.size()); } _flush(*mid); // does nothing if nothing to flush } } + + if ((now - _lastCleanedRemotePeers) >= (ZT_PEER_ACTIVITY_TIMEOUT * 2)) { + _lastCleanedRemotePeers = now; + + Mutex::Lock _l(_remotePeers_m); + for(std::map< std::pair<Address,unsigned int>,uint64_t >::iterator rp(_remotePeers.begin());rp!=_remotePeers.end();) { + if ((now - rp->second) >= ZT_PEER_ACTIVITY_TIMEOUT) + _remotePeers.erase(rp++); + else ++rp; + } + } } void Cluster::addMember(uint16_t memberId) |
