diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-07 14:44:46 -0800 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-07 14:44:46 -0800 |
commit | 1613f42d0082cf6438ad0c62d89405ab82625f98 (patch) | |
tree | bc46b11bc909b28de54252e1691a58380b28bae9 | |
parent | 7fc9094d8ea1c2d28d003c499016f0755b73063d (diff) | |
download | infinitytier-1613f42d0082cf6438ad0c62d89405ab82625f98.tar.gz infinitytier-1613f42d0082cf6438ad0c62d89405ab82625f98.zip |
Re-integrate in-filesystem DB into new controller DB structure.
-rw-r--r-- | attic/JSONDB.cpp (renamed from controller/JSONDB.cpp) | 0 | ||||
-rw-r--r-- | attic/JSONDB.hpp (renamed from controller/JSONDB.hpp) | 0 | ||||
-rw-r--r-- | controller/DB.cpp | 314 | ||||
-rw-r--r-- | controller/DB.hpp | 116 | ||||
-rw-r--r-- | controller/EmbeddedNetworkController.hpp | 5 | ||||
-rw-r--r-- | controller/FileDB.cpp | 129 | ||||
-rw-r--r-- | controller/FileDB.hpp | 47 | ||||
-rw-r--r-- | controller/RethinkDB.cpp | 287 | ||||
-rw-r--r-- | controller/RethinkDB.hpp | 111 | ||||
-rw-r--r-- | objects.mk | 2 | ||||
-rw-r--r-- | osdep/OSUtils.hpp | 15 |
11 files changed, 659 insertions, 367 deletions
diff --git a/controller/JSONDB.cpp b/attic/JSONDB.cpp index 67b13393..67b13393 100644 --- a/controller/JSONDB.cpp +++ b/attic/JSONDB.cpp diff --git a/controller/JSONDB.hpp b/attic/JSONDB.hpp index db909cb0..db909cb0 100644 --- a/controller/JSONDB.hpp +++ b/attic/JSONDB.hpp diff --git a/controller/DB.cpp b/controller/DB.cpp new file mode 100644 index 00000000..4a5688e3 --- /dev/null +++ b/controller/DB.cpp @@ -0,0 +1,314 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "DB.hpp" +#include "EmbeddedNetworkController.hpp" + +#include <chrono> +#include <algorithm> +#include <stdexcept> + +using json = nlohmann::json; + +namespace ZeroTier { + +DB::DB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path) : + _controller(nc), + _myAddress(myAddress), + _path((path) ? path : "") +{ + { + char tmp[32]; + _myAddress.toString(tmp); + _myAddressStr = tmp; + } +} + +DB::~DB() +{ +} + +bool DB::get(const uint64_t networkId,nlohmann::json &network) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + } + return true; +} + +bool DB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + auto m = nw->members.find(memberId); + if (m == nw->members.end()) + return false; + member = m->second; + } + return true; +} + +bool DB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + _fillSummaryInfo(nw,info); + auto m = nw->members.find(memberId); + if (m == nw->members.end()) + return false; + member = m->second; + } + return true; +} + +bool DB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + for(auto m=nw->members.begin();m!=nw->members.end();++m) + members.push_back(m->second); + } + return true; +} + +bool DB::summary(const uint64_t networkId,NetworkSummaryInfo &info) +{ + waitForReady(); + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + { + std::lock_guard<std::mutex> l2(nw->lock); + _fillSummaryInfo(nw,info); + } + return true; +} + +void DB::networks(std::vector<uint64_t> &networks) +{ + waitForReady(); + std::lock_guard<std::mutex> l(_networks_l); + networks.reserve(_networks.size() + 1); + for(auto n=_networks.begin();n!=_networks.end();++n) + networks.push_back(n->first); +} + +void DB::_memberChanged(nlohmann::json &old,nlohmann::json &member,bool push) +{ + uint64_t memberId = 0; + uint64_t networkId = 0; + bool isAuth = false; + bool wasAuth = false; + std::shared_ptr<_Network> nw; + + if (old.is_object()) { + json &config = old["config"]; + if (config.is_object()) { + memberId = OSUtils::jsonIntHex(config["id"],0ULL); + networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); + if ((memberId)&&(networkId)) { + { + std::lock_guard<std::mutex> l(_networks_l); + auto nw2 = _networks.find(networkId); + if (nw2 != _networks.end()) + nw = nw2->second; + } + if (nw) { + std::lock_guard<std::mutex> l(nw->lock); + if (OSUtils::jsonBool(config["activeBridge"],false)) + nw->activeBridgeMembers.erase(memberId); + wasAuth = OSUtils::jsonBool(config["authorized"],false); + if (wasAuth) + nw->authorizedMembers.erase(memberId); + json &ips = config["ipAssignments"]; + if (ips.is_array()) { + for(unsigned long i=0;i<ips.size();++i) { + json &ipj = ips[i]; + if (ipj.is_string()) { + const std::string ips = ipj; + InetAddress ipa(ips.c_str()); + ipa.setPort(0); + nw->allocatedIps.erase(ipa); + } + } + } + } + } + } + } + + if (member.is_object()) { + json &config = member["config"]; + if (config.is_object()) { + if (!nw) { + memberId = OSUtils::jsonIntHex(config["id"],0ULL); + networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); + if ((!memberId)||(!networkId)) + return; + std::lock_guard<std::mutex> l(_networks_l); + std::shared_ptr<_Network> &nw2 = _networks[networkId]; + if (!nw2) + nw2.reset(new _Network); + nw = nw2; + } + + { + std::lock_guard<std::mutex> l(nw->lock); + + nw->members[memberId] = config; + + if (OSUtils::jsonBool(config["activeBridge"],false)) + nw->activeBridgeMembers.insert(memberId); + isAuth = OSUtils::jsonBool(config["authorized"],false); + if (isAuth) + nw->authorizedMembers.insert(memberId); + json &ips = config["ipAssignments"]; + if (ips.is_array()) { + for(unsigned long i=0;i<ips.size();++i) { + json &ipj = ips[i]; + if (ipj.is_string()) { + const std::string ips = ipj; + InetAddress ipa(ips.c_str()); + ipa.setPort(0); + nw->allocatedIps.insert(ipa); + } + } + } + + if (!isAuth) { + const int64_t ldt = (int64_t)OSUtils::jsonInt(config["lastDeauthorizedTime"],0ULL); + if (ldt > nw->mostRecentDeauthTime) + nw->mostRecentDeauthTime = ldt; + } + } + + if (push) + _controller->onNetworkMemberUpdate(networkId,memberId); + } + } else if (memberId) { + if (nw) { + std::lock_guard<std::mutex> l(nw->lock); + nw->members.erase(memberId); + } + if (networkId) { + std::lock_guard<std::mutex> l(_networks_l); + auto er = _networkByMember.equal_range(memberId); + for(auto i=er.first;i!=er.second;++i) { + if (i->second == networkId) { + _networkByMember.erase(i); + break; + } + } + } + } + + if ((push)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) + _controller->onNetworkMemberDeauthorize(networkId,memberId); +} + +void DB::_networkChanged(nlohmann::json &old,nlohmann::json &network,bool push) +{ + if (network.is_object()) { + json &config = network["config"]; + if (config.is_object()) { + const std::string ids = config["id"]; + const uint64_t id = Utils::hexStrToU64(ids.c_str()); + if (id) { + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + std::shared_ptr<_Network> &nw2 = _networks[id]; + if (!nw2) + nw2.reset(new _Network); + nw = nw2; + } + { + std::lock_guard<std::mutex> l2(nw->lock); + nw->config = config; + } + if (push) + _controller->onNetworkUpdate(id); + } + } + } else if (old.is_object()) { + const std::string ids = old["id"]; + const uint64_t id = Utils::hexStrToU64(ids.c_str()); + if (id) { + std::lock_guard<std::mutex> l(_networks_l); + _networks.erase(id); + } + } +} + +void DB::_fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info) +{ + for(auto ab=nw->activeBridgeMembers.begin();ab!=nw->activeBridgeMembers.end();++ab) + info.activeBridges.push_back(Address(*ab)); + for(auto ip=nw->allocatedIps.begin();ip!=nw->allocatedIps.end();++ip) + info.allocatedIps.push_back(*ip); + info.authorizedMemberCount = (unsigned long)nw->authorizedMembers.size(); + info.totalMemberCount = (unsigned long)nw->members.size(); + info.mostRecentDeauthTime = nw->mostRecentDeauthTime; +} + +} // namespace ZeroTier diff --git a/controller/DB.hpp b/controller/DB.hpp new file mode 100644 index 00000000..dfc8ac95 --- /dev/null +++ b/controller/DB.hpp @@ -0,0 +1,116 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef ZT_CONTROLLER_DB_HPP +#define ZT_CONTROLLER_DB_HPP + +#include "../node/Constants.hpp" +#include "../node/Address.hpp" +#include "../node/InetAddress.hpp" +#include "../osdep/OSUtils.hpp" +#include "../osdep/BlockingQueue.hpp" + +#include <memory> +#include <string> +#include <thread> +#include <unordered_map> +#include <unordered_set> +#include <vector> +#include <atomic> + +#include "../ext/json/json.hpp" + +#define ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS 2 + +namespace ZeroTier +{ + +class EmbeddedNetworkController; + +/** + * Base class with common infrastructure for all controller DB implementations + */ +class DB +{ +public: + struct NetworkSummaryInfo + { + NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {} + std::vector<Address> activeBridges; + std::vector<InetAddress> allocatedIps; + unsigned long authorizedMemberCount; + unsigned long totalMemberCount; + int64_t mostRecentDeauthTime; + }; + + DB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path); + virtual ~DB(); + + virtual bool waitForReady() = 0; + + inline bool hasNetwork(const uint64_t networkId) const + { + std::lock_guard<std::mutex> l(_networks_l); + return (_networks.find(networkId) != _networks.end()); + } + + bool get(const uint64_t networkId,nlohmann::json &network); + bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member); + bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info); + bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members); + + bool summary(const uint64_t networkId,NetworkSummaryInfo &info); + + void networks(std::vector<uint64_t> &networks); + + virtual void save(const nlohmann::json &record) = 0; + + virtual void eraseNetwork(const uint64_t networkId) = 0; + + virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0; + +protected: + struct _Network + { + _Network() : mostRecentDeauthTime(0) {} + nlohmann::json config; + std::unordered_map<uint64_t,nlohmann::json> members; + std::unordered_set<uint64_t> activeBridgeMembers; + std::unordered_set<uint64_t> authorizedMembers; + std::unordered_set<InetAddress,InetAddress::Hasher> allocatedIps; + int64_t mostRecentDeauthTime; + std::mutex lock; + }; + + void _memberChanged(nlohmann::json &old,nlohmann::json &member,bool push); + void _networkChanged(nlohmann::json &old,nlohmann::json &network,bool push); + void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info); + + EmbeddedNetworkController *const _controller; + const Address _myAddress; + const std::string _path; + std::string _myAddressStr; + + std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks; + std::unordered_multimap< uint64_t,uint64_t > _networkByMember; + mutable std::mutex _networks_l; +}; + +} // namespace ZeroTier + +#endif diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index f9b6fb5a..bc59b359 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -45,13 +45,18 @@ #ifdef ZT_CONTROLLER_USE_RETHINKDB #include "RethinkDB.hpp" +#else +#include "FileDB.hpp" #endif namespace ZeroTier { #ifdef ZT_CONTROLLER_USE_RETHINKDB typedef RethinkDB ControllerDB; +#else +typedef FileDB ControllerDB; #endif + class Node; class EmbeddedNetworkController : public NetworkController diff --git a/controller/FileDB.cpp b/controller/FileDB.cpp new file mode 100644 index 00000000..b48d5e87 --- /dev/null +++ b/controller/FileDB.cpp @@ -0,0 +1,129 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "FileDB.hpp" + +namespace ZeroTier +{ + +FileDB::FileDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path) : + DB(nc,myAddress,path), + _networksPath(_path + ZT_PATH_SEPARATOR_S + "network") +{ + OSUtils::mkdir(_path.c_str()); + OSUtils::lockDownFile(_path.c_str(),true); + + std::vector<std::string> networks(OSUtils::listDirectory(_networksPath.c_str(),false)); + std::string buf; + for(auto n=networks.begin();n!=networks.end();++n) { + buf.clear(); + if ((n->length() == 21)&&(OSUtils::readFile((_networksPath + ZT_PATH_SEPARATOR_S + *n).c_str(),buf))) { + try { + nlohmann::json network(OSUtils::jsonParse(buf)); + const std::string nwids = network["id"]; + if (nwids.length() == 16) { + nlohmann::json nullJson; + _networkChanged(nullJson,network,false); + std::string membersPath(_networksPath + ZT_PATH_SEPARATOR_S + nwids + ZT_PATH_SEPARATOR_S "member"); + std::vector<std::string> members(OSUtils::listDirectory(membersPath.c_str(),false)); + for(auto m=members.begin();m!=members.end();++m) { + buf.clear(); + if ((m->length() == 15)&&(OSUtils::readFile((membersPath + ZT_PATH_SEPARATOR_S + *m).c_str(),buf))) { + try { + nlohmann::json member(OSUtils::jsonParse(buf)); + const std::string addrs = member["id"]; + if (addrs.length() == 10) { + nlohmann::json nullJson2; + _memberChanged(nullJson2,member,false); + } + } catch ( ... ) {} + } + } + } + } catch ( ... ) {} + } + } +} + +FileDB::~FileDB() +{ +} + +bool FileDB::waitForReady() +{ + return true; +} + +void FileDB::save(const nlohmann::json &record) +{ + char p1[16384],p2[16384]; + try { + nlohmann::json rec(record); + const std::string objtype = rec["objtype"]; + if (objtype == "network") { + const uint64_t nwid = OSUtils::jsonIntHex(rec["id"],0ULL); + if (nwid) { + nlohmann::json old; + get(nwid,old); + + OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json.new",_networksPath.c_str(),nwid); + OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid); + if (!OSUtils::writeFile(p1,OSUtils::jsonDump(rec,-1))) + fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1); + OSUtils::rename(p1,p2); + + _networkChanged(old,rec,true); + } + } else if (objtype == "member") { + const uint64_t id = OSUtils::jsonIntHex(rec["id"],0ULL); + const uint64_t nwid = OSUtils::jsonIntHex(rec["nwid"],0ULL); + if ((id)&&(nwid)) { + nlohmann::json network,old; + get(nwid,network,id,old); + + OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member" ZT_PATH_SEPARATOR_S "%.10llx.json.new",_networksPath.c_str(),nwid); + OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member" ZT_PATH_SEPARATOR_S "%.10llx.json",_networksPath.c_str(),nwid); + if (!OSUtils::writeFile(p1,OSUtils::jsonDump(rec,-1))) + fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1); + OSUtils::rename(p1,p2); + + _memberChanged(old,rec,true); + } + } else if (objtype == "trace") { + const std::string id = rec["id"]; + OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "trace" ZT_PATH_SEPARATOR_S "%s.json",_path.c_str(),id.c_str()); + OSUtils::writeFile(p1,OSUtils::jsonDump(rec,-1)); + } + } catch ( ... ) {} // drop invalid records missing fields +} + +void FileDB::eraseNetwork(const uint64_t networkId) +{ + nlohmann::json network,nullJson; + get(networkId,network); + char p[16384]; + OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),networkId); + OSUtils::rm(p); + _networkChanged(network,nullJson,true); +} + +void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId) +{ +} + +} // namespace ZeroTier diff --git a/controller/FileDB.hpp b/controller/FileDB.hpp new file mode 100644 index 00000000..fe9869b9 --- /dev/null +++ b/controller/FileDB.hpp @@ -0,0 +1,47 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2015 ZeroTier, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef ZT_CONTROLLER_FILEDB_HPP +#define ZT_CONTROLLER_FILEDB_HPP + +#include "DB.hpp" + +namespace ZeroTier +{ + +class FileDB : public DB +{ +public: + FileDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path); + virtual ~FileDB(); + + virtual bool waitForReady(); + + virtual void save(const nlohmann::json &record); + + virtual void eraseNetwork(const uint64_t networkId); + + virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); + +protected: + std::string _networksPath; +}; + +} // namespace ZeroTier + +#endif diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp index ffa1a188..bea941fa 100644 --- a/controller/RethinkDB.cpp +++ b/controller/RethinkDB.cpp @@ -33,12 +33,12 @@ using json = nlohmann::json; namespace ZeroTier { RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path) : - _controller(nc), - _myAddress(myAddress), + DB(nc,myAddress,path), _ready(2), // two tables need to be synchronized before we're ready, so this is ready when it reaches 0 _run(1), _waitNoticePrinted(false) { + // rethinkdb:host:port:db[:auth] std::vector<std::string> ps(OSUtils::split(path,":","","")); if ((ps.size() < 4)||(ps[0] != "rethinkdb")) throw std::runtime_error("invalid rethinkdb database url"); @@ -50,12 +50,6 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres _readyLock.lock(); - { - char tmp[32]; - _myAddress.toString(tmp); - _myAddressStr = tmp; - } - _membersDbWatcher = std::thread([this]() { try { while (_run == 1) { @@ -79,7 +73,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres json &nv = tmp["new_val"]; if (ov.is_object()||nv.is_object()) { //if (nv.is_object()) printf("MEMBER: %s" ZT_EOL_S,nv.dump().c_str()); - this->_memberChanged(ov,nv); + this->_memberChanged(ov,nv,(this->_ready <= 0)); } } catch ( ... ) {} // ignore bad records } @@ -120,7 +114,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres json &nv = tmp["new_val"]; if (ov.is_object()||nv.is_object()) { //if (nv.is_object()) printf("NETWORK: %s" ZT_EOL_S,nv.dump().c_str()); - this->_networkChanged(ov,nv); + this->_networkChanged(ov,nv,(this->_ready <= 0)); } } catch ( ... ) {} // ignore bad records } @@ -166,18 +160,18 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres record["controllerId"] = this->_myAddressStr; record["config"] = *config; table = "Network"; - } else if (objtype == "delete_network") { + } else if (objtype == "trace") { + record = *config; + table = "RemoteTrace"; + } else if (objtype == "_delete_network") { deleteId = (*config)["id"]; table = "Network"; - } else if (objtype == "delete_member") { + } else if (objtype == "_delete_member") { deleteId = (*config)["nwid"]; deleteId.push_back('-'); const std::string tmp = (*config)["id"]; deleteId.append(tmp); table = "Member"; - } else if (objtype == "trace") { - record = *config; - table = "RemoteTrace"; } else { delete config; continue; @@ -259,114 +253,16 @@ RethinkDB::~RethinkDB() _heartbeatThread.join(); } -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - } - return true; -} - -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - auto m = nw->members.find(memberId); - if (m == nw->members.end()) - return false; - member = m->second; - } - return true; -} - -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info) +void RethinkDB::waitForReady() const { - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - _fillSummaryInfo(nw,info); - auto m = nw->members.find(memberId); - if (m == nw->members.end()) - return false; - member = m->second; - } - return true; -} - -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - for(auto m=nw->members.begin();m!=nw->members.end();++m) - members.push_back(m->second); - } - return true; -} - -bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - _fillSummaryInfo(nw,info); + while (_ready > 0) { + if (!_waitNoticePrinted) { + _waitNoticePrinted = true; + fprintf(stderr,"NOTICE: controller RethinkDB waiting for initial data download..." ZT_EOL_S); + } + _readyLock.lock(); + _readyLock.unlock(); } - return true; -} - -void RethinkDB::networks(std::vector<uint64_t> &networks) -{ - waitForReady(); - std::lock_guard<std::mutex> l(_networks_l); - networks.reserve(_networks.size() + 1); - for(auto n=_networks.begin();n!=_networks.end();++n) - networks.push_back(n->first); } void RethinkDB::save(const nlohmann::json &record) @@ -382,7 +278,7 @@ void RethinkDB::eraseNetwork(const uint64_t networkId) Utils::hex(networkId,tmp2); json *tmp = new json(); (*tmp)["id"] = tmp2; - (*tmp)["objtype"] = "delete_network"; // pseudo-type, tells thread to delete network + (*tmp)["objtype"] = "_delete_network"; // pseudo-type, tells thread to delete network _commitQueue.post(tmp); } @@ -395,155 +291,10 @@ void RethinkDB::eraseMember(const uint64_t networkId,const uint64_t memberId) (*tmp)["nwid"] = tmp2; Utils::hex10(memberId,tmp2); (*tmp)["id"] = tmp2; - (*tmp)["objtype"] = "delete_member"; // pseudo-type, tells thread to delete network + (*tmp)["objtype"] = "_delete_member"; // pseudo-type, tells thread to delete network _commitQueue.post(tmp); } -void RethinkDB::_memberChanged(nlohmann::json &old,nlohmann::json &member) -{ - uint64_t memberId = 0; - uint64_t networkId = 0; - bool isAuth = false; - bool wasAuth = false; - std::shared_ptr<_Network> nw; - - if (old.is_object()) { - json &config = old["config"]; - if (config.is_object()) { - memberId = OSUtils::jsonIntHex(config["id"],0ULL); - networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); - if ((memberId)&&(networkId)) { - { - std::lock_guard<std::mutex> l(_networks_l); - auto nw2 = _networks.find(networkId); - if (nw2 != _networks.end()) - nw = nw2->second; - } - if (nw) { - std::lock_guard<std::mutex> l(nw->lock); - if (OSUtils::jsonBool(config["activeBridge"],false)) - nw->activeBridgeMembers.erase(memberId); - wasAuth = OSUtils::jsonBool(config["authorized"],false); - if (wasAuth) - nw->authorizedMembers.erase(memberId); - json &ips = config["ipAssignments"]; - if (ips.is_array()) { - for(unsigned long i=0;i<ips.size();++i) { - json &ipj = ips[i]; - if (ipj.is_string()) { - const std::string ips = ipj; - InetAddress ipa(ips.c_str()); - ipa.setPort(0); - nw->allocatedIps.erase(ipa); - } - } - } - } - } - } - } - - if (member.is_object()) { - json &config = member["config"]; - if (config.is_object()) { - if (!nw) { - memberId = OSUtils::jsonIntHex(config["id"],0ULL); - networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); - if ((!memberId)||(!networkId)) - return; - std::lock_guard<std::mutex> l(_networks_l); - std::shared_ptr<_Network> &nw2 = _networks[networkId]; - if (!nw2) - nw2.reset(new _Network); - nw = nw2; - } - - { - std::lock_guard<std::mutex> l(nw->lock); - - nw->members[memberId] = config; - - if (OSUtils::jsonBool(config["activeBridge"],false)) - nw->activeBridgeMembers.insert(memberId); - isAuth = OSUtils::jsonBool(config["authorized"],false); - if (isAuth) - nw->authorizedMembers.insert(memberId); - json &ips = config["ipAssignments"]; - if (ips.is_array()) { - for(unsigned long i=0;i<ips.size();++i) { - json &ipj = ips[i]; - if (ipj.is_string()) { - const std::string ips = ipj; - InetAddress ipa(ips.c_str()); - ipa.setPort(0); - nw->allocatedIps.insert(ipa); - } - } - } - - if (!isAuth) { - const int64_t ldt = (int64_t)OSUtils::jsonInt(config["lastDeauthorizedTime"],0ULL); - if (ldt > nw->mostRecentDeauthTime) - nw->mostRecentDeauthTime = ldt; - } - } - - _controller->onNetworkMemberUpdate(networkId,memberId); - } - } else if (memberId) { - if (nw) { - std::lock_guard<std::mutex> l(nw->lock); - nw->members.erase(memberId); - } - if (networkId) { - std::lock_guard<std::mutex> l(_networks_l); - auto er = _networkByMember.equal_range(memberId); - for(auto i=er.first;i!=er.second;++i) { - if (i->second == networkId) { - _networkByMember.erase(i); - break; - } - } - } - } - - if ((wasAuth)&&(!isAuth)&&(networkId)&&(memberId)) - _controller->onNetworkMemberDeauthorize(networkId,memberId); -} - -void RethinkDB::_networkChanged(nlohmann::json &old,nlohmann::json &network) -{ - if (network.is_object()) { - json &config = network["config"]; - if (config.is_object()) { - const std::string ids = config["id"]; - const uint64_t id = Utils::hexStrToU64(ids.c_str()); - if (id) { - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - std::shared_ptr<_Network> &nw2 = _networks[id]; - if (!nw2) - nw2.reset(new _Network); - nw = nw2; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - nw->config = config; - } - _controller->onNetworkUpdate(id); - } - } - } else if (old.is_object()) { - const std::string ids = old["id"]; - const uint64_t id = Utils::hexStrToU64(ids.c_str()); - if (id) { - std::lock_guard<std::mutex> l(_networks_l); - _networks.erase(id); - } - } -} - } // namespace ZeroTier #endif // ZT_CONTROLLER_USE_RETHINKDB diff --git a/controller/RethinkDB.hpp b/controller/RethinkDB.hpp index 62586803..2d7ba58e 100644 --- a/controller/RethinkDB.hpp +++ b/controller/RethinkDB.hpp @@ -16,112 +16,35 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#define ZT_CONTROLLER_USE_RETHINKDB + #ifdef ZT_CONTROLLER_USE_RETHINKDB #ifndef ZT_CONTROLLER_RETHINKDB_HPP #define ZT_CONTROLLER_RETHINKDB_HPP -#include "../node/Constants.hpp" -#include "../node/Address.hpp" -#include "../node/InetAddress.hpp" -#include "../osdep/OSUtils.hpp" -#include "../osdep/BlockingQueue.hpp" - -#include <memory> -#include <string> -#include <thread> -#include <unordered_map> -#include <unordered_set> -#include <vector> -#include <atomic> - -#include "../ext/json/json.hpp" +#include "DB.hpp" #define ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS 2 namespace ZeroTier { -class EmbeddedNetworkController; - -class RethinkDB +class RethinkDB : public DB { public: - struct NetworkSummaryInfo - { - NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {} - std::vector<Address> activeBridges; - std::vector<InetAddress> allocatedIps; - unsigned long authorizedMemberCount; - unsigned long totalMemberCount; - int64_t mostRecentDeauthTime; - }; - RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path); - ~RethinkDB(); - - inline void waitForReady() const - { - while (_ready > 0) { - if (!_waitNoticePrinted) { - _waitNoticePrinted = true; - fprintf(stderr,"NOTICE: controller RethinkDB waiting for initial data download..." ZT_EOL_S); - } - _readyLock.lock(); - _readyLock.unlock(); - } - } - - inline bool hasNetwork(const uint64_t networkId) const - { - std::lock_guard<std::mutex> l(_networks_l); - return (_networks.find(networkId) != _networks.end()); - } - - bool get(const uint64_t networkId,nlohmann::json &network); - bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member); - bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info); - bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members); - - bool summary(const uint64_t networkId,NetworkSummaryInfo &info); - - void networks(std::vector<uint64_t> &networks); - - void save(const nlohmann::json &record); - - void eraseNetwork(const uint64_t networkId); - void eraseMember(const uint64_t networkId,const uint64_t memberId); - -private: - struct _Network - { - _Network() : mostRecentDeauthTime(0) {} - nlohmann::json config; - std::unordered_map<uint64_t,nlohmann::json> members; - std::unordered_set<uint64_t> activeBridgeMembers; - std::unordered_set<uint64_t> authorizedMembers; - std::unordered_set<InetAddress,InetAddress::Hasher> allocatedIps; - int64_t mostRecentDeauthTime; - std::mutex lock; - }; - - void _memberChanged(nlohmann::json &old,nlohmann::json &member); - void _networkChanged(nlohmann::json &old,nlohmann::json &network); - - inline void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info) - { - for(auto ab=nw->activeBridgeMembers.begin();ab!=nw->activeBridgeMembers.end();++ab) - info.activeBridges.push_back(Address(*ab)); - for(auto ip=nw->allocatedIps.begin();ip!=nw->allocatedIps.end();++ip) - info.allocatedIps.push_back(*ip); - info.authorizedMemberCount = (unsigned long)nw->authorizedMembers.size(); - info.totalMemberCount = (unsigned long)nw->members.size(); - info.mostRecentDeauthTime = nw->mostRecentDeauthTime; - } - - EmbeddedNetworkController *const _controller; - const Address _myAddress; - std::string _myAddressStr; + virtual ~RethinkDB(); + + virtual void waitForReady() const; + + virtual void save(const nlohmann::json &record); + + virtual void eraseNetwork(const uint64_t networkId); + + virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); + +protected: std::string _host; std::string _db; std::string _auth; @@ -132,10 +55,6 @@ private: std::thread _networksDbWatcher; std::thread _membersDbWatcher; - std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks; - std::unordered_multimap< uint64_t,uint64_t > _networkByMember; - mutable std::mutex _networks_l; - BlockingQueue< nlohmann::json * > _commitQueue; std::thread _commitThread[ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS]; @@ -28,6 +28,8 @@ CORE_OBJS=\ ONE_OBJS=\ controller/EmbeddedNetworkController.o \ + controller/DB.o \ + controller/FileDB.o \ controller/RethinkDB.o \ osdep/ManagedRoute.o \ osdep/Http.o \ diff --git a/osdep/OSUtils.hpp b/osdep/OSUtils.hpp index fd1b31d9..274b48df 100644 --- a/osdep/OSUtils.hpp +++ b/osdep/OSUtils.hpp @@ -101,7 +101,6 @@ public: * @return True if delete was successful */ static inline bool rm(const char *path) - throw() { #ifdef __WINDOWS__ return (DeleteFileA(path) != FALSE); @@ -109,7 +108,7 @@ public: return (unlink(path) == 0); #endif } - static inline bool rm(const std::string &path) throw() { return rm(path.c_str()); } + static inline bool rm(const std::string &path) { return rm(path.c_str()); } static inline bool mkdir(const char *path) { @@ -123,7 +122,17 @@ public: return true; #endif } - static inline bool mkdir(const std::string &path) throw() { return OSUtils::mkdir(path.c_str()); } + static inline bool mkdir(const std::string &path) { return OSUtils::mkdir(path.c_str()); } + + static inline bool rename(const char *o,const char *n) + { +#ifdef __WINDOWS__ + DeleteFileA(n); + return (::rename(o,n) == 0); +#else + return (::rename(o,n) == 0); +#endif + } /** * List a directory's contents |