From fcb4221f97a7545ac741b184c53c10c3ac2c06b3 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Tue, 5 Mar 2019 15:11:50 -0800 Subject: rabbitMQ implementation --- controller/RabbitMQ.cpp | 91 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 controller/RabbitMQ.cpp (limited to 'controller/RabbitMQ.cpp') diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp new file mode 100644 index 00000000..34efcde4 --- /dev/null +++ b/controller/RabbitMQ.cpp @@ -0,0 +1,91 @@ +#include "RabbitMQ.hpp" + +#include +#include +#include +#include + +namespace ZeroTier +{ + +RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName) + : _mqc(cfg) + , _qName(queueName) + , _socket(NULL) + , _status(0) +{ +} + +RabbitMQ::~RabbitMQ() +{ + amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS); + amqp_connection_close(_conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(_conn); +} + +void RabbitMQ::init() +{ + struct timeval tval; + memset(&tval, 0, sizeof(struct timeval)); + tval.tv_sec = 5; + + fprintf(stderr, "Initializing RabbitMQ %s\n", _qName); + _conn = amqp_new_connection(); + _socket = amqp_tcp_socket_new(_conn); + if (!_socket) { + throw std::runtime_error("Can't create socket for RabbitMQ"); + } + + _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval); + if (_status) { + throw std::runtime_error("Can't connect to RabbitMQ"); + } + + amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, + _mqc->username, _mqc->password); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("RabbitMQ Login Error"); + } + + static int chan = 0; + amqp_channel_open(_conn, ++chan); + r = amqp_get_rpc_reply(_conn); + if(r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error opening communication channel"); + } + _channel = chan; + + _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error declaring queue"); + } + + amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error conuming"); + } + fprintf(stderr, "RabbitMQ Init OK %s\n", _qName); +} + +std::string RabbitMQ::consume() +{ + amqp_rpc_reply_t res; + amqp_envelope_t envelope; + amqp_maybe_release_buffers(_conn); + + res = amqp_consume_message(_conn, &envelope, NULL, 0); + if (res.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error getting message"); + } + + std::string msg( + (const char*)envelope.message.body.bytes, + envelope.message.body.len + ); + amqp_destroy_envelope(&envelope); + return msg; +} + +} \ No newline at end of file -- cgit v1.2.3 From 814104356085198fb6aa09f6f5e6f39406d2fc06 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Wed, 6 Mar 2019 16:16:49 -0800 Subject: finish up rabbitmq integration --- controller/PostgreSQL.cpp | 69 ++++++++++++++++++++++++++--------------------- controller/RabbitMQ.cpp | 4 +-- 2 files changed, 41 insertions(+), 32 deletions(-) (limited to 'controller/RabbitMQ.cpp') diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 5192dad5..9df0f440 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -542,7 +542,8 @@ void PostgreSQL::heartbeat() std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); std::string now = std::to_string(OSUtils::now()); std::string host_port = std::to_string(_listenPort); - const char *values[9] = { + std::string use_rabbitmq = (_mqc != NULL) ? "true" : "false"; + const char *values[10] = { controllerId, hostname, now.c_str(), @@ -551,16 +552,18 @@ void PostgreSQL::heartbeat() minor.c_str(), rev.c_str(), build.c_str(), - host_port.c_str() + host_port.c_str(), + use_rabbitmq.c_str() }; PGresult *res = PQexecParams(conn, - "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port) " - "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9) " + "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_rabbitmq) " + "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) " "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " - "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port", - 9, // number of parameters + "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, " + "use_rabbitmq = EXCLUDED.use_rabbitmq", + 10, // number of parameters NULL, // oid field. ignore values, // values for substitution NULL, // lengths in bytes of each value @@ -591,19 +594,9 @@ void PostgreSQL::membersDbWatcher() initializeMembers(conn); - char buf[11] = {0}; - std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); - PGresult *res = PQexec(conn, cmd.c_str()); - if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQfinish(conn); - exit(1); - } - - PQclear(res); res = NULL; - if (this->_mqc != NULL) { + PQfinish(conn); + conn = NULL; _membersWatcher_RabbitMQ(); } else { _membersWatcher_Postgres(conn); @@ -618,6 +611,18 @@ void PostgreSQL::membersDbWatcher() } void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { + char buf[11] = {0}; + std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); + PGresult *res = PQexec(conn, cmd.c_str()); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQfinish(conn); + exit(1); + } + + PQclear(res); res = NULL; + while(_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres."); @@ -659,6 +664,7 @@ void PostgreSQL::_membersWatcher_RabbitMQ() { while (_run == 1) { try { std::string msg = rmq.consume(); + // fprintf(stderr, "Got Member Update: %s\n", msg.c_str()); json tmp(json::parse(msg)); json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; @@ -686,19 +692,9 @@ void PostgreSQL::networksDbWatcher() initializeNetworks(conn); - char buf[11] = {0}; - std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf)); - PGresult *res = PQexec(conn, cmd.c_str()); - if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQfinish(conn); - exit(1); - } - - PQclear(res); res = NULL; - if (this->_mqc != NULL) { + PQfinish(conn); + conn = NULL; _networksWatcher_RabbitMQ(); } else { _networksWatcher_Postgres(conn); @@ -713,6 +709,18 @@ void PostgreSQL::networksDbWatcher() } void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { + char buf[11] = {0}; + std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf)); + PGresult *res = PQexec(conn, cmd.c_str()); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQfinish(conn); + exit(1); + } + + PQclear(res); res = NULL; + while(_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres."); @@ -752,6 +760,7 @@ void PostgreSQL::_networksWatcher_RabbitMQ() { while (_run == 1) { try { std::string msg = rmq.consume(); + // fprintf(stderr, "Got network update: %s\n", msg.c_str()); json tmp(json::parse(msg)); json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp index 34efcde4..e6b8c819 100644 --- a/controller/RabbitMQ.cpp +++ b/controller/RabbitMQ.cpp @@ -58,13 +58,13 @@ void RabbitMQ::init() _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table); r = amqp_get_rpc_reply(_conn); if (r.reply_type != AMQP_RESPONSE_NORMAL) { - throw std::runtime_error("Error declaring queue"); + throw std::runtime_error("Error declaring queue " + std::string(_qName)); } amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); r = amqp_get_rpc_reply(_conn); if (r.reply_type != AMQP_RESPONSE_NORMAL) { - throw std::runtime_error("Error conuming"); + throw std::runtime_error("Error consuming queue " + std::string(_qName)); } fprintf(stderr, "RabbitMQ Init OK %s\n", _qName); } -- cgit v1.2.3 From 171cc3e01add8d25d221f4a29d6961b8a1a9d32e Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Fri, 8 Mar 2019 10:20:19 -0800 Subject: reorder a couple of things --- controller/RabbitMQ.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'controller/RabbitMQ.cpp') diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp index e6b8c819..096d3f53 100644 --- a/controller/RabbitMQ.cpp +++ b/controller/RabbitMQ.cpp @@ -48,13 +48,13 @@ void RabbitMQ::init() } static int chan = 0; - amqp_channel_open(_conn, ++chan); + _channel = ++chan; + amqp_channel_open(_conn, _channel); r = amqp_get_rpc_reply(_conn); if(r.reply_type != AMQP_RESPONSE_NORMAL) { throw std::runtime_error("Error opening communication channel"); } - _channel = chan; - + _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table); r = amqp_get_rpc_reply(_conn); if (r.reply_type != AMQP_RESPONSE_NORMAL) { -- cgit v1.2.3 From cda07b20a26c0ac774445da1334234aab342ce63 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Fri, 8 Mar 2019 10:29:36 -0800 Subject: add mutex to channel numbering --- controller/RabbitMQ.cpp | 7 +++++-- controller/RabbitMQ.hpp | 6 +++++- 2 files changed, 10 insertions(+), 3 deletions(-) (limited to 'controller/RabbitMQ.cpp') diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp index 096d3f53..f5a5c1d6 100644 --- a/controller/RabbitMQ.cpp +++ b/controller/RabbitMQ.cpp @@ -48,7 +48,10 @@ void RabbitMQ::init() } static int chan = 0; - _channel = ++chan; + { + Mutex::Lock l(_chan_m); + _channel = ++chan; + } amqp_channel_open(_conn, _channel); r = amqp_get_rpc_reply(_conn); if(r.reply_type != AMQP_RESPONSE_NORMAL) { @@ -88,4 +91,4 @@ std::string RabbitMQ::consume() return msg; } -} \ No newline at end of file +} diff --git a/controller/RabbitMQ.hpp b/controller/RabbitMQ.hpp index 74023b12..d341681b 100644 --- a/controller/RabbitMQ.hpp +++ b/controller/RabbitMQ.hpp @@ -36,9 +36,10 @@ struct MQConfig { }; } - #ifdef ZT_CONTROLLER_USE_LIBPQ +#include "../node/Mutex.hpp" + #include #include #include @@ -65,6 +66,9 @@ private: int _status; int _channel; + + Mutex _chan_m; + }; } -- cgit v1.2.3