From 92c7070aa85425041f856d7e4203bdd1ae713c33 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 3 Nov 2017 20:55:16 -0400 Subject: RethinkDB fixes. --- controller/RethinkDB.cpp | 288 ++++++++++++++++++++++++++--------------------- 1 file changed, 157 insertions(+), 131 deletions(-) (limited to 'controller/RethinkDB.cpp') diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp index 6e656d23..1f8d29df 100644 --- a/controller/RethinkDB.cpp +++ b/controller/RethinkDB.cpp @@ -40,12 +40,13 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres _waitNoticePrinted(false) { std::vector ps(OSUtils::split(path,":","","")); - if ((ps.size() != 5)||(ps[0] != "rethinkdb")) + if ((ps.size() < 4)||(ps[0] != "rethinkdb")) throw std::runtime_error("invalid rethinkdb database url"); _host = ps[1]; - _db = ps[2]; - _auth = ps[3]; - _port = Utils::strToInt(ps[4].c_str()); + _port = Utils::strToInt(ps[2].c_str()); + _db = ps[3]; + if (ps.size() > 4) + _auth = ps[4]; _readyLock.lock(); @@ -56,167 +57,192 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres } _membersDbWatcher = std::thread([this]() { - while (_run == 1) { - try { - std::unique_ptr rdb(R::connect(this->_host,this->_port,this->_auth)); - if (rdb) { - _membersDbWatcherConnection = (void *)rdb.get(); - auto cur = R::db(this->_db).table("Member",R::optargs("read_mode","outdated")).get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.05,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb); - while (cur.has_next()) { - if (_run != 1) break; - json tmp(json::parse(cur.next().as_json())); - if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) { - if (--this->_ready == 0) { - if (_waitNoticePrinted) - fprintf(stderr,"NOTICE: controller RethinkDB data download complete." ZT_EOL_S); - this->_readyLock.unlock(); + try { + while (_run == 1) { + try { + std::unique_ptr rdb(R::connect(this->_host,this->_port,this->_auth)); + if (rdb) { + _membersDbWatcherConnection = (void *)rdb.get(); + auto cur = R::db(this->_db).table("Member",R::optargs("read_mode","outdated")).get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.05,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb); + while (cur.has_next()) { + if (_run != 1) break; + json tmp(json::parse(cur.next().as_json())); + if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) { + if (--this->_ready == 0) { + if (_waitNoticePrinted) + fprintf(stderr,"NOTICE: controller RethinkDB data download complete." ZT_EOL_S); + this->_readyLock.unlock(); + } + } else { + try { + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + if (ov.is_object()||nv.is_object()) { + this->_memberChanged(ov,nv); + } + } catch ( ... ) {} // ignore bad records } - } else { - try { - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - if (ov.is_object()||nv.is_object()) - this->_memberChanged(ov,nv); - } catch ( ... ) {} // ignore bad records } } + } catch (std::exception &e) { + fprintf(stderr,"ERROR: controller RethinkDB (member change stream): %s" ZT_EOL_S,e.what()); + } catch (R::Error &e) { + fprintf(stderr,"ERROR: controller RethinkDB (member change stream): %s" ZT_EOL_S,e.message.c_str()); + } catch ( ... ) { + fprintf(stderr,"ERROR: controller RethinkDB (member change stream): unknown exception" ZT_EOL_S); } - } catch (std::exception &e) { - fprintf(stderr,"ERROR: controller RethinkDB (member change stream): %s" ZT_EOL_S,e.what()); - } catch (R::Error &e) { - fprintf(stderr,"ERROR: controller RethinkDB (member change stream): %s" ZT_EOL_S,e.message.c_str()); - } catch ( ... ) { - fprintf(stderr,"ERROR: controller RethinkDB (member change stream): unknown exception" ZT_EOL_S); + std::this_thread::sleep_for(std::chrono::milliseconds(250)); } - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - } + } catch ( ... ) {} }); _networksDbWatcher = std::thread([this]() { - while (_run == 1) { - try { - std::unique_ptr rdb(R::connect(this->_host,this->_port,this->_auth)); - if (rdb) { - _membersDbWatcherConnection = (void *)rdb.get(); - auto cur = R::db(this->_db).table("Network",R::optargs("read_mode","outdated")).get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.05,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb); - while (cur.has_next()) { - if (_run != 1) break; - json tmp(json::parse(cur.next().as_json())); - if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) { - if (--this->_ready == 0) { - if (_waitNoticePrinted) - fprintf(stderr,"NOTICE: controller RethinkDB data download complete." ZT_EOL_S); - this->_readyLock.unlock(); + try { + while (_run == 1) { + try { + std::unique_ptr rdb(R::connect(this->_host,this->_port,this->_auth)); + if (rdb) { + _networksDbWatcherConnection = (void *)rdb.get(); + auto cur = R::db(this->_db).table("Network",R::optargs("read_mode","outdated")).get_all(this->_myAddressStr,R::optargs("index","controllerId")).changes(R::optargs("squash",0.05,"include_initial",true,"include_types",true,"include_states",true)).run(*rdb); + while (cur.has_next()) { + if (_run != 1) break; + json tmp(json::parse(cur.next().as_json())); + if ((tmp["type"] == "state")&&(tmp["state"] == "ready")) { + if (--this->_ready == 0) { + if (_waitNoticePrinted) + fprintf(stderr,"NOTICE: controller RethinkDB data download complete." ZT_EOL_S); + this->_readyLock.unlock(); + } + } else { + try { + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + if (ov.is_object()||nv.is_object()) { + this->_networkChanged(ov,nv); + } + } catch ( ... ) {} // ignore bad records } - } else { - try { - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - if (ov.is_object()||nv.is_object()) - this->_networkChanged(ov,nv); - } catch ( ... ) {} // ignore bad records } } + } catch (std::exception &e) { + fprintf(stderr,"ERROR: controller RethinkDB (network change stream): %s" ZT_EOL_S,e.what()); + } catch (R::Error &e) { + fprintf(stderr,"ERROR: controller RethinkDB (network change stream): %s" ZT_EOL_S,e.message.c_str()); + } catch ( ... ) { + fprintf(stderr,"ERROR: controller RethinkDB (network change stream): unknown exception" ZT_EOL_S); } - } catch (std::exception &e) { - fprintf(stderr,"ERROR: controller RethinkDB (network change stream): %s" ZT_EOL_S,e.what()); - } catch (R::Error &e) { - fprintf(stderr,"ERROR: controller RethinkDB (network change stream): %s" ZT_EOL_S,e.message.c_str()); - } catch ( ... ) { - fprintf(stderr,"ERROR: controller RethinkDB (network change stream): unknown exception" ZT_EOL_S); + std::this_thread::sleep_for(std::chrono::milliseconds(250)); } - std::this_thread::sleep_for(std::chrono::milliseconds(250)); - } + } catch ( ... ) {} }); for(int t=0;t rdb; - nlohmann::json *config = (nlohmann::json *)0; - while ((this->_commitQueue.get(config))&&(_run == 1)) { - if (!config) - continue; - json record; - const std::string objtype = (*config)["objtype"]; - const char *table; - std::string deleteId; - if (objtype == "member") { - const std::string nwid = (*config)["nwid"]; - const std::string id = (*config)["id"]; - record["id"] = nwid + "-" + id; - record["controllerId"] = this->_myAddressStr; - record["networkId"] = nwid; - record["nodeId"] = id; - record["config"] = *config; - table = "Member"; - } else if (objtype == "network") { - const std::string id = (*config)["id"]; - record["id"] = id; - record["controllerId"] = this->_myAddressStr; - record["config"] = *config; - table = "Network"; - } else if (objtype == "delete_network") { - deleteId = (*config)["id"]; - table = "Network"; - } else if (objtype == "delete_member") { - deleteId = (*config)["nwid"]; - deleteId.push_back('-'); - const std::string tmp = (*config)["id"]; - deleteId.append(tmp); - table = "Member"; - } else if (objtype == "trace") { - record = *config; - table = "RemoteTrace"; - } else { - continue; - } + try { + std::unique_ptr rdb; + nlohmann::json *config = (nlohmann::json *)0; + while ((this->_commitQueue.get(config))&&(_run == 1)) { + if (!config) + continue; + json record; + const std::string objtype = (*config)["objtype"]; + const char *table; + std::string deleteId; + if (objtype == "member") { + const std::string nwid = (*config)["nwid"]; + const std::string id = (*config)["id"]; + record["id"] = nwid + "-" + id; + record["controllerId"] = this->_myAddressStr; + record["networkId"] = nwid; + record["nodeId"] = id; + record["config"] = *config; + table = "Member"; + } else if (objtype == "network") { + const std::string id = (*config)["id"]; + record["id"] = id; + record["controllerId"] = this->_myAddressStr; + record["config"] = *config; + table = "Network"; + } else if (objtype == "delete_network") { + deleteId = (*config)["id"]; + table = "Network"; + } else if (objtype == "delete_member") { + deleteId = (*config)["nwid"]; + deleteId.push_back('-'); + const std::string tmp = (*config)["id"]; + deleteId.append(tmp); + table = "Member"; + } else if (objtype == "trace") { + record = *config; + table = "RemoteTrace"; + } else { + continue; + } - while (_run == 1) { - try { - if (!rdb) - rdb = R::connect(this->_host,this->_port,this->_auth); - if (rdb) { - if (deleteId.length() > 0) { - R::db(this->_db).table(table).get(deleteId).delete_().run(*rdb); + while (_run == 1) { + try { + if (!rdb) + rdb = R::connect(this->_host,this->_port,this->_auth); + if (rdb) { + if (deleteId.length() > 0) { + printf("DELETE: %s" ZT_EOL_S,deleteId.c_str()); + R::db(this->_db).table(table).get(deleteId).delete_().run(*rdb); + } else { + printf("UPSERT: %s" ZT_EOL_S,record.dump().c_str()); + R::db(this->_db).table(table).insert(R::Datum::from_json(record.dump()),R::optargs("conflict","update","return_changes",false)).run(*rdb); + } + break; } else { - R::db(this->_db).table(table).insert(record.dump(),R::optargs("conflict","update","return_changes",false)).run(*rdb); + fprintf(stderr,"ERROR: controller RethinkDB (insert/update): connect failed (will retry)" ZT_EOL_S); } - break; - } else { - fprintf(stderr,"ERROR: controller RethinkDB (insert/update): connect failed (will retry)" ZT_EOL_S); + } catch (std::exception &e) { + fprintf(stderr,"ERROR: controller RethinkDB (insert/update): %s" ZT_EOL_S,e.what()); + rdb.reset(); + } catch (R::Error &e) { + fprintf(stderr,"ERROR: controller RethinkDB (insert/update): %s" ZT_EOL_S,e.message.c_str()); + rdb.reset(); + } catch ( ... ) { + fprintf(stderr,"ERROR: controller RethinkDB (insert/update): unknown exception" ZT_EOL_S); + rdb.reset(); } - } catch (std::exception &e) { - fprintf(stderr,"ERROR: controller RethinkDB (insert/update): %s" ZT_EOL_S,e.what()); - rdb.reset(); - } catch (R::Error &e) { - fprintf(stderr,"ERROR: controller RethinkDB (insert/update): %s" ZT_EOL_S,e.message.c_str()); - rdb.reset(); - } catch ( ... ) { - fprintf(stderr,"ERROR: controller RethinkDB (insert/update): unknown exception" ZT_EOL_S); - rdb.reset(); + std::this_thread::sleep_for(std::chrono::milliseconds(250)); } - std::this_thread::sleep_for(std::chrono::milliseconds(250)); } - } + } catch ( ... ) {} }); } + + _heartbeatThread = std::thread([this]() { + try { + char tmp[1024]; + std::unique_ptr rdb; + while (_run == 1) { + try { + if (!rdb) + rdb = R::connect(this->_host,this->_port,this->_auth); + if (rdb) { + OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld}",this->_myAddressStr.c_str(),(long long)OSUtils::now()); + R::db(this->_db).table("Controller").update(R::Datum::from_json(tmp)).run(*rdb); + } + } catch ( ... ) { + rdb.reset(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + } catch ( ... ) {} + }); } RethinkDB::~RethinkDB() { - // FIXME: not totally safe but will generally work, and only happens on shutdown anyway. - // Would need to add some kind of 'whack it' support to librethinkdbxx to do better. _run = 0; std::this_thread::sleep_for(std::chrono::milliseconds(100)); - if (_membersDbWatcherConnection) - ((R::Connection *)_membersDbWatcherConnection)->close(); - if (_networksDbWatcherConnection) - ((R::Connection *)_networksDbWatcherConnection)->close(); _commitQueue.stop(); for(int t=0;t