diff options
-rw-r--r-- | controller/EmbeddedNetworkController.cpp | 193 | ||||
-rw-r--r-- | controller/EmbeddedNetworkController.hpp | 7 | ||||
-rw-r--r-- | controller/JSONDB.cpp | 36 | ||||
-rw-r--r-- | controller/JSONDB.hpp | 15 | ||||
-rw-r--r-- | selftest.cpp | 2 |
5 files changed, 106 insertions, 147 deletions
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 597bc9c9..3b901afe 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -32,6 +32,7 @@ #include <stdexcept> #include <set> #include <map> +#include <thread> #include "../include/ZeroTierOne.h" #include "../node/Constants.hpp" @@ -430,7 +431,6 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule) EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath) : _startTime(OSUtils::now()), - _threadsStarted(false), _db(dbPath), _node(node) { @@ -439,11 +439,11 @@ EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPa EmbeddedNetworkController::~EmbeddedNetworkController() { Mutex::Lock _l(_threads_m); - if (_threadsStarted) { - for(int i=0;i<(ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT*2);++i) + if (_threads.size() > 0) { + for(unsigned long i=0;i<(((unsigned long)_threads.size())*2);++i) _queue.post((_RQEntry *)0); - for(int i=0;i<ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT;++i) - Thread::join(_threads[i]); + for(std::vector<Thread>::iterator i(_threads.begin());i!=_threads.end();++i) + Thread::join(*i); } } @@ -465,11 +465,13 @@ void EmbeddedNetworkController::request( { Mutex::Lock _l(_threads_m); - if (!_threadsStarted) { - for(int i=0;i<ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT;++i) - _threads[i] = Thread::start(this); + if (_threads.size() == 0) { + long hwc = (long)std::thread::hardware_concurrency(); + if (hwc <= 0) + hwc = 1; + for(long i=0;i<hwc;++i) + _threads.push_back(Thread::start(this)); } - _threadsStarted = true; } _RQEntry *qe = new _RQEntry; @@ -496,11 +498,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET( char nwids[24]; Utils::snprintf(nwids,sizeof(nwids),"%.16llx",(unsigned long long)nwid); - json network; - { - Mutex::Lock _l(_db_m); - network = _db.get("network",nwids); - } + json network(_db.get("network",nwids)); if (!network.size()) return 404; @@ -510,24 +508,13 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET( if (path.size() >= 4) { const uint64_t address = Utils::hexStrToU64(path[3].c_str()); - - json member; - { - Mutex::Lock _l(_db_m); - member = _db.get("network",nwids,"member",Address(address).toString()); - } + json member(_db.get("network",nwids,"member",Address(address).toString())); if (!member.size()) return 404; - _addMemberNonPersistedFields(member,OSUtils::now()); responseBody = OSUtils::jsonDump(member); responseContentType = "application/json"; - - return 200; } else { - - Mutex::Lock _l(_db_m); - responseBody = "{"; _db.filter((std::string("network/") + nwids + "/member/"),[&responseBody](const std::string &n,const json &member) { if ((member.is_object())&&(member.size() > 0)) { @@ -540,9 +527,8 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET( }); responseBody.push_back('}'); responseContentType = "application/json"; - - return 200; } + return 200; } // else 404 @@ -560,14 +546,11 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET( } else if (path.size() == 1) { std::set<std::string> networkIds; - { - Mutex::Lock _l(_db_m); - _db.filter("network/",[&networkIds](const std::string &n,const json &obj) { - if (n.length() == (16 + 8)) - networkIds.insert(n.substr(8)); - return true; // do not delete - }); - } + _db.filter("network/",[&networkIds](const std::string &n,const json &obj) { + if (n.length() == (16 + 8)) + networkIds.insert(n.substr(8)); + return true; // do not delete + }); responseBody.push_back('['); for(std::set<std::string>::iterator i(networkIds.begin());i!=networkIds.end();++i) { @@ -634,11 +617,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( char addrs[24]; Utils::snprintf(addrs,sizeof(addrs),"%.10llx",(unsigned long long)address); - json member; - { - Mutex::Lock _l(_db_m); - member = _db.get("network",nwids,"member",Address(address).toString()); - } + json member(_db.get("network",nwids,"member",Address(address).toString())); json origMember(member); // for detecting changes _initMember(member); @@ -735,10 +714,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( member["lastModified"] = now; json &revj = member["revision"]; member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL); - { - Mutex::Lock _l(_db_m); - _db.put("network",nwids,"member",Address(address).toString(),member); - } + _db.put("network",nwids,"member",Address(address).toString(),member); _pushMemberUpdate(now,nwid,member); } @@ -806,31 +782,26 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( } else { // POST to network ID - json network; - { - Mutex::Lock _l(_db_m); - - // Magic ID ending with ______ picks a random unused network ID - if (path[1].substr(10) == "______") { - nwid = 0; - uint64_t nwidPrefix = (Utils::hexStrToU64(path[1].substr(0,10).c_str()) << 24) & 0xffffffffff000000ULL; - uint64_t nwidPostfix = 0; - for(unsigned long k=0;k<100000;++k) { // sanity limit on trials - Utils::getSecureRandom(&nwidPostfix,sizeof(nwidPostfix)); - uint64_t tryNwid = nwidPrefix | (nwidPostfix & 0xffffffULL); - if ((tryNwid & 0xffffffULL) == 0ULL) tryNwid |= 1ULL; - Utils::snprintf(nwids,sizeof(nwids),"%.16llx",(unsigned long long)tryNwid); - if (_db.get("network",nwids).size() <= 0) { - nwid = tryNwid; - break; - } + // Magic ID ending with ______ picks a random unused network ID + if (path[1].substr(10) == "______") { + nwid = 0; + uint64_t nwidPrefix = (Utils::hexStrToU64(path[1].substr(0,10).c_str()) << 24) & 0xffffffffff000000ULL; + uint64_t nwidPostfix = 0; + for(unsigned long k=0;k<100000;++k) { // sanity limit on trials + Utils::getSecureRandom(&nwidPostfix,sizeof(nwidPostfix)); + uint64_t tryNwid = nwidPrefix | (nwidPostfix & 0xffffffULL); + if ((tryNwid & 0xffffffULL) == 0ULL) tryNwid |= 1ULL; + Utils::snprintf(nwids,sizeof(nwids),"%.16llx",(unsigned long long)tryNwid); + if (_db.get("network",nwids).size() <= 0) { + nwid = tryNwid; + break; } - if (!nwid) - return 503; } - - network = _db.get("network",nwids); + if (!nwid) + return 503; } + json network(_db.get("network",nwids)); + json origNetwork(network); // for detecting changes _initNetwork(network); @@ -1044,10 +1015,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( json &revj = network["revision"]; network["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL); network["lastModified"] = now; - { - Mutex::Lock _l(_db_m); - _db.put("network",nwids,network); - } + _db.put("network",nwids,network); // Send an update to all members of the network _db.filter((std::string("network/") + nwids + "/member/"),[this,&now,&nwid](const std::string &n,const json &obj) { @@ -1101,11 +1069,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE( char nwids[24]; Utils::snprintf(nwids,sizeof(nwids),"%.16llx",nwid); - json network; - { - Mutex::Lock _l(_db_m); - network = _db.get("network",nwids); - } + json network(_db.get("network",nwids)); if (!network.size()) return 404; @@ -1113,8 +1077,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE( if ((path.size() == 4)&&(path[2] == "member")&&(path[3].length() == 10)) { const uint64_t address = Utils::hexStrToU64(path[3].c_str()); - Mutex::Lock _l(_db_m); - json member = _db.get("network",nwids,"member",Address(address).toString()); _db.erase("network",nwids,"member",Address(address).toString()); @@ -1125,8 +1087,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE( return 200; } } else { - Mutex::Lock _l(_db_m); - std::string pfx("network/"); pfx.append(nwids); _db.filter(pfx,[](const std::string &n,const json &obj) { @@ -1226,7 +1186,6 @@ void EmbeddedNetworkController::_circuitTestCallback(ZT_Node *node,ZT_CircuitTes reinterpret_cast<const InetAddress *>(&(report->receivedFromRemoteAddress))->toString().c_str(), ((double)report->receivedFromLinkQuality / (double)ZT_PATH_LINK_QUALITY_MAX)); - Mutex::Lock _l(self->_db_m); self->_db.writeRaw(id,std::string(tmp)); } @@ -1252,13 +1211,8 @@ void EmbeddedNetworkController::_request( char nwids[24]; Utils::snprintf(nwids,sizeof(nwids),"%.16llx",nwid); - json network; - json member; - { - Mutex::Lock _l(_db_m); - network = _db.get("network",nwids); - member = _db.get("network",nwids,"member",identity.address().toString()); - } + json network(_db.get("network",nwids)); + json member(_db.get("network",nwids,"member",identity.address().toString())); if (!network.size()) { _sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_OBJECT_NOT_FOUND); @@ -1403,7 +1357,6 @@ void EmbeddedNetworkController::_request( if (!authorizedBy) { if (origMember != member) { member["lastModified"] = now; - Mutex::Lock _l(_db_m); _db.put("network",nwids,"member",identity.address().toString(),member); } _sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_ACCESS_DENIED); @@ -1759,7 +1712,6 @@ void EmbeddedNetworkController::_request( if (member != origMember) { member["lastModified"] = now; - Mutex::Lock _l(_db_m); _db.put("network",nwids,"member",identity.address().toString(),member); } @@ -1780,45 +1732,42 @@ void EmbeddedNetworkController::_getNetworkMemberInfo(uint64_t now,uint64_t nwid } } - { - Mutex::Lock _l(_db_m); - _db.filter(pfx,[&nmi,&now](const std::string &n,const json &member) { - try { - if (OSUtils::jsonBool(member["authorized"],false)) { - ++nmi.authorizedMemberCount; - - if (member.count("recentLog")) { - const json &mlog = member["recentLog"]; - if ((mlog.is_array())&&(mlog.size() > 0)) { - const json &mlog1 = mlog[0]; - if (mlog1.is_object()) { - if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < ZT_NETCONF_NODE_ACTIVE_THRESHOLD) - ++nmi.activeMemberCount; - } + _db.filter(pfx,[&nmi,&now](const std::string &n,const json &member) { + try { + if (OSUtils::jsonBool(member["authorized"],false)) { + ++nmi.authorizedMemberCount; + + if (member.count("recentLog")) { + const json &mlog = member["recentLog"]; + if ((mlog.is_array())&&(mlog.size() > 0)) { + const json &mlog1 = mlog[0]; + if (mlog1.is_object()) { + if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < ZT_NETCONF_NODE_ACTIVE_THRESHOLD) + ++nmi.activeMemberCount; } } + } - if (OSUtils::jsonBool(member["activeBridge"],false)) { - nmi.activeBridges.insert(Address(Utils::hexStrToU64(OSUtils::jsonString(member["id"],"0000000000").c_str()))); - } + if (OSUtils::jsonBool(member["activeBridge"],false)) { + nmi.activeBridges.insert(Address(Utils::hexStrToU64(OSUtils::jsonString(member["id"],"0000000000").c_str()))); + } - if (member.count("ipAssignments")) { - const json &mips = member["ipAssignments"]; - if (mips.is_array()) { - for(unsigned long i=0;i<mips.size();++i) { - InetAddress mip(OSUtils::jsonString(mips[i],"")); - if ((mip.ss_family == AF_INET)||(mip.ss_family == AF_INET6)) - nmi.allocatedIps.insert(mip); - } + if (member.count("ipAssignments")) { + const json &mips = member["ipAssignments"]; + if (mips.is_array()) { + for(unsigned long i=0;i<mips.size();++i) { + InetAddress mip(OSUtils::jsonString(mips[i],"")); + if ((mip.ss_family == AF_INET)||(mip.ss_family == AF_INET6)) + nmi.allocatedIps.insert(mip); } } - } else { - nmi.mostRecentDeauthTime = std::max(nmi.mostRecentDeauthTime,OSUtils::jsonInt(member["lastDeauthorizedTime"],0ULL)); } - } catch ( ... ) {} - return true; - }); - } + } else { + nmi.mostRecentDeauthTime = std::max(nmi.mostRecentDeauthTime,OSUtils::jsonInt(member["lastDeauthorizedTime"],0ULL)); + } + } catch ( ... ) {} + return true; + }); nmi.nmiTimestamp = now; { diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index 0ae2f3b5..906f4345 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -43,9 +43,6 @@ #include "JSONDB.hpp" -// Number of background threads to start -- not actually started until needed -#define ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT 4 - // TTL for circuit tests #define ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION 120000 @@ -182,15 +179,13 @@ private: const uint64_t _startTime; BlockingQueue<_RQEntry *> _queue; - Thread _threads[ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT]; - bool _threadsStarted; + std::vector<Thread> _threads; Mutex _threads_m; std::map<uint64_t,_NetworkMemberInfo> _nmiCache; Mutex _nmiCache_m; JSONDB _db; - Mutex _db_m; Node *const _node; std::string _path; diff --git a/controller/JSONDB.cpp b/controller/JSONDB.cpp index d3e76fc1..dd8e3968 100644 --- a/controller/JSONDB.cpp +++ b/controller/JSONDB.cpp @@ -78,22 +78,29 @@ bool JSONDB::writeRaw(const std::string &n,const std::string &obj) bool JSONDB::put(const std::string &n,const nlohmann::json &obj) { const bool r = writeRaw(n,OSUtils::jsonDump(obj)); - _db[n].obj = obj; + { + Mutex::Lock _l(_db_m); + _db[n].obj = obj; + } return r; } -const nlohmann::json &JSONDB::get(const std::string &n) +nlohmann::json JSONDB::get(const std::string &n) { - while (!_ready) { - Thread::sleep(250); - _ready = _reload(_basePath,std::string()); - } + { + Mutex::Lock _l(_db_m); - if (!_isValidObjectName(n)) - return _EMPTY_JSON; - std::map<std::string,_E>::iterator e(_db.find(n)); - if (e != _db.end()) - return e->second.obj; + while (!_ready) { + Thread::sleep(250); + _ready = _reload(_basePath,std::string()); + } + + if (!_isValidObjectName(n)) + return _EMPTY_JSON; + std::map<std::string,_E>::iterator e(_db.find(n)); + if (e != _db.end()) + return e->second.obj; + } std::string buf; if (_httpAddr) { @@ -110,6 +117,7 @@ const nlohmann::json &JSONDB::get(const std::string &n) } try { + Mutex::Lock _l(_db_m); _E &e2 = _db[n]; e2.obj = OSUtils::jsonParse(buf); return e2.obj; @@ -135,11 +143,15 @@ void JSONDB::erase(const std::string &n) OSUtils::rm(path.c_str()); } - _db.erase(n); + { + Mutex::Lock _l(_db_m); + _db.erase(n); + } } bool JSONDB::_reload(const std::string &p,const std::string &b) { + // Assumes _db_m is locked if (_httpAddr) { std::string body; std::map<std::string,std::string> headers; diff --git a/controller/JSONDB.hpp b/controller/JSONDB.hpp index beafbaf5..2d3a5224 100644 --- a/controller/JSONDB.hpp +++ b/controller/JSONDB.hpp @@ -51,18 +51,16 @@ public: bool writeRaw(const std::string &n,const std::string &obj); bool put(const std::string &n,const nlohmann::json &obj); - inline bool put(const std::string &n1,const std::string &n2,const nlohmann::json &obj) { return this->put((n1 + "/" + n2),obj); } inline bool put(const std::string &n1,const std::string &n2,const std::string &n3,const nlohmann::json &obj) { return this->put((n1 + "/" + n2 + "/" + n3),obj); } inline bool put(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4,const nlohmann::json &obj) { return this->put((n1 + "/" + n2 + "/" + n3 + "/" + n4),obj); } inline bool put(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4,const std::string &n5,const nlohmann::json &obj) { return this->put((n1 + "/" + n2 + "/" + n3 + "/" + n4 + "/" + n5),obj); } - const nlohmann::json &get(const std::string &n); - - inline const nlohmann::json &get(const std::string &n1,const std::string &n2) { return this->get((n1 + "/" + n2)); } - inline const nlohmann::json &get(const std::string &n1,const std::string &n2,const std::string &n3) { return this->get((n1 + "/" + n2 + "/" + n3)); } - inline const nlohmann::json &get(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4) { return this->get((n1 + "/" + n2 + "/" + n3 + "/" + n4)); } - inline const nlohmann::json &get(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4,const std::string &n5) { return this->get((n1 + "/" + n2 + "/" + n3 + "/" + n4 + "/" + n5)); } + nlohmann::json get(const std::string &n); + inline nlohmann::json get(const std::string &n1,const std::string &n2) { return this->get((n1 + "/" + n2)); } + inline nlohmann::json get(const std::string &n1,const std::string &n2,const std::string &n3) { return this->get((n1 + "/" + n2 + "/" + n3)); } + inline nlohmann::json get(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4) { return this->get((n1 + "/" + n2 + "/" + n3 + "/" + n4)); } + inline nlohmann::json get(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4,const std::string &n5) { return this->get((n1 + "/" + n2 + "/" + n3 + "/" + n4 + "/" + n5)); } void erase(const std::string &n); @@ -74,6 +72,8 @@ public: template<typename F> inline void filter(const std::string &prefix,F func) { + Mutex::Lock _l(_db_m); + while (!_ready) { Thread::sleep(250); _ready = _reload(_basePath,std::string()); @@ -108,6 +108,7 @@ private: InetAddress _httpAddr; std::string _basePath; std::map<std::string,_E> _db; + Mutex _db_m; volatile bool _ready; }; diff --git a/selftest.cpp b/selftest.cpp index 91d304a6..33e65f2c 100644 --- a/selftest.cpp +++ b/selftest.cpp @@ -25,6 +25,7 @@ #include <iostream> #include <string> #include <vector> +#include <thread> #include "node/Constants.hpp" #include "node/Hashtable.hpp" @@ -1114,6 +1115,7 @@ int main(int argc,char **argv) */ std::cout << "[info] sizeof(void *) == " << sizeof(void *) << std::endl; + std::cout << "[info] hardware concurrency == " << std::thread::hardware_concurrency() << std::endl; std::cout << "[info] sizeof(NetworkConfig) == " << sizeof(ZeroTier::NetworkConfig) << std::endl; srand((unsigned int)time(0)); |