diff options
| author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-08 11:06:14 -0800 |
|---|---|---|
| committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2017-11-08 11:06:14 -0800 |
| commit | 4166d8ca35ded34180d60b56105a853dd6b02ff4 (patch) | |
| tree | 3464cfb6e36f67ec5a8fe6f59eae8cbc40759c40 /controller/RethinkDB.cpp | |
| parent | 53e7e950f187008939dd5021f9d5f635f995f022 (diff) | |
| download | infinitytier-4166d8ca35ded34180d60b56105a853dd6b02ff4.tar.gz infinitytier-4166d8ca35ded34180d60b56105a853dd6b02ff4.zip | |
Fix a deadlock and some more work on RethinkDB (for central) integration.
Diffstat (limited to 'controller/RethinkDB.cpp')
| -rw-r--r-- | controller/RethinkDB.cpp | 74 |
1 files changed, 71 insertions, 3 deletions
diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp index d1012167..031bd516 100644 --- a/controller/RethinkDB.cpp +++ b/controller/RethinkDB.cpp @@ -215,6 +215,47 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres }); } + _onlineNotificationThread = std::thread([this]() { + try { + std::unique_ptr<R::Connection> rdb; + while (_run == 1) { + try { + if (!rdb) + rdb = R::connect(this->_host,this->_port,this->_auth); + if (rdb) { + std::lock_guard<std::mutex> l(_lastOnline_l); + R::Array batch; + R::Object tmpobj; + for(auto i=_lastOnline.begin();i!=_lastOnline.end();++i) { + char nodeId[16]; + Utils::hex10(i->first,nodeId); + tmpobj["id"] = nodeId; + tmpobj["ts"] = i->second; + batch.emplace_back(tmpobj); + if (batch.size() >= 256) { + R::db(this->_db).table("NodeLastOnline").insert(R::args(batch),R::optargs("conflict","update")).run(*rdb); + batch.clear(); + } + } + if (batch.size() > 0) + R::db(this->_db).table("NodeLastOnline").insert(R::args(batch),R::optargs("conflict","update")).run(*rdb); + _lastOnline.clear(); + } + } catch (std::exception &e) { + fprintf(stderr,"ERROR: controller RethinkDB (node status update): %s" ZT_EOL_S,e.what()); + rdb.reset(); + } catch (R::Error &e) { + fprintf(stderr,"ERROR: controller RethinkDB (node status update): %s" ZT_EOL_S,e.message.c_str()); + rdb.reset(); + } catch ( ... ) { + fprintf(stderr,"ERROR: controller RethinkDB (node status update): unknown exception" ZT_EOL_S); + rdb.reset(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + } + } catch ( ... ) {} + }); + _heartbeatThread = std::thread([this]() { try { char tmp[1024]; @@ -251,9 +292,10 @@ RethinkDB::~RethinkDB() _membersDbWatcher.join(); _networksDbWatcher.join(); _heartbeatThread.join(); + _onlineNotificationThread.join(); } -void RethinkDB::waitForReady() +bool RethinkDB::waitForReady() { while (_ready > 0) { if (!_waitNoticePrinted) { @@ -263,12 +305,32 @@ void RethinkDB::waitForReady() _readyLock.lock(); _readyLock.unlock(); } + return true; } -void RethinkDB::save(const nlohmann::json &record) +void RethinkDB::save(nlohmann::json *orig,nlohmann::json &record) { + if (!record.is_object()) // sanity check + return; waitForReady(); - _commitQueue.post(new nlohmann::json(record)); + if (orig) { + if (*orig != record) { + nlohmann::json *q = new nlohmann::json(); + try { + record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1; + for(auto kv=record.begin();kv!=record.end();++kv) { + if ((kv.key() == "id")||(kv.key() == "nwid")||(kv.key() == "objtype")||((*q)[kv.key()] != kv.value())) + (*q)[kv.key()] = kv.value(); + } + } catch ( ... ) { + delete q; + throw; + } + } + } else { + record["revision"] = 1; + _commitQueue.post(new nlohmann::json(record)); + } } void RethinkDB::eraseNetwork(const uint64_t networkId) @@ -295,6 +357,12 @@ void RethinkDB::eraseMember(const uint64_t networkId,const uint64_t memberId) _commitQueue.post(tmp); } +void RethinkDB::nodeIsOnline(const uint64_t memberId) +{ + std::lock_guard<std::mutex> l(_lastOnline_l); + _lastOnline[memberId] = OSUtils::now(); +} + } // namespace ZeroTier #endif // ZT_CONTROLLER_USE_RETHINKDB |
