diff options
Diffstat (limited to 'controller/JSONDB.cpp')
-rw-r--r-- | controller/JSONDB.cpp | 194 |
1 files changed, 112 insertions, 82 deletions
diff --git a/controller/JSONDB.cpp b/controller/JSONDB.cpp index ad9ba248..216470b4 100644 --- a/controller/JSONDB.cpp +++ b/controller/JSONDB.cpp @@ -41,7 +41,8 @@ JSONDB::JSONDB(const std::string &basePath) : _basePath(basePath), _rawInput(-1), _rawOutput(-1), - _summaryThreadRun(true) + _summaryThreadRun(true), + _dataReady(false) { if ((_basePath.length() > 7)&&(_basePath.substr(0,7) == "http://")) { // If base path is http:// we run in HTTP mode @@ -75,6 +76,8 @@ JSONDB::JSONDB(const std::string &basePath) : OSUtils::lockDownFile(_basePath.c_str(),true); // networks might contain auth tokens, etc., so restrict directory permissions } + _networks_m.lock(); // locked until data is loaded, etc. + if (_rawInput < 0) { unsigned int cnt = 0; while (!_load(_basePath)) { @@ -82,27 +85,25 @@ JSONDB::JSONDB(const std::string &basePath) : fprintf(stderr,"WARNING: controller still waiting to read '%s'..." ZT_EOL_S,_basePath.c_str()); Thread::sleep(250); } - } - for(std::unordered_map<uint64_t,_NW>::iterator n(_networks.begin());n!=_networks.end();++n) - _recomputeSummaryInfo(n->first); - for(;;) { - _summaryThread_m.lock(); - if (_summaryThreadToDo.empty()) { - _summaryThread_m.unlock(); - break; + for(std::unordered_map<uint64_t,_NW>::iterator n(_networks.begin());n!=_networks.end();++n) + _summaryThreadToDo.push_back(n->first); + + if (_summaryThreadToDo.size() > 0) { + _summaryThread = Thread::start(this); + } else { + _dataReady = true; + _networks_m.unlock(); } - _summaryThread_m.unlock(); - Thread::sleep(50); + } else { + // In IPC mode we wait for the first message to start, and we start + // this thread since this thread is responsible for reading from stdin. + _summaryThread = Thread::start(this); } } JSONDB::~JSONDB() { - { - Mutex::Lock _l(_networks_m); - _networks.clear(); - } Thread t; { Mutex::Lock _l(_summaryThread_m); @@ -119,11 +120,11 @@ bool JSONDB::writeRaw(const std::string &n,const std::string &obj) #ifndef __WINDOWS__ if (obj.length() > 0) { Mutex::Lock _l(_rawLock); - if (write(_rawOutput,obj.c_str(),obj.length() + 1) > 0) - return true; - } else { - return true; - } + if ((long)write(_rawOutput,obj.data(),obj.length()) == (long)obj.length()) { + if (write(_rawOutput,"\n",1) == 1) + return true; + } + } else return true; #endif return false; } else if (_httpAddr) { @@ -202,7 +203,7 @@ void JSONDB::saveNetwork(const uint64_t networkId,const nlohmann::json &networkC { char n[64]; OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId); - writeRaw(n,OSUtils::jsonDump(networkConfig)); + writeRaw(n,OSUtils::jsonDump(networkConfig,-1)); { Mutex::Lock _l(_networks_m); _networks[networkId].config = nlohmann::json::to_msgpack(networkConfig); @@ -214,7 +215,7 @@ void JSONDB::saveNetworkMember(const uint64_t networkId,const uint64_t nodeId,co { char n[256]; OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId); - writeRaw(n,OSUtils::jsonDump(memberConfig)); + writeRaw(n,OSUtils::jsonDump(memberConfig,-1)); { Mutex::Lock _l(_networks_m); _networks[networkId].members[nodeId] = nlohmann::json::to_msgpack(memberConfig); @@ -310,6 +311,7 @@ void JSONDB::threadMain() std::string rawInputBuf; FD_ZERO(&readfds); FD_ZERO(&nullfds); + struct timeval tv; #endif std::vector<uint64_t> todo; @@ -317,24 +319,42 @@ void JSONDB::threadMain() while (_summaryThreadRun) { #ifndef __WINDOWS__ if (_rawInput < 0) { + // In HTTP and filesystem mode we just wait for summary to-do items Thread::sleep(25); } else { + // In IPC mode we wait but also select() on STDIN to read database updates FD_SET(_rawInput,&readfds); - struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 25000; select(_rawInput+1,&readfds,&nullfds,&nullfds,&tv); if (FD_ISSET(_rawInput,&readfds)) { const long rn = (long)read(_rawInput,readbuf,1048576); + bool gotMessage = false; for(long i=0;i<rn;++i) { - if (readbuf[i]) { + if ((readbuf[i] != '\n')&&(readbuf[i] != '\r')&&(readbuf[i] != 0)) { // compatible with nodeJS IPC rawInputBuf.push_back(readbuf[i]); } else if (rawInputBuf.length() > 0) { - _add(OSUtils::jsonParse(rawInputBuf)); + try { + const nlohmann::json obj(OSUtils::jsonParse(rawInputBuf)); + + gotMessage = true; + if (!_dataReady) { + _dataReady = true; + _networks_m.unlock(); + } + + if (obj.is_array()) { + for(unsigned long i=0;i<obj.size();++i) + _add(obj[i]); + } else if (obj.is_object()) { + _add(obj); + } + } catch ( ... ) {} // ignore malformed JSON rawInputBuf.clear(); } } - continue; // we only want to do the stuff below this every few dozen ms or so, so pause again + if (!gotMessage) // select() again immediately until we get at least one full message + continue; } } #else @@ -347,73 +367,83 @@ void JSONDB::threadMain() continue; else _summaryThreadToDo.swap(todo); } - const uint64_t now = OSUtils::now(); - for(std::vector<uint64_t>::iterator ii(todo.begin());ii!=todo.end();++ii) { - const uint64_t networkId = *ii; + if (!_dataReady) { + _dataReady = true; + _networks_m.unlock(); + } + + const uint64_t now = OSUtils::now(); + try { Mutex::Lock _l(_networks_m); - std::unordered_map<uint64_t,_NW>::iterator n(_networks.find(networkId)); - if (n != _networks.end()) { - NetworkSummaryInfo &ns = n->second.summaryInfo; - ns.activeBridges.clear(); - ns.allocatedIps.clear(); - ns.authorizedMemberCount = 0; - ns.activeMemberCount = 0; - ns.totalMemberCount = 0; - ns.mostRecentDeauthTime = 0; - - for(std::unordered_map< uint64_t,std::vector<uint8_t> >::const_iterator m(n->second.members.begin());m!=n->second.members.end();++m) { - try { - nlohmann::json member(nlohmann::json::from_msgpack(m->second)); - - if (OSUtils::jsonBool(member["authorized"],false)) { - ++ns.authorizedMemberCount; - - try { - const nlohmann::json &mlog = member["recentLog"]; - if ((mlog.is_array())&&(mlog.size() > 0)) { - const nlohmann::json &mlog1 = mlog[0]; - if (mlog1.is_object()) { - if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < (ZT_NETWORK_AUTOCONF_DELAY * 2)) - ++ns.activeMemberCount; + for(std::vector<uint64_t>::iterator ii(todo.begin());ii!=todo.end();++ii) { + const uint64_t networkId = *ii; + std::unordered_map<uint64_t,_NW>::iterator n(_networks.find(networkId)); + if (n != _networks.end()) { + NetworkSummaryInfo &ns = n->second.summaryInfo; + ns.activeBridges.clear(); + ns.allocatedIps.clear(); + ns.authorizedMemberCount = 0; + ns.activeMemberCount = 0; + ns.totalMemberCount = 0; + ns.mostRecentDeauthTime = 0; + + for(std::unordered_map< uint64_t,std::vector<uint8_t> >::const_iterator m(n->second.members.begin());m!=n->second.members.end();++m) { + try { + nlohmann::json member(nlohmann::json::from_msgpack(m->second)); + + if (OSUtils::jsonBool(member["authorized"],false)) { + ++ns.authorizedMemberCount; + + try { + const nlohmann::json &mlog = member["recentLog"]; + if ((mlog.is_array())&&(mlog.size() > 0)) { + const nlohmann::json &mlog1 = mlog[0]; + if (mlog1.is_object()) { + if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < (ZT_NETWORK_AUTOCONF_DELAY * 2)) + ++ns.activeMemberCount; + } } - } - } catch ( ... ) {} - - try { - if (OSUtils::jsonBool(member["activeBridge"],false)) - ns.activeBridges.push_back(Address(m->first)); - } catch ( ... ) {} - - try { - const nlohmann::json &mips = member["ipAssignments"]; - if (mips.is_array()) { - for(unsigned long i=0;i<mips.size();++i) { - InetAddress mip(OSUtils::jsonString(mips[i],"").c_str()); - if ((mip.ss_family == AF_INET)||(mip.ss_family == AF_INET6)) - ns.allocatedIps.push_back(mip); + } catch ( ... ) {} + + try { + if (OSUtils::jsonBool(member["activeBridge"],false)) + ns.activeBridges.push_back(Address(m->first)); + } catch ( ... ) {} + + try { + const nlohmann::json &mips = member["ipAssignments"]; + if (mips.is_array()) { + for(unsigned long i=0;i<mips.size();++i) { + InetAddress mip(OSUtils::jsonString(mips[i],"").c_str()); + if ((mip.ss_family == AF_INET)||(mip.ss_family == AF_INET6)) + ns.allocatedIps.push_back(mip); + } } - } - } catch ( ... ) {} - } else { - try { - ns.mostRecentDeauthTime = std::max(ns.mostRecentDeauthTime,OSUtils::jsonInt(member["lastDeauthorizedTime"],0ULL)); - } catch ( ... ) {} - } - ++ns.totalMemberCount; - } catch ( ... ) {} - } + } catch ( ... ) {} + } else { + try { + ns.mostRecentDeauthTime = std::max(ns.mostRecentDeauthTime,OSUtils::jsonInt(member["lastDeauthorizedTime"],0ULL)); + } catch ( ... ) {} + } + ++ns.totalMemberCount; + } catch ( ... ) {} + } - std::sort(ns.activeBridges.begin(),ns.activeBridges.end()); - std::sort(ns.allocatedIps.begin(),ns.allocatedIps.end()); + std::sort(ns.activeBridges.begin(),ns.activeBridges.end()); + std::sort(ns.allocatedIps.begin(),ns.allocatedIps.end()); - n->second.summaryInfoLastComputed = now; + n->second.summaryInfoLastComputed = now; + } } - } + } catch ( ... ) {} todo.clear(); } + if (!_dataReady) // sanity check + _networks_m.unlock(); + #ifndef __WINDOWS__ delete [] readbuf; #endif |