From 92c7070aa85425041f856d7e4203bdd1ae713c33 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 3 Nov 2017 20:55:16 -0400 Subject: RethinkDB fixes. --- controller/EmbeddedNetworkController.cpp | 105 ++++------- controller/EmbeddedNetworkController.hpp | 2 +- controller/RethinkDB.cpp | 288 +++++++++++++++++-------------- controller/RethinkDB.hpp | 2 + make-linux.mk | 2 +- service/OneService.cpp | 6 +- 6 files changed, 200 insertions(+), 205 deletions(-) diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 7cb931c6..1ca2ee08 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -457,8 +457,7 @@ EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPa _startTime(OSUtils::now()), _node(node), _path(dbPath), - _sender((NetworkController::Sender *)0), - _db(this,_signingId.address(),dbPath) + _sender((NetworkController::Sender *)0) { } @@ -476,7 +475,8 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender) _signingId = signingId; _sender = sender; _signingIdAddressString = signingId.address().toString(tmp); - _db.waitForReady(); + _db.reset(new ControllerDB(this,_signingId.address(),_path.c_str())); + _db->waitForReady(); } void EmbeddedNetworkController::request( @@ -507,12 +507,15 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET( std::string &responseBody, std::string &responseContentType) { + if (!_db) + return 500; + if ((path.size() > 0)&&(path[0] == "network")) { if ((path.size() >= 2)&&(path[1].length() == 16)) { const uint64_t nwid = Utils::hexStrToU64(path[1].c_str()); json network; - if (!_db.get(nwid,network)) + if (!_db->get(nwid,network)) return 404; if (path.size() >= 3) { @@ -524,7 +527,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET( const uint64_t address = Utils::hexStrToU64(path[3].c_str()); json member; - if (!_db.get(nwid,network,address,member)) + if (!_db->get(nwid,network,address,member)) return 404; _addMemberNonPersistedFields(nwid,address,member,OSUtils::now()); responseBody = OSUtils::jsonDump(member); @@ -535,7 +538,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET( responseBody = "{"; std::vector members; - if (_db.get(nwid,network,members)) { + if (_db->get(nwid,network,members)) { responseBody.reserve((members.size() + 2) * 32); std::string mid; for(auto member=members.begin();member!=members.end();++member) { @@ -558,7 +561,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET( const int64_t now = OSUtils::now(); ControllerDB::NetworkSummaryInfo ns; - _db.summary(nwid,ns); + _db->summary(nwid,ns); _addNetworkNonPersistedFields(nwid,network,now,ns); responseBody = OSUtils::jsonDump(network); responseContentType = "application/json"; @@ -569,7 +572,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET( // List networks std::vector networkIds; - _db.networks(networkIds); + _db->networks(networkIds); char tmp[64]; responseBody = "["; responseBody.reserve((networkIds.size() + 1) * 24); @@ -608,6 +611,8 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( std::string &responseBody, std::string &responseContentType) { + if (!_db) + return 500; if (path.empty()) return 404; @@ -641,7 +646,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( OSUtils::ztsnprintf(addrs,sizeof(addrs),"%.10llx",(unsigned long long)address); json member,network; - _db.get(nwid,network,address,member); + _db->get(nwid,network,address,member); json origMember(member); // for detecting changes _initMember(member); @@ -732,7 +737,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( if (member != origMember) { json &revj = member["revision"]; member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL); - _db.save(member); + _db->save(member); } _addMemberNonPersistedFields(nwid,address,member,now); @@ -754,7 +759,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( Utils::getSecureRandom(&nwidPostfix,sizeof(nwidPostfix)); uint64_t tryNwid = nwidPrefix | (nwidPostfix & 0xffffffULL); if ((tryNwid & 0xffffffULL) == 0ULL) tryNwid |= 1ULL; - if (!_db.hasNetwork(tryNwid)) { + if (!_db->hasNetwork(tryNwid)) { nwid = tryNwid; break; } @@ -765,7 +770,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( OSUtils::ztsnprintf(nwids,sizeof(nwids),"%.16llx",(unsigned long long)nwid); json network; - _db.get(nwid,network); + _db->get(nwid,network); json origNetwork(network); // for detecting changes _initNetwork(network); @@ -984,11 +989,11 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( if (network != origNetwork) { json &revj = network["revision"]; network["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL); - _db.save(network); + _db->save(network); } ControllerDB::NetworkSummaryInfo ns; - _db.summary(nwid,ns); + _db->summary(nwid,ns); _addNetworkNonPersistedFields(nwid,network,now,ns); responseBody = OSUtils::jsonDump(network); @@ -1011,6 +1016,8 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE( std::string &responseBody, std::string &responseContentType) { + if (!_db) + return 500; if (path.empty()) return 404; @@ -1022,7 +1029,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE( const uint64_t address = Utils::hexStrToU64(path[3].c_str()); json network,member; - _db.get(nwid,network,address,member); + _db->get(nwid,network,address,member); { std::lock_guard l(_memberStatus_l); @@ -1037,8 +1044,8 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE( } } else { json network; - _db.get(nwid,network); - _db.eraseNetwork(nwid); + _db->get(nwid,network); + _db->eraseNetwork(nwid); { std::lock_guard l(_memberStatus_l); @@ -1068,6 +1075,9 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt) char id[128],tmp[128]; std::string k,v; + if (!_db) + return; + try { // Convert Dictionary into JSON object json d; @@ -1106,7 +1116,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt) d["objtype"] = "trace"; d["ts"] = now; d["nodeId"] = Utils::hex10(rt.origin,tmp); - _db.save(d); + _db->save(d); } catch ( ... ) { // drop invalid trace messages if an error occurs } @@ -1159,6 +1169,9 @@ void EmbeddedNetworkController::_request( ControllerDB::NetworkSummaryInfo ns; json network,member,origMember; + if (!_db) + return; + if (((!_signingId)||(!_signingId.hasPrivate()))||(_signingId.address().toInt() != (nwid >> 24))||(!_sender)) return; @@ -1173,7 +1186,7 @@ void EmbeddedNetworkController::_request( } OSUtils::ztsnprintf(nwids,sizeof(nwids),"%.16llx",nwid); - if (!_db.get(nwid,network,identity.address().toInt(),member,ns)) { + if (!_db->get(nwid,network,identity.address().toInt(),member,ns)) { _sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_OBJECT_NOT_FOUND); return; } @@ -1288,7 +1301,7 @@ void EmbeddedNetworkController::_request( if (origMember != member) { json &revj = member["revision"]; member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL); - _db.save(member); + _db->save(member); } _sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_ACCESS_DENIED); return; @@ -1655,62 +1668,12 @@ void EmbeddedNetworkController::_request( if (member != origMember) { json &revj = member["revision"]; member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL); - _db.save(member); + _db->save(member); } _sender->ncSendConfig(nwid,requestPacketId,identity.address(),*(nc.get()),metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_VERSION,0) < 6); } -/* -void EmbeddedNetworkController::threadMain() - throw() -{ - char tmp[256]; - _RQEntry *qe = (_RQEntry *)0; - while (_running) { - const BlockingQueue<_RQEntry *>::TimedWaitResult wr = _queue.get(qe,1000); - if ((wr == BlockingQueue<_RQEntry *>::STOP)||(!_running)) - break; - - try { - if ((wr == BlockingQueue<_RQEntry *>::OK)&&(qe->type == _RQEntry::RQENTRY_TYPE_REQUEST)) { - _request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData); - delete qe; - } - - // Every 10s we update a 'status' containing member online state, etc. - const uint64_t now = OSUtils::now(); - if ((now - _lastDumpedStatus) >= 10000) { - _lastDumpedStatus = now; - bool first = true; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%.10llx-status\",\"objtype\":\"status\",\"memberStatus\":[",_signingId.address().toInt()); - std::string st(tmp); - { - Mutex::Lock _l(_memberStatus_m); - st.reserve(48 * (_memberStatus.size() + 1)); - _db.eachId([this,&st,&now,&first,&tmp](uint64_t networkId,uint64_t nodeId) { - uint64_t lrt = 0ULL; - auto ms = this->_memberStatus.find(_MemberStatusKey(networkId,nodeId)); - if (ms != _memberStatus.end()) - lrt = ms->second.lastRequestTime; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"%s\"%.16llx\",\"%.10llx\",%llu", - (first) ? "" : ",", - (unsigned long long)networkId, - (unsigned long long)nodeId, - (unsigned long long)lrt); - st.append(tmp); - first = false; - }); - } - OSUtils::ztsnprintf(tmp,sizeof(tmp),"],\"clock\":%llu,\"startTime\":%llu,\"uptime\":%llu,\"vMajor\":%d,\"vMinor\":%d,\"vRev\":%d}",(unsigned long long)now,(unsigned long long)_startTime,(unsigned long long)(now - _startTime),ZEROTIER_ONE_VERSION_MAJOR,ZEROTIER_ONE_VERSION_MINOR,ZEROTIER_ONE_VERSION_REVISION); - st.append(tmp); - _db.writeRaw("status",st); - } - } catch ( ... ) {} - } -} -*/ - void EmbeddedNetworkController::_startThreads() { std::lock_guard l(_threads_l); diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index 7149df10..f9b6fb5a 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -239,7 +239,7 @@ private: Identity _signingId; std::string _signingIdAddressString; NetworkController::Sender *_sender; - ControllerDB _db; + std::unique_ptr _db; BlockingQueue< _RQEntry * > _queue; std::vector _threads; std::mutex _threads_l; diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp index 6e656d23..1f8d29df 100644 --- a/controller/RethinkDB.cpp +++ b/controller/RethinkDB.cpp @@ -40,12 +40,13 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres _waitNoticePrinted(false) { std::vector ps(OSUtils::split(path,":","","")); - if ((ps.size() != 5)||(ps[0] != "rethinkdb")) + if ((ps.size() < 4)||(ps[0] != "rethinkdb")) throw std::runtime_error("invalid rethinkdb database url"); _host = ps[1]; - _db = ps[2]; - _auth = ps[3]; - _port = Utils::strToInt(ps[4].c_str()); + _port = Utils::strToInt(ps[2].c_str()); + _db = ps[3]; + if (ps.size() > 4) + _auth = ps[4]; _readyLock.lock(); @@ -56,167 +57,192 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres } _membersDbWatcher = std::thread([this]() { - while (_run == 1) { - try { - std::unique_ptr rdb(R::connect(this->_host,this->_port,this->_auth)); - if (rdb) { - _membersDbWatcherConnection = (void *)rdb.get(); - auto cur = R::db(this->_db).table("Member",R::optargs("read_mode","outdated")).get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.05,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb); - while (cur.has_next()) { - if (_run != 1) break; - json tmp(json::parse(cur.next().as_json())); - if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) { - if (--this->_ready == 0) { - if (_waitNoticePrinted) - fprintf(stderr,"NOTICE: controller RethinkDB data download complete." ZT_EOL_S); - this->_readyLock.unlock(); + try { + while (_run == 1) { + try { + std::unique_ptr rdb(R::connect(this->_host,this->_port,this->_auth)); + if (rdb) { + _membersDbWatcherConnection = (void *)rdb.get(); + auto cur = R::db(this->_db).table("Member",R::optargs("read_mode","outdated")).get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.05,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb); + while (cur.has_next()) { + if (_run != 1) break; + json tmp(json::parse(cur.next().as_json())); + if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) { + if (--this->_ready == 0) { + if (_waitNoticePrinted) + fprintf(stderr,"NOTICE: controller RethinkDB data download complete." ZT_EOL_S); + this->_readyLock.unlock(); + } + } else { + try { + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + if (ov.is_object()||nv.is_object()) { + this->_memberChanged(ov,nv); + } + } catch ( ... ) {} // ignore bad records } - } else { - try { - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - if (ov.is_object()||nv.is_object()) - this->_memberChanged(ov,nv); - } catch ( ... ) {} // ignore bad records } } + } catch (std::exception &e) { + fprintf(stderr,"ERROR: controller RethinkDB (member change stream): %s" ZT_EOL_S,e.what()); + } catch (R::Error &e) { + fprintf(stderr,"ERROR: controller RethinkDB (member change stream): %s" ZT_EOL_S,e.message.c_str()); + } catch ( ... ) { + fprintf(stderr,"ERROR: controller RethinkDB (member change stream): unknown exception" ZT_EOL_S); } - } catch (std::exception &e) { - fprintf(stderr,"ERROR: controller RethinkDB (member change stream): %s" ZT_EOL_S,e.what()); - } catch (R::Error &e) { - fprintf(stderr,"ERROR: controller RethinkDB (member change stream): %s" ZT_EOL_S,e.message.c_str()); - } catch ( ... ) { - fprintf(stderr,"ERROR: controller RethinkDB (member change stream): unknown exception" ZT_EOL_S); + std::this_thread::sleep_for(std::chrono::milliseconds(250)); } - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - } + } catch ( ... ) {} }); _networksDbWatcher = std::thread([this]() { - while (_run == 1) { - try { - std::unique_ptr rdb(R::connect(this->_host,this->_port,this->_auth)); - if (rdb) { - _membersDbWatcherConnection = (void *)rdb.get(); - auto cur = R::db(this->_db).table("Network",R::optargs("read_mode","outdated")).get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.05,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb); - while (cur.has_next()) { - if (_run != 1) break; - json tmp(json::parse(cur.next().as_json())); - if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) { - if (--this->_ready == 0) { - if (_waitNoticePrinted) - fprintf(stderr,"NOTICE: controller RethinkDB data download complete." ZT_EOL_S); - this->_readyLock.unlock(); + try { + while (_run == 1) { + try { + std::unique_ptr rdb(R::connect(this->_host,this->_port,this->_auth)); + if (rdb) { + _networksDbWatcherConnection = (void *)rdb.get(); + auto cur = R::db(this->_db).table("Network",R::optargs("read_mode","outdated")).get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.05,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb); + while (cur.has_next()) { + if (_run != 1) break; + json tmp(json::parse(cur.next().as_json())); + if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) { + if (--this->_ready == 0) { + if (_waitNoticePrinted) + fprintf(stderr,"NOTICE: controller RethinkDB data download complete." ZT_EOL_S); + this->_readyLock.unlock(); + } + } else { + try { + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + if (ov.is_object()||nv.is_object()) { + this->_networkChanged(ov,nv); + } + } catch ( ... ) {} // ignore bad records } - } else { - try { - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - if (ov.is_object()||nv.is_object()) - this->_networkChanged(ov,nv); - } catch ( ... ) {} // ignore bad records } } + } catch (std::exception &e) { + fprintf(stderr,"ERROR: controller RethinkDB (network change stream): %s" ZT_EOL_S,e.what()); + } catch (R::Error &e) { + fprintf(stderr,"ERROR: controller RethinkDB (network change stream): %s" ZT_EOL_S,e.message.c_str()); + } catch ( ... ) { + fprintf(stderr,"ERROR: controller RethinkDB (network change stream): unknown exception" ZT_EOL_S); } - } catch (std::exception &e) { - fprintf(stderr,"ERROR: controller RethinkDB (network change stream): %s" ZT_EOL_S,e.what()); - } catch (R::Error &e) { - fprintf(stderr,"ERROR: controller RethinkDB (network change stream): %s" ZT_EOL_S,e.message.c_str()); - } catch ( ... ) { - fprintf(stderr,"ERROR: controller RethinkDB (network change stream): unknown exception" ZT_EOL_S); + std::this_thread::sleep_for(std::chrono::milliseconds(250)); } - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - } + } catch ( ... ) {} }); for(int t=0;t rdb; - nlohmann::json *config = (nlohmann::json *)0; - while ((this->_commitQueue.get(config))&&(_run == 1)) { - if (!config) - continue; - json record; - const std::string objtype = (*config)["objtype"]; - const char *table; - std::string deleteId; - if (objtype == "member") { - const std::string nwid = (*config)["nwid"]; - const std::string id = (*config)["id"]; - record["id"] = nwid + "-" + id; - record["controllerId"] = this->_myAddressStr; - record["networkId"] = nwid; - record["nodeId"] = id; - record["config"] = *config; - table = "Member"; - } else if (objtype == "network") { - const std::string id = (*config)["id"]; - record["id"] = id; - record["controllerId"] = this->_myAddressStr; - record["config"] = *config; - table = "Network"; - } else if (objtype == "delete_network") { - deleteId = (*config)["id"]; - table = "Network"; - } else if (objtype == "delete_member") { - deleteId = (*config)["nwid"]; - deleteId.push_back('-'); - const std::string tmp = (*config)["id"]; - deleteId.append(tmp); - table = "Member"; - } else if (objtype == "trace") { - record = *config; - table = "RemoteTrace"; - } else { - continue; - } + try { + std::unique_ptr rdb; + nlohmann::json *config = (nlohmann::json *)0; + while ((this->_commitQueue.get(config))&&(_run == 1)) { + if (!config) + continue; + json record; + const std::string objtype = (*config)["objtype"]; + const char *table; + std::string deleteId; + if (objtype == "member") { + const std::string nwid = (*config)["nwid"]; + const std::string id = (*config)["id"]; + record["id"] = nwid + "-" + id; + record["controllerId"] = this->_myAddressStr; + record["networkId"] = nwid; + record["nodeId"] = id; + record["config"] = *config; + table = "Member"; + } else if (objtype == "network") { + const std::string id = (*config)["id"]; + record["id"] = id; + record["controllerId"] = this->_myAddressStr; + record["config"] = *config; + table = "Network"; + } else if (objtype == "delete_network") { + deleteId = (*config)["id"]; + table = "Network"; + } else if (objtype == "delete_member") { + deleteId = (*config)["nwid"]; + deleteId.push_back('-'); + const std::string tmp = (*config)["id"]; + deleteId.append(tmp); + table = "Member"; + } else if (objtype == "trace") { + record = *config; + table = "RemoteTrace"; + } else { + continue; + } - while (_run == 1) { - try { - if (!rdb) - rdb = R::connect(this->_host,this->_port,this->_auth); - if (rdb) { - if (deleteId.length() > 0) { - R::db(this->_db).table(table).get(deleteId).delete_().run(*rdb); + while (_run == 1) { + try { + if (!rdb) + rdb = R::connect(this->_host,this->_port,this->_auth); + if (rdb) { + if (deleteId.length() > 0) { + printf("DELETE: %s" ZT_EOL_S,deleteId.c_str()); + R::db(this->_db).table(table).get(deleteId).delete_().run(*rdb); + } else { + printf("UPSERT: %s" ZT_EOL_S,record.dump().c_str()); + R::db(this->_db).table(table).insert(R::Datum::from_json(record.dump()),R::optargs("conflict","update","return_changes",false)).run(*rdb); + } + break; } else { - R::db(this->_db).table(table).insert(record.dump(),R::optargs("conflict","update","return_changes",false)).run(*rdb); + fprintf(stderr,"ERROR: controller RethinkDB (insert/update): connect failed (will retry)" ZT_EOL_S); } - break; - } else { - fprintf(stderr,"ERROR: controller RethinkDB (insert/update): connect failed (will retry)" ZT_EOL_S); + } catch (std::exception &e) { + fprintf(stderr,"ERROR: controller RethinkDB (insert/update): %s" ZT_EOL_S,e.what()); + rdb.reset(); + } catch (R::Error &e) { + fprintf(stderr,"ERROR: controller RethinkDB (insert/update): %s" ZT_EOL_S,e.message.c_str()); + rdb.reset(); + } catch ( ... ) { + fprintf(stderr,"ERROR: controller RethinkDB (insert/update): unknown exception" ZT_EOL_S); + rdb.reset(); } - } catch (std::exception &e) { - fprintf(stderr,"ERROR: controller RethinkDB (insert/update): %s" ZT_EOL_S,e.what()); - rdb.reset(); - } catch (R::Error &e) { - fprintf(stderr,"ERROR: controller RethinkDB (insert/update): %s" ZT_EOL_S,e.message.c_str()); - rdb.reset(); - } catch ( ... ) { - fprintf(stderr,"ERROR: controller RethinkDB (insert/update): unknown exception" ZT_EOL_S); - rdb.reset(); + std::this_thread::sleep_for(std::chrono::milliseconds(250)); } - std::this_thread::sleep_for(std::chrono::milliseconds(250)); } - } + } catch ( ... ) {} }); } + + _heartbeatThread = std::thread([this]() { + try { + char tmp[1024]; + std::unique_ptr rdb; + while (_run == 1) { + try { + if (!rdb) + rdb = R::connect(this->_host,this->_port,this->_auth); + if (rdb) { + OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld}",this->_myAddressStr.c_str(),(long long)OSUtils::now()); + R::db(this->_db).table("Controller").update(R::Datum::from_json(tmp)).run(*rdb); + } + } catch ( ... ) { + rdb.reset(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + } catch ( ... ) {} + }); } RethinkDB::~RethinkDB() { - // FIXME: not totally safe but will generally work, and only happens on shutdown anyway. - // Would need to add some kind of 'whack it' support to librethinkdbxx to do better. _run = 0; std::this_thread::sleep_for(std::chrono::milliseconds(100)); - if (_membersDbWatcherConnection) - ((R::Connection *)_membersDbWatcherConnection)->close(); - if (_networksDbWatcherConnection) - ((R::Connection *)_networksDbWatcherConnection)->close(); _commitQueue.stop(); for(int t=0;t _commitQueue; std::thread _commitThread[ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS]; + std::thread _heartbeatThread; + mutable std::mutex _readyLock; // locked until ready std::atomic _ready; std::atomic _run; diff --git a/make-linux.mk b/make-linux.mk index 4d944776..877d6dc9 100644 --- a/make-linux.mk +++ b/make-linux.mk @@ -268,7 +268,7 @@ official-static: FORCE central-controller: FORCE cd ext/librethinkdbxx ; make - make LDLIBS="-ljemalloc ext/librethinkdbxx/build/librethinkdb++.a" DEFS="-DZT_CONTROLLER_USE_RETHINKDB" one + make -j4 LDLIBS="ext/librethinkdbxx/build/librethinkdb++.a" DEFS="-DZT_CONTROLLER_USE_RETHINKDB" one debug: FORCE make ZT_DEBUG=1 one diff --git a/service/OneService.cpp b/service/OneService.cpp index e962fb5b..66cb708d 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -867,10 +867,14 @@ public: clockShouldBe = now + (uint64_t)delay; _phy.poll(delay); } + } catch (std::exception &e) { + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = std::string("unexpected exception in main thread: ")+e.what(); } catch ( ... ) { Mutex::Lock _l(_termReason_m); _termReason = ONE_UNRECOVERABLE_ERROR; - _fatalErrorMessage = "unexpected exception in main thread"; + _fatalErrorMessage = "unexpected exception in main thread: unknown exception"; } try { -- cgit v1.2.3