diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-02 07:05:11 -0700 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-02 07:05:11 -0700 |
commit | 4e88c80a22b6ca982341413ee806ade0df57b4b7 (patch) | |
tree | 82c2daaac597f74595bc83c18646280a56898e1a /controller | |
parent | a6203ed0389c1b995ebe94935b2d1ddeb01f36ee (diff) | |
download | infinitytier-4e88c80a22b6ca982341413ee806ade0df57b4b7.tar.gz infinitytier-4e88c80a22b6ca982341413ee806ade0df57b4b7.zip |
RethinkDB native connector work, minor fixes.
Diffstat (limited to 'controller')
-rw-r--r-- | controller/RethinkDB.cpp | 308 | ||||
-rw-r--r-- | controller/RethinkDB.hpp | 101 |
2 files changed, 409 insertions, 0 deletions
diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp new file mode 100644 index 00000000..5d93bf72 --- /dev/null +++ b/controller/RethinkDB.cpp @@ -0,0 +1,308 @@ +#include "RethinkDB.hpp" + +#include <chrono> +#include <algorithm> +#include <stdexcept> + +#include "../ext/librethinkdbxx/build/include/rethinkdb.h" + +namespace R = RethinkDB; +using nlohmann::json; + +namespace ZeroTier { + +RethinkDB::RethinkDB(const Address &myAddress,const char *host,const int port,const char *db,const char *auth) : + _myAddress(myAddress), + _host(host ? host : "127.0.0.1"), + _db(db), + _auth(auth ? auth : ""), + _port((port > 0) ? port : 28015), + _ready(2), // two tables need to be synchronized before we're ready + _run(1) +{ + _readyLock.lock(); + + { + char tmp[32]; + _myAddress.toString(tmp); + _myAddressStr = tmp; + } + + _membersDbWatcher = std::thread([this]() { + while (_run == 1) { + try { + auto rdb = R::connect(this->_host,this->_port,this->_auth); + if (rdb) { + _membersDbWatcherConnection = (void *)rdb.get(); + auto cur = R::db(this->_db).table("Member").get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.1,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb); + while (cur.has_next()) { + if (_run != 1) break; + json tmp(json::parse(cur.next().as_json())); + if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) { + if (--this->_ready == 0) + this->_readyLock.unlock(); + } else { + try { + this->_memberChanged(tmp["old_val"],tmp["new_val"]); + } catch ( ... ) {} // ignore bad records + } + } + } + } catch (std::exception &e) { + fprintf(stderr,"ERROR: controller RethinkDB: %s" ZT_EOL_S,e.what()); + } catch (R::Error &e) { + fprintf(stderr,"ERROR: controller RethinkDB: %s" ZT_EOL_S,e.message.c_str()); + } catch ( ... ) { + fprintf(stderr,"ERROR: controller RethinkDB: unknown exception" ZT_EOL_S); + } + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + } + }); + + _networksDbWatcher = std::thread([this]() { + while (_run == 1) { + try { + auto rdb = R::connect(this->_host,this->_port,this->_auth); + if (rdb) { + _membersDbWatcherConnection = (void *)rdb.get(); + auto cur = R::db(this->_db).table("Network").get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.1,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb); + while (cur.has_next()) { + if (_run != 1) break; + json tmp(json::parse(cur.next().as_json())); + if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) { + if (--this->_ready == 0) + this->_readyLock.unlock(); + } else { + try { + this->_networkChanged(tmp["old_val"],tmp["new_val"]); + } catch ( ... ) {} // ignore bad records + } + } + } + } catch (std::exception &e) { + fprintf(stderr,"ERROR: controller RethinkDB: %s" ZT_EOL_S,e.what()); + } catch (R::Error &e) { + fprintf(stderr,"ERROR: controller RethinkDB: %s" ZT_EOL_S,e.message.c_str()); + } catch ( ... ) { + fprintf(stderr,"ERROR: controller RethinkDB: unknown exception" ZT_EOL_S); + } + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + } + }); +} + +RethinkDB::~RethinkDB() +{ + // FIXME: not totally safe but will generally work, and only happens on shutdown anyway + _run = 0; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + if (_membersDbWatcherConnection) + ((R::Connection *)_membersDbWatcherConnection)->close(); + if (_networksDbWatcherConnection) + ((R::Connection *)_networksDbWatcherConnection)->close(); + _membersDbWatcher.join(); + _networksDbWatcher.join(); +} + +inline bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network) +{ + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + + return true; +} + +inline bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info) +{ + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + + std::lock_guard<std::mutex> l2(nw->lock); + auto m = nw->members.find(memberId); + if (m == nw->members.end()) + return false; + network = nw->config; + member = m->second; + _fillSummaryInfo(nw,info); + + return true; +} + +inline bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members) +{ + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + + std::lock_guard<std::mutex> l2(nw->lock); + network = nw->config; + for(auto m=nw->members.begin();m!=nw->members.end();++m) + members.push_back(m->second); + + return true; +} + +inline bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info) +{ + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + auto nwi = _networks.find(networkId); + if (nwi == _networks.end()) + return false; + nw = nwi->second; + } + + std::lock_guard<std::mutex> l2(nw->lock); + _fillSummaryInfo(nw,info); + + return true; +} + +void RethinkDB::_memberChanged(nlohmann::json &old,nlohmann::json &member) +{ + uint64_t memberId = 0; + uint64_t networkId = 0; + std::shared_ptr<_Network> nw; + + if (old.is_object()) { + json &config = old["config"]; + if (config.is_object()) { + memberId = OSUtils::jsonIntHex(config["id"],0ULL); + networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); + if ((memberId)&&(networkId)) { + { + std::lock_guard<std::mutex> l(_networks_l); + auto nw2 = _networks.find(networkId); + if (nw2 != _networks.end()) + nw = nw2->second; + } + if (nw) { + std::lock_guard<std::mutex> l(nw->lock); + if (OSUtils::jsonBool(config["activeBridge"],false)) + nw->activeBridgeMembers.erase(memberId); + if (OSUtils::jsonBool(config["authorized"],false)) + nw->authorizedMembers.erase(memberId); + json &ips = config["ipAssignments"]; + if (ips.is_array()) { + for(unsigned long i=0;i<ips.size();++i) { + json &ipj = ips[i]; + if (ipj.is_string()) { + const std::string ips = ipj; + InetAddress ipa(ips.c_str()); + ipa.setPort(0); + nw->allocatedIps.erase(ipa); + } + } + } + } + } + } + } + + if (member.is_object()) { + json &config = member["config"]; + if (config.is_object()) { + if (!nw) { + memberId = OSUtils::jsonIntHex(config["id"],0ULL); + networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); + if ((!memberId)||(!networkId)) + return; + std::lock_guard<std::mutex> l(_networks_l); + std::shared_ptr<_Network> &nw2 = _networks[networkId]; + if (!nw2) + nw2.reset(new _Network); + nw = nw2; + } + std::lock_guard<std::mutex> l(nw->lock); + + nw->members[memberId] = config; + + if (OSUtils::jsonBool(config["activeBridge"],false)) + nw->activeBridgeMembers.insert(memberId); + const bool isAuth = OSUtils::jsonBool(config["authorized"],false); + if (isAuth) + nw->authorizedMembers.insert(memberId); + json &ips = config["ipAssignments"]; + if (ips.is_array()) { + for(unsigned long i=0;i<ips.size();++i) { + json &ipj = ips[i]; + if (ipj.is_string()) { + const std::string ips = ipj; + InetAddress ipa(ips.c_str()); + ipa.setPort(0); + nw->allocatedIps.insert(ipa); + } + } + } + + if (!isAuth) { + const int64_t ldt = (int64_t)OSUtils::jsonInt(config["lastDeauthorizedTime"],0ULL); + if (ldt > nw->mostRecentDeauthTime) + nw->mostRecentDeauthTime = ldt; + } + } + } +} + +void RethinkDB::_networkChanged(nlohmann::json &old,nlohmann::json &network) +{ + if (network.is_object()) { + json &config = network["config"]; + if (config.is_object()) { + const std::string ids = config["id"]; + const uint64_t id = Utils::hexStrToU64(ids.c_str()); + if (id) { + std::shared_ptr<_Network> nw; + { + std::lock_guard<std::mutex> l(_networks_l); + std::shared_ptr<_Network> &nw2 = _networks[id]; + if (!nw2) + nw2.reset(new _Network); + nw = nw2; + } + std::lock_guard<std::mutex> l2(nw->lock); + nw->config = config; + } + } + } else if (old.is_object()) { + const std::string ids = old["id"]; + const uint64_t id = Utils::hexStrToU64(ids.c_str()); + if (id) { + std::lock_guard<std::mutex> l(_networks_l); + _networks.erase(id); + } + } +} + +} // namespace ZeroTier + +/* +int main(int argc,char **argv) +{ + ZeroTier::RethinkDB db(ZeroTier::Address(0x8056c2e21cULL),"10.6.6.188",28015,"ztc",""); + db.waitForReady(); + printf("ready.\n"); + pause(); +} +*/ diff --git a/controller/RethinkDB.hpp b/controller/RethinkDB.hpp new file mode 100644 index 00000000..7ed0e0a8 --- /dev/null +++ b/controller/RethinkDB.hpp @@ -0,0 +1,101 @@ +#ifndef ZT_CONTROLLER_RETHINKDB_HPP +#define ZT_CONTROLLER_RETHINKDB_HPP + +#include "../node/Constants.hpp" +#include "../node/Address.hpp" +#include "../node/InetAddress.hpp" +#include "../osdep/OSUtils.hpp" + +#include <memory> +#include <string> +#include <thread> +#include <unordered_map> +#include <unordered_set> +#include <vector> + +#include "../ext/json/json.hpp" + +namespace ZeroTier +{ + +class RethinkDB +{ +public: + struct NetworkSummaryInfo + { + NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {} + std::vector<Address> activeBridges; + std::vector<InetAddress> allocatedIps; + unsigned long authorizedMemberCount; + unsigned long totalMemberCount; + int64_t mostRecentDeauthTime; + }; + + RethinkDB(const Address &myAddress,const char *host,const int port,const char *db,const char *auth); + ~RethinkDB(); + + inline bool ready() const { return (_ready <= 0); } + + inline void waitForReady() const + { + while (_ready > 0) { + _readyLock.lock(); + _readyLock.unlock(); + } + } + + bool get(const uint64_t networkId,nlohmann::json &network); + bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info); + bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members); + bool summary(const uint64_t networkId,NetworkSummaryInfo &info); + +private: + struct _Network + { + _Network() : mostRecentDeauthTime(0) {} + nlohmann::json config; + std::unordered_map<uint64_t,nlohmann::json> members; + std::unordered_set<uint64_t> activeBridgeMembers; + std::unordered_set<uint64_t> authorizedMembers; + std::unordered_set<InetAddress,InetAddress::Hasher> allocatedIps; + int64_t mostRecentDeauthTime; + std::mutex lock; + }; + + void _memberChanged(nlohmann::json &old,nlohmann::json &member); + void _networkChanged(nlohmann::json &old,nlohmann::json &network); + + inline void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info) + { + for(auto ab=nw->activeBridgeMembers.begin();ab!=nw->activeBridgeMembers.end();++ab) + info.activeBridges.push_back(Address(*ab)); + for(auto ip=nw->allocatedIps.begin();ip!=nw->allocatedIps.end();++ip) + info.allocatedIps.push_back(*ip); + info.authorizedMemberCount = (unsigned long)nw->authorizedMembers.size(); + info.totalMemberCount = (unsigned long)nw->members.size(); + info.mostRecentDeauthTime = nw->mostRecentDeauthTime; + } + + const Address _myAddress; + std::string _myAddressStr; + std::string _host; + std::string _db; + std::string _auth; + const int _port; + + void *_networksDbWatcherConnection; + void *_membersDbWatcherConnection; + std::thread _networksDbWatcher; + std::thread _membersDbWatcher; + + std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks; + std::mutex _networks_l; + + mutable std::mutex _readyLock; // locked until ready + std::atomic<int> _ready; + std::atomic<int> _run; +}; + +} // namespace ZeroTier + +#endif |