diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2019-07-22 13:43:06 -0700 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2019-07-22 13:43:06 -0700 |
commit | 5edd04638d0bea58a619969cbb19bac2abb3f98e (patch) | |
tree | 5a6c491ab7a84accb30a1a96a4b10fac3a11f370 /controller | |
parent | a0f47b12b43c99100c8db87f5a127221d513784b (diff) | |
download | infinitytier-5edd04638d0bea58a619969cbb19bac2abb3f98e.tar.gz infinitytier-5edd04638d0bea58a619969cbb19bac2abb3f98e.zip |
LFDB work in progress
Diffstat (limited to 'controller')
-rw-r--r-- | controller/EmbeddedNetworkController.cpp | 46 | ||||
-rw-r--r-- | controller/EmbeddedNetworkController.hpp | 4 | ||||
-rw-r--r-- | controller/LFDB.cpp | 360 | ||||
-rw-r--r-- | controller/LFDB.hpp | 97 |
4 files changed, 501 insertions, 6 deletions
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 99c664f0..007a4112 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -46,6 +46,11 @@ #include "../version.h" #include "EmbeddedNetworkController.hpp" +#include "LFDB.hpp" +#include "FileDB.hpp" +#ifdef ZT_CONTROLLER_USE_LIBPQ +#include "PostgreSQL.hpp" +#endif #include "../node/Node.hpp" #include "../node/CertificateOfMembership.hpp" @@ -488,12 +493,49 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender) _signingId = signingId; _sender = sender; _signingIdAddressString = signingId.address().toString(tmp); + #ifdef ZT_CONTROLLER_USE_LIBPQ - if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:")) + if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:")) { _db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort, _mqc)); - else // else use FileDB after endif + } else { #endif + + std::string lfJSON; + OSUtils::readFile((_path + ZT_PATH_SEPARATOR_S "local.conf").c_str(),lfJSON); + if (lfJSON.length() > 0) { + nlohmann::json lfConfig(OSUtils::jsonParse(lfJSON)); + nlohmann::json &settings = lfConfig["settings"]; + if (settings.is_object()) { + nlohmann::json &controllerDb = lfConfig["controllerDb"]; + if (controllerDb.is_object()) { + std::string type = controllerDb["type"]; + if (type == "lf") { + std::string lfOwner = controllerDb["owner"]; + std::string lfHost = controllerDb["host"]; + int lfPort = controllerDb["port"]; + bool storeOnlineState = controllerDb["storeOnlineState"]; + if ((lfOwner.length())&&(lfHost.length())&&(lfPort > 0)&&(lfPort < 65536)) { + std::size_t pubHdrLoc = lfOwner.find("Public: "); + if ((pubHdrLoc > 0)&&((pubHdrLoc + 8) < lfOwner.length())) { + std::string lfOwnerPublic = lfOwner.substr(pubHdrLoc + 8); + std::size_t pubHdrEnd = lfOwnerPublic.find_first_of("\n\r\t "); + if (pubHdrEnd != std::string::npos) { + lfOwnerPublic = lfOwnerPublic.substr(0,pubHdrEnd); + _db.reset(new LFDB(this,_signingId,_path.c_str(),lfOwner.c_str(),lfOwnerPublic.c_str(),lfHost.c_str(),lfPort,storeOnlineState)); + } + } + } + } + } + } + } + if (!_db) _db.reset(new FileDB(this,_signingId,_path.c_str())); + +#ifdef ZT_CONTROLLER_USE_LIBPQ + } +#endif + _db->waitForReady(); } diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index 269442a8..6ce0b5cf 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -51,10 +51,6 @@ #include "../ext/json/json.hpp" #include "DB.hpp" -#include "FileDB.hpp" -#ifdef ZT_CONTROLLER_USE_LIBPQ -#include "PostgreSQL.hpp" -#endif namespace ZeroTier { diff --git a/controller/LFDB.cpp b/controller/LFDB.cpp new file mode 100644 index 00000000..82858b50 --- /dev/null +++ b/controller/LFDB.cpp @@ -0,0 +1,360 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. + */ + +#include "LFDB.hpp" + +#include <thread> +#include <iostream> +#include <sstream> + +#include "../osdep/OSUtils.hpp" +#include "../ext/cpp-httplib/httplib.h" + +namespace ZeroTier +{ + +LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState) : + DB(nc,myId,path), + _nc(nc), + _myId(myId), + _lfOwnerPrivate(lfOwnerPrivate), + _lfOwnerPublic(lfOwnerPublic), + _lfNodeHost(lfNodeHost), + _lfNodePort(lfNodePort), + _running(true), + _ready(false), + _storeOnlineState(storeOnlineState) +{ + _syncThread = std::thread([this]() { + char controllerAddress[24]; + _myId.address().toString(controllerAddress); + + httplib::Client htcli(_lfNodeHost.c_str(),_lfNodePort,600); + while (_running) { + std::ostringstream query; + query + << '{' + << "\"Ranges\":[{" + << "\"Name\": \"com.zerotier.controller.lfdb:" << controllerAddress << "\"" + << "}]," + << "\"MaskingKey\":\"" << controllerAddress << "\"," + << "\"Owners\":[\"" << _lfOwnerPublic << "\"]," + << "\"Open\":true" + << '}'; + auto resp = htcli.Post("/query",query.str(),"application/json"); + if (resp->status == 200) { + fprintf(stderr,"%d %s\n",resp->status,resp->body.c_str()); + } else { + fprintf(stderr,"ERROR: LFDB: %d from node: %s" ZT_EOL_S,resp->status,resp->body.c_str()); + } + + _ready = true; + + for(int k=0;k<10;++k) { + if (!_running) + return; + usleep(100000); + } + } + }); +} + +LFDB::~LFDB() +{ + _running = false; + _syncThread.join(); +} + +bool LFDB::waitForReady() +{ + while (!_ready) + usleep(10000); +} + +bool LFDB::isReady() +{ + return (_ready); +} + +void LFDB::save(nlohmann::json *orig,nlohmann::json &record) +{ + if (orig) { + if (*orig != record) { + record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1; + } + } else { + record["revision"] = 1; + } + + const std::string objtype = record["objtype"]; + if (objtype == "network") { + const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL); + if (nwid) { + nlohmann::json old; + get(nwid,old); + if ((!old.is_object())||(old != record)) { + } + } + } else if (objtype == "member") { + const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"],0ULL); + const uint64_t id = OSUtils::jsonIntHex(record["id"],0ULL); + if ((id)&&(nwid)) { + nlohmann::json network,old; + get(nwid,network,id,old); + if ((!old.is_object())||(old != record)) { + } + } + } +} + +void LFDB::eraseNetwork(const uint64_t networkId) +{ + // TODO +} + +void LFDB::eraseMember(const uint64_t networkId,const uint64_t memberId) +{ + // TODO +} + +void LFDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) +{ + std::lock_guard<std::mutex> l(_state_l); + auto nw = _state.find(networkId); + if (nw != _state.end()) { + auto m = nw->second.members.find(memberId); + if (m != nw->second.members.end()) { + m->second.lastOnlineTime = OSUtils::now(); + if (physicalAddress) + m->second.lastOnlineAddress = physicalAddress; + m->second.lastOnlineDirty = true; + } + } +} + +#if 0 +FileDB::FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) : + DB(nc,myId,path), + _networksPath(_path + ZT_PATH_SEPARATOR_S + "network"), + _tracePath(_path + ZT_PATH_SEPARATOR_S + "trace"), + _onlineChanged(false), + _running(true) +{ + OSUtils::mkdir(_path.c_str()); + OSUtils::lockDownFile(_path.c_str(),true); + OSUtils::mkdir(_networksPath.c_str()); + OSUtils::mkdir(_tracePath.c_str()); + + std::vector<std::string> networks(OSUtils::listDirectory(_networksPath.c_str(),false)); + std::string buf; + for(auto n=networks.begin();n!=networks.end();++n) { + buf.clear(); + if ((n->length() == 21)&&(OSUtils::readFile((_networksPath + ZT_PATH_SEPARATOR_S + *n).c_str(),buf))) { + try { + nlohmann::json network(OSUtils::jsonParse(buf)); + const std::string nwids = network["id"]; + if (nwids.length() == 16) { + nlohmann::json nullJson; + _networkChanged(nullJson,network,false); + std::string membersPath(_networksPath + ZT_PATH_SEPARATOR_S + nwids + ZT_PATH_SEPARATOR_S "member"); + std::vector<std::string> members(OSUtils::listDirectory(membersPath.c_str(),false)); + for(auto m=members.begin();m!=members.end();++m) { + buf.clear(); + if ((m->length() == 15)&&(OSUtils::readFile((membersPath + ZT_PATH_SEPARATOR_S + *m).c_str(),buf))) { + try { + nlohmann::json member(OSUtils::jsonParse(buf)); + const std::string addrs = member["id"]; + if (addrs.length() == 10) { + nlohmann::json nullJson2; + _memberChanged(nullJson2,member,false); + } + } catch ( ... ) {} + } + } + } + } catch ( ... ) {} + } + } + + _onlineUpdateThread = std::thread([this]() { + unsigned int cnt = 0; + while (this->_running) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + if ((++cnt % 20) == 0) { // 5 seconds + std::lock_guard<std::mutex> l(this->_online_l); + if (!this->_running) return; + if (this->_onlineChanged) { + char p[4096],atmp[64]; + for(auto nw=this->_online.begin();nw!=this->_online.end();++nw) { + OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx-online.json",_networksPath.c_str(),(unsigned long long)nw->first); + FILE *f = fopen(p,"wb"); + if (f) { + fprintf(f,"{"); + const char *memberPrefix = ""; + for(auto m=nw->second.begin();m!=nw->second.end();++m) { + fprintf(f,"%s\"%.10llx\":{" ZT_EOL_S,memberPrefix,(unsigned long long)m->first); + memberPrefix = ","; + InetAddress lastAddr; + const char *timestampPrefix = " "; + int cnt = 0; + for(auto ts=m->second.rbegin();ts!=m->second.rend();) { + if (cnt < 25) { + if (lastAddr != ts->second) { + lastAddr = ts->second; + fprintf(f,"%s\"%lld\":\"%s\"" ZT_EOL_S,timestampPrefix,(long long)ts->first,ts->second.toString(atmp)); + timestampPrefix = ","; + ++cnt; + ++ts; + } else { + ts = std::map<int64_t,InetAddress>::reverse_iterator(m->second.erase(std::next(ts).base())); + } + } else { + ts = std::map<int64_t,InetAddress>::reverse_iterator(m->second.erase(std::next(ts).base())); + } + } + fprintf(f,"}"); + } + fprintf(f,"}" ZT_EOL_S); + fclose(f); + } + } + this->_onlineChanged = false; + } + } + } + }); +} + +FileDB::~FileDB() +{ + try { + _online_l.lock(); + _running = false; + _online_l.unlock(); + _onlineUpdateThread.join(); + } catch ( ... ) {} +} + +bool FileDB::waitForReady() { return true; } +bool FileDB::isReady() { return true; } + +void FileDB::save(nlohmann::json *orig,nlohmann::json &record) +{ + char p1[4096],p2[4096],pb[4096]; + try { + if (orig) { + if (*orig != record) { + record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1; + } + } else { + record["revision"] = 1; + } + + const std::string objtype = record["objtype"]; + if (objtype == "network") { + const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL); + if (nwid) { + nlohmann::json old; + get(nwid,old); + if ((!old.is_object())||(old != record)) { + OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid); + if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) + fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1); + _networkChanged(old,record,true); + } + } + } else if (objtype == "member") { + const uint64_t id = OSUtils::jsonIntHex(record["id"],0ULL); + const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"],0ULL); + if ((id)&&(nwid)) { + nlohmann::json network,old; + get(nwid,network,id,old); + if ((!old.is_object())||(old != record)) { + OSUtils::ztsnprintf(pb,sizeof(pb),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)nwid); + OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.10llx.json",pb,(unsigned long long)id); + if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) { + OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx",_networksPath.c_str(),(unsigned long long)nwid); + OSUtils::mkdir(p2); + OSUtils::mkdir(pb); + if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) + fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1); + } + _memberChanged(old,record,true); + } + } + } else if (objtype == "trace") { + const std::string id = record["id"]; + if (id.length() > 0) { + OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%s.json",_tracePath.c_str(),id.c_str()); + OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)); + } + } + } catch ( ... ) {} // drop invalid records missing fields +} + +void FileDB::eraseNetwork(const uint64_t networkId) +{ + nlohmann::json network,nullJson; + get(networkId,network); + char p[16384]; + OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),networkId); + OSUtils::rm(p); + OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx-online.json",_networksPath.c_str(),networkId); + OSUtils::rm(p); + OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)networkId); + OSUtils::rmDashRf(p); + _networkChanged(network,nullJson,true); + std::lock_guard<std::mutex> l(this->_online_l); + this->_online.erase(networkId); + this->_onlineChanged = true; +} + +void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId) +{ + nlohmann::json network,member,nullJson; + get(networkId,network); + get(memberId,member); + char p[4096]; + OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member" ZT_PATH_SEPARATOR_S "%.10llx.json",_networksPath.c_str(),networkId,memberId); + OSUtils::rm(p); + _memberChanged(member,nullJson,true); + std::lock_guard<std::mutex> l(this->_online_l); + this->_online[networkId].erase(memberId); + this->_onlineChanged = true; +} + +void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) +{ + char mid[32],atmp[64]; + OSUtils::ztsnprintf(mid,sizeof(mid),"%.10llx",(unsigned long long)memberId); + physicalAddress.toString(atmp); + std::lock_guard<std::mutex> l(this->_online_l); + this->_online[networkId][memberId][OSUtils::now()] = physicalAddress; + this->_onlineChanged = true; +} +#endif + +} // namespace ZeroTier diff --git a/controller/LFDB.hpp b/controller/LFDB.hpp new file mode 100644 index 00000000..a01e78e6 --- /dev/null +++ b/controller/LFDB.hpp @@ -0,0 +1,97 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. + */ + +#ifndef ZT_CONTROLLER_LFDB_HPP +#define ZT_CONTROLLER_LFDB_HPP + +#include "DB.hpp" + +#include <mutex> +#include <string> +#include <unordered_map> +#include <atomic> + +namespace ZeroTier { + +/** + * DB implementation for controller that stores data in LF + */ +class LFDB : public DB +{ +public: + /** + * @param nc Network controller + * @param myId Identity of controller node (with secret) + * @param path Base path for ZeroTier node itself + * @param lfOwnerPrivate LF owner private in PEM format + * @param lfOwnerPublic LF owner public in @base62 format + * @param lfNodeHost LF node host + * @param lfNodePort LF node http (not https) port + * @param storeOnlineState If true, store online/offline state and IP info in LF (a lot of data, only for private networks!) + */ + LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState); + virtual ~LFDB(); + + virtual bool waitForReady(); + virtual bool isReady(); + virtual void save(nlohmann::json *orig,nlohmann::json &record); + virtual void eraseNetwork(const uint64_t networkId); + virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); + virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress); + +protected: + EmbeddedNetworkController *const _nc; + const Identity _myId; + + std::string _lfOwnerPrivate; + std::string _lfOwnerPublic; + std::string _lfNodeHost; + int _lfNodePort; + + struct _MemberState + { + InetAddress lastOnlineAddress; + int64_t lastOnlineTime; + bool dirty; + bool lastOnlineDirty; + }; + struct _NetworkState + { + std::unordered_map<uint64_t,_MemberState> members; + bool dirty; + }; + std::unordered_map<uint64_t,_NetworkState> _state; + std::mutex _state_l; + + std::atomic_bool _running; + std::atomic_bool _ready; + std::thread _syncThread; + bool _storeOnlineState; +}; + +} // namespace ZeroTier + +#endif |