diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-09 17:01:16 -0500 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-09 17:01:16 -0500 |
commit | 0fb22df633cde88f69fc93b07b24e5d33bf2c03e (patch) | |
tree | f1ef46eb6fdfb38d78afca4a229cda24277ddc68 | |
parent | 750e36993fea4bc6fe39e861d7630013e4f210d2 (diff) | |
download | infinitytier-0fb22df633cde88f69fc93b07b24e5d33bf2c03e.tar.gz infinitytier-0fb22df633cde88f69fc93b07b24e5d33bf2c03e.zip |
Get ephemeral status fields out of the configs. They do not belong there and it just complicates things.
-rw-r--r-- | controller/EmbeddedNetworkController.cpp | 30 | ||||
-rw-r--r-- | controller/EmbeddedNetworkController.hpp | 43 | ||||
-rw-r--r-- | controller/README.md | 9 | ||||
-rw-r--r-- | controller/RethinkDB.cpp | 73 |
4 files changed, 83 insertions, 72 deletions
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 0f82ff63..018f2215 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -475,7 +475,12 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender) _signingId = signingId; _sender = sender; _signingIdAddressString = signingId.address().toString(tmp); - _db.reset(new ControllerDB(this,_signingId.address(),_path.c_str())); +#ifdef ZT_CONTROLLER_USE_RETHINKDB + if ((_path.length() > 10)&&(_path.substr(0,10) == "rethinkdb:")) + _db.reset(new RethinkDB(this,_signingId.address(),_path.c_str())); + else // else use FileDB after endif +#endif + _db.reset(new FileDB(this,_signingId.address(),_path.c_str())); _db->waitForReady(); } @@ -529,7 +534,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET( json member; if (!_db->get(nwid,network,address,member)) return 404; - _addMemberNonPersistedFields(nwid,address,member,OSUtils::now()); responseBody = OSUtils::jsonDump(member); responseContentType = "application/json"; @@ -559,10 +563,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET( } else { // Get network - const int64_t now = OSUtils::now(); - ControllerDB::NetworkSummaryInfo ns; - _db->summary(nwid,ns); - _addNetworkNonPersistedFields(nwid,network,now,ns); responseBody = OSUtils::jsonDump(network); responseContentType = "application/json"; return 200; @@ -733,9 +733,8 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( member["address"] = addrs; // legacy member["nwid"] = nwids; - _removeMemberNonPersistedFields(member); + _cleanMember(member); _db->save(&origMember,member); - _addMemberNonPersistedFields(nwid,address,member,now); responseBody = OSUtils::jsonDump(member); responseContentType = "application/json"; @@ -980,11 +979,8 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( network["id"] = nwids; network["nwid"] = nwids; // legacy - _removeNetworkNonPersistedFields(network); + _cleanNetwork(network); _db->save(&origNetwork,network); - ControllerDB::NetworkSummaryInfo ns; - _db->summary(nwid,ns); - _addNetworkNonPersistedFields(nwid,network,now,ns); responseBody = OSUtils::jsonDump(network); responseContentType = "application/json"; @@ -1156,7 +1152,7 @@ void EmbeddedNetworkController::_request( const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData) { char nwids[24]; - ControllerDB::NetworkSummaryInfo ns; + DB::NetworkSummaryInfo ns; json network,member,origMember; if (!_db) @@ -1282,15 +1278,11 @@ void EmbeddedNetworkController::_request( if (fromAddr) ms.physicalAddr = fromAddr; - - char tmpip[64]; - if (ms.physicalAddr) - member["physicalAddr"] = ms.physicalAddr.toString(tmpip); } } } else { // If they are not authorized, STOP! - _removeMemberNonPersistedFields(member); + _cleanMember(member); _db->save(&origMember,member); _sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_ACCESS_DENIED); return; @@ -1653,7 +1645,7 @@ void EmbeddedNetworkController::_request( return; } - _removeMemberNonPersistedFields(member); + _cleanMember(member); _db->save(&origMember,member); _sender->ncSendConfig(nwid,requestPacketId,identity.address(),*(nc.get()),metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_VERSION,0) < 6); } diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index bc59b359..b04a44c9 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -43,20 +43,14 @@ #include "../ext/json/json.hpp" +#include "DB.hpp" +#include "FileDB.hpp" #ifdef ZT_CONTROLLER_USE_RETHINKDB #include "RethinkDB.hpp" -#else -#include "FileDB.hpp" #endif namespace ZeroTier { -#ifdef ZT_CONTROLLER_USE_RETHINKDB -typedef RethinkDB ControllerDB; -#else -typedef FileDB ControllerDB; -#endif - class Node; class EmbeddedNetworkController : public NetworkController @@ -130,7 +124,6 @@ private: if (!member.count("vMinor")) member["vMinor"] = -1; if (!member.count("vRev")) member["vRev"] = -1; if (!member.count("vProto")) member["vProto"] = -1; - if (!member.count("physicalAddr")) member["physicalAddr"] = nlohmann::json(); if (!member.count("remoteTraceTarget")) member["remoteTraceTarget"] = nlohmann::json(); member["objtype"] = "member"; } @@ -160,42 +153,18 @@ private: } network["objtype"] = "network"; } - inline void _addNetworkNonPersistedFields(const uint64_t nwid,nlohmann::json &network,int64_t now,const ControllerDB::NetworkSummaryInfo &ns) - { - network["clock"] = now; - network["authorizedMemberCount"] = ns.authorizedMemberCount; - network["totalMemberCount"] = ns.totalMemberCount; - { - std::lock_guard<std::mutex> l(_memberStatus_l); - unsigned long ac = 0; - for(auto ms=_memberStatus.begin();ms!=_memberStatus.end();++ms) { - if ((ms->first.networkId == nwid)&&(ms->second.online(now))) - ++ac; - } - network["activeMemberCount"] = ac; - } - } - inline void _removeNetworkNonPersistedFields(nlohmann::json &network) + inline void _cleanNetwork(nlohmann::json &network) { network.erase("clock"); network.erase("authorizedMemberCount"); network.erase("activeMemberCount"); network.erase("totalMemberCount"); - // legacy fields network.erase("lastModified"); } - inline void _addMemberNonPersistedFields(uint64_t nwid,uint64_t nodeId,nlohmann::json &member,int64_t now) - { - member["clock"] = now; - { - std::lock_guard<std::mutex> l(_memberStatus_l); - member["online"] = _memberStatus[_MemberStatusKey(nwid,nodeId)].online(now); - } - } - inline void _removeMemberNonPersistedFields(nlohmann::json &member) + inline void _cleanMember(nlohmann::json &member) { member.erase("clock"); - // legacy fields + member.erase("physicalAddr"); member.erase("recentLog"); member.erase("lastModified"); member.erase("lastRequestMetaData"); @@ -244,7 +213,7 @@ private: Identity _signingId; std::string _signingIdAddressString; NetworkController::Sender *_sender; - std::unique_ptr<ControllerDB> _db; + std::unique_ptr<DB> _db; BlockingQueue< _RQEntry * > _queue; std::vector<std::thread> _threads; std::mutex _threads_l; diff --git a/controller/README.md b/controller/README.md index 5a9dadc2..a684ed9c 100644 --- a/controller/README.md +++ b/controller/README.md @@ -79,7 +79,6 @@ Example: | --------------------- | ------------- | ------------------------------------------------- | -------- | | id | string | 16-digit network ID | no | | nwid | string | 16-digit network ID (old, but still around) | no | -| clock | integer | Current clock, ms since epoch | no | | name | string | A short name for this network | YES | | private | boolean | Is access control enabled? | YES | | enableBroadcast | boolean | Ethernet ff:ff:ff:ff:ff:ff allowed? | YES | @@ -89,9 +88,6 @@ Example: | multicastLimit | integer | Maximum recipients for a multicast packet | YES | | creationTime | integer | Time network was first created | no | | revision | integer | Network config revision counter | no | -| authorizedMemberCount | integer | Number of authorized members (for private nets) | no | -| activeMemberCount | integer | Number of members that appear to be online | no | -| totalMemberCount | integer | Total known members of this network | no | | routes | array[object] | Managed IPv4 and IPv6 routes; see below | YES | | ipAssignmentPools | array[object] | IP auto-assign ranges; see below | YES | | rules | array[object] | Traffic rules; see below | YES | @@ -191,7 +187,7 @@ The entry types and their additional fields are: | `MATCH_TAGS_SAMENESS` | Match if both sides' tags differ by no more than value | `id`,`value` | | `MATCH_TAGS_BITWISE_AND` | Match if both sides' tags AND to value | `id`,`value` | | `MATCH_TAGS_BITWISE_OR` | Match if both sides' tags OR to value | `id`,`value` | -| `MATCH_TAGS_BITWISE_XOR` | Match if both sides` tags XOR to value | `id`,`value` | +| `MATCH_TAGS_BITWISE_XOR` | Match if both sides' tags XOR to value | `id`,`value` | Important notes about rules engine behavior: @@ -227,9 +223,7 @@ This returns an object containing all currently online members and the most rece | id | string | Member's 10-digit ZeroTier address | no | | address | string | Member's 10-digit ZeroTier address | no | | nwid | string | 16-digit network ID | no | -| clock | integer | Current clock, ms since epoch | no | | authorized | boolean | Is member authorized? (for private networks) | YES | -| authHistory | array[object] | History of auth changes, latest at end | no | | activeBridge | boolean | Member is able to bridge to other Ethernet nets | YES | | identity | string | Member's public ZeroTier identity (if known) | no | | ipAssignments | array[string] | Managed IP address assignments | YES | @@ -238,7 +232,6 @@ This returns an object containing all currently online members and the most rece | vMinor | integer | Most recently known minor version | no | | vRev | integer | Most recently known revision | no | | vProto | integer | Most recently known protocl version | no | -| physicalAddr | string | Last known physical IP/port or null if none | no | Note that managed IP assignments are only used if they fall within a managed route. Otherwise they are ignored. diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp index cd968e26..00c17d2b 100644 --- a/controller/RethinkDB.cpp +++ b/controller/RethinkDB.cpp @@ -21,6 +21,8 @@ #include "RethinkDB.hpp" #include "EmbeddedNetworkController.hpp" +#include "../version.h" + #include <chrono> #include <algorithm> #include <stdexcept> @@ -216,6 +218,8 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres } _onlineNotificationThread = std::thread([this]() { + int64_t lastUpdatedNetworkStatus = 0; + std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative; try { std::unique_ptr<R::Connection> rdb; while (_run == 1) { @@ -223,24 +227,77 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres if (!rdb) rdb = R::connect(this->_host,this->_port,this->_auth); if (rdb) { - std::lock_guard<std::mutex> l(_lastOnline_l); R::Array batch; R::Object tmpobj; - for(auto i=_lastOnline.begin();i!=_lastOnline.end();++i) { + + std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline; + { + std::lock_guard<std::mutex> l(_lastOnline_l); + lastOnline.swap(_lastOnline); + } + + for(auto i=lastOnline.begin();i!=lastOnline.end();++i) { + lastOnlineCumulative[i->first] = i->second.first; char tmp[64],tmp2[64]; OSUtils::ztsnprintf(tmp,sizeof(tmp),"%.16llx-%.10llx",i->first.first,i->first.second); tmpobj["id"] = tmp; tmpobj["ts"] = i->second.first; tmpobj["phy"] = i->second.second.toIpString(tmp2); batch.emplace_back(tmpobj); - if (batch.size() >= 256) { - R::db(this->_db).table("MemberLastRequest",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); + if (batch.size() >= 1024) { + R::db(this->_db).table("MemberStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); + batch.clear(); + } + } + if (batch.size() > 0) { + R::db(this->_db).table("MemberStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); + batch.clear(); + } + tmpobj.clear(); + + const int64_t now = OSUtils::now(); + if ((now - lastUpdatedNetworkStatus) > 10000) { + lastUpdatedNetworkStatus = now; + + std::vector< std::pair< uint64_t,std::shared_ptr<_Network> > > networks; + { + std::lock_guard<std::mutex> l(_networks_l); + networks.reserve(_networks.size() + 1); + for(auto i=_networks.begin();i!=_networks.end();++i) + networks.push_back(*i); + } + + for(auto i=networks.begin();i!=networks.end();++i) { + char tmp[64]; + Utils::hex(i->first,tmp); + tmpobj["id"] = tmp; + { + std::lock_guard<std::mutex> l2(i->second->lock); + tmpobj["authorizedMemberCount"] = i->second->authorizedMembers.size(); + tmpobj["totalMemberCount"] = i->second->members.size(); + unsigned long activeMemberCount = 0; + for(auto m=i->second->members.begin();m!=i->second->members.end();++m) { + auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first,m->first)); + if (lo != lastOnlineCumulative.end()) { + if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) + ++activeMemberCount; + else lastOnlineCumulative.erase(lo); + } + } + tmpobj["activeMemberCount"] = activeMemberCount; + tmpobj["bridgeCount"] = i->second->activeBridgeMembers.size(); + } + batch.emplace_back(tmpobj); + if (batch.size() >= 1024) { + R::db(this->_db).table("NetworkStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); + batch.clear(); + } + } + if (batch.size() > 0) { + R::db(this->_db).table("NetworkStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); batch.clear(); } } - if (batch.size() > 0) - R::db(this->_db).table("MemberLastRequest",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); - _lastOnline.clear(); } } catch (std::exception &e) { fprintf(stderr,"ERROR: controller RethinkDB (node status update): %s" ZT_EOL_S,e.what()); @@ -266,7 +323,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres if (!rdb) rdb = R::connect(this->_host,this->_port,this->_auth); if (rdb) { - OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld}",this->_myAddressStr.c_str(),(long long)OSUtils::now()); + OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld,\"version\":\"%d.%d.%d\"}",this->_myAddressStr.c_str(),(long long)OSUtils::now(),ZEROTIER_ONE_VERSION_MAJOR,ZEROTIER_ONE_VERSION_MINOR,ZEROTIER_ONE_VERSION_REVISION); //printf("HEARTBEAT: %s" ZT_EOL_S,tmp); R::db(this->_db).table("Controller").update(R::Datum::from_json(tmp)).run(*rdb); } |