From 4e88c80a22b6ca982341413ee806ade0df57b4b7 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 2 Nov 2017 07:05:11 -0700 Subject: RethinkDB native connector work, minor fixes. --- ext/librethinkdbxx/src/cursor.cc | 221 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 221 insertions(+) create mode 100644 ext/librethinkdbxx/src/cursor.cc (limited to 'ext/librethinkdbxx/src/cursor.cc') diff --git a/ext/librethinkdbxx/src/cursor.cc b/ext/librethinkdbxx/src/cursor.cc new file mode 100644 index 00000000..987c4dba --- /dev/null +++ b/ext/librethinkdbxx/src/cursor.cc @@ -0,0 +1,221 @@ +#include "cursor.h" +#include "cursor_p.h" +#include "exceptions.h" + +namespace RethinkDB { + +// for type completion, in order to forward declare with unique_ptr +Cursor::Cursor(Cursor&&) = default; +Cursor& Cursor::operator=(Cursor&&) = default; + +CursorPrivate::CursorPrivate(uint64_t token_, Connection *conn_) + : single(false), no_more(false), index(0), + token(token_), conn(conn_) +{ } + +CursorPrivate::CursorPrivate(uint64_t token_, Connection *conn_, Datum&& datum) + : single(true), no_more(true), index(0), buffer(Array{std::move(datum)}), + token(token_), conn(conn_) +{ } + +Cursor::Cursor(CursorPrivate *dd) : d(dd) {} + +Cursor::~Cursor() { + if (d && d->conn) { + close(); + } +} + +Datum& Cursor::next(double wait) const { + if (!has_next(wait)) { + throw Error("next: No more data"); + } + + return d->buffer[d->index++]; +} + +Datum& Cursor::peek(double wait) const { + if (!has_next(wait)) { + throw Error("next: No more data"); + } + + return d->buffer[d->index]; +} + +void Cursor::each(std::function f, double wait) const { + while (has_next(wait)) { + f(std::move(d->buffer[d->index++])); + } +} + +void CursorPrivate::convert_single() const { + if (index != 0) { + throw Error("Cursor: already consumed"); + } + + if (buffer.size() != 1) { + throw Error("Cursor: invalid response from server"); + } + + if (!buffer[0].is_array()) { + throw Error("Cursor: not an array"); + } + + buffer.swap(buffer[0].extract_array()); + single = false; +} + +void CursorPrivate::clear_and_read_all() const { + if (single) { + convert_single(); + } + if (index != 0) { + buffer.erase(buffer.begin(), buffer.begin() + index); + index = 0; + } + while (!no_more) { + add_response(conn->d->wait_for_response(token, FOREVER)); + } +} + +Array&& Cursor::to_array() && { + d->clear_and_read_all(); + return std::move(d->buffer); +} + +Array Cursor::to_array() const & { + d->clear_and_read_all(); + return d->buffer; +} + +Datum Cursor::to_datum() const & { + if (d->single) { + if (d->index != 0) { + throw Error("to_datum: already consumed"); + } + return d->buffer[0]; + } + + d->clear_and_read_all(); + return d->buffer; +} + +Datum Cursor::to_datum() && { + Datum ret((Nil())); + if (d->single) { + if (d->index != 0) { + throw Error("to_datum: already consumed"); + } + ret = std::move(d->buffer[0]); + } else { + d->clear_and_read_all(); + ret = std::move(d->buffer); + } + + return ret; +} + +void Cursor::close() const { + d->conn->stop_query(d->token); + d->no_more = true; +} + +bool Cursor::has_next(double wait) const { + if (d->single) { + d->convert_single(); + } + + while (true) { + if (d->index >= d->buffer.size()) { + if (d->no_more) { + return false; + } + d->add_response(d->conn->d->wait_for_response(d->token, wait)); + } else { + return true; + } + } +} + +bool Cursor::is_single() const { + return d->single; +} + +void CursorPrivate::add_results(Array&& results) const { + if (index >= buffer.size()) { + buffer = std::move(results); + index = 0; + } else { + for (auto& it : results) { + buffer.emplace_back(std::move(it)); + } + } +} + +void CursorPrivate::add_response(Response&& response) const { + using RT = Protocol::Response::ResponseType; + switch (response.type) { + case RT::SUCCESS_SEQUENCE: + add_results(std::move(response.result)); + no_more = true; + break; + case RT::SUCCESS_PARTIAL: + conn->continue_query(token); + add_results(std::move(response.result)); + break; + case RT::SUCCESS_ATOM: + add_results(std::move(response.result)); + single = true; + no_more = true; + break; + case RT::SERVER_INFO: + add_results(std::move(response.result)); + single = true; + no_more = true; + break; + case RT::WAIT_COMPLETE: + case RT::CLIENT_ERROR: + case RT::COMPILE_ERROR: + case RT::RUNTIME_ERROR: + no_more = true; + throw response.as_error(); + } +} + +Cursor::iterator Cursor::begin() { + return iterator(this); +} + +Cursor::iterator Cursor::end() { + return iterator(nullptr); +} + +Cursor::iterator::iterator(Cursor* cursor_) : cursor(cursor_) {} + +Cursor::iterator& Cursor::iterator::operator++ () { + if (cursor == nullptr) { + throw Error("incrementing an exhausted Cursor iterator"); + } + + cursor->next(); + return *this; +} + +Datum& Cursor::iterator::operator* () { + if (cursor == nullptr) { + throw Error("reading from empty Cursor iterator"); + } + + return cursor->peek(); +} + +bool Cursor::iterator::operator!= (const Cursor::iterator& other) const { + if (cursor == other.cursor) { + return false; + } + + return !((cursor == nullptr && !other.cursor->has_next()) || + (other.cursor == nullptr && !cursor->has_next())); +} + +} -- cgit v1.2.3 From 7fc9094d8ea1c2d28d003c499016f0755b73063d Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 3 Nov 2017 22:40:26 -0400 Subject: More fixes to RethinkDB. --- controller/EmbeddedNetworkController.cpp | 11 ++- controller/RethinkDB.cpp | 145 ++++++++++++++++--------------- ext/librethinkdbxx/src/connection.cc | 5 +- ext/librethinkdbxx/src/cursor.cc | 8 +- 4 files changed, 93 insertions(+), 76 deletions(-) (limited to 'ext/librethinkdbxx/src/cursor.cc') diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 1ca2ee08..5707e6e0 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -1185,8 +1185,9 @@ void EmbeddedNetworkController::_request( ms.lastRequestTime = now; } - OSUtils::ztsnprintf(nwids,sizeof(nwids),"%.16llx",nwid); - if (!_db->get(nwid,network,identity.address().toInt(),member,ns)) { + Utils::hex(nwid,nwids); + _db->get(nwid,network,identity.address().toInt(),member,ns); + if ((!network.is_object())||(network.size() == 0)) { _sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_OBJECT_NOT_FOUND); return; } @@ -1684,11 +1685,13 @@ void EmbeddedNetworkController::_startThreads() _threads.emplace_back([this]() { for(;;) { _RQEntry *qe = (_RQEntry *)0; - if (_queue.get(qe)) + if (!_queue.get(qe)) break; try { - if (qe) + if (qe) { _request(qe->nwid,qe->fromAddr,qe->requestPacketId,qe->identity,qe->metaData); + delete qe; + } } catch (std::exception &e) { fprintf(stderr,"ERROR: exception in controller request handling thread: %s" ZT_EOL_S,e.what()); } catch ( ... ) { diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp index 1f8d29df..ffa1a188 100644 --- a/controller/RethinkDB.cpp +++ b/controller/RethinkDB.cpp @@ -78,6 +78,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; if (ov.is_object()||nv.is_object()) { + //if (nv.is_object()) printf("MEMBER: %s" ZT_EOL_S,nv.dump().c_str()); this->_memberChanged(ov,nv); } } catch ( ... ) {} // ignore bad records @@ -118,6 +119,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; if (ov.is_object()||nv.is_object()) { + //if (nv.is_object()) printf("NETWORK: %s" ZT_EOL_S,nv.dump().c_str()); this->_networkChanged(ov,nv); } } catch ( ... ) {} // ignore bad records @@ -148,34 +150,41 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres const std::string objtype = (*config)["objtype"]; const char *table; std::string deleteId; - if (objtype == "member") { - const std::string nwid = (*config)["nwid"]; - const std::string id = (*config)["id"]; - record["id"] = nwid + "-" + id; - record["controllerId"] = this->_myAddressStr; - record["networkId"] = nwid; - record["nodeId"] = id; - record["config"] = *config; - table = "Member"; - } else if (objtype == "network") { - const std::string id = (*config)["id"]; - record["id"] = id; - record["controllerId"] = this->_myAddressStr; - record["config"] = *config; - table = "Network"; - } else if (objtype == "delete_network") { - deleteId = (*config)["id"]; - table = "Network"; - } else if (objtype == "delete_member") { - deleteId = (*config)["nwid"]; - deleteId.push_back('-'); - const std::string tmp = (*config)["id"]; - deleteId.append(tmp); - table = "Member"; - } else if (objtype == "trace") { - record = *config; - table = "RemoteTrace"; - } else { + try { + if (objtype == "member") { + const std::string nwid = (*config)["nwid"]; + const std::string id = (*config)["id"]; + record["id"] = nwid + "-" + id; + record["controllerId"] = this->_myAddressStr; + record["networkId"] = nwid; + record["nodeId"] = id; + record["config"] = *config; + table = "Member"; + } else if (objtype == "network") { + const std::string id = (*config)["id"]; + record["id"] = id; + record["controllerId"] = this->_myAddressStr; + record["config"] = *config; + table = "Network"; + } else if (objtype == "delete_network") { + deleteId = (*config)["id"]; + table = "Network"; + } else if (objtype == "delete_member") { + deleteId = (*config)["nwid"]; + deleteId.push_back('-'); + const std::string tmp = (*config)["id"]; + deleteId.append(tmp); + table = "Member"; + } else if (objtype == "trace") { + record = *config; + table = "RemoteTrace"; + } else { + delete config; + continue; + } + delete config; + } catch ( ... ) { + delete config; continue; } @@ -185,10 +194,10 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres rdb = R::connect(this->_host,this->_port,this->_auth); if (rdb) { if (deleteId.length() > 0) { - printf("DELETE: %s" ZT_EOL_S,deleteId.c_str()); + //printf("DELETE: %s" ZT_EOL_S,deleteId.c_str()); R::db(this->_db).table(table).get(deleteId).delete_().run(*rdb); } else { - printf("UPSERT: %s" ZT_EOL_S,record.dump().c_str()); + //printf("UPSERT: %s" ZT_EOL_S,record.dump().c_str()); R::db(this->_db).table(table).insert(R::Datum::from_json(record.dump()),R::optargs("conflict","update","return_changes",false)).run(*rdb); } break; @@ -222,6 +231,7 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres rdb = R::connect(this->_host,this->_port,this->_auth); if (rdb) { OSUtils::ztsnprintf(tmp,sizeof(tmp),"{\"id\":\"%s\",\"lastAlive\":%lld}",this->_myAddressStr.c_str(),(long long)OSUtils::now()); + //printf("HEARTBEAT: %s" ZT_EOL_S,tmp); R::db(this->_db).table("Controller").update(R::Datum::from_json(tmp)).run(*rdb); } } catch ( ... ) { @@ -240,15 +250,18 @@ RethinkDB::~RethinkDB() _commitQueue.stop(); for(int t=0;tclose(); + if (_networksDbWatcherConnection) + ((R::Connection *)_networksDbWatcherConnection)->close(); + _membersDbWatcher.join(); + _networksDbWatcher.join(); _heartbeatThread.join(); } bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard l(_networks_l); @@ -257,17 +270,16 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network) return false; nw = nwi->second; } - - std::lock_guard l2(nw->lock); - network = nw->config; - + { + std::lock_guard l2(nw->lock); + network = nw->config; + } return true; } bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard l(_networks_l); @@ -276,21 +288,20 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint6 return false; nw = nwi->second; } - - std::lock_guard l2(nw->lock); - auto m = nw->members.find(memberId); - if (m == nw->members.end()) - return false; - network = nw->config; - member = m->second; - + { + std::lock_guard l2(nw->lock); + network = nw->config; + auto m = nw->members.find(memberId); + if (m == nw->members.end()) + return false; + member = m->second; + } return true; } bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard l(_networks_l); @@ -299,22 +310,21 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,const uint6 return false; nw = nwi->second; } - - std::lock_guard l2(nw->lock); - auto m = nw->members.find(memberId); - if (m == nw->members.end()) - return false; - network = nw->config; - member = m->second; - _fillSummaryInfo(nw,info); - + { + std::lock_guard l2(nw->lock); + network = nw->config; + _fillSummaryInfo(nw,info); + auto m = nw->members.find(memberId); + if (m == nw->members.end()) + return false; + member = m->second; + } return true; } bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector &members) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard l(_networks_l); @@ -323,19 +333,18 @@ bool RethinkDB::get(const uint64_t networkId,nlohmann::json &network,std::vector return false; nw = nwi->second; } - - std::lock_guard l2(nw->lock); - network = nw->config; - for(auto m=nw->members.begin();m!=nw->members.end();++m) - members.push_back(m->second); - + { + std::lock_guard l2(nw->lock); + network = nw->config; + for(auto m=nw->members.begin();m!=nw->members.end();++m) + members.push_back(m->second); + } return true; } bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info) { waitForReady(); - std::shared_ptr<_Network> nw; { std::lock_guard l(_networks_l); @@ -344,10 +353,10 @@ bool RethinkDB::summary(const uint64_t networkId,NetworkSummaryInfo &info) return false; nw = nwi->second; } - - std::lock_guard l2(nw->lock); - _fillSummaryInfo(nw,info); - + { + std::lock_guard l2(nw->lock); + _fillSummaryInfo(nw,info); + } return true; } diff --git a/ext/librethinkdbxx/src/connection.cc b/ext/librethinkdbxx/src/connection.cc index 62f6efee..53d106ec 100644 --- a/ext/librethinkdbxx/src/connection.cc +++ b/ext/librethinkdbxx/src/connection.cc @@ -101,6 +101,8 @@ std::unique_ptr connect(std::string host, int port, std::string auth Connection::Connection(ConnectionPrivate *dd) : d(dd) { } Connection::~Connection() { // close(); + if (d->guarded_sockfd >= 0) + ::close(d->guarded_sockfd); } size_t ReadLock::recv_some(char* buf, size_t size, double wait) { @@ -128,7 +130,7 @@ size_t ReadLock::recv_some(char* buf, size_t size, double wait) { } ssize_t numbytes = ::recv(conn->guarded_sockfd, buf, size, 0); - if (numbytes == -1) throw Error::from_errno("recv"); + if (numbytes <= 0) throw Error::from_errno("recv"); if (debug_net > 1) { fprintf(stderr, "<< %s\n", write_datum(std::string(buf, numbytes)).c_str()); } @@ -190,6 +192,7 @@ void Connection::close() { if (ret == -1) { throw Error::from_errno("close"); } + d->guarded_sockfd = -1; } Response ConnectionPrivate::wait_for_response(uint64_t token_want, double wait) { diff --git a/ext/librethinkdbxx/src/cursor.cc b/ext/librethinkdbxx/src/cursor.cc index 987c4dba..df0621eb 100644 --- a/ext/librethinkdbxx/src/cursor.cc +++ b/ext/librethinkdbxx/src/cursor.cc @@ -21,9 +21,11 @@ CursorPrivate::CursorPrivate(uint64_t token_, Connection *conn_, Datum&& datum) Cursor::Cursor(CursorPrivate *dd) : d(dd) {} Cursor::~Cursor() { - if (d && d->conn) { - close(); - } + try { + if (d && d->conn) { + close(); + } + } catch ( ... ) {} } Datum& Cursor::next(double wait) const { -- cgit v1.2.3