diff options
Diffstat (limited to 'controller/RethinkDB.cpp')
-rw-r--r-- | controller/RethinkDB.cpp | 145 |
1 files changed, 77 insertions, 68 deletions
diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp index 1f8d29df..ffa1a188 100644 --- a/controller/RethinkDB.cpp +++ b/controller/RethinkDB.cpp @@ -78,6 +78,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; if (ov.is_object()||nv.is_object()) { + //if (nv.is_object()) printf("MEMBER: %s" ZT_EOL_S,nv.dump().c_str()); this->_memberChanged(ov,nv); } } catch ( ... ) {} // ignore bad records @@ -118,6 +119,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; if (ov.is_object()||nv.is_object()) { + //if (nv.is_object()) printf("NETWORK: %s" ZT_EOL_S,nv.dump().c_str()); this->_networkChanged(ov,nv); } } catch ( ... ) {} // ignore bad records @@ -148,34 +150,41 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres const std::string objtype = (*config)["objtype"]; const char *table; std::string deleteId; - if (objtype == "member") { - const std::string nwid = (*config)["nwid"]; - const std::string id = (*config)["id"]; - record["id"] = nwid + "-" + id; - record["controllerId"] = this->_myAddressStr; - record["networkId"] = nwid; - record["nodeId"] = id; - record["config"] = *config; - table = "Member"; - } else if (objtype == "network") { - const std::string id = (*config)["id"]; - record["id"] = id; - record["controllerId"] = this->_myAddressStr; - record["config"] = *config; - table = "Network"; - } else if (objtype == "delete_network") { - deleteId = (*config)["id"]; - table = "Network"; - } else if (objtype == "delete_member") { - deleteId = (*config)["nwid"]; - deleteId.push_back('-'); - const std::string tmp = (*config)["id"]; - deleteId.append(tmp); - table = "Member"; - } else if (objtype == "trace") { - record = *config; - table = "RemoteTrace"; - } else { + try { + if (objtype == "member") { + const std::string nwid = (*config)["nwid"]; + const std::string id = (*config)["id"]; + record["id"] = nwid + "-" + id; + record["controllerId"] = this->_myAddressStr; + record["networkId"] = nwid; + record["nodeId"] = id; + record["config"] = *config; + table = "Member"; + } else if (objtype == "network") { + const std::string id = (*config)["id"]; + record["id"] = id; + record["controllerId"] = this->_myAddressStr; + record["config"] = *config; + table = "Network"; + } else if (objtype == "delete_network") { + deleteId = (*config)["id"]; + table = "Network"; + } else if (objtype == "delete_member") { + deleteId = (*config)["nwid"]; + deleteId.push_back('-'); + const std::string tmp = (*config)["id"]; + deleteId.append(tmp); + table = "Member"; + } else if (objtype == "trace") { + record = *config; + table = "RemoteTrace"; + } else { + delete config; + continue; + } + delete config; + } catch ( ... ) { + delete config; continue; } @@ -185,10 +194,10 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres rdb = R::connect(this->_host,this->_port,this->_auth); if (rdb) { if (deleteId.length() > 0) { - printf("DELETE: %s" ZT_EOL_S,deleteId.c_str()); + //printf("DELETE: %s" ZT_EOL_S,deleteId.c_str()); R::db(this->_db).table(table).get(deleteId).delete_().run(*rdb); } else { - printf("UPSERT: %s" ZT_EOL_S,record.dump().c_str()); + //printf("UPSERT: %s" ZT_EOL_S,record.dump().c_str()); R::db(this->_db).table(table).insert(R::Datum::from_json(record.dump()),R::optargs("conflict","update","return_changes",false)).run(*rdb); } break; @@ -222,6 +231,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres rdb = R::connect(this->_host,this->_port,this->_auth); if (rdb) { OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld}",this->_myAddressStr.c_str(),(long long)OSUtils::now()); + //printf("HEARTBEAT: %s" ZT_EOL_S,tmp); R::db(this->_db).table("Controller").update(R::Datum::from_json(tmp)).run(*rdb); } } catch ( ... ) { @@ -240,15 +250,18 @@ RethinkDB::~RethinkDB() _commitQueue.stop(); for(int t=0;t<ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS;++t) _commitThread[t].join(); - _membersDbWatcher.detach(); - _networksDbWatcher.detach(); + if (_membersDbWatcherConnection) + ((R::Connection *)_membersDbWatcherConnection)->close(); + if (_networksDbWatcherConnection) + ((R::Connection *)_networksDbWatcherConnection)->close(); + _membersDbWatcher.join(); + _networksDbWatcher.join(); _heartbeatThread.join(); } bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard<std::mutex> l(_networks_l); @@ -257,17 +270,16 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network) return false; nw = nwi->second; } - - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + } return true; } bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard<std::mutex> l(_networks_l); @@ -276,21 +288,20 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint6 return false; nw = nwi->second; } - - std::lock_guard<std::mutex> l2(nw->lock); - auto m = nw->members.find(memberId); - if (m == nw->members.end()) - return false; - network = nw->config; - member = m->second; - + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + auto m = nw->members.find(memberId); + if (m == nw->members.end()) + return false; + member = m->second; + } return true; } bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard<std::mutex> l(_networks_l); @@ -299,22 +310,21 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint6 return false; nw = nwi->second; } - - std::lock_guard<std::mutex> l2(nw->lock); - auto m = nw->members.find(memberId); - if (m == nw->members.end()) - return false; - network = nw->config; - member = m->second; - _fillSummaryInfo(nw,info); - + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + _fillSummaryInfo(nw,info); + auto m = nw->members.find(memberId); + if (m == nw->members.end()) + return false; + member = m->second; + } return true; } bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard<std::mutex> l(_networks_l); @@ -323,19 +333,18 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector return false; nw = nwi->second; } - - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - for(auto m=nw->members.begin();m!=nw->members.end();++m) - members.push_back(m->second); - + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + for(auto m=nw->members.begin();m!=nw->members.end();++m) + members.push_back(m->second); + } return true; } bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard<std::mutex> l(_networks_l); @@ -344,10 +353,10 @@ bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info) return false; nw = nwi->second; } - - std::lock_guard<std::mutex> l2(nw->lock); - _fillSummaryInfo(nw,info); - + { + std::lock_guard<std::mutex> l2(nw->lock); + _fillSummaryInfo(nw,info); + } return true; } |