summaryrefslogtreecommitdiff
path: root/controller/RethinkDB.cpp
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2017-11-08 11:06:14 -0800
committerAdam Ierymenko <adam.ierymenko@gmail.com>2017-11-08 11:06:14 -0800
commit4166d8ca35ded34180d60b56105a853dd6b02ff4 (patch)
tree3464cfb6e36f67ec5a8fe6f59eae8cbc40759c40 /controller/RethinkDB.cpp
parent53e7e950f187008939dd5021f9d5f635f995f022 (diff)
downloadinfinitytier-4166d8ca35ded34180d60b56105a853dd6b02ff4.tar.gz
infinitytier-4166d8ca35ded34180d60b56105a853dd6b02ff4.zip
Fix a deadlock and some more work on RethinkDB (for central) integration.
Diffstat (limited to 'controller/RethinkDB.cpp')
-rw-r--r--controller/RethinkDB.cpp74
1 files changed, 71 insertions, 3 deletions
diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp
index d1012167..031bd516 100644
--- a/controller/RethinkDB.cpp
+++ b/controller/RethinkDB.cpp
@@ -215,6 +215,47 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
});
}
+ _onlineNotificationThread = std::thread([this]() {
+ try {
+ std::unique_ptr<R::Connection> rdb;
+ while (_run == 1) {
+ try {
+ if (!rdb)
+ rdb = R::connect(this->_host,this->_port,this->_auth);
+ if (rdb) {
+ std::lock_guard<std::mutex> l(_lastOnline_l);
+ R::Array batch;
+ R::Object tmpobj;
+ for(auto i=_lastOnline.begin();i!=_lastOnline.end();++i) {
+ char nodeId[16];
+ Utils::hex10(i->first,nodeId);
+ tmpobj["id"] = nodeId;
+ tmpobj["ts"] = i->second;
+ batch.emplace_back(tmpobj);
+ if (batch.size() >= 256) {
+ R::db(this->_db).table("NodeLastOnline").insert(R::args(batch),R::optargs("conflict","update")).run(*rdb);
+ batch.clear();
+ }
+ }
+ if (batch.size() > 0)
+ R::db(this->_db).table("NodeLastOnline").insert(R::args(batch),R::optargs("conflict","update")).run(*rdb);
+ _lastOnline.clear();
+ }
+ } catch (std::exception &e) {
+ fprintf(stderr,"ERROR: controller RethinkDB (node status update): %s" ZT_EOL_S,e.what());
+ rdb.reset();
+ } catch (R::Error &e) {
+ fprintf(stderr,"ERROR: controller RethinkDB (node status update): %s" ZT_EOL_S,e.message.c_str());
+ rdb.reset();
+ } catch ( ... ) {
+ fprintf(stderr,"ERROR: controller RethinkDB (node status update): unknown exception" ZT_EOL_S);
+ rdb.reset();
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(250));
+ }
+ } catch ( ... ) {}
+ });
+
_heartbeatThread = std::thread([this]() {
try {
char tmp[1024];
@@ -251,9 +292,10 @@ RethinkDB::~RethinkDB()
_membersDbWatcher.join();
_networksDbWatcher.join();
_heartbeatThread.join();
+ _onlineNotificationThread.join();
}
-void RethinkDB::waitForReady()
+bool RethinkDB::waitForReady()
{
while (_ready > 0) {
if (!_waitNoticePrinted) {
@@ -263,12 +305,32 @@ void RethinkDB::waitForReady()
_readyLock.lock();
_readyLock.unlock();
}
+ return true;
}
-void RethinkDB::save(const nlohmann::json &record)
+void RethinkDB::save(nlohmann::json *orig,nlohmann::json &record)
{
+ if (!record.is_object()) // sanity check
+ return;
waitForReady();
- _commitQueue.post(new nlohmann::json(record));
+ if (orig) {
+ if (*orig != record) {
+ nlohmann::json *q = new nlohmann::json();
+ try {
+ record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
+ for(auto kv=record.begin();kv!=record.end();++kv) {
+ if ((kv.key() == "id")||(kv.key() == "nwid")||(kv.key() == "objtype")||((*q)[kv.key()] != kv.value()))
+ (*q)[kv.key()] = kv.value();
+ }
+ } catch ( ... ) {
+ delete q;
+ throw;
+ }
+ }
+ } else {
+ record["revision"] = 1;
+ _commitQueue.post(new nlohmann::json(record));
+ }
}
void RethinkDB::eraseNetwork(const uint64_t networkId)
@@ -295,6 +357,12 @@ void RethinkDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
_commitQueue.post(tmp);
}
+void RethinkDB::nodeIsOnline(const uint64_t memberId)
+{
+ std::lock_guard<std::mutex> l(_lastOnline_l);
+ _lastOnline[memberId] = OSUtils::now();
+}
+
} // namespace ZeroTier
#endif // ZT_CONTROLLER_USE_RETHINKDB