summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2019-07-23 16:06:35 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2019-07-23 16:06:35 -0700
commitac2688de5853160a1d4afb02395488594e6cca8e (patch)
treebee1d2ace7b0b2018238b34f19c8a7354a7e972d
parent5f11daadf20cc92dc451d0fdd41dd930b2524e8a (diff)
downloadinfinitytier-ac2688de5853160a1d4afb02395488594e6cca8e.tar.gz
infinitytier-ac2688de5853160a1d4afb02395488594e6cca8e.zip
More LFDB work
-rw-r--r--controller/LFDB.cpp324
-rw-r--r--controller/LFDB.hpp7
2 files changed, 100 insertions, 231 deletions
diff --git a/controller/LFDB.cpp b/controller/LFDB.cpp
index 826d7cba..8779c47e 100644
--- a/controller/LFDB.cpp
+++ b/controller/LFDB.cpp
@@ -41,30 +41,77 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
DB(nc,myId,path),
_nc(nc),
_myId(myId),
- _lfOwnerPrivate(lfOwnerPrivate),
- _lfOwnerPublic(lfOwnerPublic),
- _lfNodeHost(lfNodeHost),
- _lfNodePort(lfNodePort),
+ _lfOwnerPrivate((lfOwnerPrivate) ? lfOwnerPrivate : ""),
+ _lfOwnerPublic((lfOwnerPublic) ? lfOwnerPublic : ""),
+ _lfNodeHost((lfNodeHost) ? lfNodeHost : "127.0.0.1"),
+ _lfNodePort(((lfNodePort > 0)&&(lfNodePort < 65536)) ? lfNodePort : 9980),
_running(true),
_ready(false),
_storeOnlineState(storeOnlineState)
{
_syncThread = std::thread([this]() {
char controllerAddress[24];
+ const uint64_t controllerAddressInt = _myId.address().toInt();
_myId.address().toString(controllerAddress);
+ std::string networksSelectorName("com.zerotier.controller.lfdb:"); networksSelectorName.append(controllerAddress); networksSelectorName.append("/network");
+ std::string membersSelectorName("com.zerotier.controller.lfdb:"); membersSelectorName.append(controllerAddress); membersSelectorName.append("/network/member");
httplib::Client htcli(_lfNodeHost.c_str(),_lfNodePort,600);
std::ostringstream query;
int64_t timeRangeStart = 0;
while (_running) {
+ {
+ std::lock_guard<std::mutex> sl(_state_l);
+ for(auto ns=_state.begin();ns!=_state.end();++ns) {
+ if (ns->second.dirty) {
+ nlohmann::json network;
+ if (get(ns->first,network)) {
+ nlohmann::json newrec;
+ newrec["Selectors"] = {{ { { "Name",networksSelectorName },{ "Ordinal",ns->first } } }};
+ newrec["Value"] = network.dump();
+ newrec["OwnerPrivate"] = _lfOwnerPrivate;
+ newrec["MaskingKey"] = controllerAddress;
+ auto resp = htcli.Post("/make",newrec.dump(),"application/json");
+ if (resp->status == 200) {
+ ns->second.dirty = false;
+ } else {
+ fprintf(stderr,"ERROR: LFDB: %d from node (create/update network): %s" ZT_EOL_S,resp->status,resp->body.c_str());
+ }
+ }
+ }
+
+ for(auto ms=ns->second.members.begin();ms!=ns->second.members.end();++ms) {
+ if ((_storeOnlineState)&&(ms->second.lastOnlineDirty)) {
+ }
+
+ if (ms->second.dirty) {
+ nlohmann::json network,member;
+ if (get(ns->first,network,ms->first,member)) {
+ nlohmann::json newrec;
+ newrec["Selectors"] = {{ { { "Name",networksSelectorName },{ "Ordinal",ns->first } },{ { "Name",membersSelectorName },{ "Ordinal",ms->first } } }};
+ newrec["Value"] = member.dump();
+ newrec["OwnerPrivate"] = _lfOwnerPrivate;
+ newrec["MaskingKey"] = controllerAddress;
+ auto resp = htcli.Post("/make",newrec.dump(),"application/json");
+ if (resp->status == 200) {
+ ms->second.dirty = false;
+ } else {
+ fprintf(stderr,"ERROR: LFDB: %d from node (create/update member): %s" ZT_EOL_S,resp->status,resp->body.c_str());
+ }
+ }
+ }
+ }
+ }
+ }
+
query.clear();
query
<< '{'
<< "\"Ranges\":[{"
- << "\"Name\": \"com.zerotier.controller.lfdb:" << controllerAddress << "/network\","
- << "\"Range\": [ 0,18446744073709551615 ]"
+ << "\"Name\":\"" << networksSelectorName << "\","
+ << "\"Range\":[0,18446744073709551615]"
<< "}],"
- << "\"TimeRange\": [ " << timeRangeStart << ",18446744073709551615 ],"
+ << "\"TimeRange\":[" << timeRangeStart << ",18446744073709551615],"
<< "\"MaskingKey\":\"" << controllerAddress << "\","
<< "\"Owners\":[\"" << _lfOwnerPublic << "\"]"
<< '}';
@@ -79,11 +126,19 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
if (result.is_object()) {
nlohmann::json &record = result["Record"];
if (record.is_object()) {
- int64_t ts = record["Timestamp"];
- std::string value = result["Value"];
- nlohmann::json network(OSUtils::jsonParse(value));
+ const std::string recordValue = result["Value"];
+ nlohmann::json network(OSUtils::jsonParse(recordValue));
if (network.is_object()) {
- std::string idstr = network["id"];
+ const std::string idstr = network["id"];
+ const uint64_t id = Utils::hexStrToU64(idstr.c_str());
+ if ((id >> 24) == controllerAddressInt) {
+ std::lock_guard<std::mutex> sl(_state_l);
+ _NetworkState &ns = _state[id];
+ if (!ns.dirty) {
+ nlohmann::json nullJson;
+ _networkChanged(nullJson,network,false);
+ }
+ }
}
}
}
@@ -98,13 +153,13 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
query
<< '{'
<< "\"Ranges\":[{"
- << "\"Name\": \"com.zerotier.controller.lfdb:" << controllerAddress << "/network\","
- << "\"Range\": [ 0,18446744073709551615 ]"
+ << "\"Name\":\"" << networksSelectorName << "\","
+ << "\"Range\":[0,18446744073709551615]"
<< "},{"
- << "\"Name\": \"com.zerotier.controller.lfdb:" << controllerAddress << "/network/member\","
- << "\"Range\": [ 0,18446744073709551615 ]"
+ << "\"Name\":\"" << membersSelectorName << "\","
+ << "\"Range\":[0,18446744073709551615]"
<< "}],"
- << "\"TimeRange\": [ " << timeRangeStart << ",18446744073709551615 ],"
+ << "\"TimeRange\":[" << timeRangeStart << ",18446744073709551615],"
<< "\"MaskingKey\":\"" << controllerAddress << "\","
<< "\"Owners\":[\"" << _lfOwnerPublic << "\"]"
<< '}';
@@ -119,12 +174,24 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
if (result.is_object()) {
nlohmann::json &record = result["Record"];
if (record.is_object()) {
- int64_t ts = record["Timestamp"];
- std::string value = result["Value"];
- nlohmann::json member(OSUtils::jsonParse(value));
+ const std::string recordValue = result["Value"];
+ nlohmann::json member(OSUtils::jsonParse(recordValue));
if (member.is_object()) {
- std::string nwidstr = member["nwid"];
- std::string idstr = member["id"];
+ const std::string nwidstr = member["nwid"];
+ const std::string idstr = member["id"];
+ const uint64_t nwid = Utils::hexStrToU64(nwidstr.c_str());
+ const uint64_t id = Utils::hexStrToU64(idstr.c_str());
+ if ((id)&&((nwid >> 24) == controllerAddressInt)) {
+ std::lock_guard<std::mutex> sl(_state_l);
+ auto ns = _state.find(nwid);
+ if (ns != _state.end()) {
+ _MemberState &ms = ns->second.members[id];
+ if (!ms.dirty) {
+ nlohmann::json nullJson;
+ _memberChanged(nullJson,member,false);
+ }
+ }
+ }
}
}
}
@@ -138,8 +205,7 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
timeRangeStart = time(nullptr) - 120; // start next query 2m before now to avoid losing updates
_ready = true;
- // Delay 2s between queries, checking running flag every 100ms
- for(int k=0;k<20;++k) {
+ for(int k=0;k<20;++k) { // 2s delay between queries for remotely modified networks or members
if (!_running)
return;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
@@ -184,6 +250,11 @@ void LFDB::save(nlohmann::json *orig,nlohmann::json &record)
nlohmann::json old;
get(nwid,old);
if ((!old.is_object())||(old != record)) {
+ _networkChanged(old,record,true);
+ {
+ std::lock_guard<std::mutex> l(_state_l);
+ _state[nwid].dirty = true;
+ }
}
}
} else if (objtype == "member") {
@@ -193,6 +264,11 @@ void LFDB::save(nlohmann::json *orig,nlohmann::json &record)
nlohmann::json network,old;
get(nwid,network,id,old);
if ((!old.is_object())||(old != record)) {
+ _memberChanged(old,record,true);
+ {
+ std::lock_guard<std::mutex> l(_state_l);
+ _state[nwid].members[id].dirty = true;
+ }
}
}
}
@@ -223,206 +299,4 @@ void LFDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const I
}
}
-#if 0
-FileDB::FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) :
- DB(nc,myId,path),
- _networksPath(_path + ZT_PATH_SEPARATOR_S + "network"),
- _tracePath(_path + ZT_PATH_SEPARATOR_S + "trace"),
- _onlineChanged(false),
- _running(true)
-{
- OSUtils::mkdir(_path.c_str());
- OSUtils::lockDownFile(_path.c_str(),true);
- OSUtils::mkdir(_networksPath.c_str());
- OSUtils::mkdir(_tracePath.c_str());
-
- std::vector<std::string> networks(OSUtils::listDirectory(_networksPath.c_str(),false));
- std::string buf;
- for(auto n=networks.begin();n!=networks.end();++n) {
- buf.clear();
- if ((n->length() == 21)&&(OSUtils::readFile((_networksPath + ZT_PATH_SEPARATOR_S + *n).c_str(),buf))) {
- try {
- nlohmann::json network(OSUtils::jsonParse(buf));
- const std::string nwids = network["id"];
- if (nwids.length() == 16) {
- nlohmann::json nullJson;
- _networkChanged(nullJson,network,false);
- std::string membersPath(_networksPath + ZT_PATH_SEPARATOR_S + nwids + ZT_PATH_SEPARATOR_S "member");
- std::vector<std::string> members(OSUtils::listDirectory(membersPath.c_str(),false));
- for(auto m=members.begin();m!=members.end();++m) {
- buf.clear();
- if ((m->length() == 15)&&(OSUtils::readFile((membersPath + ZT_PATH_SEPARATOR_S + *m).c_str(),buf))) {
- try {
- nlohmann::json member(OSUtils::jsonParse(buf));
- const std::string addrs = member["id"];
- if (addrs.length() == 10) {
- nlohmann::json nullJson2;
- _memberChanged(nullJson2,member,false);
- }
- } catch ( ... ) {}
- }
- }
- }
- } catch ( ... ) {}
- }
- }
-
- _onlineUpdateThread = std::thread([this]() {
- unsigned int cnt = 0;
- while (this->_running) {
- std::this_thread::sleep_for(std::chrono::microseconds(100));
- if ((++cnt % 20) == 0) { // 5 seconds
- std::lock_guard<std::mutex> l(this->_online_l);
- if (!this->_running) return;
- if (this->_onlineChanged) {
- char p[4096],atmp[64];
- for(auto nw=this->_online.begin();nw!=this->_online.end();++nw) {
- OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx-online.json",_networksPath.c_str(),(unsigned long long)nw->first);
- FILE *f = fopen(p,"wb");
- if (f) {
- fprintf(f,"{");
- const char *memberPrefix = "";
- for(auto m=nw->second.begin();m!=nw->second.end();++m) {
- fprintf(f,"%s\"%.10llx\":{" ZT_EOL_S,memberPrefix,(unsigned long long)m->first);
- memberPrefix = ",";
- InetAddress lastAddr;
- const char *timestampPrefix = " ";
- int cnt = 0;
- for(auto ts=m->second.rbegin();ts!=m->second.rend();) {
- if (cnt < 25) {
- if (lastAddr != ts->second) {
- lastAddr = ts->second;
- fprintf(f,"%s\"%lld\":\"%s\"" ZT_EOL_S,timestampPrefix,(long long)ts->first,ts->second.toString(atmp));
- timestampPrefix = ",";
- ++cnt;
- ++ts;
- } else {
- ts = std::map<int64_t,InetAddress>::reverse_iterator(m->second.erase(std::next(ts).base()));
- }
- } else {
- ts = std::map<int64_t,InetAddress>::reverse_iterator(m->second.erase(std::next(ts).base()));
- }
- }
- fprintf(f,"}");
- }
- fprintf(f,"}" ZT_EOL_S);
- fclose(f);
- }
- }
- this->_onlineChanged = false;
- }
- }
- }
- });
-}
-
-FileDB::~FileDB()
-{
- try {
- _online_l.lock();
- _running = false;
- _online_l.unlock();
- _onlineUpdateThread.join();
- } catch ( ... ) {}
-}
-
-bool FileDB::waitForReady() { return true; }
-bool FileDB::isReady() { return true; }
-
-void FileDB::save(nlohmann::json *orig,nlohmann::json &record)
-{
- char p1[4096],p2[4096],pb[4096];
- try {
- if (orig) {
- if (*orig != record) {
- record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
- }
- } else {
- record["revision"] = 1;
- }
-
- const std::string objtype = record["objtype"];
- if (objtype == "network") {
- const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL);
- if (nwid) {
- nlohmann::json old;
- get(nwid,old);
- if ((!old.is_object())||(old != record)) {
- OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid);
- if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
- fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
- _networkChanged(old,record,true);
- }
- }
- } else if (objtype == "member") {
- const uint64_t id = OSUtils::jsonIntHex(record["id"],0ULL);
- const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"],0ULL);
- if ((id)&&(nwid)) {
- nlohmann::json network,old;
- get(nwid,network,id,old);
- if ((!old.is_object())||(old != record)) {
- OSUtils::ztsnprintf(pb,sizeof(pb),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)nwid);
- OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.10llx.json",pb,(unsigned long long)id);
- if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) {
- OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx",_networksPath.c_str(),(unsigned long long)nwid);
- OSUtils::mkdir(p2);
- OSUtils::mkdir(pb);
- if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
- fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
- }
- _memberChanged(old,record,true);
- }
- }
- } else if (objtype == "trace") {
- const std::string id = record["id"];
- if (id.length() > 0) {
- OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%s.json",_tracePath.c_str(),id.c_str());
- OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1));
- }
- }
- } catch ( ... ) {} // drop invalid records missing fields
-}
-
-void FileDB::eraseNetwork(const uint64_t networkId)
-{
- nlohmann::json network,nullJson;
- get(networkId,network);
- char p[16384];
- OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),networkId);
- OSUtils::rm(p);
- OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx-online.json",_networksPath.c_str(),networkId);
- OSUtils::rm(p);
- OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)networkId);
- OSUtils::rmDashRf(p);
- _networkChanged(network,nullJson,true);
- std::lock_guard<std::mutex> l(this->_online_l);
- this->_online.erase(networkId);
- this->_onlineChanged = true;
-}
-
-void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
-{
- nlohmann::json network,member,nullJson;
- get(networkId,network);
- get(memberId,member);
- char p[4096];
- OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member" ZT_PATH_SEPARATOR_S "%.10llx.json",_networksPath.c_str(),networkId,memberId);
- OSUtils::rm(p);
- _memberChanged(member,nullJson,true);
- std::lock_guard<std::mutex> l(this->_online_l);
- this->_online[networkId].erase(memberId);
- this->_onlineChanged = true;
-}
-
-void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
-{
- char mid[32],atmp[64];
- OSUtils::ztsnprintf(mid,sizeof(mid),"%.10llx",(unsigned long long)memberId);
- physicalAddress.toString(atmp);
- std::lock_guard<std::mutex> l(this->_online_l);
- this->_online[networkId][memberId][OSUtils::now()] = physicalAddress;
- this->_onlineChanged = true;
-}
-#endif
-
} // namespace ZeroTier
diff --git a/controller/LFDB.hpp b/controller/LFDB.hpp
index 2fcb47ba..6659c0bf 100644
--- a/controller/LFDB.hpp
+++ b/controller/LFDB.hpp
@@ -66,8 +66,7 @@ protected:
EmbeddedNetworkController *const _nc;
const Identity _myId;
- std::string _lfOwnerPrivate;
- std::string _lfOwnerPublic;
+ std::string _lfOwnerPrivate,_lfOwnerPublic;
std::string _lfNodeHost;
int _lfNodePort;
@@ -76,12 +75,10 @@ protected:
_MemberState() :
lastOnlineAddress(),
lastOnlineTime(0),
- recordTimestamp(0),
dirty(false),
lastOnlineDirty(false) {}
InetAddress lastOnlineAddress;
int64_t lastOnlineTime;
- int64_t recordTimestamp;
bool dirty;
bool lastOnlineDirty;
};
@@ -89,10 +86,8 @@ protected:
{
_NetworkState() :
members(),
- recordTimestamp(0),
dirty(false) {}
std::unordered_map<uint64_t,_MemberState> members;
- int64_t recordTimestamp;
bool dirty;
};
std::unordered_map<uint64_t,_NetworkState> _state;