summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--controller/EmbeddedNetworkController.cpp79
-rw-r--r--controller/EmbeddedNetworkController.hpp19
2 files changed, 57 insertions, 41 deletions
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp
index 90dd8f52..91b8671e 100644
--- a/controller/EmbeddedNetworkController.cpp
+++ b/controller/EmbeddedNetworkController.cpp
@@ -467,26 +467,14 @@ void EmbeddedNetworkController::request(
{
if (((!_signingId)||(!_signingId.hasPrivate()))||(_signingId.address().toInt() != (nwid >> 24))||(!_sender))
return;
-
- {
- Mutex::Lock _l(_threads_m);
- if (_threads.size() == 0) {
- long hwc = (long)(std::thread::hardware_concurrency() / 2);
- if (hwc < 1)
- hwc = 1;
- else if (hwc > 16)
- hwc = 16;
- for(long i=0;i<hwc;++i)
- _threads.push_back(Thread::start(this));
- }
- }
-
+ _startThreads();
_RQEntry *qe = new _RQEntry;
qe->nwid = nwid;
qe->requestPacketId = requestPacketId;
qe->fromAddr = fromAddr;
qe->identity = identity;
qe->metaData = metaData;
+ qe->type = _RQEntry::RQENTRY_TYPE_REQUEST;
_queue.post(qe);
}
@@ -1051,33 +1039,14 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
} else if (path[0] == "ping") {
- const uint64_t now = OSUtils::now();
- bool first = true;
- std::string pong("{\"memberStatus\":{");
- {
- Mutex::Lock _l(_memberStatus_m);
- pong.reserve(64 * _memberStatus.size());
- _db.eachId([this,&pong,&now,&first](uint64_t networkId,uint64_t nodeId) {
- char tmp[64];
- uint64_t lrt = 0ULL;
- auto ms = this->_memberStatus.find(_MemberStatusKey(networkId,nodeId));
- if (ms != _memberStatus.end())
- lrt = ms->second.lastRequestTime;
- Utils::snprintf(tmp,sizeof(tmp),"%s\"%.16llx-%.10llx\":%llu",
- (first) ? "" : ",",
- (unsigned long long)networkId,
- (unsigned long long)nodeId,
- (unsigned long long)lrt);
- pong.append(tmp);
- first = false;
- });
- }
- char tmp2[256];
- Utils::snprintf(tmp2,sizeof(tmp2),"},\"clock\":%llu,\"startTime\":%llu}",(unsigned long long)now,(unsigned long long)_startTime);
- pong.append(tmp2);
- _db.writeRaw("pong",pong);
+ _startThreads();
+ _RQEntry *qe = new _RQEntry;
+ qe->type = _RQEntry::RQENTRY_TYPE_PING;
+ _queue.post(qe);
- responseBody = "{}";
+ char tmp[64];
+ Utils::snprintf(tmp,sizeof(tmp),"{\"clock\":%llu}",(unsigned long long)now);
+ responseBody = tmp;
responseContentType = "application/json";
return 200;
@@ -1150,7 +1119,35 @@ void EmbeddedNetworkController::threadMain()
_RQEntry *qe = (_RQEntry *)0;
while ((_running)&&((qe = _queue.get()))) {
try {
- _request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData);
+ if (qe->type == _RQEntry::RQENTRY_TYPE_REQUEST) {
+ _request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData);
+ } else if (qe->type == _RQEntry::RQENTRY_TYPE_PING) {
+ const uint64_t now = OSUtils::now();
+ bool first = true;
+ std::string pong("{\"memberStatus\":{");
+ {
+ Mutex::Lock _l(_memberStatus_m);
+ pong.reserve(64 * _memberStatus.size());
+ _db.eachId([this,&pong,&now,&first](uint64_t networkId,uint64_t nodeId) {
+ char tmp[64];
+ uint64_t lrt = 0ULL;
+ auto ms = this->_memberStatus.find(_MemberStatusKey(networkId,nodeId));
+ if (ms != _memberStatus.end())
+ lrt = ms->second.lastRequestTime;
+ Utils::snprintf(tmp,sizeof(tmp),"%s\"%.16llx-%.10llx\":%llu",
+ (first) ? "" : ",",
+ (unsigned long long)networkId,
+ (unsigned long long)nodeId,
+ (unsigned long long)lrt);
+ pong.append(tmp);
+ first = false;
+ });
+ }
+ char tmp2[256];
+ Utils::snprintf(tmp2,sizeof(tmp2),"},\"clock\":%llu,\"startTime\":%llu}",(unsigned long long)now,(unsigned long long)_startTime);
+ pong.append(tmp2);
+ _db.writeRaw("pong",pong);
+ }
} catch ( ... ) {}
delete qe;
diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp
index 0a6b8176..ade7eb20 100644
--- a/controller/EmbeddedNetworkController.hpp
+++ b/controller/EmbeddedNetworkController.hpp
@@ -26,6 +26,7 @@
#include <vector>
#include <set>
#include <list>
+#include <thread>
#include "../node/Constants.hpp"
@@ -103,11 +104,29 @@ private:
InetAddress fromAddr;
Identity identity;
Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> metaData;
+ enum {
+ RQENTRY_TYPE_REQUEST = 0,
+ RQENTRY_TYPE_PING = 1
+ } type;
};
static void _circuitTestCallback(ZT_Node *node,ZT_CircuitTest *test,const ZT_CircuitTestReport *report);
void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData);
+ inline void _startThreads()
+ {
+ Mutex::Lock _l(_threads_m);
+ if (_threads.size() == 0) {
+ long hwc = (long)std::thread::hardware_concurrency();
+ if (hwc < 1)
+ hwc = 1;
+ else if (hwc > 16)
+ hwc = 16;
+ for(long i=0;i<hwc;++i)
+ _threads.push_back(Thread::start(this));
+ }
+ }
+
// These init objects with default and static/informational fields
inline void _initMember(nlohmann::json &member)
{