From f873881a0ddf2043758b3e7925c95168600f42da Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 28 Oct 2014 12:28:45 -0700 Subject: Helps to use a proper multithreaded queue instead of ugly plebian hackery. --- testnet/MTQ.hpp | 181 ++++++++++++++++++++++++++++++++++++++++ testnet/Semaphore.hpp | 142 ------------------------------- testnet/SimNetSocketManager.cpp | 26 +----- testnet/SimNetSocketManager.hpp | 12 +-- testnet/TestEthernetTap.cpp | 41 +++------ testnet/TestEthernetTap.hpp | 20 ++--- 6 files changed, 204 insertions(+), 218 deletions(-) create mode 100644 testnet/MTQ.hpp delete mode 100644 testnet/Semaphore.hpp (limited to 'testnet') diff --git a/testnet/MTQ.hpp b/testnet/MTQ.hpp new file mode 100644 index 00000000..2a2fe1cd --- /dev/null +++ b/testnet/MTQ.hpp @@ -0,0 +1,181 @@ +/* + * ZeroTier One - Global Peer to Peer Ethernet + * Copyright (C) 2012-2014 ZeroTier Networks LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#ifndef ZT_MTQ_HPP +#define ZT_MTQ_HPP + +#include +#include + +#include + +#include "../node/Constants.hpp" +#include "../node/NonCopyable.hpp" +#include "../node/Utils.hpp" + +#ifdef __WINDOWS__ +#include +#else +#include +#include +#endif + +namespace ZeroTier { + +/** + * A synchronized multithreaded FIFO queue + * + * This is designed for a use case where one thread pushes, the + * other pops. + */ +template +class MTQ : NonCopyable +{ +public: + MTQ() + { +#ifdef __WINDOWS__ + _sem = CreateSemaphore(NULL,0,0x7fffffff,NULL); + InitializeCriticalSection(&_cs); +#else + pthread_mutex_init(&_mh,(const pthread_mutexattr_t *)0); + pthread_cond_init(&_cond,(const pthread_condattr_t *)0); +#endif + } + + ~MTQ() + { +#ifdef __WINDOWS__ + CloseHandle(_sem); + DeleteCriticalSection(&_cs); +#else + pthread_cond_destroy(&_cond); + pthread_mutex_destroy(&_mh); +#endif + } + + /** + * Push something onto the end of the FIFO and signal waiting thread(s) + * + * @param v Value to push + */ + inline void push(const T &v) + { +#ifdef __WINDOWS__ + EnterCriticalSection(&_cs); + try { + _q.push(v); + LeaveCriticalSection(&_cs); + ReleaseSemaphore(_sem,1,NULL); + } catch ( ... ) { + LeaveCriticalSection(&_cs); + throw; + } +#else + pthread_mutex_lock(const_cast (&_mh)); + try { + _q.push(v); + pthread_mutex_unlock(const_cast (&_mh)); + pthread_cond_signal(const_cast (&_cond)); + } catch ( ... ) { + pthread_mutex_unlock(const_cast (&_mh)); + throw; + } +#endif + } + + /** + * Pop fron queue with optional timeout + * + * @param v Result parameter to set to next value + * @param ms Milliseconds timeout or 0 for none + * @return True if v was set to something, false on timeout + */ + inline bool pop(T &v,unsigned long ms = 0) + { +#ifdef __WINDOWS__ + if (ms > 0) + WaitForSingleObject(_sem,(DWORD)ms); + else WaitForSingleObject(_sem,INFINITE); + EnterCriticalSection(&_cs); + try { + if (_q.empty()) { + LeaveCriticalSection(&_cs); + return false; + } else { + v = _q.front(); + _q.pop(); + LeaveCriticalSection(&_cs); + return true; + } + } catch ( ... ) { + LeaveCriticalSection(&_cs); + throw; + } +#else + pthread_mutex_lock(const_cast (&_mh)); + try { + if (_q.empty()) { + if (ms > 0) { + uint64_t when = Utils::now() + (uint64_t)ms; + struct timespec ts; + ts.tv_sec = (unsigned long)(when / 1000); + ts.tv_nsec = (unsigned long)(when % 1000) * (unsigned long)1000000; + pthread_cond_timedwait(const_cast (&_cond),const_cast (&_mh),&ts); + } else { + pthread_cond_wait(const_cast (&_cond),const_cast (&_mh)); + } + if (_q.empty()) { + pthread_mutex_unlock(const_cast (&_mh)); + return false; + } + } + v = _q.front(); + _q.pop(); + pthread_mutex_unlock(const_cast (&_mh)); + return true; + } catch ( ... ) { + pthread_mutex_unlock(const_cast (&_mh)); + throw; + } +#endif + } + +private: + std::queue _q; +#ifdef __WINDOWS__ + HANDLE _sem; + CRITICAL_SECTION _cs; +#else + pthread_cond_t _cond; + pthread_mutex_t _mh; +#endif +}; + +} // namespace ZeroTier + +#endif diff --git a/testnet/Semaphore.hpp b/testnet/Semaphore.hpp deleted file mode 100644 index d1c0d416..00000000 --- a/testnet/Semaphore.hpp +++ /dev/null @@ -1,142 +0,0 @@ -/* - * ZeroTier One - Global Peer to Peer Ethernet - * Copyright (C) 2012-2014 ZeroTier Networks LLC - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - * -- - * - * ZeroTier may be used and distributed under the terms of the GPLv3, which - * are available at: http://www.gnu.org/licenses/gpl-3.0.html - * - * If you would like to embed ZeroTier into a commercial application or - * redistribute it in a modified binary form, please contact ZeroTier Networks - * LLC. Start here: http://www.zerotier.com/ - */ - -#ifndef ZT_SEMAPHORE_HPP -#define ZT_SEMAPHORE_HPP - -#include "../node/Constants.hpp" -#include "../node/NonCopyable.hpp" - -#ifdef __WINDOWS__ - -#include -#include - -namespace ZeroTier { - -class Semaphore : NonCopyable -{ -public: - Semaphore() throw() { _sem = CreateSemaphore(NULL,0,0x7fffffff,NULL); } - ~Semaphore() { CloseHandle(_sem); } - - inline void wait(unsigned long ms = 0) const - throw() - { - if (ms > 0) - WaitForSingleObject(_sem,(DWORD)ms); - else WaitForSingleObject(_sem,INFINITE); - } - - inline void signal() const - throw() - { - ReleaseSemaphore(_sem,1,NULL); - } - -private: - HANDLE _sem; -}; - -} // namespace ZeroTier - -#else // !__WINDOWS__ - -#include -#include -#include - -#include "../node/Utils.hpp" - -namespace ZeroTier { - -// This isn't quite a perfect semaphore, but the way we use it it's fine... we -// just want this to signal when queues are ready. -class Semaphore : NonCopyable -{ -public: - Semaphore() - throw() - { - pthread_mutex_init(&_mh,(const pthread_mutexattr_t *)0); - pthread_cond_init(&_cond,(const pthread_condattr_t *)0); - _cnt = 0; - } - - ~Semaphore() - { - pthread_cond_destroy(&_cond); - pthread_mutex_destroy(&_mh); - } - - inline void wait() - throw() - { - pthread_mutex_lock(const_cast (&_mh)); - if (_cnt <= 0) - pthread_cond_wait(const_cast (&_cond),const_cast (&_mh)); - if (_cnt > 0) - --_cnt; - pthread_mutex_unlock(const_cast (&_mh)); - } - - inline void wait(unsigned long ms) - throw() - { - uint64_t when = Utils::now() + (uint64_t)ms; - struct timespec ts; - ts.tv_sec = (unsigned long)(when / 1000); - ts.tv_nsec = (unsigned long)(when % 1000) * 1000000; - - pthread_mutex_lock(const_cast (&_mh)); - if (_cnt <= 0) - pthread_cond_timedwait(const_cast (&_cond),const_cast (&_mh),&ts); - if (_cnt > 0) - --_cnt; - pthread_mutex_unlock(const_cast (&_mh)); - } - - inline void signal() - throw() - { - pthread_mutex_lock(const_cast (&_mh)); - ++_cnt; - pthread_mutex_unlock(const_cast (&_mh)); - pthread_cond_signal(const_cast (&_cond)); - } - -private: - pthread_cond_t _cond; - pthread_mutex_t _mh; - volatile int _cnt; -}; - -} // namespace ZeroTier - -#endif // !__WINDOWS__ - -#endif diff --git a/testnet/SimNetSocketManager.cpp b/testnet/SimNetSocketManager.cpp index a520f5ca..9269992a 100644 --- a/testnet/SimNetSocketManager.cpp +++ b/testnet/SimNetSocketManager.cpp @@ -73,32 +73,14 @@ bool SimNetSocketManager::send(const InetAddress &to,bool tcp,bool autoConnectTc void SimNetSocketManager::poll(unsigned long timeout,void (*handler)(const SharedPtr &,void *,const InetAddress &,Buffer &),void *arg) { - std::vector< std::pair< InetAddress,Buffer > > inb; - - { - Mutex::Lock _l(_inbox_m); - inb = _inbox; - _inbox.clear(); - } - for(std::vector< std::pair< InetAddress,Buffer > >::iterator i(inb.begin());i!=inb.end();++i) - handler(_mySocket,arg,i->first,i->second); - - if (timeout) - _waitCond.wait(timeout); - else _waitCond.wait(); - - { - Mutex::Lock _l(_inbox_m); - inb = _inbox; - _inbox.clear(); - } - for(std::vector< std::pair< InetAddress,Buffer > >::iterator i(inb.begin());i!=inb.end();++i) - handler(_mySocket,arg,i->first,i->second); + std::pair< InetAddress,Buffer > msg; + if ((_inbox.pop(msg,timeout))&&(msg.second.size())) + handler(_mySocket,arg,msg.first,msg.second); } void SimNetSocketManager::whack() { - _waitCond.signal(); + _inbox.push(std::pair< InetAddress,Buffer >()); } void SimNetSocketManager::closeTcpSockets() diff --git a/testnet/SimNetSocketManager.hpp b/testnet/SimNetSocketManager.hpp index df587072..a34728db 100644 --- a/testnet/SimNetSocketManager.hpp +++ b/testnet/SimNetSocketManager.hpp @@ -35,7 +35,8 @@ #include "../node/Constants.hpp" #include "../node/SocketManager.hpp" #include "../node/Mutex.hpp" -#include "Semaphore.hpp" + +#include "MTQ.hpp" namespace ZeroTier { @@ -96,9 +97,7 @@ public: */ inline void enqueue(const InetAddress &from,const void *data,unsigned int len) { - Mutex::Lock _l(_inbox_m); - _inbox.push_back(std::pair< InetAddress,Buffer >(from,Buffer(data,len))); - _waitCond.signal(); + _inbox.push(std::pair< InetAddress,Buffer >(from,Buffer(data,len))); } virtual bool send(const InetAddress &to,bool tcp,bool autoConnectTcp,const void *msg,unsigned int msglen); @@ -114,13 +113,10 @@ private: SharedPtr _mySocket; TransferStats _totals; - std::vector< std::pair< InetAddress,Buffer > > _inbox; - Mutex _inbox_m; + MTQ< std::pair< InetAddress,Buffer > > _inbox; std::map< InetAddress,TransferStats > _stats; Mutex _stats_m; - - Semaphore _waitCond; }; } // namespace ZeroTier diff --git a/testnet/TestEthernetTap.cpp b/testnet/TestEthernetTap.cpp index 3d29d6e2..6a151aca 100644 --- a/testnet/TestEthernetTap.cpp +++ b/testnet/TestEthernetTap.cpp @@ -77,12 +77,8 @@ TestEthernetTap::TestEthernetTap( TestEthernetTap::~TestEthernetTap() { - static const TestFrame zf; - { - Mutex::Lock _l(_pq_m); - _pq.push_back(zf); // 0 length frame = exit - _pq_c.signal(); - } + static const TestFrame zf; // use a static empty frame because of weirdo G++ warning bug... + _pq.push(zf); // empty frame terminates thread Thread::join(_thread); } @@ -113,8 +109,7 @@ std::set TestEthernetTap::ips() const void TestEthernetTap::put(const MAC &from,const MAC &to,unsigned int etherType,const void *data,unsigned int len) { - Mutex::Lock _l(_gq_m); - _gq.push_back(TestFrame(from,to,data,etherType,len)); + _gq.push(TestFrame(from,to,data,etherType,len)); } std::string TestEthernetTap::deviceName() const @@ -135,38 +130,22 @@ bool TestEthernetTap::injectPacketFromHost(const MAC &from,const MAC &to,unsigne { if ((len == 0)||(len > 2800)) return false; - - { - Mutex::Lock _l(_pq_m); - _pq.push_back(TestFrame(from,to,data,etherType & 0xffff,len)); - _pq_c.signal(); - } - + _pq.push(TestFrame(from,to,data,etherType & 0xffff,len)); return true; } void TestEthernetTap::threadMain() throw() { - std::vector q; + TestFrame f; for(;;) { - { - Mutex::Lock _l(_pq_m); - q = _pq; - _pq.clear(); - } - - for(std::vector::iterator f(q.begin());f!=q.end();++f) { - if (!f->len) - return; // empty frame signals thread to die - else if (_enabled) { + if (_pq.pop(f,0)) { + if (f.len) { try { - _handler(_arg,f->from,f->to,f->etherType,Buffer<4096>(f->data,f->len)); - } catch ( ... ) {} // handlers should not throw - } + _handler(_arg,f.from,f.to,f.etherType,Buffer<4096>(f.data,f.len)); + } catch ( ... ) {} + } else break; } - - _pq_c.wait(1000); } } diff --git a/testnet/TestEthernetTap.hpp b/testnet/TestEthernetTap.hpp index 3b1782e3..cfe8807c 100644 --- a/testnet/TestEthernetTap.hpp +++ b/testnet/TestEthernetTap.hpp @@ -40,7 +40,8 @@ #include "../node/SharedPtr.hpp" #include "../node/Thread.hpp" #include "../node/Mutex.hpp" -#include "Semaphore.hpp" + +#include "MTQ.hpp" namespace ZeroTier { @@ -105,14 +106,7 @@ public: virtual bool injectPacketFromHost(const MAC &from,const MAC &to,unsigned int etherType,const void *data,unsigned int len); inline uint64_t nwid() const { return _nwid; } - - // Get things that have been put() and empty queue - inline void get(std::vector &v) - { - Mutex::Lock _l(_gq_m); - v = _gq; - _gq.clear(); - } + inline bool getNextReceivedFrame(TestFrame &v,unsigned long timeout) { return _gq.pop(v,timeout); } void threadMain() throw(); @@ -127,12 +121,8 @@ private: std::string _dev; volatile bool _enabled; - std::vector< TestFrame > _pq; - Mutex _pq_m; - Semaphore _pq_c; - - std::vector< TestFrame > _gq; - Mutex _gq_m; + MTQ _pq; + MTQ _gq; AtomicCounter __refCount; }; -- cgit v1.2.3