summaryrefslogtreecommitdiff
path: root/controller
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2017-11-02 07:05:11 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2017-11-02 07:05:11 -0700
commit4e88c80a22b6ca982341413ee806ade0df57b4b7 (patch)
tree82c2daaac597f74595bc83c18646280a56898e1a /controller
parenta6203ed0389c1b995ebe94935b2d1ddeb01f36ee (diff)
downloadinfinitytier-4e88c80a22b6ca982341413ee806ade0df57b4b7.tar.gz
infinitytier-4e88c80a22b6ca982341413ee806ade0df57b4b7.zip
RethinkDB native connector work, minor fixes.
Diffstat (limited to 'controller')
-rw-r--r--controller/RethinkDB.cpp308
-rw-r--r--controller/RethinkDB.hpp101
2 files changed, 409 insertions, 0 deletions
diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp
new file mode 100644
index 00000000..5d93bf72
--- /dev/null
+++ b/controller/RethinkDB.cpp
@@ -0,0 +1,308 @@
+#include "RethinkDB.hpp"
+
+#include <chrono>
+#include <algorithm>
+#include <stdexcept>
+
+#include "../ext/librethinkdbxx/build/include/rethinkdb.h"
+
+namespace R = RethinkDB;
+using nlohmann::json;
+
+namespace ZeroTier {
+
+RethinkDB::RethinkDB(const Address &myAddress,const char *host,const int port,const char *db,const char *auth) :
+ _myAddress(myAddress),
+ _host(host ? host : "127.0.0.1"),
+ _db(db),
+ _auth(auth ? auth : ""),
+ _port((port > 0) ? port : 28015),
+ _ready(2), // two tables need to be synchronized before we're ready
+ _run(1)
+{
+ _readyLock.lock();
+
+ {
+ char tmp[32];
+ _myAddress.toString(tmp);
+ _myAddressStr = tmp;
+ }
+
+ _membersDbWatcher = std::thread([this]() {
+ while (_run == 1) {
+ try {
+ auto rdb = R::connect(this->_host,this->_port,this->_auth);
+ if (rdb) {
+ _membersDbWatcherConnection = (void *)rdb.get();
+ auto cur = R::db(this->_db).table("Member").get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.1,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb);
+ while (cur.has_next()) {
+ if (_run != 1) break;
+ json tmp(json::parse(cur.next().as_json()));
+ if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) {
+ if (--this->_ready == 0)
+ this->_readyLock.unlock();
+ } else {
+ try {
+ this->_memberChanged(tmp["old_val"],tmp["new_val"]);
+ } catch ( ... ) {} // ignore bad records
+ }
+ }
+ }
+ } catch (std::exception &e) {
+ fprintf(stderr,"ERROR: controller RethinkDB: %s" ZT_EOL_S,e.what());
+ } catch (R::Error &e) {
+ fprintf(stderr,"ERROR: controller RethinkDB: %s" ZT_EOL_S,e.message.c_str());
+ } catch ( ... ) {
+ fprintf(stderr,"ERROR: controller RethinkDB: unknown exception" ZT_EOL_S);
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(250));
+ }
+ });
+
+ _networksDbWatcher = std::thread([this]() {
+ while (_run == 1) {
+ try {
+ auto rdb = R::connect(this->_host,this->_port,this->_auth);
+ if (rdb) {
+ _membersDbWatcherConnection = (void *)rdb.get();
+ auto cur = R::db(this->_db).table("Network").get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.1,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb);
+ while (cur.has_next()) {
+ if (_run != 1) break;
+ json tmp(json::parse(cur.next().as_json()));
+ if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) {
+ if (--this->_ready == 0)
+ this->_readyLock.unlock();
+ } else {
+ try {
+ this->_networkChanged(tmp["old_val"],tmp["new_val"]);
+ } catch ( ... ) {} // ignore bad records
+ }
+ }
+ }
+ } catch (std::exception &e) {
+ fprintf(stderr,"ERROR: controller RethinkDB: %s" ZT_EOL_S,e.what());
+ } catch (R::Error &e) {
+ fprintf(stderr,"ERROR: controller RethinkDB: %s" ZT_EOL_S,e.message.c_str());
+ } catch ( ... ) {
+ fprintf(stderr,"ERROR: controller RethinkDB: unknown exception" ZT_EOL_S);
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(250));
+ }
+ });
+}
+
+RethinkDB::~RethinkDB()
+{
+ // FIXME: not totally safe but will generally work, and only happens on shutdown anyway
+ _run = 0;
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ if (_membersDbWatcherConnection)
+ ((R::Connection *)_membersDbWatcherConnection)->close();
+ if (_networksDbWatcherConnection)
+ ((R::Connection *)_networksDbWatcherConnection)->close();
+ _membersDbWatcher.join();
+ _networksDbWatcher.join();
+}
+
+inline bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network)
+{
+ std::shared_ptr<_Network> nw;
+ {
+ std::lock_guard<std::mutex> l(_networks_l);
+ auto nwi = _networks.find(networkId);
+ if (nwi == _networks.end())
+ return false;
+ nw = nwi->second;
+ }
+
+ std::lock_guard<std::mutex> l2(nw->lock);
+ network = nw->config;
+
+ return true;
+}
+
+inline bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info)
+{
+ std::shared_ptr<_Network> nw;
+ {
+ std::lock_guard<std::mutex> l(_networks_l);
+ auto nwi = _networks.find(networkId);
+ if (nwi == _networks.end())
+ return false;
+ nw = nwi->second;
+ }
+
+ std::lock_guard<std::mutex> l2(nw->lock);
+ auto m = nw->members.find(memberId);
+ if (m == nw->members.end())
+ return false;
+ network = nw->config;
+ member = m->second;
+ _fillSummaryInfo(nw,info);
+
+ return true;
+}
+
+inline bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members)
+{
+ std::shared_ptr<_Network> nw;
+ {
+ std::lock_guard<std::mutex> l(_networks_l);
+ auto nwi = _networks.find(networkId);
+ if (nwi == _networks.end())
+ return false;
+ nw = nwi->second;
+ }
+
+ std::lock_guard<std::mutex> l2(nw->lock);
+ network = nw->config;
+ for(auto m=nw->members.begin();m!=nw->members.end();++m)
+ members.push_back(m->second);
+
+ return true;
+}
+
+inline bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info)
+{
+ std::shared_ptr<_Network> nw;
+ {
+ std::lock_guard<std::mutex> l(_networks_l);
+ auto nwi = _networks.find(networkId);
+ if (nwi == _networks.end())
+ return false;
+ nw = nwi->second;
+ }
+
+ std::lock_guard<std::mutex> l2(nw->lock);
+ _fillSummaryInfo(nw,info);
+
+ return true;
+}
+
+void RethinkDB::_memberChanged(nlohmann::json &old,nlohmann::json &member)
+{
+ uint64_t memberId = 0;
+ uint64_t networkId = 0;
+ std::shared_ptr<_Network> nw;
+
+ if (old.is_object()) {
+ json &config = old["config"];
+ if (config.is_object()) {
+ memberId = OSUtils::jsonIntHex(config["id"],0ULL);
+ networkId = OSUtils::jsonIntHex(config["nwid"],0ULL);
+ if ((memberId)&&(networkId)) {
+ {
+ std::lock_guard<std::mutex> l(_networks_l);
+ auto nw2 = _networks.find(networkId);
+ if (nw2 != _networks.end())
+ nw = nw2->second;
+ }
+ if (nw) {
+ std::lock_guard<std::mutex> l(nw->lock);
+ if (OSUtils::jsonBool(config["activeBridge"],false))
+ nw->activeBridgeMembers.erase(memberId);
+ if (OSUtils::jsonBool(config["authorized"],false))
+ nw->authorizedMembers.erase(memberId);
+ json &ips = config["ipAssignments"];
+ if (ips.is_array()) {
+ for(unsigned long i=0;i<ips.size();++i) {
+ json &ipj = ips[i];
+ if (ipj.is_string()) {
+ const std::string ips = ipj;
+ InetAddress ipa(ips.c_str());
+ ipa.setPort(0);
+ nw->allocatedIps.erase(ipa);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ if (member.is_object()) {
+ json &config = member["config"];
+ if (config.is_object()) {
+ if (!nw) {
+ memberId = OSUtils::jsonIntHex(config["id"],0ULL);
+ networkId = OSUtils::jsonIntHex(config["nwid"],0ULL);
+ if ((!memberId)||(!networkId))
+ return;
+ std::lock_guard<std::mutex> l(_networks_l);
+ std::shared_ptr<_Network> &nw2 = _networks[networkId];
+ if (!nw2)
+ nw2.reset(new _Network);
+ nw = nw2;
+ }
+ std::lock_guard<std::mutex> l(nw->lock);
+
+ nw->members[memberId] = config;
+
+ if (OSUtils::jsonBool(config["activeBridge"],false))
+ nw->activeBridgeMembers.insert(memberId);
+ const bool isAuth = OSUtils::jsonBool(config["authorized"],false);
+ if (isAuth)
+ nw->authorizedMembers.insert(memberId);
+ json &ips = config["ipAssignments"];
+ if (ips.is_array()) {
+ for(unsigned long i=0;i<ips.size();++i) {
+ json &ipj = ips[i];
+ if (ipj.is_string()) {
+ const std::string ips = ipj;
+ InetAddress ipa(ips.c_str());
+ ipa.setPort(0);
+ nw->allocatedIps.insert(ipa);
+ }
+ }
+ }
+
+ if (!isAuth) {
+ const int64_t ldt = (int64_t)OSUtils::jsonInt(config["lastDeauthorizedTime"],0ULL);
+ if (ldt > nw->mostRecentDeauthTime)
+ nw->mostRecentDeauthTime = ldt;
+ }
+ }
+ }
+}
+
+void RethinkDB::_networkChanged(nlohmann::json &old,nlohmann::json &network)
+{
+ if (network.is_object()) {
+ json &config = network["config"];
+ if (config.is_object()) {
+ const std::string ids = config["id"];
+ const uint64_t id = Utils::hexStrToU64(ids.c_str());
+ if (id) {
+ std::shared_ptr<_Network> nw;
+ {
+ std::lock_guard<std::mutex> l(_networks_l);
+ std::shared_ptr<_Network> &nw2 = _networks[id];
+ if (!nw2)
+ nw2.reset(new _Network);
+ nw = nw2;
+ }
+ std::lock_guard<std::mutex> l2(nw->lock);
+ nw->config = config;
+ }
+ }
+ } else if (old.is_object()) {
+ const std::string ids = old["id"];
+ const uint64_t id = Utils::hexStrToU64(ids.c_str());
+ if (id) {
+ std::lock_guard<std::mutex> l(_networks_l);
+ _networks.erase(id);
+ }
+ }
+}
+
+} // namespace ZeroTier
+
+/*
+int main(int argc,char **argv)
+{
+ ZeroTier::RethinkDB db(ZeroTier::Address(0x8056c2e21cULL),"10.6.6.188",28015,"ztc","");
+ db.waitForReady();
+ printf("ready.\n");
+ pause();
+}
+*/
diff --git a/controller/RethinkDB.hpp b/controller/RethinkDB.hpp
new file mode 100644
index 00000000..7ed0e0a8
--- /dev/null
+++ b/controller/RethinkDB.hpp
@@ -0,0 +1,101 @@
+#ifndef ZT_CONTROLLER_RETHINKDB_HPP
+#define ZT_CONTROLLER_RETHINKDB_HPP
+
+#include "../node/Constants.hpp"
+#include "../node/Address.hpp"
+#include "../node/InetAddress.hpp"
+#include "../osdep/OSUtils.hpp"
+
+#include <memory>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "../ext/json/json.hpp"
+
+namespace ZeroTier
+{
+
+class RethinkDB
+{
+public:
+ struct NetworkSummaryInfo
+ {
+ NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {}
+ std::vector<Address> activeBridges;
+ std::vector<InetAddress> allocatedIps;
+ unsigned long authorizedMemberCount;
+ unsigned long totalMemberCount;
+ int64_t mostRecentDeauthTime;
+ };
+
+ RethinkDB(const Address &myAddress,const char *host,const int port,const char *db,const char *auth);
+ ~RethinkDB();
+
+ inline bool ready() const { return (_ready <= 0); }
+
+ inline void waitForReady() const
+ {
+ while (_ready > 0) {
+ _readyLock.lock();
+ _readyLock.unlock();
+ }
+ }
+
+ bool get(const uint64_t networkId,nlohmann::json &network);
+ bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info);
+ bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
+ bool summary(const uint64_t networkId,NetworkSummaryInfo &info);
+
+private:
+ struct _Network
+ {
+ _Network() : mostRecentDeauthTime(0) {}
+ nlohmann::json config;
+ std::unordered_map<uint64_t,nlohmann::json> members;
+ std::unordered_set<uint64_t> activeBridgeMembers;
+ std::unordered_set<uint64_t> authorizedMembers;
+ std::unordered_set<InetAddress,InetAddress::Hasher> allocatedIps;
+ int64_t mostRecentDeauthTime;
+ std::mutex lock;
+ };
+
+ void _memberChanged(nlohmann::json &old,nlohmann::json &member);
+ void _networkChanged(nlohmann::json &old,nlohmann::json &network);
+
+ inline void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info)
+ {
+ for(auto ab=nw->activeBridgeMembers.begin();ab!=nw->activeBridgeMembers.end();++ab)
+ info.activeBridges.push_back(Address(*ab));
+ for(auto ip=nw->allocatedIps.begin();ip!=nw->allocatedIps.end();++ip)
+ info.allocatedIps.push_back(*ip);
+ info.authorizedMemberCount = (unsigned long)nw->authorizedMembers.size();
+ info.totalMemberCount = (unsigned long)nw->members.size();
+ info.mostRecentDeauthTime = nw->mostRecentDeauthTime;
+ }
+
+ const Address _myAddress;
+ std::string _myAddressStr;
+ std::string _host;
+ std::string _db;
+ std::string _auth;
+ const int _port;
+
+ void *_networksDbWatcherConnection;
+ void *_membersDbWatcherConnection;
+ std::thread _networksDbWatcher;
+ std::thread _membersDbWatcher;
+
+ std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks;
+ std::mutex _networks_l;
+
+ mutable std::mutex _readyLock; // locked until ready
+ std::atomic<int> _ready;
+ std::atomic<int> _run;
+};
+
+} // namespace ZeroTier
+
+#endif