diff options
Diffstat (limited to 'controller/JSONDB.cpp')
-rw-r--r-- | controller/JSONDB.cpp | 185 |
1 files changed, 130 insertions, 55 deletions
diff --git a/controller/JSONDB.cpp b/controller/JSONDB.cpp index 0c061266..7f92d6ee 100644 --- a/controller/JSONDB.cpp +++ b/controller/JSONDB.cpp @@ -16,6 +16,18 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> +#ifndef __WINDOWS__ +#include <unistd.h> +#include <fcntl.h> +#include <sys/time.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/select.h> +#endif + #include "JSONDB.hpp" #define ZT_JSONDB_HTTP_TIMEOUT 60000 @@ -27,9 +39,12 @@ static const std::map<std::string,std::string> _ZT_JSONDB_GET_HEADERS; JSONDB::JSONDB(const std::string &basePath) : _basePath(basePath), + _rawInput(-1), + _rawOutput(-1), _summaryThreadRun(true) { if ((_basePath.length() > 7)&&(_basePath.substr(0,7) == "http://")) { + // If base path is http:// we run in HTTP mode // TODO: this doesn't yet support IPv6 since bracketed address notiation isn't supported. // Typically it's just used with 127.0.0.1 anyway. std::string hn = _basePath.substr(7); @@ -46,16 +61,27 @@ JSONDB::JSONDB(const std::string &basePath) : _basePath = "/"; if (_basePath[0] != '/') _basePath = std::string("/") + _basePath; +#ifndef __WINDOWS__ + } else if (_basePath == "-") { + // If base path is "-" we run in stdin/stdout mode and expect our database to be populated on startup via stdin + // Not supported on Windows + _rawInput = STDIN_FILENO; + _rawOutput = STDOUT_FILENO; + fcntl(_rawInput,F_SETFL,O_NONBLOCK); +#endif } else { + // Default mode of operation is to store files in the filesystem OSUtils::mkdir(_basePath.c_str()); OSUtils::lockDownFile(_basePath.c_str(),true); // networks might contain auth tokens, etc., so restrict directory permissions } - unsigned int cnt = 0; - while (!_load(_basePath)) { - if ((++cnt & 7) == 0) - fprintf(stderr,"WARNING: controller still waiting to read '%s'..." ZT_EOL_S,_basePath.c_str()); - Thread::sleep(250); + if (_rawInput < 0) { + unsigned int cnt = 0; + while (!_load(_basePath)) { + if ((++cnt & 7) == 0) + fprintf(stderr,"WARNING: controller still waiting to read '%s'..." ZT_EOL_S,_basePath.c_str()); + Thread::sleep(250); + } } for(std::unordered_map<uint64_t,_NW>::iterator n(_networks.begin());n!=_networks.end();++n) @@ -89,7 +115,18 @@ JSONDB::~JSONDB() bool JSONDB::writeRaw(const std::string &n,const std::string &obj) { - if (_httpAddr) { + if (_rawOutput >= 0) { +#ifndef __WINDOWS__ + if (obj.length() > 0) { + Mutex::Lock _l(_rawLock); + if (write(_rawOutput,obj.c_str(),obj.length() + 1) > 0) + return true; + } else { + return true; + } +#endif + return false; + } else if (_httpAddr) { std::map<std::string,std::string> headers; std::string body; std::map<std::string,std::string> reqHeaders; @@ -205,11 +242,13 @@ nlohmann::json JSONDB::eraseNetwork(const uint64_t networkId) char n[256]; OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId); - if (_httpAddr) { - // Deletion is currently done by Central in harnessed mode - //std::map<std::string,std::string> headers; - //std::string body; - //Http::DEL(0,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast<const struct sockaddr *>(&_httpAddr),(_basePath+"/"+n).c_str(),_ZT_JSONDB_GET_HEADERS,headers,body); + if (_rawOutput >= 0) { + // In harnessed mode, deletes occur in Central or other management + // software and do not need to be executed this way. + } else if (_httpAddr) { + std::map<std::string,std::string> headers; + std::string body; + Http::DEL(0,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast<const struct sockaddr *>(&_httpAddr),(_basePath+"/"+n).c_str(),_ZT_JSONDB_GET_HEADERS,headers,body); } else { const std::string path(_genPath(n,false)); if (path.length()) @@ -232,11 +271,13 @@ nlohmann::json JSONDB::eraseNetworkMember(const uint64_t networkId,const uint64_ char n[256]; OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId); - if (_httpAddr) { - // Deletion is currently done by the caller in Central harnessed mode - //std::map<std::string,std::string> headers; - //std::string body; - //Http::DEL(0,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast<const struct sockaddr *>(&_httpAddr),(_basePath+"/"+n).c_str(),_ZT_JSONDB_GET_HEADERS,headers,body); + if (_rawOutput >= 0) { + // In harnessed mode, deletes occur in Central or other management + // software and do not need to be executed this way. + } else if (_httpAddr) { + std::map<std::string,std::string> headers; + std::string body; + Http::DEL(0,ZT_JSONDB_HTTP_TIMEOUT,reinterpret_cast<const struct sockaddr *>(&_httpAddr),(_basePath+"/"+n).c_str(),_ZT_JSONDB_GET_HEADERS,headers,body); } else { const std::string path(_genPath(n,false)); if (path.length()) @@ -263,9 +304,41 @@ nlohmann::json JSONDB::eraseNetworkMember(const uint64_t networkId,const uint64_ void JSONDB::threadMain() throw() { +#ifndef __WINDOWS__ + fd_set readfds,nullfds; + char *const readbuf = (_rawInput >= 0) ? (new char[1048576]) : (char *)0; + std::string rawInputBuf; + FD_ZERO(&readfds); + FD_ZERO(&nullfds); +#endif + std::vector<uint64_t> todo; + while (_summaryThreadRun) { - Thread::sleep(10); +#ifndef __WINDOWS__ + if (_rawInput < 0) { + Thread::sleep(25); + } else { + FD_SET(_rawInput,&readfds); + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 25000; + select(_rawInput+1,&readfds,&nullfds,&nullfds,&tv); + if (FD_ISSET(_rawInput,&readfds)) { + const long rn = (long)read(_rawInput,readbuf,1048576); + for(long i=0;i<rn;++i) { + if (readbuf[i]) { + rawInputBuf.push_back(readbuf[i]); + } else if (rawInputBuf.length() > 0) { + _add(OSUtils::jsonParse(rawInputBuf)); + rawInputBuf.clear(); + } + } + } + } +#else + Thread::sleep(25); +#endif { Mutex::Lock _l(_summaryThread_m); @@ -273,7 +346,6 @@ void JSONDB::threadMain() continue; else _summaryThreadToDo.swap(todo); } - const uint64_t now = OSUtils::now(); for(std::vector<uint64_t>::iterator ii(todo.begin());ii!=todo.end();++ii) { const uint64_t networkId = *ii; @@ -340,10 +412,46 @@ void JSONDB::threadMain() todo.clear(); } + +#ifndef __WINDOWS__ + delete [] readbuf; +#endif +} + +bool JSONDB::_add(const nlohmann::json &j) +{ + try { + if (j.is_object()) { + std::string id(OSUtils::jsonString(j["id"],"0")); + std::string objtype(OSUtils::jsonString(j["objtype"],"")); + + if ((id.length() == 16)&&(objtype == "network")) { + const uint64_t nwid = Utils::hexStrToU64(id.c_str()); + if (nwid) { + Mutex::Lock _l(_networks_m); + _networks[nwid].config = nlohmann::json::to_msgpack(j); + return true; + } + } else if ((id.length() == 10)&&(objtype == "member")) { + const uint64_t mid = Utils::hexStrToU64(id.c_str()); + const uint64_t nwid = Utils::hexStrToU64(OSUtils::jsonString(j["nwid"],"0").c_str()); + if ((mid)&&(nwid)) { + Mutex::Lock _l(_networks_m); + _networks[nwid].members[mid] = nlohmann::json::to_msgpack(j); + _members[mid].insert(nwid); + return true; + } + } + } + } catch ( ... ) {} + return false; } bool JSONDB::_load(const std::string &p) { + // This is not used in stdin/stdout mode. Instead data is populated by + // sending it all to stdin. + if (_httpAddr) { // In HTTP harnessed mode we download our entire working data set on startup. @@ -357,24 +465,9 @@ bool JSONDB::_load(const std::string &p) if (dbImg.is_object()) { Mutex::Lock _l(_networks_m); for(nlohmann::json::iterator i(dbImg.begin());i!=dbImg.end();++i) { - nlohmann::json &j = i.value(); - if (j.is_object()) { - std::string id(OSUtils::jsonString(j["id"],"0")); - std::string objtype(OSUtils::jsonString(j["objtype"],"")); - - if ((id.length() == 16)&&(objtype == "network")) { - const uint64_t nwid = Utils::hexStrToU64(id.c_str()); - if (nwid) - _networks[nwid].config = nlohmann::json::to_msgpack(j); - } else if ((id.length() == 10)&&(objtype == "member")) { - const uint64_t mid = Utils::hexStrToU64(id.c_str()); - const uint64_t nwid = Utils::hexStrToU64(OSUtils::jsonString(j["nwid"],"0").c_str()); - if ((mid)&&(nwid)) { - _networks[nwid].members[mid] = nlohmann::json::to_msgpack(j); - _members[mid].insert(nwid); - } - } - } + try { + _add(i.value()); + } catch ( ... ) {} } return true; } @@ -391,25 +484,7 @@ bool JSONDB::_load(const std::string &p) std::string buf; if (OSUtils::readFile((p + ZT_PATH_SEPARATOR_S + *di).c_str(),buf)) { try { - nlohmann::json j(OSUtils::jsonParse(buf)); - std::string id(OSUtils::jsonString(j["id"],"0")); - std::string objtype(OSUtils::jsonString(j["objtype"],"")); - - if ((id.length() == 16)&&(objtype == "network")) { - const uint64_t nwid = Utils::hexStrToU64(id.c_str()); - if (nwid) { - Mutex::Lock _l(_networks_m); - _networks[nwid].config = nlohmann::json::to_msgpack(j); - } - } else if ((id.length() == 10)&&(objtype == "member")) { - const uint64_t mid = Utils::hexStrToU64(id.c_str()); - const uint64_t nwid = Utils::hexStrToU64(OSUtils::jsonString(j["nwid"],"0").c_str()); - if ((mid)&&(nwid)) { - Mutex::Lock _l(_networks_m); - _networks[nwid].members[mid] = nlohmann::json::to_msgpack(j); - _members[mid].insert(nwid); - } - } + _add(OSUtils::jsonParse(buf)); } catch ( ... ) {} } } else { |