summaryrefslogtreecommitdiff
path: root/controller/RethinkDB.cpp
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2017-11-09 17:01:16 -0500
committerAdam Ierymenko <adam.ierymenko@gmail.com>2017-11-09 17:01:16 -0500
commit0fb22df633cde88f69fc93b07b24e5d33bf2c03e (patch)
treef1ef46eb6fdfb38d78afca4a229cda24277ddc68 /controller/RethinkDB.cpp
parent750e36993fea4bc6fe39e861d7630013e4f210d2 (diff)
downloadinfinitytier-0fb22df633cde88f69fc93b07b24e5d33bf2c03e.tar.gz
infinitytier-0fb22df633cde88f69fc93b07b24e5d33bf2c03e.zip
Get ephemeral status fields out of the configs. They do not belong there and it just complicates things.
Diffstat (limited to 'controller/RethinkDB.cpp')
-rw-r--r--controller/RethinkDB.cpp73
1 files changed, 65 insertions, 8 deletions
diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp
index cd968e26..00c17d2b 100644
--- a/controller/RethinkDB.cpp
+++ b/controller/RethinkDB.cpp
@@ -21,6 +21,8 @@
#include "RethinkDB.hpp"
#include "EmbeddedNetworkController.hpp"
+#include "../version.h"
+
#include <chrono>
#include <algorithm>
#include <stdexcept>
@@ -216,6 +218,8 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
}
_onlineNotificationThread = std::thread([this]() {
+ int64_t lastUpdatedNetworkStatus = 0;
+ std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
try {
std::unique_ptr<R::Connection> rdb;
while (_run == 1) {
@@ -223,24 +227,77 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
if (!rdb)
rdb = R::connect(this->_host,this->_port,this->_auth);
if (rdb) {
- std::lock_guard<std::mutex> l(_lastOnline_l);
R::Array batch;
R::Object tmpobj;
- for(auto i=_lastOnline.begin();i!=_lastOnline.end();++i) {
+
+ std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
+ {
+ std::lock_guard<std::mutex> l(_lastOnline_l);
+ lastOnline.swap(_lastOnline);
+ }
+
+ for(auto i=lastOnline.begin();i!=lastOnline.end();++i) {
+ lastOnlineCumulative[i->first] = i->second.first;
char tmp[64],tmp2[64];
OSUtils::ztsnprintf(tmp,sizeof(tmp),"%.16llx-%.10llx",i->first.first,i->first.second);
tmpobj["id"] = tmp;
tmpobj["ts"] = i->second.first;
tmpobj["phy"] = i->second.second.toIpString(tmp2);
batch.emplace_back(tmpobj);
- if (batch.size() >= 256) {
- R::db(this->_db).table("MemberLastRequest",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb);
+ if (batch.size() >= 1024) {
+ R::db(this->_db).table("MemberStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb);
+ batch.clear();
+ }
+ }
+ if (batch.size() > 0) {
+ R::db(this->_db).table("MemberStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb);
+ batch.clear();
+ }
+ tmpobj.clear();
+
+ const int64_t now = OSUtils::now();
+ if ((now - lastUpdatedNetworkStatus) > 10000) {
+ lastUpdatedNetworkStatus = now;
+
+ std::vector< std::pair< uint64_t,std::shared_ptr<_Network> > > networks;
+ {
+ std::lock_guard<std::mutex> l(_networks_l);
+ networks.reserve(_networks.size() + 1);
+ for(auto i=_networks.begin();i!=_networks.end();++i)
+ networks.push_back(*i);
+ }
+
+ for(auto i=networks.begin();i!=networks.end();++i) {
+ char tmp[64];
+ Utils::hex(i->first,tmp);
+ tmpobj["id"] = tmp;
+ {
+ std::lock_guard<std::mutex> l2(i->second->lock);
+ tmpobj["authorizedMemberCount"] = i->second->authorizedMembers.size();
+ tmpobj["totalMemberCount"] = i->second->members.size();
+ unsigned long activeMemberCount = 0;
+ for(auto m=i->second->members.begin();m!=i->second->members.end();++m) {
+ auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first,m->first));
+ if (lo != lastOnlineCumulative.end()) {
+ if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2))
+ ++activeMemberCount;
+ else lastOnlineCumulative.erase(lo);
+ }
+ }
+ tmpobj["activeMemberCount"] = activeMemberCount;
+ tmpobj["bridgeCount"] = i->second->activeBridgeMembers.size();
+ }
+ batch.emplace_back(tmpobj);
+ if (batch.size() >= 1024) {
+ R::db(this->_db).table("NetworkStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb);
+ batch.clear();
+ }
+ }
+ if (batch.size() > 0) {
+ R::db(this->_db).table("NetworkStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb);
batch.clear();
}
}
- if (batch.size() > 0)
- R::db(this->_db).table("MemberLastRequest",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb);
- _lastOnline.clear();
}
} catch (std::exception &e) {
fprintf(stderr,"ERROR: controller RethinkDB (node status update): %s" ZT_EOL_S,e.what());
@@ -266,7 +323,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
if (!rdb)
rdb = R::connect(this->_host,this->_port,this->_auth);
if (rdb) {
- OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld}",this->_myAddressStr.c_str(),(long long)OSUtils::now());
+ OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld,\"version\":\"%d.%d.%d\"}",this->_myAddressStr.c_str(),(long long)OSUtils::now(),ZEROTIER_ONE_VERSION_MAJOR,ZEROTIER_ONE_VERSION_MINOR,ZEROTIER_ONE_VERSION_REVISION);
//printf("HEARTBEAT: %s" ZT_EOL_S,tmp);
R::db(this->_db).table("Controller").update(R::Datum::from_json(tmp)).run(*rdb);
}