summaryrefslogtreecommitdiff
path: root/service
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2018-11-13 10:19:51 -0800
committerAdam Ierymenko <adam.ierymenko@gmail.com>2018-11-13 10:19:51 -0800
commitf6450cd7e14bc15ea7361a4f9e8e9a09fa15228d (patch)
treee3d8d90f96b330dc36ef1f5d78e85e97a573488d /service
parent3b6b1d167427b3d3873070bb111f18db5dedac1a (diff)
downloadinfinitytier-f6450cd7e14bc15ea7361a4f9e8e9a09fa15228d.tar.gz
infinitytier-f6450cd7e14bc15ea7361a4f9e8e9a09fa15228d.zip
Cleanup and a minor performance improvement.
Diffstat (limited to 'service')
-rw-r--r--service/OneService.cpp34
1 files changed, 14 insertions, 20 deletions
diff --git a/service/OneService.cpp b/service/OneService.cpp
index ad5680c2..796580af 100644
--- a/service/OneService.cpp
+++ b/service/OneService.cpp
@@ -468,12 +468,13 @@ public:
unsigned int _incomingPacketThreadPoolSize;
struct {
uint8_t data[2048];
- std::thread thr;
+ uint64_t now;
int64_t sock;
struct sockaddr_storage from;
int size;
std::condition_variable cond;
std::mutex lock;
+ std::thread thr;
} _incomingPacketWorker[ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE];
// Local configuration and memo-ized information from it
@@ -607,21 +608,16 @@ public:
_incomingPacketThreadPoolSize = std::max(std::min((unsigned int)std::thread::hardware_concurrency(),(unsigned int)ZT_INCOMING_PACKET_MAX_THREAD_POOL_SIZE),(unsigned int)1);
for(unsigned int tn=0;tn<_incomingPacketThreadPoolSize;++tn) {
- _incomingPacketWorker[tn].thr = std::thread([this,tn]() {
- std::unique_lock<std::mutex> l(_incomingPacketWorker[tn].lock);
+ const unsigned int tno = tn;
+ _incomingPacketWorker[tn].thr = std::thread([this,tno]() {
+ std::unique_lock<std::mutex> l(_incomingPacketWorker[tno].lock);
for(;;) {
- _incomingPacketWorker[tn].cond.wait(l);
- if (_incomingPacketWorker[tn].size < 0) {
+ _incomingPacketWorker[tno].cond.wait(l);
+ const int s = _incomingPacketWorker[tno].size;
+ if (s < 0) {
break;
- } else if (_incomingPacketWorker[tn].size > 0) {
- const ZT_ResultCode rc = _node->processWirePacket(
- (void *)0,
- OSUtils::now(),
- _incomingPacketWorker[tn].sock,
- &(_incomingPacketWorker[tn].from),
- _incomingPacketWorker[tn].data,
- (unsigned int)_incomingPacketWorker[tn].size,
- &_nextBackgroundTaskDeadline);
+ } else if (s > 0) {
+ const ZT_ResultCode rc = _node->processWirePacket(nullptr,_incomingPacketWorker[tno].now,_incomingPacketWorker[tno].sock,&(_incomingPacketWorker[tno].from),_incomingPacketWorker[tno].data,(unsigned int)s,&_nextBackgroundTaskDeadline);
if (ZT_ResultCode_isFatal(rc)) {
char tmp[256];
OSUtils::ztsnprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc);
@@ -1896,8 +1892,9 @@ public:
inline void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *localAddr,const struct sockaddr *from,void *data,unsigned long len)
{
+ const uint64_t now = OSUtils::now();
if ((len >= 16)&&(reinterpret_cast<const InetAddress *>(from)->ipScope() == InetAddress::IP_SCOPE_GLOBAL))
- _lastDirectReceiveFromGlobal = OSUtils::now();
+ _lastDirectReceiveFromGlobal = now;
/* Pick worker thread by checksumming the from address. This avoids thread
* scheduling caused packet re-ordering by binding each individual remote
@@ -1916,20 +1913,17 @@ public:
for(unsigned int i=0;i<16;++i)
cksum += ((const struct sockaddr_in6 *)from)->sin6_addr.s6_addr[i];
break;
- default:
- for(unsigned int i=0;i<sizeof(struct sockaddr_storage);++i)
- cksum += ((uint8_t *)from)[i];
- break;
}
const unsigned int tn = cksum % _incomingPacketThreadPoolSize;
_incomingPacketWorker[tn].lock.lock();
ZT_FAST_MEMCPY(_incomingPacketWorker[tn].data,data,len);
+ _incomingPacketWorker[tn].now = now;
_incomingPacketWorker[tn].sock = reinterpret_cast<int64_t>(sock);
ZT_FAST_MEMCPY(&_incomingPacketWorker[tn].from,from,sizeof(struct sockaddr_storage));
_incomingPacketWorker[tn].size = (int)len;
+ _incomingPacketWorker[tn].cond.notify_one();
_incomingPacketWorker[tn].lock.unlock();
- _incomingPacketWorker[tn].cond.notify_all();
}
inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success)