diff options
| author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-07 14:44:46 -0800 |
|---|---|---|
| committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-07 14:44:46 -0800 |
| commit | 1613f42d0082cf6438ad0c62d89405ab82625f98 (patch) | |
| tree | bc46b11bc909b28de54252e1691a58380b28bae9 /controller/RethinkDB.cpp | |
| parent | 7fc9094d8ea1c2d28d003c499016f0755b73063d (diff) | |
| download | infinitytier-1613f42d0082cf6438ad0c62d89405ab82625f98.tar.gz infinitytier-1613f42d0082cf6438ad0c62d89405ab82625f98.zip | |
Re-integrate in-filesystem DB into new controller DB structure.
Diffstat (limited to 'controller/RethinkDB.cpp')
| -rw-r--r-- | controller/RethinkDB.cpp | 287 |
1 files changed, 19 insertions, 268 deletions
diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp index ffa1a188..bea941fa 100644 --- a/controller/RethinkDB.cpp +++ b/controller/RethinkDB.cpp @@ -33,12 +33,12 @@ using json = nlohmann::json; namespace ZeroTier { RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path) : - _controller(nc), - _myAddress(myAddress), + DB(nc,myAddress,path), _ready(2), // two tables need to be synchronized before we're ready, so this is ready when it reaches 0 _run(1), _waitNoticePrinted(false) { + // rethinkdb:host:port:db[:auth] std::vector<std::string> ps(OSUtils::split(path,":","","")); if ((ps.size() < 4)||(ps[0] != "rethinkdb")) throw std::runtime_error("invalid rethinkdb database url"); @@ -50,12 +50,6 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres _readyLock.lock(); - { - char tmp[32]; - _myAddress.toString(tmp); - _myAddressStr = tmp; - } - _membersDbWatcher = std::thread([this]() { try { while (_run == 1) { @@ -79,7 +73,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres json &nv = tmp["new_val"]; if (ov.is_object()||nv.is_object()) { //if (nv.is_object()) printf("MEMBER: %s" ZT_EOL_S,nv.dump().c_str()); - this->_memberChanged(ov,nv); + this->_memberChanged(ov,nv,(this->_ready <= 0)); } } catch ( ... ) {} // ignore bad records } @@ -120,7 +114,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres json &nv = tmp["new_val"]; if (ov.is_object()||nv.is_object()) { //if (nv.is_object()) printf("NETWORK: %s" ZT_EOL_S,nv.dump().c_str()); - this->_networkChanged(ov,nv); + this->_networkChanged(ov,nv,(this->_ready <= 0)); } } catch ( ... ) {} // ignore bad records } @@ -166,18 +160,18 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres record["controllerId"] = this->_myAddressStr; record["config"] = *config; table = "Network"; - } else if (objtype == "delete_network") { + } else if (objtype == "trace") { + record = *config; + table = "RemoteTrace"; + } else if (objtype == "_delete_network") { deleteId = (*config)["id"]; table = "Network"; - } else if (objtype == "delete_member") { + } else if (objtype == "_delete_member") { deleteId = (*config)["nwid"]; deleteId.push_back('-'); const std::string tmp = (*config)["id"]; deleteId.append(tmp); table = "Member"; - } else if (objtype == "trace") { - record = *config; - table = "RemoteTrace"; } else { delete config; continue; @@ -259,114 +253,16 @@ RethinkDB::~RethinkDB() _heartbeatThread.join(); } -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - } - return true; -} - -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - auto m = nw->members.find(memberId); - if (m == nw->members.end()) - return false; - member = m->second; - } - return true; -} - -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info) +void RethinkDB::waitForReady() const { - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - _fillSummaryInfo(nw,info); - auto m = nw->members.find(memberId); - if (m == nw->members.end()) - return false; - member = m->second; - } - return true; -} - -bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - network = nw->config; - for(auto m=nw->members.begin();m!=nw->members.end();++m) - members.push_back(m->second); - } - return true; -} - -bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - _fillSummaryInfo(nw,info); + while (_ready > 0) { + if (!_waitNoticePrinted) { + _waitNoticePrinted = true; + fprintf(stderr,"NOTICE: controller RethinkDB waiting for initial data download..." ZT_EOL_S); + } + _readyLock.lock(); + _readyLock.unlock(); } - return true; -} - -void RethinkDB::networks(std::vector<uint64_t> &networks) -{ - waitForReady(); - std::lock_guard<std::mutex> l(_networks_l); - networks.reserve(_networks.size() + 1); - for(auto n=_networks.begin();n!=_networks.end();++n) - networks.push_back(n->first); } void RethinkDB::save(const nlohmann::json &record) @@ -382,7 +278,7 @@ void RethinkDB::eraseNetwork(const uint64_t networkId) Utils::hex(networkId,tmp2); json *tmp = new json(); (*tmp)["id"] = tmp2; - (*tmp)["objtype"] = "delete_network"; // pseudo-type, tells thread to delete network + (*tmp)["objtype"] = "_delete_network"; // pseudo-type, tells thread to delete network _commitQueue.post(tmp); } @@ -395,155 +291,10 @@ void RethinkDB::eraseMember(const uint64_t networkId,const uint64_t memberId) (*tmp)["nwid"] = tmp2; Utils::hex10(memberId,tmp2); (*tmp)["id"] = tmp2; - (*tmp)["objtype"] = "delete_member"; // pseudo-type, tells thread to delete network + (*tmp)["objtype"] = "_delete_member"; // pseudo-type, tells thread to delete network _commitQueue.post(tmp); } -void RethinkDB::_memberChanged(nlohmann::json &old,nlohmann::json &member) -{ - uint64_t memberId = 0; - uint64_t networkId = 0; - bool isAuth = false; - bool wasAuth = false; - std::shared_ptr<_Network> nw; - - if (old.is_object()) { - json &config = old["config"]; - if (config.is_object()) { - memberId = OSUtils::jsonIntHex(config["id"],0ULL); - networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); - if ((memberId)&&(networkId)) { - { - std::lock_guard<std::mutex> l(_networks_l); - auto nw2 = _networks.find(networkId); - if (nw2 != _networks.end()) - nw = nw2->second; - } - if (nw) { - std::lock_guard<std::mutex> l(nw->lock); - if (OSUtils::jsonBool(config["activeBridge"],false)) - nw->activeBridgeMembers.erase(memberId); - wasAuth = OSUtils::jsonBool(config["authorized"],false); - if (wasAuth) - nw->authorizedMembers.erase(memberId); - json &ips = config["ipAssignments"]; - if (ips.is_array()) { - for(unsigned long i=0;i<ips.size();++i) { - json &ipj = ips[i]; - if (ipj.is_string()) { - const std::string ips = ipj; - InetAddress ipa(ips.c_str()); - ipa.setPort(0); - nw->allocatedIps.erase(ipa); - } - } - } - } - } - } - } - - if (member.is_object()) { - json &config = member["config"]; - if (config.is_object()) { - if (!nw) { - memberId = OSUtils::jsonIntHex(config["id"],0ULL); - networkId = OSUtils::jsonIntHex(config["nwid"],0ULL); - if ((!memberId)||(!networkId)) - return; - std::lock_guard<std::mutex> l(_networks_l); - std::shared_ptr<_Network> &nw2 = _networks[networkId]; - if (!nw2) - nw2.reset(new _Network); - nw = nw2; - } - - { - std::lock_guard<std::mutex> l(nw->lock); - - nw->members[memberId] = config; - - if (OSUtils::jsonBool(config["activeBridge"],false)) - nw->activeBridgeMembers.insert(memberId); - isAuth = OSUtils::jsonBool(config["authorized"],false); - if (isAuth) - nw->authorizedMembers.insert(memberId); - json &ips = config["ipAssignments"]; - if (ips.is_array()) { - for(unsigned long i=0;i<ips.size();++i) { - json &ipj = ips[i]; - if (ipj.is_string()) { - const std::string ips = ipj; - InetAddress ipa(ips.c_str()); - ipa.setPort(0); - nw->allocatedIps.insert(ipa); - } - } - } - - if (!isAuth) { - const int64_t ldt = (int64_t)OSUtils::jsonInt(config["lastDeauthorizedTime"],0ULL); - if (ldt > nw->mostRecentDeauthTime) - nw->mostRecentDeauthTime = ldt; - } - } - - _controller->onNetworkMemberUpdate(networkId,memberId); - } - } else if (memberId) { - if (nw) { - std::lock_guard<std::mutex> l(nw->lock); - nw->members.erase(memberId); - } - if (networkId) { - std::lock_guard<std::mutex> l(_networks_l); - auto er = _networkByMember.equal_range(memberId); - for(auto i=er.first;i!=er.second;++i) { - if (i->second == networkId) { - _networkByMember.erase(i); - break; - } - } - } - } - - if ((wasAuth)&&(!isAuth)&&(networkId)&&(memberId)) - _controller->onNetworkMemberDeauthorize(networkId,memberId); -} - -void RethinkDB::_networkChanged(nlohmann::json &old,nlohmann::json &network) -{ - if (network.is_object()) { - json &config = network["config"]; - if (config.is_object()) { - const std::string ids = config["id"]; - const uint64_t id = Utils::hexStrToU64(ids.c_str()); - if (id) { - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - std::shared_ptr<_Network> &nw2 = _networks[id]; - if (!nw2) - nw2.reset(new _Network); - nw = nw2; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - nw->config = config; - } - _controller->onNetworkUpdate(id); - } - } - } else if (old.is_object()) { - const std::string ids = old["id"]; - const uint64_t id = Utils::hexStrToU64(ids.c_str()); - if (id) { - std::lock_guard<std::mutex> l(_networks_l); - _networks.erase(id); - } - } -} - } // namespace ZeroTier #endif // ZT_CONTROLLER_USE_RETHINKDB |
