summaryrefslogtreecommitdiff
path: root/controller
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2017-11-03 22:40:26 -0400
committerAdam Ierymenko <adam.ierymenko@gmail.com>2017-11-03 22:40:26 -0400
commit7fc9094d8ea1c2d28d003c499016f0755b73063d (patch)
tree9f5b5fd51aad00bc964e06e188486d026b0e27e3 /controller
parent92c7070aa85425041f856d7e4203bdd1ae713c33 (diff)
downloadinfinitytier-7fc9094d8ea1c2d28d003c499016f0755b73063d.tar.gz
infinitytier-7fc9094d8ea1c2d28d003c499016f0755b73063d.zip
More fixes to RethinkDB.
Diffstat (limited to 'controller')
-rw-r--r--controller/EmbeddedNetworkController.cpp11
-rw-r--r--controller/RethinkDB.cpp145
2 files changed, 84 insertions, 72 deletions
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp
index 1ca2ee08..5707e6e0 100644
--- a/controller/EmbeddedNetworkController.cpp
+++ b/controller/EmbeddedNetworkController.cpp
@@ -1185,8 +1185,9 @@ void EmbeddedNetworkController::_request(
ms.lastRequestTime = now;
}
- OSUtils::ztsnprintf(nwids,sizeof(nwids),"%.16llx",nwid);
- if (!_db->get(nwid,network,identity.address().toInt(),member,ns)) {
+ Utils::hex(nwid,nwids);
+ _db->get(nwid,network,identity.address().toInt(),member,ns);
+ if ((!network.is_object())||(network.size() == 0)) {
_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_OBJECT_NOT_FOUND);
return;
}
@@ -1684,11 +1685,13 @@ void EmbeddedNetworkController::_startThreads()
_threads.emplace_back([this]() {
for(;;) {
_RQEntry *qe = (_RQEntry *)0;
- if (_queue.get(qe))
+ if (!_queue.get(qe))
break;
try {
- if (qe)
+ if (qe) {
_request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData);
+ delete qe;
+ }
} catch (std::exception &e) {
fprintf(stderr,"ERROR: exception in controller request handling thread: %s" ZT_EOL_S,e.what());
} catch ( ... ) {
diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp
index 1f8d29df..ffa1a188 100644
--- a/controller/RethinkDB.cpp
+++ b/controller/RethinkDB.cpp
@@ -78,6 +78,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
json &ov = tmp["old_val"];
json &nv = tmp["new_val"];
if (ov.is_object()||nv.is_object()) {
+ //if (nv.is_object()) printf("MEMBER: %s" ZT_EOL_S,nv.dump().c_str());
this->_memberChanged(ov,nv);
}
} catch ( ... ) {} // ignore bad records
@@ -118,6 +119,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
json &ov = tmp["old_val"];
json &nv = tmp["new_val"];
if (ov.is_object()||nv.is_object()) {
+ //if (nv.is_object()) printf("NETWORK: %s" ZT_EOL_S,nv.dump().c_str());
this->_networkChanged(ov,nv);
}
} catch ( ... ) {} // ignore bad records
@@ -148,34 +150,41 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
const std::string objtype = (*config)["objtype"];
const char *table;
std::string deleteId;
- if (objtype == "member") {
- const std::string nwid = (*config)["nwid"];
- const std::string id = (*config)["id"];
- record["id"] = nwid + "-" + id;
- record["controllerId"] = this->_myAddressStr;
- record["networkId"] = nwid;
- record["nodeId"] = id;
- record["config"] = *config;
- table = "Member";
- } else if (objtype == "network") {
- const std::string id = (*config)["id"];
- record["id"] = id;
- record["controllerId"] = this->_myAddressStr;
- record["config"] = *config;
- table = "Network";
- } else if (objtype == "delete_network") {
- deleteId = (*config)["id"];
- table = "Network";
- } else if (objtype == "delete_member") {
- deleteId = (*config)["nwid"];
- deleteId.push_back('-');
- const std::string tmp = (*config)["id"];
- deleteId.append(tmp);
- table = "Member";
- } else if (objtype == "trace") {
- record = *config;
- table = "RemoteTrace";
- } else {
+ try {
+ if (objtype == "member") {
+ const std::string nwid = (*config)["nwid"];
+ const std::string id = (*config)["id"];
+ record["id"] = nwid + "-" + id;
+ record["controllerId"] = this->_myAddressStr;
+ record["networkId"] = nwid;
+ record["nodeId"] = id;
+ record["config"] = *config;
+ table = "Member";
+ } else if (objtype == "network") {
+ const std::string id = (*config)["id"];
+ record["id"] = id;
+ record["controllerId"] = this->_myAddressStr;
+ record["config"] = *config;
+ table = "Network";
+ } else if (objtype == "delete_network") {
+ deleteId = (*config)["id"];
+ table = "Network";
+ } else if (objtype == "delete_member") {
+ deleteId = (*config)["nwid"];
+ deleteId.push_back('-');
+ const std::string tmp = (*config)["id"];
+ deleteId.append(tmp);
+ table = "Member";
+ } else if (objtype == "trace") {
+ record = *config;
+ table = "RemoteTrace";
+ } else {
+ delete config;
+ continue;
+ }
+ delete config;
+ } catch ( ... ) {
+ delete config;
continue;
}
@@ -185,10 +194,10 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
rdb = R::connect(this->_host,this->_port,this->_auth);
if (rdb) {
if (deleteId.length() > 0) {
- printf("DELETE: %s" ZT_EOL_S,deleteId.c_str());
+ //printf("DELETE: %s" ZT_EOL_S,deleteId.c_str());
R::db(this->_db).table(table).get(deleteId).delete_().run(*rdb);
} else {
- printf("UPSERT: %s" ZT_EOL_S,record.dump().c_str());
+ //printf("UPSERT: %s" ZT_EOL_S,record.dump().c_str());
R::db(this->_db).table(table).insert(R::Datum::from_json(record.dump()),R::optargs("conflict","update","return_changes",false)).run(*rdb);
}
break;
@@ -222,6 +231,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
rdb = R::connect(this->_host,this->_port,this->_auth);
if (rdb) {
OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld}",this->_myAddressStr.c_str(),(long long)OSUtils::now());
+ //printf("HEARTBEAT: %s" ZT_EOL_S,tmp);
R::db(this->_db).table("Controller").update(R::Datum::from_json(tmp)).run(*rdb);
}
} catch ( ... ) {
@@ -240,15 +250,18 @@ RethinkDB::~RethinkDB()
_commitQueue.stop();
for(int t=0;t<ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS;++t)
_commitThread[t].join();
- _membersDbWatcher.detach();
- _networksDbWatcher.detach();
+ if (_membersDbWatcherConnection)
+ ((R::Connection *)_membersDbWatcherConnection)->close();
+ if (_networksDbWatcherConnection)
+ ((R::Connection *)_networksDbWatcherConnection)->close();
+ _membersDbWatcher.join();
+ _networksDbWatcher.join();
_heartbeatThread.join();
}
bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network)
{
waitForReady();
-
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
@@ -257,17 +270,16 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network)
return false;
nw = nwi->second;
}
-
- std::lock_guard<std::mutex> l2(nw->lock);
- network = nw->config;
-
+ {
+ std::lock_guard<std::mutex> l2(nw->lock);
+ network = nw->config;
+ }
return true;
}
bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member)
{
waitForReady();
-
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
@@ -276,21 +288,20 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint6
return false;
nw = nwi->second;
}
-
- std::lock_guard<std::mutex> l2(nw->lock);
- auto m = nw->members.find(memberId);
- if (m == nw->members.end())
- return false;
- network = nw->config;
- member = m->second;
-
+ {
+ std::lock_guard<std::mutex> l2(nw->lock);
+ network = nw->config;
+ auto m = nw->members.find(memberId);
+ if (m == nw->members.end())
+ return false;
+ member = m->second;
+ }
return true;
}
bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info)
{
waitForReady();
-
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
@@ -299,22 +310,21 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint6
return false;
nw = nwi->second;
}
-
- std::lock_guard<std::mutex> l2(nw->lock);
- auto m = nw->members.find(memberId);
- if (m == nw->members.end())
- return false;
- network = nw->config;
- member = m->second;
- _fillSummaryInfo(nw,info);
-
+ {
+ std::lock_guard<std::mutex> l2(nw->lock);
+ network = nw->config;
+ _fillSummaryInfo(nw,info);
+ auto m = nw->members.find(memberId);
+ if (m == nw->members.end())
+ return false;
+ member = m->second;
+ }
return true;
}
bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members)
{
waitForReady();
-
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
@@ -323,19 +333,18 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector
return false;
nw = nwi->second;
}
-
- std::lock_guard<std::mutex> l2(nw->lock);
- network = nw->config;
- for(auto m=nw->members.begin();m!=nw->members.end();++m)
- members.push_back(m->second);
-
+ {
+ std::lock_guard<std::mutex> l2(nw->lock);
+ network = nw->config;
+ for(auto m=nw->members.begin();m!=nw->members.end();++m)
+ members.push_back(m->second);
+ }
return true;
}
bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info)
{
waitForReady();
-
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
@@ -344,10 +353,10 @@ bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info)
return false;
nw = nwi->second;
}
-
- std::lock_guard<std::mutex> l2(nw->lock);
- _fillSummaryInfo(nw,info);
-
+ {
+ std::lock_guard<std::mutex> l2(nw->lock);
+ _fillSummaryInfo(nw,info);
+ }
return true;
}