From b62296a40bd4ed2d01404679cdc9512a1f18bcca Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 19 Jul 2017 14:13:17 -0700 Subject: Bug fixes in new harness mode. --- controller/EmbeddedNetworkController.cpp | 20 ++-- controller/JSONDB.cpp | 194 ++++++++++++++++++------------- controller/JSONDB.hpp | 12 ++ osdep/OSUtils.cpp | 2 +- osdep/OSUtils.hpp | 2 +- service/OneService.cpp | 29 ++--- 6 files changed, 147 insertions(+), 112 deletions(-) diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index cb0c05af..70691fe0 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -1145,7 +1145,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt) if (accept) { char p[128]; OSUtils::ztsnprintf(p,sizeof(p),"trace/%s",id); - _db.writeRaw(p,OSUtils::jsonDump(d)); + _db.writeRaw(p,OSUtils::jsonDump(d,-1)); } } catch ( ... ) { // drop invalid trace messages if an error occurs @@ -1157,17 +1157,24 @@ void EmbeddedNetworkController::threadMain() { char tmp[256]; _RQEntry *qe = (_RQEntry *)0; - while ((_running)&&(_queue.get(qe,1000) != BlockingQueue<_RQEntry *>::STOP)) { + while (_running) { + const BlockingQueue<_RQEntry *>::TimedWaitResult wr = _queue.get(qe,1000); + if ((wr == BlockingQueue<_RQEntry *>::STOP)||(!_running)) + break; + try { - if (qe->type == _RQEntry::RQENTRY_TYPE_REQUEST) + if ((wr == BlockingQueue<_RQEntry *>::OK)&&(qe->type == _RQEntry::RQENTRY_TYPE_REQUEST)) { _request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData); + delete qe; + } // Every 10s we update a 'status' containing member online state, etc. const uint64_t now = OSUtils::now(); if ((now - _lastDumpedStatus) >= 10000) { _lastDumpedStatus = now; bool first = true; - std::string st("{\"id\":\"status\",\"objtype\":\"status\",\"memberStatus\":{"); + OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%.10llx-status\",\"objtype\":\"status\",\"memberStatus\":[",_signingId.address().toInt()); + std::string st(tmp); { Mutex::Lock _l(_memberStatus_m); st.reserve(48 * (_memberStatus.size() + 1)); @@ -1176,7 +1183,7 @@ void EmbeddedNetworkController::threadMain() auto ms = this->_memberStatus.find(_MemberStatusKey(networkId,nodeId)); if (ms != _memberStatus.end()) lrt = ms->second.lastRequestTime; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"%s\"%.16llx-%.10llx\":%llu", + OSUtils::ztsnprintf(tmp,sizeof(tmp),"%s\"%.16llx\",%llu,%llu", (first) ? "" : ",", (unsigned long long)networkId, (unsigned long long)nodeId, @@ -1185,12 +1192,11 @@ void EmbeddedNetworkController::threadMain() first = false; }); } - OSUtils::ztsnprintf(tmp,sizeof(tmp),"},\"clock\":%llu,\"startTime\":%llu}",(unsigned long long)now,(unsigned long long)_startTime); + OSUtils::ztsnprintf(tmp,sizeof(tmp),"],\"clock\":%llu,\"startTime\":%llu,\"uptime\":%llu}",(unsigned long long)now,(unsigned long long)_startTime,(unsigned long long)(now - _startTime)); st.append(tmp); _db.writeRaw("status",st); } } catch ( ... ) {} - delete qe; } } diff --git a/controller/JSONDB.cpp b/controller/JSONDB.cpp index ad9ba248..216470b4 100644 --- a/controller/JSONDB.cpp +++ b/controller/JSONDB.cpp @@ -41,7 +41,8 @@ JSONDB::JSONDB(const std::string &basePath) : _basePath(basePath), _rawInput(-1), _rawOutput(-1), - _summaryThreadRun(true) + _summaryThreadRun(true), + _dataReady(false) { if ((_basePath.length() > 7)&&(_basePath.substr(0,7) == "http://")) { // If base path is http:// we run in HTTP mode @@ -75,6 +76,8 @@ JSONDB::JSONDB(const std::string &basePath) : OSUtils::lockDownFile(_basePath.c_str(),true); // networks might contain auth tokens, etc., so restrict directory permissions } + _networks_m.lock(); // locked until data is loaded, etc. + if (_rawInput < 0) { unsigned int cnt = 0; while (!_load(_basePath)) { @@ -82,27 +85,25 @@ JSONDB::JSONDB(const std::string &basePath) : fprintf(stderr,"WARNING: controller still waiting to read '%s'..." ZT_EOL_S,_basePath.c_str()); Thread::sleep(250); } - } - for(std::unordered_map::iterator n(_networks.begin());n!=_networks.end();++n) - _recomputeSummaryInfo(n->first); - for(;;) { - _summaryThread_m.lock(); - if (_summaryThreadToDo.empty()) { - _summaryThread_m.unlock(); - break; + for(std::unordered_map::iterator n(_networks.begin());n!=_networks.end();++n) + _summaryThreadToDo.push_back(n->first); + + if (_summaryThreadToDo.size() > 0) { + _summaryThread = Thread::start(this); + } else { + _dataReady = true; + _networks_m.unlock(); } - _summaryThread_m.unlock(); - Thread::sleep(50); + } else { + // In IPC mode we wait for the first message to start, and we start + // this thread since this thread is responsible for reading from stdin. + _summaryThread = Thread::start(this); } } JSONDB::~JSONDB() { - { - Mutex::Lock _l(_networks_m); - _networks.clear(); - } Thread t; { Mutex::Lock _l(_summaryThread_m); @@ -119,11 +120,11 @@ bool JSONDB::writeRaw(const std::string &n,const std::string &obj) #ifndef __WINDOWS__ if (obj.length() > 0) { Mutex::Lock _l(_rawLock); - if (write(_rawOutput,obj.c_str(),obj.length() + 1) > 0) - return true; - } else { - return true; - } + if ((long)write(_rawOutput,obj.data(),obj.length()) == (long)obj.length()) { + if (write(_rawOutput,"\n",1) == 1) + return true; + } + } else return true; #endif return false; } else if (_httpAddr) { @@ -202,7 +203,7 @@ void JSONDB::saveNetwork(const uint64_t networkId,const nlohmann::json &networkC { char n[64]; OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId); - writeRaw(n,OSUtils::jsonDump(networkConfig)); + writeRaw(n,OSUtils::jsonDump(networkConfig,-1)); { Mutex::Lock _l(_networks_m); _networks[networkId].config = nlohmann::json::to_msgpack(networkConfig); @@ -214,7 +215,7 @@ void JSONDB::saveNetworkMember(const uint64_t networkId,const uint64_t nodeId,co { char n[256]; OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId); - writeRaw(n,OSUtils::jsonDump(memberConfig)); + writeRaw(n,OSUtils::jsonDump(memberConfig,-1)); { Mutex::Lock _l(_networks_m); _networks[networkId].members[nodeId] = nlohmann::json::to_msgpack(memberConfig); @@ -310,6 +311,7 @@ void JSONDB::threadMain() std::string rawInputBuf; FD_ZERO(&readfds); FD_ZERO(&nullfds); + struct timeval tv; #endif std::vector todo; @@ -317,24 +319,42 @@ void JSONDB::threadMain() while (_summaryThreadRun) { #ifndef __WINDOWS__ if (_rawInput < 0) { + // In HTTP and filesystem mode we just wait for summary to-do items Thread::sleep(25); } else { + // In IPC mode we wait but also select() on STDIN to read database updates FD_SET(_rawInput,&readfds); - struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 25000; select(_rawInput+1,&readfds,&nullfds,&nullfds,&tv); if (FD_ISSET(_rawInput,&readfds)) { const long rn = (long)read(_rawInput,readbuf,1048576); + bool gotMessage = false; for(long i=0;i 0) { - _add(OSUtils::jsonParse(rawInputBuf)); + try { + const nlohmann::json obj(OSUtils::jsonParse(rawInputBuf)); + + gotMessage = true; + if (!_dataReady) { + _dataReady = true; + _networks_m.unlock(); + } + + if (obj.is_array()) { + for(unsigned long i=0;i::iterator ii(todo.begin());ii!=todo.end();++ii) { - const uint64_t networkId = *ii; + if (!_dataReady) { + _dataReady = true; + _networks_m.unlock(); + } + + const uint64_t now = OSUtils::now(); + try { Mutex::Lock _l(_networks_m); - std::unordered_map::iterator n(_networks.find(networkId)); - if (n != _networks.end()) { - NetworkSummaryInfo &ns = n->second.summaryInfo; - ns.activeBridges.clear(); - ns.allocatedIps.clear(); - ns.authorizedMemberCount = 0; - ns.activeMemberCount = 0; - ns.totalMemberCount = 0; - ns.mostRecentDeauthTime = 0; - - for(std::unordered_map< uint64_t,std::vector >::const_iterator m(n->second.members.begin());m!=n->second.members.end();++m) { - try { - nlohmann::json member(nlohmann::json::from_msgpack(m->second)); - - if (OSUtils::jsonBool(member["authorized"],false)) { - ++ns.authorizedMemberCount; - - try { - const nlohmann::json &mlog = member["recentLog"]; - if ((mlog.is_array())&&(mlog.size() > 0)) { - const nlohmann::json &mlog1 = mlog[0]; - if (mlog1.is_object()) { - if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < (ZT_NETWORK_AUTOCONF_DELAY * 2)) - ++ns.activeMemberCount; + for(std::vector::iterator ii(todo.begin());ii!=todo.end();++ii) { + const uint64_t networkId = *ii; + std::unordered_map::iterator n(_networks.find(networkId)); + if (n != _networks.end()) { + NetworkSummaryInfo &ns = n->second.summaryInfo; + ns.activeBridges.clear(); + ns.allocatedIps.clear(); + ns.authorizedMemberCount = 0; + ns.activeMemberCount = 0; + ns.totalMemberCount = 0; + ns.mostRecentDeauthTime = 0; + + for(std::unordered_map< uint64_t,std::vector >::const_iterator m(n->second.members.begin());m!=n->second.members.end();++m) { + try { + nlohmann::json member(nlohmann::json::from_msgpack(m->second)); + + if (OSUtils::jsonBool(member["authorized"],false)) { + ++ns.authorizedMemberCount; + + try { + const nlohmann::json &mlog = member["recentLog"]; + if ((mlog.is_array())&&(mlog.size() > 0)) { + const nlohmann::json &mlog1 = mlog[0]; + if (mlog1.is_object()) { + if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < (ZT_NETWORK_AUTOCONF_DELAY * 2)) + ++ns.activeMemberCount; + } } - } - } catch ( ... ) {} - - try { - if (OSUtils::jsonBool(member["activeBridge"],false)) - ns.activeBridges.push_back(Address(m->first)); - } catch ( ... ) {} - - try { - const nlohmann::json &mips = member["ipAssignments"]; - if (mips.is_array()) { - for(unsigned long i=0;ifirst)); + } catch ( ... ) {} + + try { + const nlohmann::json &mips = member["ipAssignments"]; + if (mips.is_array()) { + for(unsigned long i=0;isecond.summaryInfoLastComputed = now; + n->second.summaryInfoLastComputed = now; + } } - } + } catch ( ... ) {} todo.clear(); } + if (!_dataReady) // sanity check + _networks_m.unlock(); + #ifndef __WINDOWS__ delete [] readbuf; #endif diff --git a/controller/JSONDB.hpp b/controller/JSONDB.hpp index 23d00a51..7131b0c1 100644 --- a/controller/JSONDB.hpp +++ b/controller/JSONDB.hpp @@ -62,6 +62,17 @@ public: JSONDB(const std::string &basePath); ~JSONDB(); + /** + * Write a JSON object to the data store + * + * It's important that obj contain a valid JSON object with no newlines (jsonDump with -1 + * for indentation), since newline-delimited JSON is what nodeJS's IPC speaks and this + * is important in Central-harnessed mode. + * + * @param n Path name of object + * @param obj Object in single-line no-CRs JSON object format (OSUtils::jsonDump(obj,-1)) + * @return True if write appears successful + */ bool writeRaw(const std::string &n,const std::string &obj); bool hasNetwork(const uint64_t networkId) const; @@ -171,6 +182,7 @@ private: std::unordered_map< uint64_t,_NW > _networks; std::unordered_map< uint64_t,std::unordered_set< uint64_t > > _members; + bool _dataReady; Mutex _networks_m; }; diff --git a/osdep/OSUtils.cpp b/osdep/OSUtils.cpp index 882b8255..c1edc353 100644 --- a/osdep/OSUtils.cpp +++ b/osdep/OSUtils.cpp @@ -434,7 +434,7 @@ std::string OSUtils::platformDefaultHomePath() // Inline these massive JSON operations in one place only to reduce binary footprint and compile time nlohmann::json OSUtils::jsonParse(const std::string &buf) { return nlohmann::json::parse(buf.c_str()); } -std::string OSUtils::jsonDump(const nlohmann::json &j) { return j.dump(1); } +std::string OSUtils::jsonDump(const nlohmann::json &j,int indentation) { return j.dump(indentation); } uint64_t OSUtils::jsonInt(const nlohmann::json &jv,const uint64_t dfl) { diff --git a/osdep/OSUtils.hpp b/osdep/OSUtils.hpp index d6f32822..8683ba25 100644 --- a/osdep/OSUtils.hpp +++ b/osdep/OSUtils.hpp @@ -276,7 +276,7 @@ public: static std::string platformDefaultHomePath(); static nlohmann::json jsonParse(const std::string &buf); - static std::string jsonDump(const nlohmann::json &j); + static std::string jsonDump(const nlohmann::json &j,int indentation = 1); static uint64_t jsonInt(const nlohmann::json &jv,const uint64_t dfl); static bool jsonBool(const nlohmann::json &jv,const bool dfl); static std::string jsonString(const nlohmann::json &jv,const char *dfl); diff --git a/service/OneService.cpp b/service/OneService.cpp index 115830e5..27f2ef3c 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -629,6 +629,14 @@ public: } } + // Allow controller DB path to be put somewhere else + json &settings = _localConfig["settings"]; + if (settings.is_object()) { + const std::string cdbp(OSUtils::jsonString(settings["controllerDbPath"],"")); + if (cdbp.length() > 0) + _controllerDbPath = cdbp; + } + // Set trusted paths if there are any if (trustedPathCount) _node->setTrustedPaths(reinterpret_cast(trustedPathNetworks),trustedPathIds,trustedPathCount); @@ -1484,27 +1492,6 @@ public: _allowManagementFrom.push_back(nw); } } - - json &controllerDbHttpHost = settings["controllerDbHttpHost"]; - json &controllerDbHttpPort = settings["controllerDbHttpPort"]; - json &controllerDbHttpPath = settings["controllerDbHttpPath"]; - if ((controllerDbHttpHost.is_string())&&(controllerDbHttpPort.is_number())) { - _controllerDbPath = "http://"; - std::string h = controllerDbHttpHost; - _controllerDbPath.append(h); - char dbp[128]; - OSUtils::ztsnprintf(dbp,sizeof(dbp),"%d",(int)controllerDbHttpPort); - _controllerDbPath.push_back(':'); - _controllerDbPath.append(dbp); - if (controllerDbHttpPath.is_string()) { - std::string p = controllerDbHttpPath; - if ((p.length() == 0)||(p[0] != '/')) - _controllerDbPath.push_back('/'); - _controllerDbPath.append(p); - } else { - _controllerDbPath.push_back('/'); - } - } } // Checks if a managed IP or route target is allowed -- cgit v1.2.3