diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-09 17:01:16 -0500 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-09 17:01:16 -0500 |
commit | 0fb22df633cde88f69fc93b07b24e5d33bf2c03e (patch) | |
tree | f1ef46eb6fdfb38d78afca4a229cda24277ddc68 /controller/RethinkDB.cpp | |
parent | 750e36993fea4bc6fe39e861d7630013e4f210d2 (diff) | |
download | infinitytier-0fb22df633cde88f69fc93b07b24e5d33bf2c03e.tar.gz infinitytier-0fb22df633cde88f69fc93b07b24e5d33bf2c03e.zip |
Get ephemeral status fields out of the configs. They do not belong there and it just complicates things.
Diffstat (limited to 'controller/RethinkDB.cpp')
-rw-r--r-- | controller/RethinkDB.cpp | 73 |
1 files changed, 65 insertions, 8 deletions
diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp index cd968e26..00c17d2b 100644 --- a/controller/RethinkDB.cpp +++ b/controller/RethinkDB.cpp @@ -21,6 +21,8 @@ #include "RethinkDB.hpp" #include "EmbeddedNetworkController.hpp" +#include "../version.h" + #include <chrono> #include <algorithm> #include <stdexcept> @@ -216,6 +218,8 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres } _onlineNotificationThread = std::thread([this]() { + int64_t lastUpdatedNetworkStatus = 0; + std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative; try { std::unique_ptr<R::Connection> rdb; while (_run == 1) { @@ -223,24 +227,77 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres if (!rdb) rdb = R::connect(this->_host,this->_port,this->_auth); if (rdb) { - std::lock_guard<std::mutex> l(_lastOnline_l); R::Array batch; R::Object tmpobj; - for(auto i=_lastOnline.begin();i!=_lastOnline.end();++i) { + + std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline; + { + std::lock_guard<std::mutex> l(_lastOnline_l); + lastOnline.swap(_lastOnline); + } + + for(auto i=lastOnline.begin();i!=lastOnline.end();++i) { + lastOnlineCumulative[i->first] = i->second.first; char tmp[64],tmp2[64]; OSUtils::ztsnprintf(tmp,sizeof(tmp),"%.16llx-%.10llx",i->first.first,i->first.second); tmpobj["id"] = tmp; tmpobj["ts"] = i->second.first; tmpobj["phy"] = i->second.second.toIpString(tmp2); batch.emplace_back(tmpobj); - if (batch.size() >= 256) { - R::db(this->_db).table("MemberLastRequest",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); + if (batch.size() >= 1024) { + R::db(this->_db).table("MemberStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); + batch.clear(); + } + } + if (batch.size() > 0) { + R::db(this->_db).table("MemberStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); + batch.clear(); + } + tmpobj.clear(); + + const int64_t now = OSUtils::now(); + if ((now - lastUpdatedNetworkStatus) > 10000) { + lastUpdatedNetworkStatus = now; + + std::vector< std::pair< uint64_t,std::shared_ptr<_Network> > > networks; + { + std::lock_guard<std::mutex> l(_networks_l); + networks.reserve(_networks.size() + 1); + for(auto i=_networks.begin();i!=_networks.end();++i) + networks.push_back(*i); + } + + for(auto i=networks.begin();i!=networks.end();++i) { + char tmp[64]; + Utils::hex(i->first,tmp); + tmpobj["id"] = tmp; + { + std::lock_guard<std::mutex> l2(i->second->lock); + tmpobj["authorizedMemberCount"] = i->second->authorizedMembers.size(); + tmpobj["totalMemberCount"] = i->second->members.size(); + unsigned long activeMemberCount = 0; + for(auto m=i->second->members.begin();m!=i->second->members.end();++m) { + auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first,m->first)); + if (lo != lastOnlineCumulative.end()) { + if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) + ++activeMemberCount; + else lastOnlineCumulative.erase(lo); + } + } + tmpobj["activeMemberCount"] = activeMemberCount; + tmpobj["bridgeCount"] = i->second->activeBridgeMembers.size(); + } + batch.emplace_back(tmpobj); + if (batch.size() >= 1024) { + R::db(this->_db).table("NetworkStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); + batch.clear(); + } + } + if (batch.size() > 0) { + R::db(this->_db).table("NetworkStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); batch.clear(); } } - if (batch.size() > 0) - R::db(this->_db).table("MemberLastRequest",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); - _lastOnline.clear(); } } catch (std::exception &e) { fprintf(stderr,"ERROR: controller RethinkDB (node status update): %s" ZT_EOL_S,e.what()); @@ -266,7 +323,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres if (!rdb) rdb = R::connect(this->_host,this->_port,this->_auth); if (rdb) { - OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld}",this->_myAddressStr.c_str(),(long long)OSUtils::now()); + OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld,\"version\":\"%d.%d.%d\"}",this->_myAddressStr.c_str(),(long long)OSUtils::now(),ZEROTIER_ONE_VERSION_MAJOR,ZEROTIER_ONE_VERSION_MINOR,ZEROTIER_ONE_VERSION_REVISION); //printf("HEARTBEAT: %s" ZT_EOL_S,tmp); R::db(this->_db).table("Controller").update(R::Datum::from_json(tmp)).run(*rdb); } |