summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2017-04-24 20:51:02 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2017-04-24 20:51:02 -0700
commit4f2a77976965f19640416afca24367d3037820b8 (patch)
treee68402e6564180cfe39b1d77ffc51c87120b7dd3
parentcafbe44dde55e57115cc654610172556dff19bec (diff)
downloadinfinitytier-4f2a77976965f19640416afca24367d3037820b8.tar.gz
infinitytier-4f2a77976965f19640416afca24367d3037820b8.zip
JSONDB performance improvements, threading fix.
-rw-r--r--controller/EmbeddedNetworkController.cpp59
-rw-r--r--controller/EmbeddedNetworkController.hpp1
-rw-r--r--controller/JSONDB.cpp57
-rw-r--r--controller/JSONDB.hpp31
-rw-r--r--node/Node.cpp3
-rw-r--r--osdep/Thread.hpp42
6 files changed, 96 insertions, 97 deletions
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp
index 3b901afe..84906849 100644
--- a/controller/EmbeddedNetworkController.cpp
+++ b/controller/EmbeddedNetworkController.cpp
@@ -431,6 +431,7 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule)
EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath) :
_startTime(OSUtils::now()),
+ _running(true),
_db(dbPath),
_node(node)
{
@@ -438,12 +439,19 @@ EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPa
EmbeddedNetworkController::~EmbeddedNetworkController()
{
- Mutex::Lock _l(_threads_m);
- if (_threads.size() > 0) {
- for(unsigned long i=0;i<(((unsigned long)_threads.size())*2);++i)
+ _running = false;
+ std::vector<Thread> t;
+ {
+ Mutex::Lock _l(_threads_m);
+ t = _threads;
+ }
+ if (t.size() > 0) {
+ for(unsigned long i=0,j=(unsigned long)(t.size() * 4);i<j;++i)
_queue.post((_RQEntry *)0);
- for(std::vector<Thread>::iterator i(_threads.begin());i!=_threads.end();++i)
+ /*
+ for(std::vector<Thread>::iterator i(t.begin());i!=t.end();++i)
Thread::join(*i);
+ */
}
}
@@ -1111,23 +1119,23 @@ void EmbeddedNetworkController::threadMain()
throw()
{
uint64_t lastCircuitTestCheck = 0;
- for(;;) {
- _RQEntry *const qe = _queue.get(); // waits on next request
- if (!qe) break; // enqueue a NULL to terminate threads
+ _RQEntry *qe = (_RQEntry *)0;
+ while ((_running)&&((qe = _queue.get()))) {
try {
_request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData);
} catch ( ... ) {}
delete qe;
-
- uint64_t now = OSUtils::now();
- if ((now - lastCircuitTestCheck) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) {
- lastCircuitTestCheck = now;
- Mutex::Lock _l(_tests_m);
- for(std::list< ZT_CircuitTest >::iterator i(_tests.begin());i!=_tests.end();) {
- if ((now - i->timestamp) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) {
- _node->circuitTestEnd(&(*i));
- _tests.erase(i++);
- } else ++i;
+ if (_running) {
+ uint64_t now = OSUtils::now();
+ if ((now - lastCircuitTestCheck) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) {
+ lastCircuitTestCheck = now;
+ Mutex::Lock _l(_tests_m);
+ for(std::list< ZT_CircuitTest >::iterator i(_tests.begin());i!=_tests.end();) {
+ if ((now - i->timestamp) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) {
+ _node->circuitTestEnd(&(*i));
+ _tests.erase(i++);
+ } else ++i;
+ }
}
}
}
@@ -1723,13 +1731,11 @@ void EmbeddedNetworkController::_getNetworkMemberInfo(uint64_t now,uint64_t nwid
char pfx[256];
Utils::snprintf(pfx,sizeof(pfx),"network/%.16llx/member",nwid);
- {
- Mutex::Lock _l(_nmiCache_m);
- std::map<uint64_t,_NetworkMemberInfo>::iterator c(_nmiCache.find(nwid));
- if ((c != _nmiCache.end())&&((now - c->second.nmiTimestamp) < 1000)) { // a short duration cache but limits CPU use on big networks
- nmi = c->second;
- return;
- }
+ Mutex::Lock _l(_nmiCache_m);
+ std::map<uint64_t,_NetworkMemberInfo>::iterator c(_nmiCache.find(nwid));
+ if ((c != _nmiCache.end())&&((now - c->second.nmiTimestamp) < 1000)) { // a short duration cache but limits CPU use on big networks
+ nmi = c->second;
+ return;
}
_db.filter(pfx,[&nmi,&now](const std::string &n,const json &member) {
@@ -1770,10 +1776,7 @@ void EmbeddedNetworkController::_getNetworkMemberInfo(uint64_t now,uint64_t nwid
});
nmi.nmiTimestamp = now;
- {
- Mutex::Lock _l(_nmiCache_m);
- _nmiCache[nwid] = nmi;
- }
+ _nmiCache[nwid] = nmi;
}
void EmbeddedNetworkController::_pushMemberUpdate(uint64_t now,uint64_t nwid,const nlohmann::json &member)
diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp
index 906f4345..04f52c7d 100644
--- a/controller/EmbeddedNetworkController.hpp
+++ b/controller/EmbeddedNetworkController.hpp
@@ -178,6 +178,7 @@ private:
const uint64_t _startTime;
+ volatile bool _running;
BlockingQueue<_RQEntry *> _queue;
std::vector<Thread> _threads;
Mutex _threads_m;
diff --git a/controller/JSONDB.cpp b/controller/JSONDB.cpp
index dd8e3968..509250b9 100644
--- a/controller/JSONDB.cpp
+++ b/controller/JSONDB.cpp
@@ -50,7 +50,7 @@ JSONDB::JSONDB(const std::string &basePath) :
OSUtils::mkdir(_basePath.c_str());
OSUtils::lockDownFile(_basePath.c_str(),true); // networks might contain auth tokens, etc., so restrict directory permissions
}
- _ready = _reload(_basePath,std::string());
+ _reload(_basePath,std::string());
}
bool JSONDB::writeRaw(const std::string &n,const std::string &obj)
@@ -87,16 +87,16 @@ bool JSONDB::put(const std::string &n,const nlohmann::json &obj)
nlohmann::json JSONDB::get(const std::string &n)
{
- {
- Mutex::Lock _l(_db_m);
+ while (!_ready) {
+ Thread::sleep(250);
+ _reload(_basePath,std::string());
+ }
- while (!_ready) {
- Thread::sleep(250);
- _ready = _reload(_basePath,std::string());
- }
+ if (!_isValidObjectName(n))
+ return _EMPTY_JSON;
- if (!_isValidObjectName(n))
- return _EMPTY_JSON;
+ {
+ Mutex::Lock _l(_db_m);
std::map<std::string,_E>::iterator e(_db.find(n));
if (e != _db.end())
return e->second.obj;
@@ -116,14 +116,16 @@ nlohmann::json JSONDB::get(const std::string &n)
return _EMPTY_JSON;
}
- try {
+ {
Mutex::Lock _l(_db_m);
- _E &e2 = _db[n];
- e2.obj = OSUtils::jsonParse(buf);
- return e2.obj;
- } catch ( ... ) {
- _db.erase(n);
- return _EMPTY_JSON;
+ try {
+ _E &e2 = _db[n];
+ e2.obj = OSUtils::jsonParse(buf);
+ return e2.obj;
+ } catch ( ... ) {
+ _db.erase(n);
+ return _EMPTY_JSON;
+ }
}
}
@@ -131,7 +133,15 @@ void JSONDB::erase(const std::string &n)
{
if (!_isValidObjectName(n))
return;
+ _erase(n);
+ {
+ Mutex::Lock _l(_db_m);
+ _db.erase(n);
+ }
+}
+void JSONDB::_erase(const std::string &n)
+{
if (_httpAddr) {
std::string body;
std::map<std::string,std::string> headers;
@@ -142,17 +152,12 @@ void JSONDB::erase(const std::string &n)
return;
OSUtils::rm(path.c_str());
}
-
- {
- Mutex::Lock _l(_db_m);
- _db.erase(n);
- }
}
-bool JSONDB::_reload(const std::string &p,const std::string &b)
+void JSONDB::_reload(const std::string &p,const std::string &b)
{
- // Assumes _db_m is locked
if (_httpAddr) {
+ Mutex::Lock _l(_db_m);
std::string body;
std::map<std::string,std::string> headers;
const unsigned int sc = Http::GET(2147483647,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast<const struct sockaddr *>(&_httpAddr),_basePath.c_str(),_ZT_JSONDB_GET_HEADERS,headers,body);
@@ -161,18 +166,19 @@ bool JSONDB::_reload(const std::string &p,const std::string &b)
nlohmann::json dbImg(OSUtils::jsonParse(body));
std::string tmp;
if (dbImg.is_object()) {
+ _db.clear();
for(nlohmann::json::iterator i(dbImg.begin());i!=dbImg.end();++i) {
if (i.value().is_object()) {
tmp = i.key();
_db[tmp].obj = i.value();
}
}
- return true;
+ _ready = true;
}
} catch ( ... ) {} // invalid JSON, so maybe incomplete request
}
- return false;
} else {
+ _ready = true;
std::vector<std::string> dl(OSUtils::listDirectory(p.c_str(),true));
for(std::vector<std::string>::const_iterator di(dl.begin());di!=dl.end();++di) {
if ((di->length() > 5)&&(di->substr(di->length() - 5) == ".json")) {
@@ -181,7 +187,6 @@ bool JSONDB::_reload(const std::string &p,const std::string &b)
this->_reload((p + ZT_PATH_SEPARATOR + *di),(b + *di + ZT_PATH_SEPARATOR));
}
}
- return true;
}
}
diff --git a/controller/JSONDB.hpp b/controller/JSONDB.hpp
index 2d3a5224..a045d1b4 100644
--- a/controller/JSONDB.hpp
+++ b/controller/JSONDB.hpp
@@ -72,29 +72,28 @@ public:
template<typename F>
inline void filter(const std::string &prefix,F func)
{
- Mutex::Lock _l(_db_m);
-
while (!_ready) {
Thread::sleep(250);
- _ready = _reload(_basePath,std::string());
+ _reload(_basePath,std::string());
}
-
- for(std::map<std::string,_E>::iterator i(_db.lower_bound(prefix));i!=_db.end();) {
- if ((i->first.length() >= prefix.length())&&(!memcmp(i->first.data(),prefix.data(),prefix.length()))) {
- if (!func(i->first,get(i->first))) {
- std::map<std::string,_E>::iterator i2(i); ++i2;
- this->erase(i->first);
- i = i2;
- } else ++i;
- } else break;
+ {
+ Mutex::Lock _l(_db_m);
+ for(std::map<std::string,_E>::iterator i(_db.lower_bound(prefix));i!=_db.end();) {
+ if ((i->first.length() >= prefix.length())&&(!memcmp(i->first.data(),prefix.data(),prefix.length()))) {
+ if (!func(i->first,i->second.obj)) {
+ this->_erase(i->first);
+ _db.erase(i++);
+ } else {
+ ++i;
+ }
+ } else break;
+ }
}
}
- inline bool operator==(const JSONDB &db) const { return ((_basePath == db._basePath)&&(_db == db._db)); }
- inline bool operator!=(const JSONDB &db) const { return (!(*this == db)); }
-
private:
- bool _reload(const std::string &p,const std::string &b);
+ void _erase(const std::string &n);
+ void _reload(const std::string &p,const std::string &b);
bool _isValidObjectName(const std::string &n);
std::string _genPath(const std::string &n,bool create);
diff --git a/node/Node.cpp b/node/Node.cpp
index 2b3f7996..ccbe9411 100644
--- a/node/Node.cpp
+++ b/node/Node.cpp
@@ -490,7 +490,8 @@ int Node::sendUserMessage(void *tptr,uint64_t dest,uint64_t typeId,const void *d
void Node::setNetconfMaster(void *networkControllerInstance)
{
RR->localNetworkController = reinterpret_cast<NetworkController *>(networkControllerInstance);
- RR->localNetworkController->init(RR->identity,this);
+ if (networkControllerInstance)
+ RR->localNetworkController->init(RR->identity,this);
}
ZT_ResultCode Node::circuitTestBegin(void *tptr,ZT_CircuitTest *test,void (*reportCallback)(ZT_Node *,ZT_CircuitTest *,const ZT_CircuitTestReport *))
diff --git a/osdep/Thread.hpp b/osdep/Thread.hpp
index 227c2cfe..5423a8ab 100644
--- a/osdep/Thread.hpp
+++ b/osdep/Thread.hpp
@@ -46,7 +46,6 @@ class Thread
{
public:
Thread()
- throw()
{
_th = NULL;
_tid = 0;
@@ -54,7 +53,6 @@ public:
template<typename C>
static inline Thread start(C *instance)
- throw(std::runtime_error)
{
Thread t;
t._th = CreateThread(NULL,0,&___zt_threadMain<C>,(LPVOID)instance,0,&t._tid);
@@ -88,7 +86,7 @@ public:
CancelSynchronousIo(t._th);
}
- inline operator bool() const throw() { return (_th != NULL); }
+ inline operator bool() const { return (_th != NULL); }
private:
HANDLE _th;
@@ -123,33 +121,18 @@ class Thread
{
public:
Thread()
- throw()
{
- memset(&_tid,0,sizeof(_tid));
- pthread_attr_init(&_tattr);
- // This corrects for systems with abnormally small defaults (musl) and also
- // shrinks the stack on systems with large defaults to save a bit of memory.
- pthread_attr_setstacksize(&_tattr,ZT_THREAD_MIN_STACK_SIZE);
- _started = false;
- }
-
- ~Thread()
- {
- pthread_attr_destroy(&_tattr);
+ memset(this,0,sizeof(Thread));
}
Thread(const Thread &t)
- throw()
{
- memcpy(&_tid,&(t._tid),sizeof(_tid));
- _started = t._started;
+ memcpy(this,&t,sizeof(Thread));
}
inline Thread &operator=(const Thread &t)
- throw()
{
- memcpy(&_tid,&(t._tid),sizeof(_tid));
- _started = t._started;
+ memcpy(this,&t,sizeof(Thread));
return *this;
}
@@ -163,12 +146,20 @@ public:
*/
template<typename C>
static inline Thread start(C *instance)
- throw(std::runtime_error)
{
Thread t;
- t._started = true;
- if (pthread_create(&t._tid,&t._tattr,&___zt_threadMain<C>,instance))
+ pthread_attr_t tattr;
+ pthread_attr_init(&tattr);
+ // This corrects for systems with abnormally small defaults (musl) and also
+ // shrinks the stack on systems with large defaults to save a bit of memory.
+ pthread_attr_setstacksize(&tattr,ZT_THREAD_MIN_STACK_SIZE);
+ if (pthread_create(&t._tid,&tattr,&___zt_threadMain<C>,instance)) {
+ pthread_attr_destroy(&tattr);
throw std::runtime_error("pthread_create() failed, unable to create thread");
+ } else {
+ t._started = true;
+ pthread_attr_destroy(&tattr);
+ }
return t;
}
@@ -190,11 +181,10 @@ public:
*/
static inline void sleep(unsigned long ms) { usleep(ms * 1000); }
- inline operator bool() const throw() { return (_started); }
+ inline operator bool() const { return (_started); }
private:
pthread_t _tid;
- pthread_attr_t _tattr;
volatile bool _started;
};