summaryrefslogtreecommitdiff
path: root/ext/librethinkdbxx/src/connection_p.h
diff options
context:
space:
mode:
Diffstat (limited to 'ext/librethinkdbxx/src/connection_p.h')
-rw-r--r--ext/librethinkdbxx/src/connection_p.h133
1 files changed, 133 insertions, 0 deletions
diff --git a/ext/librethinkdbxx/src/connection_p.h b/ext/librethinkdbxx/src/connection_p.h
new file mode 100644
index 00000000..d8a95e3c
--- /dev/null
+++ b/ext/librethinkdbxx/src/connection_p.h
@@ -0,0 +1,133 @@
+#ifndef CONNECTION_P_H
+#define CONNECTION_P_H
+
+#include <inttypes.h>
+
+#include "connection.h"
+#include "term.h"
+#include "json_p.h"
+
+namespace RethinkDB {
+
+extern const int debug_net;
+
+struct Query {
+ Protocol::Query::QueryType type;
+ uint64_t token;
+ Datum term;
+ OptArgs optArgs;
+
+ std::string serialize() {
+ Array query_arr{static_cast<double>(type)};
+ if (term.is_valid()) query_arr.emplace_back(term);
+ if (!optArgs.empty())
+ query_arr.emplace_back(Term(std::move(optArgs)).datum);
+
+ std::string query_str = write_datum(query_arr);
+ if (debug_net > 0) {
+ fprintf(stderr, "[%" PRIu64 "] >> %s\n", token, query_str.c_str());
+ }
+
+ char header[12];
+ memcpy(header, &token, 8);
+ uint32_t size = query_str.size();
+ memcpy(header + 8, &size, 4);
+ query_str.insert(0, header, 12);
+ return query_str;
+ }
+};
+
+// Used internally to convert a raw response type into an enum
+Protocol::Response::ResponseType response_type(double t);
+Protocol::Response::ErrorType runtime_error_type(double t);
+
+// Contains a response from the server. Use the Cursor class to interact with these responses
+class Response {
+public:
+ Response() = delete;
+ explicit Response(Datum&& datum) :
+ type(response_type(std::move(datum).extract_field("t").extract_number())),
+ error_type(datum.get_field("e") ?
+ runtime_error_type(std::move(datum).extract_field("e").extract_number()) :
+ Protocol::Response::ErrorType(0)),
+ result(std::move(datum).extract_field("r").extract_array()) { }
+ Error as_error();
+ Protocol::Response::ResponseType type;
+ Protocol::Response::ErrorType error_type;
+ Array result;
+};
+
+class Token;
+class ConnectionPrivate {
+public:
+ ConnectionPrivate(int sockfd)
+ : guarded_next_token(1), guarded_sockfd(sockfd), guarded_loop_active(false)
+ { }
+
+ void run_query(Query query, bool no_reply = false);
+
+ Response wait_for_response(uint64_t, double);
+ uint64_t new_token() {
+ return guarded_next_token++;
+ }
+
+ std::mutex read_lock;
+ std::mutex write_lock;
+ std::mutex cache_lock;
+
+ struct TokenCache {
+ bool closed = false;
+ std::condition_variable cond;
+ std::queue<Response> responses;
+ };
+
+ std::map<uint64_t, TokenCache> guarded_cache;
+ uint64_t guarded_next_token;
+ int guarded_sockfd;
+ bool guarded_loop_active;
+};
+
+class CacheLock {
+public:
+ CacheLock(ConnectionPrivate* conn) : inner_lock(conn->cache_lock) { }
+
+ void lock() {
+ inner_lock.lock();
+ }
+
+ void unlock() {
+ inner_lock.unlock();
+ }
+
+ std::unique_lock<std::mutex> inner_lock;
+};
+
+class ReadLock {
+public:
+ ReadLock(ConnectionPrivate* conn_) : lock(conn_->read_lock), conn(conn_) { }
+
+ size_t recv_some(char*, size_t, double wait);
+ void recv(char*, size_t, double wait);
+ std::string recv(size_t);
+ size_t recv_cstring(char*, size_t);
+
+ Response read_loop(uint64_t, CacheLock&&, double);
+
+ std::lock_guard<std::mutex> lock;
+ ConnectionPrivate* conn;
+};
+
+class WriteLock {
+public:
+ WriteLock(ConnectionPrivate* conn_) : lock(conn_->write_lock), conn(conn_) { }
+
+ void send(const char*, size_t);
+ void send(std::string);
+
+ std::lock_guard<std::mutex> lock;
+ ConnectionPrivate* conn;
+};
+
+} // namespace RethinkDB
+
+#endif // CONNECTION_P_H