summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--controller/EmbeddedNetworkController.cpp193
-rw-r--r--controller/EmbeddedNetworkController.hpp7
-rw-r--r--controller/JSONDB.cpp36
-rw-r--r--controller/JSONDB.hpp15
-rw-r--r--selftest.cpp2
5 files changed, 106 insertions, 147 deletions
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp
index 597bc9c9..3b901afe 100644
--- a/controller/EmbeddedNetworkController.cpp
+++ b/controller/EmbeddedNetworkController.cpp
@@ -32,6 +32,7 @@
#include <stdexcept>
#include <set>
#include <map>
+#include <thread>
#include "../include/ZeroTierOne.h"
#include "../node/Constants.hpp"
@@ -430,7 +431,6 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule)
EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath) :
_startTime(OSUtils::now()),
- _threadsStarted(false),
_db(dbPath),
_node(node)
{
@@ -439,11 +439,11 @@ EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPa
EmbeddedNetworkController::~EmbeddedNetworkController()
{
Mutex::Lock _l(_threads_m);
- if (_threadsStarted) {
- for(int i=0;i<(ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT*2);++i)
+ if (_threads.size() > 0) {
+ for(unsigned long i=0;i<(((unsigned long)_threads.size())*2);++i)
_queue.post((_RQEntry *)0);
- for(int i=0;i<ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT;++i)
- Thread::join(_threads[i]);
+ for(std::vector<Thread>::iterator i(_threads.begin());i!=_threads.end();++i)
+ Thread::join(*i);
}
}
@@ -465,11 +465,13 @@ void EmbeddedNetworkController::request(
{
Mutex::Lock _l(_threads_m);
- if (!_threadsStarted) {
- for(int i=0;i<ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT;++i)
- _threads[i] = Thread::start(this);
+ if (_threads.size() == 0) {
+ long hwc = (long)std::thread::hardware_concurrency();
+ if (hwc <= 0)
+ hwc = 1;
+ for(long i=0;i<hwc;++i)
+ _threads.push_back(Thread::start(this));
}
- _threadsStarted = true;
}
_RQEntry *qe = new _RQEntry;
@@ -496,11 +498,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
char nwids[24];
Utils::snprintf(nwids,sizeof(nwids),"%.16llx",(unsigned long long)nwid);
- json network;
- {
- Mutex::Lock _l(_db_m);
- network = _db.get("network",nwids);
- }
+ json network(_db.get("network",nwids));
if (!network.size())
return 404;
@@ -510,24 +508,13 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
if (path.size() >= 4) {
const uint64_t address = Utils::hexStrToU64(path[3].c_str());
-
- json member;
- {
- Mutex::Lock _l(_db_m);
- member = _db.get("network",nwids,"member",Address(address).toString());
- }
+ json member(_db.get("network",nwids,"member",Address(address).toString()));
if (!member.size())
return 404;
-
_addMemberNonPersistedFields(member,OSUtils::now());
responseBody = OSUtils::jsonDump(member);
responseContentType = "application/json";
-
- return 200;
} else {
-
- Mutex::Lock _l(_db_m);
-
responseBody = "{";
_db.filter((std::string("network/") + nwids + "/member/"),[&responseBody](const std::string &n,const json &member) {
if ((member.is_object())&&(member.size() > 0)) {
@@ -540,9 +527,8 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
});
responseBody.push_back('}');
responseContentType = "application/json";
-
- return 200;
}
+ return 200;
} // else 404
@@ -560,14 +546,11 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET(
} else if (path.size() == 1) {
std::set<std::string> networkIds;
- {
- Mutex::Lock _l(_db_m);
- _db.filter("network/",[&networkIds](const std::string &n,const json &obj) {
- if (n.length() == (16 + 8))
- networkIds.insert(n.substr(8));
- return true; // do not delete
- });
- }
+ _db.filter("network/",[&networkIds](const std::string &n,const json &obj) {
+ if (n.length() == (16 + 8))
+ networkIds.insert(n.substr(8));
+ return true; // do not delete
+ });
responseBody.push_back('[');
for(std::set<std::string>::iterator i(networkIds.begin());i!=networkIds.end();++i) {
@@ -634,11 +617,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
char addrs[24];
Utils::snprintf(addrs,sizeof(addrs),"%.10llx",(unsigned long long)address);
- json member;
- {
- Mutex::Lock _l(_db_m);
- member = _db.get("network",nwids,"member",Address(address).toString());
- }
+ json member(_db.get("network",nwids,"member",Address(address).toString()));
json origMember(member); // for detecting changes
_initMember(member);
@@ -735,10 +714,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
member["lastModified"] = now;
json &revj = member["revision"];
member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL);
- {
- Mutex::Lock _l(_db_m);
- _db.put("network",nwids,"member",Address(address).toString(),member);
- }
+ _db.put("network",nwids,"member",Address(address).toString(),member);
_pushMemberUpdate(now,nwid,member);
}
@@ -806,31 +782,26 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
} else {
// POST to network ID
- json network;
- {
- Mutex::Lock _l(_db_m);
-
- // Magic ID ending with ______ picks a random unused network ID
- if (path[1].substr(10) == "______") {
- nwid = 0;
- uint64_t nwidPrefix = (Utils::hexStrToU64(path[1].substr(0,10).c_str()) << 24) & 0xffffffffff000000ULL;
- uint64_t nwidPostfix = 0;
- for(unsigned long k=0;k<100000;++k) { // sanity limit on trials
- Utils::getSecureRandom(&nwidPostfix,sizeof(nwidPostfix));
- uint64_t tryNwid = nwidPrefix | (nwidPostfix & 0xffffffULL);
- if ((tryNwid & 0xffffffULL) == 0ULL) tryNwid |= 1ULL;
- Utils::snprintf(nwids,sizeof(nwids),"%.16llx",(unsigned long long)tryNwid);
- if (_db.get("network",nwids).size() <= 0) {
- nwid = tryNwid;
- break;
- }
+ // Magic ID ending with ______ picks a random unused network ID
+ if (path[1].substr(10) == "______") {
+ nwid = 0;
+ uint64_t nwidPrefix = (Utils::hexStrToU64(path[1].substr(0,10).c_str()) << 24) & 0xffffffffff000000ULL;
+ uint64_t nwidPostfix = 0;
+ for(unsigned long k=0;k<100000;++k) { // sanity limit on trials
+ Utils::getSecureRandom(&nwidPostfix,sizeof(nwidPostfix));
+ uint64_t tryNwid = nwidPrefix | (nwidPostfix & 0xffffffULL);
+ if ((tryNwid & 0xffffffULL) == 0ULL) tryNwid |= 1ULL;
+ Utils::snprintf(nwids,sizeof(nwids),"%.16llx",(unsigned long long)tryNwid);
+ if (_db.get("network",nwids).size() <= 0) {
+ nwid = tryNwid;
+ break;
}
- if (!nwid)
- return 503;
}
-
- network = _db.get("network",nwids);
+ if (!nwid)
+ return 503;
}
+ json network(_db.get("network",nwids));
+
json origNetwork(network); // for detecting changes
_initNetwork(network);
@@ -1044,10 +1015,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
json &revj = network["revision"];
network["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL);
network["lastModified"] = now;
- {
- Mutex::Lock _l(_db_m);
- _db.put("network",nwids,network);
- }
+ _db.put("network",nwids,network);
// Send an update to all members of the network
_db.filter((std::string("network/") + nwids + "/member/"),[this,&now,&nwid](const std::string &n,const json &obj) {
@@ -1101,11 +1069,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
char nwids[24];
Utils::snprintf(nwids,sizeof(nwids),"%.16llx",nwid);
- json network;
- {
- Mutex::Lock _l(_db_m);
- network = _db.get("network",nwids);
- }
+ json network(_db.get("network",nwids));
if (!network.size())
return 404;
@@ -1113,8 +1077,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
if ((path.size() == 4)&&(path[2] == "member")&&(path[3].length() == 10)) {
const uint64_t address = Utils::hexStrToU64(path[3].c_str());
- Mutex::Lock _l(_db_m);
-
json member = _db.get("network",nwids,"member",Address(address).toString());
_db.erase("network",nwids,"member",Address(address).toString());
@@ -1125,8 +1087,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE(
return 200;
}
} else {
- Mutex::Lock _l(_db_m);
-
std::string pfx("network/");
pfx.append(nwids);
_db.filter(pfx,[](const std::string &n,const json &obj) {
@@ -1226,7 +1186,6 @@ void EmbeddedNetworkController::_circuitTestCallback(ZT_Node *node,ZT_CircuitTes
reinterpret_cast<const InetAddress *>(&(report->receivedFromRemoteAddress))->toString().c_str(),
((double)report->receivedFromLinkQuality / (double)ZT_PATH_LINK_QUALITY_MAX));
- Mutex::Lock _l(self->_db_m);
self->_db.writeRaw(id,std::string(tmp));
}
@@ -1252,13 +1211,8 @@ void EmbeddedNetworkController::_request(
char nwids[24];
Utils::snprintf(nwids,sizeof(nwids),"%.16llx",nwid);
- json network;
- json member;
- {
- Mutex::Lock _l(_db_m);
- network = _db.get("network",nwids);
- member = _db.get("network",nwids,"member",identity.address().toString());
- }
+ json network(_db.get("network",nwids));
+ json member(_db.get("network",nwids,"member",identity.address().toString()));
if (!network.size()) {
_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_OBJECT_NOT_FOUND);
@@ -1403,7 +1357,6 @@ void EmbeddedNetworkController::_request(
if (!authorizedBy) {
if (origMember != member) {
member["lastModified"] = now;
- Mutex::Lock _l(_db_m);
_db.put("network",nwids,"member",identity.address().toString(),member);
}
_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_ACCESS_DENIED);
@@ -1759,7 +1712,6 @@ void EmbeddedNetworkController::_request(
if (member != origMember) {
member["lastModified"] = now;
- Mutex::Lock _l(_db_m);
_db.put("network",nwids,"member",identity.address().toString(),member);
}
@@ -1780,45 +1732,42 @@ void EmbeddedNetworkController::_getNetworkMemberInfo(uint64_t now,uint64_t nwid
}
}
- {
- Mutex::Lock _l(_db_m);
- _db.filter(pfx,[&nmi,&now](const std::string &n,const json &member) {
- try {
- if (OSUtils::jsonBool(member["authorized"],false)) {
- ++nmi.authorizedMemberCount;
-
- if (member.count("recentLog")) {
- const json &mlog = member["recentLog"];
- if ((mlog.is_array())&&(mlog.size() > 0)) {
- const json &mlog1 = mlog[0];
- if (mlog1.is_object()) {
- if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < ZT_NETCONF_NODE_ACTIVE_THRESHOLD)
- ++nmi.activeMemberCount;
- }
+ _db.filter(pfx,[&nmi,&now](const std::string &n,const json &member) {
+ try {
+ if (OSUtils::jsonBool(member["authorized"],false)) {
+ ++nmi.authorizedMemberCount;
+
+ if (member.count("recentLog")) {
+ const json &mlog = member["recentLog"];
+ if ((mlog.is_array())&&(mlog.size() > 0)) {
+ const json &mlog1 = mlog[0];
+ if (mlog1.is_object()) {
+ if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < ZT_NETCONF_NODE_ACTIVE_THRESHOLD)
+ ++nmi.activeMemberCount;
}
}
+ }
- if (OSUtils::jsonBool(member["activeBridge"],false)) {
- nmi.activeBridges.insert(Address(Utils::hexStrToU64(OSUtils::jsonString(member["id"],"0000000000").c_str())));
- }
+ if (OSUtils::jsonBool(member["activeBridge"],false)) {
+ nmi.activeBridges.insert(Address(Utils::hexStrToU64(OSUtils::jsonString(member["id"],"0000000000").c_str())));
+ }
- if (member.count("ipAssignments")) {
- const json &mips = member["ipAssignments"];
- if (mips.is_array()) {
- for(unsigned long i=0;i<mips.size();++i) {
- InetAddress mip(OSUtils::jsonString(mips[i],""));
- if ((mip.ss_family == AF_INET)||(mip.ss_family == AF_INET6))
- nmi.allocatedIps.insert(mip);
- }
+ if (member.count("ipAssignments")) {
+ const json &mips = member["ipAssignments"];
+ if (mips.is_array()) {
+ for(unsigned long i=0;i<mips.size();++i) {
+ InetAddress mip(OSUtils::jsonString(mips[i],""));
+ if ((mip.ss_family == AF_INET)||(mip.ss_family == AF_INET6))
+ nmi.allocatedIps.insert(mip);
}
}
- } else {
- nmi.mostRecentDeauthTime = std::max(nmi.mostRecentDeauthTime,OSUtils::jsonInt(member["lastDeauthorizedTime"],0ULL));
}
- } catch ( ... ) {}
- return true;
- });
- }
+ } else {
+ nmi.mostRecentDeauthTime = std::max(nmi.mostRecentDeauthTime,OSUtils::jsonInt(member["lastDeauthorizedTime"],0ULL));
+ }
+ } catch ( ... ) {}
+ return true;
+ });
nmi.nmiTimestamp = now;
{
diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp
index 0ae2f3b5..906f4345 100644
--- a/controller/EmbeddedNetworkController.hpp
+++ b/controller/EmbeddedNetworkController.hpp
@@ -43,9 +43,6 @@
#include "JSONDB.hpp"
-// Number of background threads to start -- not actually started until needed
-#define ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT 4
-
// TTL for circuit tests
#define ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION 120000
@@ -182,15 +179,13 @@ private:
const uint64_t _startTime;
BlockingQueue<_RQEntry *> _queue;
- Thread _threads[ZT_EMBEDDEDNETWORKCONTROLLER_BACKGROUND_THREAD_COUNT];
- bool _threadsStarted;
+ std::vector<Thread> _threads;
Mutex _threads_m;
std::map<uint64_t,_NetworkMemberInfo> _nmiCache;
Mutex _nmiCache_m;
JSONDB _db;
- Mutex _db_m;
Node *const _node;
std::string _path;
diff --git a/controller/JSONDB.cpp b/controller/JSONDB.cpp
index d3e76fc1..dd8e3968 100644
--- a/controller/JSONDB.cpp
+++ b/controller/JSONDB.cpp
@@ -78,22 +78,29 @@ bool JSONDB::writeRaw(const std::string &n,const std::string &obj)
bool JSONDB::put(const std::string &n,const nlohmann::json &obj)
{
const bool r = writeRaw(n,OSUtils::jsonDump(obj));
- _db[n].obj = obj;
+ {
+ Mutex::Lock _l(_db_m);
+ _db[n].obj = obj;
+ }
return r;
}
-const nlohmann::json &JSONDB::get(const std::string &n)
+nlohmann::json JSONDB::get(const std::string &n)
{
- while (!_ready) {
- Thread::sleep(250);
- _ready = _reload(_basePath,std::string());
- }
+ {
+ Mutex::Lock _l(_db_m);
- if (!_isValidObjectName(n))
- return _EMPTY_JSON;
- std::map<std::string,_E>::iterator e(_db.find(n));
- if (e != _db.end())
- return e->second.obj;
+ while (!_ready) {
+ Thread::sleep(250);
+ _ready = _reload(_basePath,std::string());
+ }
+
+ if (!_isValidObjectName(n))
+ return _EMPTY_JSON;
+ std::map<std::string,_E>::iterator e(_db.find(n));
+ if (e != _db.end())
+ return e->second.obj;
+ }
std::string buf;
if (_httpAddr) {
@@ -110,6 +117,7 @@ const nlohmann::json &JSONDB::get(const std::string &n)
}
try {
+ Mutex::Lock _l(_db_m);
_E &e2 = _db[n];
e2.obj = OSUtils::jsonParse(buf);
return e2.obj;
@@ -135,11 +143,15 @@ void JSONDB::erase(const std::string &n)
OSUtils::rm(path.c_str());
}
- _db.erase(n);
+ {
+ Mutex::Lock _l(_db_m);
+ _db.erase(n);
+ }
}
bool JSONDB::_reload(const std::string &p,const std::string &b)
{
+ // Assumes _db_m is locked
if (_httpAddr) {
std::string body;
std::map<std::string,std::string> headers;
diff --git a/controller/JSONDB.hpp b/controller/JSONDB.hpp
index beafbaf5..2d3a5224 100644
--- a/controller/JSONDB.hpp
+++ b/controller/JSONDB.hpp
@@ -51,18 +51,16 @@ public:
bool writeRaw(const std::string &n,const std::string &obj);
bool put(const std::string &n,const nlohmann::json &obj);
-
inline bool put(const std::string &n1,const std::string &n2,const nlohmann::json &obj) { return this->put((n1 + "/" + n2),obj); }
inline bool put(const std::string &n1,const std::string &n2,const std::string &n3,const nlohmann::json &obj) { return this->put((n1 + "/" + n2 + "/" + n3),obj); }
inline bool put(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4,const nlohmann::json &obj) { return this->put((n1 + "/" + n2 + "/" + n3 + "/" + n4),obj); }
inline bool put(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4,const std::string &n5,const nlohmann::json &obj) { return this->put((n1 + "/" + n2 + "/" + n3 + "/" + n4 + "/" + n5),obj); }
- const nlohmann::json &get(const std::string &n);
-
- inline const nlohmann::json &get(const std::string &n1,const std::string &n2) { return this->get((n1 + "/" + n2)); }
- inline const nlohmann::json &get(const std::string &n1,const std::string &n2,const std::string &n3) { return this->get((n1 + "/" + n2 + "/" + n3)); }
- inline const nlohmann::json &get(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4) { return this->get((n1 + "/" + n2 + "/" + n3 + "/" + n4)); }
- inline const nlohmann::json &get(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4,const std::string &n5) { return this->get((n1 + "/" + n2 + "/" + n3 + "/" + n4 + "/" + n5)); }
+ nlohmann::json get(const std::string &n);
+ inline nlohmann::json get(const std::string &n1,const std::string &n2) { return this->get((n1 + "/" + n2)); }
+ inline nlohmann::json get(const std::string &n1,const std::string &n2,const std::string &n3) { return this->get((n1 + "/" + n2 + "/" + n3)); }
+ inline nlohmann::json get(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4) { return this->get((n1 + "/" + n2 + "/" + n3 + "/" + n4)); }
+ inline nlohmann::json get(const std::string &n1,const std::string &n2,const std::string &n3,const std::string &n4,const std::string &n5) { return this->get((n1 + "/" + n2 + "/" + n3 + "/" + n4 + "/" + n5)); }
void erase(const std::string &n);
@@ -74,6 +72,8 @@ public:
template<typename F>
inline void filter(const std::string &prefix,F func)
{
+ Mutex::Lock _l(_db_m);
+
while (!_ready) {
Thread::sleep(250);
_ready = _reload(_basePath,std::string());
@@ -108,6 +108,7 @@ private:
InetAddress _httpAddr;
std::string _basePath;
std::map<std::string,_E> _db;
+ Mutex _db_m;
volatile bool _ready;
};
diff --git a/selftest.cpp b/selftest.cpp
index 91d304a6..33e65f2c 100644
--- a/selftest.cpp
+++ b/selftest.cpp
@@ -25,6 +25,7 @@
#include <iostream>
#include <string>
#include <vector>
+#include <thread>
#include "node/Constants.hpp"
#include "node/Hashtable.hpp"
@@ -1114,6 +1115,7 @@ int main(int argc,char **argv)
*/
std::cout << "[info] sizeof(void *) == " << sizeof(void *) << std::endl;
+ std::cout << "[info] hardware concurrency == " << std::thread::hardware_concurrency() << std::endl;
std::cout << "[info] sizeof(NetworkConfig) == " << sizeof(ZeroTier::NetworkConfig) << std::endl;
srand((unsigned int)time(0));