diff options
Diffstat (limited to 'controller/LFDB.cpp')
-rw-r--r-- | controller/LFDB.cpp | 324 |
1 files changed, 99 insertions, 225 deletions
diff --git a/controller/LFDB.cpp b/controller/LFDB.cpp index 826d7cba..8779c47e 100644 --- a/controller/LFDB.cpp +++ b/controller/LFDB.cpp @@ -41,30 +41,77 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char * DB(nc,myId,path), _nc(nc), _myId(myId), - _lfOwnerPrivate(lfOwnerPrivate), - _lfOwnerPublic(lfOwnerPublic), - _lfNodeHost(lfNodeHost), - _lfNodePort(lfNodePort), + _lfOwnerPrivate((lfOwnerPrivate) ? lfOwnerPrivate : ""), + _lfOwnerPublic((lfOwnerPublic) ? lfOwnerPublic : ""), + _lfNodeHost((lfNodeHost) ? lfNodeHost : "127.0.0.1"), + _lfNodePort(((lfNodePort > 0)&&(lfNodePort < 65536)) ? lfNodePort : 9980), _running(true), _ready(false), _storeOnlineState(storeOnlineState) { _syncThread = std::thread([this]() { char controllerAddress[24]; + const uint64_t controllerAddressInt = _myId.address().toInt(); _myId.address().toString(controllerAddress); + std::string networksSelectorName("com.zerotier.controller.lfdb:"); networksSelectorName.append(controllerAddress); networksSelectorName.append("/network"); + std::string membersSelectorName("com.zerotier.controller.lfdb:"); membersSelectorName.append(controllerAddress); membersSelectorName.append("/network/member"); httplib::Client htcli(_lfNodeHost.c_str(),_lfNodePort,600); std::ostringstream query; int64_t timeRangeStart = 0; while (_running) { + { + std::lock_guard<std::mutex> sl(_state_l); + for(auto ns=_state.begin();ns!=_state.end();++ns) { + if (ns->second.dirty) { + nlohmann::json network; + if (get(ns->first,network)) { + nlohmann::json newrec; + newrec["Selectors"] = {{ { { "Name",networksSelectorName },{ "Ordinal",ns->first } } }}; + newrec["Value"] = network.dump(); + newrec["OwnerPrivate"] = _lfOwnerPrivate; + newrec["MaskingKey"] = controllerAddress; + auto resp = htcli.Post("/make",newrec.dump(),"application/json"); + if (resp->status == 200) { + ns->second.dirty = false; + } else { + fprintf(stderr,"ERROR: LFDB: %d from node (create/update network): %s" ZT_EOL_S,resp->status,resp->body.c_str()); + } + } + } + + for(auto ms=ns->second.members.begin();ms!=ns->second.members.end();++ms) { + if ((_storeOnlineState)&&(ms->second.lastOnlineDirty)) { + } + + if (ms->second.dirty) { + nlohmann::json network,member; + if (get(ns->first,network,ms->first,member)) { + nlohmann::json newrec; + newrec["Selectors"] = {{ { { "Name",networksSelectorName },{ "Ordinal",ns->first } },{ { "Name",membersSelectorName },{ "Ordinal",ms->first } } }}; + newrec["Value"] = member.dump(); + newrec["OwnerPrivate"] = _lfOwnerPrivate; + newrec["MaskingKey"] = controllerAddress; + auto resp = htcli.Post("/make",newrec.dump(),"application/json"); + if (resp->status == 200) { + ms->second.dirty = false; + } else { + fprintf(stderr,"ERROR: LFDB: %d from node (create/update member): %s" ZT_EOL_S,resp->status,resp->body.c_str()); + } + } + } + } + } + } + query.clear(); query << '{' << "\"Ranges\":[{" - << "\"Name\": \"com.zerotier.controller.lfdb:" << controllerAddress << "/network\"," - << "\"Range\": [ 0,18446744073709551615 ]" + << "\"Name\":\"" << networksSelectorName << "\"," + << "\"Range\":[0,18446744073709551615]" << "}]," - << "\"TimeRange\": [ " << timeRangeStart << ",18446744073709551615 ]," + << "\"TimeRange\":[" << timeRangeStart << ",18446744073709551615]," << "\"MaskingKey\":\"" << controllerAddress << "\"," << "\"Owners\":[\"" << _lfOwnerPublic << "\"]" << '}'; @@ -79,11 +126,19 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char * if (result.is_object()) { nlohmann::json &record = result["Record"]; if (record.is_object()) { - int64_t ts = record["Timestamp"]; - std::string value = result["Value"]; - nlohmann::json network(OSUtils::jsonParse(value)); + const std::string recordValue = result["Value"]; + nlohmann::json network(OSUtils::jsonParse(recordValue)); if (network.is_object()) { - std::string idstr = network["id"]; + const std::string idstr = network["id"]; + const uint64_t id = Utils::hexStrToU64(idstr.c_str()); + if ((id >> 24) == controllerAddressInt) { + std::lock_guard<std::mutex> sl(_state_l); + _NetworkState &ns = _state[id]; + if (!ns.dirty) { + nlohmann::json nullJson; + _networkChanged(nullJson,network,false); + } + } } } } @@ -98,13 +153,13 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char * query << '{' << "\"Ranges\":[{" - << "\"Name\": \"com.zerotier.controller.lfdb:" << controllerAddress << "/network\"," - << "\"Range\": [ 0,18446744073709551615 ]" + << "\"Name\":\"" << networksSelectorName << "\"," + << "\"Range\":[0,18446744073709551615]" << "},{" - << "\"Name\": \"com.zerotier.controller.lfdb:" << controllerAddress << "/network/member\"," - << "\"Range\": [ 0,18446744073709551615 ]" + << "\"Name\":\"" << membersSelectorName << "\"," + << "\"Range\":[0,18446744073709551615]" << "}]," - << "\"TimeRange\": [ " << timeRangeStart << ",18446744073709551615 ]," + << "\"TimeRange\":[" << timeRangeStart << ",18446744073709551615]," << "\"MaskingKey\":\"" << controllerAddress << "\"," << "\"Owners\":[\"" << _lfOwnerPublic << "\"]" << '}'; @@ -119,12 +174,24 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char * if (result.is_object()) { nlohmann::json &record = result["Record"]; if (record.is_object()) { - int64_t ts = record["Timestamp"]; - std::string value = result["Value"]; - nlohmann::json member(OSUtils::jsonParse(value)); + const std::string recordValue = result["Value"]; + nlohmann::json member(OSUtils::jsonParse(recordValue)); if (member.is_object()) { - std::string nwidstr = member["nwid"]; - std::string idstr = member["id"]; + const std::string nwidstr = member["nwid"]; + const std::string idstr = member["id"]; + const uint64_t nwid = Utils::hexStrToU64(nwidstr.c_str()); + const uint64_t id = Utils::hexStrToU64(idstr.c_str()); + if ((id)&&((nwid >> 24) == controllerAddressInt)) { + std::lock_guard<std::mutex> sl(_state_l); + auto ns = _state.find(nwid); + if (ns != _state.end()) { + _MemberState &ms = ns->second.members[id]; + if (!ms.dirty) { + nlohmann::json nullJson; + _memberChanged(nullJson,member,false); + } + } + } } } } @@ -138,8 +205,7 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char * timeRangeStart = time(nullptr) - 120; // start next query 2m before now to avoid losing updates _ready = true; - // Delay 2s between queries, checking running flag every 100ms - for(int k=0;k<20;++k) { + for(int k=0;k<20;++k) { // 2s delay between queries for remotely modified networks or members if (!_running) return; std::this_thread::sleep_for(std::chrono::milliseconds(100)); @@ -184,6 +250,11 @@ void LFDB::save(nlohmann::json *orig,nlohmann::json &record) nlohmann::json old; get(nwid,old); if ((!old.is_object())||(old != record)) { + _networkChanged(old,record,true); + { + std::lock_guard<std::mutex> l(_state_l); + _state[nwid].dirty = true; + } } } } else if (objtype == "member") { @@ -193,6 +264,11 @@ void LFDB::save(nlohmann::json *orig,nlohmann::json &record) nlohmann::json network,old; get(nwid,network,id,old); if ((!old.is_object())||(old != record)) { + _memberChanged(old,record,true); + { + std::lock_guard<std::mutex> l(_state_l); + _state[nwid].members[id].dirty = true; + } } } } @@ -223,206 +299,4 @@ void LFDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const I } } -#if 0 -FileDB::FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) : - DB(nc,myId,path), - _networksPath(_path + ZT_PATH_SEPARATOR_S + "network"), - _tracePath(_path + ZT_PATH_SEPARATOR_S + "trace"), - _onlineChanged(false), - _running(true) -{ - OSUtils::mkdir(_path.c_str()); - OSUtils::lockDownFile(_path.c_str(),true); - OSUtils::mkdir(_networksPath.c_str()); - OSUtils::mkdir(_tracePath.c_str()); - - std::vector<std::string> networks(OSUtils::listDirectory(_networksPath.c_str(),false)); - std::string buf; - for(auto n=networks.begin();n!=networks.end();++n) { - buf.clear(); - if ((n->length() == 21)&&(OSUtils::readFile((_networksPath + ZT_PATH_SEPARATOR_S + *n).c_str(),buf))) { - try { - nlohmann::json network(OSUtils::jsonParse(buf)); - const std::string nwids = network["id"]; - if (nwids.length() == 16) { - nlohmann::json nullJson; - _networkChanged(nullJson,network,false); - std::string membersPath(_networksPath + ZT_PATH_SEPARATOR_S + nwids + ZT_PATH_SEPARATOR_S "member"); - std::vector<std::string> members(OSUtils::listDirectory(membersPath.c_str(),false)); - for(auto m=members.begin();m!=members.end();++m) { - buf.clear(); - if ((m->length() == 15)&&(OSUtils::readFile((membersPath + ZT_PATH_SEPARATOR_S + *m).c_str(),buf))) { - try { - nlohmann::json member(OSUtils::jsonParse(buf)); - const std::string addrs = member["id"]; - if (addrs.length() == 10) { - nlohmann::json nullJson2; - _memberChanged(nullJson2,member,false); - } - } catch ( ... ) {} - } - } - } - } catch ( ... ) {} - } - } - - _onlineUpdateThread = std::thread([this]() { - unsigned int cnt = 0; - while (this->_running) { - std::this_thread::sleep_for(std::chrono::microseconds(100)); - if ((++cnt % 20) == 0) { // 5 seconds - std::lock_guard<std::mutex> l(this->_online_l); - if (!this->_running) return; - if (this->_onlineChanged) { - char p[4096],atmp[64]; - for(auto nw=this->_online.begin();nw!=this->_online.end();++nw) { - OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx-online.json",_networksPath.c_str(),(unsigned long long)nw->first); - FILE *f = fopen(p,"wb"); - if (f) { - fprintf(f,"{"); - const char *memberPrefix = ""; - for(auto m=nw->second.begin();m!=nw->second.end();++m) { - fprintf(f,"%s\"%.10llx\":{" ZT_EOL_S,memberPrefix,(unsigned long long)m->first); - memberPrefix = ","; - InetAddress lastAddr; - const char *timestampPrefix = " "; - int cnt = 0; - for(auto ts=m->second.rbegin();ts!=m->second.rend();) { - if (cnt < 25) { - if (lastAddr != ts->second) { - lastAddr = ts->second; - fprintf(f,"%s\"%lld\":\"%s\"" ZT_EOL_S,timestampPrefix,(long long)ts->first,ts->second.toString(atmp)); - timestampPrefix = ","; - ++cnt; - ++ts; - } else { - ts = std::map<int64_t,InetAddress>::reverse_iterator(m->second.erase(std::next(ts).base())); - } - } else { - ts = std::map<int64_t,InetAddress>::reverse_iterator(m->second.erase(std::next(ts).base())); - } - } - fprintf(f,"}"); - } - fprintf(f,"}" ZT_EOL_S); - fclose(f); - } - } - this->_onlineChanged = false; - } - } - } - }); -} - -FileDB::~FileDB() -{ - try { - _online_l.lock(); - _running = false; - _online_l.unlock(); - _onlineUpdateThread.join(); - } catch ( ... ) {} -} - -bool FileDB::waitForReady() { return true; } -bool FileDB::isReady() { return true; } - -void FileDB::save(nlohmann::json *orig,nlohmann::json &record) -{ - char p1[4096],p2[4096],pb[4096]; - try { - if (orig) { - if (*orig != record) { - record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1; - } - } else { - record["revision"] = 1; - } - - const std::string objtype = record["objtype"]; - if (objtype == "network") { - const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL); - if (nwid) { - nlohmann::json old; - get(nwid,old); - if ((!old.is_object())||(old != record)) { - OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid); - if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) - fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1); - _networkChanged(old,record,true); - } - } - } else if (objtype == "member") { - const uint64_t id = OSUtils::jsonIntHex(record["id"],0ULL); - const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"],0ULL); - if ((id)&&(nwid)) { - nlohmann::json network,old; - get(nwid,network,id,old); - if ((!old.is_object())||(old != record)) { - OSUtils::ztsnprintf(pb,sizeof(pb),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)nwid); - OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.10llx.json",pb,(unsigned long long)id); - if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) { - OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx",_networksPath.c_str(),(unsigned long long)nwid); - OSUtils::mkdir(p2); - OSUtils::mkdir(pb); - if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) - fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1); - } - _memberChanged(old,record,true); - } - } - } else if (objtype == "trace") { - const std::string id = record["id"]; - if (id.length() > 0) { - OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%s.json",_tracePath.c_str(),id.c_str()); - OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)); - } - } - } catch ( ... ) {} // drop invalid records missing fields -} - -void FileDB::eraseNetwork(const uint64_t networkId) -{ - nlohmann::json network,nullJson; - get(networkId,network); - char p[16384]; - OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),networkId); - OSUtils::rm(p); - OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx-online.json",_networksPath.c_str(),networkId); - OSUtils::rm(p); - OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)networkId); - OSUtils::rmDashRf(p); - _networkChanged(network,nullJson,true); - std::lock_guard<std::mutex> l(this->_online_l); - this->_online.erase(networkId); - this->_onlineChanged = true; -} - -void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId) -{ - nlohmann::json network,member,nullJson; - get(networkId,network); - get(memberId,member); - char p[4096]; - OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member" ZT_PATH_SEPARATOR_S "%.10llx.json",_networksPath.c_str(),networkId,memberId); - OSUtils::rm(p); - _memberChanged(member,nullJson,true); - std::lock_guard<std::mutex> l(this->_online_l); - this->_online[networkId].erase(memberId); - this->_onlineChanged = true; -} - -void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) -{ - char mid[32],atmp[64]; - OSUtils::ztsnprintf(mid,sizeof(mid),"%.10llx",(unsigned long long)memberId); - physicalAddress.toString(atmp); - std::lock_guard<std::mutex> l(this->_online_l); - this->_online[networkId][memberId][OSUtils::now()] = physicalAddress; - this->_onlineChanged = true; -} -#endif - } // namespace ZeroTier |