summaryrefslogtreecommitdiff
path: root/node
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2015-11-09 09:45:43 -0800
committerAdam Ierymenko <adam.ierymenko@gmail.com>2015-11-09 09:45:43 -0800
commit21341ab15c8f5f0c0596421da5695f1d54a8dbec (patch)
tree196bee05238904361687dfd43c1c29f530c4fe8c /node
parentf4d12603e03c88e7c5d83b2419cb9ec17412b928 (diff)
downloadinfinitytier-21341ab15c8f5f0c0596421da5695f1d54a8dbec.tar.gz
infinitytier-21341ab15c8f5f0c0596421da5695f1d54a8dbec.zip
Ready to test.
Diffstat (limited to 'node')
-rw-r--r--node/Cluster.cpp74
-rw-r--r--node/Multicaster.cpp2
-rw-r--r--node/Multicaster.hpp2
3 files changed, 63 insertions, 15 deletions
diff --git a/node/Cluster.cpp b/node/Cluster.cpp
index e6fc18ec..18ea9eb4 100644
--- a/node/Cluster.cpp
+++ b/node/Cluster.cpp
@@ -161,8 +161,6 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
return;
}
- _Member &m = _members[fromMemberId];
-
try {
while (ptr < dmsg.size()) {
const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
@@ -177,6 +175,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
break;
case CLUSTER_MESSAGE_ALIVE: {
+ _Member &m = _members[fromMemberId];
Mutex::Lock mlck(m.lock);
ptr += 7; // skip version stuff, not used yet
m.x = dmsg.at<int32_t>(ptr); ptr += 4;
@@ -253,7 +252,7 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
peer->identity().serialize(buf);
Mutex::Lock _l2(_members[fromMemberId].lock);
_send(fromMemberId,CLUSTER_MESSAGE_HAVE_PEER,buf.data(),buf.size());
- _flush(fromMemberId); // lookups are latency sensitive
+ _flush(fromMemberId);
}
} break;
@@ -340,8 +339,11 @@ void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
}
if (haveMatch) {
- _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
- _flush(fromMemberId);
+ {
+ Mutex::Lock _l2(_members[fromMemberId].lock);
+ _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,rendezvousForRemote.data(),rendezvousForRemote.size());
+ _flush(fromMemberId);
+ }
RR->sw->send(rendezvousForLocal,true,0);
}
}
@@ -382,7 +384,6 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
return;
const uint64_t now = RR->node->now();
- unsigned int canHasPeer = 0;
uint64_t mostRecentTs = 0;
unsigned int mostRecentMemberId = 0xffffffff;
@@ -413,7 +414,7 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
Mutex::Lock _l2(_members[*mid].lock);
_send(*mid,CLUSTER_MESSAGE_WANT_PEER,tmp,ZT_ADDRESS_LENGTH);
if ((enqueueAndWait)&&(queueCount == 0))
- _flush(*mid); // send first query immediately to reduce latency
+ _flush(*mid);
}
}
@@ -453,14 +454,14 @@ void Cluster::sendViaCluster(const Address &fromPeerAddress,const Address &toPee
{
Mutex::Lock _l2(_members[mostRecentMemberId].lock);
if (buf.size() > 0) {
- _send(canHasPeer,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
- _flush(canHasPeer); // latency sensitive
+ _send(mostRecentMemberId,CLUSTER_MESSAGE_PROXY_UNITE,buf.data(),buf.size());
+ _flush(mostRecentMemberId);
}
if (_members[mostRecentMemberId].zeroTierPhysicalEndpoints.size() > 0)
- RR->node->putPacket(InetAddress(),_members[canHasPeer].zeroTierPhysicalEndpoints.front(),data,len);
+ RR->node->putPacket(InetAddress(),_members[mostRecentMemberId].zeroTierPhysicalEndpoints.front(),data,len);
}
- TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)canHasPeer);
+ TRACE("sendViaCluster(): relaying %u bytes from %s to %s by way of %u",len,fromPeerAddress.toString().c_str(),toPeerAddress.toString().c_str(),(unsigned int)mostRecentMemberId);
}
void Cluster::sendDistributedQuery(const Packet &pkt)
@@ -472,7 +473,7 @@ void Cluster::sendDistributedQuery(const Packet &pkt)
for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
Mutex::Lock _l2(_members[*mid].lock);
_send(*mid,CLUSTER_MESSAGE_REMOTE_PACKET,buf.data(),buf.size());
- _flush(*mid); // these tend to be latency-sensitive
+ _flush(*mid);
}
}
@@ -524,7 +525,7 @@ void Cluster::doPeriodicTasks()
_send(*mid,CLUSTER_MESSAGE_ALIVE,alive.data(),alive.size());
}
- _flush(*mid); // does nothing if nothing to flush
+ _flush(*mid);
}
}
@@ -740,6 +741,53 @@ void Cluster::_flush(uint16_t memberId)
}
}
+void Cluster::_doREMOTE_WHOIS(uint64_t fromMemberId,const Packet &remotep)
+{
+ if (remotep.payloadLength() >= ZT_ADDRESS_LENGTH) {
+ Identity queried(RR->topology->getIdentity(Address(remotep.payload(),ZT_ADDRESS_LENGTH)));
+ if (queried) {
+ Buffer<1024> routp;
+ remotep.source().appendTo(routp);
+ routp.append((uint8_t)Packet::VERB_OK);
+ routp.append((uint8_t)Packet::VERB_WHOIS);
+ routp.append(remotep.packetId());
+ queried.serialize(routp);
+
+ Mutex::Lock _l2(_members[fromMemberId].lock);
+ _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size());
+ _flush(fromMemberId);
+ }
+ }
+}
+
+void Cluster::_doREMOTE_MULTICAST_GATHER(uint64_t fromMemberId,const Packet &remotep)
+{
+ const uint64_t nwid = remotep.at<uint64_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_NETWORK_ID);
+ const MulticastGroup mg(MAC(remotep.field(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_MAC,6),6),remotep.at<uint32_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_ADI));
+ unsigned int gatherLimit = remotep.at<uint32_t>(ZT_PROTO_VERB_MULTICAST_GATHER_IDX_GATHER_LIMIT);
+ const Address remotePeerAddress(remotep.source());
+
+ //TRACE("<<MC %s(%s) GATHER up to %u in %.16llx/%s",source().toString().c_str(),_remoteAddress.toString().c_str(),gatherLimit,nwid,mg.toString().c_str());
+
+ if (gatherLimit) {
+ Buffer<ZT_PROTO_MAX_PACKET_LENGTH> routp;
+ remotePeerAddress.appendTo(routp);
+ routp.append((uint8_t)Packet::VERB_OK);
+ routp.append((uint8_t)Packet::VERB_MULTICAST_GATHER);
+ routp.append(remotep.packetId());
+ routp.append(nwid);
+ mg.mac().appendTo(routp);
+ routp.append((uint32_t)mg.adi());
+
+ if (gatherLimit > ((ZT_CLUSTER_MAX_MESSAGE_LENGTH - 64) / 5))
+ gatherLimit = ((ZT_CLUSTER_MAX_MESSAGE_LENGTH - 64) / 5);
+ if (RR->mc->gather(remotePeerAddress,nwid,mg,routp,gatherLimit)) {
+ Mutex::Lock _l2(_members[fromMemberId].lock);
+ _send(fromMemberId,CLUSTER_MESSAGE_PROXY_SEND,routp.data(),routp.size());
+ }
+ }
+}
+
} // namespace ZeroTier
#endif // ZT_ENABLE_CLUSTER
diff --git a/node/Multicaster.cpp b/node/Multicaster.cpp
index 01e6b799..41838552 100644
--- a/node/Multicaster.cpp
+++ b/node/Multicaster.cpp
@@ -78,7 +78,7 @@ void Multicaster::remove(uint64_t nwid,const MulticastGroup &mg,const Address &m
}
}
-unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Packet &appendTo,unsigned int limit) const
+unsigned int Multicaster::gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Buffer<ZT_PROTO_MAX_PACKET_LENGTH> &appendTo,unsigned int limit) const
{
unsigned char *p;
unsigned int added = 0,i,k,rptr,totalKnown = 0;
diff --git a/node/Multicaster.hpp b/node/Multicaster.hpp
index 898c4db7..8e6a7556 100644
--- a/node/Multicaster.hpp
+++ b/node/Multicaster.hpp
@@ -146,7 +146,7 @@ public:
* @return Number of addresses appended
* @throws std::out_of_range Buffer overflow writing to packet
*/
- unsigned int gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Packet &appendTo,unsigned int limit) const;
+ unsigned int gather(const Address &queryingPeer,uint64_t nwid,const MulticastGroup &mg,Buffer<ZT_PROTO_MAX_PACKET_LENGTH> &appendTo,unsigned int limit) const;
/**
* Get subscribers to a multicast group