summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--node/Cluster.cpp379
1 files changed, 195 insertions, 184 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index e7aa5a41..c943e62b 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -143,212 +143,223 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
return;
const uint16_t fromMemberId = dmsg.at<uint16_t>(0);
unsigned int ptr = 2;
- if (fromMemberId == _id)
+ if (fromMemberId == _id) // sanity check: we don't talk to ourselves
return;
const uint16_t toMemberId = dmsg.at<uint16_t>(ptr);
ptr += 2;
- if (toMemberId != _id)
+ if (toMemberId != _id) // sanity check: message not for us?
return;
- _Member &m = _members[fromMemberId];
- Mutex::Lock mlck(m.lock);
-
- try {
- while (ptr < dmsg.size()) {
- const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
- const unsigned int nextPtr = ptr + mlen;
-
- int mtype = -1;
- try {
- switch((StateMessageType)(mtype = (int)dmsg[ptr++])) {
- default:
- break;
-
- case STATE_MESSAGE_ALIVE: {
- ptr += 7; // skip version stuff, not used yet
- m.x = dmsg.at<int32_t>(ptr); ptr += 4;
- m.y = dmsg.at<int32_t>(ptr); ptr += 4;
- m.z = dmsg.at<int32_t>(ptr); ptr += 4;
- ptr += 8; // skip local clock, not used
- m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
- ptr += 8; // skip flags, unused
+ { // make sure sender is actually considered a member
+ Mutex::Lock _l3(_memberIds_m);
+ if (std::find(_memberIds.begin(),_memberIds.end(),fromMemberId) == _memberIds.end())
+ return;
+ }
+
+ {
+ _Member &m = _members[fromMemberId];
+ Mutex::Lock mlck(m.lock);
+
+ try {
+ while (ptr < dmsg.size()) {
+ const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
+ const unsigned int nextPtr = ptr + mlen;
+ if (nextPtr > dmsg.size())
+ break;
+
+ int mtype = -1;
+ try {
+ switch((StateMessageType)(mtype = (int)dmsg[ptr++])) {
+ default:
+ break;
+
+ case STATE_MESSAGE_ALIVE: {
+ ptr += 7; // skip version stuff, not used yet
+ m.x = dmsg.at<int32_t>(ptr); ptr += 4;
+ m.y = dmsg.at<int32_t>(ptr); ptr += 4;
+ m.z = dmsg.at<int32_t>(ptr); ptr += 4;
+ ptr += 8; // skip local clock, not used
+ m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
+ ptr += 8; // skip flags, unused
#ifdef ZT_TRACE
- std::string addrs;
+ std::string addrs;
#endif
- unsigned int physicalAddressCount = dmsg[ptr++];
- for(unsigned int i=0;i<physicalAddressCount;++i) {
- m.zeroTierPhysicalEndpoints.push_back(InetAddress());
- ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr);
- if (!(m.zeroTierPhysicalEndpoints.back())) {
- m.zeroTierPhysicalEndpoints.pop_back();
- }
+ unsigned int physicalAddressCount = dmsg[ptr++];
+ m.zeroTierPhysicalEndpoints.clear();
+ for(unsigned int i=0;i<physicalAddressCount;++i) {
+ m.zeroTierPhysicalEndpoints.push_back(InetAddress());
+ ptr += m.zeroTierPhysicalEndpoints.back().deserialize(dmsg,ptr);
+ if (!(m.zeroTierPhysicalEndpoints.back())) {
+ m.zeroTierPhysicalEndpoints.pop_back();
+ }
#ifdef ZT_TRACE
- else {
- if (addrs.length() > 0)
- addrs.push_back(',');
- addrs.append(m.zeroTierPhysicalEndpoints.back().toString());
- }
+ else {
+ if (addrs.length() > 0)
+ addrs.push_back(',');
+ addrs.append(m.zeroTierPhysicalEndpoints.back().toString());
+ }
#endif
- }
- m.lastReceivedAliveAnnouncement = RR->node->now();
+ }
+ m.lastReceivedAliveAnnouncement = RR->node->now();
#ifdef ZT_TRACE
- TRACE("[%u] I'm alive! send me peers at %s",(unsigned int)fromMemberId,addrs.c_str());
+ TRACE("[%u] I'm alive! peers may be redirected to: %s",(unsigned int)fromMemberId,addrs.c_str());
#endif
- } break;
-
- case STATE_MESSAGE_HAVE_PEER: {
- try {
- Identity id;
- ptr += id.deserialize(dmsg,ptr);
- if (id) {
- RR->topology->saveIdentity(id);
-
- { // Add or update peer affinity entry
- _PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
- Mutex::Lock _l2(_peerAffinities_m);
- std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
- if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
- i->timestamp = pa.timestamp;
- } else {
- _peerAffinities.push_back(pa);
- std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
- }
- }
+ } break;
+
+ case STATE_MESSAGE_HAVE_PEER: {
+ try {
+ Identity id;
+ ptr += id.deserialize(dmsg,ptr);
+ if (id) {
+ RR->topology->saveIdentity(id);
+
+ { // Add or update peer affinity entry
+ _PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
+ Mutex::Lock _l2(_peerAffinities_m);
+ std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
+ if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
+ i->timestamp = pa.timestamp;
+ } else {
+ _peerAffinities.push_back(pa);
+ std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
+ }
+ }
- TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
- }
- } catch ( ... ) {
- // ignore invalid identities
- }
- } break;
-
- case STATE_MESSAGE_MULTICAST_LIKE: {
- const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
- const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
- const MAC mac(dmsg.field(ptr,6),6); ptr += 6;
- const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4;
- RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address);
- TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid);
- } break;
-
- case STATE_MESSAGE_COM: {
- CertificateOfMembership com;
- ptr += com.deserialize(dmsg,ptr);
- if (com) {
- TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision());
- }
- } break;
-
- case STATE_MESSAGE_RELAY: {
- const unsigned int numRemotePeerPaths = dmsg[ptr++];
- InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
- for(unsigned int i=0;i<numRemotePeerPaths;++i)
- ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
- const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2;
- const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen;
-
- if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address
- const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH);
- TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths);
-
- SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress));
- if (destinationPeer) {
- if (
- (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&&
- (numRemotePeerPaths > 0)&&
- (packetLen >= 18)&&
- (reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR)
- ) {
- // If remote peer paths were sent with this relayed packet, we do
- // RENDEZVOUS. It's handled here for cluster-relayed packets since
- // we don't have both Peer records so this is a different path.
-
- const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH);
-
- InetAddress bestDestV4,bestDestV6;
- destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6);
- InetAddress bestRemoteV4,bestRemoteV6;
- for(unsigned int i=0;i<numRemotePeerPaths;++i) {
- if ((bestRemoteV4)&&(bestRemoteV6))
- break;
- switch(remotePeerPaths[i].ss_family) {
- case AF_INET:
- if (!bestRemoteV4)
- bestRemoteV4 = remotePeerPaths[i];
- break;
- case AF_INET6:
- if (!bestRemoteV6)
- bestRemoteV6 = remotePeerPaths[i];
+ TRACE("[%u] has %s",(unsigned int)fromMemberId,id.address().toString().c_str());
+ }
+ } catch ( ... ) {
+ // ignore invalid identities
+ }
+ } break;
+
+ case STATE_MESSAGE_MULTICAST_LIKE: {
+ const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
+ const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
+ const MAC mac(dmsg.field(ptr,6),6); ptr += 6;
+ const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4;
+ RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address);
+ TRACE("[%u] %s likes %s/%u on %.16llu",(unsigned int)fromMemberId,address.toString().c_str(),mac.toString().c_str(),(unsigned int)adi,nwid);
+ } break;
+
+ case STATE_MESSAGE_COM: {
+ CertificateOfMembership com;
+ ptr += com.deserialize(dmsg,ptr);
+ if (com) {
+ TRACE("[%u] COM for %s on %.16llu rev %llu",(unsigned int)fromMemberId,com.issuedTo().toString().c_str(),com.networkId(),com.revision());
+ }
+ } break;
+
+ case STATE_MESSAGE_RELAY: {
+ const unsigned int numRemotePeerPaths = dmsg[ptr++];
+ InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
+ for(unsigned int i=0;i<numRemotePeerPaths;++i)
+ ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
+ const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2;
+ const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen;
+
+ if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address
+ const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH);
+ TRACE("[%u] relay %u bytes to %s (%u remote paths included)",(unsigned int)fromMemberId,packetLen,destinationAddress.toString().c_str(),numRemotePeerPaths);
+
+ SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress));
+ if (destinationPeer) {
+ if (
+ (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&&
+ (numRemotePeerPaths > 0)&&
+ (packetLen >= 18)&&
+ (reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR)
+ ) {
+ // If remote peer paths were sent with this relayed packet, we do
+ // RENDEZVOUS. It's handled here for cluster-relayed packets since
+ // we don't have both Peer records so this is a different path.
+
+ const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH);
+
+ InetAddress bestDestV4,bestDestV6;
+ destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6);
+ InetAddress bestRemoteV4,bestRemoteV6;
+ for(unsigned int i=0;i<numRemotePeerPaths;++i) {
+ if ((bestRemoteV4)&&(bestRemoteV6))
break;
+ switch(remotePeerPaths[i].ss_family) {
+ case AF_INET:
+ if (!bestRemoteV4)
+ bestRemoteV4 = remotePeerPaths[i];
+ break;
+ case AF_INET6:
+ if (!bestRemoteV6)
+ bestRemoteV6 = remotePeerPaths[i];
+ break;
+ }
}
- }
- Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
- rendezvousForDest.append((uint8_t)0);
- remotePeerAddress.appendTo(rendezvousForDest);
-
- Buffer<2048> rendezvousForOtherEnd;
- remotePeerAddress.appendTo(rendezvousForOtherEnd);
- rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
- const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
- rendezvousForOtherEnd.addSize(2); // space for actual packet payload length
- rendezvousForOtherEnd.append((uint8_t)0); // flags == 0
- destinationAddress.appendTo(rendezvousForOtherEnd);
-
- bool haveMatch = false;
- if ((bestDestV6)&&(bestRemoteV6)) {
- haveMatch = true;
-
- rendezvousForDest.append((uint16_t)bestRemoteV6.port());
- rendezvousForDest.append((uint8_t)16);
- rendezvousForDest.append(bestRemoteV6.rawIpData(),16);
-
- rendezvousForOtherEnd.append((uint16_t)bestDestV6.port());
- rendezvousForOtherEnd.append((uint8_t)16);
- rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16);
- rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
- } else if ((bestDestV4)&&(bestRemoteV4)) {
- haveMatch = true;
-
- rendezvousForDest.append((uint16_t)bestRemoteV4.port());
- rendezvousForDest.append((uint8_t)4);
- rendezvousForDest.append(bestRemoteV4.rawIpData(),4);
-
- rendezvousForOtherEnd.append((uint16_t)bestDestV4.port());
- rendezvousForOtherEnd.append((uint8_t)4);
- rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4);
- rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
- }
+ Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
+ rendezvousForDest.append((uint8_t)0);
+ remotePeerAddress.appendTo(rendezvousForDest);
+
+ Buffer<2048> rendezvousForOtherEnd;
+ remotePeerAddress.appendTo(rendezvousForOtherEnd);
+ rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
+ const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
+ rendezvousForOtherEnd.addSize(2); // space for actual packet payload length
+ rendezvousForOtherEnd.append((uint8_t)0); // flags == 0
+ destinationAddress.appendTo(rendezvousForOtherEnd);
+
+ bool haveMatch = false;
+ if ((bestDestV6)&&(bestRemoteV6)) {
+ haveMatch = true;
+
+ rendezvousForDest.append((uint16_t)bestRemoteV6.port());
+ rendezvousForDest.append((uint8_t)16);
+ rendezvousForDest.append(bestRemoteV6.rawIpData(),16);
+
+ rendezvousForOtherEnd.append((uint16_t)bestDestV6.port());
+ rendezvousForOtherEnd.append((uint8_t)16);
+ rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16);
+ rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
+ } else if ((bestDestV4)&&(bestRemoteV4)) {
+ haveMatch = true;
+
+ rendezvousForDest.append((uint16_t)bestRemoteV4.port());
+ rendezvousForDest.append((uint8_t)4);
+ rendezvousForDest.append(bestRemoteV4.rawIpData(),4);
+
+ rendezvousForOtherEnd.append((uint16_t)bestDestV4.port());
+ rendezvousForOtherEnd.append((uint8_t)4);
+ rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4);
+ rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
+ }
- if (haveMatch) {
- _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
- RR->sw->send(rendezvousForDest,true,0);
+ if (haveMatch) {
+ _send(fromMemberId,STATE_MESSAGE_PROXY_SEND,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
+ RR->sw->send(rendezvousForDest,true,0);
+ }
}
}
}
- }
- } break;
-
- case STATE_MESSAGE_PROXY_SEND: {
- const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
- const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
- const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
- Packet outp(rcpt,RR->identity.address(),verb);
- outp.append(dmsg.field(ptr,len),len);
- RR->sw->send(outp,true,0);
- TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
- } break;
+ } break;
+
+ case STATE_MESSAGE_PROXY_SEND: {
+ const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
+ const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
+ const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
+ Packet outp(rcpt,RR->identity.address(),verb);
+ outp.append(dmsg.field(ptr,len),len);
+ RR->sw->send(outp,true,0);
+ TRACE("[%u] proxy send %s to %s length %u",(unsigned int)fromMemberId,Packet::verbString(verb),rcpt.toString().c_str(),len);
+ } break;
+ }
+ } catch ( ... ) {
+ TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
+ // drop invalids
}
- } catch ( ... ) {
- TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
- // drop invalids
- }
- ptr = nextPtr;
+ ptr = nextPtr;
+ }
+ } catch ( ... ) {
+ TRACE("invalid message (outer loop), discarding");
+ // drop invalids
}
- } catch ( ... ) {
- TRACE("invalid message (outer loop), discarding");
- // drop invalids
}
}