summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2014-09-24 13:45:58 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2014-09-24 13:45:58 -0700
commit431476e2e4474c83013c2a1a6d80f1e815a2d37c (patch)
tree2c1a6e2aabee108695d1f993e3085b0ed923f7e3
parent557801a09ef31c243add5f6d6de7a5f67af53ed2 (diff)
downloadinfinitytier-431476e2e4474c83013c2a1a6d80f1e815a2d37c.tar.gz
infinitytier-431476e2e4474c83013c2a1a6d80f1e815a2d37c.zip
Some more multicast algo work...
-rw-r--r--node/MulticastTopology.cpp19
-rw-r--r--node/MulticastTopology.hpp24
-rw-r--r--node/OutboundMulticast.hpp19
-rw-r--r--node/Packet.hpp1
-rw-r--r--node/Switch.cpp183
-rw-r--r--node/Switch.hpp52
6 files changed, 165 insertions, 133 deletions
diff --git a/node/MulticastTopology.cpp b/node/MulticastTopology.cpp
index e4b1066d..1db1aa7a 100644
--- a/node/MulticastTopology.cpp
+++ b/node/MulticastTopology.cpp
@@ -41,19 +41,8 @@ MulticastTopology::~MulticastTopology()
{
}
-void MulticastTopology::add(const MulticastGroup &mg,const Address &member,const Address &learnedFrom)
+void MulticastTopology::add(const MulticastGroup &mg,const Address &learnedFrom,const Address &member)
{
- Mutex::Lock _l(_groups_m);
- std::vector<MulticastGroupMember> &mv = _groups[mg].members;
- for(std::vector<MulticastGroupMember>::iterator m(mv.begin());m!=mv.end();++m) {
- if (m->address == member) {
- if (m->learnedFrom) // once a member has been seen directly, we keep its status as direct
- m->learnedFrom = learnedFrom;
- m->timestamp = Utils::now();
- return;
- }
- }
- mv.push_back(MulticastGroupMember(member,learnedFrom,Utils::now()));
}
void MulticastTopology::erase(const MulticastGroup &mg,const Address &member)
@@ -72,6 +61,12 @@ void MulticastTopology::erase(const MulticastGroup &mg,const Address &member)
}
}
+void send(uint64_t nwid,uint64_t now,const Address &self,const MulticastGroup &mg,const MAC &from,unsigned int etherType,const void *data,unsigned int len)
+{
+ Mutex::Lock _l(_groups_m);
+ std::map< MulticastGroup,MulticastGroupStatus >::iterator r(_groups.find(mg));
+}
+
unsigned int MulticastTopology::shouldGather(const MulticastGroup &mg,uint64_t now,unsigned int limit,bool updateLastGatheredTimeOnNonzeroReturn)
{
Mutex::Lock _l(_groups_m);
diff --git a/node/MulticastTopology.hpp b/node/MulticastTopology.hpp
index 8a68f2b2..0b330df3 100644
--- a/node/MulticastTopology.hpp
+++ b/node/MulticastTopology.hpp
@@ -33,10 +33,14 @@
#include <map>
#include <vector>
+#include <list>
#include "Constants.hpp"
#include "Address.hpp"
+#include "MAC.hpp"
#include "MulticastGroup.hpp"
+#include "OutboundMulticast.hpp"
+#include "Switch.hpp"
#include "Utils.hpp"
#include "Mutex.hpp"
@@ -70,6 +74,7 @@ private:
uint64_t lastGatheredMembers; // time we last gathered members
std::vector<MulticastGroupMember> members; // members of this group
+ std::list<OutboundMulticast> txQueue; // pending outbound multicasts
};
public:
@@ -80,10 +85,10 @@ public:
* Add or update a member in a multicast group
*
* @param mg Multicast group
- * @param member Member to add/update
* @param learnedFrom Address from which we learned this member or NULL/0 Address if direct
+ * @param member New member address
*/
- void add(const MulticastGroup &mg,const Address &member,const Address &learnedFrom);
+ void add(const MulticastGroup &mg,const Address &learnedFrom,const Address &member);
/**
* Erase a member from a multicast group (if present)
@@ -94,6 +99,21 @@ public:
void erase(const MulticastGroup &mg,const Address &member);
/**
+ * Send a multicast
+ *
+ * @param nwid Network ID
+ * @param now Current time
+ * @param sw Switch to use for sending packets
+ * @param self This node's address
+ * @param mg Multicast group
+ * @param from Source Ethernet MAC address
+ * @param etherType Ethernet frame type
+ * @param data Packet data
+ * @param len Length of packet data
+ */
+ void send(uint64_t nwid,uint64_t now,const Switch &sw,const Address &self,const MulticastGroup &mg,const MAC &from,unsigned int etherType,const void *data,unsigned int len);
+
+ /**
* @param mg Multicast group
* @return Tuple of: time we last gathered members (or 0 for never) and number of known members
*/
diff --git a/node/OutboundMulticast.hpp b/node/OutboundMulticast.hpp
index f8338b93..3ef9532b 100644
--- a/node/OutboundMulticast.hpp
+++ b/node/OutboundMulticast.hpp
@@ -51,6 +51,13 @@ class OutboundMulticast
{
public:
/**
+ * Create an uninitialized outbound multicast
+ *
+ * It must be initialized with init().
+ */
+ OutboundMulticast() {}
+
+ /**
* Initialize outbound multicast
*
* @param timestamp Creation time
@@ -63,13 +70,13 @@ public:
* @param len Length of data
* @throws std::out_of_range Data too large to fit in a MULTICAST_FRAME
*/
- OutboundMulticast(uint64_t timestamp,const Address &self,uint64_t nwid,const MAC &src,const MulticastGroup &dest,unsigned int etherType,const void *payload,unsigned int len) :
- _timestamp(timestamp),
- _nwid(nwid),
- _source(src),
- _destination(dest),
- _etherType(etherType)
+ inline void init(uint64_t timestamp,const Address &self,uint64_t nwid,const MAC &src,const MulticastGroup &dest,unsigned int etherType,const void *payload,unsigned int len)
{
+ _timestamp = timestamp;
+ _nwid = nwid;
+ _source = src;
+ _destination = dest;
+ _etherType = etherType;
_packet.setSource(self);
_packet.setVerb(Packet::VERB_MULTICAST_FRAME);
_packet.append((char)0);
diff --git a/node/Packet.hpp b/node/Packet.hpp
index aedc9e4c..214479d6 100644
--- a/node/Packet.hpp
+++ b/node/Packet.hpp
@@ -694,6 +694,7 @@ public:
VERB_MULTICAST_GATHER = 13,
/* Multicast frame:
+ * <[8] 64-bit network ID>
* <[1] flags (currently unused, must be 0)>
* <[4] 32-bit multicast ADI (note that this is out of order here -- it precedes MAC)>
* <[6] destination MAC or all zero for destination node>
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 4e8cf4d8..f1c814ef 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -56,8 +56,7 @@ namespace ZeroTier {
Switch::Switch(const RuntimeEnvironment *renv) :
_r(renv),
- _lastBeacon(0),
- _multicastIdCounter((unsigned int)renv->prng->next32()) // start a random spot to minimize possible collisions on startup
+ _lastBeacon(0)
{
}
@@ -478,96 +477,6 @@ void Switch::contact(const SharedPtr<Peer> &peer,const InetAddress &atAddr)
_r->sm->whack();
}
-unsigned long Switch::doTimerTasks()
-{
- unsigned long nextDelay = ~((unsigned long)0); // big number, caller will cap return value
- uint64_t now = Utils::now();
-
- {
- Mutex::Lock _l(_contactQueue_m);
- for(std::list<ContactQueueEntry>::iterator qi(_contactQueue.begin());qi!=_contactQueue.end();) {
- if (now >= qi->fireAtTime) {
- if (!qi->peer->hasActiveDirectPath(now)) {
- TRACE("deploying aggressive NAT-t against %s(%s)",qi->peer->address().toString().c_str(),qi->inaddr.toString().c_str());
-
- /* Shotgun approach -- literally -- against symmetric NATs. Most of these
- * either increment or decrement ports so this gets a good number. Also try
- * the original port one more time for good measure, since sometimes it
- * fails first time around. */
- int p = (int)qi->inaddr.port() - 2;
- for(int k=0;k<5;++k) {
- if ((p > 0)&&(p <= 0xffff)) {
- qi->inaddr.setPort((unsigned int)p);
- sendHELLO(qi->peer,qi->inaddr);
- }
- ++p;
- }
- }
-
- _contactQueue.erase(qi++);
- } else {
- nextDelay = std::min(nextDelay,(unsigned long)(qi->fireAtTime - now));
- ++qi;
- }
- }
- }
-
- {
- Mutex::Lock _l(_outstandingWhoisRequests_m);
- for(std::map< Address,WhoisRequest >::iterator i(_outstandingWhoisRequests.begin());i!=_outstandingWhoisRequests.end();) {
- unsigned long since = (unsigned long)(now - i->second.lastSent);
- if (since >= ZT_WHOIS_RETRY_DELAY) {
- if (i->second.retries >= ZT_MAX_WHOIS_RETRIES) {
- TRACE("WHOIS %s timed out",i->first.toString().c_str());
- _outstandingWhoisRequests.erase(i++);
- continue;
- } else {
- i->second.lastSent = now;
- i->second.peersConsulted[i->second.retries] = _sendWhoisRequest(i->first,i->second.peersConsulted,i->second.retries);
- ++i->second.retries;
- TRACE("WHOIS %s (retry %u)",i->first.toString().c_str(),i->second.retries);
- nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY);
- }
- } else nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since);
- ++i;
- }
- }
-
- {
- Mutex::Lock _l(_txQueue_m);
- for(std::multimap< Address,TXQueueEntry >::iterator i(_txQueue.begin());i!=_txQueue.end();) {
- if (_trySend(i->second.packet,i->second.encrypt))
- _txQueue.erase(i++);
- else if ((now - i->second.creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
- TRACE("TX %s -> %s timed out",i->second.packet.source().toString().c_str(),i->second.packet.destination().toString().c_str());
- _txQueue.erase(i++);
- } else ++i;
- }
- }
-
- {
- Mutex::Lock _l(_rxQueue_m);
- for(std::list< SharedPtr<IncomingPacket> >::iterator i(_rxQueue.begin());i!=_rxQueue.end();) {
- if ((now - (*i)->receiveTime()) > ZT_RECEIVE_QUEUE_TIMEOUT) {
- TRACE("RX %s -> %s timed out",(*i)->source().toString().c_str(),(*i)->destination().toString().c_str());
- _rxQueue.erase(i++);
- } else ++i;
- }
- }
-
- {
- Mutex::Lock _l(_defragQueue_m);
- for(std::map< uint64_t,DefragQueueEntry >::iterator i(_defragQueue.begin());i!=_defragQueue.end();) {
- if ((now - i->second.creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) {
- TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",i->first);
- _defragQueue.erase(i++);
- } else ++i;
- }
- }
-
- return std::max(nextDelay,(unsigned long)10); // minimum delay
-}
-
void Switch::announceMulticastGroups(const std::map< SharedPtr<Network>,std::set<MulticastGroup> > &allMemberships)
{
std::vector< SharedPtr<Peer> > directPeers;
@@ -682,6 +591,96 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer)
}
}
+unsigned long Switch::doTimerTasks()
+{
+ unsigned long nextDelay = ~((unsigned long)0); // big number, caller will cap return value
+ uint64_t now = Utils::now();
+
+ {
+ Mutex::Lock _l(_contactQueue_m);
+ for(std::list<ContactQueueEntry>::iterator qi(_contactQueue.begin());qi!=_contactQueue.end();) {
+ if (now >= qi->fireAtTime) {
+ if (!qi->peer->hasActiveDirectPath(now)) {
+ TRACE("deploying aggressive NAT-t against %s(%s)",qi->peer->address().toString().c_str(),qi->inaddr.toString().c_str());
+
+ /* Shotgun approach -- literally -- against symmetric NATs. Most of these
+ * either increment or decrement ports so this gets a good number. Also try
+ * the original port one more time for good measure, since sometimes it
+ * fails first time around. */
+ int p = (int)qi->inaddr.port() - 2;
+ for(int k=0;k<5;++k) {
+ if ((p > 0)&&(p <= 0xffff)) {
+ qi->inaddr.setPort((unsigned int)p);
+ sendHELLO(qi->peer,qi->inaddr);
+ }
+ ++p;
+ }
+ }
+
+ _contactQueue.erase(qi++);
+ } else {
+ nextDelay = std::min(nextDelay,(unsigned long)(qi->fireAtTime - now));
+ ++qi;
+ }
+ }
+ }
+
+ {
+ Mutex::Lock _l(_outstandingWhoisRequests_m);
+ for(std::map< Address,WhoisRequest >::iterator i(_outstandingWhoisRequests.begin());i!=_outstandingWhoisRequests.end();) {
+ unsigned long since = (unsigned long)(now - i->second.lastSent);
+ if (since >= ZT_WHOIS_RETRY_DELAY) {
+ if (i->second.retries >= ZT_MAX_WHOIS_RETRIES) {
+ TRACE("WHOIS %s timed out",i->first.toString().c_str());
+ _outstandingWhoisRequests.erase(i++);
+ continue;
+ } else {
+ i->second.lastSent = now;
+ i->second.peersConsulted[i->second.retries] = _sendWhoisRequest(i->first,i->second.peersConsulted,i->second.retries);
+ ++i->second.retries;
+ TRACE("WHOIS %s (retry %u)",i->first.toString().c_str(),i->second.retries);
+ nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY);
+ }
+ } else nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since);
+ ++i;
+ }
+ }
+
+ {
+ Mutex::Lock _l(_txQueue_m);
+ for(std::multimap< Address,TXQueueEntry >::iterator i(_txQueue.begin());i!=_txQueue.end();) {
+ if (_trySend(i->second.packet,i->second.encrypt))
+ _txQueue.erase(i++);
+ else if ((now - i->second.creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
+ TRACE("TX %s -> %s timed out",i->second.packet.source().toString().c_str(),i->second.packet.destination().toString().c_str());
+ _txQueue.erase(i++);
+ } else ++i;
+ }
+ }
+
+ {
+ Mutex::Lock _l(_rxQueue_m);
+ for(std::list< SharedPtr<IncomingPacket> >::iterator i(_rxQueue.begin());i!=_rxQueue.end();) {
+ if ((now - (*i)->receiveTime()) > ZT_RECEIVE_QUEUE_TIMEOUT) {
+ TRACE("RX %s -> %s timed out",(*i)->source().toString().c_str(),(*i)->destination().toString().c_str());
+ _rxQueue.erase(i++);
+ } else ++i;
+ }
+ }
+
+ {
+ Mutex::Lock _l(_defragQueue_m);
+ for(std::map< uint64_t,DefragQueueEntry >::iterator i(_defragQueue.begin());i!=_defragQueue.end();) {
+ if ((now - i->second.creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) {
+ TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",i->first);
+ _defragQueue.erase(i++);
+ } else ++i;
+ }
+ }
+
+ return std::max(nextDelay,(unsigned long)10); // minimum delay
+}
+
const char *Switch::etherTypeName(const unsigned int etherType)
throw()
{
diff --git a/node/Switch.hpp b/node/Switch.hpp
index c7f80606..a1c2f752 100644
--- a/node/Switch.hpp
+++ b/node/Switch.hpp
@@ -67,6 +67,11 @@ class Peer;
/**
* Core of the distributed Ethernet switch and protocol implementation
+ *
+ * This class is perhaps a bit misnamed, but it's basically where everything
+ * meets. Transport-layer ZT packets come in here, as do virtual network
+ * packets from tap devices, and this sends them where they need to go and
+ * wraps/unwraps accordingly. It also handles queues and timeouts and such.
*/
class Switch : NonCopyable
{
@@ -161,13 +166,6 @@ public:
void contact(const SharedPtr<Peer> &peer,const InetAddress &atAddr);
/**
- * Perform retries and other periodic timer tasks
- *
- * @return Number of milliseconds until doTimerTasks() should be run again
- */
- unsigned long doTimerTasks();
-
- /**
* Announce multicast group memberships
*
* This announces all the groups for all the networks in the supplied map to
@@ -204,7 +202,7 @@ public:
void cancelWhoisRequest(const Address &addr);
/**
- * Run any processes that are waiting for this peer
+ * Run any processes that are waiting for this peer's identity
*
* Called when we learn of a peer's identity from HELLO, OK(WHOIS), etc.
*
@@ -213,6 +211,13 @@ public:
void doAnythingWaitingForPeer(const SharedPtr<Peer> &peer);
/**
+ * Perform retries and other periodic timer tasks
+ *
+ * @return Number of milliseconds until doTimerTasks() should be run again
+ */
+ unsigned long doTimerTasks();
+
+ /**
* @param etherType Ethernet type ID
* @return Human-readable name
*/
@@ -235,8 +240,8 @@ private:
const RuntimeEnvironment *const _r;
volatile uint64_t _lastBeacon;
- volatile unsigned int _multicastIdCounter;
+ // Outsanding WHOIS requests and how many retries they've undergone
struct WhoisRequest
{
uint64_t lastSent;
@@ -246,9 +251,23 @@ private:
std::map< Address,WhoisRequest > _outstandingWhoisRequests;
Mutex _outstandingWhoisRequests_m;
- std::list< SharedPtr<IncomingPacket> > _rxQueue;
+ // Packet defragmentation queue -- comes before RX queue in path
+ struct DefragQueueEntry
+ {
+ uint64_t creationTime;
+ SharedPtr<IncomingPacket> frag0;
+ Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1];
+ unsigned int totalFragments; // 0 if only frag0 received, waiting for frags
+ uint32_t haveFragments; // bit mask, LSB to MSB
+ };
+ std::map< uint64_t,DefragQueueEntry > _defragQueue;
+ Mutex _defragQueue_m;
+
+ // ZeroTier-layer RX queue of incoming packets in the process of being decoded
+ std::vector< SharedPtr<IncomingPacket> > _rxQueue;
Mutex _rxQueue_m;
+ // ZeroTier-layer TX queue by destination ZeroTier address
struct TXQueueEntry
{
TXQueueEntry() {}
@@ -264,20 +283,11 @@ private:
std::multimap< Address,TXQueueEntry > _txQueue;
Mutex _txQueue_m;
- struct DefragQueueEntry
- {
- uint64_t creationTime;
- SharedPtr<IncomingPacket> frag0;
- Packet::Fragment frags[ZT_MAX_PACKET_FRAGMENTS - 1];
- unsigned int totalFragments; // 0 if only frag0 received, waiting for frags
- uint32_t haveFragments; // bit mask, LSB to MSB
- };
- std::map< uint64_t,DefragQueueEntry > _defragQueue;
- Mutex _defragQueue_m;
-
+ // Tracks sending of VERB_RENDEZVOUS to relaying peers
std::map< Array< Address,2 >,uint64_t > _lastUniteAttempt; // key is always sorted in ascending order, for set-like behavior
Mutex _lastUniteAttempt_m;
+ // Active attempts to contact remote peers, including state of multi-phase NAT traversal
struct ContactQueueEntry
{
ContactQueueEntry() {}