summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2017-07-19 14:13:17 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2017-07-19 14:13:17 -0700
commitb62296a40bd4ed2d01404679cdc9512a1f18bcca (patch)
tree4a0d32583785537107351f162839bcf9b591c67c
parent31785f7f6ec27e826efc3cc2b45979e5d58f37bb (diff)
downloadinfinitytier-b62296a40bd4ed2d01404679cdc9512a1f18bcca.tar.gz
infinitytier-b62296a40bd4ed2d01404679cdc9512a1f18bcca.zip
Bug fixes in new harness mode.
-rw-r--r--controller/EmbeddedNetworkController.cpp20
-rw-r--r--controller/JSONDB.cpp194
-rw-r--r--controller/JSONDB.hpp12
-rw-r--r--osdep/OSUtils.cpp2
-rw-r--r--osdep/OSUtils.hpp2
-rw-r--r--service/OneService.cpp29
6 files changed, 147 insertions, 112 deletions
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp
index cb0c05af..70691fe0 100644
--- a/controller/EmbeddedNetworkController.cpp
+++ b/controller/EmbeddedNetworkController.cpp
@@ -1145,7 +1145,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
if (accept) {
char p[128];
OSUtils::ztsnprintf(p,sizeof(p),"trace/%s",id);
- _db.writeRaw(p,OSUtils::jsonDump(d));
+ _db.writeRaw(p,OSUtils::jsonDump(d,-1));
}
} catch ( ... ) {
// drop invalid trace messages if an error occurs
@@ -1157,17 +1157,24 @@ void EmbeddedNetworkController::threadMain()
{
char tmp[256];
_RQEntry *qe = (_RQEntry *)0;
- while ((_running)&&(_queue.get(qe,1000) != BlockingQueue<_RQEntry *>::STOP)) {
+ while (_running) {
+ const BlockingQueue<_RQEntry *>::TimedWaitResult wr = _queue.get(qe,1000);
+ if ((wr == BlockingQueue<_RQEntry *>::STOP)||(!_running))
+ break;
+
try {
- if (qe->type == _RQEntry::RQENTRY_TYPE_REQUEST)
+ if ((wr == BlockingQueue<_RQEntry *>::OK)&&(qe->type == _RQEntry::RQENTRY_TYPE_REQUEST)) {
_request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData);
+ delete qe;
+ }
// Every 10s we update a 'status' containing member online state, etc.
const uint64_t now = OSUtils::now();
if ((now - _lastDumpedStatus) >= 10000) {
_lastDumpedStatus = now;
bool first = true;
- std::string st("{\"id\":\"status\",\"objtype\":\"status\",\"memberStatus\":{");
+ OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%.10llx-status\",\"objtype\":\"status\",\"memberStatus\":[",_signingId.address().toInt());
+ std::string st(tmp);
{
Mutex::Lock _l(_memberStatus_m);
st.reserve(48 * (_memberStatus.size() + 1));
@@ -1176,7 +1183,7 @@ void EmbeddedNetworkController::threadMain()
auto ms = this->_memberStatus.find(_MemberStatusKey(networkId,nodeId));
if (ms != _memberStatus.end())
lrt = ms->second.lastRequestTime;
- OSUtils::ztsnprintf(tmp,sizeof(tmp),"%s\"%.16llx-%.10llx\":%llu",
+ OSUtils::ztsnprintf(tmp,sizeof(tmp),"%s\"%.16llx\",%llu,%llu",
(first) ? "" : ",",
(unsigned long long)networkId,
(unsigned long long)nodeId,
@@ -1185,12 +1192,11 @@ void EmbeddedNetworkController::threadMain()
first = false;
});
}
- OSUtils::ztsnprintf(tmp,sizeof(tmp),"},\"clock\":%llu,\"startTime\":%llu}",(unsigned long long)now,(unsigned long long)_startTime);
+ OSUtils::ztsnprintf(tmp,sizeof(tmp),"],\"clock\":%llu,\"startTime\":%llu,\"uptime\":%llu}",(unsigned long long)now,(unsigned long long)_startTime,(unsigned long long)(now - _startTime));
st.append(tmp);
_db.writeRaw("status",st);
}
} catch ( ... ) {}
- delete qe;
}
}
diff --git a/controller/JSONDB.cpp b/controller/JSONDB.cpp
index ad9ba248..216470b4 100644
--- a/controller/JSONDB.cpp
+++ b/controller/JSONDB.cpp
@@ -41,7 +41,8 @@ JSONDB::JSONDB(const std::string &basePath) :
_basePath(basePath),
_rawInput(-1),
_rawOutput(-1),
- _summaryThreadRun(true)
+ _summaryThreadRun(true),
+ _dataReady(false)
{
if ((_basePath.length() > 7)&&(_basePath.substr(0,7) == "http://")) {
// If base path is http:// we run in HTTP mode
@@ -75,6 +76,8 @@ JSONDB::JSONDB(const std::string &basePath) :
OSUtils::lockDownFile(_basePath.c_str(),true); // networks might contain auth tokens, etc., so restrict directory permissions
}
+ _networks_m.lock(); // locked until data is loaded, etc.
+
if (_rawInput < 0) {
unsigned int cnt = 0;
while (!_load(_basePath)) {
@@ -82,27 +85,25 @@ JSONDB::JSONDB(const std::string &basePath) :
fprintf(stderr,"WARNING: controller still waiting to read '%s'..." ZT_EOL_S,_basePath.c_str());
Thread::sleep(250);
}
- }
- for(std::unordered_map<uint64_t,_NW>::iterator n(_networks.begin());n!=_networks.end();++n)
- _recomputeSummaryInfo(n->first);
- for(;;) {
- _summaryThread_m.lock();
- if (_summaryThreadToDo.empty()) {
- _summaryThread_m.unlock();
- break;
+ for(std::unordered_map<uint64_t,_NW>::iterator n(_networks.begin());n!=_networks.end();++n)
+ _summaryThreadToDo.push_back(n->first);
+
+ if (_summaryThreadToDo.size() > 0) {
+ _summaryThread = Thread::start(this);
+ } else {
+ _dataReady = true;
+ _networks_m.unlock();
}
- _summaryThread_m.unlock();
- Thread::sleep(50);
+ } else {
+ // In IPC mode we wait for the first message to start, and we start
+ // this thread since this thread is responsible for reading from stdin.
+ _summaryThread = Thread::start(this);
}
}
JSONDB::~JSONDB()
{
- {
- Mutex::Lock _l(_networks_m);
- _networks.clear();
- }
Thread t;
{
Mutex::Lock _l(_summaryThread_m);
@@ -119,11 +120,11 @@ bool JSONDB::writeRaw(const std::string &n,const std::string &obj)
#ifndef __WINDOWS__
if (obj.length() > 0) {
Mutex::Lock _l(_rawLock);
- if (write(_rawOutput,obj.c_str(),obj.length() + 1) > 0)
- return true;
- } else {
- return true;
- }
+ if ((long)write(_rawOutput,obj.data(),obj.length()) == (long)obj.length()) {
+ if (write(_rawOutput,"\n",1) == 1)
+ return true;
+ }
+ } else return true;
#endif
return false;
} else if (_httpAddr) {
@@ -202,7 +203,7 @@ void JSONDB::saveNetwork(const uint64_t networkId,const nlohmann::json &networkC
{
char n[64];
OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId);
- writeRaw(n,OSUtils::jsonDump(networkConfig));
+ writeRaw(n,OSUtils::jsonDump(networkConfig,-1));
{
Mutex::Lock _l(_networks_m);
_networks[networkId].config = nlohmann::json::to_msgpack(networkConfig);
@@ -214,7 +215,7 @@ void JSONDB::saveNetworkMember(const uint64_t networkId,const uint64_t nodeId,co
{
char n[256];
OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId);
- writeRaw(n,OSUtils::jsonDump(memberConfig));
+ writeRaw(n,OSUtils::jsonDump(memberConfig,-1));
{
Mutex::Lock _l(_networks_m);
_networks[networkId].members[nodeId] = nlohmann::json::to_msgpack(memberConfig);
@@ -310,6 +311,7 @@ void JSONDB::threadMain()
std::string rawInputBuf;
FD_ZERO(&readfds);
FD_ZERO(&nullfds);
+ struct timeval tv;
#endif
std::vector<uint64_t> todo;
@@ -317,24 +319,42 @@ void JSONDB::threadMain()
while (_summaryThreadRun) {
#ifndef __WINDOWS__
if (_rawInput < 0) {
+ // In HTTP and filesystem mode we just wait for summary to-do items
Thread::sleep(25);
} else {
+ // In IPC mode we wait but also select() on STDIN to read database updates
FD_SET(_rawInput,&readfds);
- struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 25000;
select(_rawInput+1,&readfds,&nullfds,&nullfds,&tv);
if (FD_ISSET(_rawInput,&readfds)) {
const long rn = (long)read(_rawInput,readbuf,1048576);
+ bool gotMessage = false;
for(long i=0;i<rn;++i) {
- if (readbuf[i]) {
+ if ((readbuf[i] != '\n')&&(readbuf[i] != '\r')&&(readbuf[i] != 0)) { // compatible with nodeJS IPC
rawInputBuf.push_back(readbuf[i]);
} else if (rawInputBuf.length() > 0) {
- _add(OSUtils::jsonParse(rawInputBuf));
+ try {
+ const nlohmann::json obj(OSUtils::jsonParse(rawInputBuf));
+
+ gotMessage = true;
+ if (!_dataReady) {
+ _dataReady = true;
+ _networks_m.unlock();
+ }
+
+ if (obj.is_array()) {
+ for(unsigned long i=0;i<obj.size();++i)
+ _add(obj[i]);
+ } else if (obj.is_object()) {
+ _add(obj);
+ }
+ } catch ( ... ) {} // ignore malformed JSON
rawInputBuf.clear();
}
}
- continue; // we only want to do the stuff below this every few dozen ms or so, so pause again
+ if (!gotMessage) // select() again immediately until we get at least one full message
+ continue;
}
}
#else
@@ -347,73 +367,83 @@ void JSONDB::threadMain()
continue;
else _summaryThreadToDo.swap(todo);
}
- const uint64_t now = OSUtils::now();
- for(std::vector<uint64_t>::iterator ii(todo.begin());ii!=todo.end();++ii) {
- const uint64_t networkId = *ii;
+ if (!_dataReady) {
+ _dataReady = true;
+ _networks_m.unlock();
+ }
+
+ const uint64_t now = OSUtils::now();
+ try {
Mutex::Lock _l(_networks_m);
- std::unordered_map<uint64_t,_NW>::iterator n(_networks.find(networkId));
- if (n != _networks.end()) {
- NetworkSummaryInfo &ns = n->second.summaryInfo;
- ns.activeBridges.clear();
- ns.allocatedIps.clear();
- ns.authorizedMemberCount = 0;
- ns.activeMemberCount = 0;
- ns.totalMemberCount = 0;
- ns.mostRecentDeauthTime = 0;
-
- for(std::unordered_map< uint64_t,std::vector<uint8_t> >::const_iterator m(n->second.members.begin());m!=n->second.members.end();++m) {
- try {
- nlohmann::json member(nlohmann::json::from_msgpack(m->second));
-
- if (OSUtils::jsonBool(member["authorized"],false)) {
- ++ns.authorizedMemberCount;
-
- try {
- const nlohmann::json &mlog = member["recentLog"];
- if ((mlog.is_array())&&(mlog.size() > 0)) {
- const nlohmann::json &mlog1 = mlog[0];
- if (mlog1.is_object()) {
- if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < (ZT_NETWORK_AUTOCONF_DELAY * 2))
- ++ns.activeMemberCount;
+ for(std::vector<uint64_t>::iterator ii(todo.begin());ii!=todo.end();++ii) {
+ const uint64_t networkId = *ii;
+ std::unordered_map<uint64_t,_NW>::iterator n(_networks.find(networkId));
+ if (n != _networks.end()) {
+ NetworkSummaryInfo &ns = n->second.summaryInfo;
+ ns.activeBridges.clear();
+ ns.allocatedIps.clear();
+ ns.authorizedMemberCount = 0;
+ ns.activeMemberCount = 0;
+ ns.totalMemberCount = 0;
+ ns.mostRecentDeauthTime = 0;
+
+ for(std::unordered_map< uint64_t,std::vector<uint8_t> >::const_iterator m(n->second.members.begin());m!=n->second.members.end();++m) {
+ try {
+ nlohmann::json member(nlohmann::json::from_msgpack(m->second));
+
+ if (OSUtils::jsonBool(member["authorized"],false)) {
+ ++ns.authorizedMemberCount;
+
+ try {
+ const nlohmann::json &mlog = member["recentLog"];
+ if ((mlog.is_array())&&(mlog.size() > 0)) {
+ const nlohmann::json &mlog1 = mlog[0];
+ if (mlog1.is_object()) {
+ if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < (ZT_NETWORK_AUTOCONF_DELAY * 2))
+ ++ns.activeMemberCount;
+ }
}
- }
- } catch ( ... ) {}
-
- try {
- if (OSUtils::jsonBool(member["activeBridge"],false))
- ns.activeBridges.push_back(Address(m->first));
- } catch ( ... ) {}
-
- try {
- const nlohmann::json &mips = member["ipAssignments"];
- if (mips.is_array()) {
- for(unsigned long i=0;i<mips.size();++i) {
- InetAddress mip(OSUtils::jsonString(mips[i],"").c_str());
- if ((mip.ss_family == AF_INET)||(mip.ss_family == AF_INET6))
- ns.allocatedIps.push_back(mip);
+ } catch ( ... ) {}
+
+ try {
+ if (OSUtils::jsonBool(member["activeBridge"],false))
+ ns.activeBridges.push_back(Address(m->first));
+ } catch ( ... ) {}
+
+ try {
+ const nlohmann::json &mips = member["ipAssignments"];
+ if (mips.is_array()) {
+ for(unsigned long i=0;i<mips.size();++i) {
+ InetAddress mip(OSUtils::jsonString(mips[i],"").c_str());
+ if ((mip.ss_family == AF_INET)||(mip.ss_family == AF_INET6))
+ ns.allocatedIps.push_back(mip);
+ }
}
- }
- } catch ( ... ) {}
- } else {
- try {
- ns.mostRecentDeauthTime = std::max(ns.mostRecentDeauthTime,OSUtils::jsonInt(member["lastDeauthorizedTime"],0ULL));
- } catch ( ... ) {}
- }
- ++ns.totalMemberCount;
- } catch ( ... ) {}
- }
+ } catch ( ... ) {}
+ } else {
+ try {
+ ns.mostRecentDeauthTime = std::max(ns.mostRecentDeauthTime,OSUtils::jsonInt(member["lastDeauthorizedTime"],0ULL));
+ } catch ( ... ) {}
+ }
+ ++ns.totalMemberCount;
+ } catch ( ... ) {}
+ }
- std::sort(ns.activeBridges.begin(),ns.activeBridges.end());
- std::sort(ns.allocatedIps.begin(),ns.allocatedIps.end());
+ std::sort(ns.activeBridges.begin(),ns.activeBridges.end());
+ std::sort(ns.allocatedIps.begin(),ns.allocatedIps.end());
- n->second.summaryInfoLastComputed = now;
+ n->second.summaryInfoLastComputed = now;
+ }
}
- }
+ } catch ( ... ) {}
todo.clear();
}
+ if (!_dataReady) // sanity check
+ _networks_m.unlock();
+
#ifndef __WINDOWS__
delete [] readbuf;
#endif
diff --git a/controller/JSONDB.hpp b/controller/JSONDB.hpp
index 23d00a51..7131b0c1 100644
--- a/controller/JSONDB.hpp
+++ b/controller/JSONDB.hpp
@@ -62,6 +62,17 @@ public:
JSONDB(const std::string &basePath);
~JSONDB();
+ /**
+ * Write a JSON object to the data store
+ *
+ * It's important that obj contain a valid JSON object with no newlines (jsonDump with -1
+ * for indentation), since newline-delimited JSON is what nodeJS's IPC speaks and this
+ * is important in Central-harnessed mode.
+ *
+ * @param n Path name of object
+ * @param obj Object in single-line no-CRs JSON object format (OSUtils::jsonDump(obj,-1))
+ * @return True if write appears successful
+ */
bool writeRaw(const std::string &n,const std::string &obj);
bool hasNetwork(const uint64_t networkId) const;
@@ -171,6 +182,7 @@ private:
std::unordered_map< uint64_t,_NW > _networks;
std::unordered_map< uint64_t,std::unordered_set< uint64_t > > _members;
+ bool _dataReady;
Mutex _networks_m;
};
diff --git a/osdep/OSUtils.cpp b/osdep/OSUtils.cpp
index 882b8255..c1edc353 100644
--- a/osdep/OSUtils.cpp
+++ b/osdep/OSUtils.cpp
@@ -434,7 +434,7 @@ std::string OSUtils::platformDefaultHomePath()
// Inline these massive JSON operations in one place only to reduce binary footprint and compile time
nlohmann::json OSUtils::jsonParse(const std::string &buf) { return nlohmann::json::parse(buf.c_str()); }
-std::string OSUtils::jsonDump(const nlohmann::json &j) { return j.dump(1); }
+std::string OSUtils::jsonDump(const nlohmann::json &j,int indentation) { return j.dump(indentation); }
uint64_t OSUtils::jsonInt(const nlohmann::json &jv,const uint64_t dfl)
{
diff --git a/osdep/OSUtils.hpp b/osdep/OSUtils.hpp
index d6f32822..8683ba25 100644
--- a/osdep/OSUtils.hpp
+++ b/osdep/OSUtils.hpp
@@ -276,7 +276,7 @@ public:
static std::string platformDefaultHomePath();
static nlohmann::json jsonParse(const std::string &buf);
- static std::string jsonDump(const nlohmann::json &j);
+ static std::string jsonDump(const nlohmann::json &j,int indentation = 1);
static uint64_t jsonInt(const nlohmann::json &jv,const uint64_t dfl);
static bool jsonBool(const nlohmann::json &jv,const bool dfl);
static std::string jsonString(const nlohmann::json &jv,const char *dfl);
diff --git a/service/OneService.cpp b/service/OneService.cpp
index 115830e5..27f2ef3c 100644
--- a/service/OneService.cpp
+++ b/service/OneService.cpp
@@ -629,6 +629,14 @@ public:
}
}
+ // Allow controller DB path to be put somewhere else
+ json &settings = _localConfig["settings"];
+ if (settings.is_object()) {
+ const std::string cdbp(OSUtils::jsonString(settings["controllerDbPath"],""));
+ if (cdbp.length() > 0)
+ _controllerDbPath = cdbp;
+ }
+
// Set trusted paths if there are any
if (trustedPathCount)
_node->setTrustedPaths(reinterpret_cast<const struct sockaddr_storage *>(trustedPathNetworks),trustedPathIds,trustedPathCount);
@@ -1484,27 +1492,6 @@ public:
_allowManagementFrom.push_back(nw);
}
}
-
- json &controllerDbHttpHost = settings["controllerDbHttpHost"];
- json &controllerDbHttpPort = settings["controllerDbHttpPort"];
- json &controllerDbHttpPath = settings["controllerDbHttpPath"];
- if ((controllerDbHttpHost.is_string())&&(controllerDbHttpPort.is_number())) {
- _controllerDbPath = "http://";
- std::string h = controllerDbHttpHost;
- _controllerDbPath.append(h);
- char dbp[128];
- OSUtils::ztsnprintf(dbp,sizeof(dbp),"%d",(int)controllerDbHttpPort);
- _controllerDbPath.push_back(':');
- _controllerDbPath.append(dbp);
- if (controllerDbHttpPath.is_string()) {
- std::string p = controllerDbHttpPath;
- if ((p.length() == 0)||(p[0] != '/'))
- _controllerDbPath.push_back('/');
- _controllerDbPath.append(p);
- } else {
- _controllerDbPath.push_back('/');
- }
- }
}
// Checks if a managed IP or route target is allowed