diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-07 14:44:46 -0800 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-07 14:44:46 -0800 |
commit | 1613f42d0082cf6438ad0c62d89405ab82625f98 (patch) | |
tree | bc46b11bc909b28de54252e1691a58380b28bae9 /controller | |
parent | 7fc9094d8ea1c2d28d003c499016f0755b73063d (diff) | |
download | infinitytier-1613f42d0082cf6438ad0c62d89405ab82625f98.tar.gz infinitytier-1613f42d0082cf6438ad0c62d89405ab82625f98.zip |
Re-integrate in-filesystem DB into new controller DB structure.
Diffstat (limited to 'controller')
-rw-r--r-- | controller/DB.cpp | 314 | ||||
-rw-r--r-- | controller/DB.hpp | 116 | ||||
-rw-r--r-- | controller/EmbeddedNetworkController.hpp | 5 | ||||
-rw-r--r-- | controller/FileDB.cpp | 129 | ||||
-rw-r--r-- | controller/FileDB.hpp | 47 | ||||
-rw-r--r-- | controller/JSONDB.cpp | 525 | ||||
-rw-r--r-- | controller/JSONDB.hpp | 192 | ||||
-rw-r--r-- | controller/RethinkDB.cpp | 287 | ||||
-rw-r--r-- | controller/RethinkDB.hpp | 111 |
9 files changed, 645 insertions, 1081 deletions
diff --git a/controller/DB.cpp b/controller/DB.cpp new file mode 100644 index 00000000..4a5688e3 --- /dev/null +++ b/controller/DB.cpp @@ -0,0 +1,314 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "DB.hpp" +#include "EmbeddedNetworkController.hpp" + +#include <chrono> +#include <algorithm> +#include <stdexcept> + +using json = nlohmann::json; + +namespace ZeroTier { + +DB::DB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path) : + _controller(nc), + _myAddress(myAddress), + _path((path) ? path : "") +{ + { + char tmp[32]; + _myAddress.toString(tmp); + _myAddressStr = tmp; + } +} + +DB::~DB() +{ +} + +bool DB::get(const uint64_t networkId,nlohmann::json &network) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + } + return true; +} + +bool DB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + auto m = nw->members.find(memberId); + if (m == nw->members.end()) + return false; + member = m->second; + } + return true; +} + +bool DB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + _fillSummaryInfo(nw,info); + auto m = nw->members.find(memberId); + if (m == nw->members.end()) + return false; + member = m->second; + } + return true; +} + +bool DB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + for(auto m=nw->members.begin();m!=nw->members.end();++m) + members.push_back(m->second); + } + return true; +} + +bool DB::summary(const uint64_t networkId,NetworkSummaryInfo &info) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard<std::mutex> l2(nw->lock); + _fillSummaryInfo(nw,info); + } + return true; +} + +void DB::networks(std::vector<uint64_t> &networks) +{ + waitForReady(); + std::lock_guard<std::mutex> l(_networks_l); + networks.reserve(_networks.size() + 1); + for(auto n=_networks.begin();n!=_networks.end();++n) + networks.push_back(n->first); +} + +void DB::_memberChanged(nlohmann::json &old,nlohmann::json &member,bool push) +{ + uint64_t memberId = 0; + uint64_t networkId = 0; + bool isAuth = false; + bool wasAuth = false; + std::shared_ptr<_Network> nw; + + if (old.is_object()) { + json &config = old["config"]; + if (config.is_object()) { + memberId = OSUtils::jsonIntHex(config["id"],0ULL); + networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); + if ((memberId)&&(networkId)) { + { + std::lock_guard<std::mutex> l(_networks_l); + auto nw2 = _networks.find(networkId); + if (nw2 != _networks.end()) + nw = nw2->second; + } + if (nw) { + std::lock_guard<std::mutex> l(nw->lock); + if (OSUtils::jsonBool(config["activeBridge"],false)) + nw->activeBridgeMembers.erase(memberId); + wasAuth = OSUtils::jsonBool(config["authorized"],false); + if (wasAuth) + nw->authorizedMembers.erase(memberId); + json &ips = config["ipAssignments"]; + if (ips.is_array()) { + for(unsigned long i=0;i<ips.size();++i) { + json &ipj = ips[i]; + if (ipj.is_string()) { + const std::string ips = ipj; + InetAddress ipa(ips.c_str()); + ipa.setPort(0); + nw->allocatedIps.erase(ipa); + } + } + } + } + } + } + } + + if (member.is_object()) { + json &config = member["config"]; + if (config.is_object()) { + if (!nw) { + memberId = OSUtils::jsonIntHex(config["id"],0ULL); + networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); + if ((!memberId)||(!networkId)) + return; + std::lock_guard<std::mutex> l(_networks_l); + std::shared_ptr<_Network> &nw2 = _networks[networkId]; + if (!nw2) + nw2.reset(new _Network); + nw = nw2; + } + + { + std::lock_guard<std::mutex> l(nw->lock); + + nw->members[memberId] = config; + + if (OSUtils::jsonBool(config["activeBridge"],false)) + nw->activeBridgeMembers.insert(memberId); + isAuth = OSUtils::jsonBool(config["authorized"],false); + if (isAuth) + nw->authorizedMembers.insert(memberId); + json &ips = config["ipAssignments"]; + if (ips.is_array()) { + for(unsigned long i=0;i<ips.size();++i) { + json &ipj = ips[i]; + if (ipj.is_string()) { + const std::string ips = ipj; + InetAddress ipa(ips.c_str()); + ipa.setPort(0); + nw->allocatedIps.insert(ipa); + } + } + } + + if (!isAuth) { + const int64_t ldt = (int64_t)OSUtils::jsonInt(config["lastDeauthorizedTime"],0ULL); + if (ldt > nw->mostRecentDeauthTime) + nw->mostRecentDeauthTime = ldt; + } + } + + if (push) + _controller->onNetworkMemberUpdate(networkId,memberId); + } + } else if (memberId) { + if (nw) { + std::lock_guard<std::mutex> l(nw->lock); + nw->members.erase(memberId); + } + if (networkId) { + std::lock_guard<std::mutex> l(_networks_l); + auto er = _networkByMember.equal_range(memberId); + for(auto i=er.first;i!=er.second;++i) { + if (i->second == networkId) { + _networkByMember.erase(i); + break; + } + } + } + } + + if ((push)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) + _controller->onNetworkMemberDeauthorize(networkId,memberId); +} + +void DB::_networkChanged(nlohmann::json &old,nlohmann::json &network,bool push) +{ + if (network.is_object()) { + json &config = network["config"]; + if (config.is_object()) { + const std::string ids = config["id"]; + const uint64_t id = Utils::hexStrToU64(ids.c_str()); + if (id) { + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + std::shared_ptr<_Network> &nw2 = _networks[id]; + if (!nw2) + nw2.reset(new _Network); + nw = nw2; + } + { + std::lock_guard<std::mutex> l2(nw->lock); + nw->config = config; + } + if (push) + _controller->onNetworkUpdate(id); + } + } + } else if (old.is_object()) { + const std::string ids = old["id"]; + const uint64_t id = Utils::hexStrToU64(ids.c_str()); + if (id) { + std::lock_guard<std::mutex> l(_networks_l); + _networks.erase(id); + } + } +} + +void DB::_fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info) +{ + for(auto ab=nw->activeBridgeMembers.begin();ab!=nw->activeBridgeMembers.end();++ab) + info.activeBridges.push_back(Address(*ab)); + for(auto ip=nw->allocatedIps.begin();ip!=nw->allocatedIps.end();++ip) + info.allocatedIps.push_back(*ip); + info.authorizedMemberCount = (unsigned long)nw->authorizedMembers.size(); + info.totalMemberCount = (unsigned long)nw->members.size(); + info.mostRecentDeauthTime = nw->mostRecentDeauthTime; +} + +} // namespace ZeroTier diff --git a/controller/DB.hpp b/controller/DB.hpp new file mode 100644 index 00000000..dfc8ac95 --- /dev/null +++ b/controller/DB.hpp @@ -0,0 +1,116 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef ZT_CONTROLLER_DB_HPP +#define ZT_CONTROLLER_DB_HPP + +#include "../node/Constants.hpp" +#include "../node/Address.hpp" +#include "../node/InetAddress.hpp" +#include "../osdep/OSUtils.hpp" +#include "../osdep/BlockingQueue.hpp" + +#include <memory> +#include <string> +#include <thread> +#include <unordered_map> +#include <unordered_set> +#include <vector> +#include <atomic> + +#include "../ext/json/json.hpp" + +#define ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS 2 + +namespace ZeroTier +{ + +class EmbeddedNetworkController; + +/** + * Base class with common infrastructure for all controller DB implementations + */ +class DB +{ +public: + struct NetworkSummaryInfo + { + NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {} + std::vector<Address> activeBridges; + std::vector<InetAddress> allocatedIps; + unsigned long authorizedMemberCount; + unsigned long totalMemberCount; + int64_t mostRecentDeauthTime; + }; + + DB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path); + virtual ~DB(); + + virtual bool waitForReady() = 0; + + inline bool hasNetwork(const uint64_t networkId) const + { + std::lock_guard<std::mutex> l(_networks_l); + return (_networks.find(networkId) != _networks.end()); + } + + bool get(const uint64_t networkId,nlohmann::json &network); + bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member); + bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info); + bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members); + + bool summary(const uint64_t networkId,NetworkSummaryInfo &info); + + void networks(std::vector<uint64_t> &networks); + + virtual void save(const nlohmann::json &record) = 0; + + virtual void eraseNetwork(const uint64_t networkId) = 0; + + virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0; + +protected: + struct _Network + { + _Network() : mostRecentDeauthTime(0) {} + nlohmann::json config; + std::unordered_map<uint64_t,nlohmann::json> members; + std::unordered_set<uint64_t> activeBridgeMembers; + std::unordered_set<uint64_t> authorizedMembers; + std::unordered_set<InetAddress,InetAddress::Hasher> allocatedIps; + int64_t mostRecentDeauthTime; + std::mutex lock; + }; + + void _memberChanged(nlohmann::json &old,nlohmann::json &member,bool push); + void _networkChanged(nlohmann::json &old,nlohmann::json &network,bool push); + void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info); + + EmbeddedNetworkController *const _controller; + const Address _myAddress; + const std::string _path; + std::string _myAddressStr; + + std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks; + std::unordered_multimap< uint64_t,uint64_t > _networkByMember; + mutable std::mutex _networks_l; +}; + +} // namespace ZeroTier + +#endif diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index f9b6fb5a..bc59b359 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -45,13 +45,18 @@ #ifdef ZT_CONTROLLER_USE_RETHINKDB #include "RethinkDB.hpp" +#else +#include "FileDB.hpp" #endif namespace ZeroTier { #ifdef ZT_CONTROLLER_USE_RETHINKDB typedef RethinkDB ControllerDB; +#else +typedef FileDB ControllerDB; #endif + class Node; class EmbeddedNetworkController : public NetworkController diff --git a/controller/FileDB.cpp b/controller/FileDB.cpp new file mode 100644 index 00000000..b48d5e87 --- /dev/null +++ b/controller/FileDB.cpp @@ -0,0 +1,129 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "FileDB.hpp" + +namespace ZeroTier +{ + +FileDB::FileDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path) : + DB(nc,myAddress,path), + _networksPath(_path + ZT_PATH_SEPARATOR_S + "network") +{ + OSUtils::mkdir(_path.c_str()); + OSUtils::lockDownFile(_path.c_str(),true); + + std::vector<std::string> networks(OSUtils::listDirectory(_networksPath.c_str(),false)); + std::string buf; + for(auto n=networks.begin();n!=networks.end();++n) { + buf.clear(); + if ((n->length() == 21)&&(OSUtils::readFile((_networksPath + ZT_PATH_SEPARATOR_S + *n).c_str(),buf))) { + try { + nlohmann::json network(OSUtils::jsonParse(buf)); + const std::string nwids = network["id"]; + if (nwids.length() == 16) { + nlohmann::json nullJson; + _networkChanged(nullJson,network,false); + std::string membersPath(_networksPath + ZT_PATH_SEPARATOR_S + nwids + ZT_PATH_SEPARATOR_S "member"); + std::vector<std::string> members(OSUtils::listDirectory(membersPath.c_str(),false)); + for(auto m=members.begin();m!=members.end();++m) { + buf.clear(); + if ((m->length() == 15)&&(OSUtils::readFile((membersPath + ZT_PATH_SEPARATOR_S + *m).c_str(),buf))) { + try { + nlohmann::json member(OSUtils::jsonParse(buf)); + const std::string addrs = member["id"]; + if (addrs.length() == 10) { + nlohmann::json nullJson2; + _memberChanged(nullJson2,member,false); + } + } catch ( ... ) {} + } + } + } + } catch ( ... ) {} + } + } +} + +FileDB::~FileDB() +{ +} + +bool FileDB::waitForReady() +{ + return true; +} + +void FileDB::save(const nlohmann::json &record) +{ + char p1[16384],p2[16384]; + try { + nlohmann::json rec(record); + const std::string objtype = rec["objtype"]; + if (objtype == "network") { + const uint64_t nwid = OSUtils::jsonIntHex(rec["id"],0ULL); + if (nwid) { + nlohmann::json old; + get(nwid,old); + + OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json.new",_networksPath.c_str(),nwid); + OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid); + if (!OSUtils::writeFile(p1,OSUtils::jsonDump(rec,-1))) + fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1); + OSUtils::rename(p1,p2); + + _networkChanged(old,rec,true); + } + } else if (objtype == "member") { + const uint64_t id = OSUtils::jsonIntHex(rec["id"],0ULL); + const uint64_t nwid = OSUtils::jsonIntHex(rec["nwid"],0ULL); + if ((id)&&(nwid)) { + nlohmann::json network,old; + get(nwid,network,id,old); + + OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member" ZT_PATH_SEPARATOR_S "%.10llx.json.new",_networksPath.c_str(),nwid); + OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member" ZT_PATH_SEPARATOR_S "%.10llx.json",_networksPath.c_str(),nwid); + if (!OSUtils::writeFile(p1,OSUtils::jsonDump(rec,-1))) + fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1); + OSUtils::rename(p1,p2); + + _memberChanged(old,rec,true); + } + } else if (objtype == "trace") { + const std::string id = rec["id"]; + OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "trace" ZT_PATH_SEPARATOR_S "%s.json",_path.c_str(),id.c_str()); + OSUtils::writeFile(p1,OSUtils::jsonDump(rec,-1)); + } + } catch ( ... ) {} // drop invalid records missing fields +} + +void FileDB::eraseNetwork(const uint64_t networkId) +{ + nlohmann::json network,nullJson; + get(networkId,network); + char p[16384]; + OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),networkId); + OSUtils::rm(p); + _networkChanged(network,nullJson,true); +} + +void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId) +{ +} + +} // namespace ZeroTier diff --git a/controller/FileDB.hpp b/controller/FileDB.hpp new file mode 100644 index 00000000..fe9869b9 --- /dev/null +++ b/controller/FileDB.hpp @@ -0,0 +1,47 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef ZT_CONTROLLER_FILEDB_HPP +#define ZT_CONTROLLER_FILEDB_HPP + +#include "DB.hpp" + +namespace ZeroTier +{ + +class FileDB : public DB +{ +public: + FileDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path); + virtual ~FileDB(); + + virtual bool waitForReady(); + + virtual void save(const nlohmann::json &record); + + virtual void eraseNetwork(const uint64_t networkId); + + virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); + +protected: + std::string _networksPath; +}; + +} // namespace ZeroTier + +#endif diff --git a/controller/JSONDB.cpp b/controller/JSONDB.cpp deleted file mode 100644 index 67b13393..00000000 --- a/controller/JSONDB.cpp +++ /dev/null @@ -1,525 +0,0 @@ -/* - * ZeroTier One - Network Virtualization Everywhere - * Copyright (C) 2011-2015 ZeroTier, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#include <stdio.h> -#include <stdlib.h> -#include <stdint.h> -#ifndef _WIN32 -#include <unistd.h> -#include <fcntl.h> -#include <sys/time.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/select.h> -#endif - -#include "JSONDB.hpp" -#include "EmbeddedNetworkController.hpp" - -namespace ZeroTier { - -static const nlohmann::json _EMPTY_JSON(nlohmann::json::object()); - -JSONDB::JSONDB(const std::string &basePath,EmbeddedNetworkController *parent) : - _parent(parent), - _basePath(basePath), - _rawInput(-1), - _rawOutput(-1), - _summaryThreadRun(true), - _dataReady(false) -{ -#ifndef __WINDOWS__ - if (_basePath == "-") { - // If base path is "-" we run in Central harnessed mode. We read pseudo-http-requests from stdin and write - // them to stdout. - _rawInput = STDIN_FILENO; - _rawOutput = STDOUT_FILENO; - fcntl(_rawInput,F_SETFL,O_NONBLOCK); - } else { -#endif - // Default mode of operation is to store files in the filesystem - OSUtils::mkdir(_basePath.c_str()); - OSUtils::lockDownFile(_basePath.c_str(),true); // networks might contain auth tokens, etc., so restrict directory permissions -#ifndef __WINDOWS__ - } -#endif - - _networks_m.lock(); // locked until data is loaded, etc. - - if (_rawInput < 0) { - _load(basePath); - _dataReady = true; - _networks_m.unlock(); - } else { - // In harnessed mode we leave the lock locked and wait for our initial DB from Central. - _summaryThread = Thread::start(this); - } -} - -JSONDB::~JSONDB() -{ - Thread t; - { - Mutex::Lock _l(_summaryThread_m); - _summaryThreadRun = false; - t = _summaryThread; - } - if (t) - Thread::join(t); -} - -bool JSONDB::writeRaw(const std::string &n,const std::string &obj) -{ - if (_rawOutput >= 0) { -#ifndef __WINDOWS__ - if (obj.length() > 0) { - Mutex::Lock _l(_rawLock); - //fprintf(stderr,"%s\n",obj.c_str()); - if ((long)write(_rawOutput,obj.data(),obj.length()) == (long)obj.length()) { - if (write(_rawOutput,"\n",1) == 1) - return true; - } - } else return true; -#endif - return false; - } else { - const std::string path(_genPath(n,true)); - if (!path.length()) - return false; - return OSUtils::writeFile(path.c_str(),obj); - } -} - -bool JSONDB::hasNetwork(const uint64_t networkId) const -{ - Mutex::Lock _l(_networks_m); - return (_networks.find(networkId) != _networks.end()); -} - -bool JSONDB::getNetwork(const uint64_t networkId,nlohmann::json &config) const -{ - Mutex::Lock _l(_networks_m); - const std::unordered_map<uint64_t,_NW>::const_iterator i(_networks.find(networkId)); - if (i == _networks.end()) - return false; - config = nlohmann::json::from_msgpack(i->second.config); - return true; -} - -bool JSONDB::getNetworkSummaryInfo(const uint64_t networkId,NetworkSummaryInfo &ns) const -{ - Mutex::Lock _l(_networks_m); - const std::unordered_map<uint64_t,_NW>::const_iterator i(_networks.find(networkId)); - if (i == _networks.end()) - return false; - ns = i->second.summaryInfo; - return true; -} - -int JSONDB::getNetworkAndMember(const uint64_t networkId,const uint64_t nodeId,nlohmann::json &networkConfig,nlohmann::json &memberConfig,NetworkSummaryInfo &ns) const -{ - Mutex::Lock _l(_networks_m); - const std::unordered_map<uint64_t,_NW>::const_iterator i(_networks.find(networkId)); - if (i == _networks.end()) - return 0; - const std::unordered_map< uint64_t,std::vector<uint8_t> >::const_iterator j(i->second.members.find(nodeId)); - if (j == i->second.members.end()) - return 1; - networkConfig = nlohmann::json::from_msgpack(i->second.config); - memberConfig = nlohmann::json::from_msgpack(j->second); - ns = i->second.summaryInfo; - return 3; -} - -bool JSONDB::getNetworkMember(const uint64_t networkId,const uint64_t nodeId,nlohmann::json &memberConfig) const -{ - Mutex::Lock _l(_networks_m); - const std::unordered_map<uint64_t,_NW>::const_iterator i(_networks.find(networkId)); - if (i == _networks.end()) - return false; - const std::unordered_map< uint64_t,std::vector<uint8_t> >::const_iterator j(i->second.members.find(nodeId)); - if (j == i->second.members.end()) - return false; - memberConfig = nlohmann::json::from_msgpack(j->second); - return true; -} - -void JSONDB::saveNetwork(const uint64_t networkId,const nlohmann::json &networkConfig) -{ - char n[64]; - OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId); - writeRaw(n,OSUtils::jsonDump(networkConfig,-1)); - { - Mutex::Lock _l(_networks_m); - _NW &nw = _networks[networkId]; - nw.config = nlohmann::json::to_msgpack(networkConfig); - } - _recomputeSummaryInfo(networkId); -} - -void JSONDB::saveNetworkMember(const uint64_t networkId,const uint64_t nodeId,const nlohmann::json &memberConfig) -{ - char n[256]; - OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId); - writeRaw(n,OSUtils::jsonDump(memberConfig,-1)); - { - Mutex::Lock _l(_networks_m); - std::vector<uint8_t> &m = _networks[networkId].members[nodeId]; - m = nlohmann::json::to_msgpack(memberConfig); - _members[nodeId].insert(networkId); - } - _recomputeSummaryInfo(networkId); -} - -nlohmann::json JSONDB::eraseNetwork(const uint64_t networkId) -{ - if (_rawOutput >= 0) { - // In harnessed mode, DB deletes occur in the Central database and we do - // not need to erase files. - } else { - std::vector<uint64_t> memberIds; - { - Mutex::Lock _l(_networks_m); - const std::unordered_map<uint64_t,_NW>::iterator i(_networks.find(networkId)); - if (i == _networks.end()) - return _EMPTY_JSON; - for(std::unordered_map< uint64_t,std::vector<uint8_t> >::iterator m(i->second.members.begin());m!=i->second.members.end();++m) - memberIds.push_back(m->first); - } - for(std::vector<uint64_t>::iterator m(memberIds.begin());m!=memberIds.end();++m) - eraseNetworkMember(networkId,*m,false); - - char n[256]; - OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx",(unsigned long long)networkId); - const std::string path(_genPath(n,false)); - if (path.length()) - OSUtils::rm(path.c_str()); - } - - // This also erases all members from the memory cache - { - Mutex::Lock _l(_networks_m); - std::unordered_map<uint64_t,_NW>::iterator i(_networks.find(networkId)); - if (i == _networks.end()) - return _EMPTY_JSON; // sanity check, shouldn't happen - nlohmann::json tmp(nlohmann::json::from_msgpack(i->second.config)); - _networks.erase(i); - return tmp; - } -} - -nlohmann::json JSONDB::eraseNetworkMember(const uint64_t networkId,const uint64_t nodeId,bool recomputeSummaryInfo) -{ - if (_rawOutput >= 0) { - // In harnessed mode, DB deletes occur in Central and we do not remove files. - } else { - char n[256]; - OSUtils::ztsnprintf(n,sizeof(n),"network/%.16llx/member/%.10llx",(unsigned long long)networkId,(unsigned long long)nodeId); - const std::string path(_genPath(n,false)); - if (path.length()) - OSUtils::rm(path.c_str()); - } - - { - Mutex::Lock _l(_networks_m); - _members[nodeId].erase(networkId); - std::unordered_map<uint64_t,_NW>::iterator i(_networks.find(networkId)); - if (i == _networks.end()) - return _EMPTY_JSON; - std::unordered_map< uint64_t,std::vector<uint8_t> >::iterator j(i->second.members.find(nodeId)); - if (j == i->second.members.end()) - return _EMPTY_JSON; - nlohmann::json tmp(j->second); - i->second.members.erase(j); - if (recomputeSummaryInfo) - _recomputeSummaryInfo(networkId); - return tmp; - } -} - -void JSONDB::threadMain() - throw() -{ -#ifndef __WINDOWS__ - fd_set readfds,nullfds; - char *const readbuf = (_rawInput >= 0) ? (new char[1048576]) : (char *)0; - std::string rawInputBuf; - FD_ZERO(&readfds); - FD_ZERO(&nullfds); - struct timeval tv; -#endif - - std::vector<uint64_t> todo; - - while (_summaryThreadRun) { -#ifndef __WINDOWS__ - if (_rawInput < 0) { - Thread::sleep(25); - } else { - // In IPC mode we wait but also select() on STDIN to read database updates - FD_SET(_rawInput,&readfds); - tv.tv_sec = 0; - tv.tv_usec = 25000; - select(_rawInput+1,&readfds,&nullfds,&nullfds,&tv); - if (FD_ISSET(_rawInput,&readfds)) { - const long rn = (long)read(_rawInput,readbuf,1048576); - bool gotMessage = false; - for(long i=0;i<rn;++i) { - if ((readbuf[i] != '\n')&&(readbuf[i] != '\r')&&(readbuf[i] != 0)) { // compatible with nodeJS IPC - rawInputBuf.push_back(readbuf[i]); - } else if (rawInputBuf.length() > 0) { - try { - const nlohmann::json obj(OSUtils::jsonParse(rawInputBuf)); - gotMessage = true; - - if (!_dataReady) { - _dataReady = true; - _networks_m.unlock(); - } - - if (obj.is_array()) { - for(unsigned long i=0;i<obj.size();++i) - _addOrUpdate(obj[i]); - } else if (obj.is_object()) { - _addOrUpdate(obj); - } - } catch ( ... ) {} // ignore malformed JSON - - rawInputBuf.clear(); - } - } - if (!gotMessage) // select() again immediately until we get at least one full message - continue; - } - } -#else - Thread::sleep(25); -#endif - - { - Mutex::Lock _l(_summaryThread_m); - if (_summaryThreadToDo.empty()) - continue; - else _summaryThreadToDo.swap(todo); - } - - if (!_dataReady) { // sanity check - _dataReady = true; - _networks_m.unlock(); - } - - const int64_t now = OSUtils::now(); - try { - Mutex::Lock _l(_networks_m); - for(std::vector<uint64_t>::iterator ii(todo.begin());ii!=todo.end();++ii) { - const uint64_t networkId = *ii; - std::unordered_map<uint64_t,_NW>::iterator n(_networks.find(networkId)); - if (n != _networks.end()) { - NetworkSummaryInfo &ns = n->second.summaryInfo; - ns.activeBridges.clear(); - ns.allocatedIps.clear(); - ns.authorizedMemberCount = 0; - ns.activeMemberCount = 0; - ns.totalMemberCount = 0; - ns.mostRecentDeauthTime = 0; - - for(std::unordered_map< uint64_t,std::vector<uint8_t> >::const_iterator m(n->second.members.begin());m!=n->second.members.end();++m) { - try { - nlohmann::json member(nlohmann::json::from_msgpack(m->second)); - - if (OSUtils::jsonBool(member["authorized"],false)) { - ++ns.authorizedMemberCount; - - try { - const nlohmann::json &mlog = member["recentLog"]; - if ((mlog.is_array())&&(mlog.size() > 0)) { - const nlohmann::json &mlog1 = mlog[0]; - if (mlog1.is_object()) { - if ((now - OSUtils::jsonInt(mlog1["ts"],0ULL)) < (ZT_NETWORK_AUTOCONF_DELAY * 2)) - ++ns.activeMemberCount; - } - } - } catch ( ... ) {} - - try { - if (OSUtils::jsonBool(member["activeBridge"],false)) - ns.activeBridges.push_back(Address(m->first)); - } catch ( ... ) {} - - try { - const nlohmann::json &mips = member["ipAssignments"]; - if (mips.is_array()) { - for(unsigned long i=0;i<mips.size();++i) { - InetAddress mip(OSUtils::jsonString(mips[i],"").c_str()); - if ((mip.ss_family == AF_INET)||(mip.ss_family == AF_INET6)) - ns.allocatedIps.push_back(mip); - } - } - } catch ( ... ) {} - } else { - try { - ns.mostRecentDeauthTime = std::max(ns.mostRecentDeauthTime,(int64_t)OSUtils::jsonInt(member["lastDeauthorizedTime"],0LL)); - } catch ( ... ) {} - } - ++ns.totalMemberCount; - } catch ( ... ) {} - } - - std::sort(ns.activeBridges.begin(),ns.activeBridges.end()); - std::sort(ns.allocatedIps.begin(),ns.allocatedIps.end()); - - n->second.summaryInfoLastComputed = now; - } - } - } catch ( ... ) {} - - todo.clear(); - } - - if (!_dataReady) // sanity check - _networks_m.unlock(); - -#ifndef __WINDOWS__ - delete [] readbuf; -#endif -} - -bool JSONDB::_addOrUpdate(const nlohmann::json &j) -{ - try { - if (j.is_object()) { - std::string id(OSUtils::jsonString(j["id"],"0")); - const std::string objtype(OSUtils::jsonString(j["objtype"],"")); - if ((id.length() == 16)&&(objtype == "network")) { - - const uint64_t nwid = Utils::hexStrToU64(id.c_str()); - if (nwid) { - bool update; - { - Mutex::Lock _l(_networks_m); - _NW &nw = _networks[nwid]; - update = !nw.config.empty(); - nw.config = nlohmann::json::to_msgpack(j); - } - if (update) - _parent->onNetworkUpdate(nwid); - _recomputeSummaryInfo(nwid); - return true; - } - - } else if ((id.length() == 10)&&(objtype == "member")) { - - const uint64_t mid = Utils::hexStrToU64(id.c_str()); - const uint64_t nwid = Utils::hexStrToU64(OSUtils::jsonString(j["nwid"],"0").c_str()); - if ((mid)&&(nwid)) { - bool update = false; - bool deauth = false; - { - Mutex::Lock _l(_networks_m); - std::vector<uint8_t> &m = _networks[nwid].members[mid]; - if (!m.empty()) { - update = true; - nlohmann::json oldm(nlohmann::json::from_msgpack(m)); - deauth = ((OSUtils::jsonBool(oldm["authorized"],false))&&(!OSUtils::jsonBool(j["authorized"],false))); - } - m = nlohmann::json::to_msgpack(j); - _members[mid].insert(nwid); - } - if (update) { - _parent->onNetworkMemberUpdate(nwid,mid); - if (deauth) - _parent->onNetworkMemberDeauthorize(nwid,mid); - } - _recomputeSummaryInfo(nwid); - return true; - } - - } else if (objtype == "_delete") { // pseudo-object-type, only used in Central harnessed mode - - const std::string deleteType(OSUtils::jsonString(j["deleteType"],"")); - id = OSUtils::jsonString(j["deleteId"],""); - if ((deleteType == "network")&&(id.length() == 16)) { - eraseNetwork(Utils::hexStrToU64(id.c_str())); - } else if ((deleteType == "member")&&(id.length() == 10)) { - const std::string networkId(OSUtils::jsonString(j["deleteNetworkId"],"")); - const uint64_t nwid = Utils::hexStrToU64(networkId.c_str()); - const uint64_t mid = Utils::hexStrToU64(id.c_str()); - if (networkId.length() == 16) - eraseNetworkMember(nwid,mid,true); - _parent->onNetworkMemberDeauthorize(nwid,mid); - } - - } - } - } catch ( ... ) {} - return false; -} - -bool JSONDB::_load(const std::string &p) -{ - // This is not used in stdin/stdout mode. Instead data is populated by - // sending it all to stdin. - - std::vector<std::string> dl(OSUtils::listDirectory(p.c_str(),true)); - for(std::vector<std::string>::const_iterator di(dl.begin());di!=dl.end();++di) { - if ((di->length() > 5)&&(di->substr(di->length() - 5) == ".json")) { - std::string buf; - if (OSUtils::readFile((p + ZT_PATH_SEPARATOR_S + *di).c_str(),buf)) { - try { - _addOrUpdate(OSUtils::jsonParse(buf)); - } catch ( ... ) {} - } - } else { - this->_load((p + ZT_PATH_SEPARATOR_S + *di)); - } - } - - return true; -} - -void JSONDB::_recomputeSummaryInfo(const uint64_t networkId) -{ - Mutex::Lock _l(_summaryThread_m); - if (std::find(_summaryThreadToDo.begin(),_summaryThreadToDo.end(),networkId) == _summaryThreadToDo.end()) - _summaryThreadToDo.push_back(networkId); - if (!_summaryThread) - _summaryThread = Thread::start(this); -} - -std::string JSONDB::_genPath(const std::string &n,bool create) -{ - std::vector<std::string> pt(OSUtils::split(n.c_str(),"/","","")); - if (pt.size() == 0) - return std::string(); - - std::string p(_basePath); - if (create) OSUtils::mkdir(p.c_str()); - for(unsigned long i=0,j=(unsigned long)(pt.size()-1);i<j;++i) { - p.push_back(ZT_PATH_SEPARATOR); - p.append(pt[i]); - if (create) OSUtils::mkdir(p.c_str()); - } - - p.push_back(ZT_PATH_SEPARATOR); - p.append(pt[pt.size()-1]); - p.append(".json"); - - return p; -} - -} // namespace ZeroTier diff --git a/controller/JSONDB.hpp b/controller/JSONDB.hpp deleted file mode 100644 index db909cb0..00000000 --- a/controller/JSONDB.hpp +++ /dev/null @@ -1,192 +0,0 @@ -/* - * ZeroTier One - Network Virtualization Everywhere - * Copyright (C) 2011-2015 ZeroTier, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef ZT_JSONDB_HPP -#define ZT_JSONDB_HPP - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -#include <string> -#include <map> -#include <stdexcept> -#include <vector> -#include <algorithm> -#include <unordered_map> -#include <unordered_set> - -#include "../node/Constants.hpp" -#include "../node/Utils.hpp" -#include "../node/InetAddress.hpp" -#include "../node/Mutex.hpp" -#include "../ext/json/json.hpp" -#include "../osdep/OSUtils.hpp" -#include "../osdep/Thread.hpp" - -namespace ZeroTier { - -class EmbeddedNetworkController; - -/** - * Hierarchical JSON store that persists into the filesystem or via HTTP - */ -class JSONDB -{ -public: - struct NetworkSummaryInfo - { - NetworkSummaryInfo() : authorizedMemberCount(0),activeMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {} - std::vector<Address> activeBridges; - std::vector<InetAddress> allocatedIps; - unsigned long authorizedMemberCount; - unsigned long activeMemberCount; - unsigned long totalMemberCount; - int64_t mostRecentDeauthTime; - }; - - JSONDB(const std::string &basePath,EmbeddedNetworkController *parent); - ~JSONDB(); - - /** - * Write a JSON object to the data store - * - * It's important that obj contain a valid JSON object with no newlines (jsonDump with -1 - * for indentation), since newline-delimited JSON is what nodeJS's IPC speaks and this - * is important in Central-harnessed mode. - * - * @param n Path name of object - * @param obj Object in single-line no-CRs JSON object format (OSUtils::jsonDump(obj,-1)) - * @return True if write appears successful - */ - bool writeRaw(const std::string &n,const std::string &obj); - - bool hasNetwork(const uint64_t networkId) const; - - bool getNetwork(const uint64_t networkId,nlohmann::json &config) const; - - bool getNetworkSummaryInfo(const uint64_t networkId,NetworkSummaryInfo &ns) const; - - /** - * @return Bit mask: 0 == none, 1 == network only, 3 == network and member - */ - int getNetworkAndMember(const uint64_t networkId,const uint64_t nodeId,nlohmann::json &networkConfig,nlohmann::json &memberConfig,NetworkSummaryInfo &ns) const; - - bool getNetworkMember(const uint64_t networkId,const uint64_t nodeId,nlohmann::json &memberConfig) const; - - void saveNetwork(const uint64_t networkId,const nlohmann::json &networkConfig); - - void saveNetworkMember(const uint64_t networkId,const uint64_t nodeId,const nlohmann::json &memberConfig); - - nlohmann::json eraseNetwork(const uint64_t networkId); - - nlohmann::json eraseNetworkMember(const uint64_t networkId,const uint64_t nodeId,bool recomputeSummaryInfo = true); - - std::vector<uint64_t> networkIds() const - { - std::vector<uint64_t> r; - Mutex::Lock _l(_networks_m); - for(std::unordered_map<uint64_t,_NW>::const_iterator n(_networks.begin());n!=_networks.end();++n) - r.push_back(n->first); - return r; - } - - inline unsigned long memberCount(const uint64_t networkId) - { - Mutex::Lock _l(_networks_m); - std::unordered_map<uint64_t,_NW>::const_iterator i(_networks.find(networkId)); - if (i != _networks.end()) - return (unsigned long)i->second.members.size(); - return 0; - } - - template<typename F> - inline void eachMember(const uint64_t networkId,F func) - { - Mutex::Lock _l(_networks_m); - std::unordered_map<uint64_t,_NW>::const_iterator i(_networks.find(networkId)); - if (i != _networks.end()) { - for(std::unordered_map< uint64_t,std::vector<uint8_t> >::const_iterator m(i->second.members.begin());m!=i->second.members.end();++m) { - try { - func(networkId,m->first,nlohmann::json::from_msgpack(m->second)); - } catch ( ... ) {} - } - } - } - - template<typename F> - inline void eachId(F func) - { - Mutex::Lock _l(_networks_m); - for(std::unordered_map<uint64_t,_NW>::const_iterator i(_networks.begin());i!=_networks.end();++i) { - for(std::unordered_map< uint64_t,std::vector<uint8_t> >::const_iterator m(i->second.members.begin());m!=i->second.members.end();++m) { - try { - func(i->first,m->first); - } catch ( ... ) {} - } - } - } - - inline std::vector<uint64_t> networksForMember(const uint64_t nodeId) - { - Mutex::Lock _l(_networks_m); - std::unordered_map< uint64_t,std::unordered_set< uint64_t > >::const_iterator m(_members.find(nodeId)); - if (m != _members.end()) { - return std::vector<uint64_t>(m->second.begin(),m->second.end()); - } else { - return std::vector<uint64_t>(); - } - } - - void threadMain() - throw(); - -private: - bool _addOrUpdate(const nlohmann::json &j); - bool _load(const std::string &p); - void _recomputeSummaryInfo(const uint64_t networkId); - std::string _genPath(const std::string &n,bool create); - - EmbeddedNetworkController *const _parent; - std::string _basePath; - int _rawInput,_rawOutput; - Mutex _rawLock; - - Thread _summaryThread; - std::vector<uint64_t> _summaryThreadToDo; - volatile bool _summaryThreadRun; - Mutex _summaryThread_m; - - struct _NW - { - _NW() : summaryInfoLastComputed(0) {} - std::vector<uint8_t> config; - NetworkSummaryInfo summaryInfo; - uint64_t summaryInfoLastComputed; - std::unordered_map< uint64_t,std::vector<uint8_t> > members; - }; - - std::unordered_map< uint64_t,_NW > _networks; - std::unordered_map< uint64_t,std::unordered_set< uint64_t > > _members; - bool _dataReady; - Mutex _networks_m; -}; - -} // namespace ZeroTier - -#endif diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp index ffa1a188..bea941fa 100644 --- a/controller/RethinkDB.cpp +++ b/controller/RethinkDB.cpp @@ -33,12 +33,12 @@ using json = nlohmann::json; namespace ZeroTier { RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path) : - _controller(nc), - _myAddress(myAddress), + DB(nc,myAddress,path), _ready(2), // two tables need to be synchronized before we're ready, so this is ready when it reaches 0 _run(1), _waitNoticePrinted(false) { + // rethinkdb:host:port:db[:auth] std::vector<std::string> ps(OSUtils::split(path,":","","")); if ((ps.size() < 4)||(ps[0] != "rethinkdb")) throw std::runtime_error("invalid rethinkdb database url"); @@ -50,12 +50,6 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres _readyLock.lock(); - { - char tmp[32]; - _myAddress.toString(tmp); - _myAddressStr = tmp; - } - _membersDbWatcher = std::thread([this]() { try { while (_run == 1) { @@ -79,7 +73,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres json &nv = tmp["new_val"]; if (ov.is_object()||nv.is_object()) { //if (nv.is_object()) printf("MEMBER: %s" ZT_EOL_S,nv.dump().c_str()); - this->_memberChanged(ov,nv); + this->_memberChanged(ov,nv,(this->_ready <= 0)); } } catch ( ... ) {} // ignore bad records } @@ -120,7 +114,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres json &nv = tmp["new_val"]; if (ov.is_object()||nv.is_object()) { //if (nv.is_object()) printf("NETWORK: %s" ZT_EOL_S,nv.dump().c_str()); - this->_networkChanged(ov,nv); + this->_networkChanged(ov,nv,(this->_ready <= 0)); } } catch ( ... ) {} // ignore bad records } @@ -166,18 +160,18 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres record["controllerId"] = this->_myAddressStr; record["config"] = *config; table = "Network"; - } else if (objtype == "delete_network") { + } else if (objtype == "trace") { + record = *config; + table = "RemoteTrace"; + } else if (objtype == "_delete_network") { deleteId = (*config)["id"]; table = "Network"; - } else if (objtype == "delete_member") { + } else if (objtype == "_delete_member") { deleteId = (*config)["nwid"]; deleteId.push_back('-'); const std::string tmp = (*config)["id"]; deleteId.append(tmp); table = "Member"; - } else if (objtype == "trace") { - record = *config; - table = "RemoteTrace"; } else { delete config; continue; @@ -259,114 +253,16 @@ RethinkDB::~RethinkDB() _heartbeatThread.join(); } -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - } - return true; -} - -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - auto m = nw->members.find(memberId); - if (m == nw->members.end()) - return false; - member = m->second; - } - return true; -} - -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info) +void RethinkDB::waitForReady() const { - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - _fillSummaryInfo(nw,info); - auto m = nw->members.find(memberId); - if (m == nw->members.end()) - return false; - member = m->second; - } - return true; -} - -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - for(auto m=nw->members.begin();m!=nw->members.end();++m) - members.push_back(m->second); - } - return true; -} - -bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - _fillSummaryInfo(nw,info); + while (_ready > 0) { + if (!_waitNoticePrinted) { + _waitNoticePrinted = true; + fprintf(stderr,"NOTICE: controller RethinkDB waiting for initial data download..." ZT_EOL_S); + } + _readyLock.lock(); + _readyLock.unlock(); } - return true; -} - -void RethinkDB::networks(std::vector<uint64_t> &networks) -{ - waitForReady(); - std::lock_guard<std::mutex> l(_networks_l); - networks.reserve(_networks.size() + 1); - for(auto n=_networks.begin();n!=_networks.end();++n) - networks.push_back(n->first); } void RethinkDB::save(const nlohmann::json &record) @@ -382,7 +278,7 @@ void RethinkDB::eraseNetwork(const uint64_t networkId) Utils::hex(networkId,tmp2); json *tmp = new json(); (*tmp)["id"] = tmp2; - (*tmp)["objtype"] = "delete_network"; // pseudo-type, tells thread to delete network + (*tmp)["objtype"] = "_delete_network"; // pseudo-type, tells thread to delete network _commitQueue.post(tmp); } @@ -395,155 +291,10 @@ void RethinkDB::eraseMember(const uint64_t networkId,const uint64_t memberId) (*tmp)["nwid"] = tmp2; Utils::hex10(memberId,tmp2); (*tmp)["id"] = tmp2; - (*tmp)["objtype"] = "delete_member"; // pseudo-type, tells thread to delete network + (*tmp)["objtype"] = "_delete_member"; // pseudo-type, tells thread to delete network _commitQueue.post(tmp); } -void RethinkDB::_memberChanged(nlohmann::json &old,nlohmann::json &member) -{ - uint64_t memberId = 0; - uint64_t networkId = 0; - bool isAuth = false; - bool wasAuth = false; - std::shared_ptr<_Network> nw; - - if (old.is_object()) { - json &config = old["config"]; - if (config.is_object()) { - memberId = OSUtils::jsonIntHex(config["id"],0ULL); - networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); - if ((memberId)&&(networkId)) { - { - std::lock_guard<std::mutex> l(_networks_l); - auto nw2 = _networks.find(networkId); - if (nw2 != _networks.end()) - nw = nw2->second; - } - if (nw) { - std::lock_guard<std::mutex> l(nw->lock); - if (OSUtils::jsonBool(config["activeBridge"],false)) - nw->activeBridgeMembers.erase(memberId); - wasAuth = OSUtils::jsonBool(config["authorized"],false); - if (wasAuth) - nw->authorizedMembers.erase(memberId); - json &ips = config["ipAssignments"]; - if (ips.is_array()) { - for(unsigned long i=0;i<ips.size();++i) { - json &ipj = ips[i]; - if (ipj.is_string()) { - const std::string ips = ipj; - InetAddress ipa(ips.c_str()); - ipa.setPort(0); - nw->allocatedIps.erase(ipa); - } - } - } - } - } - } - } - - if (member.is_object()) { - json &config = member["config"]; - if (config.is_object()) { - if (!nw) { - memberId = OSUtils::jsonIntHex(config["id"],0ULL); - networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); - if ((!memberId)||(!networkId)) - return; - std::lock_guard<std::mutex> l(_networks_l); - std::shared_ptr<_Network> &nw2 = _networks[networkId]; - if (!nw2) - nw2.reset(new _Network); - nw = nw2; - } - - { - std::lock_guard<std::mutex> l(nw->lock); - - nw->members[memberId] = config; - - if (OSUtils::jsonBool(config["activeBridge"],false)) - nw->activeBridgeMembers.insert(memberId); - isAuth = OSUtils::jsonBool(config["authorized"],false); - if (isAuth) - nw->authorizedMembers.insert(memberId); - json &ips = config["ipAssignments"]; - if (ips.is_array()) { - for(unsigned long i=0;i<ips.size();++i) { - json &ipj = ips[i]; - if (ipj.is_string()) { - const std::string ips = ipj; - InetAddress ipa(ips.c_str()); - ipa.setPort(0); - nw->allocatedIps.insert(ipa); - } - } - } - - if (!isAuth) { - const int64_t ldt = (int64_t)OSUtils::jsonInt(config["lastDeauthorizedTime"],0ULL); - if (ldt > nw->mostRecentDeauthTime) - nw->mostRecentDeauthTime = ldt; - } - } - - _controller->onNetworkMemberUpdate(networkId,memberId); - } - } else if (memberId) { - if (nw) { - std::lock_guard<std::mutex> l(nw->lock); - nw->members.erase(memberId); - } - if (networkId) { - std::lock_guard<std::mutex> l(_networks_l); - auto er = _networkByMember.equal_range(memberId); - for(auto i=er.first;i!=er.second;++i) { - if (i->second == networkId) { - _networkByMember.erase(i); - break; - } - } - } - } - - if ((wasAuth)&&(!isAuth)&&(networkId)&&(memberId)) - _controller->onNetworkMemberDeauthorize(networkId,memberId); -} - -void RethinkDB::_networkChanged(nlohmann::json &old,nlohmann::json &network) -{ - if (network.is_object()) { - json &config = network["config"]; - if (config.is_object()) { - const std::string ids = config["id"]; - const uint64_t id = Utils::hexStrToU64(ids.c_str()); - if (id) { - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - std::shared_ptr<_Network> &nw2 = _networks[id]; - if (!nw2) - nw2.reset(new _Network); - nw = nw2; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - nw->config = config; - } - _controller->onNetworkUpdate(id); - } - } - } else if (old.is_object()) { - const std::string ids = old["id"]; - const uint64_t id = Utils::hexStrToU64(ids.c_str()); - if (id) { - std::lock_guard<std::mutex> l(_networks_l); - _networks.erase(id); - } - } -} - } // namespace ZeroTier #endif // ZT_CONTROLLER_USE_RETHINKDB diff --git a/controller/RethinkDB.hpp b/controller/RethinkDB.hpp index 62586803..2d7ba58e 100644 --- a/controller/RethinkDB.hpp +++ b/controller/RethinkDB.hpp @@ -16,112 +16,35 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#define ZT_CONTROLLER_USE_RETHINKDB + #ifdef ZT_CONTROLLER_USE_RETHINKDB #ifndef ZT_CONTROLLER_RETHINKDB_HPP #define ZT_CONTROLLER_RETHINKDB_HPP -#include "../node/Constants.hpp" -#include "../node/Address.hpp" -#include "../node/InetAddress.hpp" -#include "../osdep/OSUtils.hpp" -#include "../osdep/BlockingQueue.hpp" - -#include <memory> -#include <string> -#include <thread> -#include <unordered_map> -#include <unordered_set> -#include <vector> -#include <atomic> - -#include "../ext/json/json.hpp" +#include "DB.hpp" #define ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS 2 namespace ZeroTier { -class EmbeddedNetworkController; - -class RethinkDB +class RethinkDB : public DB { public: - struct NetworkSummaryInfo - { - NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {} - std::vector<Address> activeBridges; - std::vector<InetAddress> allocatedIps; - unsigned long authorizedMemberCount; - unsigned long totalMemberCount; - int64_t mostRecentDeauthTime; - }; - RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path); - ~RethinkDB(); - - inline void waitForReady() const - { - while (_ready > 0) { - if (!_waitNoticePrinted) { - _waitNoticePrinted = true; - fprintf(stderr,"NOTICE: controller RethinkDB waiting for initial data download..." ZT_EOL_S); - } - _readyLock.lock(); - _readyLock.unlock(); - } - } - - inline bool hasNetwork(const uint64_t networkId) const - { - std::lock_guard<std::mutex> l(_networks_l); - return (_networks.find(networkId) != _networks.end()); - } - - bool get(const uint64_t networkId,nlohmann::json &network); - bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member); - bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info); - bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members); - - bool summary(const uint64_t networkId,NetworkSummaryInfo &info); - - void networks(std::vector<uint64_t> &networks); - - void save(const nlohmann::json &record); - - void eraseNetwork(const uint64_t networkId); - void eraseMember(const uint64_t networkId,const uint64_t memberId); - -private: - struct _Network - { - _Network() : mostRecentDeauthTime(0) {} - nlohmann::json config; - std::unordered_map<uint64_t,nlohmann::json> members; - std::unordered_set<uint64_t> activeBridgeMembers; - std::unordered_set<uint64_t> authorizedMembers; - std::unordered_set<InetAddress,InetAddress::Hasher> allocatedIps; - int64_t mostRecentDeauthTime; - std::mutex lock; - }; - - void _memberChanged(nlohmann::json &old,nlohmann::json &member); - void _networkChanged(nlohmann::json &old,nlohmann::json &network); - - inline void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info) - { - for(auto ab=nw->activeBridgeMembers.begin();ab!=nw->activeBridgeMembers.end();++ab) - info.activeBridges.push_back(Address(*ab)); - for(auto ip=nw->allocatedIps.begin();ip!=nw->allocatedIps.end();++ip) - info.allocatedIps.push_back(*ip); - info.authorizedMemberCount = (unsigned long)nw->authorizedMembers.size(); - info.totalMemberCount = (unsigned long)nw->members.size(); - info.mostRecentDeauthTime = nw->mostRecentDeauthTime; - } - - EmbeddedNetworkController *const _controller; - const Address _myAddress; - std::string _myAddressStr; + virtual ~RethinkDB(); + + virtual void waitForReady() const; + + virtual void save(const nlohmann::json &record); + + virtual void eraseNetwork(const uint64_t networkId); + + virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); + +protected: std::string _host; std::string _db; std::string _auth; @@ -132,10 +55,6 @@ private: std::thread _networksDbWatcher; std::thread _membersDbWatcher; - std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks; - std::unordered_multimap< uint64_t,uint64_t > _networkByMember; - mutable std::mutex _networks_l; - BlockingQueue< nlohmann::json * > _commitQueue; std::thread _commitThread[ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS]; |