diff options
Diffstat (limited to 'controller')
| -rw-r--r-- | controller/DB.cpp | 59 | ||||
| -rw-r--r-- | controller/DB.hpp | 62 | ||||
| -rw-r--r-- | controller/EmbeddedNetworkController.cpp | 101 | ||||
| -rw-r--r-- | controller/EmbeddedNetworkController.hpp | 35 | ||||
| -rw-r--r-- | controller/FileDB.cpp | 114 | ||||
| -rw-r--r-- | controller/FileDB.hpp | 19 | ||||
| -rw-r--r-- | controller/LFDB.cpp | 400 | ||||
| -rw-r--r-- | controller/LFDB.hpp | 102 | ||||
| -rw-r--r-- | controller/PostgreSQL.cpp | 1571 | ||||
| -rw-r--r-- | controller/PostgreSQL.hpp | 117 | ||||
| -rw-r--r-- | controller/README.md | 4 | ||||
| -rw-r--r-- | controller/RabbitMQ.cpp | 107 | ||||
| -rw-r--r-- | controller/RabbitMQ.hpp | 79 | ||||
| -rw-r--r-- | controller/RethinkDB.cpp | 497 | ||||
| -rw-r--r-- | controller/RethinkDB.hpp | 84 |
15 files changed, 2663 insertions, 688 deletions
diff --git a/controller/DB.cpp b/controller/DB.cpp index 61eed0e9..bb734dc8 100644 --- a/controller/DB.cpp +++ b/controller/DB.cpp @@ -1,6 +1,6 @@ /* * ZeroTier One - Network Virtualization Everywhere - * Copyright (C) 2011-2018 ZeroTier, Inc. + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -13,7 +13,15 @@ * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. */ #include "DB.hpp" @@ -96,8 +104,7 @@ void DB::cleanMember(nlohmann::json &member) member.erase("lastRequestMetaData"); } -DB::DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) : - _controller(nc), +DB::DB(const Identity &myId,const char *path) : _myId(myId), _myAddress(myId.address()), _path((path) ? path : "") @@ -107,9 +114,7 @@ DB::DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path _myAddressStr = tmp; } -DB::~DB() -{ -} +DB::~DB() {} bool DB::get(const uint64_t networkId,nlohmann::json &network) { @@ -221,7 +226,7 @@ void DB::networks(std::vector<uint64_t> &networks) networks.push_back(n->first); } -void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool push) +void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized) { uint64_t memberId = 0; uint64_t networkId = 0; @@ -305,8 +310,12 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool pu } } - if (push) - _controller->onNetworkMemberUpdate(networkId,memberId); + if (initialized) { + std::lock_guard<std::mutex> ll(_changeListeners_l); + for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { + (*i)->onNetworkMemberUpdate(networkId,memberId,memberConfig); + } + } } else if (memberId) { if (nw) { std::lock_guard<std::mutex> l(nw->lock); @@ -324,20 +333,24 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool pu } } - if ((push)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) - _controller->onNetworkMemberDeauthorize(networkId,memberId); + if ((initialized)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) { + std::lock_guard<std::mutex> ll(_changeListeners_l); + for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { + (*i)->onNetworkMemberDeauthorize(networkId,memberId); + } + } } -void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool push) +void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized) { if (networkConfig.is_object()) { const std::string ids = networkConfig["id"]; - const uint64_t id = Utils::hexStrToU64(ids.c_str()); - if (id) { + const uint64_t networkId = Utils::hexStrToU64(ids.c_str()); + if (networkId) { std::shared_ptr<_Network> nw; { std::lock_guard<std::mutex> l(_networks_l); - std::shared_ptr<_Network> &nw2 = _networks[id]; + std::shared_ptr<_Network> &nw2 = _networks[networkId]; if (!nw2) nw2.reset(new _Network); nw = nw2; @@ -346,15 +359,19 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool std::lock_guard<std::mutex> l2(nw->lock); nw->config = networkConfig; } - if (push) - _controller->onNetworkUpdate(id); + if (initialized) { + std::lock_guard<std::mutex> ll(_changeListeners_l); + for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { + (*i)->onNetworkUpdate(networkId,networkConfig); + } + } } } else if (old.is_object()) { const std::string ids = old["id"]; - const uint64_t id = Utils::hexStrToU64(ids.c_str()); - if (id) { + const uint64_t networkId = Utils::hexStrToU64(ids.c_str()); + if (networkId) { std::lock_guard<std::mutex> l(_networks_l); - _networks.erase(id); + _networks.erase(networkId); } } } diff --git a/controller/DB.hpp b/controller/DB.hpp index 4b2940cd..f499d387 100644 --- a/controller/DB.hpp +++ b/controller/DB.hpp @@ -1,6 +1,6 @@ /* * ZeroTier One - Network Virtualization Everywhere - * Copyright (C) 2011-2018 ZeroTier, Inc. + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -13,7 +13,15 @@ * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. */ #ifndef ZT_CONTROLLER_DB_HPP @@ -32,22 +40,29 @@ #include <unordered_set> #include <vector> #include <atomic> +#include <mutex> #include "../ext/json/json.hpp" -#define ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS 2 - namespace ZeroTier { -class EmbeddedNetworkController; - /** * Base class with common infrastructure for all controller DB implementations */ class DB { public: + class ChangeListener + { + public: + ChangeListener() {} + virtual ~ChangeListener() {} + virtual void onNetworkUpdate(uint64_t networkId,const nlohmann::json &network) {} + virtual void onNetworkMemberUpdate(uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {} + virtual void onNetworkMemberDeauthorize(uint64_t networkId,uint64_t memberId) {} + }; + struct NetworkSummaryInfo { NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {} @@ -58,27 +73,12 @@ public: int64_t mostRecentDeauthTime; }; - /** - * Ensure that all network fields are present - */ static void initNetwork(nlohmann::json &network); - - /** - * Ensure that all member fields are present - */ static void initMember(nlohmann::json &member); - - /** - * Remove old and temporary network fields - */ static void cleanNetwork(nlohmann::json &network); - - /** - * Remove old and temporary member fields - */ static void cleanMember(nlohmann::json &member); - DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path); + DB(const Identity &myId,const char *path); virtual ~DB(); virtual bool waitForReady() = 0; @@ -94,19 +94,20 @@ public: bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member); bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info); bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members); - bool summary(const uint64_t networkId,NetworkSummaryInfo &info); - void networks(std::vector<uint64_t> &networks); virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0; - virtual void eraseNetwork(const uint64_t networkId) = 0; - virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0; - virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) = 0; + inline void addListener(DB::ChangeListener *const listener) + { + std::lock_guard<std::mutex> l(_changeListeners_l); + _changeListeners.push_back(listener); + } + protected: struct _Network { @@ -120,18 +121,19 @@ protected: std::mutex lock; }; - void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool push); - void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool push); + void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized); + void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized); void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info); - EmbeddedNetworkController *const _controller; const Identity _myId; const Address _myAddress; const std::string _path; std::string _myAddressStr; + std::vector<DB::ChangeListener *> _changeListeners; std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks; std::unordered_multimap< uint64_t,uint64_t > _networkByMember; + mutable std::mutex _changeListeners_l; mutable std::mutex _networks_l; }; diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 6a4134c6..bf568527 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -1,6 +1,6 @@ /* * ZeroTier One - Network Virtualization Everywhere - * Copyright (C) 2011-2018 ZeroTier, Inc + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -13,7 +13,15 @@ * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. */ #include <stdint.h> @@ -38,6 +46,11 @@ #include "../version.h" #include "EmbeddedNetworkController.hpp" +#include "LFDB.hpp" +#include "FileDB.hpp" +#ifdef ZT_CONTROLLER_USE_LIBPQ +#include "PostgreSQL.hpp" +#endif #include "../node/Node.hpp" #include "../node/CertificateOfMembership.hpp" @@ -336,14 +349,14 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule) } else if (t == "MATCH_IPV6_SOURCE") { rule.t |= ZT_NETWORK_RULE_MATCH_IPV6_SOURCE; InetAddress ip(OSUtils::jsonString(r["ip"],"::0").c_str()); - ZT_FAST_MEMCPY(rule.v.ipv6.ip,reinterpret_cast<struct sockaddr_in6 *>(&ip)->sin6_addr.s6_addr,16); + memcpy(rule.v.ipv6.ip,reinterpret_cast<struct sockaddr_in6 *>(&ip)->sin6_addr.s6_addr,16); rule.v.ipv6.mask = Utils::ntoh(reinterpret_cast<struct sockaddr_in6 *>(&ip)->sin6_port) & 0xff; if (rule.v.ipv6.mask > 128) rule.v.ipv6.mask = 128; return true; } else if (t == "MATCH_IPV6_DEST") { rule.t |= ZT_NETWORK_RULE_MATCH_IPV6_DEST; InetAddress ip(OSUtils::jsonString(r["ip"],"::0").c_str()); - ZT_FAST_MEMCPY(rule.v.ipv6.ip,reinterpret_cast<struct sockaddr_in6 *>(&ip)->sin6_addr.s6_addr,16); + memcpy(rule.v.ipv6.ip,reinterpret_cast<struct sockaddr_in6 *>(&ip)->sin6_addr.s6_addr,16); rule.v.ipv6.mask = Utils::ntoh(reinterpret_cast<struct sockaddr_in6 *>(&ip)->sin6_port) & 0xff; if (rule.v.ipv6.mask > 128) rule.v.ipv6.mask = 128; return true; @@ -456,11 +469,13 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule) } // anonymous namespace -EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath) : +EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc) : _startTime(OSUtils::now()), + _listenPort(listenPort), _node(node), _path(dbPath), - _sender((NetworkController::Sender *)0) + _sender((NetworkController::Sender *)0), + _mqc(mqc) { } @@ -478,12 +493,51 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender) _signingId = signingId; _sender = sender; _signingIdAddressString = signingId.address().toString(tmp); -#ifdef ZT_CONTROLLER_USE_RETHINKDB - if ((_path.length() > 10)&&(_path.substr(0,10) == "rethinkdb:")) - _db.reset(new RethinkDB(this,_signingId,_path.c_str())); - else // else use FileDB after endif + +#ifdef ZT_CONTROLLER_USE_LIBPQ + if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:")) { + _db.reset(new PostgreSQL(_signingId,_path.substr(9).c_str(), _listenPort, _mqc)); + } else { #endif - _db.reset(new FileDB(this,_signingId,_path.c_str())); + + std::string lfJSON; + OSUtils::readFile((_path + ZT_PATH_SEPARATOR_S ".." ZT_PATH_SEPARATOR_S "local.conf").c_str(),lfJSON); + if (lfJSON.length() > 0) { + nlohmann::json lfConfig(OSUtils::jsonParse(lfJSON)); + nlohmann::json &settings = lfConfig["settings"]; + if (settings.is_object()) { + nlohmann::json &controllerDb = settings["controllerDb"]; + if (controllerDb.is_object()) { + std::string type = controllerDb["type"]; + if (type == "lf") { + std::string lfOwner = controllerDb["owner"]; + std::string lfHost = controllerDb["host"]; + int lfPort = controllerDb["port"]; + bool storeOnlineState = controllerDb["storeOnlineState"]; + if ((lfOwner.length())&&(lfHost.length())&&(lfPort > 0)&&(lfPort < 65536)) { + std::size_t pubHdrLoc = lfOwner.find("Public: "); + if ((pubHdrLoc > 0)&&((pubHdrLoc + 8) < lfOwner.length())) { + std::string lfOwnerPublic = lfOwner.substr(pubHdrLoc + 8); + std::size_t pubHdrEnd = lfOwnerPublic.find_first_of("\n\r\t "); + if (pubHdrEnd != std::string::npos) { + lfOwnerPublic = lfOwnerPublic.substr(0,pubHdrEnd); + _db.reset(new LFDB(_signingId,_path.c_str(),lfOwner.c_str(),lfOwnerPublic.c_str(),lfHost.c_str(),lfPort,storeOnlineState)); + } + } + } + } + } + } + } + if (!_db) + _db.reset(new FileDB(_signingId,_path.c_str())); + + _db->addListener(this); + +#ifdef ZT_CONTROLLER_USE_LIBPQ + } +#endif + _db->waitForReady(); } @@ -1043,6 +1097,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpDELETE( json network,member; _db->get(nwid,network,address,member); + _db->eraseMember(nwid, address); { std::lock_guard<std::mutex> l(_memberStatus_l); @@ -1135,7 +1190,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt) } } -void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId) +void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network) { // Send an update to all members of the network that are online const int64_t now = OSUtils::now(); @@ -1146,7 +1201,7 @@ void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId) } } -void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId) +void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member) { // Push update to member if online try { @@ -1511,13 +1566,13 @@ void EmbeddedNetworkController::_request( const std::string ips = ipAssignments[i]; InetAddress ip(ips.c_str()); - // IP assignments are only pushed if there is a corresponding local route. We also now get the netmask bits from - // this route, ignoring the netmask bits field of the assigned IP itself. Using that was worthless and a source - // of user error / poor UX. int routedNetmaskBits = -1; for(unsigned int rk=0;rk<nc->routeCount;++rk) { - if ( (!nc->routes[rk].via.ss_family) && (reinterpret_cast<const InetAddress *>(&(nc->routes[rk].target))->containsAddress(ip)) ) - routedNetmaskBits = reinterpret_cast<const InetAddress *>(&(nc->routes[rk].target))->netmaskBits(); + if (reinterpret_cast<const InetAddress *>(&(nc->routes[rk].target))->containsAddress(ip)) { + const int nb = (int)(reinterpret_cast<const InetAddress *>(&(nc->routes[rk].target))->netmaskBits()); + if (nb > routedNetmaskBits) + routedNetmaskBits = nb; + } } if (routedNetmaskBits >= 0) { @@ -1544,8 +1599,8 @@ void EmbeddedNetworkController::_request( InetAddress ipRangeEnd(OSUtils::jsonString(pool["ipRangeEnd"],"").c_str()); if ( (ipRangeStart.ss_family == AF_INET6) && (ipRangeEnd.ss_family == AF_INET6) ) { uint64_t s[2],e[2],x[2],xx[2]; - ZT_FAST_MEMCPY(s,ipRangeStart.rawIpData(),16); - ZT_FAST_MEMCPY(e,ipRangeEnd.rawIpData(),16); + memcpy(s,ipRangeStart.rawIpData(),16); + memcpy(e,ipRangeEnd.rawIpData(),16); s[0] = Utils::ntoh(s[0]); s[1] = Utils::ntoh(s[1]); e[0] = Utils::ntoh(e[0]); @@ -1609,18 +1664,20 @@ void EmbeddedNetworkController::_request( if ( (ipRangeStartIA.ss_family == AF_INET) && (ipRangeEndIA.ss_family == AF_INET) ) { uint32_t ipRangeStart = Utils::ntoh((uint32_t)(reinterpret_cast<struct sockaddr_in *>(&ipRangeStartIA)->sin_addr.s_addr)); uint32_t ipRangeEnd = Utils::ntoh((uint32_t)(reinterpret_cast<struct sockaddr_in *>(&ipRangeEndIA)->sin_addr.s_addr)); + if ((ipRangeEnd < ipRangeStart)||(ipRangeStart == 0)) continue; uint32_t ipRangeLen = ipRangeEnd - ipRangeStart; - + // Start with the LSB of the member's address uint32_t ipTrialCounter = (uint32_t)(identity.address().toInt() & 0xffffffff); for(uint32_t k=ipRangeStart,trialCount=0;((k<=ipRangeEnd)&&(trialCount < 1000));++k,++trialCount) { uint32_t ip = (ipRangeLen > 0) ? (ipRangeStart + (ipTrialCounter % ipRangeLen)) : ipRangeStart; ++ipTrialCounter; - if ((ip & 0x000000ff) == 0x000000ff) + if ((ip & 0x000000ff) == 0x000000ff) { continue; // don't allow addresses that end in .255 + } // Check if this IP is within a local-to-Ethernet routed network int routedNetmaskBits = -1; diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index 417005a4..7bc37be2 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -1,6 +1,6 @@ /* * ZeroTier One - Network Virtualization Everywhere - * Copyright (C) 2011-2018 ZeroTier, Inc. + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -13,7 +13,15 @@ * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. */ #ifndef ZT_SQLITENETWORKCONTROLLER_HPP @@ -43,23 +51,21 @@ #include "../ext/json/json.hpp" #include "DB.hpp" -#include "FileDB.hpp" -#ifdef ZT_CONTROLLER_USE_RETHINKDB -#include "RethinkDB.hpp" -#endif namespace ZeroTier { class Node; -class EmbeddedNetworkController : public NetworkController +struct MQConfig; + +class EmbeddedNetworkController : public NetworkController,public DB::ChangeListener { public: /** * @param node Parent node * @param dbPath Database path (file path or database credentials) */ - EmbeddedNetworkController(Node *node,const char *dbPath); + EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc = NULL); virtual ~EmbeddedNetworkController(); virtual void init(const Identity &signingId,Sender *sender); @@ -95,10 +101,9 @@ public: void handleRemoteTrace(const ZT_RemoteTrace &rt); - // Called on update via POST or by JSONDB on external update of network or network member records - void onNetworkUpdate(const uint64_t networkId); - void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId); - void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId); + virtual void onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network); + virtual void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member); + virtual void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId); private: void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData); @@ -141,17 +146,23 @@ private: }; const int64_t _startTime; + int _listenPort; Node *const _node; std::string _path; Identity _signingId; std::string _signingIdAddressString; NetworkController::Sender *_sender; + std::unique_ptr<DB> _db; BlockingQueue< _RQEntry * > _queue; + std::vector<std::thread> _threads; std::mutex _threads_l; + std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus; std::mutex _memberStatus_l; + + MQConfig *_mqc; }; } // namespace ZeroTier diff --git a/controller/FileDB.cpp b/controller/FileDB.cpp index 8cbd60ce..7b997c49 100644 --- a/controller/FileDB.cpp +++ b/controller/FileDB.cpp @@ -1,6 +1,6 @@ /* * ZeroTier One - Network Virtualization Everywhere - * Copyright (C) 2011-2018 ZeroTier, Inc. + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -13,7 +13,15 @@ * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. */ #include "FileDB.hpp" @@ -21,10 +29,12 @@ namespace ZeroTier { -FileDB::FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) : - DB(nc,myId,path), +FileDB::FileDB(const Identity &myId,const char *path) : + DB(myId,path), _networksPath(_path + ZT_PATH_SEPARATOR_S + "network"), - _tracePath(_path + ZT_PATH_SEPARATOR_S + "trace") + _tracePath(_path + ZT_PATH_SEPARATOR_S + "trace"), + _onlineChanged(false), + _running(true) { OSUtils::mkdir(_path.c_str()); OSUtils::lockDownFile(_path.c_str(),true); @@ -61,9 +71,65 @@ FileDB::FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const ch } catch ( ... ) {} } } + + _onlineUpdateThread = std::thread([this]() { + unsigned int cnt = 0; + while (this->_running) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + if ((++cnt % 20) == 0) { // 5 seconds + std::lock_guard<std::mutex> l(this->_online_l); + if (!this->_running) return; + if (this->_onlineChanged) { + char p[4096],atmp[64]; + for(auto nw=this->_online.begin();nw!=this->_online.end();++nw) { + OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx-online.json",_networksPath.c_str(),(unsigned long long)nw->first); + FILE *f = fopen(p,"wb"); + if (f) { + fprintf(f,"{"); + const char *memberPrefix = ""; + for(auto m=nw->second.begin();m!=nw->second.end();++m) { + fprintf(f,"%s\"%.10llx\":{" ZT_EOL_S,memberPrefix,(unsigned long long)m->first); + memberPrefix = ","; + InetAddress lastAddr; + const char *timestampPrefix = " "; + int cnt = 0; + for(auto ts=m->second.rbegin();ts!=m->second.rend();) { + if (cnt < 25) { + if (lastAddr != ts->second) { + lastAddr = ts->second; + fprintf(f,"%s\"%lld\":\"%s\"" ZT_EOL_S,timestampPrefix,(long long)ts->first,ts->second.toString(atmp)); + timestampPrefix = ","; + ++cnt; + ++ts; + } else { + ts = std::map<int64_t,InetAddress>::reverse_iterator(m->second.erase(std::next(ts).base())); + } + } else { + ts = std::map<int64_t,InetAddress>::reverse_iterator(m->second.erase(std::next(ts).base())); + } + } + fprintf(f,"}"); + } + fprintf(f,"}" ZT_EOL_S); + fclose(f); + } + } + this->_onlineChanged = false; + } + } + } + }); } -FileDB::~FileDB() {} +FileDB::~FileDB() +{ + try { + _online_l.lock(); + _running = false; + _online_l.unlock(); + _onlineUpdateThread.join(); + } catch ( ... ) {} +} bool FileDB::waitForReady() { return true; } bool FileDB::isReady() { return true; } @@ -86,14 +152,10 @@ void FileDB::save(nlohmann::json *orig,nlohmann::json &record) if (nwid) { nlohmann::json old; get(nwid,old); - if ((!old.is_object())||(old != record)) { - OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json.new",_networksPath.c_str(),nwid); - OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid); + OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid); if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1); - OSUtils::rename(p1,p2); - _networkChanged(old,record,true); } } @@ -103,10 +165,9 @@ void FileDB::save(nlohmann::json *orig,nlohmann::json &record) if ((id)&&(nwid)) { nlohmann::json network,old; get(nwid,network,id,old); - if ((!old.is_object())||(old != record)) { OSUtils::ztsnprintf(pb,sizeof(pb),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)nwid); - OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.10llx.json.new",pb,(unsigned long long)id); + OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.10llx.json",pb,(unsigned long long)id); if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) { OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.16llx",_networksPath.c_str(),(unsigned long long)nwid); OSUtils::mkdir(p2); @@ -114,9 +175,6 @@ void FileDB::save(nlohmann::json *orig,nlohmann::json &record) if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1); } - OSUtils::ztsnprintf(p2,sizeof(p2),"%s" ZT_PATH_SEPARATOR_S "%.10llx.json",pb,(unsigned long long)id); - OSUtils::rename(p1,p2); - _memberChanged(old,record,true); } } @@ -137,16 +195,38 @@ void FileDB::eraseNetwork(const uint64_t networkId) char p[16384]; OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),networkId); OSUtils::rm(p); + OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx-online.json",_networksPath.c_str(),networkId); + OSUtils::rm(p); + OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member",_networksPath.c_str(),(unsigned long long)networkId); + OSUtils::rmDashRf(p); _networkChanged(network,nullJson,true); + std::lock_guard<std::mutex> l(this->_online_l); + this->_online.erase(networkId); + this->_onlineChanged = true; } void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId) { + nlohmann::json network,member,nullJson; + get(networkId,network); + get(memberId,member); + char p[4096]; + OSUtils::ztsnprintf(p,sizeof(p),"%s" ZT_PATH_SEPARATOR_S "%.16llx" ZT_PATH_SEPARATOR_S "member" ZT_PATH_SEPARATOR_S "%.10llx.json",_networksPath.c_str(),networkId,memberId); + OSUtils::rm(p); + _memberChanged(member,nullJson,true); + std::lock_guard<std::mutex> l(this->_online_l); + this->_online[networkId].erase(memberId); + this->_onlineChanged = true; } void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) { - // Nothing to do here right now in the filesystem store mode since we can just get this from the peer list + char mid[32],atmp[64]; + OSUtils::ztsnprintf(mid,sizeof(mid),"%.10llx",(unsigned long long)memberId); + physicalAddress.toString(atmp); + std::lock_guard<std::mutex> l(this->_online_l); + this->_online[networkId][memberId][OSUtils::now()] = physicalAddress; + this->_onlineChanged = true; } } // namespace ZeroTier diff --git a/controller/FileDB.hpp b/controller/FileDB.hpp index 1a3c12e9..5d55d0a4 100644 --- a/controller/FileDB.hpp +++ b/controller/FileDB.hpp @@ -1,6 +1,6 @@ /* * ZeroTier One - Network Virtualization Everywhere - * Copyright (C) 2011-2018 ZeroTier, Inc. + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -13,7 +13,15 @@ * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. */ #ifndef ZT_CONTROLLER_FILEDB_HPP @@ -27,7 +35,7 @@ namespace ZeroTier class FileDB : public DB { public: - FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path); + FileDB(const Identity &myId,const char *path); virtual ~FileDB(); virtual bool waitForReady(); @@ -40,6 +48,11 @@ public: protected: std::string _networksPath; std::string _tracePath; + std::thread _onlineUpdateThread; + std::map< uint64_t,std::map<uint64_t,std::map<int64_t,InetAddress> > > _online; + std::mutex _online_l; + bool _onlineChanged; + bool _running; }; } // namespace ZeroTier diff --git a/controller/LFDB.cpp b/controller/LFDB.cpp new file mode 100644 index 00000000..f0c8ebfb --- /dev/null +++ b/controller/LFDB.cpp @@ -0,0 +1,400 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. + */ + +#include "LFDB.hpp" + +#include <thread> +#include <chrono> +#include <iostream> +#include <sstream> + +#include "../osdep/OSUtils.hpp" +#include "../ext/cpp-httplib/httplib.h" + +namespace ZeroTier +{ + +LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState) : + DB(myId,path), + _myId(myId), + _lfOwnerPrivate((lfOwnerPrivate) ? lfOwnerPrivate : ""), + _lfOwnerPublic((lfOwnerPublic) ? lfOwnerPublic : ""), + _lfNodeHost((lfNodeHost) ? lfNodeHost : "127.0.0.1"), + _lfNodePort(((lfNodePort > 0)&&(lfNodePort < 65536)) ? lfNodePort : 9980), + _running(true), + _ready(false), + _storeOnlineState(storeOnlineState) +{ + _syncThread = std::thread([this]() { + char controllerAddress[24]; + const uint64_t controllerAddressInt = _myId.address().toInt(); + _myId.address().toString(controllerAddress); + std::string networksSelectorName("com.zerotier.controller.lfdb:"); networksSelectorName.append(controllerAddress); networksSelectorName.append("/network"); + std::string membersSelectorName("com.zerotier.controller.lfdb:"); membersSelectorName.append(controllerAddress); membersSelectorName.append("/member"); + + // LF record masking key is the first 32 bytes of SHA512(controller private key) in hex, + // hiding record values from anything but the controller or someone who has its key. + uint8_t sha512pk[64]; + _myId.sha512PrivateKey(sha512pk); + char maskingKey [128]; + Utils::hex(sha512pk,32,maskingKey); + + httplib::Client htcli(_lfNodeHost.c_str(),_lfNodePort,600); + int64_t timeRangeStart = 0; + while (_running) { + { + std::lock_guard<std::mutex> sl(_state_l); + for(auto ns=_state.begin();ns!=_state.end();++ns) { + if (ns->second.dirty) { + nlohmann::json network; + if (get(ns->first,network)) { + nlohmann::json newrec,selector0; + selector0["Name"] = networksSelectorName; + selector0["Ordinal"] = ns->first; + newrec["Selectors"].push_back(selector0); + newrec["Value"] = network.dump(); + newrec["OwnerPrivate"] = _lfOwnerPrivate; + newrec["MaskingKey"] = maskingKey; + newrec["PulseIfUnchanged"] = true; + auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json"); + if (resp) { + if (resp->status == 200) { + ns->second.dirty = false; + printf("SET network %.16llx %s\n",ns->first,resp->body.c_str()); + } else { + fprintf(stderr,"ERROR: LFDB: %d from node (create/update network): %s" ZT_EOL_S,resp->status,resp->body.c_str()); + } + } else { + fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S); + } + } + } + + for(auto ms=ns->second.members.begin();ms!=ns->second.members.end();++ms) { + if ((_storeOnlineState)&&(ms->second.lastOnlineDirty)&&(ms->second.lastOnlineAddress)) { + nlohmann::json newrec,selector0,selector1,selectors,ip; + char tmp[1024],tmp2[128]; + OSUtils::ztsnprintf(tmp,sizeof(tmp),"com.zerotier.controller.lfdb:%s/network/%.16llx/online",controllerAddress,(unsigned long long)ns->first); + ms->second.lastOnlineAddress.toIpString(tmp2); + selector0["Name"] = tmp; + selector0["Ordinal"] = ms->first; + selector1["Name"] = tmp2; + selector1["Ordinal"] = 0; + selectors.push_back(selector0); + selectors.push_back(selector1); + newrec["Selectors"] = selectors; + const uint8_t *const rawip = (const uint8_t *)ms->second.lastOnlineAddress.rawIpData(); + switch(ms->second.lastOnlineAddress.ss_family) { + case AF_INET: + for(int j=0;j<4;++j) + ip.push_back((unsigned int)rawip[j]); + break; + case AF_INET6: + for(int j=0;j<16;++j) + ip.push_back((unsigned int)rawip[j]); + break; + default: + ip = tmp2; // should never happen since only IP transport is currently supported + break; + } + newrec["Value"] = ip; + newrec["OwnerPrivate"] = _lfOwnerPrivate; + newrec["MaskingKey"] = maskingKey; + newrec["Timestamp"] = ms->second.lastOnlineTime; + newrec["PulseIfUnchanged"] = true; + auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json"); + if (resp) { + if (resp->status == 200) { + ms->second.lastOnlineDirty = false; + printf("SET member online %.16llx %.10llx %s\n",ns->first,ms->first,resp->body.c_str()); + } else { + fprintf(stderr,"ERROR: LFDB: %d from node (create/update member online status): %s" ZT_EOL_S,resp->status,resp->body.c_str()); + } + } else { + fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S); + } + } + + if (ms->second.dirty) { + nlohmann::json network,member; + if (get(ns->first,network,ms->first,member)) { + nlohmann::json newrec,selector0,selector1,selectors; + selector0["Name"] = networksSelectorName; + selector0["Ordinal"] = ns->first; + selector1["Name"] = membersSelectorName; + selector1["Ordinal"] = ms->first; + selectors.push_back(selector0); + selectors.push_back(selector1); + newrec["Selectors"] = selectors; + newrec["Value"] = member.dump(); + newrec["OwnerPrivate"] = _lfOwnerPrivate; + newrec["MaskingKey"] = maskingKey; + newrec["PulseIfUnchanged"] = true; + auto resp = htcli.Post("/makerecord",newrec.dump(),"application/json"); + if (resp) { + if (resp->status == 200) { + ms->second.dirty = false; + printf("SET member %.16llx %.10llx %s\n",ns->first,ms->first,resp->body.c_str()); + } else { + fprintf(stderr,"ERROR: LFDB: %d from node (create/update member): %s" ZT_EOL_S,resp->status,resp->body.c_str()); + } + } else { + fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S); + } + } + } + } + } + } + + { + std::ostringstream query; + query + << '{' + << "\"Ranges\":[{" + << "\"Name\":\"" << networksSelectorName << "\"," + << "\"Range\":[0,18446744073709551615]" + << "}]," + << "\"TimeRange\":[" << timeRangeStart << ",18446744073709551615]," + << "\"MaskingKey\":\"" << maskingKey << "\"," + << "\"Owners\":[\"" << _lfOwnerPublic << "\"]" + << '}'; + auto resp = htcli.Post("/query",query.str(),"application/json"); + if (resp) { + if (resp->status == 200) { + nlohmann::json results(OSUtils::jsonParse(resp->body)); + if ((results.is_array())&&(results.size() > 0)) { + for(std::size_t ri=0;ri<results.size();++ri) { + nlohmann::json &rset = results[ri]; + if ((rset.is_array())&&(rset.size() > 0)) { + nlohmann::json &result = rset[0]; + if (result.is_object()) { + nlohmann::json &record = result["Record"]; + if (record.is_object()) { + const std::string recordValue = result["Value"]; + printf("GET network %s\n",recordValue.c_str()); + nlohmann::json network(OSUtils::jsonParse(recordValue)); + if (network.is_object()) { + const std::string idstr = network["id"]; + const uint64_t id = Utils::hexStrToU64(idstr.c_str()); + if ((id >> 24) == controllerAddressInt) { // sanity check + + std::lock_guard<std::mutex> sl(_state_l); + _NetworkState &ns = _state[id]; + if (!ns.dirty) { + nlohmann::json oldNetwork; + if (get(id,oldNetwork)) { + const uint64_t revision = network["revision"]; + const uint64_t prevRevision = oldNetwork["revision"]; + if (prevRevision < revision) { + _networkChanged(oldNetwork,network,timeRangeStart > 0); + } + } else { + nlohmann::json nullJson; + _networkChanged(nullJson,network,timeRangeStart > 0); + } + } + + } + } + } + } + } + } + } + } else { + fprintf(stderr,"ERROR: LFDB: %d from node: %s" ZT_EOL_S,resp->status,resp->body.c_str()); + } + } else { + fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S); + } + } + + { + std::ostringstream query; + query + << '{' + << "\"Ranges\":[{" + << "\"Name\":\"" << networksSelectorName << "\"," + << "\"Range\":[0,18446744073709551615]" + << "},{" + << "\"Name\":\"" << membersSelectorName << "\"," + << "\"Range\":[0,18446744073709551615]" + << "}]," + << "\"TimeRange\":[" << timeRangeStart << ",18446744073709551615]," + << "\"MaskingKey\":\"" << maskingKey << "\"," + << "\"Owners\":[\"" << _lfOwnerPublic << "\"]" + << '}'; + auto resp = htcli.Post("/query",query.str(),"application/json"); + if (resp) { + if (resp->status == 200) { + nlohmann::json results(OSUtils::jsonParse(resp->body)); + if ((results.is_array())&&(results.size() > 0)) { + for(std::size_t ri=0;ri<results.size();++ri) { + nlohmann::json &rset = results[ri]; + if ((rset.is_array())&&(rset.size() > 0)) { + nlohmann::json &result = rset[0]; + if (result.is_object()) { + nlohmann::json &record = result["Record"]; + if (record.is_object()) { + const std::string recordValue = result["Value"]; + printf("GET member %s\n",recordValue.c_str()); + nlohmann::json member(OSUtils::jsonParse(recordValue)); + if (member.is_object()) { + const std::string nwidstr = member["nwid"]; + const std::string idstr = member["id"]; + const uint64_t nwid = Utils::hexStrToU64(nwidstr.c_str()); + const uint64_t id = Utils::hexStrToU64(idstr.c_str()); + if ((id)&&((nwid >> 24) == controllerAddressInt)) { // sanity check + + std::lock_guard<std::mutex> sl(_state_l); + auto ns = _state.find(nwid); + if ((ns == _state.end())||(!ns->second.members[id].dirty)) { + nlohmann::json network,oldMember; + if (get(nwid,network,id,oldMember)) { + const uint64_t revision = member["revision"]; + const uint64_t prevRevision = oldMember["revision"]; + if (prevRevision < revision) + _memberChanged(oldMember,member,timeRangeStart > 0); + } + } else { + nlohmann::json nullJson; + _memberChanged(nullJson,member,timeRangeStart > 0); + } + + } + } + } + } + } + } + } + } else { + fprintf(stderr,"ERROR: LFDB: %d from node: %s" ZT_EOL_S,resp->status,resp->body.c_str()); + } + } else { + fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S); + } + } + + timeRangeStart = time(nullptr) - 120; // start next query 2m before now to avoid losing updates + _ready = true; + + for(int k=0;k<20;++k) { // 2s delay between queries for remotely modified networks or members + if (!_running) + return; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } + }); +} + +LFDB::~LFDB() +{ + _running = false; + _syncThread.join(); +} + +bool LFDB::waitForReady() +{ + while (!_ready) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + return true; +} + +bool LFDB::isReady() +{ + return (_ready); +} + +void LFDB::save(nlohmann::json *orig,nlohmann::json &record) +{ + if (orig) { + if (*orig != record) { + record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1; + } + } else { + record["revision"] = 1; + } + + const std::string objtype = record["objtype"]; + if (objtype == "network") { + const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL); + if (nwid) { + nlohmann::json old; + get(nwid,old); + if ((!old.is_object())||(old != record)) { + _networkChanged(old,record,true); + { + std::lock_guard<std::mutex> l(_state_l); + _state[nwid].dirty = true; + } + } + } + } else if (objtype == "member") { + const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"],0ULL); + const uint64_t id = OSUtils::jsonIntHex(record["id"],0ULL); + if ((id)&&(nwid)) { + nlohmann::json network,old; + get(nwid,network,id,old); + if ((!old.is_object())||(old != record)) { + _memberChanged(old,record,true); + { + std::lock_guard<std::mutex> l(_state_l); + _state[nwid].members[id].dirty = true; + } + } + } + } +} + +void LFDB::eraseNetwork(const uint64_t networkId) +{ + // TODO +} + +void LFDB::eraseMember(const uint64_t networkId,const uint64_t memberId) +{ + // TODO +} + +void LFDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) +{ + std::lock_guard<std::mutex> l(_state_l); + auto nw = _state.find(networkId); + if (nw != _state.end()) { + auto m = nw->second.members.find(memberId); + if (m != nw->second.members.end()) { + m->second.lastOnlineTime = OSUtils::now(); + if (physicalAddress) + m->second.lastOnlineAddress = physicalAddress; + m->second.lastOnlineDirty = true; + } + } +} + +} // namespace ZeroTier diff --git a/controller/LFDB.hpp b/controller/LFDB.hpp new file mode 100644 index 00000000..73187462 --- /dev/null +++ b/controller/LFDB.hpp @@ -0,0 +1,102 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. + */ + +#ifndef ZT_CONTROLLER_LFDB_HPP +#define ZT_CONTROLLER_LFDB_HPP + +#include "DB.hpp" + +#include <mutex> +#include <string> +#include <unordered_map> +#include <atomic> + +namespace ZeroTier { + +/** + * DB implementation for controller that stores data in LF + */ +class LFDB : public DB +{ +public: + /** + * @param myId Identity of controller node (with secret) + * @param path Base path for ZeroTier node itself + * @param lfOwnerPrivate LF owner private in PEM format + * @param lfOwnerPublic LF owner public in @base62 format + * @param lfNodeHost LF node host + * @param lfNodePort LF node http (not https) port + * @param storeOnlineState If true, store online/offline state and IP info in LF (a lot of data, only for private networks!) + */ + LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState); + virtual ~LFDB(); + + virtual bool waitForReady(); + virtual bool isReady(); + virtual void save(nlohmann::json *orig,nlohmann::json &record); + virtual void eraseNetwork(const uint64_t networkId); + virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); + virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress); + +protected: + const Identity _myId; + + std::string _lfOwnerPrivate,_lfOwnerPublic; + std::string _lfNodeHost; + int _lfNodePort; + + struct _MemberState + { + _MemberState() : + lastOnlineAddress(), + lastOnlineTime(0), + dirty(false), + lastOnlineDirty(false) {} + InetAddress lastOnlineAddress; + int64_t lastOnlineTime; + bool dirty; + bool lastOnlineDirty; + }; + struct _NetworkState + { + _NetworkState() : + members(), + dirty(false) {} + std::unordered_map<uint64_t,_MemberState> members; + bool dirty; + }; + std::unordered_map<uint64_t,_NetworkState> _state; + std::mutex _state_l; + + std::atomic_bool _running; + std::atomic_bool _ready; + std::thread _syncThread; + bool _storeOnlineState; +}; + +} // namespace ZeroTier + +#endif diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp new file mode 100644 index 00000000..c6b9ada4 --- /dev/null +++ b/controller/PostgreSQL.cpp @@ -0,0 +1,1571 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. + */ + +#ifdef ZT_CONTROLLER_USE_LIBPQ + +#include "PostgreSQL.hpp" +#include "EmbeddedNetworkController.hpp" +#include "RabbitMQ.hpp" +#include "../version.h" + +#include <libpq-fe.h> +#include <sstream> +#include <amqp.h> +#include <amqp_tcp_socket.h> + +using json = nlohmann::json; +namespace { + +static const int DB_MINIMUM_VERSION = 5; + +static const char *_timestr() +{ + time_t t = time(0); + char *ts = ctime(&t); + char *p = ts; + if (!p) + return ""; + while (*p) { + if (*p == '\n') { + *p = (char)0; + break; + } + ++p; + } + return ts; +} + +std::string join(const std::vector<std::string> &elements, const char * const separator) +{ + switch(elements.size()) { + case 0: + return ""; + case 1: + return elements[0]; + default: + std::ostringstream os; + std::copy(elements.begin(), elements.end()-1, std::ostream_iterator<std::string>(os, separator)); + os << *elements.rbegin(); + return os.str(); + } +} + +} + +using namespace ZeroTier; + +PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc) + : DB(myId, path) + , _ready(0) + , _connected(1) + , _run(1) + , _waitNoticePrinted(false) + , _listenPort(listenPort) + , _mqc(mqc) +{ + _connString = std::string(path) + " application_name=controller_" +_myAddressStr; + + // Database Schema Version Check + PGconn *conn = getPgConn(); + if (PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); + exit(1); + } + + PGresult *res = PQexec(conn, "SELECT version FROM ztc_database"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + fprintf(stderr, "Error determining database version"); + exit(1); + } + + if (PQntuples(res) != 1) { + fprintf(stderr, "Invalid number of db version tuples returned."); + exit(1); + } + + int dbVersion = std::stoi(PQgetvalue(res, 0, 0)); + + if (dbVersion < DB_MINIMUM_VERSION) { + fprintf(stderr, "Central database schema version too low. This controller version requires a minimum schema version of %d. Please upgrade your Central instance", DB_MINIMUM_VERSION); + exit(1); + } + + PQclear(res); + res = NULL; + PQfinish(conn); + conn = NULL; + + _readyLock.lock(); + _heartbeatThread = std::thread(&PostgreSQL::heartbeat, this); + _membersDbWatcher = std::thread(&PostgreSQL::membersDbWatcher, this); + _networksDbWatcher = std::thread(&PostgreSQL::networksDbWatcher, this); + for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) { + _commitThread[i] = std::thread(&PostgreSQL::commitThread, this); + } + _onlineNotificationThread = std::thread(&PostgreSQL::onlineNotificationThread, this); +} + +PostgreSQL::~PostgreSQL() +{ + _run = 0; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + _heartbeatThread.join(); + _membersDbWatcher.join(); + _networksDbWatcher.join(); + for (int i = 0; i < ZT_CENTRAL_CONTROLLER_COMMIT_THREADS; ++i) { + _commitThread[i].join(); + } + _onlineNotificationThread.join(); + +} + + +bool PostgreSQL::waitForReady() +{ + while (_ready < 2) { + if (!_waitNoticePrinted) { + _waitNoticePrinted = true; + fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S, ::_timestr(), (unsigned long long)_myAddress.toInt()); + } + _readyLock.lock(); + _readyLock.unlock(); + } + return true; +} + +bool PostgreSQL::isReady() +{ + return ((_ready == 2)&&(_connected)); +} + +void PostgreSQL::save(nlohmann::json *orig, nlohmann::json &record) +{ + try { + if (!record.is_object()) { + return; + } + waitForReady(); + if (orig) { + if (*orig != record) { + record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1; + _commitQueue.post(new nlohmann::json(record)); + } + } else { + record["revision"] = 1; + _commitQueue.post(new nlohmann::json(record)); + } + } catch (std::exception &e) { + fprintf(stderr, "Error on PostgreSQL::save: %s\n", e.what()); + } catch (...) { + fprintf(stderr, "Unknown error on PostgreSQL::save\n"); + } +} + +void PostgreSQL::eraseNetwork(const uint64_t networkId) +{ + char tmp2[24]; + waitForReady(); + Utils::hex(networkId, tmp2); + json *tmp = new json(); + (*tmp)["id"] = tmp2; + (*tmp)["objtype"] = "_delete_network"; + _commitQueue.post(tmp); +} + +void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId) +{ + char tmp2[24]; + json *tmp = new json(); + Utils::hex(networkId, tmp2); + (*tmp)["nwid"] = tmp2; + Utils::hex(memberId, tmp2); + (*tmp)["id"] = tmp2; + (*tmp)["objtype"] = "_delete_member"; + _commitQueue.post(tmp); +} + +void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress) +{ + std::lock_guard<std::mutex> l(_lastOnline_l); + std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)]; + i.first = OSUtils::now(); + if (physicalAddress) { + i.second = physicalAddress; + } +} + +void PostgreSQL::initializeNetworks(PGconn *conn) +{ + try { + if (PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); + exit(1); + } + + const char *params[1] = { + _myAddressStr.c_str() + }; + + PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, " + "enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, " + "remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network " + "WHERE deleted = false AND controller_id = $1", + 1, + NULL, + params, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + fprintf(stderr, "Networks Initialization Failed: %s", PQerrorMessage(conn)); + PQclear(res); + exit(1); + } + + int numRows = PQntuples(res); + for (int i = 0; i < numRows; ++i) { + json empty; + json config; + + const char *nwidparam[1] = { + PQgetvalue(res, i, 0) + }; + + config["id"] = PQgetvalue(res, i, 0); + config["nwid"] = PQgetvalue(res, i, 0); + try { + config["creationTime"] = std::stoull(PQgetvalue(res, i, 1)); + } catch (std::exception &e) { + config["creationTime"] = 0ULL; + //fprintf(stderr, "Error converting creation time: %s\n", PQgetvalue(res, i, 1)); + } + config["capabilities"] = json::parse(PQgetvalue(res, i, 2)); + config["enableBroadcast"] = (strcmp(PQgetvalue(res, i, 3),"t")==0); + try { + config["lastModified"] = std::stoull(PQgetvalue(res, i, 4)); + } catch (std::exception &e) { + config["lastModified"] = 0ULL; + //fprintf(stderr, "Error converting last modified: %s\n", PQgetvalue(res, i, 4)); + } + try { + config["mtu"] = std::stoi(PQgetvalue(res, i, 5)); + } catch (std::exception &e) { + config["mtu"] = 2800; + } + try { + config["multicastLimit"] = std::stoi(PQgetvalue(res, i, 6)); + } catch (std::exception &e) { + config["multicastLimit"] = 64; + } + config["name"] = PQgetvalue(res, i, 7); + config["private"] = (strcmp(PQgetvalue(res, i, 8),"t")==0); + try { + config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9)); + } catch (std::exception &e) { + config["remoteTraceLevel"] = 0; + } + config["remoteTraceTarget"] = PQgetvalue(res, i, 10); + try { + config["revision"] = std::stoull(PQgetvalue(res, i, 11)); + } catch (std::exception &e) { + config["revision"] = 0ULL; + //fprintf(stderr, "Error converting revision: %s\n", PQgetvalue(res, i, 11)); + } + config["rules"] = json::parse(PQgetvalue(res, i, 12)); + config["tags"] = json::parse(PQgetvalue(res, i, 13)); + config["v4AssignMode"] = json::parse(PQgetvalue(res, i, 14)); + config["v6AssignMode"] = json::parse(PQgetvalue(res, i, 15)); + config["objtype"] = "network"; + config["ipAssignmentPools"] = json::array(); + config["routes"] = json::array(); + + PGresult *r2 = PQexecParams(conn, + "SELECT host(ip_range_start), host(ip_range_end) FROM ztc_network_assignment_pool WHERE network_id = $1", + 1, + NULL, + nwidparam, + NULL, + NULL, + 0); + + if (PQresultStatus(r2) != PGRES_TUPLES_OK) { + fprintf(stderr, "ERROR: Error retreiving IP pools for network: %s\n", PQresultErrorMessage(r2)); + PQclear(r2); + PQclear(res); + exit(1); + } + + int n = PQntuples(r2); + for (int j = 0; j < n; ++j) { + json ip; + ip["ipRangeStart"] = PQgetvalue(r2, j, 0); + ip["ipRangeEnd"] = PQgetvalue(r2, j, 1); + + config["ipAssignmentPools"].push_back(ip); + } + + PQclear(r2); + + r2 = PQexecParams(conn, + "SELECT host(address), bits, host(via) FROM ztc_network_route WHERE network_id = $1", + 1, + NULL, + nwidparam, + NULL, + NULL, + 0); + + if (PQresultStatus(r2) != PGRES_TUPLES_OK) { + fprintf(stderr, "ERROR: Error retreiving routes for network: %s\n", PQresultErrorMessage(r2)); + PQclear(r2); + PQclear(res); + exit(1); + } + + n = PQntuples(r2); + for (int j = 0; j < n; ++j) { + std::string addr = PQgetvalue(r2, j, 0); + std::string bits = PQgetvalue(r2, j, 1); + std::string via = PQgetvalue(r2, j, 2); + json route; + route["target"] = addr + "/" + bits; + + if (via == "NULL") { + route["via"] = nullptr; + } else { + route["via"] = via; + } + config["routes"].push_back(route); + } + + PQclear(r2); + + _networkChanged(empty, config, false); + } + + PQclear(res); + + if (++this->_ready == 2) { + if (_waitNoticePrinted) { + fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); + } + _readyLock.unlock(); + } + } catch (std::exception &e) { + fprintf(stderr, "ERROR: Error initializing networks: %s", e.what()); + exit(-1); + } +} + +void PostgreSQL::initializeMembers(PGconn *conn) +{ + try { + if (PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); + exit(1); + } + + const char *params[1] = { + _myAddressStr.c_str() + }; + + PGresult *res = PQexecParams(conn, + "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, " + " EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, " + " EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000, " + " m.remote_trace_level, m.remote_trace_target, m.tags, m.v_major, m.v_minor, m.v_rev, m.v_proto, " + " m.no_auto_assign_ips, m.revision " + "FROM ztc_member m " + "INNER JOIN ztc_network n " + " ON n.id = m.network_id " + "WHERE n.controller_id = $1 AND m.deleted = false", + 1, + NULL, + params, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn)); + PQclear(res); + exit(1); + } + + int numRows = PQntuples(res); + for (int i = 0; i < numRows; ++i) { + json empty; + json config; + + std::string memberId(PQgetvalue(res, i, 0)); + std::string networkId(PQgetvalue(res, i, 1)); + std::string ctime = PQgetvalue(res, i, 5); + config["id"] = memberId; + config["nwid"] = networkId; + config["activeBridge"] = (strcmp(PQgetvalue(res, i, 2), "t") == 0); + config["authorized"] = (strcmp(PQgetvalue(res, i, 3), "t") == 0); + try { + config["capabilities"] = json::parse(PQgetvalue(res, i, 4)); + } catch (std::exception &e) { + config["capabilities"] = json::array(); + } + try { + config["creationTime"] = std::stoull(PQgetvalue(res, i, 5)); + } catch (std::exception &e) { + config["creationTime"] = 0ULL; + //fprintf(stderr, "Error upding creation time (member): %s\n", PQgetvalue(res, i, 5)); + } + config["identity"] = PQgetvalue(res, i, 6); + try { + config["lastAuthorizedTime"] = std::stoull(PQgetvalue(res, i, 7)); + } catch(std::exception &e) { + config["lastAuthorizedTime"] = 0ULL; + //fprintf(stderr, "Error updating last auth time (member): %s\n", PQgetvalue(res, i, 7)); + } + try { + config["lastDeauthorizedTime"] = std::stoull(PQgetvalue(res, i, 8)); + } catch( std::exception &e) { + config["lastDeauthorizedTime"] = 0ULL; + //fprintf(stderr, "Error updating last deauth time (member): %s\n", PQgetvalue(res, i, 8)); + } + try { + config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9)); + } catch (std::exception &e) { + config["remoteTraceLevel"] = 0; + } + config["remoteTraceTarget"] = PQgetvalue(res, i, 10); + try { + config["tags"] = json::parse(PQgetvalue(res, i, 11)); + } catch (std::exception &e) { + config["tags"] = json::array(); + } + try { + config["vMajor"] = std::stoi(PQgetvalue(res, i, 12)); + } catch(std::exception &e) { + config["vMajor"] = -1; + } + try { + config["vMinor"] = std::stoi(PQgetvalue(res, i, 13)); + } catch (std::exception &e) { + config["vMinor"] = -1; + } + try { + config["vRev"] = std::stoi(PQgetvalue(res, i, 14)); + } catch (std::exception &e) { + config["vRev"] = -1; + } + try { + config["vProto"] = std::stoi(PQgetvalue(res, i, 15)); + } catch (std::exception &e) { + config["vProto"] = -1; + } + config["noAutoAssignIps"] = (strcmp(PQgetvalue(res, i, 16), "t") == 0); + try { + config["revision"] = std::stoull(PQgetvalue(res, i, 17)); + } catch (std::exception &e) { + config["revision"] = 0ULL; + //fprintf(stderr, "Error updating revision (member): %s\n", PQgetvalue(res, i, 17)); + } + config["objtype"] = "member"; + config["ipAssignments"] = json::array(); + const char *p2[2] = { + memberId.c_str(), + networkId.c_str() + }; + + PGresult *r2 = PQexecParams(conn, + "SELECT DISTINCT address FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", + 2, + NULL, + p2, + NULL, + NULL, + 0); + + if (PQresultStatus(r2) != PGRES_TUPLES_OK) { + fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn)); + PQclear(r2); + PQclear(res); + exit(1); + } + + int n = PQntuples(r2); + for (int j = 0; j < n; ++j) { + config["ipAssignments"].push_back(PQgetvalue(r2, j, 0)); + } + + _memberChanged(empty, config, false); + } + + PQclear(res); + + if (++this->_ready == 2) { + if (_waitNoticePrinted) { + fprintf(stderr,"[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); + } + _readyLock.unlock(); + } + } catch (std::exception &e) { + fprintf(stderr, "ERROR: Error initializing members: %s\n", e.what()); + exit(-1); + } +} + +void PostgreSQL::heartbeat() +{ + char publicId[1024]; + char hostnameTmp[1024]; + _myId.toString(false,publicId); + if (gethostname(hostnameTmp, sizeof(hostnameTmp))!= 0) { + hostnameTmp[0] = (char)0; + } else { + for (int i = 0; i < sizeof(hostnameTmp); ++i) { + if ((hostnameTmp[i] == '.')||(hostnameTmp[i] == 0)) { + hostnameTmp[i] = (char)0; + break; + } + } + } + const char *controllerId = _myAddressStr.c_str(); + const char *publicIdentity = publicId; + const char *hostname = hostnameTmp; + + PGconn *conn = getPgConn(); + if (PQstatus(conn) == CONNECTION_BAD) { + fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); + PQfinish(conn); + exit(1); + } + while (_run == 1) { + if(PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "%s heartbeat thread lost connection to Database\n", _myAddressStr.c_str()); + PQfinish(conn); + exit(6); + } + if (conn) { + std::string major = std::to_string(ZEROTIER_ONE_VERSION_MAJOR); + std::string minor = std::to_string(ZEROTIER_ONE_VERSION_MINOR); + std::string rev = std::to_string(ZEROTIER_ONE_VERSION_REVISION); + std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); + std::string now = std::to_string(OSUtils::now()); + std::string host_port = std::to_string(_listenPort); + std::string use_rabbitmq = (_mqc != NULL) ? "true" : "false"; + const char *values[10] = { + controllerId, + hostname, + now.c_str(), + publicIdentity, + major.c_str(), + minor.c_str(), + rev.c_str(), + build.c_str(), + host_port.c_str(), + use_rabbitmq.c_str() + }; + + PGresult *res = PQexecParams(conn, + "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_rabbitmq) " + "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) " + "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " + "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " + "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, " + "use_rabbitmq = EXCLUDED.use_rabbitmq", + 10, // number of parameters + NULL, // oid field. ignore + values, // values for substitution + NULL, // lengths in bytes of each value + NULL, // binary? + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "Heartbeat Update Failed: %s\n", PQresultErrorMessage(res)); + } + PQclear(res); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + + PQfinish(conn); + conn = NULL; +} + +void PostgreSQL::membersDbWatcher() +{ + PGconn *conn = getPgConn(NO_OVERRIDE); + if (PQstatus(conn) == CONNECTION_BAD) { + fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); + PQfinish(conn); + exit(1); + } + + initializeMembers(conn); + + if (this->_mqc != NULL) { + PQfinish(conn); + conn = NULL; + _membersWatcher_RabbitMQ(); + } else { + _membersWatcher_Postgres(conn); + PQfinish(conn); + conn = NULL; + } + + if (_run == 1) { + fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + exit(9); + } + fprintf(stderr, "Exited membersDbWatcher\n"); +} + +void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { + char buf[11] = {0}; + std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); + PGresult *res = PQexec(conn, cmd.c_str()); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQfinish(conn); + exit(1); + } + + PQclear(res); res = NULL; + + while(_run == 1) { + if (PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres."); + exit(-1); + } + PGnotify *notify = NULL; + PQconsumeInput(conn); + while ((notify = PQnotifies(conn)) != NULL) { + //fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra); + + try { + json tmp(json::parse(notify->extra)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object() || newConfig.is_object()) { + _memberChanged(oldConfig,newConfig,(this->_ready>=2)); + } + } catch (...) {} // ignore bad records + + free(notify); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } +} + +void PostgreSQL::_membersWatcher_RabbitMQ() { + char buf[11] = {0}; + std::string qname = "member_"+ std::string(_myAddress.toString(buf)); + RabbitMQ rmq(_mqc, qname.c_str()); + try { + rmq.init(); + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + exit(11); + } + while (_run == 1) { + try { + std::string msg = rmq.consume(); + // fprintf(stderr, "Got Member Update: %s\n", msg.c_str()); + if (msg.empty()) { + continue; + } + json tmp(json::parse(msg)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object() || newConfig.is_object()) { + _memberChanged(oldConfig,newConfig,(this->_ready>=2)); + } + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what()); + break; + } catch(std::exception &e ) { + fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what()); + } catch(...) { + fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n"); + } + } +} + +void PostgreSQL::networksDbWatcher() +{ + PGconn *conn = getPgConn(NO_OVERRIDE); + if (PQstatus(conn) == CONNECTION_BAD) { + fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); + PQfinish(conn); + exit(1); + } + + initializeNetworks(conn); + + if (this->_mqc != NULL) { + PQfinish(conn); + conn = NULL; + _networksWatcher_RabbitMQ(); + } else { + _networksWatcher_Postgres(conn); + PQfinish(conn); + conn = NULL; + } + + if (_run == 1) { + fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + exit(8); + } + fprintf(stderr, "Exited membersDbWatcher\n"); +} + +void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { + char buf[11] = {0}; + std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf)); + PGresult *res = PQexec(conn, cmd.c_str()); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQfinish(conn); + exit(1); + } + + PQclear(res); res = NULL; + + while(_run == 1) { + if (PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres."); + exit(-1); + } + PGnotify *notify = NULL; + PQconsumeInput(conn); + while ((notify = PQnotifies(conn)) != NULL) { + //fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra); + try { + json tmp(json::parse(notify->extra)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (...) {} // ignore bad records + free(notify); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } +} + +void PostgreSQL::_networksWatcher_RabbitMQ() { + char buf[11] = {0}; + std::string qname = "network_"+ std::string(_myAddress.toString(buf)); + RabbitMQ rmq(_mqc, qname.c_str()); + try { + rmq.init(); + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + exit(11); + } + while (_run == 1) { + try { + std::string msg = rmq.consume(); + if (msg.empty()) { + continue; + } + // fprintf(stderr, "Got network update: %s\n", msg.c_str()); + json tmp(json::parse(msg)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + break; + } catch (std::exception &e) { + fprintf(stderr, "RABBITMQ ERROR network watcher: %s\n", e.what()); + } catch(...) { + fprintf(stderr, "RABBITMQ ERROR network watcher: unknown error\n"); + } + } +} + +void PostgreSQL::commitThread() +{ + PGconn *conn = getPgConn(); + if (PQstatus(conn) == CONNECTION_BAD) { + fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn)); + PQfinish(conn); + exit(1); + } + + json *config = nullptr; + while(_commitQueue.get(config)&(_run == 1)) { + if (!config) { + continue; + } + if (PQstatus(conn) == CONNECTION_BAD) { + fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn)); + PQfinish(conn); + delete config; + exit(1); + } + try { + const std::string objtype = (*config)["objtype"]; + if (objtype == "member") { + try { + std::string memberId = (*config)["id"]; + std::string networkId = (*config)["nwid"]; + std::string identity = (*config)["identity"]; + std::string target = "NULL"; + + if (!(*config)["remoteTraceTarget"].is_null()) { + target = (*config)["remoteTraceTarget"]; + } + + std::string caps = OSUtils::jsonDump((*config)["capabilities"], -1); + std::string lastAuthTime = std::to_string((long long)(*config)["lastAuthorizedTime"]); + std::string lastDeauthTime = std::to_string((long long)(*config)["lastDeauthorizedTime"]); + std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]); + std::string rev = std::to_string((unsigned long long)(*config)["revision"]); + std::string tags = OSUtils::jsonDump((*config)["tags"], -1); + std::string vmajor = std::to_string((int)(*config)["vMajor"]); + std::string vminor = std::to_string((int)(*config)["vMinor"]); + std::string vrev = std::to_string((int)(*config)["vRev"]); + std::string vproto = std::to_string((int)(*config)["vProto"]); + const char *values[19] = { + memberId.c_str(), + networkId.c_str(), + ((*config)["activeBridge"] ? "true" : "false"), + ((*config)["authorized"] ? "true" : "false"), + caps.c_str(), + identity.c_str(), + lastAuthTime.c_str(), + lastDeauthTime.c_str(), + ((*config)["noAutoAssignIps"] ? "true" : "false"), + rtraceLevel.c_str(), + (target == "NULL") ? NULL : target.c_str(), + rev.c_str(), + tags.c_str(), + vmajor.c_str(), + vminor.c_str(), + vrev.c_str(), + vproto.c_str() + }; + + PGresult *res = PQexecParams(conn, + "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, " + "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, " + "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) " + "VALUES ($1, $2, $3, $4, $5, $6, " + "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), " + "$9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET " + "active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, " + "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, " + "last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, " + "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, " + "revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, " + "v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto", + 17, + NULL, + values, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error updating member: %s\n", PQresultErrorMessage(res)); + fprintf(stderr, "%s", OSUtils::jsonDump(*config, 2).c_str()); + PQclear(res); + delete config; + config = nullptr; + continue; + } + + PQclear(res); + + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error beginning transaction: %s\n", PQresultErrorMessage(res)); + PQclear(res); + delete config; + config = nullptr; + continue; + } + + PQclear(res); + + const char *v2[2] = { + memberId.c_str(), + networkId.c_str() + }; + + res = PQexecParams(conn, + "DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", + 2, + NULL, + v2, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error updating IP address assignments: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQclear(PQexec(conn, "ROLLBACK"));; + delete config; + config = nullptr; + continue; + } + + PQclear(res); + + std::vector<std::string> assignments; + for (auto i = (*config)["ipAssignments"].begin(); i != (*config)["ipAssignments"].end(); ++i) { + std::string addr = *i; + + if (std::find(assignments.begin(), assignments.end(), addr) != assignments.end()) { + continue; + } + + const char *v3[3] = { + memberId.c_str(), + networkId.c_str(), + addr.c_str() + }; + + res = PQexecParams(conn, + "INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3)", + 3, + NULL, + v3, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error setting IP addresses for member: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQclear(PQexec(conn, "ROLLBACK")); + break;; + } + } + + res = PQexec(conn, "COMMIT"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error committing ip address data: %s\n", PQresultErrorMessage(res)); + } + + PQclear(res); + + const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); + const uint64_t memberidInt = OSUtils::jsonIntHex((*config)["id"], 0ULL); + if (nwidInt && memberidInt) { + nlohmann::json nwOrig; + nlohmann::json memOrig; + + nlohmann::json memNew(*config); + + get(nwidInt, nwOrig, memberidInt, memOrig); + + _memberChanged(memOrig, memNew, (this->_ready>=2)); + } else { + fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt); + } + + } catch (std::exception &e) { + fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); + } + } else if (objtype == "network") { + try { + std::string id = (*config)["id"]; + std::string controllerId = _myAddressStr.c_str(); + std::string name = (*config)["name"]; + std::string remoteTraceTarget("NULL"); + if (!(*config)["remoteTraceTarget"].is_null()) { + remoteTraceTarget = (*config)["remoteTraceTarget"]; + } + std::string rulesSource = (*config)["rulesSource"]; + std::string caps = OSUtils::jsonDump((*config)["capabilitles"], -1); + std::string now = std::to_string(OSUtils::now()); + std::string mtu = std::to_string((int)(*config)["mtu"]); + std::string mcastLimit = std::to_string((int)(*config)["multicastLimit"]); + std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]); + std::string rules = OSUtils::jsonDump((*config)["rules"], -1); + std::string tags = OSUtils::jsonDump((*config)["tags"], -1); + std::string v4mode = OSUtils::jsonDump((*config)["v4AssignMode"],-1); + std::string v6mode = OSUtils::jsonDump((*config)["v6AssignMode"], -1); + bool enableBroadcast = (*config)["enableBroadcast"]; + bool isPrivate = (*config)["private"]; + + const char *values[16] = { + id.c_str(), + controllerId.c_str(), + caps.c_str(), + enableBroadcast ? "true" : "false", + now.c_str(), + mtu.c_str(), + mcastLimit.c_str(), + name.c_str(), + isPrivate ? "true" : "false", + rtraceLevel.c_str(), + (remoteTraceTarget == "NULL" ? NULL : remoteTraceTarget.c_str()), + rules.c_str(), + rulesSource.c_str(), + tags.c_str(), + v4mode.c_str(), + v6mode.c_str(), + }; + + PGresult *res = PQexecParams(conn, + "UPDATE ztc_network SET controller_id = $2, capabilities = $3, enable_broadcast = $4, " + "last_updated = $5, mtu = $6, multicast_limit = $7, name = $8, private = $9, " + "remote_trace_level = $10, remote_trace_target = $11, rules = $12, rules_source = $13, " + "tags = $14, v4_assign_mode = $15, v6_assign_mode = $16 " + "WHERE id = $1", + 16, + NULL, + values, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error updating network record: %s\n", PQresultErrorMessage(res)); + PQclear(res); + delete config; + config = nullptr; + continue; + } + + PQclear(res); + + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error beginnning transaction: %s\n", PQresultErrorMessage(res)); + PQclear(res); + delete config; + config = nullptr; + continue; + } + + PQclear(res); + + const char *params[1] = { + id.c_str() + }; + res = PQexecParams(conn, + "DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", + 1, + NULL, + params, + NULL, + NULL, + 0); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQclear(PQexec(conn, "ROLLBACK")); + delete config; + config = nullptr; + continue; + } + + PQclear(res); + + auto pool = (*config)["ipAssignmentPools"]; + bool err = false; + for (auto i = pool.begin(); i != pool.end(); ++i) { + std::string start = (*i)["ipRangeStart"]; + std::string end = (*i)["ipRangeEnd"]; + const char *p[3] = { + id.c_str(), + start.c_str(), + end.c_str() + }; + + res = PQexecParams(conn, + "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) " + "VALUES ($1, $2, $3)", + 3, + NULL, + p, + NULL, + NULL, + 0); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res)); + PQclear(res); + err = true; + break; + } + PQclear(res); + } + if (err) { + PQclear(PQexec(conn, "ROLLBACK")); + delete config; + config = nullptr; + continue; + } + + res = PQexecParams(conn, + "DELETE FROM ztc_network_route WHERE network_id = $1", + 1, + NULL, + params, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQclear(PQexec(conn, "ROLLBACK")); + delete config; + config = nullptr; + continue; + } + + + auto routes = (*config)["routes"]; + err = false; + for (auto i = routes.begin(); i != routes.end(); ++i) { + std::string t = (*i)["target"]; + std::vector<std::string> target; + std::istringstream f(t); + std::string s; + while(std::getline(f, s, '/')) { + target.push_back(s); + } + if (target.empty() || target.size() != 2) { + continue; + } + std::string targetAddr = target[0]; + std::string targetBits = target[1]; + std::string via = "NULL"; + if (!(*i)["via"].is_null()) { + via = (*i)["via"]; + } + + const char *p[4] = { + id.c_str(), + targetAddr.c_str(), + targetBits.c_str(), + (via == "NULL" ? NULL : via.c_str()), + }; + + res = PQexecParams(conn, + "INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)", + 4, + NULL, + p, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res)); + PQclear(res); + err = true; + break; + } + PQclear(res); + } + if (err) { + PQclear(PQexec(conn, "ROLLBACK")); + delete config; + config = nullptr; + continue; + } + + res = PQexec(conn, "COMMIT"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error committing network update: %s\n", PQresultErrorMessage(res)); + } + PQclear(res); + + const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); + if (nwidInt) { + nlohmann::json nwOrig; + nlohmann::json nwNew(*config); + + get(nwidInt, nwOrig); + + _networkChanged(nwOrig, nwNew, true); + } else { + fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt); + } + + } catch (std::exception &e) { + fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); + } + } else if (objtype == "trace") { + fprintf(stderr, "ERROR: Trace not yet implemented"); + } else if (objtype == "_delete_network") { + try { + std::string networkId = (*config)["nwid"]; + const char *values[1] = { + networkId.c_str() + }; + PGresult * res = PQexecParams(conn, + "UPDATE ztc_network SET deleted = true WHERE id = $1", + 1, + NULL, + values, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error deleting network: %s\n", PQresultErrorMessage(res)); + } + + PQclear(res); + } catch (std::exception &e) { + fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what()); + } + } else if (objtype == "_delete_member") { + try { + std::string memberId = (*config)["id"]; + std::string networkId = (*config)["nwid"]; + + const char *values[2] = { + memberId.c_str(), + networkId.c_str() + }; + + PGresult *res = PQexecParams(conn, + "UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2", + 2, + NULL, + values, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error deleting member: %s\n", PQresultErrorMessage(res)); + } + + PQclear(res); + } catch (std::exception &e) { + fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what()); + } + } else { + fprintf(stderr, "ERROR: unknown objtype"); + } + } catch (std::exception &e) { + fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what()); + } + delete config; + config = nullptr; + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + PQfinish(conn); + if (_run == 1) { + fprintf(stderr, "ERROR: %s commitThread should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + exit(7); + } +} + +void PostgreSQL::onlineNotificationThread() +{ + PGconn *conn = getPgConn(); + if (PQstatus(conn) == CONNECTION_BAD) { + fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); + PQfinish(conn); + exit(1); + } + _connected = 1; + + int64_t lastUpdatedNetworkStatus = 0; + std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative; + + while (_run == 1) { + if (PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres."); + PQfinish(conn); + exit(5); + } + + // map used to send notifications to front end + std::unordered_map<std::string, std::vector<std::string>> updateMap; + + std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline; + { + std::lock_guard<std::mutex> l(_lastOnline_l); + lastOnline.swap(_lastOnline); + } + + PGresult *res = NULL; + + std::stringstream memberUpdate; + memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES "; + bool firstRun = true; + bool memberAdded = false; + for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { + uint64_t nwid_i = i->first.first; + char nwidTmp[64]; + char memTmp[64]; + char ipTmp[64]; + OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i); + OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second); + + auto found = _networks.find(nwid_i); + if (found == _networks.end()) { + continue; // skip members trying to join non-existant networks + } + + std::string networkId(nwidTmp); + std::string memberId(memTmp); + + std::vector<std::string> &members = updateMap[networkId]; + members.push_back(memberId); + + lastOnlineCumulative[i->first] = i->second.first; + + + const char *qvals[2] = { + networkId.c_str(), + memberId.c_str() + }; + + res = PQexecParams(conn, + "SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2", + 2, + NULL, + qvals, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + fprintf(stderr, "Member count failed: %s", PQerrorMessage(conn)); + PQclear(res); + continue; + } + + int nrows = PQntuples(res); + PQclear(res); + + if (nrows == 1) { + int64_t ts = i->second.first; + std::string ipAddr = i->second.second.toIpString(ipTmp); + std::string timestamp = std::to_string(ts); + + if (firstRun) { + firstRun = false; + } else { + memberUpdate << ", "; + } + + memberUpdate << "('" << networkId << "', '" << memberId << "', "; + if (ipAddr.empty()) { + memberUpdate << "NULL, "; + } else { + memberUpdate << "'" << ipAddr << "', "; + } + memberUpdate << "TO_TIMESTAMP(" << timestamp << "::double precision/1000))"; + memberAdded = true; + } else if (nrows > 1) { + fprintf(stderr, "nrows > 1?!?"); + continue; + } else { + continue; + } + } + memberUpdate << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated;"; + + if (memberAdded) { + res = PQexec(conn, memberUpdate.str().c_str()); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "Multiple insert failed: %s", PQerrorMessage(conn)); + } + PQclear(res); + } + + const int64_t now = OSUtils::now(); + if ((now - lastUpdatedNetworkStatus) > 10000) { + lastUpdatedNetworkStatus = now; + + std::vector<std::pair<uint64_t, std::shared_ptr<_Network>>> networks; + { + std::lock_guard<std::mutex> l(_networks_l); + for (auto i = _networks.begin(); i != _networks.end(); ++i) { + networks.push_back(*i); + } + } + + std::stringstream networkUpdate; + networkUpdate << "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, online_member_count, total_member_count, last_modified) VALUES "; + bool nwFirstRun = true; + bool networkAdded = false; + for (auto i = networks.begin(); i != networks.end(); ++i) { + char tmp[64]; + Utils::hex(i->first, tmp); + + std::string networkId(tmp); + + std::vector<std::string> &_notUsed = updateMap[networkId]; + (void)_notUsed; + + uint64_t authMemberCount = 0; + uint64_t totalMemberCount = 0; + uint64_t onlineMemberCount = 0; + uint64_t bridgeCount = 0; + uint64_t ts = now; + { + std::lock_guard<std::mutex> l2(i->second->lock); + authMemberCount = i->second->authorizedMembers.size(); + totalMemberCount = i->second->members.size(); + bridgeCount = i->second->activeBridgeMembers.size(); + for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) { + auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first, m->first)); + if (lo != lastOnlineCumulative.end()) { + if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) { + ++onlineMemberCount; + } else { + lastOnlineCumulative.erase(lo); + } + } + } + } + + const char *nvals[1] = { + networkId.c_str() + }; + + res = PQexecParams(conn, + "SELECT id FROM ztc_network WHERE id = $1", + 1, + NULL, + nvals, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + fprintf(stderr, "Network lookup failed: %s", PQerrorMessage(conn)); + PQclear(res); + continue; + } + + int nrows = PQntuples(res); + PQclear(res); + + if (nrows == 1) { + std::string bc = std::to_string(bridgeCount); + std::string amc = std::to_string(authMemberCount); + std::string omc = std::to_string(onlineMemberCount); + std::string tmc = std::to_string(totalMemberCount); + std::string timestamp = std::to_string(ts); + + if (nwFirstRun) { + nwFirstRun = false; + } else { + networkUpdate << ", "; + } + + networkUpdate << "('" << networkId << "', " << bc << ", " << amc << ", " << omc << ", " << tmc << ", " + << "TO_TIMESTAMP(" << timestamp << "::double precision/1000))"; + + networkAdded = true; + + } else if (nrows > 1) { + fprintf(stderr, "Number of networks > 1?!?!?"); + continue; + } else { + continue; + } + } + networkUpdate << " ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, " + << "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, " + << "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified"; + if (networkAdded) { + res = PQexec(conn, networkUpdate.str().c_str()); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "Error during multiple network upsert: %s", PQresultErrorMessage(res)); + } + PQclear(res); + } + } + + // for (auto it = updateMap.begin(); it != updateMap.end(); ++it) { + // std::string networkId = it->first; + // std::vector<std::string> members = it->second; + // std::stringstream queryBuilder; + + // std::string membersStr = ::join(members, ","); + + // queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'"; + // std::string query = queryBuilder.str(); + + // PGresult *res = PQexec(conn,query.c_str()); + // if (PQresultStatus(res) != PGRES_COMMAND_OK) { + // fprintf(stderr, "ERROR: Error sending NOTIFY: %s\n", PQresultErrorMessage(res)); + // } + // PQclear(res); + // } + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str()); + PQfinish(conn); + if (_run == 1) { + fprintf(stderr, "ERROR: %s onlineNotificationThread should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + exit(6); + } +} + +PGconn *PostgreSQL::getPgConn(OverrideMode m) { + if (m == ALLOW_PGBOUNCER_OVERRIDE) { + char *connStr = getenv("PGBOUNCER_CONNSTR"); + if (connStr != NULL) { + fprintf(stderr, "PGBouncer Override\n"); + std::string conn(connStr); + conn += " application_name=controller-"; + conn += _myAddressStr.c_str(); + return PQconnectdb(conn.c_str()); + } + } + + return PQconnectdb(_connString.c_str()); +} +#endif //ZT_CONTROLLER_USE_LIBPQ diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp new file mode 100644 index 00000000..f35f89fc --- /dev/null +++ b/controller/PostgreSQL.hpp @@ -0,0 +1,117 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. + */ + +#ifdef ZT_CONTROLLER_USE_LIBPQ + +#ifndef ZT_CONTROLLER_LIBPQ_HPP +#define ZT_CONTROLLER_LIBPQ_HPP + +#include "DB.hpp" + +#define ZT_CENTRAL_CONTROLLER_COMMIT_THREADS 4 + +extern "C" { + typedef struct pg_conn PGconn; +} + +namespace ZeroTier +{ + +struct MQConfig; + +/** + * A controller database driver that talks to PostgreSQL + * + * This is for use with ZeroTier Central. Others are free to build and use it + * but be aware taht we might change it at any time. + */ +class PostgreSQL : public DB +{ +public: + PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL); + virtual ~PostgreSQL(); + + virtual bool waitForReady(); + virtual bool isReady(); + virtual void save(nlohmann::json *orig, nlohmann::json &record); + virtual void eraseNetwork(const uint64_t networkId); + virtual void eraseMember(const uint64_t networkId, const uint64_t memberId); + virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress); + +protected: + struct _PairHasher + { + inline std::size_t operator()(const std::pair<uint64_t,uint64_t> &p) const { return (std::size_t)(p.first ^ p.second); } + }; + +private: + void initializeNetworks(PGconn *conn); + void initializeMembers(PGconn *conn); + void heartbeat(); + void membersDbWatcher(); + void _membersWatcher_Postgres(PGconn *conn); + void _membersWatcher_RabbitMQ(); + void networksDbWatcher(); + void _networksWatcher_Postgres(PGconn *conn); + void _networksWatcher_RabbitMQ(); + + void commitThread(); + void onlineNotificationThread(); + + enum OverrideMode { + ALLOW_PGBOUNCER_OVERRIDE = 0, + NO_OVERRIDE = 1 + }; + + PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE ); + + std::string _connString; + + BlockingQueue<nlohmann::json *> _commitQueue; + + std::thread _heartbeatThread; + std::thread _membersDbWatcher; + std::thread _networksDbWatcher; + std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS]; + std::thread _onlineNotificationThread; + + std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > _lastOnline; + + mutable std::mutex _lastOnline_l; + mutable std::mutex _readyLock; + std::atomic<int> _ready, _connected, _run; + mutable volatile bool _waitNoticePrinted; + + int _listenPort; + + MQConfig *_mqc; +}; + +} + +#endif // ZT_CONTROLLER_LIBPQ_HPP + +#endif // ZT_CONTROLLER_USE_LIBPQ diff --git a/controller/README.md b/controller/README.md index 20fb69a0..c93c08f5 100644 --- a/controller/README.md +++ b/controller/README.md @@ -19,9 +19,9 @@ Since ZeroTier nodes are mobile and do not need static IPs, implementing high av ZeroTier network controllers can easily be run in Docker or other container systems. Since containers do not need to actually join networks, extra privilege options like "--device=/dev/net/tun --privileged" are not needed. You'll just need to map the local JSON API port of the running controller and allow it to access the Internet (over UDP/9993 at a minimum) so things can reach and query it. -### RethinkDB Database Implementation +### PostgreSQL Database Implementation -The default controller stores its data in the filesystem in `controller.d` under ZeroTier's home folder. There's an alternative implementation that stores data in RethinkDB that can be built with `make central-controller`. Right now this is only guaranteed to build and run on Linux and is designed for use with [ZeroTier Central](https://my.zerotier.com/). You're welcome to use it but we don't "officially" support it for end-user use and it could change at any time. +The default controller stores its data in the filesystem in `controller.d` under ZeroTier's home folder. There's an alternative implementation that stores data in PostgreSQL that can be built with `make central-controller`. Right now this is only guaranteed to build and run on Centos 7 Linux with PostgreSQL 10 installed via the [PostgreSQL Yum Repository](https://www.postgresql.org/download/linux/redhat/) and is designed for use with [ZeroTier Central](https://my.zerotier.com/). You're welcome to use it but we don't "officially" support it for end-user use and it could change at any time. ### Upgrading from Older (1.1.14 or earlier) Versions diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp new file mode 100644 index 00000000..eec9745d --- /dev/null +++ b/controller/RabbitMQ.cpp @@ -0,0 +1,107 @@ +#include "RabbitMQ.hpp" + +#ifdef ZT_CONTROLLER_USE_LIBPQ + +#include <amqp.h> +#include <amqp_tcp_socket.h> +#include <stdexcept> +#include <cstring> + +namespace ZeroTier +{ + +RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName) + : _mqc(cfg) + , _qName(queueName) + , _socket(NULL) + , _status(0) +{ +} + +RabbitMQ::~RabbitMQ() +{ + amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS); + amqp_connection_close(_conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(_conn); +} + +void RabbitMQ::init() +{ + struct timeval tval; + memset(&tval, 0, sizeof(struct timeval)); + tval.tv_sec = 5; + + fprintf(stderr, "Initializing RabbitMQ %s\n", _qName); + _conn = amqp_new_connection(); + _socket = amqp_tcp_socket_new(_conn); + if (!_socket) { + throw std::runtime_error("Can't create socket for RabbitMQ"); + } + + _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval); + if (_status) { + throw std::runtime_error("Can't connect to RabbitMQ"); + } + + amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, + _mqc->username, _mqc->password); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("RabbitMQ Login Error"); + } + + static int chan = 0; + { + Mutex::Lock l(_chan_m); + _channel = ++chan; + } + amqp_channel_open(_conn, _channel); + r = amqp_get_rpc_reply(_conn); + if(r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error opening communication channel"); + } + + _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error declaring queue " + std::string(_qName)); + } + + amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error consuming queue " + std::string(_qName)); + } + fprintf(stderr, "RabbitMQ Init OK %s\n", _qName); +} + +std::string RabbitMQ::consume() +{ + amqp_rpc_reply_t res; + amqp_envelope_t envelope; + amqp_maybe_release_buffers(_conn); + + struct timeval timeout; + timeout.tv_sec = 1; + timeout.tv_usec = 0; + + res = amqp_consume_message(_conn, &envelope, &timeout, 0); + if (res.reply_type != AMQP_RESPONSE_NORMAL) { + if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) { + // timeout waiting for message. Return empty string + return ""; + } else { + throw std::runtime_error("Error getting message"); + } + } + + std::string msg( + (const char*)envelope.message.body.bytes, + envelope.message.body.len + ); + amqp_destroy_envelope(&envelope); + return msg; +} + +} + +#endif // ZT_CONTROLLER_USE_LIBPQ diff --git a/controller/RabbitMQ.hpp b/controller/RabbitMQ.hpp new file mode 100644 index 00000000..d341681b --- /dev/null +++ b/controller/RabbitMQ.hpp @@ -0,0 +1,79 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. + */ +#ifndef ZT_CONTROLLER_RABBITMQ_HPP +#define ZT_CONTROLLER_RABBITMQ_HPP + +namespace ZeroTier +{ +struct MQConfig { + const char *host; + int port; + const char *username; + const char *password; +}; +} + +#ifdef ZT_CONTROLLER_USE_LIBPQ + +#include "../node/Mutex.hpp" + +#include <amqp.h> +#include <amqp_tcp_socket.h> +#include <string> + +namespace ZeroTier +{ + +class RabbitMQ { +public: + RabbitMQ(MQConfig *cfg, const char *queueName); + ~RabbitMQ(); + + void init(); + + std::string consume(); + +private: + MQConfig *_mqc; + const char *_qName; + + amqp_socket_t *_socket; + amqp_connection_state_t _conn; + amqp_queue_declare_ok_t *_q; + int _status; + + int _channel; + + Mutex _chan_m; + +}; + +} + +#endif // ZT_CONTROLLER_USE_LIBPQ + +#endif // ZT_CONTROLLER_RABBITMQ_HPP + diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp deleted file mode 100644 index a46d033f..00000000 --- a/controller/RethinkDB.cpp +++ /dev/null @@ -1,497 +0,0 @@ -/* - * ZeroTier One - Network Virtualization Everywhere - * Copyright (C) 2011-2018 ZeroTier, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -//#define ZT_CONTROLLER_USE_RETHINKDB - -#ifdef ZT_CONTROLLER_USE_RETHINKDB - -#include <stdio.h> -#include <stdlib.h> -#include <unistd.h> -#include <time.h> - -#include "RethinkDB.hpp" -#include "EmbeddedNetworkController.hpp" - -#include "../version.h" - -#include <chrono> -#include <algorithm> -#include <stdexcept> - -#include "../ext/librethinkdbxx/build/include/rethinkdb.h" - -namespace R = RethinkDB; -using json = nlohmann::json; - -namespace ZeroTier { - -static const char *_timestr() -{ - time_t t = time(0); - char *ts = ctime(&t); - char *p = ts; - if (!p) - return ""; - while (*p) { - if (*p == '\n') { - *p = (char)0; - break; - } - ++p; - } - return ts; -} - -RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) : - DB(nc,myId,path), - _ready(2), // two tables need to be synchronized before we're ready, so this is ready when it reaches 0 - _run(1), - _waitNoticePrinted(false) -{ - // rethinkdb:host:port:db[:auth] - std::vector<std::string> ps(OSUtils::split(path,":","","")); - if ((ps.size() < 4)||(ps[0] != "rethinkdb")) - throw std::runtime_error("invalid rethinkdb database url"); - _host = ps[1]; - _port = Utils::strToInt(ps[2].c_str()); - _db = ps[3]; - if (ps.size() > 4) - _auth = ps[4]; - - _readyLock.lock(); - - _membersDbWatcher = std::thread([this]() { - try { - while (_run == 1) { - try { - std::unique_ptr<R::Connection> rdb(R::connect(this->_host,this->_port,this->_auth)); - if (rdb) { - _membersDbWatcherConnection = (void *)rdb.get(); - auto cur = R::db(this->_db).table("Member",R::optargs("read_mode","outdated")).get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.05,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb); - while (cur.has_next()) { - if (_run != 1) break; - json tmp(json::parse(cur.next().as_json())); - if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) { - if (--this->_ready == 0) { - if (_waitNoticePrinted) - fprintf(stderr,"[%s] NOTICE: %.10llx controller RethinkDB data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); - this->_readyLock.unlock(); - } - } else { - try { - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - json oldConfig,newConfig; - if (ov.is_object()) oldConfig = ov["config"]; - if (nv.is_object()) newConfig = nv["config"]; - if (oldConfig.is_object()||newConfig.is_object()) - this->_memberChanged(oldConfig,newConfig,(this->_ready <= 0)); - } catch ( ... ) {} // ignore bad records - } - } - } - } catch (std::exception &e) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (member change stream): %s" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt(),e.what()); - } catch (R::Error &e) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (member change stream): %s" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt(),e.message.c_str()); - } catch ( ... ) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (member change stream): unknown exception" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); - } - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - } - } catch ( ... ) {} - }); - - _networksDbWatcher = std::thread([this]() { - try { - while (_run == 1) { - try { - std::unique_ptr<R::Connection> rdb(R::connect(this->_host,this->_port,this->_auth)); - if (rdb) { - _networksDbWatcherConnection = (void *)rdb.get(); - auto cur = R::db(this->_db).table("Network",R::optargs("read_mode","outdated")).get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.05,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb); - while (cur.has_next()) { - if (_run != 1) break; - json tmp(json::parse(cur.next().as_json())); - if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) { - if (--this->_ready == 0) { - if (_waitNoticePrinted) - fprintf(stderr,"[%s] NOTICE: %.10llx controller RethinkDB data download complete." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); - this->_readyLock.unlock(); - } - } else { - try { - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - json oldConfig,newConfig; - if (ov.is_object()) oldConfig = ov["config"]; - if (nv.is_object()) newConfig = nv["config"]; - if (oldConfig.is_object()||newConfig.is_object()) - this->_networkChanged(oldConfig,newConfig,(this->_ready <= 0)); - } catch ( ... ) {} // ignore bad records - } - } - } - } catch (std::exception &e) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (network change stream): %s" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt(),e.what()); - } catch (R::Error &e) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (network change stream): %s" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt(),e.message.c_str()); - } catch ( ... ) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (network change stream): unknown exception" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); - } - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - } - } catch ( ... ) {} - }); - - for(int t=0;t<ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS;++t) { - _commitThread[t] = std::thread([this]() { - try { - std::unique_ptr<R::Connection> rdb; - nlohmann::json *config = (nlohmann::json *)0; - while ((this->_commitQueue.get(config))&&(_run == 1)) { - if (!config) - continue; - nlohmann::json record; - const char *table = (const char *)0; - std::string deleteId; - try { - const std::string objtype = (*config)["objtype"]; - if (objtype == "member") { - const std::string nwid = (*config)["nwid"]; - const std::string id = (*config)["id"]; - record["id"] = nwid + "-" + id; - record["controllerId"] = this->_myAddressStr; - record["networkId"] = nwid; - record["nodeId"] = id; - record["config"] = *config; - table = "Member"; - } else if (objtype == "network") { - const std::string id = (*config)["id"]; - record["id"] = id; - record["controllerId"] = this->_myAddressStr; - record["config"] = *config; - table = "Network"; - } else if (objtype == "trace") { - record = *config; - table = "RemoteTrace"; - } else if (objtype == "_delete_network") { - deleteId = (*config)["id"]; - table = "Network"; - } else if (objtype == "_delete_member") { - deleteId = (*config)["nwid"]; - deleteId.push_back('-'); - const std::string tmp = (*config)["id"]; - deleteId.append(tmp); - table = "Member"; - } - } catch (std::exception &e) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (insert/update record creation): %s" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt(),e.what()); - table = (const char *)0; - } catch (R::Error &e) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (insert/update record creation): %s" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt(),e.message.c_str()); - table = (const char *)0; - } catch ( ... ) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (insert/update record creation): unknown exception" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); - table = (const char *)0; - } - delete config; - if (!table) - continue; - const std::string jdump(OSUtils::jsonDump(record,-1)); - - while (_run == 1) { - try { - if (!rdb) - rdb = R::connect(this->_host,this->_port,this->_auth); - if (rdb) { - if (deleteId.length() > 0) { - //printf("DELETE: %s" ZT_EOL_S,deleteId.c_str()); - R::db(this->_db).table(table).get(deleteId).delete_().run(*rdb); - } else { - //printf("UPSERT: %s" ZT_EOL_S,record.dump().c_str()); - R::db(this->_db).table(table).insert(R::Datum::from_json(jdump),R::optargs("conflict","update","return_changes",false)).run(*rdb); - } - break; - } else { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (insert/update): connect failed (will retry)" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); - rdb.reset(); - } - } catch (std::exception &e) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (insert/update): %s [%s]" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt(),e.what(),jdump.c_str()); - rdb.reset(); - } catch (R::Error &e) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (insert/update): %s [%s]" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt(),e.message.c_str(),jdump.c_str()); - rdb.reset(); - } catch ( ... ) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (insert/update): unknown exception [%s]" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt(),jdump.c_str()); - rdb.reset(); - } - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - } - } - } catch (std::exception &e) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (insert/update outer loop): %s" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt(),e.what()); - } catch (R::Error &e) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (insert/update outer loop): %s" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt(),e.message.c_str()); - } catch ( ... ) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (insert/update outer loop): unknown exception" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); - } - }); - } - - _onlineNotificationThread = std::thread([this]() { - int64_t lastUpdatedNetworkStatus = 0; - std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative; - try { - std::unique_ptr<R::Connection> rdb; - while (_run == 1) { - try { - if (!rdb) { - _connected = 0; - rdb = R::connect(this->_host,this->_port,this->_auth); - } - - if (rdb) { - _connected = 1; - R::Array batch; - R::Object tmpobj; - - std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline; - { - std::lock_guard<std::mutex> l(_lastOnline_l); - lastOnline.swap(_lastOnline); - } - - for(auto i=lastOnline.begin();i!=lastOnline.end();++i) { - lastOnlineCumulative[i->first] = i->second.first; - char tmp[64],tmp2[64]; - OSUtils::ztsnprintf(tmp,sizeof(tmp),"%.16llx-%.10llx",i->first.first,i->first.second); - tmpobj["id"] = tmp; - tmpobj["ts"] = i->second.first; - tmpobj["phy"] = i->second.second.toIpString(tmp2); - batch.emplace_back(tmpobj); - if (batch.size() >= 1024) { - R::db(this->_db).table("MemberStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); - batch.clear(); - } - } - if (batch.size() > 0) { - R::db(this->_db).table("MemberStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); - batch.clear(); - } - tmpobj.clear(); - - const int64_t now = OSUtils::now(); - if ((now - lastUpdatedNetworkStatus) > 10000) { - lastUpdatedNetworkStatus = now; - - std::vector< std::pair< uint64_t,std::shared_ptr<_Network> > > networks; - { - std::lock_guard<std::mutex> l(_networks_l); - networks.reserve(_networks.size() + 1); - for(auto i=_networks.begin();i!=_networks.end();++i) - networks.push_back(*i); - } - - for(auto i=networks.begin();i!=networks.end();++i) { - char tmp[64]; - Utils::hex(i->first,tmp); - tmpobj["id"] = tmp; - { - std::lock_guard<std::mutex> l2(i->second->lock); - tmpobj["authorizedMemberCount"] = i->second->authorizedMembers.size(); - tmpobj["totalMemberCount"] = i->second->members.size(); - unsigned long onlineMemberCount = 0; - for(auto m=i->second->members.begin();m!=i->second->members.end();++m) { - auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first,m->first)); - if (lo != lastOnlineCumulative.end()) { - if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) - ++onlineMemberCount; - else lastOnlineCumulative.erase(lo); - } - } - tmpobj["onlineMemberCount"] = onlineMemberCount; - tmpobj["bridgeCount"] = i->second->activeBridgeMembers.size(); - tmpobj["ts"] = now; - } - batch.emplace_back(tmpobj); - if (batch.size() >= 1024) { - R::db(this->_db).table("NetworkStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); - batch.clear(); - } - } - if (batch.size() > 0) { - R::db(this->_db).table("NetworkStatus",R::optargs("read_mode","outdated")).insert(batch,R::optargs("conflict","update")).run(*rdb); - batch.clear(); - } - } - } - } catch (std::exception &e) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (node status update): %s" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt(),e.what()); - rdb.reset(); - } catch (R::Error &e) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (node status update): %s" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt(),e.message.c_str()); - rdb.reset(); - } catch ( ... ) { - fprintf(stderr,"[%s] ERROR: %.10llx controller RethinkDB (node status update): unknown exception" ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); - rdb.reset(); - } - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - } - } catch ( ... ) {} - }); - - _heartbeatThread = std::thread([this]() { - try { - R::Object controllerRecord; - std::unique_ptr<R::Connection> rdb; - - { - char publicId[1024]; - //char secretId[1024]; - char hostname[1024]; - this->_myId.toString(false,publicId); - //this->_myId.toString(true,secretId); - if (gethostname(hostname,sizeof(hostname)) != 0) { - hostname[0] = (char)0; - } else { - for(int i=0;i<sizeof(hostname);++i) { - if ((hostname[i] == '.')||(hostname[i] == 0)) { - hostname[i] = (char)0; - break; - } - } - } - controllerRecord["id"] = this->_myAddressStr.c_str(); - controllerRecord["publicIdentity"] = publicId; - //controllerRecord["secretIdentity"] = secretId; - if (hostname[0]) - controllerRecord["clusterHost"] = hostname; - controllerRecord["vMajor"] = ZEROTIER_ONE_VERSION_MAJOR; - controllerRecord["vMinor"] = ZEROTIER_ONE_VERSION_MINOR; - controllerRecord["vRev"] = ZEROTIER_ONE_VERSION_REVISION; - controllerRecord["vBuild"] = ZEROTIER_ONE_VERSION_BUILD; - } - - while (_run == 1) { - try { - if (!rdb) - rdb = R::connect(this->_host,this->_port,this->_auth); - if (rdb) { - controllerRecord["lastAlive"] = OSUtils::now(); - //printf("HEARTBEAT: %s" ZT_EOL_S,tmp); - R::db(this->_db).table("Controller",R::optargs("read_mode","outdated")).insert(controllerRecord,R::optargs("conflict","update")).run(*rdb); - } - } catch ( ... ) { - rdb.reset(); - } - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - } - } catch ( ... ) {} - }); -} - -RethinkDB::~RethinkDB() -{ - _run = 0; - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - _commitQueue.stop(); - for(int t=0;t<ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS;++t) - _commitThread[t].join(); - if (_membersDbWatcherConnection) - ((R::Connection *)_membersDbWatcherConnection)->close(); - if (_networksDbWatcherConnection) - ((R::Connection *)_networksDbWatcherConnection)->close(); - _membersDbWatcher.join(); - _networksDbWatcher.join(); - _heartbeatThread.join(); - _onlineNotificationThread.join(); -} - -bool RethinkDB::waitForReady() -{ - while (_ready > 0) { - if (!_waitNoticePrinted) { - _waitNoticePrinted = true; - fprintf(stderr,"[%s] NOTICE: %.10llx controller RethinkDB waiting for initial data download..." ZT_EOL_S,_timestr(),(unsigned long long)_myAddress.toInt()); - } - _readyLock.lock(); - _readyLock.unlock(); - } - return true; -} - -bool RethinkDB::isReady() -{ - return ((_ready)&&(_connected)); -} - -void RethinkDB::save(nlohmann::json *orig,nlohmann::json &record) -{ - if (!record.is_object()) // sanity check - return; - waitForReady(); - if (orig) { - if (*orig != record) { - record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1; - _commitQueue.post(new nlohmann::json(record)); - } - } else { - record["revision"] = 1; - _commitQueue.post(new nlohmann::json(record)); - } -} - -void RethinkDB::eraseNetwork(const uint64_t networkId) -{ - char tmp2[24]; - waitForReady(); - Utils::hex(networkId,tmp2); - json *tmp = new json(); - (*tmp)["id"] = tmp2; - (*tmp)["objtype"] = "_delete_network"; // pseudo-type, tells thread to delete network - _commitQueue.post(tmp); -} - -void RethinkDB::eraseMember(const uint64_t networkId,const uint64_t memberId) -{ - char tmp2[24]; - json *tmp = new json(); - waitForReady(); - Utils::hex(networkId,tmp2); - (*tmp)["nwid"] = tmp2; - Utils::hex10(memberId,tmp2); - (*tmp)["id"] = tmp2; - (*tmp)["objtype"] = "_delete_member"; // pseudo-type, tells thread to delete network - _commitQueue.post(tmp); -} - -void RethinkDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) -{ - std::lock_guard<std::mutex> l(_lastOnline_l); - std::pair<int64_t,InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId,memberId)]; - i.first = OSUtils::now(); - if (physicalAddress) - i.second = physicalAddress; -} - -} // namespace ZeroTier - -#endif // ZT_CONTROLLER_USE_RETHINKDB diff --git a/controller/RethinkDB.hpp b/controller/RethinkDB.hpp deleted file mode 100644 index 60f04c5b..00000000 --- a/controller/RethinkDB.hpp +++ /dev/null @@ -1,84 +0,0 @@ -/* - * ZeroTier One - Network Virtualization Everywhere - * Copyright (C) 2011-2018 ZeroTier, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifdef ZT_CONTROLLER_USE_RETHINKDB - -#ifndef ZT_CONTROLLER_RETHINKDB_HPP -#define ZT_CONTROLLER_RETHINKDB_HPP - -#include "DB.hpp" - -#define ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS 4 - -namespace ZeroTier -{ - -/** - * A controller database driver that talks to RethinkDB - * - * This is for use with ZeroTier Central. Others are free to build and use it - * but be aware that we might change it at any time. - */ -class RethinkDB : public DB -{ -public: - RethinkDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path); - virtual ~RethinkDB(); - - virtual bool waitForReady(); - virtual bool isReady(); - virtual void save(nlohmann::json *orig,nlohmann::json &record); - virtual void eraseNetwork(const uint64_t networkId); - virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); - virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress); - -protected: - struct _PairHasher - { - inline std::size_t operator()(const std::pair<uint64_t,uint64_t> &p) const { return (std::size_t)(p.first ^ p.second); } - }; - - std::string _host; - std::string _db; - std::string _auth; - int _port; - - void *_networksDbWatcherConnection; - void *_membersDbWatcherConnection; - std::thread _networksDbWatcher; - std::thread _membersDbWatcher; - - BlockingQueue< nlohmann::json * > _commitQueue; - std::thread _commitThread[ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS]; - - std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > _lastOnline; - mutable std::mutex _lastOnline_l; - std::thread _onlineNotificationThread; - - std::thread _heartbeatThread; - - mutable std::mutex _readyLock; // locked until ready - std::atomic<int> _ready,_connected,_run; - mutable volatile bool _waitNoticePrinted; -}; - -} // namespace ZeroTier - -#endif - -#endif // ZT_CONTROLLER_USE_RETHINKDB |
