summaryrefslogtreecommitdiff
path: root/node/Switch.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'node/Switch.cpp')
-rw-r--r--node/Switch.cpp283
1 files changed, 273 insertions, 10 deletions
diff --git a/node/Switch.cpp b/node/Switch.cpp
index eeeca5db..7b517864 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -269,6 +269,8 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
}
}
+ uint8_t qosBucket = ZT_QOS_DEFAULT_BUCKET;
+
if (to.isMulticast()) {
MulticastGroup multicastGroup(to,0);
@@ -386,7 +388,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
network->learnBridgedMulticastGroup(tPtr,multicastGroup,RR->node->now());
// First pass sets noTee to false, but noTee is set to true in OutboundMulticast to prevent duplicates.
- if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+ if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
return;
}
@@ -410,7 +412,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
Address toZT(to.toAddress(network->id())); // since in-network MACs are derived from addresses and network IDs, we can reverse this
SharedPtr<Peer> toPeer(RR->topology->getPeer(tPtr,toZT));
- if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+ if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
return;
}
@@ -425,7 +427,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
- send(tPtr,outp,true);
+ aqm_enqueue(tPtr,network,outp,true,qosBucket);
} else {
Packet outp(toZT,RR->identity.address(),Packet::VERB_FRAME);
outp.append(network->id());
@@ -433,7 +435,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
- send(tPtr,outp,true);
+ aqm_enqueue(tPtr,network,outp,true,qosBucket);
}
} else {
@@ -442,7 +444,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
// We filter with a NULL destination ZeroTier address first. Filtrations
// for each ZT destination are also done below. This is the same rationale
// and design as for multicast.
- if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+ if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
return;
}
@@ -480,7 +482,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
}
for(unsigned int b=0;b<numBridges;++b) {
- if (network->filterOutgoingPacket(tPtr,true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+ if (network->filterOutgoingPacket(tPtr,true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
Packet outp(bridges[b],RR->identity.address(),Packet::VERB_EXT_FRAME);
outp.append(network->id());
outp.append((uint8_t)0x00);
@@ -490,7 +492,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
- send(tPtr,outp,true);
+ aqm_enqueue(tPtr,network,outp,true,qosBucket);
} else {
RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked (bridge replication)");
}
@@ -498,6 +500,263 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
}
}
+void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket)
+{
+ if(!network->QoSEnabled()) {
+ send(tPtr, packet, encrypt);
+ return;
+ }
+ NetworkQoSControlBlock *nqcb = _netQueueControlBlock[network->id()];
+ if (!nqcb) {
+ // DEBUG_INFO("creating network QoS control block (NQCB) for network %llx", network->id());
+ nqcb = new NetworkQoSControlBlock();
+ _netQueueControlBlock[network->id()] = nqcb;
+ // Initialize ZT_QOS_NUM_BUCKETS queues and place them in the INACTIVE list
+ // These queues will be shuffled between the new/old/inactive lists by the enqueue/dequeue algorithm
+ for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+ nqcb->inactiveQueues.push_back(new ManagedQueue(i));
+ }
+ }
+
+ if (packet.verb() != Packet::VERB_FRAME && packet.verb() != Packet::VERB_EXT_FRAME) {
+ // DEBUG_INFO("skipping, no QoS for this packet, verb=%x", packet.verb());
+ // just send packet normally, no QoS for ZT protocol traffic
+ send(tPtr, packet, encrypt);
+ }
+
+ _aqm_m.lock();
+
+ // Enqueue packet and move queue to appropriate list
+
+ const Address dest(packet.destination());
+ TXQueueEntry *txEntry = new TXQueueEntry(dest,RR->node->now(),packet,encrypt);
+
+ ManagedQueue *selectedQueue = nullptr;
+ for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+ if (i < nqcb->oldQueues.size()) { // search old queues first (I think this is best since old would imply most recent usage of the queue)
+ if (nqcb->oldQueues[i]->id == qosBucket) {
+ selectedQueue = nqcb->oldQueues[i];
+ }
+ } if (i < nqcb->newQueues.size()) { // search new queues (this would imply not often-used queues)
+ if (nqcb->newQueues[i]->id == qosBucket) {
+ selectedQueue = nqcb->newQueues[i];
+ }
+ } if (i < nqcb->inactiveQueues.size()) { // search inactive queues
+ if (nqcb->inactiveQueues[i]->id == qosBucket) {
+ selectedQueue = nqcb->inactiveQueues[i];
+ // move queue to end of NEW queue list
+ selectedQueue->byteCredit = ZT_QOS_QUANTUM;
+ // DEBUG_INFO("moving q=%p from INACTIVE to NEW list", selectedQueue);
+ nqcb->newQueues.push_back(selectedQueue);
+ nqcb->inactiveQueues.erase(nqcb->inactiveQueues.begin() + i);
+ }
+ }
+ }
+ if (!selectedQueue) {
+ return;
+ }
+
+ selectedQueue->q.push_back(txEntry);
+ selectedQueue->byteLength+=txEntry->packet.payloadLength();
+ nqcb->_currEnqueuedPackets++;
+
+ // DEBUG_INFO("nq=%2lu, oq=%2lu, iq=%2lu, nqcb.size()=%3d, bucket=%2d, q=%p", nqcb->newQueues.size(), nqcb->oldQueues.size(), nqcb->inactiveQueues.size(), nqcb->_currEnqueuedPackets, qosBucket, selectedQueue);
+
+ // Drop a packet if necessary
+ ManagedQueue *selectedQueueToDropFrom = nullptr;
+ if (nqcb->_currEnqueuedPackets > ZT_QOS_MAX_ENQUEUED_PACKETS)
+ {
+ // DEBUG_INFO("too many enqueued packets (%d), finding packet to drop", nqcb->_currEnqueuedPackets);
+ int maxQueueLength = 0;
+ for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+ if (i < nqcb->oldQueues.size()) {
+ if (nqcb->oldQueues[i]->byteLength > maxQueueLength) {
+ maxQueueLength = nqcb->oldQueues[i]->byteLength;
+ selectedQueueToDropFrom = nqcb->oldQueues[i];
+ }
+ } if (i < nqcb->newQueues.size()) {
+ if (nqcb->newQueues[i]->byteLength > maxQueueLength) {
+ maxQueueLength = nqcb->newQueues[i]->byteLength;
+ selectedQueueToDropFrom = nqcb->newQueues[i];
+ }
+ } if (i < nqcb->inactiveQueues.size()) {
+ if (nqcb->inactiveQueues[i]->byteLength > maxQueueLength) {
+ maxQueueLength = nqcb->inactiveQueues[i]->byteLength;
+ selectedQueueToDropFrom = nqcb->inactiveQueues[i];
+ }
+ }
+ }
+ if (selectedQueueToDropFrom) {
+ // DEBUG_INFO("dropping packet from head of largest queue (%d payload bytes)", maxQueueLength);
+ int sizeOfDroppedPacket = selectedQueueToDropFrom->q.front()->packet.payloadLength();
+ delete selectedQueueToDropFrom->q.front();
+ selectedQueueToDropFrom->q.pop_front();
+ selectedQueueToDropFrom->byteLength-=sizeOfDroppedPacket;
+ nqcb->_currEnqueuedPackets--;
+ }
+ }
+ _aqm_m.unlock();
+ aqm_dequeue(tPtr);
+}
+
+uint64_t Switch::control_law(uint64_t t, int count)
+{
+ return t + ZT_QOS_INTERVAL / sqrt(count);
+}
+
+Switch::dqr Switch::dodequeue(ManagedQueue *q, uint64_t now)
+{
+ dqr r;
+ r.ok_to_drop = false;
+ r.p = q->q.front();
+
+ if (r.p == NULL) {
+ q->first_above_time = 0;
+ return r;
+ }
+ uint64_t sojourn_time = now - r.p->creationTime;
+ if (sojourn_time < ZT_QOS_TARGET || q->byteLength <= ZT_DEFAULT_MTU) {
+ // went below - stay below for at least interval
+ q->first_above_time = 0;
+ } else {
+ if (q->first_above_time == 0) {
+ // just went above from below. if still above at
+ // first_above_time, will say it's ok to drop.
+ q->first_above_time = now + ZT_QOS_INTERVAL;
+ } else if (now >= q->first_above_time) {
+ r.ok_to_drop = true;
+ }
+ }
+ return r;
+}
+
+Switch::TXQueueEntry * Switch::CoDelDequeue(ManagedQueue *q, bool isNew, uint64_t now)
+{
+ dqr r = dodequeue(q, now);
+
+ if (q->dropping) {
+ if (!r.ok_to_drop) {
+ q->dropping = false;
+ }
+ while (now >= q->drop_next && q->dropping) {
+ q->q.pop_front(); // drop
+ r = dodequeue(q, now);
+ if (!r.ok_to_drop) {
+ // leave dropping state
+ q->dropping = false;
+ } else {
+ ++(q->count);
+ // schedule the next drop.
+ q->drop_next = control_law(q->drop_next, q->count);
+ }
+ }
+ } else if (r.ok_to_drop) {
+ q->q.pop_front(); // drop
+ r = dodequeue(q, now);
+ q->dropping = true;
+ q->count = (q->count > 2 && now - q->drop_next < 8*ZT_QOS_INTERVAL)?
+ q->count - 2 : 1;
+ q->drop_next = control_law(now, q->count);
+ }
+ return r.p;
+}
+
+void Switch::aqm_dequeue(void *tPtr)
+{
+ // Cycle through network-specific QoS control blocks
+ for(std::map<uint64_t,NetworkQoSControlBlock*>::iterator nqcb(_netQueueControlBlock.begin());nqcb!=_netQueueControlBlock.end();) {
+ if (!(*nqcb).second->_currEnqueuedPackets) {
+ return;
+ }
+
+ uint64_t now = RR->node->now();
+ TXQueueEntry *entryToEmit = nullptr;
+ std::vector<ManagedQueue*> *currQueues = &((*nqcb).second->newQueues);
+ std::vector<ManagedQueue*> *oldQueues = &((*nqcb).second->oldQueues);
+ std::vector<ManagedQueue*> *inactiveQueues = &((*nqcb).second->inactiveQueues);
+
+ _aqm_m.lock();
+
+ // Attempt dequeue from queues in NEW list
+ bool examiningNewQueues = true;
+ while (currQueues->size()) {
+ ManagedQueue *queueAtFrontOfList = currQueues->front();
+ if (queueAtFrontOfList->byteCredit < 0) {
+ queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
+ // Move to list of OLD queues
+ // DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
+ oldQueues->push_back(queueAtFrontOfList);
+ currQueues->erase(currQueues->begin());
+ } else {
+ entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now);
+ if (!entryToEmit) {
+ // Move to end of list of OLD queues
+ // DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
+ oldQueues->push_back(queueAtFrontOfList);
+ currQueues->erase(currQueues->begin());
+ }
+ else {
+ int len = entryToEmit->packet.payloadLength();
+ queueAtFrontOfList->byteLength -= len;
+ queueAtFrontOfList->byteCredit -= len;
+ // Send the packet!
+ queueAtFrontOfList->q.pop_front();
+ send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
+ (*nqcb).second->_currEnqueuedPackets--;
+ }
+ if (queueAtFrontOfList) {
+ //DEBUG_INFO("dequeuing from q=%p, len=%lu in NEW list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->q.size(), queueAtFrontOfList->byteCredit);
+ }
+ break;
+ }
+ }
+
+ // Attempt dequeue from queues in OLD list
+ examiningNewQueues = false;
+ currQueues = &((*nqcb).second->oldQueues);
+ while (currQueues->size()) {
+ ManagedQueue *queueAtFrontOfList = currQueues->front();
+ if (queueAtFrontOfList->byteCredit < 0) {
+ queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
+ oldQueues->push_back(queueAtFrontOfList);
+ currQueues->erase(currQueues->begin());
+ } else {
+ entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now);
+ if (!entryToEmit) {
+ //DEBUG_INFO("moving q=%p from OLD to INACTIVE list", queueAtFrontOfList);
+ // Move to inactive list of queues
+ inactiveQueues->push_back(queueAtFrontOfList);
+ currQueues->erase(currQueues->begin());
+ }
+ else {
+ int len = entryToEmit->packet.payloadLength();
+ queueAtFrontOfList->byteLength -= len;
+ queueAtFrontOfList->byteCredit -= len;
+ queueAtFrontOfList->q.pop_front();
+ send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
+ (*nqcb).second->_currEnqueuedPackets--;
+ }
+ if (queueAtFrontOfList) {
+ //DEBUG_INFO("dequeuing from q=%p, len=%lu in OLD list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->q.size(), queueAtFrontOfList->byteCredit);
+ }
+ break;
+ }
+ }
+ nqcb++;
+ _aqm_m.unlock();
+ }
+}
+
+void Switch::removeNetworkQoSControlBlock(uint64_t nwid)
+{
+ NetworkQoSControlBlock *nq = _netQueueControlBlock[nwid];
+ if (nq) {
+ _netQueueControlBlock.erase(nwid);
+ delete nq;
+ nq = NULL;
+ }
+}
+
void Switch::send(void *tPtr,Packet &packet,bool encrypt)
{
const Address dest(packet.destination());
@@ -557,6 +816,7 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
{
Mutex::Lock _l(_txQueue_m);
+
for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
if (txi->dest == peer->address()) {
if (_trySend(tPtr,txi->packet,txi->encrypt)) {
@@ -581,6 +841,7 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now)
std::vector<Address> needWhois;
{
Mutex::Lock _l(_txQueue_m);
+
for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
if (_trySend(tPtr,txi->packet,txi->encrypt)) {
_txQueue.erase(txi++);
@@ -654,12 +915,12 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
const SharedPtr<Peer> peer(RR->topology->getPeer(tPtr,destination));
if (peer) {
- viaPath = peer->getBestPath(now,false);
+ viaPath = peer->getAppropriatePath(now,false);
if (!viaPath) {
peer->tryMemorizedPath(tPtr,now); // periodically attempt memorized or statically defined paths, if any are known
const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer());
- if ( (!relay) || (!(viaPath = relay->getBestPath(now,false))) ) {
- if (!(viaPath = peer->getBestPath(now,true)))
+ if ( (!relay) || (!(viaPath = relay->getAppropriatePath(now,false))) ) {
+ if (!(viaPath = peer->getAppropriatePath(now,true)))
return false;
}
}
@@ -674,6 +935,8 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
unsigned int chunkSize = std::min(packet.size(),mtu);
packet.setFragmented(chunkSize < packet.size());
+ peer->recordOutgoingPacket(viaPath, packet.packetId(), packet.payloadLength(), packet.verb(), now);
+
if (trustedPathId) {
packet.setTrusted(trustedPathId);
} else {