summaryrefslogtreecommitdiff
path: root/node
diff options
context:
space:
mode:
Diffstat (limited to 'node')
-rw-r--r--node/Constants.hpp35
-rw-r--r--node/Network.cpp16
-rw-r--r--node/Network.hpp10
-rw-r--r--node/Node.cpp1
-rw-r--r--node/OutboundMulticast.cpp3
-rw-r--r--node/Switch.cpp275
-rw-r--r--node/Switch.hpp94
7 files changed, 419 insertions, 15 deletions
diff --git a/node/Constants.hpp b/node/Constants.hpp
index 420343ad..5f21201e 100644
--- a/node/Constants.hpp
+++ b/node/Constants.hpp
@@ -411,6 +411,41 @@
#define ZT_PATH_IMBALANCE_THRESHOLD 0.20
/**
+ * Max allowable time spent in any queue
+ */
+#define ZT_QOS_TARGET 5 // ms
+
+/**
+ * Time period where the time spent in the queue by a packet should fall below
+ * target at least once
+ */
+#define ZT_QOS_INTERVAL 100 // ms
+
+/**
+ * The number of bytes that each queue is allowed to send during each DRR cycle.
+ * This approximates a single-byte-based fairness queuing scheme
+ */
+#define ZT_QOS_QUANTUM ZT_DEFAULT_MTU
+
+/**
+ * The maximum total number of packets that can be queued among all
+ * active/inactive, old/new queues
+ */
+#define ZT_QOS_MAX_ENQUEUED_PACKETS 1024
+
+/**
+ * Number of QoS queues (buckets)
+ */
+#define ZT_QOS_NUM_BUCKETS 9
+
+/**
+ * All unspecified traffic is put in this bucket. Anything in a bucket with a smaller
+ * value is de-prioritized. Anything in a bucket with a higher value is prioritized over
+ * other traffic.
+ */
+#define ZT_QOS_DEFAULT_BUCKET 0
+
+/**
* How frequently to send heartbeats over in-use paths
*/
#define ZT_PATH_HEARTBEAT_PERIOD 14000
diff --git a/node/Network.cpp b/node/Network.cpp
index a75d9fd1..a5c2fc3e 100644
--- a/node/Network.cpp
+++ b/node/Network.cpp
@@ -106,7 +106,8 @@ static _doZtFilterResult _doZtFilter(
const unsigned int ruleCount,
Address &cc, // MUTABLE -- set to TEE destination if TEE action is taken or left alone otherwise
unsigned int &ccLength, // MUTABLE -- set to length of packet payload to TEE
- bool &ccWatch) // MUTABLE -- set to true for WATCH target as opposed to normal TEE
+ bool &ccWatch, // MUTABLE -- set to true for WATCH target as opposed to normal TEE
+ uint8_t &qosBucket) // MUTABLE -- set to the value of the argument provided to the matching action
{
// Set to true if we are a TEE/REDIRECT/WATCH target
bool superAccept = false;
@@ -621,7 +622,8 @@ bool Network::filterOutgoingPacket(
const uint8_t *frameData,
const unsigned int frameLen,
const unsigned int etherType,
- const unsigned int vlanId)
+ const unsigned int vlanId,
+ uint8_t &qosBucket)
{
const int64_t now = RR->node->now();
Address ztFinalDest(ztDest);
@@ -636,7 +638,7 @@ bool Network::filterOutgoingPacket(
Membership *const membership = (ztDest) ? _memberships.get(ztDest) : (Membership *)0;
- switch(_doZtFilter(RR,rrl,_config,membership,false,ztSource,ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.rules,_config.ruleCount,cc,ccLength,ccWatch)) {
+ switch(_doZtFilter(RR,rrl,_config,membership,false,ztSource,ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.rules,_config.ruleCount,cc,ccLength,ccWatch,qosBucket)) {
case DOZTFILTER_NO_MATCH: {
for(unsigned int c=0;c<_config.capabilityCount;++c) {
@@ -644,7 +646,7 @@ bool Network::filterOutgoingPacket(
Address cc2;
unsigned int ccLength2 = 0;
bool ccWatch2 = false;
- switch (_doZtFilter(RR,crrl,_config,membership,false,ztSource,ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.capabilities[c].rules(),_config.capabilities[c].ruleCount(),cc2,ccLength2,ccWatch2)) {
+ switch (_doZtFilter(RR,crrl,_config,membership,false,ztSource,ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.capabilities[c].rules(),_config.capabilities[c].ruleCount(),cc2,ccLength2,ccWatch2,qosBucket)) {
case DOZTFILTER_NO_MATCH:
case DOZTFILTER_DROP: // explicit DROP in a capability just terminates its evaluation and is an anti-pattern
break;
@@ -759,11 +761,13 @@ int Network::filterIncomingPacket(
bool ccWatch = false;
const Capability *c = (Capability *)0;
+ uint8_t qosBucket = 255; // For incoming packets this is a dummy value
+
Mutex::Lock _l(_lock);
Membership &membership = _membership(sourcePeer->address());
- switch (_doZtFilter(RR,rrl,_config,&membership,true,sourcePeer->address(),ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.rules,_config.ruleCount,cc,ccLength,ccWatch)) {
+ switch (_doZtFilter(RR,rrl,_config,&membership,true,sourcePeer->address(),ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,_config.rules,_config.ruleCount,cc,ccLength,ccWatch,qosBucket)) {
case DOZTFILTER_NO_MATCH: {
Membership::CapabilityIterator mci(membership,_config);
@@ -772,7 +776,7 @@ int Network::filterIncomingPacket(
Address cc2;
unsigned int ccLength2 = 0;
bool ccWatch2 = false;
- switch(_doZtFilter(RR,crrl,_config,&membership,true,sourcePeer->address(),ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,c->rules(),c->ruleCount(),cc2,ccLength2,ccWatch2)) {
+ switch(_doZtFilter(RR,crrl,_config,&membership,true,sourcePeer->address(),ztFinalDest,macSource,macDest,frameData,frameLen,etherType,vlanId,c->rules(),c->ruleCount(),cc2,ccLength2,ccWatch2,qosBucket)) {
case DOZTFILTER_NO_MATCH:
case DOZTFILTER_DROP: // explicit DROP in a capability just terminates its evaluation and is an anti-pattern
break;
diff --git a/node/Network.hpp b/node/Network.hpp
index 38f03eb3..2baab511 100644
--- a/node/Network.hpp
+++ b/node/Network.hpp
@@ -132,7 +132,8 @@ public:
const uint8_t *frameData,
const unsigned int frameLen,
const unsigned int etherType,
- const unsigned int vlanId);
+ const unsigned int vlanId,
+ uint8_t &qosBucket);
/**
* Apply filters to an incoming packet
@@ -298,6 +299,13 @@ public:
void learnBridgeRoute(const MAC &mac,const Address &addr);
/**
+ * Whether QoS is in effect for this network
+ */
+ bool QoSEnabled() {
+ return false;
+ }
+
+ /**
* Learn a multicast group that is bridged to our tap device
*
* @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
diff --git a/node/Node.cpp b/node/Node.cpp
index 9b10dfdd..576b2e4a 100644
--- a/node/Node.cpp
+++ b/node/Node.cpp
@@ -368,6 +368,7 @@ ZT_ResultCode Node::leave(uint64_t nwid,void **uptr,void *tptr)
{
Mutex::Lock _l(_networks_m);
SharedPtr<Network> *nw = _networks.get(nwid);
+ RR->sw->removeNetworkQoSControlBlock(nwid);
if (!nw)
return ZT_RESULT_OK;
if (uptr)
diff --git a/node/OutboundMulticast.cpp b/node/OutboundMulticast.cpp
index d7a7b4d8..2391771f 100644
--- a/node/OutboundMulticast.cpp
+++ b/node/OutboundMulticast.cpp
@@ -85,7 +85,8 @@ void OutboundMulticast::sendOnly(const RuntimeEnvironment *RR,void *tPtr,const A
{
const SharedPtr<Network> nw(RR->node->network(_nwid));
const Address toAddr2(toAddr);
- if ((nw)&&(nw->filterOutgoingPacket(tPtr,true,RR->identity.address(),toAddr2,_macSrc,_macDest,_frameData,_frameLen,_etherType,0))) {
+ uint8_t QoSBucket = 255; // Dummy value
+ if ((nw)&&(nw->filterOutgoingPacket(tPtr,true,RR->identity.address(),toAddr2,_macSrc,_macDest,_frameData,_frameLen,_etherType,0,QoSBucket))) {
_packet.newInitializationVector();
_packet.setDestination(toAddr2);
RR->node->expectReplyTo(_packet.packetId());
diff --git a/node/Switch.cpp b/node/Switch.cpp
index d53bf53e..fddbd581 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -266,6 +266,8 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
}
}
+ uint8_t qosBucket = ZT_QOS_DEFAULT_BUCKET;
+
if (to.isMulticast()) {
MulticastGroup multicastGroup(to,0);
@@ -383,7 +385,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
network->learnBridgedMulticastGroup(tPtr,multicastGroup,RR->node->now());
// First pass sets noTee to false, but noTee is set to true in OutboundMulticast to prevent duplicates.
- if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+ if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
return;
}
@@ -407,7 +409,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
Address toZT(to.toAddress(network->id())); // since in-network MACs are derived from addresses and network IDs, we can reverse this
SharedPtr<Peer> toPeer(RR->topology->getPeer(tPtr,toZT));
- if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+ if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
return;
}
@@ -422,7 +424,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
- send(tPtr,outp,true);
+ aqm_enqueue(tPtr,network,outp,true,qosBucket);
} else {
Packet outp(toZT,RR->identity.address(),Packet::VERB_FRAME);
outp.append(network->id());
@@ -430,7 +432,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
- send(tPtr,outp,true);
+ aqm_enqueue(tPtr,network,outp,true,qosBucket);
}
} else {
@@ -439,7 +441,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
// We filter with a NULL destination ZeroTier address first. Filtrations
// for each ZT destination are also done below. This is the same rationale
// and design as for multicast.
- if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+ if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
return;
}
@@ -477,7 +479,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
}
for(unsigned int b=0;b<numBridges;++b) {
- if (network->filterOutgoingPacket(tPtr,true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+ if (network->filterOutgoingPacket(tPtr,true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
Packet outp(bridges[b],RR->identity.address(),Packet::VERB_EXT_FRAME);
outp.append(network->id());
outp.append((uint8_t)0x00);
@@ -487,7 +489,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
outp.append(data,len);
if (!network->config().disableCompression())
outp.compress();
- send(tPtr,outp,true);
+ aqm_enqueue(tPtr,network,outp,true,qosBucket);
} else {
RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked (bridge replication)");
}
@@ -495,6 +497,263 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
}
}
+void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket)
+{
+ if(!network->QoSEnabled()) {
+ send(tPtr, packet, encrypt);
+ return;
+ }
+ NetworkQoSControlBlock *nqcb = _netQueueControlBlock[network->id()];
+ if (!nqcb) {
+ // DEBUG_INFO("creating network QoS control block (NQCB) for network %llx", network->id());
+ nqcb = new NetworkQoSControlBlock();
+ _netQueueControlBlock[network->id()] = nqcb;
+ // Initialize ZT_QOS_NUM_BUCKETS queues and place them in the INACTIVE list
+ // These queues will be shuffled between the new/old/inactive lists by the enqueue/dequeue algorithm
+ for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+ nqcb->inactiveQueues.push_back(new ManagedQueue(i));
+ }
+ }
+
+ if (packet.verb() != Packet::VERB_FRAME && packet.verb() != Packet::VERB_EXT_FRAME) {
+ // DEBUG_INFO("skipping, no QoS for this packet, verb=%x", packet.verb());
+ // just send packet normally, no QoS for ZT protocol traffic
+ send(tPtr, packet, encrypt);
+ }
+
+ _aqm_m.lock();
+
+ // Enqueue packet and move queue to appropriate list
+
+ const Address dest(packet.destination());
+ TXQueueEntry *txEntry = new TXQueueEntry(dest,RR->node->now(),packet,encrypt);
+
+ ManagedQueue *selectedQueue = nullptr;
+ for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+ if (i < nqcb->oldQueues.size()) { // search old queues first (I think this is best since old would imply most recent usage of the queue)
+ if (nqcb->oldQueues[i]->id == qosBucket) {
+ selectedQueue = nqcb->oldQueues[i];
+ }
+ } if (i < nqcb->newQueues.size()) { // search new queues (this would imply not often-used queues)
+ if (nqcb->newQueues[i]->id == qosBucket) {
+ selectedQueue = nqcb->newQueues[i];
+ }
+ } if (i < nqcb->inactiveQueues.size()) { // search inactive queues
+ if (nqcb->inactiveQueues[i]->id == qosBucket) {
+ selectedQueue = nqcb->inactiveQueues[i];
+ // move queue to end of NEW queue list
+ selectedQueue->byteCredit = ZT_QOS_QUANTUM;
+ // DEBUG_INFO("moving q=%p from INACTIVE to NEW list", selectedQueue);
+ nqcb->newQueues.push_back(selectedQueue);
+ nqcb->inactiveQueues.erase(nqcb->inactiveQueues.begin() + i);
+ }
+ }
+ }
+ if (!selectedQueue) {
+ return;
+ }
+
+ selectedQueue->q.push_back(txEntry);
+ selectedQueue->byteLength+=txEntry->packet.payloadLength();
+ nqcb->_currEnqueuedPackets++;
+
+ // DEBUG_INFO("nq=%2lu, oq=%2lu, iq=%2lu, nqcb.size()=%3d, bucket=%2d, q=%p", nqcb->newQueues.size(), nqcb->oldQueues.size(), nqcb->inactiveQueues.size(), nqcb->_currEnqueuedPackets, qosBucket, selectedQueue);
+
+ // Drop a packet if necessary
+ ManagedQueue *selectedQueueToDropFrom = nullptr;
+ if (nqcb->_currEnqueuedPackets > ZT_QOS_MAX_ENQUEUED_PACKETS)
+ {
+ // DEBUG_INFO("too many enqueued packets (%d), finding packet to drop", nqcb->_currEnqueuedPackets);
+ int maxQueueLength = 0;
+ for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+ if (i < nqcb->oldQueues.size()) {
+ if (nqcb->oldQueues[i]->byteLength > maxQueueLength) {
+ maxQueueLength = nqcb->oldQueues[i]->byteLength;
+ selectedQueueToDropFrom = nqcb->oldQueues[i];
+ }
+ } if (i < nqcb->newQueues.size()) {
+ if (nqcb->newQueues[i]->byteLength > maxQueueLength) {
+ maxQueueLength = nqcb->newQueues[i]->byteLength;
+ selectedQueueToDropFrom = nqcb->newQueues[i];
+ }
+ } if (i < nqcb->inactiveQueues.size()) {
+ if (nqcb->inactiveQueues[i]->byteLength > maxQueueLength) {
+ maxQueueLength = nqcb->inactiveQueues[i]->byteLength;
+ selectedQueueToDropFrom = nqcb->inactiveQueues[i];
+ }
+ }
+ }
+ if (selectedQueueToDropFrom) {
+ // DEBUG_INFO("dropping packet from head of largest queue (%d payload bytes)", maxQueueLength);
+ int sizeOfDroppedPacket = selectedQueueToDropFrom->q.front()->packet.payloadLength();
+ delete selectedQueueToDropFrom->q.front();
+ selectedQueueToDropFrom->q.pop_front();
+ selectedQueueToDropFrom->byteLength-=sizeOfDroppedPacket;
+ nqcb->_currEnqueuedPackets--;
+ }
+ }
+ _aqm_m.unlock();
+ aqm_dequeue(tPtr);
+}
+
+uint64_t Switch::control_law(uint64_t t, int count)
+{
+ return t + ZT_QOS_INTERVAL / sqrt(count);
+}
+
+Switch::dqr Switch::dodequeue(ManagedQueue *q, uint64_t now)
+{
+ dqr r;
+ r.ok_to_drop = false;
+ r.p = q->q.front();
+
+ if (r.p == NULL) {
+ q->first_above_time = 0;
+ return r;
+ }
+ uint64_t sojourn_time = now - r.p->creationTime;
+ if (sojourn_time < ZT_QOS_TARGET || q->byteLength <= ZT_DEFAULT_MTU) {
+ // went below - stay below for at least interval
+ q->first_above_time = 0;
+ } else {
+ if (q->first_above_time == 0) {
+ // just went above from below. if still above at
+ // first_above_time, will say it's ok to drop.
+ q->first_above_time = now + ZT_QOS_INTERVAL;
+ } else if (now >= q->first_above_time) {
+ r.ok_to_drop = true;
+ }
+ }
+ return r;
+}
+
+Switch::TXQueueEntry * Switch::CoDelDequeue(ManagedQueue *q, bool isNew, uint64_t now)
+{
+ dqr r = dodequeue(q, now);
+
+ if (q->dropping) {
+ if (!r.ok_to_drop) {
+ q->dropping = false;
+ }
+ while (now >= q->drop_next && q->dropping) {
+ q->q.pop_front(); // drop
+ r = dodequeue(q, now);
+ if (!r.ok_to_drop) {
+ // leave dropping state
+ q->dropping = false;
+ } else {
+ ++(q->count);
+ // schedule the next drop.
+ q->drop_next = control_law(q->drop_next, q->count);
+ }
+ }
+ } else if (r.ok_to_drop) {
+ q->q.pop_front(); // drop
+ r = dodequeue(q, now);
+ q->dropping = true;
+ q->count = (q->count > 2 && now - q->drop_next < 8*ZT_QOS_INTERVAL)?
+ q->count - 2 : 1;
+ q->drop_next = control_law(now, q->count);
+ }
+ return r.p;
+}
+
+void Switch::aqm_dequeue(void *tPtr)
+{
+ // Cycle through network-specific QoS control blocks
+ for(std::map<uint64_t,NetworkQoSControlBlock*>::iterator nqcb(_netQueueControlBlock.begin());nqcb!=_netQueueControlBlock.end();) {
+ if (!(*nqcb).second->_currEnqueuedPackets) {
+ return;
+ }
+
+ uint64_t now = RR->node->now();
+ TXQueueEntry *entryToEmit = nullptr;
+ std::vector<ManagedQueue*> *currQueues = &((*nqcb).second->newQueues);
+ std::vector<ManagedQueue*> *oldQueues = &((*nqcb).second->oldQueues);
+ std::vector<ManagedQueue*> *inactiveQueues = &((*nqcb).second->inactiveQueues);
+
+ _aqm_m.lock();
+
+ // Attempt dequeue from queues in NEW list
+ bool examiningNewQueues = true;
+ while (currQueues->size()) {
+ ManagedQueue *queueAtFrontOfList = currQueues->front();
+ if (queueAtFrontOfList->byteCredit < 0) {
+ queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
+ // Move to list of OLD queues
+ // DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
+ oldQueues->push_back(queueAtFrontOfList);
+ currQueues->erase(currQueues->begin());
+ } else {
+ entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now);
+ if (!entryToEmit) {
+ // Move to end of list of OLD queues
+ // DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
+ oldQueues->push_back(queueAtFrontOfList);
+ currQueues->erase(currQueues->begin());
+ }
+ else {
+ int len = entryToEmit->packet.payloadLength();
+ queueAtFrontOfList->byteLength -= len;
+ queueAtFrontOfList->byteCredit -= len;
+ // Send the packet!
+ queueAtFrontOfList->q.pop_front();
+ send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
+ (*nqcb).second->_currEnqueuedPackets--;
+ }
+ if (queueAtFrontOfList) {
+ //DEBUG_INFO("dequeuing from q=%p, len=%lu in NEW list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->q.size(), queueAtFrontOfList->byteCredit);
+ }
+ break;
+ }
+ }
+
+ // Attempt dequeue from queues in OLD list
+ examiningNewQueues = false;
+ currQueues = &((*nqcb).second->oldQueues);
+ while (currQueues->size()) {
+ ManagedQueue *queueAtFrontOfList = currQueues->front();
+ if (queueAtFrontOfList->byteCredit < 0) {
+ queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
+ oldQueues->push_back(queueAtFrontOfList);
+ currQueues->erase(currQueues->begin());
+ } else {
+ entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now);
+ if (!entryToEmit) {
+ //DEBUG_INFO("moving q=%p from OLD to INACTIVE list", queueAtFrontOfList);
+ // Move to inactive list of queues
+ inactiveQueues->push_back(queueAtFrontOfList);
+ currQueues->erase(currQueues->begin());
+ }
+ else {
+ int len = entryToEmit->packet.payloadLength();
+ queueAtFrontOfList->byteLength -= len;
+ queueAtFrontOfList->byteCredit -= len;
+ queueAtFrontOfList->q.pop_front();
+ send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
+ (*nqcb).second->_currEnqueuedPackets--;
+ }
+ if (queueAtFrontOfList) {
+ //DEBUG_INFO("dequeuing from q=%p, len=%lu in OLD list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->q.size(), queueAtFrontOfList->byteCredit);
+ }
+ break;
+ }
+ }
+ nqcb++;
+ _aqm_m.unlock();
+ }
+}
+
+void Switch::removeNetworkQoSControlBlock(uint64_t nwid)
+{
+ NetworkQoSControlBlock *nq = _netQueueControlBlock[nwid];
+ if (nq) {
+ _netQueueControlBlock.erase(nwid);
+ delete nq;
+ nq = NULL;
+ }
+}
+
void Switch::send(void *tPtr,Packet &packet,bool encrypt)
{
const Address dest(packet.destination());
@@ -550,6 +809,7 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
{
Mutex::Lock _l(_txQueue_m);
+
for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
if (txi->dest == peer->address()) {
if (_trySend(tPtr,txi->packet,txi->encrypt)) {
@@ -574,6 +834,7 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now)
std::vector<Address> needWhois;
{
Mutex::Lock _l(_txQueue_m);
+
for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
if (_trySend(tPtr,txi->packet,txi->encrypt)) {
_txQueue.erase(txi++);
diff --git a/node/Switch.hpp b/node/Switch.hpp
index 906f418e..5f60fc46 100644
--- a/node/Switch.hpp
+++ b/node/Switch.hpp
@@ -59,6 +59,14 @@ class Peer;
*/
class Switch
{
+ struct ManagedQueue;
+ struct TXQueueEntry;
+
+ typedef struct {
+ TXQueueEntry *p;
+ bool ok_to_drop;
+ } dqr;
+
public:
Switch(const RuntimeEnvironment *renv);
@@ -88,6 +96,62 @@ public:
void onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len);
/**
+ * Determines the next drop schedule for packets in the TX queue
+ *
+ * @param t Current time
+ * @param count Number of packets dropped this round
+ */
+ uint64_t control_law(uint64_t t, int count);
+
+ /**
+ * Selects a packet eligible for transmission from a TX queue. According to the control law, multiple packets
+ * may be intentionally dropped before a packet is returned to the AQM scheduler.
+ *
+ * @param q The TX queue that is being dequeued from
+ * @param now Current time
+ */
+ dqr dodequeue(ManagedQueue *q, uint64_t now);
+
+ /**
+ * Presents a packet to the AQM scheduler.
+ *
+ * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
+ * @param network Network that the packet shall be sent over
+ * @param packet Packet to be sent
+ * @param encrypt Encrypt packet payload? (always true except for HELLO)
+ * @param qosBucket Which bucket the rule-system determined this packet should fall into
+ */
+ void aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket);
+
+ /**
+ * Performs a single AQM cycle and dequeues and transmits all eligible packets on all networks
+ *
+ * @param tPtr Thread pointer to be handed through to any callbacks called as a result of this call
+ */
+ void aqm_dequeue(void *tPtr);
+
+ /**
+ * Calls the dequeue mechanism and adjust queue state variables
+ *
+ * @param q The TX queue that is being dequeued from
+ * @param isNew Whether or not this queue is in the NEW list
+ * @param now Current time
+ */
+ Switch::TXQueueEntry * CoDelDequeue(ManagedQueue *q, bool isNew, uint64_t now);
+
+ /**
+ * Removes QoS Queues and flow state variables for a specific network. These queues are created
+ * automatically upon the transmission of the first packet from this peer to another peer on the
+ * given network.
+ *
+ * The reason for existence of queues and flow state variables specific to each network is so that
+ * each network's QoS rules function independently.
+ *
+ * @param nwid Network ID
+ */
+ void removeNetworkQoSControlBlock(uint64_t nwid);
+
+ /**
* Send a packet to a ZeroTier address (destination in packet)
*
* The packet must be fully composed with source and destination but not
@@ -199,6 +263,7 @@ private:
};
std::list< TXQueueEntry > _txQueue;
Mutex _txQueue_m;
+ Mutex _aqm_m;
// Tracks sending of VERB_RENDEZVOUS to relaying peers
struct _LastUniteKey
@@ -220,6 +285,35 @@ private:
};
Hashtable< _LastUniteKey,uint64_t > _lastUniteAttempt; // key is always sorted in ascending order, for set-like behavior
Mutex _lastUniteAttempt_m;
+
+ // Queue with additional flow state variables
+ struct ManagedQueue
+ {
+ ManagedQueue(int id) :
+ id(id),
+ byteCredit(ZT_QOS_QUANTUM),
+ byteLength(0),
+ dropping(false)
+ {}
+ int id;
+ int byteCredit;
+ int byteLength;
+ uint64_t first_above_time;
+ uint32_t count;
+ uint64_t drop_next;
+ bool dropping;
+ uint64_t drop_next_time;
+ std::list< TXQueueEntry *> q;
+ };
+ // To implement fq_codel we need to maintain a queue of queues
+ struct NetworkQoSControlBlock
+ {
+ int _currEnqueuedPackets;
+ std::vector<ManagedQueue *> newQueues;
+ std::vector<ManagedQueue *> oldQueues;
+ std::vector<ManagedQueue *> inactiveQueues;
+ };
+ std::map<uint64_t,NetworkQoSControlBlock*> _netQueueControlBlock;
};
} // namespace ZeroTier