From a577b8d3816069a448a946302048a377b55cd74a Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 1 Mar 2017 16:33:34 -0800 Subject: Update how controller handles circuit tests -- save results to filesystem. --- controller/EmbeddedNetworkController.cpp | 137 +++++++++++++++---------------- 1 file changed, 65 insertions(+), 72 deletions(-) (limited to 'controller/EmbeddedNetworkController.cpp') diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 0e57fc14..7915765b 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -428,9 +428,9 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule) return false; } -EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath,FILE *feed) : +EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath) : _threadsStarted(false), - _db(dbPath,feed), + _db(dbPath), _node(node) { OSUtils::mkdir(dbPath); @@ -546,21 +546,6 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpGET( return 200; } - } else if ((path[2] == "test")&&(path.size() >= 4)) { - - Mutex::Lock _l(_circuitTests_m); - std::map< uint64_t,_CircuitTestEntry >::iterator cte(_circuitTests.find(Utils::hexStrToU64(path[3].c_str()))); - if ((cte != _circuitTests.end())&&(cte->second.test)) { - - responseBody = "["; - responseBody.append(cte->second.jsonResults); - responseBody.push_back(']'); - responseContentType = "application/json"; - - return 200; - - } // else 404 - } // else 404 } else { @@ -755,9 +740,10 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( return 200; } else if ((path.size() == 3)&&(path[2] == "test")) { - Mutex::Lock _l(_circuitTests_m); + Mutex::Lock _l(_tests_m); - ZT_CircuitTest *test = (ZT_CircuitTest *)malloc(sizeof(ZT_CircuitTest)); + _tests.push_back(ZT_CircuitTest()); + ZT_CircuitTest *const test = &(_tests.back()); memset(test,0,sizeof(ZT_CircuitTest)); Utils::getSecureRandom(&(test->testId),sizeof(test->testId)); @@ -781,7 +767,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( test->reportAtEveryHop = (OSUtils::jsonBool(b["reportAtEveryHop"],true) ? 1 : 0); if (!test->hopCount) { - ::free((void *)test); + _tests.pop_back(); responseBody = "{ \"message\": \"a test must contain at least one hop\" }"; responseContentType = "application/json"; return 400; @@ -789,18 +775,18 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST( test->timestamp = OSUtils::now(); - _CircuitTestEntry &te = _circuitTests[test->testId]; - te.test = test; - te.jsonResults = ""; - - if (_node) + if (_node) { _node->circuitTestBegin(test,&(EmbeddedNetworkController::_circuitTestCallback)); - else return 500; + } else { + _tests.pop_back(); + return 500; + } char json[1024]; Utils::snprintf(json,sizeof(json),"{\"testId\":\"%.16llx\"}",test->testId); responseBody = json; responseContentType = "application/json"; + return 200; } // else 404 @@ -1137,62 +1123,67 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE( void EmbeddedNetworkController::threadMain() throw() { + uint64_t lastCircuitTestCheck = 0; for(;;) { - _RQEntry *const qe = _queue.get(); + _RQEntry *const qe = _queue.get(); // waits on next request if (!qe) break; // enqueue a NULL to terminate threads try { _request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData); } catch ( ... ) {} delete qe; + + uint64_t now = OSUtils::now(); + if ((now - lastCircuitTestCheck) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) { + lastCircuitTestCheck = now; + Mutex::Lock _l(_tests_m); + for(std::list< ZT_CircuitTest >::iterator i(_tests.begin());i!=_tests.end();) { + if ((now - i->timestamp) > ZT_EMBEDDEDNETWORKCONTROLLER_CIRCUIT_TEST_EXPIRATION) { + _node->circuitTestEnd(&(*i)); + _tests.erase(i++); + } else ++i; + } + } } } void EmbeddedNetworkController::_circuitTestCallback(ZT_Node *node,ZT_CircuitTest *test,const ZT_CircuitTestReport *report) { - char tmp[65535]; + char tmp[1024],id[128]; EmbeddedNetworkController *const self = reinterpret_cast(test->ptr); - if (!test) - return; - if (!report) - return; - - Mutex::Lock _l(self->_circuitTests_m); - std::map< uint64_t,_CircuitTestEntry >::iterator cte(self->_circuitTests.find(test->testId)); - - if (cte == self->_circuitTests.end()) { // sanity check: a circuit test we didn't launch? - self->_node->circuitTestEnd(test); - ::free((void *)test); - return; - } + if ((!test)||(!report)||(!test->credentialNetworkId)) return; // sanity check + const uint64_t now = OSUtils::now(); + Utils::snprintf(id,sizeof(id),"network/%.16llx/test/%.16llx-%.16llx-%.10llx-%.10llx",test->credentialNetworkId,test->testId,now,report->upstream,report->current); Utils::snprintf(tmp,sizeof(tmp), - "%s{\n" - "\t\"timestamp\": %llu," ZT_EOL_S - "\t\"testId\": \"%.16llx\"," ZT_EOL_S - "\t\"upstream\": \"%.10llx\"," ZT_EOL_S - "\t\"current\": \"%.10llx\"," ZT_EOL_S - "\t\"receivedTimestamp\": %llu," ZT_EOL_S - "\t\"sourcePacketId\": \"%.16llx\"," ZT_EOL_S - "\t\"flags\": %llu," ZT_EOL_S - "\t\"sourcePacketHopCount\": %u," ZT_EOL_S - "\t\"errorCode\": %u," ZT_EOL_S - "\t\"vendor\": %d," ZT_EOL_S - "\t\"protocolVersion\": %u," ZT_EOL_S - "\t\"majorVersion\": %u," ZT_EOL_S - "\t\"minorVersion\": %u," ZT_EOL_S - "\t\"revision\": %u," ZT_EOL_S - "\t\"platform\": %d," ZT_EOL_S - "\t\"architecture\": %d," ZT_EOL_S - "\t\"receivedOnLocalAddress\": \"%s\"," ZT_EOL_S - "\t\"receivedFromRemoteAddress\": \"%s\"" ZT_EOL_S - "}", - ((cte->second.jsonResults.length() > 0) ? ",\n" : ""), - (unsigned long long)report->timestamp, + "{\"id\": \"%s\"," + "\"timestamp\": %llu," + "\"networkId\": \"%.16llx\"," + "\"testId\": \"%.16llx\"," + "\"upstream\": \"%.10llx\"," + "\"current\": \"%.10llx\"," + "\"receivedTimestamp\": %llu," + "\"sourcePacketId\": \"%.16llx\"," + "\"flags\": %llu," + "\"sourcePacketHopCount\": %u," + "\"errorCode\": %u," + "\"vendor\": %d," + "\"protocolVersion\": %u," + "\"majorVersion\": %u," + "\"minorVersion\": %u," + "\"revision\": %u," + "\"platform\": %d," + "\"architecture\": %d," + "\"receivedOnLocalAddress\": \"%s\"," + "\"receivedFromRemoteAddress\": \"%s\"," + "\"receivedFromLinkQuality\": %f}", + id + 30, // last bit only, not leading path + (unsigned long long)test->timestamp, + (unsigned long long)test->credentialNetworkId, (unsigned long long)test->testId, (unsigned long long)report->upstream, (unsigned long long)report->current, - (unsigned long long)OSUtils::now(), + (unsigned long long)now, (unsigned long long)report->sourcePacketId, (unsigned long long)report->flags, report->sourcePacketHopCount, @@ -1205,9 +1196,11 @@ void EmbeddedNetworkController::_circuitTestCallback(ZT_Node *node,ZT_CircuitTes (int)report->platform, (int)report->architecture, reinterpret_cast(&(report->receivedOnLocalAddress))->toString().c_str(), - reinterpret_cast(&(report->receivedFromRemoteAddress))->toString().c_str()); + reinterpret_cast(&(report->receivedFromRemoteAddress))->toString().c_str(), + ((double)report->receivedFromLinkQuality / (double)ZT_PATH_LINK_QUALITY_MAX)); - cte->second.jsonResults.append(tmp); + Mutex::Lock _l(self->_db_m); + self->_db.writeRaw(id,std::string(tmp)); } void EmbeddedNetworkController::_request( @@ -1354,12 +1347,12 @@ void EmbeddedNetworkController::_request( if (requestPacketId) { // only log if this is a request, not for generated pushes json rlEntry = json::object(); rlEntry["ts"] = now; - rlEntry["authorized"] = (authorizedBy) ? true : false; - rlEntry["authorizedBy"] = (authorizedBy) ? authorizedBy : ""; - rlEntry["clientMajorVersion"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_MAJOR_VERSION,0); - rlEntry["clientMinorVersion"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_MINOR_VERSION,0); - rlEntry["clientRevision"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_REVISION,0); - rlEntry["clientProtocolVersion"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_PROTOCOL_VERSION,0); + rlEntry["auth"] = (authorizedBy) ? true : false; + rlEntry["authBy"] = (authorizedBy) ? authorizedBy : ""; + rlEntry["vMajor"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_MAJOR_VERSION,0); + rlEntry["vMinor"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_MINOR_VERSION,0); + rlEntry["vRev"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_NODE_REVISION,0); + rlEntry["vProto"] = metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_PROTOCOL_VERSION,0); if (fromAddr) rlEntry["fromAddr"] = fromAddr.toString(); -- cgit v1.2.3