diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-03 22:40:26 -0400 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-03 22:40:26 -0400 |
commit | 7fc9094d8ea1c2d28d003c499016f0755b73063d (patch) | |
tree | 9f5b5fd51aad00bc964e06e188486d026b0e27e3 /controller | |
parent | 92c7070aa85425041f856d7e4203bdd1ae713c33 (diff) | |
download | infinitytier-7fc9094d8ea1c2d28d003c499016f0755b73063d.tar.gz infinitytier-7fc9094d8ea1c2d28d003c499016f0755b73063d.zip |
More fixes to RethinkDB.
Diffstat (limited to 'controller')
-rw-r--r-- | controller/EmbeddedNetworkController.cpp | 11 | ||||
-rw-r--r-- | controller/RethinkDB.cpp | 145 |
2 files changed, 84 insertions, 72 deletions
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 1ca2ee08..5707e6e0 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -1185,8 +1185,9 @@ void EmbeddedNetworkController::_request( ms.lastRequestTime = now; } - OSUtils::ztsnprintf(nwids,sizeof(nwids),"%.16llx",nwid); - if (!_db->get(nwid,network,identity.address().toInt(),member,ns)) { + Utils::hex(nwid,nwids); + _db->get(nwid,network,identity.address().toInt(),member,ns); + if ((!network.is_object())||(network.size() == 0)) { _sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_OBJECT_NOT_FOUND); return; } @@ -1684,11 +1685,13 @@ void EmbeddedNetworkController::_startThreads() _threads.emplace_back([this]() { for(;;) { _RQEntry *qe = (_RQEntry *)0; - if (_queue.get(qe)) + if (!_queue.get(qe)) break; try { - if (qe) + if (qe) { _request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData); + delete qe; + } } catch (std::exception &e) { fprintf(stderr,"ERROR: exception in controller request handling thread: %s" ZT_EOL_S,e.what()); } catch ( ... ) { diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp index 1f8d29df..ffa1a188 100644 --- a/controller/RethinkDB.cpp +++ b/controller/RethinkDB.cpp @@ -78,6 +78,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; if (ov.is_object()||nv.is_object()) { + //if (nv.is_object()) printf("MEMBER: %s" ZT_EOL_S,nv.dump().c_str()); this->_memberChanged(ov,nv); } } catch ( ... ) {} // ignore bad records @@ -118,6 +119,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; if (ov.is_object()||nv.is_object()) { + //if (nv.is_object()) printf("NETWORK: %s" ZT_EOL_S,nv.dump().c_str()); this->_networkChanged(ov,nv); } } catch ( ... ) {} // ignore bad records @@ -148,34 +150,41 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres const std::string objtype = (*config)["objtype"]; const char *table; std::string deleteId; - if (objtype == "member") { - const std::string nwid = (*config)["nwid"]; - const std::string id = (*config)["id"]; - record["id"] = nwid + "-" + id; - record["controllerId"] = this->_myAddressStr; - record["networkId"] = nwid; - record["nodeId"] = id; - record["config"] = *config; - table = "Member"; - } else if (objtype == "network") { - const std::string id = (*config)["id"]; - record["id"] = id; - record["controllerId"] = this->_myAddressStr; - record["config"] = *config; - table = "Network"; - } else if (objtype == "delete_network") { - deleteId = (*config)["id"]; - table = "Network"; - } else if (objtype == "delete_member") { - deleteId = (*config)["nwid"]; - deleteId.push_back('-'); - const std::string tmp = (*config)["id"]; - deleteId.append(tmp); - table = "Member"; - } else if (objtype == "trace") { - record = *config; - table = "RemoteTrace"; - } else { + try { + if (objtype == "member") { + const std::string nwid = (*config)["nwid"]; + const std::string id = (*config)["id"]; + record["id"] = nwid + "-" + id; + record["controllerId"] = this->_myAddressStr; + record["networkId"] = nwid; + record["nodeId"] = id; + record["config"] = *config; + table = "Member"; + } else if (objtype == "network") { + const std::string id = (*config)["id"]; + record["id"] = id; + record["controllerId"] = this->_myAddressStr; + record["config"] = *config; + table = "Network"; + } else if (objtype == "delete_network") { + deleteId = (*config)["id"]; + table = "Network"; + } else if (objtype == "delete_member") { + deleteId = (*config)["nwid"]; + deleteId.push_back('-'); + const std::string tmp = (*config)["id"]; + deleteId.append(tmp); + table = "Member"; + } else if (objtype == "trace") { + record = *config; + table = "RemoteTrace"; + } else { + delete config; + continue; + } + delete config; + } catch ( ... ) { + delete config; continue; } @@ -185,10 +194,10 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres rdb = R::connect(this->_host,this->_port,this->_auth); if (rdb) { if (deleteId.length() > 0) { - printf("DELETE: %s" ZT_EOL_S,deleteId.c_str()); + //printf("DELETE: %s" ZT_EOL_S,deleteId.c_str()); R::db(this->_db).table(table).get(deleteId).delete_().run(*rdb); } else { - printf("UPSERT: %s" ZT_EOL_S,record.dump().c_str()); + //printf("UPSERT: %s" ZT_EOL_S,record.dump().c_str()); R::db(this->_db).table(table).insert(R::Datum::from_json(record.dump()),R::optargs("conflict","update","return_changes",false)).run(*rdb); } break; @@ -222,6 +231,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres rdb = R::connect(this->_host,this->_port,this->_auth); if (rdb) { OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld}",this->_myAddressStr.c_str(),(long long)OSUtils::now()); + //printf("HEARTBEAT: %s" ZT_EOL_S,tmp); R::db(this->_db).table("Controller").update(R::Datum::from_json(tmp)).run(*rdb); } } catch ( ... ) { @@ -240,15 +250,18 @@ RethinkDB::~RethinkDB() _commitQueue.stop(); for(int t=0;t<ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS;++t) _commitThread[t].join(); - _membersDbWatcher.detach(); - _networksDbWatcher.detach(); + if (_membersDbWatcherConnection) + ((R::Connection *)_membersDbWatcherConnection)->close(); + if (_networksDbWatcherConnection) + ((R::Connection *)_networksDbWatcherConnection)->close(); + _membersDbWatcher.join(); + _networksDbWatcher.join(); _heartbeatThread.join(); } bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard<std::mutex> l(_networks_l); @@ -257,17 +270,16 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network) return false; nw = nwi->second; } - - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + } return true; } bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard<std::mutex> l(_networks_l); @@ -276,21 +288,20 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint6 return false; nw = nwi->second; } - - std::lock_guard<std::mutex> l2(nw->lock); - auto m = nw->members.find(memberId); - if (m == nw->members.end()) - return false; - network = nw->config; - member = m->second; - + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + auto m = nw->members.find(memberId); + if (m == nw->members.end()) + return false; + member = m->second; + } return true; } bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard<std::mutex> l(_networks_l); @@ -299,22 +310,21 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint6 return false; nw = nwi->second; } - - std::lock_guard<std::mutex> l2(nw->lock); - auto m = nw->members.find(memberId); - if (m == nw->members.end()) - return false; - network = nw->config; - member = m->second; - _fillSummaryInfo(nw,info); - + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + _fillSummaryInfo(nw,info); + auto m = nw->members.find(memberId); + if (m == nw->members.end()) + return false; + member = m->second; + } return true; } bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard<std::mutex> l(_networks_l); @@ -323,19 +333,18 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector return false; nw = nwi->second; } - - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - for(auto m=nw->members.begin();m!=nw->members.end();++m) - members.push_back(m->second); - + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + for(auto m=nw->members.begin();m!=nw->members.end();++m) + members.push_back(m->second); + } return true; } bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard<std::mutex> l(_networks_l); @@ -344,10 +353,10 @@ bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info) return false; nw = nwi->second; } - - std::lock_guard<std::mutex> l2(nw->lock); - _fillSummaryInfo(nw,info); - + { + std::lock_guard<std::mutex> l2(nw->lock); + _fillSummaryInfo(nw,info); + } return true; } |