diff options
Diffstat (limited to 'node')
-rw-r--r-- | node/Cluster.cpp | 458 | ||||
-rw-r--r-- | node/Cluster.hpp | 13 | ||||
-rw-r--r-- | node/InetAddress.hpp | 10 | ||||
-rw-r--r-- | node/Network.cpp | 120 | ||||
-rw-r--r-- | node/Network.hpp | 5 | ||||
-rw-r--r-- | node/Node.cpp | 69 | ||||
-rw-r--r-- | node/Node.hpp | 1 | ||||
-rw-r--r-- | node/SelfAwareness.cpp | 9 | ||||
-rw-r--r-- | node/SelfAwareness.hpp | 10 | ||||
-rw-r--r-- | node/Topology.cpp | 40 | ||||
-rw-r--r-- | node/Topology.hpp | 10 |
11 files changed, 438 insertions, 307 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp index d9514db5..9d25593a 100644 --- a/node/Cluster.cpp +++ b/node/Cluster.cpp @@ -143,212 +143,223 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len) return; const uint16_t fromMemberId = dmsg.at<uint16_t>(0); unsigned int ptr = 2; - if (fromMemberId == _id) + if (fromMemberId == _id) // sanity check: we don't talk to ourselves return; const uint16_t toMemberId = dmsg.at<uint16_t>(ptr); ptr += 2; - if (toMemberId != _id) + if (toMemberId != _id) // sanity check: message not for us? return; - _Member &m = _members[fromMemberId]; - Mutex::Lock mlck(m.lock); - - try { - while (ptr < dmsg.size()) { - const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2; - const unsigned int nextPtr = ptr + mlen; - - int mtype = -1; - try { - switch((StateMessageType)(mtype = (int)dmsg[ptr++])) { - default: - break; - - case STATE_MESSAGE_ALIVE: { - ptr += 7; // skip version stuff, not used yet - m.x = dmsg.at<int32_t>(ptr); ptr += 4; - m.y = dmsg.at<int32_t>(ptr); ptr += 4; - m.z = dmsg.at<int32_t>(ptr); ptr += 4; - ptr += 8; // skip local clock, not used - m.load = dmsg.at<uint64_t>(ptr); ptr += 8; - ptr += 8; // skip flags, unused + { // make sure sender is actually considered a member + Mutex::Lock _l3(_memberIds_m); + if (std::find(_memberIds.begin(),_memberIds.end(),fromMemberId) == _memberIds.end()) + return; + } + + { + _Member &m = _members[fromMemberId]; + Mutex::Lock mlck(m.lock); + + try { + while (ptr < dmsg.size()) { + const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2; + const unsigned int nextPtr = ptr + mlen; + if (nextPtr > dmsg.size()) + break; + + int mtype = -1; + try { + switch((StateMessageType)(mtype = (int)dmsg[ptr++])) { + default: + break; + + case STATE_MESSAGE_ALIVE: { + ptr += 7; // skip version stuff, not used yet + m.x = dmsg.at<int32_t>(ptr); ptr += 4; + m.y = dmsg.at<int32_t>(ptr); ptr += 4; + m.z = dmsg.at<int32_t>(ptr); ptr += 4; + ptr += 8; // skip local clock, not used + m.load = dmsg.at<uint64_t>(ptr); ptr += 8; + ptr += 8; // skip flags, unused #ifdef ZT_TRACE - std::string addrs; + std::string addrs; #endif - unsigned int physicalAddressCount = dmsg[ptr++]; - for(unsigned int i=0;i<physicalAddressCount;++i) { - m.zeroTierPhysicalEndpoints.push_back(InetAddress()); - ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr); - if (!(m.zeroTierPhysicalEndpoints.back())) { - m.zeroTierPhysicalEndpoints.pop_back(); - } + unsigned int physicalAddressCount = dmsg[ptr++]; + m.zeroTierPhysicalEndpoints.clear(); + for(unsigned int i=0;i<physicalAddressCount;++i) { + m.zeroTierPhysicalEndpoints.push_back(InetAddress()); + ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr); + if (!(m.zeroTierPhysicalEndpoints.back())) { + m.zeroTierPhysicalEndpoints.pop_back(); + } #ifdef ZT_TRACE - else { - if (addrs.length() > 0) - addrs.push_back(','); - addrs.append(m.zeroTierPhysicalEndpoints.back().toString()); - } + else { + if (addrs.length() > 0) + addrs.push_back(','); + addrs.append(m.zeroTierPhysicalEndpoints.back().toString()); + } #endif - } - m.lastReceivedAliveAnnouncement = RR->node->now(); + } + m.lastReceivedAliveAnnouncement = RR->node->now(); #ifdef ZT_TRACE - TRACE("[%u] I'm alive! send me peers at %s",(unsigned int)fromMemberId,addrs.c_str()); + TRACE("[%u] I'm alive! peers close to %d,%d,%d can be redirected to: %s",(unsigned int)fromMemberId,m.x,m.y,m.z,addrs.c_str()); #endif - } break; - - case STATE_MESSAGE_HAVE_PEER: { - try { - Identity id; - ptr += id.deserialize(dmsg,ptr); - if (id) { - RR->topology->saveIdentity(id); - - { // Add or update peer affinity entry - _PeerAffinity pa(id.address(),fromMemberId,RR->node->now()); - Mutex::Lock _l2(_peerAffinities_m); - std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n)) - if ((i != _peerAffinities.end())&&(i->key == pa.key)) { - i->timestamp = pa.timestamp; - } else { - _peerAffinities.push_back(pa); - std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now - } - } + } break; + + case STATE_MESSAGE_HAVE_PEER: { + try { + Identity id; + ptr += id.deserialize(dmsg,ptr); + if (id) { + RR->topology->saveIdentity(id); + + { // Add or update peer affinity entry + _PeerAffinity pa(id.address(),fromMemberId,RR->node->now()); + Mutex::Lock _l2(_peerAffinities_m); + std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n)) + if ((i != _peerAffinities.end())&&(i->key == pa.key)) { + i->timestamp = pa.timestamp; + } else { + _peerAffinities.push_back(pa); + std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now + } + } - TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str()); - } - } catch ( ... ) { - // ignore invalid identities - } - } break; - - case STATE_MESSAGE_MULTICAST_LIKE: { - const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8; - const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; - const MAC mac(dmsg.field(ptr,6),6); ptr += 6; - const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4; - RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address); - TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid); - } break; - - case STATE_MESSAGE_COM: { - CertificateOfMembership com; - ptr += com.deserialize(dmsg,ptr); - if (com) { - TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision()); - } - } break; - - case STATE_MESSAGE_RELAY: { - const unsigned int numRemotePeerPaths = dmsg[ptr++]; - InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max - for(unsigned int i=0;i<numRemotePeerPaths;++i) - ptr += remotePeerPaths[i].deserialize(dmsg,ptr); - const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2; - const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen; - - if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address - const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH); - TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths); - - SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress)); - if (destinationPeer) { - if ( - (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&& - (numRemotePeerPaths > 0)&& - (packetLen >= 18)&& - (reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) - ) { - // If remote peer paths were sent with this relayed packet, we do - // RENDEZVOUS. It's handled here for cluster-relayed packets since - // we don't have both Peer records so this is a different path. - - const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH); - - InetAddress bestDestV4,bestDestV6; - destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6); - InetAddress bestRemoteV4,bestRemoteV6; - for(unsigned int i=0;i<numRemotePeerPaths;++i) { - if ((bestRemoteV4)&&(bestRemoteV6)) - break; - switch(remotePeerPaths[i].ss_family) { - case AF_INET: - if (!bestRemoteV4) - bestRemoteV4 = remotePeerPaths[i]; - break; - case AF_INET6: - if (!bestRemoteV6) - bestRemoteV6 = remotePeerPaths[i]; + TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str()); + } + } catch ( ... ) { + // ignore invalid identities + } + } break; + + case STATE_MESSAGE_MULTICAST_LIKE: { + const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8; + const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH; + const MAC mac(dmsg.field(ptr,6),6); ptr += 6; + const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4; + RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address); + TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid); + } break; + + case STATE_MESSAGE_COM: { + CertificateOfMembership com; + ptr += com.deserialize(dmsg,ptr); + if (com) { + TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision()); + } + } break; + + case STATE_MESSAGE_RELAY: { + const unsigned int numRemotePeerPaths = dmsg[ptr++]; + InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max + for(unsigned int i=0;i<numRemotePeerPaths;++i) + ptr += remotePeerPaths[i].deserialize(dmsg,ptr); + const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2; + const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen; + + if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address + const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH); + TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths); + + SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress)); + if (destinationPeer) { + if ( + (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&& + (numRemotePeerPaths > 0)&& + (packetLen >= 18)&& + (reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) + ) { + // If remote peer paths were sent with this relayed packet, we do + // RENDEZVOUS. It's handled here for cluster-relayed packets since + // we don't have both Peer records so this is a different path. + + const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH); + + InetAddress bestDestV4,bestDestV6; + destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6); + InetAddress bestRemoteV4,bestRemoteV6; + for(unsigned int i=0;i<numRemotePeerPaths;++i) { + if ((bestRemoteV4)&&(bestRemoteV6)) break; + switch(remotePeerPaths[i].ss_family) { + case AF_INET: + if (!bestRemoteV4) + bestRemoteV4 = remotePeerPaths[i]; + break; + case AF_INET6: + if (!bestRemoteV6) + bestRemoteV6 = remotePeerPaths[i]; + break; + } } - } - Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS); - rendezvousForDest.append((uint8_t)0); - remotePeerAddress.appendTo(rendezvousForDest); - - Buffer<2048> rendezvousForOtherEnd; - remotePeerAddress.appendTo(rendezvousForOtherEnd); - rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS); - const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size(); - rendezvousForOtherEnd.addSize(2); // space for actual packet payload length - rendezvousForOtherEnd.append((uint8_t)0); // flags == 0 - destinationAddress.appendTo(rendezvousForOtherEnd); - - bool haveMatch = false; - if ((bestDestV6)&&(bestRemoteV6)) { - haveMatch = true; - - rendezvousForDest.append((uint16_t)bestRemoteV6.port()); - rendezvousForDest.append((uint8_t)16); - rendezvousForDest.append(bestRemoteV6.rawIpData(),16); - - rendezvousForOtherEnd.append((uint16_t)bestDestV6.port()); - rendezvousForOtherEnd.append((uint8_t)16); - rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16); - rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16)); - } else if ((bestDestV4)&&(bestRemoteV4)) { - haveMatch = true; - - rendezvousForDest.append((uint16_t)bestRemoteV4.port()); - rendezvousForDest.append((uint8_t)4); - rendezvousForDest.append(bestRemoteV4.rawIpData(),4); - - rendezvousForOtherEnd.append((uint16_t)bestDestV4.port()); - rendezvousForOtherEnd.append((uint8_t)4); - rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4); - rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4)); - } + Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS); + rendezvousForDest.append((uint8_t)0); + remotePeerAddress.appendTo(rendezvousForDest); + + Buffer<2048> rendezvousForOtherEnd; + remotePeerAddress.appendTo(rendezvousForOtherEnd); + rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS); + const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size(); + rendezvousForOtherEnd.addSize(2); // space for actual packet payload length + rendezvousForOtherEnd.append((uint8_t)0); // flags == 0 + destinationAddress.appendTo(rendezvousForOtherEnd); + + bool haveMatch = false; + if ((bestDestV6)&&(bestRemoteV6)) { + haveMatch = true; + + rendezvousForDest.append((uint16_t)bestRemoteV6.port()); + rendezvousForDest.append((uint8_t)16); + rendezvousForDest.append(bestRemoteV6.rawIpData(),16); + + rendezvousForOtherEnd.append((uint16_t)bestDestV6.port()); + rendezvousForOtherEnd.append((uint8_t)16); + rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16); + rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16)); + } else if ((bestDestV4)&&(bestRemoteV4)) { + haveMatch = true; + + rendezvousForDest.append((uint16_t)bestRemoteV4.port()); + rendezvousForDest.append((uint8_t)4); + rendezvousForDest.append(bestRemoteV4.rawIpData(),4); + + rendezvousForOtherEnd.append((uint16_t)bestDestV4.port()); + rendezvousForOtherEnd.append((uint8_t)4); + rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4); + rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4)); + } - if (haveMatch) { - _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size()); - RR->sw->send(rendezvousForDest,true,0); + if (haveMatch) { + _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size()); + RR->sw->send(rendezvousForDest,true,0); + } } } } - } - } break; - - case STATE_MESSAGE_PROXY_SEND: { - const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); - const Packet::Verb verb = (Packet::Verb)dmsg[ptr++]; - const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2; - Packet outp(rcpt,RR->identity.address(),verb); - outp.append(dmsg.field(ptr,len),len); - RR->sw->send(outp,true,0); - TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len); - } break; + } break; + + case STATE_MESSAGE_PROXY_SEND: { + const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); + const Packet::Verb verb = (Packet::Verb)dmsg[ptr++]; + const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2; + Packet outp(rcpt,RR->identity.address(),verb); + outp.append(dmsg.field(ptr,len),len); + RR->sw->send(outp,true,0); + TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len); + } break; + } + } catch ( ... ) { + TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype); + // drop invalids } - } catch ( ... ) { - TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype); - // drop invalids - } - ptr = nextPtr; + ptr = nextPtr; + } + } catch ( ... ) { + TRACE("invalid message (outer loop), discarding"); + // drop invalids } - } catch ( ... ) { - TRACE("invalid message (outer loop), discarding"); - // drop invalids } } @@ -395,10 +406,12 @@ bool Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee _send(canHasPeer,STATE_MESSAGE_RELAY,buf.data(),buf.size()); } + TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer); return true; + } else { + TRACE("sendViaCluster(): unable to relay %u bytes from %s to %s since no cluster members seem to have it!",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str()); + return false; } - - return false; } void Cluster::replicateHavePeer(const Identity &peerId) @@ -436,11 +449,12 @@ void Cluster::replicateHavePeer(const Identity &peerId) void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group) { - Buffer<4096> buf; + Buffer<2048> buf; buf.append((uint64_t)nwid); peerAddress.appendTo(buf); group.mac().appendTo(buf); buf.append((uint32_t)group.adi()); + TRACE("replicating %s MULTICAST_LIKE %.16llx/%s/%u to all members",peerAddress.toString().c_str(),nwid,group.mac().toString().c_str(),(unsigned int)group.adi()); { Mutex::Lock _l(_memberIds_m); for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { @@ -452,8 +466,9 @@ void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,co void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com) { - Buffer<4096> buf; + Buffer<2048> buf; com.serialize(buf); + TRACE("replicating %s COM for %.16llx to all members",com.issuedTo().toString().c_str(),com.networkId()); { Mutex::Lock _l(_memberIds_m); for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { @@ -504,7 +519,7 @@ void Cluster::doPeriodicTasks() void Cluster::addMember(uint16_t memberId) { - if (memberId >= ZT_CLUSTER_MAX_MEMBERS) + if ((memberId >= ZT_CLUSTER_MAX_MEMBERS)||(memberId == _id)) return; Mutex::Lock _l2(_members[memberId].lock); @@ -553,11 +568,12 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy { if (!peerPhysicalAddress) // sanity check return false; + if (_addressToLocationFunction) { // Pick based on location if it can be determined int px = 0,py = 0,pz = 0; if (_addressToLocationFunction(_addressToLocationFunctionArg,reinterpret_cast<const struct sockaddr_storage *>(&peerPhysicalAddress),&px,&py,&pz) == 0) { - // No geo-info so no change + TRACE("no geolocation available for %s",peerPhysicalAddress.toIpString().c_str()); return false; } @@ -567,6 +583,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy const double currentDistance = _dist3d(_x,_y,_z,px,py,pz); double bestDistance = (offload ? 2147483648.0 : currentDistance); unsigned int bestMember = _id; + TRACE("%s is at %d,%d,%d -- looking for anyone closer than %d,%d,%d (%fkm)",peerPhysicalAddress.toString().c_str(),px,py,pz,_x,_y,_z,bestDistance); { Mutex::Lock _l(_memberIds_m); for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { @@ -577,6 +594,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy if ( ((now - m.lastReceivedAliveAnnouncement) < ZT_CLUSTER_TIMEOUT) && ((m.x != 0)||(m.y != 0)||(m.z != 0)) && (m.zeroTierPhysicalEndpoints.size() > 0) ) { double mdist = _dist3d(m.x,m.y,m.z,px,py,pz); if (mdist < bestDistance) { + bestDistance = mdist; bestMember = *mid; best = m.zeroTierPhysicalEndpoints; } @@ -585,7 +603,7 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy } if (best.size() > 0) { - TRACE("peer %s is at [%d,%d,%d], distance to us is %f, sending to %u instead for better distance %f",peerAddress.toString().c_str(),px,py,pz,currentDistance,bestMember,bestDistance); + TRACE("%s seems closer to %u at %fkm, suggesting redirect...",peerAddress.toString().c_str(),bestMember,bestDistance); /* if (peer->remoteVersionProtocol() >= 5) { // If it's a newer peer send VERB_PUSH_DIRECT_PATHS which is more idiomatic @@ -620,8 +638,66 @@ bool Cluster::redirectPeer(const Address &peerAddress,const InetAddress &peerPhy } } +void Cluster::status(ZT_ClusterStatus &status) const +{ + const uint64_t now = RR->node->now(); + memset(&status,0,sizeof(ZT_ClusterStatus)); + ZT_ClusterMemberStatus *ms[ZT_CLUSTER_MAX_MEMBERS]; + memset(ms,0,sizeof(ms)); + + status.myId = _id; + + ms[_id] = &(status.members[status.clusterSize++]); + ms[_id]->id = _id; + ms[_id]->alive = 1; + ms[_id]->x = _x; + ms[_id]->y = _y; + ms[_id]->z = _z; + ms[_id]->peers = RR->topology->countAlive(); + for(std::vector<InetAddress>::const_iterator ep(_zeroTierPhysicalEndpoints.begin());ep!=_zeroTierPhysicalEndpoints.end();++ep) { + if (ms[_id]->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check + break; + memcpy(&(ms[_id]->zeroTierPhysicalEndpoints[ms[_id]->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage)); + } + + { + Mutex::Lock _l1(_memberIds_m); + for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) { + if (status.clusterSize >= ZT_CLUSTER_MAX_MEMBERS) // sanity check + break; + ZT_ClusterMemberStatus *s = ms[*mid] = &(status.members[status.clusterSize++]); + _Member &m = _members[*mid]; + Mutex::Lock ml(m.lock); + + s->id = *mid; + s->msSinceLastHeartbeat = (unsigned int)std::min((uint64_t)(~((unsigned int)0)),(now - m.lastReceivedAliveAnnouncement)); + s->alive = (s->msSinceLastHeartbeat < ZT_CLUSTER_TIMEOUT) ? 1 : 0; + s->x = m.x; + s->y = m.y; + s->z = m.z; + s->load = m.load; + for(std::vector<InetAddress>::const_iterator ep(m.zeroTierPhysicalEndpoints.begin());ep!=m.zeroTierPhysicalEndpoints.end();++ep) { + if (s->numZeroTierPhysicalEndpoints >= ZT_CLUSTER_MAX_ZT_PHYSICAL_ADDRESSES) // sanity check + break; + memcpy(&(s->zeroTierPhysicalEndpoints[s->numZeroTierPhysicalEndpoints++]),&(*ep),sizeof(struct sockaddr_storage)); + } + } + } + + { + Mutex::Lock _l2(_peerAffinities_m); + for(std::vector<_PeerAffinity>::const_iterator pi(_peerAffinities.begin());pi!=_peerAffinities.end();++pi) { + unsigned int mid = pi->clusterMemberId(); + if ((ms[mid])&&(mid != _id)&&((now - pi->timestamp) < ZT_PEER_ACTIVITY_TIMEOUT)) + ++ms[mid]->peers; + } + } +} + void Cluster::_send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len) { + if ((len + 3) > (ZT_CLUSTER_MAX_MESSAGE_LENGTH - (24 + 2 + 2))) // sanity check + return; _Member &m = _members[memberId]; // assumes m.lock is locked! if ((m.q.size() + len + 3) > ZT_CLUSTER_MAX_MESSAGE_LENGTH) diff --git a/node/Cluster.hpp b/node/Cluster.hpp index 2e60fd6b..be346659 100644 --- a/node/Cluster.hpp +++ b/node/Cluster.hpp @@ -47,7 +47,7 @@ /** * Timeout for cluster members being considered "alive" */ -#define ZT_CLUSTER_TIMEOUT 30000 +#define ZT_CLUSTER_TIMEOUT 10000 /** * How often should we announce that we have a peer? @@ -57,7 +57,7 @@ /** * Desired period between doPeriodicTasks() in milliseconds */ -#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 50 +#define ZT_CLUSTER_PERIODIC_TASK_PERIOD 100 namespace ZeroTier { @@ -145,7 +145,7 @@ public: STATE_MESSAGE_RELAY = 5, /** - * Request to send a packet to a locally-known peer: + * Request that a cluster member send a packet to a locally-known peer: * <[5] ZeroTier address of recipient> * <[1] packet verb> * <[2] length of packet payload> @@ -254,6 +254,13 @@ public: */ bool redirectPeer(const Address &peerAddress,const InetAddress &peerPhysicalAddress,bool offload); + /** + * Fill out ZT_ClusterStatus structure (from core API) + * + * @param status Reference to structure to hold result (anything there is replaced) + */ + void status(ZT_ClusterStatus &status) const; + private: void _send(uint16_t memberId,StateMessageType type,const void *msg,unsigned int len); void _flush(uint16_t memberId); diff --git a/node/InetAddress.hpp b/node/InetAddress.hpp index 50db272a..ecafcf51 100644 --- a/node/InetAddress.hpp +++ b/node/InetAddress.hpp @@ -409,6 +409,16 @@ struct InetAddress : public sockaddr_storage switch(b[p++]) { case 0: return 1; + case 0x01: + // TODO: Ethernet address (but accept for forward compatibility) + return 7; + case 0x02: + // TODO: Bluetooth address (but accept for forward compatibility) + return 7; + case 0x03: + // TODO: Other address types (but accept for forward compatibility) + // These could be extended/optional things like AF_UNIX, LTE Direct, shared memory, etc. + return (unsigned int)(b.template at<uint16_t>(p) + 3); // other addresses begin with 16-bit non-inclusive length case 0x04: ss_family = AF_INET; memcpy(&(reinterpret_cast<struct sockaddr_in *>(this)->sin_addr.s_addr),b.field(p,4),4); p += 4; diff --git a/node/Network.cpp b/node/Network.cpp index 46f93241..cd30e386 100644 --- a/node/Network.cpp +++ b/node/Network.cpp @@ -144,7 +144,15 @@ void Network::multicastUnsubscribe(const MulticastGroup &mg) bool Network::tryAnnounceMulticastGroupsTo(const SharedPtr<Peer> &peer) { Mutex::Lock _l(_lock); - return _tryAnnounceMulticastGroupsTo(RR->topology->rootAddresses(),_allMulticastGroups(),peer,RR->node->now()); + if ( + (_isAllowed(peer)) || + (peer->address() == this->controller()) || + (RR->topology->isRoot(peer->identity())) + ) { + _announceMulticastGroupsTo(peer->address(),_allMulticastGroups()); + return true; + } + return false; } bool Network::applyConfiguration(const SharedPtr<NetworkConfig> &conf) @@ -400,77 +408,80 @@ bool Network::_isAllowed(const SharedPtr<Peer> &peer) const return false; // default position on any failure } -bool Network::_tryAnnounceMulticastGroupsTo(const std::vector<Address> &alwaysAddresses,const std::vector<MulticastGroup> &allMulticastGroups,const SharedPtr<Peer> &peer,uint64_t now) const -{ - // assumes _lock is locked - if ( - (_isAllowed(peer)) || - (peer->address() == this->controller()) || - (std::find(alwaysAddresses.begin(),alwaysAddresses.end(),peer->address()) != alwaysAddresses.end()) - ) { - - if ((_config)&&(_config->com())&&(!_config->isPublic())&&(peer->needsOurNetworkMembershipCertificate(_id,now,true))) { - Packet outp(peer->address(),RR->identity.address(),Packet::VERB_NETWORK_MEMBERSHIP_CERTIFICATE); - _config->com().serialize(outp); - outp.armor(peer->key(),true); - peer->send(RR,outp.data(),outp.size(),now); - } - - { - Packet outp(peer->address(),RR->identity.address(),Packet::VERB_MULTICAST_LIKE); - - for(std::vector<MulticastGroup>::const_iterator mg(allMulticastGroups.begin());mg!=allMulticastGroups.end();++mg) { - if ((outp.size() + 18) >= ZT_UDP_DEFAULT_PAYLOAD_MTU) { - outp.armor(peer->key(),true); - peer->send(RR,outp.data(),outp.size(),now); - outp.reset(peer->address(),RR->identity.address(),Packet::VERB_MULTICAST_LIKE); - } - - // network ID, MAC, ADI - outp.append((uint64_t)_id); - mg->mac().appendTo(outp); - outp.append((uint32_t)mg->adi()); - } - - if (outp.size() > ZT_PROTO_MIN_PACKET_LENGTH) { - outp.armor(peer->key(),true); - peer->send(RR,outp.data(),outp.size(),now); - } - } - - return true; - } - return false; -} - -class _AnnounceMulticastGroupsToAll +class _GetPeersThatNeedMulticastAnnouncement { public: - _AnnounceMulticastGroupsToAll(const RuntimeEnvironment *renv,Network *nw) : + _GetPeersThatNeedMulticastAnnouncement(const RuntimeEnvironment *renv,Network *nw) : _now(renv->node->now()), + _controller(nw->controller()), _network(nw), - _rootAddresses(renv->topology->rootAddresses()), - _allMulticastGroups(nw->_allMulticastGroups()) + _rootAddresses(renv->topology->rootAddresses()) {} - - inline void operator()(Topology &t,const SharedPtr<Peer> &p) { _network->_tryAnnounceMulticastGroupsTo(_rootAddresses,_allMulticastGroups,p,_now); } - + inline void operator()(Topology &t,const SharedPtr<Peer> &p) + { + if ( + (_network->_isAllowed(p)) || + (p->address() == _controller) || + (std::find(_rootAddresses.begin(),_rootAddresses.end(),p->address()) != _rootAddresses.end()) + ) { + peers.push_back(p->address()); + } + } + std::vector<Address> peers; private: uint64_t _now; + Address _controller; Network *_network; std::vector<Address> _rootAddresses; - std::vector<MulticastGroup> _allMulticastGroups; }; void Network::_announceMulticastGroups() { // Assumes _lock is locked - _AnnounceMulticastGroupsToAll afunc(RR,this); - RR->topology->eachPeer<_AnnounceMulticastGroupsToAll &>(afunc); + + _GetPeersThatNeedMulticastAnnouncement gpfunc(RR,this); + RR->topology->eachPeer<_GetPeersThatNeedMulticastAnnouncement &>(gpfunc); + + std::vector<MulticastGroup> allMulticastGroups(_allMulticastGroups()); + for(std::vector<Address>::const_iterator pa(gpfunc.peers.begin());pa!=gpfunc.peers.end();++pa) + _announceMulticastGroupsTo(*pa,allMulticastGroups); +} + +void Network::_announceMulticastGroupsTo(const Address &peerAddress,const std::vector<MulticastGroup> &allMulticastGroups) const +{ + // Assumes _lock is locked + + // We push COMs ahead of MULTICAST_LIKE since they're used for access control -- a COM is a public + // credential so "over-sharing" isn't really an issue (and we only do so with roots). + if ((_config)&&(_config->com())&&(!_config->isPublic())) { + Packet outp(peerAddress,RR->identity.address(),Packet::VERB_NETWORK_MEMBERSHIP_CERTIFICATE); + _config->com().serialize(outp); + RR->sw->send(outp,true,0); + } + + { + Packet outp(peerAddress,RR->identity.address(),Packet::VERB_MULTICAST_LIKE); + + for(std::vector<MulticastGroup>::const_iterator mg(allMulticastGroups.begin());mg!=allMulticastGroups.end();++mg) { + if ((outp.size() + 18) >= ZT_UDP_DEFAULT_PAYLOAD_MTU) { + RR->sw->send(outp,true,0); + outp.reset(peerAddress,RR->identity.address(),Packet::VERB_MULTICAST_LIKE); + } + + // network ID, MAC, ADI + outp.append((uint64_t)_id); + mg->mac().appendTo(outp); + outp.append((uint32_t)mg->adi()); + } + + if (outp.size() > ZT_PROTO_MIN_PACKET_LENGTH) + RR->sw->send(outp,true,0); + } } std::vector<MulticastGroup> Network::_allMulticastGroups() const { // Assumes _lock is locked + std::vector<MulticastGroup> mgs; mgs.reserve(_myMulticastGroups.size() + _multicastGroupsBehindMe.size() + 1); mgs.insert(mgs.end(),_myMulticastGroups.begin(),_myMulticastGroups.end()); @@ -479,6 +490,7 @@ std::vector<MulticastGroup> Network::_allMulticastGroups() const mgs.push_back(Network::BROADCAST); std::sort(mgs.begin(),mgs.end()); mgs.erase(std::unique(mgs.begin(),mgs.end()),mgs.end()); + return mgs; } diff --git a/node/Network.hpp b/node/Network.hpp index f7939323..0effa8e2 100644 --- a/node/Network.hpp +++ b/node/Network.hpp @@ -56,7 +56,7 @@ namespace ZeroTier { class RuntimeEnvironment; class Peer; -class _AnnounceMulticastGroupsToAll; // internal function object in Network.cpp +class _GetPeersThatNeedMulticastAnnouncement; /** * A virtual LAN @@ -64,7 +64,7 @@ class _AnnounceMulticastGroupsToAll; // internal function object in Network.cpp class Network : NonCopyable { friend class SharedPtr<Network>; - friend class _AnnounceMulticastGroupsToAll; + friend class _GetPeersThatNeedMulticastAnnouncement; // internal function object public: /** @@ -344,6 +344,7 @@ private: bool _isAllowed(const SharedPtr<Peer> &peer) const; bool _tryAnnounceMulticastGroupsTo(const std::vector<Address> &rootAddresses,const std::vector<MulticastGroup> &allMulticastGroups,const SharedPtr<Peer> &peer,uint64_t now) const; void _announceMulticastGroups(); + void _announceMulticastGroupsTo(const Address &peerAddress,const std::vector<MulticastGroup> &allMulticastGroups) const; std::vector<MulticastGroup> _allMulticastGroups() const; const RuntimeEnvironment *RR; diff --git a/node/Node.cpp b/node/Node.cpp index 6eea3d3d..2b298903 100644 --- a/node/Node.cpp +++ b/node/Node.cpp @@ -211,8 +211,15 @@ public: } } - // If this is a network preferred relay, also always ping and if a stable endpoint is specified use that if not alive if (!upstream) { + // If I am a root server, only ping other root servers -- roots don't ping "down" + // since that would just be a waste of bandwidth and could potentially cause route + // flapping in Cluster mode. + if (RR->topology->amRoot()) + return; + + // Check for network preferred relays, also considered 'upstream' and thus always + // pinged to keep links up. If they have stable addresses we will try them there. for(std::vector< std::pair<Address,InetAddress> >::const_iterator r(_relays.begin());r!=_relays.end();++r) { if (r->first == p->address()) { if (r->second.ss_family == AF_INET) @@ -229,18 +236,22 @@ public: // "Upstream" devices are roots and relays and get special treatment -- they stay alive // forever and we try to keep (if available) both IPv4 and IPv6 channels open to them. bool needToContactIndirect = true; - if (!p->doPingAndKeepalive(RR,_now,AF_INET)) { + if (p->doPingAndKeepalive(RR,_now,AF_INET)) { + needToContactIndirect = false; + } else { if (stableEndpoint4) { needToContactIndirect = false; p->attemptToContactAt(RR,InetAddress(),stableEndpoint4,_now); } - } else needToContactIndirect = false; - if (!p->doPingAndKeepalive(RR,_now,AF_INET6)) { + } + if (p->doPingAndKeepalive(RR,_now,AF_INET6)) { + needToContactIndirect = false; + } else { if (stableEndpoint6) { needToContactIndirect = false; p->attemptToContactAt(RR,InetAddress(),stableEndpoint6,_now); } - } else needToContactIndirect = false; + } if (needToContactIndirect) { // If this is an upstream and we have no stable endpoint for either IPv4 or IPv6, @@ -625,6 +636,18 @@ void Node::clusterHandleIncomingMessage(const void *msg,unsigned int len) #endif } +void Node::clusterStatus(ZT_ClusterStatus *cs) +{ + if (!cs) + return; +#ifdef ZT_ENABLE_CLUSTER + if (RR->cluster) + RR->cluster->status(*cs); + else +#endif + memset(cs,0,sizeof(ZT_ClusterStatus)); +} + /****************************************************************************/ /* Node methods used only within node/ */ /****************************************************************************/ @@ -936,15 +959,6 @@ enum ZT_ResultCode ZT_Node_clusterInit( } } -/** - * Add a member to this cluster - * - * Calling this without having called clusterInit() will do nothing. - * - * @param node Node instance - * @param memberId Member ID (must be less than or equal to ZT_CLUSTER_MAX_MEMBERS) - * @return OK or error if clustering is disabled, ID invalid, etc. - */ enum ZT_ResultCode ZT_Node_clusterAddMember(ZT_Node *node,unsigned int memberId) { try { @@ -954,14 +968,6 @@ enum ZT_ResultCode ZT_Node_clusterAddMember(ZT_Node *node,unsigned int memberId) } } -/** - * Remove a member from this cluster - * - * Calling this without having called clusterInit() will do nothing. - * - * @param node Node instance - * @param memberId Member ID to remove (nothing happens if not present) - */ void ZT_Node_clusterRemoveMember(ZT_Node *node,unsigned int memberId) { try { @@ -969,18 +975,6 @@ void ZT_Node_clusterRemoveMember(ZT_Node *node,unsigned int memberId) } catch ( ... ) {} } -/** - * Handle an incoming cluster state message - * - * The message itself contains cluster member IDs, and invalid or badly - * addressed messages will be silently discarded. - * - * Calling this without having called clusterInit() will do nothing. - * - * @param node Node instance - * @param msg Cluster message - * @param len Length of cluster message - */ void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned int len) { try { @@ -988,6 +982,13 @@ void ZT_Node_clusterHandleIncomingMessage(ZT_Node *node,const void *msg,unsigned } catch ( ... ) {} } +void ZT_Node_clusterStatus(ZT_Node *node,ZT_ClusterStatus *cs) +{ + try { + reinterpret_cast<ZeroTier::Node *>(node)->clusterStatus(cs); + } catch ( ... ) {} +} + void ZT_version(int *major,int *minor,int *revision,unsigned long *featureFlags) { if (major) *major = ZEROTIER_ONE_VERSION_MAJOR; diff --git a/node/Node.hpp b/node/Node.hpp index b8bd4dc5..4094a79e 100644 --- a/node/Node.hpp +++ b/node/Node.hpp @@ -124,6 +124,7 @@ public: ZT_ResultCode clusterAddMember(unsigned int memberId); void clusterRemoveMember(unsigned int memberId); void clusterHandleIncomingMessage(const void *msg,unsigned int len); + void clusterStatus(ZT_ClusterStatus *cs); // Internal functions ------------------------------------------------------ diff --git a/node/SelfAwareness.cpp b/node/SelfAwareness.cpp index 7329322a..1b70f17c 100644 --- a/node/SelfAwareness.cpp +++ b/node/SelfAwareness.cpp @@ -94,7 +94,7 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi Mutex::Lock _l(_phy_m); - PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,scope)]; + PhySurfaceEntry &entry = _phy[PhySurfaceKey(reporter,reporterPhysicalAddress,scope)]; if ((now - entry.ts) >= ZT_SELFAWARENESS_ENTRY_TIMEOUT) { entry.mySurface = myPhysicalAddress; @@ -105,14 +105,15 @@ void SelfAwareness::iam(const Address &reporter,const InetAddress &reporterPhysi entry.ts = now; TRACE("learned physical address %s for scope %u as seen from %s(%s) (replaced %s, resetting all in scope)",myPhysicalAddress.toString().c_str(),(unsigned int)scope,reporter.toString().c_str(),reporterPhysicalAddress.toString().c_str(),entry.mySurface.toString().c_str()); - // Erase all entries (other than this one) for this scope to prevent thrashing - // Note: we should probably not use 'entry' after this + // Erase all entries in this scope that were not reported by this remote address to prevent 'thrashing' + // due to multiple reports of endpoint change. + // Don't use 'entry' after this since hash table gets modified. { Hashtable< PhySurfaceKey,PhySurfaceEntry >::Iterator i(_phy); PhySurfaceKey *k = (PhySurfaceKey *)0; PhySurfaceEntry *e = (PhySurfaceEntry *)0; while (i.next(k,e)) { - if ((k->reporter != reporter)&&(k->scope == scope)) + if ((k->reporterPhysicalAddress != reporterPhysicalAddress)&&(k->scope == scope)) _phy.erase(*k); } } diff --git a/node/SelfAwareness.hpp b/node/SelfAwareness.hpp index 3133553e..400b05e6 100644 --- a/node/SelfAwareness.hpp +++ b/node/SelfAwareness.hpp @@ -69,14 +69,14 @@ private: struct PhySurfaceKey { Address reporter; + InetAddress reporterPhysicalAddress; InetAddress::IpScope scope; - inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); } - PhySurfaceKey() : reporter(),scope(InetAddress::IP_SCOPE_NONE) {} - PhySurfaceKey(const Address &r,InetAddress::IpScope s) : reporter(r),scope(s) {} - inline bool operator<(const PhySurfaceKey &k) const throw() { return ((reporter < k.reporter) ? true : ((reporter == k.reporter) ? ((int)scope < (int)k.scope) : false)); } - inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(scope == k.scope)); } + PhySurfaceKey(const Address &r,const InetAddress &ra,InetAddress::IpScope s) : reporter(r),reporterPhysicalAddress(ra),scope(s) {} + + inline unsigned long hashCode() const throw() { return ((unsigned long)reporter.toInt() + (unsigned long)scope); } + inline bool operator==(const PhySurfaceKey &k) const throw() { return ((reporter == k.reporter)&&(reporterPhysicalAddress == k.reporterPhysicalAddress)&&(scope == k.scope)); } }; struct PhySurfaceEntry { diff --git a/node/Topology.cpp b/node/Topology.cpp index 88c8856c..e56d1f47 100644 --- a/node/Topology.cpp +++ b/node/Topology.cpp @@ -35,14 +35,9 @@ namespace ZeroTier { -// Default World #define ZT_DEFAULT_WORLD_LENGTH 494 static const unsigned char ZT_DEFAULT_WORLD[ZT_DEFAULT_WORLD_LENGTH] = {0x01,0x00,0x00,0x00,0x00,0x08,0xea,0xc9,0x0a,0x00,0x00,0x01,0x4f,0xdf,0xbf,0xfc,0xbb,0x6c,0x7e,0x15,0x67,0x85,0x1b,0xb4,0x65,0x04,0x01,0xaf,0x56,0xbf,0xe7,0x63,0x9d,0x77,0xef,0xa4,0x1e,0x61,0x53,0x88,0xcb,0x8d,0x78,0xe5,0x47,0x38,0x98,0x5a,0x6c,0x8a,0xdd,0xe6,0x9c,0x65,0xdf,0x1a,0x80,0x63,0xce,0x2e,0x4d,0x48,0x24,0x3d,0x68,0x87,0x96,0x13,0x89,0xba,0x25,0x6f,0xc9,0xb0,0x9f,0x20,0xc5,0x4c,0x51,0x7b,0x30,0xb7,0x5f,0xba,0xca,0xa4,0xc5,0x48,0xa3,0x15,0xab,0x2f,0x1d,0x64,0xe8,0x04,0x42,0xb3,0x1c,0x51,0x8b,0x2a,0x04,0x01,0xf8,0xe1,0x81,0xaf,0x60,0x2f,0x70,0x3e,0xcd,0x0b,0x21,0x38,0x19,0x62,0x02,0xbd,0x0e,0x33,0x1d,0x0a,0x7b,0xf1,0xec,0xad,0xef,0x54,0xb3,0x7b,0x17,0x84,0xaa,0xda,0x0a,0x85,0x5d,0x0b,0x1c,0x05,0x83,0xb9,0x0e,0x3e,0xe3,0xb4,0xd1,0x8b,0x5b,0x64,0xf7,0xcf,0xe1,0xff,0x5d,0xc2,0x2a,0xcf,0x60,0x7b,0x09,0xb4,0xa3,0x86,0x3c,0x5a,0x7e,0x31,0xa0,0xc7,0xb4,0x86,0xe3,0x41,0x33,0x04,0x7e,0x19,0x87,0x6a,0xba,0x00,0x2a,0x6e,0x2b,0x23,0x18,0x93,0x0f,0x60,0xeb,0x09,0x7f,0x70,0xd0,0xf4,0xb0,0x28,0xb2,0xcd,0x6d,0x3d,0x0c,0x63,0xc0,0x14,0xb9,0x03,0x9f,0xf3,0x53,0x90,0xe4,0x11,0x81,0xf2,0x16,0xfb,0x2e,0x6f,0xa8,0xd9,0x5c,0x1e,0xe9,0x66,0x71,0x56,0x41,0x19,0x05,0xc3,0xdc,0xcf,0xea,0x78,0xd8,0xc6,0xdf,0xaf,0xba,0x68,0x81,0x70,0xb3,0xfa,0x00,0x01,0x04,0xc6,0xc7,0x61,0xdc,0x27,0x09,0x88,0x41,0x40,0x8a,0x2e,0x00,0xbb,0x1d,0x31,0xf2,0xc3,0x23,0xe2,0x64,0xe9,0xe6,0x41,0x72,0xc1,0xa7,0x4f,0x77,0x89,0x95,0x55,0xed,0x10,0x75,0x1c,0xd5,0x6e,0x86,0x40,0x5c,0xde,0x11,0x8d,0x02,0xdf,0xfe,0x55,0x5d,0x46,0x2c,0xcf,0x6a,0x85,0xb5,0x63,0x1c,0x12,0x35,0x0c,0x8d,0x5d,0xc4,0x09,0xba,0x10,0xb9,0x02,0x5d,0x0f,0x44,0x5c,0xf4,0x49,0xd9,0x2b,0x1c,0x00,0x01,0x04,0x6b,0xbf,0x2e,0xd2,0x27,0x09,0x8a,0xcf,0x05,0x9f,0xe3,0x00,0x48,0x2f,0x6e,0xe5,0xdf,0xe9,0x02,0x31,0x9b,0x41,0x9d,0xe5,0xbd,0xc7,0x65,0x20,0x9c,0x0e,0xcd,0xa3,0x8c,0x4d,0x6e,0x4f,0xcf,0x0d,0x33,0x65,0x83,0x98,0xb4,0x52,0x7d,0xcd,0x22,0xf9,0x31,0x12,0xfb,0x9b,0xef,0xd0,0x2f,0xd7,0x8b,0xf7,0x26,0x1b,0x33,0x3f,0xc1,0x05,0xd1,0x92,0xa6,0x23,0xca,0x9e,0x50,0xfc,0x60,0xb3,0x74,0xa5,0x00,0x01,0x04,0xa2,0xf3,0x4d,0x6f,0x27,0x09,0x9d,0x21,0x90,0x39,0xf3,0x00,0x01,0xf0,0x92,0x2a,0x98,0xe3,0xb3,0x4e,0xbc,0xbf,0xf3,0x33,0x26,0x9d,0xc2,0x65,0xd7,0xa0,0x20,0xaa,0xb6,0x9d,0x72,0xbe,0x4d,0x4a,0xcc,0x9c,0x8c,0x92,0x94,0x78,0x57,0x71,0x25,0x6c,0xd1,0xd9,0x42,0xa9,0x0d,0x1b,0xd1,0xd2,0xdc,0xa3,0xea,0x84,0xef,0x7d,0x85,0xaf,0xe6,0x61,0x1f,0xb4,0x3f,0xf0,0xb7,0x41,0x26,0xd9,0x0a,0x6e,0x00,0x01,0x04,0x80,0xc7,0xc5,0xd9,0x27,0x09}; -// ALICE-TEST -//#define ZT_DEFAULT_WORLD_LENGTH 257 -//static const unsigned char ZT_DEFAULT_WORLD[ZT_DEFAULT_WORLD_LENGTH] = {0x01,0x00,0x00,0x00,0x00,0x08,0xea,0xc9,0x0a,0x00,0x00,0x01,0x50,0x81,0x2a,0x54,0x6f,0x72,0xb0,0x3b,0xbe,0x73,0xda,0xbd,0xfb,0x85,0x77,0x9f,0xc9,0x2e,0x17,0xc8,0x11,0x6e,0xda,0x61,0x80,0xd1,0x41,0xcb,0x7c,0x2d,0x2b,0xa4,0x34,0x75,0x19,0x64,0x20,0x80,0x0a,0x22,0x32,0xf2,0x01,0x6c,0xfe,0x79,0xa6,0x7d,0xec,0x10,0x7e,0x03,0xf3,0xa2,0xa0,0x19,0xc8,0x7c,0xfd,0x6c,0x56,0x52,0xa8,0xfb,0xdc,0xfb,0x93,0x81,0x3e,0x63,0x8b,0xb3,0xb6,0x72,0x45,0xa9,0x81,0x81,0xcc,0xea,0x7f,0x2f,0xd9,0x59,0xce,0xc8,0x51,0x12,0xc3,0xe3,0x44,0x76,0x54,0xed,0xe7,0x8d,0x34,0x0b,0x5d,0x10,0x3d,0x52,0x04,0x9b,0xe1,0xb2,0x36,0x51,0x75,0x14,0x30,0x53,0xe8,0x4b,0xe4,0x91,0x9a,0xed,0x99,0x56,0xa3,0x8d,0x5e,0x14,0xff,0x66,0xd8,0x4f,0xf7,0x3c,0x23,0xbe,0x02,0xbb,0x1e,0xb6,0x7e,0x07,0xfa,0x7c,0x7e,0x50,0xe8,0x40,0xf9,0x37,0x70,0x1a,0x75,0xcf,0x19,0xe6,0x83,0xe1,0x5c,0x20,0x1d,0x1e,0x5b,0xe5,0x6a,0xbe,0xe7,0xab,0xec,0x01,0xd6,0xdd,0xca,0x6a,0xb5,0x00,0x4e,0x76,0x12,0x07,0xd8,0xb4,0x20,0x0b,0xe4,0x4f,0x47,0x8e,0x3d,0xa1,0x48,0xc1,0x60,0x99,0x11,0x0e,0xe7,0x1b,0x64,0x58,0x6d,0xda,0x11,0x8e,0x40,0x22,0xab,0x63,0x68,0x2c,0xe1,0x37,0xda,0x8b,0xa8,0x17,0xfc,0x7f,0x73,0xaa,0x31,0x63,0xf2,0xe3,0x33,0x93,0x3e,0x29,0x94,0xc4,0x6b,0x4f,0x41,0x19,0x30,0x7b,0xe8,0x85,0x5a,0x72,0x00,0x01,0x04,0xa9,0x39,0x8f,0x68,0x27,0x09}; - Topology::Topology(const RuntimeEnvironment *renv) : RR(renv), _amRoot(false) @@ -337,6 +332,21 @@ void Topology::clean(uint64_t now) } } +unsigned long Topology::countAlive() const +{ + const uint64_t now = RR->node->now(); + unsigned long cnt = 0; + Mutex::Lock _l(_lock); + Hashtable< Address,SharedPtr<Peer> >::Iterator i(const_cast<Topology *>(this)->_peers); + Address *a = (Address *)0; + SharedPtr<Peer> *p = (SharedPtr<Peer> *)0; + while (i.next(a,p)) { + if ((*p)->alive(now)) + ++cnt; + } + return cnt; +} + Identity Topology::_getIdentity(const Address &zta) { char p[128]; @@ -358,16 +368,18 @@ void Topology::_setWorld(const World &newWorld) _rootAddresses.clear(); _rootPeers.clear(); for(std::vector<World::Root>::const_iterator r(_world.roots().begin());r!=_world.roots().end();++r) { - if (r->identity == RR->identity) - _amRoot = true; _rootAddresses.push_back(r->identity.address()); - SharedPtr<Peer> *rp = _peers.get(r->identity.address()); - if (rp) { - _rootPeers.push_back(*rp); - } else if (r->identity.address() != RR->identity.address()) { - SharedPtr<Peer> newrp(new Peer(RR->identity,r->identity)); - _peers.set(r->identity.address(),newrp); - _rootPeers.push_back(newrp); + if (r->identity.address() == RR->identity.address()) { + _amRoot = true; + } else { + SharedPtr<Peer> *rp = _peers.get(r->identity.address()); + if (rp) { + _rootPeers.push_back(*rp); + } else { + SharedPtr<Peer> newrp(new Peer(RR->identity,r->identity)); + _peers.set(r->identity.address(),newrp); + _rootPeers.push_back(newrp); + } } } } diff --git a/node/Topology.hpp b/node/Topology.hpp index 48e264a8..ee9827b9 100644 --- a/node/Topology.hpp +++ b/node/Topology.hpp @@ -193,6 +193,11 @@ public: void clean(uint64_t now); /** + * @return Number of 'alive' peers + */ + unsigned long countAlive() const; + + /** * Apply a function or function object to all peers * * Note: explicitly template this by reference if you want the object @@ -225,6 +230,11 @@ public: return _peers.entries(); } + /** + * @return True if I am a root server in the current World + */ + inline bool amRoot() const throw() { return _amRoot; } + private: Identity _getIdentity(const Address &zta); void _setWorld(const World &newWorld); |