author | Grant Limberg <glimberg@gmail.com> | 2015-10-26 18:10:27 -0700 |
---|---|---|
committer | Grant Limberg <glimberg@gmail.com> | 2015-10-26 18:10:27 -0700 |
commit | 352b83252fb2617a15cde0927cc30110b729e46d (patch) | |
tree | 38569e4f6c1c162cdaabf7ac8c9d6b7917d71ffd /node/Cluster.cpp | |
parent | a0c3083af0821db0303c62dfae9ebc560c3c147a (diff) | |
parent | 6625d7929654803f99b7a69f56a400046314acac (diff) | |
download | infinitytier-352b83252fb2617a15cde0927cc30110b729e46d.tar.gz infinitytier-352b83252fb2617a15cde0927cc30110b729e46d.zip |
Merge branch 'adamierymenko-dev' into windows-ui
Diffstat (limited to 'node/Cluster.cpp')
-rw-r--r-- | node/Cluster.cpp | 458 |
1 file changed, 267 insertions(+), 191 deletions(-)
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index d9514db5..9d25593a 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -143,212 +143,223 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
 		return;
 	const uint16_t fromMemberId = dmsg.at<uint16_t>(0);
 	unsigned int ptr = 2;
-	if (fromMemberId == _id)
+	if (fromMemberId == _id) // sanity check: we don't talk to ourselves
 		return;
 	const uint16_t toMemberId = dmsg.at<uint16_t>(ptr); ptr += 2;
-	if (toMemberId != _id)
+	if (toMemberId != _id) // sanity check: message not for us?
 		return;
 
-	_Member &m = _members[fromMemberId];
-	Mutex::Lock mlck(m.lock);
-
-	try {
-		while (ptr < dmsg.size()) {
-			const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
-			const unsigned int nextPtr = ptr + mlen;
-
-			int mtype = -1;
-			try {
-				switch((StateMessageType)(mtype = (int)dmsg[ptr++])) {
-					default:
-						break;
-
-					case STATE_MESSAGE_ALIVE: {
-						ptr += 7; // skip version stuff, not used yet
-						m.x = dmsg.at<int32_t>(ptr); ptr += 4;
-						m.y = dmsg.at<int32_t>(ptr); ptr += 4;
-						m.z = dmsg.at<int32_t>(ptr); ptr += 4;
-						ptr += 8; // skip local clock, not used
-						m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
-						ptr += 8; // skip flags, unused
+	{	// make sure sender is actually considered a member
+		Mutex::Lock _l3(_memberIds_m);
+		if (std::find(_memberIds.begin(),_memberIds.end(),fromMemberId) == _memberIds.end())
+			return;
+	}
+
+	{
+		_Member &m = _members[fromMemberId];
+		Mutex::Lock mlck(m.lock);
+
+		try {
+			while (ptr < dmsg.size()) {
+				const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
+				const unsigned int nextPtr = ptr + mlen;
+				if (nextPtr > dmsg.size())
+					break;
+
+				int mtype = -1;
+				try {
+					switch((StateMessageType)(mtype = (int)dmsg[ptr++])) {
+						default:
+							break;
+
+						case STATE_MESSAGE_ALIVE: {
+							ptr += 7; // skip version stuff, not used yet
+							m.x = dmsg.at<int32_t>(ptr); ptr += 4;
+							m.y = dmsg.at<int32_t>(ptr); ptr += 4;
+							m.z = dmsg.at<int32_t>(ptr); ptr += 4;
+							ptr += 8; // skip local clock, not used
+							m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
+							ptr += 8; // skip flags, unused
 #ifdef ZT_TRACE
-						std::string addrs;
+							std::string addrs;
 #endif
-						unsigned int physicalAddressCount = dmsg[ptr++];
-						for(unsigned int i=0;i<physicalAddressCount;++i) {
-							m.zeroTierPhysicalEndpoints.push_back(InetAddress());
-							ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr);
-							if (!(m.zeroTierPhysicalEndpoints.back())) {
-								m.zeroTierPhysicalEndpoints.pop_back();
-							}
+							unsigned int physicalAddressCount = dmsg[ptr++];
+							m.zeroTierPhysicalEndpoints.clear();
+							for(unsigned int i=0;i<physicalAddressCount;++i) {
+								m.zeroTierPhysicalEndpoints.push_back(InetAddress());
+								ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr);
+								if (!(m.zeroTierPhysicalEndpoints.back())) {
+									m.zeroTierPhysicalEndpoints.pop_back();
+								}
#ifdef ZT_TRACE
-							else {
-								if (addrs.length() > 0)
-									addrs.push_back(',');
-								addrs.append(m.zeroTierPhysicalEndpoints.back().toString());
-							}
+								else {
+									if (addrs.length() > 0)
+										addrs.push_back(',');
+									addrs.append(m.zeroTierPhysicalEndpoints.back().toString());
+								}
 #endif
-						}
-						m.lastReceivedAliveAnnouncement = RR->node->now();
+							}
+							m.lastReceivedAliveAnnouncement = RR->node->now();
 #ifdef ZT_TRACE
-						TRACE("[%u] I'm alive! send me peers at %s",(unsigned int)fromMemberId,addrs.c_str());
+							TRACE("[%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str());
 #endif
-					}	break;
-
-					case STATE_MESSAGE_HAVE_PEER: {
-						try {
-							Identity id;
-							ptr += id.deserialize(dmsg,ptr);
-							if (id) {
-								RR->topology->saveIdentity(id);
-
-								{	// Add or update peer affinity entry
-									_PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
-									Mutex::Lock _l2(_peerAffinities_m);
-									std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
-									if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
-										i->timestamp = pa.timestamp;
-									} else {
-										_peerAffinities.push_back(pa);
-										std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
-									}
-								}
+						}	break;
+
+						case STATE_MESSAGE_HAVE_PEER: {
+							try {
+								Identity id;
+								ptr += id.deserialize(dmsg,ptr);
+								if (id) {
+									RR->topology->saveIdentity(id);
+
+									{	// Add or update peer affinity entry
+										_PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
+										Mutex::Lock _l2(_peerAffinities_m);
+										std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
+										if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
+											i->timestamp = pa.timestamp;
+										} else {
+											_peerAffinities.push_back(pa);
+											std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
+										}
+									}
 
-								TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
-							}
-						} catch ( ... ) {
-							// ignore invalid identities
-						}
-					}	break;
-
-					case STATE_MESSAGE_MULTICAST_LIKE: {
-						const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
-						const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
-						const MAC mac(dmsg.field(ptr,6),6); ptr += 6;
-						const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4;
-						RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address);
-						TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid);
-					}	break;
-
-					case STATE_MESSAGE_COM: {
-						CertificateOfMembership com;
-						ptr += com.deserialize(dmsg,ptr);
-						if (com) {
-							TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision());
-						}
-					}	break;
-
-					case STATE_MESSAGE_RELAY: {
-						const unsigned int numRemotePeerPaths = dmsg[ptr++];
-						InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
-						for(unsigned int i=0;i<numRemotePeerPaths;++i)
-							ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
-						const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2;
-						const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen;
-
-						if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address
-							const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH);
-							TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths);
-
-							SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress));
-							if (destinationPeer) {
-								if (
-								    (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&&
-								    (numRemotePeerPaths > 0)&&
-								    (packetLen >= 18)&&
-								    (reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR)
-								   ) {
-									// If remote peer paths were sent with this relayed packet, we do
-									// RENDEZVOUS. It's handled here for cluster-relayed packets since
-									// we don't have both Peer records so this is a different path.
-
-									const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH);
-
-									InetAddress bestDestV4,bestDestV6;
-									destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6);
-									InetAddress bestRemoteV4,bestRemoteV6;
-									for(unsigned int i=0;i<numRemotePeerPaths;++i) {
-										if ((bestRemoteV4)&&(bestRemoteV6))
-											break;
-										switch(remotePeerPaths[i].ss_family) {
-											case AF_INET:
-												if (!bestRemoteV4)
-													bestRemoteV4 = remotePeerPaths[i];
-												break;
-											case AF_INET6:
-												if (!bestRemoteV6)
-													bestRemoteV6 = remotePeerPaths[i];
-												break;
-										}
-									}
-
-									Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
-									rendezvousForDest.append((uint8_t)0);
-									remotePeerAddress.appendTo(rendezvousForDest);
-
-									Buffer<2048> rendezvousForOtherEnd;
-									remotePeerAddress.appendTo(rendezvousForOtherEnd);
-									rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
-									const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
-									rendezvousForOtherEnd.addSize(2); // space for actual packet payload length
-									rendezvousForOtherEnd.append((uint8_t)0); // flags == 0
-									destinationAddress.appendTo(rendezvousForOtherEnd);
-
-									bool haveMatch = false;
-									if ((bestDestV6)&&(bestRemoteV6)) {
-										haveMatch = true;
-
-										rendezvousForDest.append((uint16_t)bestRemoteV6.port());
-										rendezvousForDest.append((uint8_t)16);
-										rendezvousForDest.append(bestRemoteV6.rawIpData(),16);
-
-										rendezvousForOtherEnd.append((uint16_t)bestDestV6.port());
-										rendezvousForOtherEnd.append((uint8_t)16);
-										rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16);
-										rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
-									} else if ((bestDestV4)&&(bestRemoteV4)) {
-										haveMatch = true;
-
-										rendezvousForDest.append((uint16_t)bestRemoteV4.port());
-										rendezvousForDest.append((uint8_t)4);
-										rendezvousForDest.append(bestRemoteV4.rawIpData(),4);
-
-										rendezvousForOtherEnd.append((uint16_t)bestDestV4.port());
-										rendezvousForOtherEnd.append((uint8_t)4);
-										rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4);
-										rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
-									}
+									TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
+								}
+							} catch ( ... ) {
+								// ignore invalid identities
+							}
+						}	break;
+
+						case STATE_MESSAGE_MULTICAST_LIKE: {
+							const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
+							const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
+							const MAC mac(dmsg.field(ptr,6),6); ptr += 6;
+							const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4;
+							RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address);
+							TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid);
+						}	break;
+
+						case STATE_MESSAGE_COM: {
+							CertificateOfMembership com;
+							ptr += com.deserialize(dmsg,ptr);
+							if (com) {
+								TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision());
+							}
+						}	break;
+
+						case STATE_MESSAGE_RELAY: {
+							const unsigned int numRemotePeerPaths = dmsg[ptr++];
+							InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
+							for(unsigned int i=0;i<numRemotePeerPaths;++i)
+								ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
+							const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2;
+							const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen;
+
+							if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address
+								const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH);
+								TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths);
+
+								SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress));
+								if (destinationPeer) {
+									if (
+									    (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&&
+									    (numRemotePeerPaths > 0)&&
+									    (packetLen >= 18)&&
+									    (reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR)
+									   ) {
+										// If remote peer paths were sent with this relayed packet, we do
+										// RENDEZVOUS. It's handled here for cluster-relayed packets since
+										// we don't have both Peer records so this is a different path.
+
+										const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH);
+
+										InetAddress bestDestV4,bestDestV6;
+										destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6);
+										InetAddress bestRemoteV4,bestRemoteV6;
+										for(unsigned int i=0;i<numRemotePeerPaths;++i) {
+											if ((bestRemoteV4)&&(bestRemoteV6))
+												break;
+											switch(remotePeerPaths[i].ss_family) {
+												case AF_INET:
+													if (!bestRemoteV4)
+														bestRemoteV4 = remotePeerPaths[i];
+													break;
+												case AF_INET6:
+													if (!bestRemoteV6)
+														bestRemoteV6 = remotePeerPaths[i];
+													break;
+											}
+										}
+
+										Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
+										rendezvousForDest.append((uint8_t)0);
+										remotePeerAddress.appendTo(rendezvousForDest);
+
+										Buffer<2048> rendezvousForOtherEnd;
+										remotePeerAddress.appendTo(rendezvousForOtherEnd);
+										rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
+										const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
+										rendezvousForOtherEnd.addSize(2); // space for actual packet payload length
+										rendezvousForOtherEnd.append((uint8_t)0); // flags == 0
+										destinationAddress.appendTo(rendezvousForOtherEnd);
+
+										bool haveMatch = false;
+										if ((bestDestV6)&&(bestRemoteV6)) {
+											haveMatch = true;
+
+											rendezvousForDest.append((uint16_t)bestRemoteV6.port());
+											rendezvousForDest.append((uint8_t)16);
+											rendezvousForDest.append(bestRemoteV6.rawIpData(),16);
+
+											rendezvousForOtherEnd.append((uint16_t)bestDestV6.port());
+											rendezvousForOtherEnd.append((uint8_t)16);
+											rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16);
+											rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
+										} else if ((bestDestV4)&&(bestRemoteV4)) {
+											haveMatch = true;
+
+											rendezvousForDest.append((uint16_t)bestRemoteV4.port());
+											rendezvousForDest.append((uint8_t)4);
+											rendezvousForDest.append(bestRemoteV4.rawIpData(),4);
+
+											rendezvousForOtherEnd.append((uint16_t)bestDestV4.port());
+											rendezvousForOtherEnd.append((uint8_t)4);
+											rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4);
+											rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
+										}
 
-									if (haveMatch) {
-										_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
-										RR->sw->send(rendezvousForDest,true,0);
-									}
-								}
-							}
-						}
-					}	break;
-
-					case STATE_MESSAGE_PROXY_SEND: {
-						const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
-						const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
-						const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
-						Packet outp(rcpt,RR->identity.address(),verb);
-						outp.append(dmsg.field(ptr,len),len);
-						RR->sw->send(outp,true,0);
-						TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
-					}	break;
+										if (haveMatch) {
+											_send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
+											RR->sw->send(rendezvousForDest,true,0);
+										}
+									}
+								}
+							}
+						}	break;
+
+						case STATE_MESSAGE_PROXY_SEND: {
+							const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
+							const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
+							const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
+							Packet outp(rcpt,RR->identity.address(),verb);
+							outp.append(dmsg.field(ptr,len),len);
+							RR->sw->send(outp,true,0);
+							TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
+						}	break;
+					}
+				} catch ( ... ) {
+					TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
+					// drop invalids
 				}
-			} catch ( ... ) {
-				TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
-				// drop invalids
-			}
 
-			ptr = nextPtr;
+				ptr = nextPtr;
+			}
+		} catch ( ... ) {
+			TRACE("invalid message (outer loop), discarding");
+			// drop invalids
 		}
-	} catch ( ... ) {
-		TRACE("invalid message (outer loop), discarding");
-		// drop invalids
 	}
 }
@@ -395,10 +406,12 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
 			_send(canHasPeer,STATE_MESSAGE_RELAY,buf.data(),buf.size());
 		}
 
+		TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
 		return true;
+	} else {
+		TRACE("sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str());
+		return false;
 	}
-
-	return false;
 }
@@ -436,11 +449,12 @@ void Cluster::replicateHavePeer(const Identity &peerId)
 
 void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group)
 {
-	Buffer<4096> buf;
+	Buffer<2048> buf;
 	buf.append((uint64_t)nwid);
 	peerAddress.appendTo(buf);
 	group.mac().appendTo(buf);
 	buf.append((uint32_t)group.adi());
+	TRACE("replicating %s MULTICAST_LIKE %.16llx/%s/%u to all members",peerAddress.toString().c_str(),nwid,group.mac().toString().c_str(),(unsigned int)group.adi());
 	{
 		Mutex::Lock _l(_memberIds_m);
 		for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
@@ -452,8 +466,9 @@ void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,co
 
 void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com)
 {
-	Buffer<4096> buf;
+	Buffer<2048> buf;
 	com.serialize(buf);
+	TRACE("replicating %s COM for %.16llx to all members",com.issuedTo().toString().c_str(),com.networkId());
 	{
 		Mutex::Lock _l(_memberIds_m);
 		for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
@@ -504,7 +519,7 @@ void Cluster::doPeriodicTasks()
 
 void Cluster::addMember(uint16_t memberId)
 {
-	if (memberId >= ZT_CLUSTER_MAX_MEMBERS)
+	if ((memberId >= ZT_CLUSTER_MAX_MEMBERS)||(memberId == _id))
 		return;
 
 	Mutex::Lock _l2(_members[memberId].lock);
@@ -553,11 +568,12 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
 {
 	if (!peerPhysicalAddress) // sanity check
 		return false;
+
 	if (_addressToLocationFunction) {
 		// Pick based on location if it can be determined
 		int px = 0,py = 0,pz = 0;
 		if (_addressToLocationFunction(_addressToLocationFunctionArg,reinterpret_cast<const struct sockaddr_storage *>(&peerPhysicalAddress),&px,&py,&pz) == 0) {
-			// No geo-info so no change
+			TRACE("no geolocation available for %s",peerPhysicalAddress.toIpString().c_str());
 			return false;
 		}
@@ -567,6 +583,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
 		const double currentDistance = _dist3d(_x,_y,_z,px,py,pz);
 		double bestDistance = (offload ? 2147483648.0 : currentDistance);
 		unsigned int bestMember = _id;
+		TRACE("%s is at %d,%d,%d -- looking for anyone closer than %d,%d,%d (%fkm)",peerPhysicalAddress.toString().c_str(),px,py,pz,_x,_y,_z,bestDistance);
 		{
 			Mutex::Lock _l(_memberIds_m);
 			for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
@@ -577,6 +594,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
 				if ( ((now - m.lastReceivedAliveAnnouncement) < ZT_CLUSTER_TIMEOUT) && ((m.x != 0)||(m.y != 0)||(m.z != 0)) && (m.zeroTierPhysicalEndpoints.size() > 0) ) {
 					double mdist = _dist3d(m.x,m.y,m.z,px,py,pz);
 					if (mdist < bestDistance) {
+						bestDistance = mdist;
 						bestMember = *mid;
 						best = m.zeroTierPhysicalEndpoints;
 					}
@@ -585,7 +603,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
 		}
 
 		if (best.size() > 0) {
-			TRACE("peer %s is at [%d,%d,%d], distance to us is %f, sending to %u instead for better distance %f",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestMember,bestDistance);
+			TRACE("%s seems closer to %u at %fkm, suggesting redirect...",peerAddress.toString().c_str(),bestMember,bestDistance);
 
 			/* if (peer->remoteVersionProtocol() >= 5) {
 				// If it's a newer peer send VERB_PUSH_DIRECT_PATHS which is more idiomatic
@@ -620,8 +638,66 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy
 	}
 }
 
+void Cluster::status(ZT_ClusterStatus &status) const
+{
+	const uint64_t now = RR->node->now();
+	memset(&status,0,sizeof(ZT_ClusterStatus));
+	ZT_ClusterMemberStatus *ms[ZT_CLUSTER_MAX_MEMBERS];
+	memset(ms,0,sizeof(ms));
+
+	status.myId = _id;
+
+	ms[_id] = &(status.members[status.clusterSize++]);
+	ms[_id]->id = _id;
+	ms[_id]->alive = 1;
+	ms[_id]->x = _x;
+	ms[_id]->y = _y;
+	ms[_id]->z = _z;
+	ms[_id]->peers = RR->topology->countAlive();
+	for(std::vector<InetAddress>::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) {
+		if (ms[_id]->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
+			break;
+		memcpy(&(ms[_id]->zeroTierPhysicalEndpoints[ms[_id]->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage));
+	}
+
+	{
+		Mutex::Lock _l1(_memberIds_m);
+		for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
+			if (status.clusterSize >= ZT_CLUSTER_MAX_MEMBERS) // sanity check
+				break;
+			ZT_ClusterMemberStatus *s = ms[*mid] = &(status.members[status.clusterSize++]);
+			_Member &m = _members[*mid];
+			Mutex::Lock ml(m.lock);
+
+			s->id = *mid;
+			s->msSinceLastHeartbeat = (unsigned int)std::min((uint64_t)(~((unsigned int)0)),(now - m.lastReceivedAliveAnnouncement));
+			s->alive = (s->msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT) ? 1 : 0;
+			s->x = m.x;
+			s->y = m.y;
+			s->z = m.z;
+			s->load = m.load;
+			for(std::vector<InetAddress>::const_iterator ep(m.zeroTierPhysicalEndpoints.begin());ep!=m.zeroTierPhysicalEndpoints.end();++ep) {
+				if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check
+					break;
+				memcpy(&(s->zeroTierPhysicalEndpoints[s->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage));
+			}
+		}
+	}
+
+	{
+		Mutex::Lock _l2(_peerAffinities_m);
+		for(std::vector<_PeerAffinity>::const_iterator pi(_peerAffinities.begin());pi!=_peerAffinities.end();++pi) {
+			unsigned int mid = pi->clusterMemberId();
+			if ((ms[mid])&&(mid != _id)&&((now - pi->timestamp) < ZT_PEER_ACTIVITY_TIMEOUT))
+				++ms[mid]->peers;
+		}
+	}
+}
+
 void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len)
 {
+	if ((len + 3) > (ZT_CLUSTER_MAX_MESSAGE_LENGTH - (24 + 2 + 2))) // sanity check
+		return;
 	_Member &m = _members[memberId];
 	// assumes m.lock is locked!
 	if ((m.q.size() + len + 3) > ZT_CLUSTER_MAX_MESSAGE_LENGTH)
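Aside from the whole-body re-indent, the substantive change in the first hunk is defensive parsing: the sender must already be a known member, and the new `if (nextPtr > dmsg.size()) break;` guard stops a corrupt or malicious length prefix from walking the parser past the end of the buffer. The following is a minimal standalone sketch (not code from this repository; the record layout and all names are simplified stand-ins) of the same [16-bit length][8-bit type][payload] framing with that bounds check:

```cpp
#include <cstdint>
#include <cstdio>
#include <vector>

struct Record { uint8_t type; const uint8_t *payload; unsigned int len; };

static std::vector<Record> parseRecords(const uint8_t *buf,unsigned int size)
{
	std::vector<Record> out;
	unsigned int ptr = 0;
	while ((ptr + 2) <= size) {
		// 16-bit big-endian length prefix, as in the cluster state messages
		const unsigned int mlen = ((unsigned int)buf[ptr] << 8) | (unsigned int)buf[ptr + 1];
		ptr += 2;
		const unsigned int nextPtr = ptr + mlen;
		if (nextPtr > size) // the same guard the diff adds before dispatching
			break;
		if (mlen >= 1) // need at least the type byte
			out.push_back(Record{buf[ptr],buf + ptr + 1,mlen - 1});
		ptr = nextPtr; // resync on the next length prefix even if the record was skipped
	}
	return out;
}

int main()
{
	// two records: type 0x01 "hi", then one whose length field overruns the buffer
	const uint8_t wire[] = { 0x00,0x03, 0x01,'h','i', 0x7f,0xff, 0x02 };
	for (const Record &r : parseRecords(wire,sizeof(wire)))
		std::printf("type=%u len=%u\n",(unsigned int)r.type,r.len);
	return 0; // prints only the first record; the truncated second one is dropped
}
```

Because each record is consumed or skipped as a unit and the cursor always advances to `nextPtr`, one bad record cannot desynchronize the rest of the stream, which is the same recovery property the patch's inner catch block relies on.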
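The HAVE_PEER handler keeps `_peerAffinities` sorted so that `std::lower_bound` lookups stay O(log n), and the patch's own comment concedes that `push_back` followed by a full `std::sort` is not the most efficient insert. Below is a hedged sketch of that upsert pattern, with `Affinity` as a simplified stand-in for `_PeerAffinity`, using `vector::insert` at the `lower_bound` position instead of the re-sort:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

struct Affinity {
	uint64_t key;       // stand-in for the packed peer-address/member-id key
	uint64_t timestamp;
	bool operator<(const Affinity &a) const { return (key < a.key); }
};

static void upsert(std::vector<Affinity> &v,const Affinity &pa)
{
	// O(log n) search for the insertion point in the sorted vector
	std::vector<Affinity>::iterator i(std::lower_bound(v.begin(),v.end(),pa));
	if ((i != v.end())&&(i->key == pa.key))
		i->timestamp = pa.timestamp; // already present: just refresh the timestamp
	else
		v.insert(i,pa); // in-place insert keeps the vector sorted without std::sort
}

int main()
{
	std::vector<Affinity> v;
	upsert(v,Affinity{20,100});
	upsert(v,Affinity{10,101});
	upsert(v,Affinity{20,102}); // updates the existing entry, no duplicate
	for (const Affinity &a : v)
		std::printf("%llu -> %llu\n",(unsigned long long)a.key,(unsigned long long)a.timestamp);
	return 0; // prints 10 -> 101 then 20 -> 102
}
```

The in-place insert is still O(n) because of element shifting, so it is a constant-factor improvement rather than an asymptotic one; that is presumably why the patch leaves the simpler push_back-and-sort in place "for now".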