summaryrefslogtreecommitdiff
path: root/node/Switch.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'node/Switch.cpp')
-rw-r--r--node/Switch.cpp179
1 files changed, 94 insertions, 85 deletions
diff --git a/node/Switch.cpp b/node/Switch.cpp
index 989f497a..ecae9b76 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -67,7 +67,10 @@ static const char *etherTypeName(const unsigned int etherType)
Switch::Switch(const RuntimeEnvironment *renv) :
RR(renv),
- _lastBeaconResponse(0)
+ _lastBeaconResponse(0),
+ _outstandingWhoisRequests(32),
+ _defragQueue(32),
+ _lastUniteAttempt(8) // only really used on root servers and upstreams, and it'll grow there just fine
{
}
@@ -75,7 +78,7 @@ Switch::~Switch()
{
}
-void Switch::onRemotePacket(const InetAddress &fromAddr,const void *data,unsigned int len)
+void Switch::onRemotePacket(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len)
{
try {
if (len == 13) {
@@ -93,14 +96,14 @@ void Switch::onRemotePacket(const InetAddress &fromAddr,const void *data,unsigne
_lastBeaconResponse = now;
Packet outp(peer->address(),RR->identity.address(),Packet::VERB_NOP);
outp.armor(peer->key(),false);
- RR->node->putPacket(fromAddr,outp.data(),outp.size());
+ RR->node->putPacket(localAddr,fromAddr,outp.data(),outp.size());
}
}
} else if (len > ZT_PROTO_MIN_FRAGMENT_LENGTH) {
if (((const unsigned char *)data)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) {
- _handleRemotePacketFragment(fromAddr,data,len);
+ _handleRemotePacketFragment(localAddr,fromAddr,data,len);
} else if (len >= ZT_PROTO_MIN_PACKET_LENGTH) {
- _handleRemotePacketHead(fromAddr,data,len);
+ _handleRemotePacketHead(localAddr,fromAddr,data,len);
}
}
} catch (std::exception &ex) {
@@ -291,7 +294,7 @@ void Switch::send(const Packet &packet,bool encrypt,uint64_t nwid)
if (!_trySend(packet,encrypt,nwid)) {
Mutex::Lock _l(_txQueue_m);
- _txQueue.insert(std::pair< Address,TXQueueEntry >(packet.destination(),TXQueueEntry(RR->node->now(),packet,encrypt,nwid)));
+ _txQueue.push_back(TXQueueEntry(packet.destination(),RR->node->now(),packet,encrypt,nwid));
}
}
@@ -309,31 +312,18 @@ bool Switch::unite(const Address &p1,const Address &p2,bool force)
const uint64_t now = RR->node->now();
- std::pair<InetAddress,InetAddress> cg(Peer::findCommonGround(*p1p,*p2p,now));
- if (!(cg.first))
- return false;
-
- if (cg.first.ipScope() != cg.second.ipScope())
- return false;
-
- // Addresses are sorted in key for last unite attempt map for order
- // invariant lookup: (p1,p2) == (p2,p1)
- Array<Address,2> uniteKey;
- if (p1 >= p2) {
- uniteKey[0] = p2;
- uniteKey[1] = p1;
- } else {
- uniteKey[0] = p1;
- uniteKey[1] = p2;
- }
{
Mutex::Lock _l(_lastUniteAttempt_m);
- std::map< Array< Address,2 >,uint64_t >::const_iterator e(_lastUniteAttempt.find(uniteKey));
- if ((!force)&&(e != _lastUniteAttempt.end())&&((now - e->second) < ZT_MIN_UNITE_INTERVAL))
+ uint64_t &luts = _lastUniteAttempt[_LastUniteKey(p1,p2)];
+ if (((now - luts) < ZT_MIN_UNITE_INTERVAL)&&(!force))
return false;
- else _lastUniteAttempt[uniteKey] = now;
+ luts = now;
}
+ std::pair<InetAddress,InetAddress> cg(Peer::findCommonGround(*p1p,*p2p,now));
+ if ((!(cg.first))||(cg.first.ipScope() != cg.second.ipScope()))
+ return false;
+
TRACE("unite: %s(%s) <> %s(%s)",p1.toString().c_str(),cg.second.toString().c_str(),p2.toString().c_str(),cg.first.toString().c_str());
/* Tell P1 where to find P2 and vice versa, sending the packets to P1 and
@@ -386,14 +376,14 @@ bool Switch::unite(const Address &p1,const Address &p2,bool force)
return true;
}
-void Switch::rendezvous(const SharedPtr<Peer> &peer,const InetAddress &atAddr)
+void Switch::rendezvous(const SharedPtr<Peer> &peer,const InetAddress &localAddr,const InetAddress &atAddr)
{
TRACE("sending NAT-t message to %s(%s)",peer->address().toString().c_str(),atAddr.toString().c_str());
const uint64_t now = RR->node->now();
- peer->attemptToContactAt(RR,atAddr,now);
+ peer->attemptToContactAt(RR,localAddr,atAddr,now);
{
Mutex::Lock _l(_contactQueue_m);
- _contactQueue.push_back(ContactQueueEntry(peer,now + ZT_NAT_T_TACTICAL_ESCALATION_DELAY,atAddr));
+ _contactQueue.push_back(ContactQueueEntry(peer,now + ZT_NAT_T_TACTICAL_ESCALATION_DELAY,localAddr,atAddr));
}
}
@@ -402,10 +392,13 @@ void Switch::requestWhois(const Address &addr)
bool inserted = false;
{
Mutex::Lock _l(_outstandingWhoisRequests_m);
- std::pair< std::map< Address,WhoisRequest >::iterator,bool > entry(_outstandingWhoisRequests.insert(std::pair<Address,WhoisRequest>(addr,WhoisRequest())));
- if ((inserted = entry.second))
- entry.first->second.lastSent = RR->node->now();
- entry.first->second.retries = 0; // reset retry count if entry already existed
+ WhoisRequest &r = _outstandingWhoisRequests[addr];
+ if (r.lastSent) {
+ r.retries = 0; // reset retry count if entry already existed, but keep waiting and retry again after normal timeout
+ } else {
+ r.lastSent = RR->node->now();
+ inserted = true;
+ }
}
if (inserted)
_sendWhoisRequest(addr,(const Address *)0,0);
@@ -435,11 +428,12 @@ void Switch::doAnythingWaitingForPeer(const SharedPtr<Peer> &peer)
{ // finish sending any packets waiting on peer's public key / identity
Mutex::Lock _l(_txQueue_m);
- std::pair< std::multimap< Address,TXQueueEntry >::iterator,std::multimap< Address,TXQueueEntry >::iterator > waitingTxQueueItems(_txQueue.equal_range(peer->address()));
- for(std::multimap< Address,TXQueueEntry >::iterator txi(waitingTxQueueItems.first);txi!=waitingTxQueueItems.second;) {
- if (_trySend(txi->second.packet,txi->second.encrypt,txi->second.nwid))
- _txQueue.erase(txi++);
- else ++txi;
+ for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
+ if (txi->dest == peer->address()) {
+ if (_trySend(txi->packet,txi->encrypt,txi->nwid))
+ _txQueue.erase(txi++);
+ else ++txi;
+ } else ++txi;
}
}
}
@@ -459,14 +453,14 @@ unsigned long Switch::doTimerTasks(uint64_t now)
} else {
if (qi->strategyIteration == 0) {
// First strategy: send packet directly to destination
- qi->peer->attemptToContactAt(RR,qi->inaddr,now);
+ qi->peer->attemptToContactAt(RR,qi->localAddr,qi->inaddr,now);
} else if (qi->strategyIteration <= 4) {
// Strategies 1-4: try escalating ports for symmetric NATs that remap sequentially
InetAddress tmpaddr(qi->inaddr);
int p = (int)qi->inaddr.port() + qi->strategyIteration;
if (p < 0xffff) {
tmpaddr.setPort((unsigned int)p);
- qi->peer->attemptToContactAt(RR,tmpaddr,now);
+ qi->peer->attemptToContactAt(RR,qi->localAddr,tmpaddr,now);
} else qi->strategyIteration = 5;
} else {
// All strategies tried, expire entry
@@ -486,36 +480,37 @@ unsigned long Switch::doTimerTasks(uint64_t now)
{ // Retry outstanding WHOIS requests
Mutex::Lock _l(_outstandingWhoisRequests_m);
- for(std::map< Address,WhoisRequest >::iterator i(_outstandingWhoisRequests.begin());i!=_outstandingWhoisRequests.end();) {
- unsigned long since = (unsigned long)(now - i->second.lastSent);
+ Hashtable< Address,WhoisRequest >::Iterator i(_outstandingWhoisRequests);
+ Address *a = (Address *)0;
+ WhoisRequest *r = (WhoisRequest *)0;
+ while (i.next(a,r)) {
+ const unsigned long since = (unsigned long)(now - r->lastSent);
if (since >= ZT_WHOIS_RETRY_DELAY) {
- if (i->second.retries >= ZT_MAX_WHOIS_RETRIES) {
- TRACE("WHOIS %s timed out",i->first.toString().c_str());
- _outstandingWhoisRequests.erase(i++);
- continue;
+ if (r->retries >= ZT_MAX_WHOIS_RETRIES) {
+ TRACE("WHOIS %s timed out",a->toString().c_str());
+ _outstandingWhoisRequests.erase(*a);
} else {
- i->second.lastSent = now;
- i->second.peersConsulted[i->second.retries] = _sendWhoisRequest(i->first,i->second.peersConsulted,i->second.retries);
- ++i->second.retries;
- TRACE("WHOIS %s (retry %u)",i->first.toString().c_str(),i->second.retries);
+ r->lastSent = now;
+ r->peersConsulted[r->retries] = _sendWhoisRequest(*a,r->peersConsulted,r->retries);
+ ++r->retries;
+ TRACE("WHOIS %s (retry %u)",a->toString().c_str(),r->retries);
nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY);
}
} else {
nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since);
}
- ++i;
}
}
{ // Time out TX queue packets that never got WHOIS lookups or other info.
Mutex::Lock _l(_txQueue_m);
- for(std::multimap< Address,TXQueueEntry >::iterator i(_txQueue.begin());i!=_txQueue.end();) {
- if (_trySend(i->second.packet,i->second.encrypt,i->second.nwid))
- _txQueue.erase(i++);
- else if ((now - i->second.creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
- TRACE("TX %s -> %s timed out",i->second.packet.source().toString().c_str(),i->second.packet.destination().toString().c_str());
- _txQueue.erase(i++);
- } else ++i;
+ for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
+ if (_trySend(txi->packet,txi->encrypt,txi->nwid))
+ _txQueue.erase(txi++);
+ else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
+ TRACE("TX %s -> %s timed out",txi->packet.source().toString().c_str(),txi->packet.destination().toString().c_str());
+ _txQueue.erase(txi++);
+ } else ++txi;
}
}
@@ -531,18 +526,32 @@ unsigned long Switch::doTimerTasks(uint64_t now)
{ // Time out packets that didn't get all their fragments.
Mutex::Lock _l(_defragQueue_m);
- for(std::map< uint64_t,DefragQueueEntry >::iterator i(_defragQueue.begin());i!=_defragQueue.end();) {
- if ((now - i->second.creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) {
- TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",i->first);
- _defragQueue.erase(i++);
- } else ++i;
+ Hashtable< uint64_t,DefragQueueEntry >::Iterator i(_defragQueue);
+ uint64_t *packetId = (uint64_t *)0;
+ DefragQueueEntry *qe = (DefragQueueEntry *)0;
+ while (i.next(packetId,qe)) {
+ if ((now - qe->creationTime) > ZT_FRAGMENTED_PACKET_RECEIVE_TIMEOUT) {
+ TRACE("incomplete fragmented packet %.16llx timed out, fragments discarded",*packetId);
+ _defragQueue.erase(*packetId);
+ }
+ }
+ }
+
+ { // Remove really old last unite attempt entries to keep table size controlled
+ Mutex::Lock _l(_lastUniteAttempt_m);
+ Hashtable< _LastUniteKey,uint64_t >::Iterator i(_lastUniteAttempt);
+ _LastUniteKey *k = (_LastUniteKey *)0;
+ uint64_t *v = (uint64_t *)0;
+ while (i.next(k,v)) {
+ if ((now - *v) >= (ZT_MIN_UNITE_INTERVAL * 16))
+ _lastUniteAttempt.erase(*k);
}
}
return nextDelay;
}
-void Switch::_handleRemotePacketFragment(const InetAddress &fromAddr,const void *data,unsigned int len)
+void Switch::_handleRemotePacketFragment(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len)
{
Packet::Fragment fragment(data,len);
Address destination(fragment.destination());
@@ -577,32 +586,31 @@ void Switch::_handleRemotePacketFragment(const InetAddress &fromAddr,const void
// seeing a Packet::Fragment?
Mutex::Lock _l(_defragQueue_m);
- std::map< uint64_t,DefragQueueEntry >::iterator dqe(_defragQueue.find(pid));
+ DefragQueueEntry &dq = _defragQueue[pid];
- if (dqe == _defragQueue.end()) {
+ if (!dq.creationTime) {
// We received a Packet::Fragment without its head, so queue it and wait
- DefragQueueEntry &dq = _defragQueue[pid];
dq.creationTime = RR->node->now();
dq.frags[fno - 1] = fragment;
dq.totalFragments = tf; // total fragment count is known
dq.haveFragments = 1 << fno; // we have only this fragment
//TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str());
- } else if (!(dqe->second.haveFragments & (1 << fno))) {
+ } else if (!(dq.haveFragments & (1 << fno))) {
// We have other fragments and maybe the head, so add this one and check
- dqe->second.frags[fno - 1] = fragment;
- dqe->second.totalFragments = tf;
+ dq.frags[fno - 1] = fragment;
+ dq.totalFragments = tf;
//TRACE("fragment (%u/%u) of %.16llx from %s",fno + 1,tf,pid,fromAddr.toString().c_str());
- if (Utils::countBits(dqe->second.haveFragments |= (1 << fno)) == tf) {
+ if (Utils::countBits(dq.haveFragments |= (1 << fno)) == tf) {
// We have all fragments -- assemble and process full Packet
//TRACE("packet %.16llx is complete, assembling and processing...",pid);
- SharedPtr<IncomingPacket> packet(dqe->second.frag0);
+ SharedPtr<IncomingPacket> packet(dq.frag0);
for(unsigned int f=1;f<tf;++f)
- packet->append(dqe->second.frags[f - 1].payload(),dqe->second.frags[f - 1].payloadLength());
- _defragQueue.erase(dqe);
+ packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength());
+ _defragQueue.erase(pid); // dq no longer valid after this
if (!packet->tryDecode(RR)) {
Mutex::Lock _l(_rxQueue_m);
@@ -614,9 +622,9 @@ void Switch::_handleRemotePacketFragment(const InetAddress &fromAddr,const void
}
}
-void Switch::_handleRemotePacketHead(const InetAddress &fromAddr,const void *data,unsigned int len)
+void Switch::_handleRemotePacketHead(const InetAddress &localAddr,const InetAddress &fromAddr,const void *data,unsigned int len)
{
- SharedPtr<IncomingPacket> packet(new IncomingPacket(data,len,fromAddr,RR->node->now()));
+ SharedPtr<IncomingPacket> packet(new IncomingPacket(data,len,localAddr,fromAddr,RR->node->now()));
Address source(packet->source());
Address destination(packet->destination());
@@ -645,26 +653,27 @@ void Switch::_handleRemotePacketHead(const InetAddress &fromAddr,const void *dat
uint64_t pid = packet->packetId();
Mutex::Lock _l(_defragQueue_m);
- std::map< uint64_t,DefragQueueEntry >::iterator dqe(_defragQueue.find(pid));
+ DefragQueueEntry &dq = _defragQueue[pid];
- if (dqe == _defragQueue.end()) {
+ if (!dq.creationTime) {
// If we have no other fragments yet, create an entry and save the head
- DefragQueueEntry &dq = _defragQueue[pid];
+
dq.creationTime = RR->node->now();
dq.frag0 = packet;
dq.totalFragments = 0; // 0 == unknown, waiting for Packet::Fragment
dq.haveFragments = 1; // head is first bit (left to right)
//TRACE("fragment (0/?) of %.16llx from %s",pid,fromAddr.toString().c_str());
- } else if (!(dqe->second.haveFragments & 1)) {
+ } else if (!(dq.haveFragments & 1)) {
// If we have other fragments but no head, see if we are complete with the head
- if ((dqe->second.totalFragments)&&(Utils::countBits(dqe->second.haveFragments |= 1) == dqe->second.totalFragments)) {
+
+ if ((dq.totalFragments)&&(Utils::countBits(dq.haveFragments |= 1) == dq.totalFragments)) {
// We have all fragments -- assemble and process full Packet
//TRACE("packet %.16llx is complete, assembling and processing...",pid);
// packet already contains head, so append fragments
- for(unsigned int f=1;f<dqe->second.totalFragments;++f)
- packet->append(dqe->second.frags[f - 1].payload(),dqe->second.frags[f - 1].payloadLength());
- _defragQueue.erase(dqe);
+ for(unsigned int f=1;f<dq.totalFragments;++f)
+ packet->append(dq.frags[f - 1].payload(),dq.frags[f - 1].payloadLength());
+ _defragQueue.erase(pid); // dq no longer valid after this
if (!packet->tryDecode(RR)) {
Mutex::Lock _l(_rxQueue_m);
@@ -672,7 +681,7 @@ void Switch::_handleRemotePacketHead(const InetAddress &fromAddr,const void *dat
}
} else {
// Still waiting on more fragments, so queue the head
- dqe->second.frag0 = packet;
+ dq.frag0 = packet;
}
} // else this is a duplicate head, ignore
} else {