Diffstat (limited to 'node/Switch.cpp')
-rw-r--r-- | node/Switch.cpp | 283 |
1 file changed, 273 insertions, 10 deletions
diff --git a/node/Switch.cpp b/node/Switch.cpp
index eeeca5db..7b517864 100644
--- a/node/Switch.cpp
+++ b/node/Switch.cpp
@@ -269,6 +269,8 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 		}
 	}
 
+	uint8_t qosBucket = ZT_QOS_DEFAULT_BUCKET;
+
 	if (to.isMulticast()) {
 		MulticastGroup multicastGroup(to,0);
 
@@ -386,7 +388,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 			network->learnBridgedMulticastGroup(tPtr,multicastGroup,RR->node->now());
 
 		// First pass sets noTee to false, but noTee is set to true in OutboundMulticast to prevent duplicates.
-		if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+		if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
 			RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
 			return;
 		}
@@ -410,7 +412,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 		Address toZT(to.toAddress(network->id())); // since in-network MACs are derived from addresses and network IDs, we can reverse this
 		SharedPtr<Peer> toPeer(RR->topology->getPeer(tPtr,toZT));
 
-		if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+		if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
 			RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
 			return;
 		}
@@ -425,7 +427,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 			outp.append(data,len);
 			if (!network->config().disableCompression())
 				outp.compress();
-			send(tPtr,outp,true);
+			aqm_enqueue(tPtr,network,outp,true,qosBucket);
 		} else {
 			Packet outp(toZT,RR->identity.address(),Packet::VERB_FRAME);
 			outp.append(network->id());
@@ -433,7 +435,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 			outp.append(data,len);
 			if (!network->config().disableCompression())
 				outp.compress();
-			send(tPtr,outp,true);
+			aqm_enqueue(tPtr,network,outp,true,qosBucket);
 		}
 
 	} else {
@@ -442,7 +444,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 		// We filter with a NULL destination ZeroTier address first. Filtrations
 		// for each ZT destination are also done below. This is the same rationale
 		// and design as for multicast.
-		if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+		if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
 			RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
 			return;
 		}
@@ -480,7 +482,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 		}
 
 		for(unsigned int b=0;b<numBridges;++b) {
-			if (network->filterOutgoingPacket(tPtr,true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId)) {
+			if (network->filterOutgoingPacket(tPtr,true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
 				Packet outp(bridges[b],RR->identity.address(),Packet::VERB_EXT_FRAME);
 				outp.append(network->id());
 				outp.append((uint8_t)0x00);
@@ -490,7 +492,7 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 				outp.append(data,len);
 				if (!network->config().disableCompression())
 					outp.compress();
-				send(tPtr,outp,true);
+				aqm_enqueue(tPtr,network,outp,true,qosBucket);
 			} else {
 				RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked (bridge replication)");
 			}
@@ -498,6 +500,263 @@ void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const
 	}
 }
 
+void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket)
+{
+	if(!network->QoSEnabled()) {
+		send(tPtr, packet, encrypt);
+		return;
+	}
+	NetworkQoSControlBlock *nqcb = _netQueueControlBlock[network->id()];
+	if (!nqcb) {
+		// DEBUG_INFO("creating network QoS control block (NQCB) for network %llx", network->id());
+		nqcb = new NetworkQoSControlBlock();
+		_netQueueControlBlock[network->id()] = nqcb;
+		// Initialize ZT_QOS_NUM_BUCKETS queues and place them in the INACTIVE list
+		// These queues will be shuffled between the new/old/inactive lists by the enqueue/dequeue algorithm
+		for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+			nqcb->inactiveQueues.push_back(new ManagedQueue(i));
+		}
+	}
+
+	if (packet.verb() != Packet::VERB_FRAME && packet.verb() != Packet::VERB_EXT_FRAME) {
+		// DEBUG_INFO("skipping, no QoS for this packet, verb=%x", packet.verb());
+		// just send packet normally, no QoS for ZT protocol traffic
+		send(tPtr, packet, encrypt);
+	}
+
+	_aqm_m.lock();
+
+	// Enqueue packet and move queue to appropriate list
+
+	const Address dest(packet.destination());
+	TXQueueEntry *txEntry = new TXQueueEntry(dest,RR->node->now(),packet,encrypt);
+
+	ManagedQueue *selectedQueue = nullptr;
+	for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+		if (i < nqcb->oldQueues.size()) { // search old queues first (I think this is best since old would imply most recent usage of the queue)
+			if (nqcb->oldQueues[i]->id == qosBucket) {
+				selectedQueue = nqcb->oldQueues[i];
+			}
+		} if (i < nqcb->newQueues.size()) { // search new queues (this would imply not often-used queues)
+			if (nqcb->newQueues[i]->id == qosBucket) {
+				selectedQueue = nqcb->newQueues[i];
+			}
+		} if (i < nqcb->inactiveQueues.size()) { // search inactive queues
+			if (nqcb->inactiveQueues[i]->id == qosBucket) {
+				selectedQueue = nqcb->inactiveQueues[i];
+				// move queue to end of NEW queue list
+				selectedQueue->byteCredit = ZT_QOS_QUANTUM;
+				// DEBUG_INFO("moving q=%p from INACTIVE to NEW list", selectedQueue);
+				nqcb->newQueues.push_back(selectedQueue);
+				nqcb->inactiveQueues.erase(nqcb->inactiveQueues.begin() + i);
+			}
+		}
+	}
+	if (!selectedQueue) {
+		return;
+	}
+
+	selectedQueue->q.push_back(txEntry);
+	selectedQueue->byteLength+=txEntry->packet.payloadLength();
+	nqcb->_currEnqueuedPackets++;
+
+	// DEBUG_INFO("nq=%2lu, oq=%2lu, iq=%2lu, nqcb.size()=%3d, bucket=%2d, q=%p", nqcb->newQueues.size(), nqcb->oldQueues.size(), nqcb->inactiveQueues.size(), nqcb->_currEnqueuedPackets, qosBucket, selectedQueue);
+
+	// Drop a packet if necessary
+	ManagedQueue *selectedQueueToDropFrom = nullptr;
+	if (nqcb->_currEnqueuedPackets > ZT_QOS_MAX_ENQUEUED_PACKETS)
+	{
+		// DEBUG_INFO("too many enqueued packets (%d), finding packet to drop", nqcb->_currEnqueuedPackets);
+		int maxQueueLength = 0;
+		for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
+			if (i < nqcb->oldQueues.size()) {
+				if (nqcb->oldQueues[i]->byteLength > maxQueueLength) {
+					maxQueueLength = nqcb->oldQueues[i]->byteLength;
+					selectedQueueToDropFrom = nqcb->oldQueues[i];
+				}
+			} if (i < nqcb->newQueues.size()) {
+				if (nqcb->newQueues[i]->byteLength > maxQueueLength) {
+					maxQueueLength = nqcb->newQueues[i]->byteLength;
+					selectedQueueToDropFrom = nqcb->newQueues[i];
+				}
+			} if (i < nqcb->inactiveQueues.size()) {
+				if (nqcb->inactiveQueues[i]->byteLength > maxQueueLength) {
+					maxQueueLength = nqcb->inactiveQueues[i]->byteLength;
+					selectedQueueToDropFrom = nqcb->inactiveQueues[i];
+				}
+			}
+		}
+		if (selectedQueueToDropFrom) {
+			// DEBUG_INFO("dropping packet from head of largest queue (%d payload bytes)", maxQueueLength);
+			int sizeOfDroppedPacket = selectedQueueToDropFrom->q.front()->packet.payloadLength();
+			delete selectedQueueToDropFrom->q.front();
+			selectedQueueToDropFrom->q.pop_front();
+			selectedQueueToDropFrom->byteLength-=sizeOfDroppedPacket;
+			nqcb->_currEnqueuedPackets--;
+		}
+	}
+	_aqm_m.unlock();
+	aqm_dequeue(tPtr);
+}
+
+uint64_t Switch::control_law(uint64_t t, int count)
+{
+	return t + ZT_QOS_INTERVAL / sqrt(count);
+}
+
+Switch::dqr Switch::dodequeue(ManagedQueue *q, uint64_t now)
+{
+	dqr r;
+	r.ok_to_drop = false;
+	r.p = q->q.front();
+
+	if (r.p == NULL) {
+		q->first_above_time = 0;
+		return r;
+	}
+	uint64_t sojourn_time = now - r.p->creationTime;
+	if (sojourn_time < ZT_QOS_TARGET || q->byteLength <= ZT_DEFAULT_MTU) {
+		// went below - stay below for at least interval
+		q->first_above_time = 0;
+	} else {
+		if (q->first_above_time == 0) {
+			// just went above from below. if still above at
+			// first_above_time, will say it's ok to drop.
+			q->first_above_time = now + ZT_QOS_INTERVAL;
+		} else if (now >= q->first_above_time) {
+			r.ok_to_drop = true;
+		}
+	}
+	return r;
+}
+
+Switch::TXQueueEntry * Switch::CoDelDequeue(ManagedQueue *q, bool isNew, uint64_t now)
+{
+	dqr r = dodequeue(q, now);
+
+	if (q->dropping) {
+		if (!r.ok_to_drop) {
+			q->dropping = false;
+		}
+		while (now >= q->drop_next && q->dropping) {
+			q->q.pop_front(); // drop
+			r = dodequeue(q, now);
+			if (!r.ok_to_drop) {
+				// leave dropping state
+				q->dropping = false;
+			} else {
+				++(q->count);
+				// schedule the next drop.
+				q->drop_next = control_law(q->drop_next, q->count);
+			}
+		}
+	} else if (r.ok_to_drop) {
+		q->q.pop_front(); // drop
+		r = dodequeue(q, now);
+		q->dropping = true;
+		q->count = (q->count > 2 && now - q->drop_next < 8*ZT_QOS_INTERVAL)?
+			q->count - 2 : 1;
+		q->drop_next = control_law(now, q->count);
+	}
+	return r.p;
+}
+
+void Switch::aqm_dequeue(void *tPtr)
+{
+	// Cycle through network-specific QoS control blocks
+	for(std::map<uint64_t,NetworkQoSControlBlock*>::iterator nqcb(_netQueueControlBlock.begin());nqcb!=_netQueueControlBlock.end();) {
+		if (!(*nqcb).second->_currEnqueuedPackets) {
+			return;
+		}
+
+		uint64_t now = RR->node->now();
+		TXQueueEntry *entryToEmit = nullptr;
+		std::vector<ManagedQueue*> *currQueues = &((*nqcb).second->newQueues);
+		std::vector<ManagedQueue*> *oldQueues = &((*nqcb).second->oldQueues);
+		std::vector<ManagedQueue*> *inactiveQueues = &((*nqcb).second->inactiveQueues);
+
+		_aqm_m.lock();
+
+		// Attempt dequeue from queues in NEW list
+		bool examiningNewQueues = true;
+		while (currQueues->size()) {
+			ManagedQueue *queueAtFrontOfList = currQueues->front();
+			if (queueAtFrontOfList->byteCredit < 0) {
+				queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
+				// Move to list of OLD queues
+				// DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
+				oldQueues->push_back(queueAtFrontOfList);
+				currQueues->erase(currQueues->begin());
+			} else {
+				entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now);
+				if (!entryToEmit) {
+					// Move to end of list of OLD queues
+					// DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
+					oldQueues->push_back(queueAtFrontOfList);
+					currQueues->erase(currQueues->begin());
+				}
+				else {
+					int len = entryToEmit->packet.payloadLength();
+					queueAtFrontOfList->byteLength -= len;
+					queueAtFrontOfList->byteCredit -= len;
+					// Send the packet!
+					queueAtFrontOfList->q.pop_front();
+					send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
+					(*nqcb).second->_currEnqueuedPackets--;
+				}
+				if (queueAtFrontOfList) {
+					//DEBUG_INFO("dequeuing from q=%p, len=%lu in NEW list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->q.size(), queueAtFrontOfList->byteCredit);
+				}
+				break;
+			}
+		}
+
+		// Attempt dequeue from queues in OLD list
+		examiningNewQueues = false;
+		currQueues = &((*nqcb).second->oldQueues);
+		while (currQueues->size()) {
+			ManagedQueue *queueAtFrontOfList = currQueues->front();
+			if (queueAtFrontOfList->byteCredit < 0) {
+				queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
+				oldQueues->push_back(queueAtFrontOfList);
+				currQueues->erase(currQueues->begin());
+			} else {
+				entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now);
+				if (!entryToEmit) {
+					//DEBUG_INFO("moving q=%p from OLD to INACTIVE list", queueAtFrontOfList);
+					// Move to inactive list of queues
+					inactiveQueues->push_back(queueAtFrontOfList);
+					currQueues->erase(currQueues->begin());
+				}
+				else {
+					int len = entryToEmit->packet.payloadLength();
+					queueAtFrontOfList->byteLength -= len;
+					queueAtFrontOfList->byteCredit -= len;
+					queueAtFrontOfList->q.pop_front();
+					send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
+					(*nqcb).second->_currEnqueuedPackets--;
+				}
+				if (queueAtFrontOfList) {
+					//DEBUG_INFO("dequeuing from q=%p, len=%lu in OLD list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->q.size(), queueAtFrontOfList->byteCredit);
+				}
+				break;
+			}
+		}
+		nqcb++;
+		_aqm_m.unlock();
+	}
+}
+
+void Switch::removeNetworkQoSControlBlock(uint64_t nwid)
+{
+	NetworkQoSControlBlock *nq = _netQueueControlBlock[nwid];
+	if (nq) {
+		_netQueueControlBlock.erase(nwid);
+		delete nq;
+		nq = NULL;
+	}
+}
+
 void Switch::send(void *tPtr,Packet &packet,bool encrypt)
 {
 	const Address dest(packet.destination());
@@ -557,6 +816,7 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
 {
 	Mutex::Lock _l(_txQueue_m);
+
 	for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
 		if (txi->dest == peer->address()) {
 			if (_trySend(tPtr,txi->packet,txi->encrypt)) {
@@ -581,6 +841,7 @@ unsigned long Switch::doTimerTasks(void *tPtr,int64_t now)
 	std::vector<Address> needWhois;
 	{
 		Mutex::Lock _l(_txQueue_m);
+
 		for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
 			if (_trySend(tPtr,txi->packet,txi->encrypt)) {
 				_txQueue.erase(txi++);
@@ -654,12 +915,12 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
 	const SharedPtr<Peer> peer(RR->topology->getPeer(tPtr,destination));
 	if (peer) {
-		viaPath = peer->getBestPath(now,false);
+		viaPath = peer->getAppropriatePath(now,false);
 		if (!viaPath) {
 			peer->tryMemorizedPath(tPtr,now); // periodically attempt memorized or statically defined paths, if any are known
 			const SharedPtr<Peer> relay(RR->topology->getUpstreamPeer());
-			if ( (!relay) || (!(viaPath = relay->getBestPath(now,false))) ) {
-				if (!(viaPath = peer->getBestPath(now,true)))
+			if ( (!relay) || (!(viaPath = relay->getAppropriatePath(now,false))) ) {
+				if (!(viaPath = peer->getAppropriatePath(now,true)))
 					return false;
 			}
 		}
@@ -674,6 +935,8 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
 	unsigned int chunkSize = std::min(packet.size(),mtu);
 	packet.setFragmented(chunkSize < packet.size());
 
+	peer->recordOutgoingPacket(viaPath, packet.packetId(), packet.payloadLength(), packet.verb(), now);
+
 	if (trustedPathId) {
 		packet.setTrusted(trustedPathId);
 	} else {
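The dequeue path above follows the CoDel reference pseudocode: a head packet's sojourn time must stay above a target for a full interval before the queue enters the dropping state, and control_law() then spaces successive drops by interval/sqrt(count), so the drop rate ramps up smoothly while delay remains high. The following is a minimal standalone sketch of just that control law, not ZeroTier code; kIntervalMs is a hypothetical stand-in for ZT_QOS_INTERVAL.

#include <cmath>
#include <cstdint>
#include <cstdio>

// Hypothetical stand-in for ZT_QOS_INTERVAL (the CoDel measurement window).
static const uint64_t kIntervalMs = 100;

// CoDel's control law: successive drops are spaced interval/sqrt(count)
// apart, so drops become more frequent the longer the queue stays bad.
static uint64_t control_law(uint64_t t, int count)
{
	return t + (uint64_t)((double)kIntervalMs / std::sqrt((double)count));
}

int main()
{
	// Worked example: starting from t=0, drops land at 100 ms, then about
	// 71 ms later, then about 58 ms later; the spacing shrinks as count grows.
	uint64_t next = 0;
	for (int count = 1; count <= 4; ++count) {
		next = control_law(next, count);
		std::printf("drop %d scheduled at t=%llu ms\n", count, (unsigned long long)next);
	}
	return 0;
}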
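aqm_dequeue() also applies the deficit-round-robin credit rule from fq_codel's new/old queue scheme: a bucket spends byteCredit as it transmits, and when the credit goes negative it is recharged with one quantum and rotated to the back of the list, which bounds how much one bucket can send per round. Below is a toy model of only that credit rule, not the ZeroTier implementation; ToyQueue and kQuantum are hypothetical stand-ins for ManagedQueue and ZT_QOS_QUANTUM, and the new/old/inactive split is collapsed into a single active list.

#include <cstdio>
#include <deque>

struct ToyQueue {
	int id;                      // QoS bucket id
	int byteCredit;              // DRR deficit counter
	std::deque<int> packetSizes; // payload lengths, stand-ins for TXQueueEntry
};

int main()
{
	const int kQuantum = 1500; // hypothetical stand-in for ZT_QOS_QUANTUM
	std::deque<ToyQueue> active = {
		{0, kQuantum, {900, 900, 900}},
		{1, kQuantum, {300, 300}},
	};
	while (!active.empty()) {
		ToyQueue q = active.front();
		active.pop_front();
		if (q.byteCredit < 0) {
			q.byteCredit += kQuantum; // recharge one quantum and revisit later
			active.push_back(q);
			continue;
		}
		if (q.packetSizes.empty())
			continue; // drained; ZeroTier would move the queue to the inactive list
		int len = q.packetSizes.front();
		q.packetSizes.pop_front();
		q.byteCredit -= len; // spend credit for the emitted payload
		std::printf("emit %d bytes from bucket %d (credit now %d)\n", len, q.id, q.byteCredit);
		active.push_back(q); // round-robin to the back
	}
	return 0;
}

Running this emits 900, 300, 900, 300, 900: the large-packet bucket overdraws its credit after two packets and must sit out a rotation while it recharges, which is exactly how the quantum caps per-bucket bandwidth per round.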