From 1613f42d0082cf6438ad0c62d89405ab82625f98 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 7 Nov 2017 14:44:46 -0800 Subject: Re-integrate in-filesystem DB into new controller DB structure. --- attic/JSONDB.cpp | 525 +++++++++++++++++++++++++++++++ attic/JSONDB.hpp | 192 +++++++++++ controller/DB.cpp | 314 ++++++++++++++++++ controller/DB.hpp | 116 +++++++ controller/EmbeddedNetworkController.hpp | 5 + controller/FileDB.cpp | 129 ++++++++ controller/FileDB.hpp | 47 +++ controller/JSONDB.cpp | 525 ------------------------------- controller/JSONDB.hpp | 192 ----------- controller/RethinkDB.cpp | 287 ++--------------- controller/RethinkDB.hpp | 111 +------ objects.mk | 2 + osdep/OSUtils.hpp | 15 +- 13 files changed, 1376 insertions(+), 1084 deletions(-) create mode 100644 attic/JSONDB.cpp create mode 100644 attic/JSONDB.hpp create mode 100644 controller/DB.cpp create mode 100644 controller/DB.hpp create mode 100644 controller/FileDB.cpp create mode 100644 controller/FileDB.hpp delete mode 100644 controller/JSONDB.cpp delete mode 100644 controller/JSONDB.hpp diff --git a/attic/JSONDB.cpp b/attic/JSONDB.cpp new file mode 100644 index 00000000..67b13393 --- /dev/null +++ b/attic/JSONDB.cpp @@ -0,0 +1,525 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#ifndef _WIN32 +#include +#include +#include +#include +#include +#include +#endif + +#include "JSONDB.hpp" +#include "EmbeddedNetworkController.hpp" + +namespace ZeroTier { + +static const nlohmann::json _EMPTY_JSON(nlohmann::json::object()); + +JSONDB::JSONDB(const std::string &basePath,EmbeddedNetworkController *parent) : + _parent(parent), + _basePath(basePath), + _rawInput(-1), + _rawOutput(-1), + _summaryThreadRun(true), + _dataReady(false) +{ +#ifndef __WINDOWS__ + if (_basePath == "-") { + // If base path is "-" we run in Central harnessed mode. We read pseudo-http-requests from stdin and write + // them to stdout. + _rawInput = STDIN_FILENO; + _rawOutput = STDOUT_FILENO; + fcntl(_rawInput,F_SETFL,O_NONBLOCK); + } else { +#endif + // Default mode of operation is to store files in the filesystem + OSUtils::mkdir(_basePath.c_str()); + OSUtils::lockDownFile(_basePath.c_str(),true); // networks might contain auth tokens, etc., so restrict directory permissions +#ifndef __WINDOWS__ + } +#endif + + _networks_m.lock(); // locked until data is loaded, etc. + + if (_rawInput < 0) { + _load(basePath); + _dataReady = true; + _networks_m.unlock(); + } else { + // In harnessed mode we leave the lock locked and wait for our initial DB from Central. + _summaryThread = Thread::start(this); + } +} + +JSONDB::~JSONDB() +{ + Thread t; + { + Mutex::Lock _l(_summaryThread_m); + _summaryThreadRun = false; + t = _summaryThread; + } + if (t) + Thread::join(t); +} + +bool JSONDB::writeRaw(const std::string &n,const std::string &obj) +{ + if (_rawOutput >= 0) { +#ifndef __WINDOWS__ + if (obj.length() > 0) { + Mutex::Lock _l(_rawLock); + //fprintf(stderr,"%s\n",obj.c_str()); + if ((long)write(_rawOutput,obj.data(),obj.length()) == (long)obj.length()) { + if (write(_rawOutput,"\n",1) == 1) + return true; + } + } else return true; +#endif + return false; + } else { + const std::string path(_genPath(n,true)); + if (!path.length()) + return false; + return OSUtils::writeFile(path.c_str(),obj); + } +} + +bool JSONDB::hasNetwork(const uint64_t networkId) const +{ + Mutex::Lock _l(_networks_m); + return (_networks.find(networkId) != _networks.end()); +} + +bool JSONDB::getNetwork(const uint64_t networkId,nlohmann::json &config) const +{ + Mutex::Lock _l(_networks_m); + const std::unordered_map::const_iterator i(_networks.find(networkId)); + if (i == _networks.end()) + return false; + config = nlohmann::json::from_msgpack(i->second.config); + return true; +} + +bool JSONDB::getNetworkSummaryInfo(const uint64_t networkId,NetworkSummaryInfo &ns) const +{ + Mutex::Lock _l(_networks_m); + const std::unordered_map::const_iterator i(_networks.find(networkId)); + if (i == _networks.end()) + return false; + ns = i->second.summaryInfo; + return true; +} + +int JSONDB::getNetworkAndMember(const uint64_t networkId,const uint64_t nodeId,nlohmann::json &networkConfig,nlohmann::json &memberConfig,NetworkSummaryInfo &ns) const +{ + Mutex::Lock _l(_networks_m); + const std::unordered_map::const_iterator i(_networks.find(networkId)); + if (i == _networks.end()) + return 0; + const std::unordered_map< uint64_t,std::vector >::const_iterator j(i->second.members.find(nodeId)); + if (j == i->second.members.end()) + return 1; + networkConfig = nlohmann::json::from_msgpack(i->second.config); + memberConfig = nlohmann::json::from_msgpack(j->second); + ns = i->second.summaryInfo; + return 3; +} + +bool JSONDB::getNetworkMember(const uint64_t networkId,const uint64_t nodeId,nlohmann::json &memberConfig) const +{ + Mutex::Lock _l(_networks_m); + const std::unordered_map::const_iterator i(_networks.find(networkId)); + if (i == _networks.end()) + return false; + const std::unordered_map< uint64_t,std::vector >::const_iterator j(i->second.members.find(nodeId)); + if (j == i->second.members.end()) + return false; + memberConfig = nlohmann::json::from_msgpack(j->second); + return true; +} + +void JSONDB::saveNetwork(const uint64_t networkId,const nlohmann::json &networkConfig) +{ + char n[64]; + OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId); + writeRaw(n,OSUtils::jsonDump(networkConfig,-1)); + { + Mutex::Lock _l(_networks_m); + _NW &nw = _networks[networkId]; + nw.config = nlohmann::json::to_msgpack(networkConfig); + } + _recomputeSummaryInfo(networkId); +} + +void JSONDB::saveNetworkMember(const uint64_t networkId,const uint64_t nodeId,const nlohmann::json &memberConfig) +{ + char n[256]; + OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId); + writeRaw(n,OSUtils::jsonDump(memberConfig,-1)); + { + Mutex::Lock _l(_networks_m); + std::vector &m = _networks[networkId].members[nodeId]; + m = nlohmann::json::to_msgpack(memberConfig); + _members[nodeId].insert(networkId); + } + _recomputeSummaryInfo(networkId); +} + +nlohmann::json JSONDB::eraseNetwork(const uint64_t networkId) +{ + if (_rawOutput >= 0) { + // In harnessed mode, DB deletes occur in the Central database and we do + // not need to erase files. + } else { + std::vector memberIds; + { + Mutex::Lock _l(_networks_m); + const std::unordered_map::iterator i(_networks.find(networkId)); + if (i == _networks.end()) + return _EMPTY_JSON; + for(std::unordered_map< uint64_t,std::vector >::iterator m(i->second.members.begin());m!=i->second.members.end();++m) + memberIds.push_back(m->first); + } + for(std::vector::iterator m(memberIds.begin());m!=memberIds.end();++m) + eraseNetworkMember(networkId,*m,false); + + char n[256]; + OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId); + const std::string path(_genPath(n,false)); + if (path.length()) + OSUtils::rm(path.c_str()); + } + + // This also erases all members from the memory cache + { + Mutex::Lock _l(_networks_m); + std::unordered_map::iterator i(_networks.find(networkId)); + if (i == _networks.end()) + return _EMPTY_JSON; // sanity check, shouldn't happen + nlohmann::json tmp(nlohmann::json::from_msgpack(i->second.config)); + _networks.erase(i); + return tmp; + } +} + +nlohmann::json JSONDB::eraseNetworkMember(const uint64_t networkId,const uint64_t nodeId,bool recomputeSummaryInfo) +{ + if (_rawOutput >= 0) { + // In harnessed mode, DB deletes occur in Central and we do not remove files. + } else { + char n[256]; + OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId); + const std::string path(_genPath(n,false)); + if (path.length()) + OSUtils::rm(path.c_str()); + } + + { + Mutex::Lock _l(_networks_m); + _members[nodeId].erase(networkId); + std::unordered_map::iterator i(_networks.find(networkId)); + if (i == _networks.end()) + return _EMPTY_JSON; + std::unordered_map< uint64_t,std::vector >::iterator j(i->second.members.find(nodeId)); + if (j == i->second.members.end()) + return _EMPTY_JSON; + nlohmann::json tmp(j->second); + i->second.members.erase(j); + if (recomputeSummaryInfo) + _recomputeSummaryInfo(networkId); + return tmp; + } +} + +void JSONDB::threadMain() + throw() +{ +#ifndef __WINDOWS__ + fd_set readfds,nullfds; + char *const readbuf = (_rawInput >= 0) ? (new char[1048576]) : (char *)0; + std::string rawInputBuf; + FD_ZERO(&readfds); + FD_ZERO(&nullfds); + struct timeval tv; +#endif + + std::vector todo; + + while (_summaryThreadRun) { +#ifndef __WINDOWS__ + if (_rawInput < 0) { + Thread::sleep(25); + } else { + // In IPC mode we wait but also select() on STDIN to read database updates + FD_SET(_rawInput,&readfds); + tv.tv_sec = 0; + tv.tv_usec = 25000; + select(_rawInput+1,&readfds,&nullfds,&nullfds,&tv); + if (FD_ISSET(_rawInput,&readfds)) { + const long rn = (long)read(_rawInput,readbuf,1048576); + bool gotMessage = false; + for(long i=0;i 0) { + try { + const nlohmann::json obj(OSUtils::jsonParse(rawInputBuf)); + gotMessage = true; + + if (!_dataReady) { + _dataReady = true; + _networks_m.unlock(); + } + + if (obj.is_array()) { + for(unsigned long i=0;i::iterator ii(todo.begin());ii!=todo.end();++ii) { + const uint64_t networkId = *ii; + std::unordered_map::iterator n(_networks.find(networkId)); + if (n != _networks.end()) { + NetworkSummaryInfo &ns = n->second.summaryInfo; + ns.activeBridges.clear(); + ns.allocatedIps.clear(); + ns.authorizedMemberCount = 0; + ns.activeMemberCount = 0; + ns.totalMemberCount = 0; + ns.mostRecentDeauthTime = 0; + + for(std::unordered_map< uint64_t,std::vector >::const_iterator m(n->second.members.begin());m!=n->second.members.end();++m) { + try { + nlohmann::json member(nlohmann::json::from_msgpack(m->second)); + + if (OSUtils::jsonBool(member["authorized"],false)) { + ++ns.authorizedMemberCount; + + try { + const nlohmann::json &mlog = member["recentLog"]; + if ((mlog.is_array())&&(mlog.size() > 0)) { + const nlohmann::json &mlog1 = mlog[0]; + if (mlog1.is_object()) { + if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < (ZT_NETWORK_AUTOCONF_DELAY * 2)) + ++ns.activeMemberCount; + } + } + } catch ( ... ) {} + + try { + if (OSUtils::jsonBool(member["activeBridge"],false)) + ns.activeBridges.push_back(Address(m->first)); + } catch ( ... ) {} + + try { + const nlohmann::json &mips = member["ipAssignments"]; + if (mips.is_array()) { + for(unsigned long i=0;isecond.summaryInfoLastComputed = now; + } + } + } catch ( ... ) {} + + todo.clear(); + } + + if (!_dataReady) // sanity check + _networks_m.unlock(); + +#ifndef __WINDOWS__ + delete [] readbuf; +#endif +} + +bool JSONDB::_addOrUpdate(const nlohmann::json &j) +{ + try { + if (j.is_object()) { + std::string id(OSUtils::jsonString(j["id"],"0")); + const std::string objtype(OSUtils::jsonString(j["objtype"],"")); + if ((id.length() == 16)&&(objtype == "network")) { + + const uint64_t nwid = Utils::hexStrToU64(id.c_str()); + if (nwid) { + bool update; + { + Mutex::Lock _l(_networks_m); + _NW &nw = _networks[nwid]; + update = !nw.config.empty(); + nw.config = nlohmann::json::to_msgpack(j); + } + if (update) + _parent->onNetworkUpdate(nwid); + _recomputeSummaryInfo(nwid); + return true; + } + + } else if ((id.length() == 10)&&(objtype == "member")) { + + const uint64_t mid = Utils::hexStrToU64(id.c_str()); + const uint64_t nwid = Utils::hexStrToU64(OSUtils::jsonString(j["nwid"],"0").c_str()); + if ((mid)&&(nwid)) { + bool update = false; + bool deauth = false; + { + Mutex::Lock _l(_networks_m); + std::vector &m = _networks[nwid].members[mid]; + if (!m.empty()) { + update = true; + nlohmann::json oldm(nlohmann::json::from_msgpack(m)); + deauth = ((OSUtils::jsonBool(oldm["authorized"],false))&&(!OSUtils::jsonBool(j["authorized"],false))); + } + m = nlohmann::json::to_msgpack(j); + _members[mid].insert(nwid); + } + if (update) { + _parent->onNetworkMemberUpdate(nwid,mid); + if (deauth) + _parent->onNetworkMemberDeauthorize(nwid,mid); + } + _recomputeSummaryInfo(nwid); + return true; + } + + } else if (objtype == "_delete") { // pseudo-object-type, only used in Central harnessed mode + + const std::string deleteType(OSUtils::jsonString(j["deleteType"],"")); + id = OSUtils::jsonString(j["deleteId"],""); + if ((deleteType == "network")&&(id.length() == 16)) { + eraseNetwork(Utils::hexStrToU64(id.c_str())); + } else if ((deleteType == "member")&&(id.length() == 10)) { + const std::string networkId(OSUtils::jsonString(j["deleteNetworkId"],"")); + const uint64_t nwid = Utils::hexStrToU64(networkId.c_str()); + const uint64_t mid = Utils::hexStrToU64(id.c_str()); + if (networkId.length() == 16) + eraseNetworkMember(nwid,mid,true); + _parent->onNetworkMemberDeauthorize(nwid,mid); + } + + } + } + } catch ( ... ) {} + return false; +} + +bool JSONDB::_load(const std::string &p) +{ + // This is not used in stdin/stdout mode. Instead data is populated by + // sending it all to stdin. + + std::vector dl(OSUtils::listDirectory(p.c_str(),true)); + for(std::vector::const_iterator di(dl.begin());di!=dl.end();++di) { + if ((di->length() > 5)&&(di->substr(di->length() - 5) == ".json")) { + std::string buf; + if (OSUtils::readFile((p + ZT_PATH_SEPARATOR_S + *di).c_str(),buf)) { + try { + _addOrUpdate(OSUtils::jsonParse(buf)); + } catch ( ... ) {} + } + } else { + this->_load((p + ZT_PATH_SEPARATOR_S + *di)); + } + } + + return true; +} + +void JSONDB::_recomputeSummaryInfo(const uint64_t networkId) +{ + Mutex::Lock _l(_summaryThread_m); + if (std::find(_summaryThreadToDo.begin(),_summaryThreadToDo.end(),networkId) == _summaryThreadToDo.end()) + _summaryThreadToDo.push_back(networkId); + if (!_summaryThread) + _summaryThread = Thread::start(this); +} + +std::string JSONDB::_genPath(const std::string &n,bool create) +{ + std::vector pt(OSUtils::split(n.c_str(),"/","","")); + if (pt.size() == 0) + return std::string(); + + std::string p(_basePath); + if (create) OSUtils::mkdir(p.c_str()); + for(unsigned long i=0,j=(unsigned long)(pt.size()-1);i. + */ + +#ifndef ZT_JSONDB_HPP +#define ZT_JSONDB_HPP + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "../node/Constants.hpp" +#include "../node/Utils.hpp" +#include "../node/InetAddress.hpp" +#include "../node/Mutex.hpp" +#include "../ext/json/json.hpp" +#include "../osdep/OSUtils.hpp" +#include "../osdep/Thread.hpp" + +namespace ZeroTier { + +class EmbeddedNetworkController; + +/** + * Hierarchical JSON store that persists into the filesystem or via HTTP + */ +class JSONDB +{ +public: + struct NetworkSummaryInfo + { + NetworkSummaryInfo() : authorizedMemberCount(0),activeMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {} + std::vector
activeBridges; + std::vector allocatedIps; + unsigned long authorizedMemberCount; + unsigned long activeMemberCount; + unsigned long totalMemberCount; + int64_t mostRecentDeauthTime; + }; + + JSONDB(const std::string &basePath,EmbeddedNetworkController *parent); + ~JSONDB(); + + /** + * Write a JSON object to the data store + * + * It's important that obj contain a valid JSON object with no newlines (jsonDump with -1 + * for indentation), since newline-delimited JSON is what nodeJS's IPC speaks and this + * is important in Central-harnessed mode. + * + * @param n Path name of object + * @param obj Object in single-line no-CRs JSON object format (OSUtils::jsonDump(obj,-1)) + * @return True if write appears successful + */ + bool writeRaw(const std::string &n,const std::string &obj); + + bool hasNetwork(const uint64_t networkId) const; + + bool getNetwork(const uint64_t networkId,nlohmann::json &config) const; + + bool getNetworkSummaryInfo(const uint64_t networkId,NetworkSummaryInfo &ns) const; + + /** + * @return Bit mask: 0 == none, 1 == network only, 3 == network and member + */ + int getNetworkAndMember(const uint64_t networkId,const uint64_t nodeId,nlohmann::json &networkConfig,nlohmann::json &memberConfig,NetworkSummaryInfo &ns) const; + + bool getNetworkMember(const uint64_t networkId,const uint64_t nodeId,nlohmann::json &memberConfig) const; + + void saveNetwork(const uint64_t networkId,const nlohmann::json &networkConfig); + + void saveNetworkMember(const uint64_t networkId,const uint64_t nodeId,const nlohmann::json &memberConfig); + + nlohmann::json eraseNetwork(const uint64_t networkId); + + nlohmann::json eraseNetworkMember(const uint64_t networkId,const uint64_t nodeId,bool recomputeSummaryInfo = true); + + std::vector networkIds() const + { + std::vector r; + Mutex::Lock _l(_networks_m); + for(std::unordered_map::const_iterator n(_networks.begin());n!=_networks.end();++n) + r.push_back(n->first); + return r; + } + + inline unsigned long memberCount(const uint64_t networkId) + { + Mutex::Lock _l(_networks_m); + std::unordered_map::const_iterator i(_networks.find(networkId)); + if (i != _networks.end()) + return (unsigned long)i->second.members.size(); + return 0; + } + + template + inline void eachMember(const uint64_t networkId,F func) + { + Mutex::Lock _l(_networks_m); + std::unordered_map::const_iterator i(_networks.find(networkId)); + if (i != _networks.end()) { + for(std::unordered_map< uint64_t,std::vector >::const_iterator m(i->second.members.begin());m!=i->second.members.end();++m) { + try { + func(networkId,m->first,nlohmann::json::from_msgpack(m->second)); + } catch ( ... ) {} + } + } + } + + template + inline void eachId(F func) + { + Mutex::Lock _l(_networks_m); + for(std::unordered_map::const_iterator i(_networks.begin());i!=_networks.end();++i) { + for(std::unordered_map< uint64_t,std::vector >::const_iterator m(i->second.members.begin());m!=i->second.members.end();++m) { + try { + func(i->first,m->first); + } catch ( ... ) {} + } + } + } + + inline std::vector networksForMember(const uint64_t nodeId) + { + Mutex::Lock _l(_networks_m); + std::unordered_map< uint64_t,std::unordered_set< uint64_t > >::const_iterator m(_members.find(nodeId)); + if (m != _members.end()) { + return std::vector(m->second.begin(),m->second.end()); + } else { + return std::vector(); + } + } + + void threadMain() + throw(); + +private: + bool _addOrUpdate(const nlohmann::json &j); + bool _load(const std::string &p); + void _recomputeSummaryInfo(const uint64_t networkId); + std::string _genPath(const std::string &n,bool create); + + EmbeddedNetworkController *const _parent; + std::string _basePath; + int _rawInput,_rawOutput; + Mutex _rawLock; + + Thread _summaryThread; + std::vector _summaryThreadToDo; + volatile bool _summaryThreadRun; + Mutex _summaryThread_m; + + struct _NW + { + _NW() : summaryInfoLastComputed(0) {} + std::vector config; + NetworkSummaryInfo summaryInfo; + uint64_t summaryInfoLastComputed; + std::unordered_map< uint64_t,std::vector > members; + }; + + std::unordered_map< uint64_t,_NW > _networks; + std::unordered_map< uint64_t,std::unordered_set< uint64_t > > _members; + bool _dataReady; + Mutex _networks_m; +}; + +} // namespace ZeroTier + +#endif diff --git a/controller/DB.cpp b/controller/DB.cpp new file mode 100644 index 00000000..4a5688e3 --- /dev/null +++ b/controller/DB.cpp @@ -0,0 +1,314 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "DB.hpp" +#include "EmbeddedNetworkController.hpp" + +#include +#include +#include + +using json = nlohmann::json; + +namespace ZeroTier { + +DB::DB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path) : + _controller(nc), + _myAddress(myAddress), + _path((path) ? path : "") +{ + { + char tmp[32]; + _myAddress.toString(tmp); + _myAddressStr = tmp; + } +} + +DB::~DB() +{ +} + +bool DB::get(const uint64_t networkId,nlohmann::json &network) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard l2(nw->lock); + network = nw->config; + } + return true; +} + +bool DB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard l2(nw->lock); + network = nw->config; + auto m = nw->members.find(memberId); + if (m == nw->members.end()) + return false; + member = m->second; + } + return true; +} + +bool DB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard l2(nw->lock); + network = nw->config; + _fillSummaryInfo(nw,info); + auto m = nw->members.find(memberId); + if (m == nw->members.end()) + return false; + member = m->second; + } + return true; +} + +bool DB::get(const uint64_t networkId,nlohmann::json &network,std::vector &members) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard l2(nw->lock); + network = nw->config; + for(auto m=nw->members.begin();m!=nw->members.end();++m) + members.push_back(m->second); + } + return true; +} + +bool DB::summary(const uint64_t networkId,NetworkSummaryInfo &info) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard l2(nw->lock); + _fillSummaryInfo(nw,info); + } + return true; +} + +void DB::networks(std::vector &networks) +{ + waitForReady(); + std::lock_guard l(_networks_l); + networks.reserve(_networks.size() + 1); + for(auto n=_networks.begin();n!=_networks.end();++n) + networks.push_back(n->first); +} + +void DB::_memberChanged(nlohmann::json &old,nlohmann::json &member,bool push) +{ + uint64_t memberId = 0; + uint64_t networkId = 0; + bool isAuth = false; + bool wasAuth = false; + std::shared_ptr<_Network> nw; + + if (old.is_object()) { + json &config = old["config"]; + if (config.is_object()) { + memberId = OSUtils::jsonIntHex(config["id"],0ULL); + networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); + if ((memberId)&&(networkId)) { + { + std::lock_guard l(_networks_l); + auto nw2 = _networks.find(networkId); + if (nw2 != _networks.end()) + nw = nw2->second; + } + if (nw) { + std::lock_guard l(nw->lock); + if (OSUtils::jsonBool(config["activeBridge"],false)) + nw->activeBridgeMembers.erase(memberId); + wasAuth = OSUtils::jsonBool(config["authorized"],false); + if (wasAuth) + nw->authorizedMembers.erase(memberId); + json &ips = config["ipAssignments"]; + if (ips.is_array()) { + for(unsigned long i=0;iallocatedIps.erase(ipa); + } + } + } + } + } + } + } + + if (member.is_object()) { + json &config = member["config"]; + if (config.is_object()) { + if (!nw) { + memberId = OSUtils::jsonIntHex(config["id"],0ULL); + networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); + if ((!memberId)||(!networkId)) + return; + std::lock_guard l(_networks_l); + std::shared_ptr<_Network> &nw2 = _networks[networkId]; + if (!nw2) + nw2.reset(new _Network); + nw = nw2; + } + + { + std::lock_guard l(nw->lock); + + nw->members[memberId] = config; + + if (OSUtils::jsonBool(config["activeBridge"],false)) + nw->activeBridgeMembers.insert(memberId); + isAuth = OSUtils::jsonBool(config["authorized"],false); + if (isAuth) + nw->authorizedMembers.insert(memberId); + json &ips = config["ipAssignments"]; + if (ips.is_array()) { + for(unsigned long i=0;iallocatedIps.insert(ipa); + } + } + } + + if (!isAuth) { + const int64_t ldt = (int64_t)OSUtils::jsonInt(config["lastDeauthorizedTime"],0ULL); + if (ldt > nw->mostRecentDeauthTime) + nw->mostRecentDeauthTime = ldt; + } + } + + if (push) + _controller->onNetworkMemberUpdate(networkId,memberId); + } + } else if (memberId) { + if (nw) { + std::lock_guard l(nw->lock); + nw->members.erase(memberId); + } + if (networkId) { + std::lock_guard l(_networks_l); + auto er = _networkByMember.equal_range(memberId); + for(auto i=er.first;i!=er.second;++i) { + if (i->second == networkId) { + _networkByMember.erase(i); + break; + } + } + } + } + + if ((push)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) + _controller->onNetworkMemberDeauthorize(networkId,memberId); +} + +void DB::_networkChanged(nlohmann::json &old,nlohmann::json &network,bool push) +{ + if (network.is_object()) { + json &config = network["config"]; + if (config.is_object()) { + const std::string ids = config["id"]; + const uint64_t id = Utils::hexStrToU64(ids.c_str()); + if (id) { + std::shared_ptr<_Network> nw; + { + std::lock_guard l(_networks_l); + std::shared_ptr<_Network> &nw2 = _networks[id]; + if (!nw2) + nw2.reset(new _Network); + nw = nw2; + } + { + std::lock_guard l2(nw->lock); + nw->config = config; + } + if (push) + _controller->onNetworkUpdate(id); + } + } + } else if (old.is_object()) { + const std::string ids = old["id"]; + const uint64_t id = Utils::hexStrToU64(ids.c_str()); + if (id) { + std::lock_guard l(_networks_l); + _networks.erase(id); + } + } +} + +void DB::_fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info) +{ + for(auto ab=nw->activeBridgeMembers.begin();ab!=nw->activeBridgeMembers.end();++ab) + info.activeBridges.push_back(Address(*ab)); + for(auto ip=nw->allocatedIps.begin();ip!=nw->allocatedIps.end();++ip) + info.allocatedIps.push_back(*ip); + info.authorizedMemberCount = (unsigned long)nw->authorizedMembers.size(); + info.totalMemberCount = (unsigned long)nw->members.size(); + info.mostRecentDeauthTime = nw->mostRecentDeauthTime; +} + +} // namespace ZeroTier diff --git a/controller/DB.hpp b/controller/DB.hpp new file mode 100644 index 00000000..dfc8ac95 --- /dev/null +++ b/controller/DB.hpp @@ -0,0 +1,116 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef ZT_CONTROLLER_DB_HPP +#define ZT_CONTROLLER_DB_HPP + +#include "../node/Constants.hpp" +#include "../node/Address.hpp" +#include "../node/InetAddress.hpp" +#include "../osdep/OSUtils.hpp" +#include "../osdep/BlockingQueue.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include "../ext/json/json.hpp" + +#define ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS 2 + +namespace ZeroTier +{ + +class EmbeddedNetworkController; + +/** + * Base class with common infrastructure for all controller DB implementations + */ +class DB +{ +public: + struct NetworkSummaryInfo + { + NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {} + std::vector
activeBridges; + std::vector allocatedIps; + unsigned long authorizedMemberCount; + unsigned long totalMemberCount; + int64_t mostRecentDeauthTime; + }; + + DB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path); + virtual ~DB(); + + virtual bool waitForReady() = 0; + + inline bool hasNetwork(const uint64_t networkId) const + { + std::lock_guard l(_networks_l); + return (_networks.find(networkId) != _networks.end()); + } + + bool get(const uint64_t networkId,nlohmann::json &network); + bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member); + bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info); + bool get(const uint64_t networkId,nlohmann::json &network,std::vector &members); + + bool summary(const uint64_t networkId,NetworkSummaryInfo &info); + + void networks(std::vector &networks); + + virtual void save(const nlohmann::json &record) = 0; + + virtual void eraseNetwork(const uint64_t networkId) = 0; + + virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0; + +protected: + struct _Network + { + _Network() : mostRecentDeauthTime(0) {} + nlohmann::json config; + std::unordered_map members; + std::unordered_set activeBridgeMembers; + std::unordered_set authorizedMembers; + std::unordered_set allocatedIps; + int64_t mostRecentDeauthTime; + std::mutex lock; + }; + + void _memberChanged(nlohmann::json &old,nlohmann::json &member,bool push); + void _networkChanged(nlohmann::json &old,nlohmann::json &network,bool push); + void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info); + + EmbeddedNetworkController *const _controller; + const Address _myAddress; + const std::string _path; + std::string _myAddressStr; + + std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks; + std::unordered_multimap< uint64_t,uint64_t > _networkByMember; + mutable std::mutex _networks_l; +}; + +} // namespace ZeroTier + +#endif diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index f9b6fb5a..bc59b359 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -45,13 +45,18 @@ #ifdef ZT_CONTROLLER_USE_RETHINKDB #include "RethinkDB.hpp" +#else +#include "FileDB.hpp" #endif namespace ZeroTier { #ifdef ZT_CONTROLLER_USE_RETHINKDB typedef RethinkDB ControllerDB; +#else +typedef FileDB ControllerDB; #endif + class Node; class EmbeddedNetworkController : public NetworkController diff --git a/controller/FileDB.cpp b/controller/FileDB.cpp new file mode 100644 index 00000000..b48d5e87 --- /dev/null +++ b/controller/FileDB.cpp @@ -0,0 +1,129 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "FileDB.hpp" + +namespace ZeroTier +{ + +FileDB::FileDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path) : + DB(nc,myAddress,path), + _networksPath(_path + ZT_PATH_SEPARATOR_S + "network") +{ + OSUtils::mkdir(_path.c_str()); + OSUtils::lockDownFile(_path.c_str(),true); + + std::vector networks(OSUtils::listDirectory(_networksPath.c_str(),false)); + std::string buf; + for(auto n=networks.begin();n!=networks.end();++n) { + buf.clear(); + if ((n->length() == 21)&&(OSUtils::readFile((_networksPath + ZT_PATH_SEPARATOR_S + *n).c_str(),buf))) { + try { + nlohmann::json network(OSUtils::jsonParse(buf)); + const std::string nwids = network["id"]; + if (nwids.length() == 16) { + nlohmann::json nullJson; + _networkChanged(nullJson,network,false); + std::string membersPath(_networksPath + ZT_PATH_SEPARATOR_S + nwids + ZT_PATH_SEPARATOR_S "member"); + std::vector members(OSUtils::listDirectory(membersPath.c_str(),false)); + for(auto m=members.begin();m!=members.end();++m) { + buf.clear(); + if ((m->length() == 15)&&(OSUtils::readFile((membersPath + ZT_PATH_SEPARATOR_S + *m).c_str(),buf))) { + try { + nlohmann::json member(OSUtils::jsonParse(buf)); + const std::string addrs = member["id"]; + if (addrs.length() == 10) { + nlohmann::json nullJson2; + _memberChanged(nullJson2,member,false); + } + } catch ( ... ) {} + } + } + } + } catch ( ... ) {} + } + } +} + +FileDB::~FileDB() +{ +} + +bool FileDB::waitForReady() +{ + return true; +} + +void FileDB::save(const nlohmann::json &record) +{ + char p1[16384],p2[16384]; + try { + nlohmann::json rec(record); + const std::string objtype = rec["objtype"]; + if (objtype == "network") { + const uint64_t nwid = OSUtils::jsonIntHex(rec["id"],0ULL); + if (nwid) { + nlohmann::json old; + get(nwid,old); + + OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json.new",_networksPath.c_str(),nwid); + OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid); + if (!OSUtils::writeFile(p1,OSUtils::jsonDump(rec,-1))) + fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1); + OSUtils::rename(p1,p2); + + _networkChanged(old,rec,true); + } + } else if (objtype == "member") { + const uint64_t id = OSUtils::jsonIntHex(rec["id"],0ULL); + const uint64_t nwid = OSUtils::jsonIntHex(rec["nwid"],0ULL); + if ((id)&&(nwid)) { + nlohmann::json network,old; + get(nwid,network,id,old); + + OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member" ZT_PATH_SEPARATOR_S "%.10llx.json.new",_networksPath.c_str(),nwid); + OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member" ZT_PATH_SEPARATOR_S "%.10llx.json",_networksPath.c_str(),nwid); + if (!OSUtils::writeFile(p1,OSUtils::jsonDump(rec,-1))) + fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1); + OSUtils::rename(p1,p2); + + _memberChanged(old,rec,true); + } + } else if (objtype == "trace") { + const std::string id = rec["id"]; + OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "trace" ZT_PATH_SEPARATOR_S "%s.json",_path.c_str(),id.c_str()); + OSUtils::writeFile(p1,OSUtils::jsonDump(rec,-1)); + } + } catch ( ... ) {} // drop invalid records missing fields +} + +void FileDB::eraseNetwork(const uint64_t networkId) +{ + nlohmann::json network,nullJson; + get(networkId,network); + char p[16384]; + OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),networkId); + OSUtils::rm(p); + _networkChanged(network,nullJson,true); +} + +void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId) +{ +} + +} // namespace ZeroTier diff --git a/controller/FileDB.hpp b/controller/FileDB.hpp new file mode 100644 index 00000000..fe9869b9 --- /dev/null +++ b/controller/FileDB.hpp @@ -0,0 +1,47 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef ZT_CONTROLLER_FILEDB_HPP +#define ZT_CONTROLLER_FILEDB_HPP + +#include "DB.hpp" + +namespace ZeroTier +{ + +class FileDB : public DB +{ +public: + FileDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path); + virtual ~FileDB(); + + virtual bool waitForReady(); + + virtual void save(const nlohmann::json &record); + + virtual void eraseNetwork(const uint64_t networkId); + + virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); + +protected: + std::string _networksPath; +}; + +} // namespace ZeroTier + +#endif diff --git a/controller/JSONDB.cpp b/controller/JSONDB.cpp deleted file mode 100644 index 67b13393..00000000 --- a/controller/JSONDB.cpp +++ /dev/null @@ -1,525 +0,0 @@ -/* - * ZeroTier One - Network Virtualization Everywhere - * Copyright (C) 2011-2015 ZeroTier, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -#include -#include -#include -#ifndef _WIN32 -#include -#include -#include -#include -#include -#include -#endif - -#include "JSONDB.hpp" -#include "EmbeddedNetworkController.hpp" - -namespace ZeroTier { - -static const nlohmann::json _EMPTY_JSON(nlohmann::json::object()); - -JSONDB::JSONDB(const std::string &basePath,EmbeddedNetworkController *parent) : - _parent(parent), - _basePath(basePath), - _rawInput(-1), - _rawOutput(-1), - _summaryThreadRun(true), - _dataReady(false) -{ -#ifndef __WINDOWS__ - if (_basePath == "-") { - // If base path is "-" we run in Central harnessed mode. We read pseudo-http-requests from stdin and write - // them to stdout. - _rawInput = STDIN_FILENO; - _rawOutput = STDOUT_FILENO; - fcntl(_rawInput,F_SETFL,O_NONBLOCK); - } else { -#endif - // Default mode of operation is to store files in the filesystem - OSUtils::mkdir(_basePath.c_str()); - OSUtils::lockDownFile(_basePath.c_str(),true); // networks might contain auth tokens, etc., so restrict directory permissions -#ifndef __WINDOWS__ - } -#endif - - _networks_m.lock(); // locked until data is loaded, etc. - - if (_rawInput < 0) { - _load(basePath); - _dataReady = true; - _networks_m.unlock(); - } else { - // In harnessed mode we leave the lock locked and wait for our initial DB from Central. - _summaryThread = Thread::start(this); - } -} - -JSONDB::~JSONDB() -{ - Thread t; - { - Mutex::Lock _l(_summaryThread_m); - _summaryThreadRun = false; - t = _summaryThread; - } - if (t) - Thread::join(t); -} - -bool JSONDB::writeRaw(const std::string &n,const std::string &obj) -{ - if (_rawOutput >= 0) { -#ifndef __WINDOWS__ - if (obj.length() > 0) { - Mutex::Lock _l(_rawLock); - //fprintf(stderr,"%s\n",obj.c_str()); - if ((long)write(_rawOutput,obj.data(),obj.length()) == (long)obj.length()) { - if (write(_rawOutput,"\n",1) == 1) - return true; - } - } else return true; -#endif - return false; - } else { - const std::string path(_genPath(n,true)); - if (!path.length()) - return false; - return OSUtils::writeFile(path.c_str(),obj); - } -} - -bool JSONDB::hasNetwork(const uint64_t networkId) const -{ - Mutex::Lock _l(_networks_m); - return (_networks.find(networkId) != _networks.end()); -} - -bool JSONDB::getNetwork(const uint64_t networkId,nlohmann::json &config) const -{ - Mutex::Lock _l(_networks_m); - const std::unordered_map::const_iterator i(_networks.find(networkId)); - if (i == _networks.end()) - return false; - config = nlohmann::json::from_msgpack(i->second.config); - return true; -} - -bool JSONDB::getNetworkSummaryInfo(const uint64_t networkId,NetworkSummaryInfo &ns) const -{ - Mutex::Lock _l(_networks_m); - const std::unordered_map::const_iterator i(_networks.find(networkId)); - if (i == _networks.end()) - return false; - ns = i->second.summaryInfo; - return true; -} - -int JSONDB::getNetworkAndMember(const uint64_t networkId,const uint64_t nodeId,nlohmann::json &networkConfig,nlohmann::json &memberConfig,NetworkSummaryInfo &ns) const -{ - Mutex::Lock _l(_networks_m); - const std::unordered_map::const_iterator i(_networks.find(networkId)); - if (i == _networks.end()) - return 0; - const std::unordered_map< uint64_t,std::vector >::const_iterator j(i->second.members.find(nodeId)); - if (j == i->second.members.end()) - return 1; - networkConfig = nlohmann::json::from_msgpack(i->second.config); - memberConfig = nlohmann::json::from_msgpack(j->second); - ns = i->second.summaryInfo; - return 3; -} - -bool JSONDB::getNetworkMember(const uint64_t networkId,const uint64_t nodeId,nlohmann::json &memberConfig) const -{ - Mutex::Lock _l(_networks_m); - const std::unordered_map::const_iterator i(_networks.find(networkId)); - if (i == _networks.end()) - return false; - const std::unordered_map< uint64_t,std::vector >::const_iterator j(i->second.members.find(nodeId)); - if (j == i->second.members.end()) - return false; - memberConfig = nlohmann::json::from_msgpack(j->second); - return true; -} - -void JSONDB::saveNetwork(const uint64_t networkId,const nlohmann::json &networkConfig) -{ - char n[64]; - OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId); - writeRaw(n,OSUtils::jsonDump(networkConfig,-1)); - { - Mutex::Lock _l(_networks_m); - _NW &nw = _networks[networkId]; - nw.config = nlohmann::json::to_msgpack(networkConfig); - } - _recomputeSummaryInfo(networkId); -} - -void JSONDB::saveNetworkMember(const uint64_t networkId,const uint64_t nodeId,const nlohmann::json &memberConfig) -{ - char n[256]; - OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId); - writeRaw(n,OSUtils::jsonDump(memberConfig,-1)); - { - Mutex::Lock _l(_networks_m); - std::vector &m = _networks[networkId].members[nodeId]; - m = nlohmann::json::to_msgpack(memberConfig); - _members[nodeId].insert(networkId); - } - _recomputeSummaryInfo(networkId); -} - -nlohmann::json JSONDB::eraseNetwork(const uint64_t networkId) -{ - if (_rawOutput >= 0) { - // In harnessed mode, DB deletes occur in the Central database and we do - // not need to erase files. - } else { - std::vector memberIds; - { - Mutex::Lock _l(_networks_m); - const std::unordered_map::iterator i(_networks.find(networkId)); - if (i == _networks.end()) - return _EMPTY_JSON; - for(std::unordered_map< uint64_t,std::vector >::iterator m(i->second.members.begin());m!=i->second.members.end();++m) - memberIds.push_back(m->first); - } - for(std::vector::iterator m(memberIds.begin());m!=memberIds.end();++m) - eraseNetworkMember(networkId,*m,false); - - char n[256]; - OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId); - const std::string path(_genPath(n,false)); - if (path.length()) - OSUtils::rm(path.c_str()); - } - - // This also erases all members from the memory cache - { - Mutex::Lock _l(_networks_m); - std::unordered_map::iterator i(_networks.find(networkId)); - if (i == _networks.end()) - return _EMPTY_JSON; // sanity check, shouldn't happen - nlohmann::json tmp(nlohmann::json::from_msgpack(i->second.config)); - _networks.erase(i); - return tmp; - } -} - -nlohmann::json JSONDB::eraseNetworkMember(const uint64_t networkId,const uint64_t nodeId,bool recomputeSummaryInfo) -{ - if (_rawOutput >= 0) { - // In harnessed mode, DB deletes occur in Central and we do not remove files. - } else { - char n[256]; - OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId); - const std::string path(_genPath(n,false)); - if (path.length()) - OSUtils::rm(path.c_str()); - } - - { - Mutex::Lock _l(_networks_m); - _members[nodeId].erase(networkId); - std::unordered_map::iterator i(_networks.find(networkId)); - if (i == _networks.end()) - return _EMPTY_JSON; - std::unordered_map< uint64_t,std::vector >::iterator j(i->second.members.find(nodeId)); - if (j == i->second.members.end()) - return _EMPTY_JSON; - nlohmann::json tmp(j->second); - i->second.members.erase(j); - if (recomputeSummaryInfo) - _recomputeSummaryInfo(networkId); - return tmp; - } -} - -void JSONDB::threadMain() - throw() -{ -#ifndef __WINDOWS__ - fd_set readfds,nullfds; - char *const readbuf = (_rawInput >= 0) ? (new char[1048576]) : (char *)0; - std::string rawInputBuf; - FD_ZERO(&readfds); - FD_ZERO(&nullfds); - struct timeval tv; -#endif - - std::vector todo; - - while (_summaryThreadRun) { -#ifndef __WINDOWS__ - if (_rawInput < 0) { - Thread::sleep(25); - } else { - // In IPC mode we wait but also select() on STDIN to read database updates - FD_SET(_rawInput,&readfds); - tv.tv_sec = 0; - tv.tv_usec = 25000; - select(_rawInput+1,&readfds,&nullfds,&nullfds,&tv); - if (FD_ISSET(_rawInput,&readfds)) { - const long rn = (long)read(_rawInput,readbuf,1048576); - bool gotMessage = false; - for(long i=0;i 0) { - try { - const nlohmann::json obj(OSUtils::jsonParse(rawInputBuf)); - gotMessage = true; - - if (!_dataReady) { - _dataReady = true; - _networks_m.unlock(); - } - - if (obj.is_array()) { - for(unsigned long i=0;i::iterator ii(todo.begin());ii!=todo.end();++ii) { - const uint64_t networkId = *ii; - std::unordered_map::iterator n(_networks.find(networkId)); - if (n != _networks.end()) { - NetworkSummaryInfo &ns = n->second.summaryInfo; - ns.activeBridges.clear(); - ns.allocatedIps.clear(); - ns.authorizedMemberCount = 0; - ns.activeMemberCount = 0; - ns.totalMemberCount = 0; - ns.mostRecentDeauthTime = 0; - - for(std::unordered_map< uint64_t,std::vector >::const_iterator m(n->second.members.begin());m!=n->second.members.end();++m) { - try { - nlohmann::json member(nlohmann::json::from_msgpack(m->second)); - - if (OSUtils::jsonBool(member["authorized"],false)) { - ++ns.authorizedMemberCount; - - try { - const nlohmann::json &mlog = member["recentLog"]; - if ((mlog.is_array())&&(mlog.size() > 0)) { - const nlohmann::json &mlog1 = mlog[0]; - if (mlog1.is_object()) { - if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < (ZT_NETWORK_AUTOCONF_DELAY * 2)) - ++ns.activeMemberCount; - } - } - } catch ( ... ) {} - - try { - if (OSUtils::jsonBool(member["activeBridge"],false)) - ns.activeBridges.push_back(Address(m->first)); - } catch ( ... ) {} - - try { - const nlohmann::json &mips = member["ipAssignments"]; - if (mips.is_array()) { - for(unsigned long i=0;isecond.summaryInfoLastComputed = now; - } - } - } catch ( ... ) {} - - todo.clear(); - } - - if (!_dataReady) // sanity check - _networks_m.unlock(); - -#ifndef __WINDOWS__ - delete [] readbuf; -#endif -} - -bool JSONDB::_addOrUpdate(const nlohmann::json &j) -{ - try { - if (j.is_object()) { - std::string id(OSUtils::jsonString(j["id"],"0")); - const std::string objtype(OSUtils::jsonString(j["objtype"],"")); - if ((id.length() == 16)&&(objtype == "network")) { - - const uint64_t nwid = Utils::hexStrToU64(id.c_str()); - if (nwid) { - bool update; - { - Mutex::Lock _l(_networks_m); - _NW &nw = _networks[nwid]; - update = !nw.config.empty(); - nw.config = nlohmann::json::to_msgpack(j); - } - if (update) - _parent->onNetworkUpdate(nwid); - _recomputeSummaryInfo(nwid); - return true; - } - - } else if ((id.length() == 10)&&(objtype == "member")) { - - const uint64_t mid = Utils::hexStrToU64(id.c_str()); - const uint64_t nwid = Utils::hexStrToU64(OSUtils::jsonString(j["nwid"],"0").c_str()); - if ((mid)&&(nwid)) { - bool update = false; - bool deauth = false; - { - Mutex::Lock _l(_networks_m); - std::vector &m = _networks[nwid].members[mid]; - if (!m.empty()) { - update = true; - nlohmann::json oldm(nlohmann::json::from_msgpack(m)); - deauth = ((OSUtils::jsonBool(oldm["authorized"],false))&&(!OSUtils::jsonBool(j["authorized"],false))); - } - m = nlohmann::json::to_msgpack(j); - _members[mid].insert(nwid); - } - if (update) { - _parent->onNetworkMemberUpdate(nwid,mid); - if (deauth) - _parent->onNetworkMemberDeauthorize(nwid,mid); - } - _recomputeSummaryInfo(nwid); - return true; - } - - } else if (objtype == "_delete") { // pseudo-object-type, only used in Central harnessed mode - - const std::string deleteType(OSUtils::jsonString(j["deleteType"],"")); - id = OSUtils::jsonString(j["deleteId"],""); - if ((deleteType == "network")&&(id.length() == 16)) { - eraseNetwork(Utils::hexStrToU64(id.c_str())); - } else if ((deleteType == "member")&&(id.length() == 10)) { - const std::string networkId(OSUtils::jsonString(j["deleteNetworkId"],"")); - const uint64_t nwid = Utils::hexStrToU64(networkId.c_str()); - const uint64_t mid = Utils::hexStrToU64(id.c_str()); - if (networkId.length() == 16) - eraseNetworkMember(nwid,mid,true); - _parent->onNetworkMemberDeauthorize(nwid,mid); - } - - } - } - } catch ( ... ) {} - return false; -} - -bool JSONDB::_load(const std::string &p) -{ - // This is not used in stdin/stdout mode. Instead data is populated by - // sending it all to stdin. - - std::vector dl(OSUtils::listDirectory(p.c_str(),true)); - for(std::vector::const_iterator di(dl.begin());di!=dl.end();++di) { - if ((di->length() > 5)&&(di->substr(di->length() - 5) == ".json")) { - std::string buf; - if (OSUtils::readFile((p + ZT_PATH_SEPARATOR_S + *di).c_str(),buf)) { - try { - _addOrUpdate(OSUtils::jsonParse(buf)); - } catch ( ... ) {} - } - } else { - this->_load((p + ZT_PATH_SEPARATOR_S + *di)); - } - } - - return true; -} - -void JSONDB::_recomputeSummaryInfo(const uint64_t networkId) -{ - Mutex::Lock _l(_summaryThread_m); - if (std::find(_summaryThreadToDo.begin(),_summaryThreadToDo.end(),networkId) == _summaryThreadToDo.end()) - _summaryThreadToDo.push_back(networkId); - if (!_summaryThread) - _summaryThread = Thread::start(this); -} - -std::string JSONDB::_genPath(const std::string &n,bool create) -{ - std::vector pt(OSUtils::split(n.c_str(),"/","","")); - if (pt.size() == 0) - return std::string(); - - std::string p(_basePath); - if (create) OSUtils::mkdir(p.c_str()); - for(unsigned long i=0,j=(unsigned long)(pt.size()-1);i. - */ - -#ifndef ZT_JSONDB_HPP -#define ZT_JSONDB_HPP - -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#include "../node/Constants.hpp" -#include "../node/Utils.hpp" -#include "../node/InetAddress.hpp" -#include "../node/Mutex.hpp" -#include "../ext/json/json.hpp" -#include "../osdep/OSUtils.hpp" -#include "../osdep/Thread.hpp" - -namespace ZeroTier { - -class EmbeddedNetworkController; - -/** - * Hierarchical JSON store that persists into the filesystem or via HTTP - */ -class JSONDB -{ -public: - struct NetworkSummaryInfo - { - NetworkSummaryInfo() : authorizedMemberCount(0),activeMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {} - std::vector
activeBridges; - std::vector allocatedIps; - unsigned long authorizedMemberCount; - unsigned long activeMemberCount; - unsigned long totalMemberCount; - int64_t mostRecentDeauthTime; - }; - - JSONDB(const std::string &basePath,EmbeddedNetworkController *parent); - ~JSONDB(); - - /** - * Write a JSON object to the data store - * - * It's important that obj contain a valid JSON object with no newlines (jsonDump with -1 - * for indentation), since newline-delimited JSON is what nodeJS's IPC speaks and this - * is important in Central-harnessed mode. - * - * @param n Path name of object - * @param obj Object in single-line no-CRs JSON object format (OSUtils::jsonDump(obj,-1)) - * @return True if write appears successful - */ - bool writeRaw(const std::string &n,const std::string &obj); - - bool hasNetwork(const uint64_t networkId) const; - - bool getNetwork(const uint64_t networkId,nlohmann::json &config) const; - - bool getNetworkSummaryInfo(const uint64_t networkId,NetworkSummaryInfo &ns) const; - - /** - * @return Bit mask: 0 == none, 1 == network only, 3 == network and member - */ - int getNetworkAndMember(const uint64_t networkId,const uint64_t nodeId,nlohmann::json &networkConfig,nlohmann::json &memberConfig,NetworkSummaryInfo &ns) const; - - bool getNetworkMember(const uint64_t networkId,const uint64_t nodeId,nlohmann::json &memberConfig) const; - - void saveNetwork(const uint64_t networkId,const nlohmann::json &networkConfig); - - void saveNetworkMember(const uint64_t networkId,const uint64_t nodeId,const nlohmann::json &memberConfig); - - nlohmann::json eraseNetwork(const uint64_t networkId); - - nlohmann::json eraseNetworkMember(const uint64_t networkId,const uint64_t nodeId,bool recomputeSummaryInfo = true); - - std::vector networkIds() const - { - std::vector r; - Mutex::Lock _l(_networks_m); - for(std::unordered_map::const_iterator n(_networks.begin());n!=_networks.end();++n) - r.push_back(n->first); - return r; - } - - inline unsigned long memberCount(const uint64_t networkId) - { - Mutex::Lock _l(_networks_m); - std::unordered_map::const_iterator i(_networks.find(networkId)); - if (i != _networks.end()) - return (unsigned long)i->second.members.size(); - return 0; - } - - template - inline void eachMember(const uint64_t networkId,F func) - { - Mutex::Lock _l(_networks_m); - std::unordered_map::const_iterator i(_networks.find(networkId)); - if (i != _networks.end()) { - for(std::unordered_map< uint64_t,std::vector >::const_iterator m(i->second.members.begin());m!=i->second.members.end();++m) { - try { - func(networkId,m->first,nlohmann::json::from_msgpack(m->second)); - } catch ( ... ) {} - } - } - } - - template - inline void eachId(F func) - { - Mutex::Lock _l(_networks_m); - for(std::unordered_map::const_iterator i(_networks.begin());i!=_networks.end();++i) { - for(std::unordered_map< uint64_t,std::vector >::const_iterator m(i->second.members.begin());m!=i->second.members.end();++m) { - try { - func(i->first,m->first); - } catch ( ... ) {} - } - } - } - - inline std::vector networksForMember(const uint64_t nodeId) - { - Mutex::Lock _l(_networks_m); - std::unordered_map< uint64_t,std::unordered_set< uint64_t > >::const_iterator m(_members.find(nodeId)); - if (m != _members.end()) { - return std::vector(m->second.begin(),m->second.end()); - } else { - return std::vector(); - } - } - - void threadMain() - throw(); - -private: - bool _addOrUpdate(const nlohmann::json &j); - bool _load(const std::string &p); - void _recomputeSummaryInfo(const uint64_t networkId); - std::string _genPath(const std::string &n,bool create); - - EmbeddedNetworkController *const _parent; - std::string _basePath; - int _rawInput,_rawOutput; - Mutex _rawLock; - - Thread _summaryThread; - std::vector _summaryThreadToDo; - volatile bool _summaryThreadRun; - Mutex _summaryThread_m; - - struct _NW - { - _NW() : summaryInfoLastComputed(0) {} - std::vector config; - NetworkSummaryInfo summaryInfo; - uint64_t summaryInfoLastComputed; - std::unordered_map< uint64_t,std::vector > members; - }; - - std::unordered_map< uint64_t,_NW > _networks; - std::unordered_map< uint64_t,std::unordered_set< uint64_t > > _members; - bool _dataReady; - Mutex _networks_m; -}; - -} // namespace ZeroTier - -#endif diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp index ffa1a188..bea941fa 100644 --- a/controller/RethinkDB.cpp +++ b/controller/RethinkDB.cpp @@ -33,12 +33,12 @@ using json = nlohmann::json; namespace ZeroTier { RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path) : - _controller(nc), - _myAddress(myAddress), + DB(nc,myAddress,path), _ready(2), // two tables need to be synchronized before we're ready, so this is ready when it reaches 0 _run(1), _waitNoticePrinted(false) { + // rethinkdb:host:port:db[:auth] std::vector ps(OSUtils::split(path,":","","")); if ((ps.size() < 4)||(ps[0] != "rethinkdb")) throw std::runtime_error("invalid rethinkdb database url"); @@ -50,12 +50,6 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres _readyLock.lock(); - { - char tmp[32]; - _myAddress.toString(tmp); - _myAddressStr = tmp; - } - _membersDbWatcher = std::thread([this]() { try { while (_run == 1) { @@ -79,7 +73,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres json &nv = tmp["new_val"]; if (ov.is_object()||nv.is_object()) { //if (nv.is_object()) printf("MEMBER: %s" ZT_EOL_S,nv.dump().c_str()); - this->_memberChanged(ov,nv); + this->_memberChanged(ov,nv,(this->_ready <= 0)); } } catch ( ... ) {} // ignore bad records } @@ -120,7 +114,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres json &nv = tmp["new_val"]; if (ov.is_object()||nv.is_object()) { //if (nv.is_object()) printf("NETWORK: %s" ZT_EOL_S,nv.dump().c_str()); - this->_networkChanged(ov,nv); + this->_networkChanged(ov,nv,(this->_ready <= 0)); } } catch ( ... ) {} // ignore bad records } @@ -166,18 +160,18 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres record["controllerId"] = this->_myAddressStr; record["config"] = *config; table = "Network"; - } else if (objtype == "delete_network") { + } else if (objtype == "trace") { + record = *config; + table = "RemoteTrace"; + } else if (objtype == "_delete_network") { deleteId = (*config)["id"]; table = "Network"; - } else if (objtype == "delete_member") { + } else if (objtype == "_delete_member") { deleteId = (*config)["nwid"]; deleteId.push_back('-'); const std::string tmp = (*config)["id"]; deleteId.append(tmp); table = "Member"; - } else if (objtype == "trace") { - record = *config; - table = "RemoteTrace"; } else { delete config; continue; @@ -259,114 +253,16 @@ RethinkDB::~RethinkDB() _heartbeatThread.join(); } -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard l2(nw->lock); - network = nw->config; - } - return true; -} - -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard l2(nw->lock); - network = nw->config; - auto m = nw->members.find(memberId); - if (m == nw->members.end()) - return false; - member = m->second; - } - return true; -} - -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info) +void RethinkDB::waitForReady() const { - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard l2(nw->lock); - network = nw->config; - _fillSummaryInfo(nw,info); - auto m = nw->members.find(memberId); - if (m == nw->members.end()) - return false; - member = m->second; - } - return true; -} - -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector &members) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard l2(nw->lock); - network = nw->config; - for(auto m=nw->members.begin();m!=nw->members.end();++m) - members.push_back(m->second); - } - return true; -} - -bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard l2(nw->lock); - _fillSummaryInfo(nw,info); + while (_ready > 0) { + if (!_waitNoticePrinted) { + _waitNoticePrinted = true; + fprintf(stderr,"NOTICE: controller RethinkDB waiting for initial data download..." ZT_EOL_S); + } + _readyLock.lock(); + _readyLock.unlock(); } - return true; -} - -void RethinkDB::networks(std::vector &networks) -{ - waitForReady(); - std::lock_guard l(_networks_l); - networks.reserve(_networks.size() + 1); - for(auto n=_networks.begin();n!=_networks.end();++n) - networks.push_back(n->first); } void RethinkDB::save(const nlohmann::json &record) @@ -382,7 +278,7 @@ void RethinkDB::eraseNetwork(const uint64_t networkId) Utils::hex(networkId,tmp2); json *tmp = new json(); (*tmp)["id"] = tmp2; - (*tmp)["objtype"] = "delete_network"; // pseudo-type, tells thread to delete network + (*tmp)["objtype"] = "_delete_network"; // pseudo-type, tells thread to delete network _commitQueue.post(tmp); } @@ -395,155 +291,10 @@ void RethinkDB::eraseMember(const uint64_t networkId,const uint64_t memberId) (*tmp)["nwid"] = tmp2; Utils::hex10(memberId,tmp2); (*tmp)["id"] = tmp2; - (*tmp)["objtype"] = "delete_member"; // pseudo-type, tells thread to delete network + (*tmp)["objtype"] = "_delete_member"; // pseudo-type, tells thread to delete network _commitQueue.post(tmp); } -void RethinkDB::_memberChanged(nlohmann::json &old,nlohmann::json &member) -{ - uint64_t memberId = 0; - uint64_t networkId = 0; - bool isAuth = false; - bool wasAuth = false; - std::shared_ptr<_Network> nw; - - if (old.is_object()) { - json &config = old["config"]; - if (config.is_object()) { - memberId = OSUtils::jsonIntHex(config["id"],0ULL); - networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); - if ((memberId)&&(networkId)) { - { - std::lock_guard l(_networks_l); - auto nw2 = _networks.find(networkId); - if (nw2 != _networks.end()) - nw = nw2->second; - } - if (nw) { - std::lock_guard l(nw->lock); - if (OSUtils::jsonBool(config["activeBridge"],false)) - nw->activeBridgeMembers.erase(memberId); - wasAuth = OSUtils::jsonBool(config["authorized"],false); - if (wasAuth) - nw->authorizedMembers.erase(memberId); - json &ips = config["ipAssignments"]; - if (ips.is_array()) { - for(unsigned long i=0;iallocatedIps.erase(ipa); - } - } - } - } - } - } - } - - if (member.is_object()) { - json &config = member["config"]; - if (config.is_object()) { - if (!nw) { - memberId = OSUtils::jsonIntHex(config["id"],0ULL); - networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); - if ((!memberId)||(!networkId)) - return; - std::lock_guard l(_networks_l); - std::shared_ptr<_Network> &nw2 = _networks[networkId]; - if (!nw2) - nw2.reset(new _Network); - nw = nw2; - } - - { - std::lock_guard l(nw->lock); - - nw->members[memberId] = config; - - if (OSUtils::jsonBool(config["activeBridge"],false)) - nw->activeBridgeMembers.insert(memberId); - isAuth = OSUtils::jsonBool(config["authorized"],false); - if (isAuth) - nw->authorizedMembers.insert(memberId); - json &ips = config["ipAssignments"]; - if (ips.is_array()) { - for(unsigned long i=0;iallocatedIps.insert(ipa); - } - } - } - - if (!isAuth) { - const int64_t ldt = (int64_t)OSUtils::jsonInt(config["lastDeauthorizedTime"],0ULL); - if (ldt > nw->mostRecentDeauthTime) - nw->mostRecentDeauthTime = ldt; - } - } - - _controller->onNetworkMemberUpdate(networkId,memberId); - } - } else if (memberId) { - if (nw) { - std::lock_guard l(nw->lock); - nw->members.erase(memberId); - } - if (networkId) { - std::lock_guard l(_networks_l); - auto er = _networkByMember.equal_range(memberId); - for(auto i=er.first;i!=er.second;++i) { - if (i->second == networkId) { - _networkByMember.erase(i); - break; - } - } - } - } - - if ((wasAuth)&&(!isAuth)&&(networkId)&&(memberId)) - _controller->onNetworkMemberDeauthorize(networkId,memberId); -} - -void RethinkDB::_networkChanged(nlohmann::json &old,nlohmann::json &network) -{ - if (network.is_object()) { - json &config = network["config"]; - if (config.is_object()) { - const std::string ids = config["id"]; - const uint64_t id = Utils::hexStrToU64(ids.c_str()); - if (id) { - std::shared_ptr<_Network> nw; - { - std::lock_guard l(_networks_l); - std::shared_ptr<_Network> &nw2 = _networks[id]; - if (!nw2) - nw2.reset(new _Network); - nw = nw2; - } - { - std::lock_guard l2(nw->lock); - nw->config = config; - } - _controller->onNetworkUpdate(id); - } - } - } else if (old.is_object()) { - const std::string ids = old["id"]; - const uint64_t id = Utils::hexStrToU64(ids.c_str()); - if (id) { - std::lock_guard l(_networks_l); - _networks.erase(id); - } - } -} - } // namespace ZeroTier #endif // ZT_CONTROLLER_USE_RETHINKDB diff --git a/controller/RethinkDB.hpp b/controller/RethinkDB.hpp index 62586803..2d7ba58e 100644 --- a/controller/RethinkDB.hpp +++ b/controller/RethinkDB.hpp @@ -16,112 +16,35 @@ * along with this program. If not, see . */ +#define ZT_CONTROLLER_USE_RETHINKDB + #ifdef ZT_CONTROLLER_USE_RETHINKDB #ifndef ZT_CONTROLLER_RETHINKDB_HPP #define ZT_CONTROLLER_RETHINKDB_HPP -#include "../node/Constants.hpp" -#include "../node/Address.hpp" -#include "../node/InetAddress.hpp" -#include "../osdep/OSUtils.hpp" -#include "../osdep/BlockingQueue.hpp" - -#include -#include -#include -#include -#include -#include -#include - -#include "../ext/json/json.hpp" +#include "DB.hpp" #define ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS 2 namespace ZeroTier { -class EmbeddedNetworkController; - -class RethinkDB +class RethinkDB : public DB { public: - struct NetworkSummaryInfo - { - NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {} - std::vector
activeBridges; - std::vector allocatedIps; - unsigned long authorizedMemberCount; - unsigned long totalMemberCount; - int64_t mostRecentDeauthTime; - }; - RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path); - ~RethinkDB(); - - inline void waitForReady() const - { - while (_ready > 0) { - if (!_waitNoticePrinted) { - _waitNoticePrinted = true; - fprintf(stderr,"NOTICE: controller RethinkDB waiting for initial data download..." ZT_EOL_S); - } - _readyLock.lock(); - _readyLock.unlock(); - } - } - - inline bool hasNetwork(const uint64_t networkId) const - { - std::lock_guard l(_networks_l); - return (_networks.find(networkId) != _networks.end()); - } - - bool get(const uint64_t networkId,nlohmann::json &network); - bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member); - bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info); - bool get(const uint64_t networkId,nlohmann::json &network,std::vector &members); - - bool summary(const uint64_t networkId,NetworkSummaryInfo &info); - - void networks(std::vector &networks); - - void save(const nlohmann::json &record); - - void eraseNetwork(const uint64_t networkId); - void eraseMember(const uint64_t networkId,const uint64_t memberId); - -private: - struct _Network - { - _Network() : mostRecentDeauthTime(0) {} - nlohmann::json config; - std::unordered_map members; - std::unordered_set activeBridgeMembers; - std::unordered_set authorizedMembers; - std::unordered_set allocatedIps; - int64_t mostRecentDeauthTime; - std::mutex lock; - }; - - void _memberChanged(nlohmann::json &old,nlohmann::json &member); - void _networkChanged(nlohmann::json &old,nlohmann::json &network); - - inline void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info) - { - for(auto ab=nw->activeBridgeMembers.begin();ab!=nw->activeBridgeMembers.end();++ab) - info.activeBridges.push_back(Address(*ab)); - for(auto ip=nw->allocatedIps.begin();ip!=nw->allocatedIps.end();++ip) - info.allocatedIps.push_back(*ip); - info.authorizedMemberCount = (unsigned long)nw->authorizedMembers.size(); - info.totalMemberCount = (unsigned long)nw->members.size(); - info.mostRecentDeauthTime = nw->mostRecentDeauthTime; - } - - EmbeddedNetworkController *const _controller; - const Address _myAddress; - std::string _myAddressStr; + virtual ~RethinkDB(); + + virtual void waitForReady() const; + + virtual void save(const nlohmann::json &record); + + virtual void eraseNetwork(const uint64_t networkId); + + virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); + +protected: std::string _host; std::string _db; std::string _auth; @@ -132,10 +55,6 @@ private: std::thread _networksDbWatcher; std::thread _membersDbWatcher; - std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks; - std::unordered_multimap< uint64_t,uint64_t > _networkByMember; - mutable std::mutex _networks_l; - BlockingQueue< nlohmann::json * > _commitQueue; std::thread _commitThread[ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS]; diff --git a/objects.mk b/objects.mk index d18efbe6..d4a3c16c 100644 --- a/objects.mk +++ b/objects.mk @@ -28,6 +28,8 @@ CORE_OBJS=\ ONE_OBJS=\ controller/EmbeddedNetworkController.o \ + controller/DB.o \ + controller/FileDB.o \ controller/RethinkDB.o \ osdep/ManagedRoute.o \ osdep/Http.o \ diff --git a/osdep/OSUtils.hpp b/osdep/OSUtils.hpp index fd1b31d9..274b48df 100644 --- a/osdep/OSUtils.hpp +++ b/osdep/OSUtils.hpp @@ -101,7 +101,6 @@ public: * @return True if delete was successful */ static inline bool rm(const char *path) - throw() { #ifdef __WINDOWS__ return (DeleteFileA(path) != FALSE); @@ -109,7 +108,7 @@ public: return (unlink(path) == 0); #endif } - static inline bool rm(const std::string &path) throw() { return rm(path.c_str()); } + static inline bool rm(const std::string &path) { return rm(path.c_str()); } static inline bool mkdir(const char *path) { @@ -123,7 +122,17 @@ public: return true; #endif } - static inline bool mkdir(const std::string &path) throw() { return OSUtils::mkdir(path.c_str()); } + static inline bool mkdir(const std::string &path) { return OSUtils::mkdir(path.c_str()); } + + static inline bool rename(const char *o,const char *n) + { +#ifdef __WINDOWS__ + DeleteFileA(n); + return (::rename(o,n) == 0); +#else + return (::rename(o,n) == 0); +#endif + } /** * List a directory's contents -- cgit v1.2.3