From de1fc43cc58ce6a02c6eeef7791328c44189aa7c Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Mon, 4 Mar 2019 16:27:39 -0800 Subject: added librabbitmq library for central controller --- controller/PostgreSQL.cpp | 1 + 1 file changed, 1 insertion(+) (limited to 'controller/PostgreSQL.cpp') diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 6165d8c4..9eb64800 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -32,6 +32,7 @@ #include #include +#include using json = nlohmann::json; namespace { -- cgit v1.2.3 From 1f13374a4f24c5398d4f1978b217db39aefdffad Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Mon, 4 Mar 2019 17:01:16 -0800 Subject: added struct for rabbitmq config --- controller/PostgreSQL.cpp | 53 +++++++++++++++++++++++++++++++++++------------ controller/PostgreSQL.hpp | 18 +++++++++++++++- 2 files changed, 57 insertions(+), 14 deletions(-) (limited to 'controller/PostgreSQL.cpp') diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 29d61a39..8dc56c9c 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -74,13 +74,14 @@ std::string join(const std::vector &elements, const char * const se using namespace ZeroTier; -PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort) +PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc) : DB(nc, myId, path) , _ready(0) , _connected(1) , _run(1) , _waitNoticePrinted(false) , _listenPort(listenPort) + , _mqc(mqc) { _connString = std::string(path) + " application_name=controller_" +_myAddressStr; @@ -601,6 +602,21 @@ void PostgreSQL::membersDbWatcher() PQclear(res); res = NULL; + if (this->_mqc != NULL) { + _membersWatcher_RabbitMQ(); + } else { + _membersWatcher_Postgres(conn); + PQfinish(conn); + conn = NULL; + } + + if (_run == 1) { + fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + exit(9); + } +} + +void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { while(_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres."); @@ -627,12 +643,10 @@ void PostgreSQL::membersDbWatcher() } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - PQfinish(conn); - conn = NULL; - if (_run == 1) { - fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); - exit(9); - } +} + +void PostgreSQL::_membersWatcher_RabbitMQ() { + } void PostgreSQL::networksDbWatcher() @@ -658,6 +672,21 @@ void PostgreSQL::networksDbWatcher() PQclear(res); res = NULL; + if (this->_mqc != NULL) { + _networksWatcher_RabbitMQ(); + } else { + _networksWatcher_Postgres(conn); + PQfinish(conn); + conn = NULL; + } + + if (_run == 1) { + fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + exit(8); + } +} + +void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { while(_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres."); @@ -682,12 +711,10 @@ void PostgreSQL::networksDbWatcher() } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - PQfinish(conn); - conn = NULL; - if (_run == 1) { - fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); - exit(8); - } +} + +void PostgreSQL::_networksWatcher_RabbitMQ() { + } void PostgreSQL::commitThread() diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index 7932317b..e0dcdf06 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -40,6 +40,14 @@ extern "C" { namespace ZeroTier { +struct mq_config +{ + const char *host; + int port; + const char *username; + const char *password; +}; + /** * A controller database driver that talks to PostgreSQL * @@ -49,7 +57,7 @@ namespace ZeroTier class PostgreSQL : public DB { public: - PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort); + PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc = NULL); virtual ~PostgreSQL(); virtual bool waitForReady(); @@ -70,7 +78,13 @@ private: void initializeMembers(PGconn *conn); void heartbeat(); void membersDbWatcher(); + void _membersWatcher_Postgres(PGconn *conn); + void _membersWatcher_RabbitMQ(); void networksDbWatcher(); + void _networksWatcher_Postgres(PGconn *conn); + void _networksWatcher_RabbitMQ(); + + void commitThread(); void onlineNotificationThread(); @@ -100,6 +114,8 @@ private: mutable volatile bool _waitNoticePrinted; int _listenPort; + + mq_config *_mqc; }; } -- cgit v1.2.3 From fcb4221f97a7545ac741b184c53c10c3ac2c06b3 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Tue, 5 Mar 2019 15:11:50 -0800 Subject: rabbitMQ implementation --- controller/EmbeddedNetworkController.cpp | 7 ++- controller/EmbeddedNetworkController.hpp | 6 +- controller/PostgreSQL.cpp | 57 ++++++++++++++++- controller/PostgreSQL.hpp | 12 +--- controller/RabbitMQ.cpp | 91 +++++++++++++++++++++++++++ controller/RabbitMQ.hpp | 75 ++++++++++++++++++++++ ext/librabbitmq/centos_x64/lib/librabbitmq.a | Bin 134844 -> 137332 bytes objects.mk | 1 + service/OneService.cpp | 17 ++++- 9 files changed, 249 insertions(+), 17 deletions(-) create mode 100644 controller/RabbitMQ.cpp create mode 100644 controller/RabbitMQ.hpp (limited to 'controller/PostgreSQL.cpp') diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 8b3f1517..5ddcaa4b 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -464,12 +464,13 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule) } // anonymous namespace -EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort) : +EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc) : _startTime(OSUtils::now()), _listenPort(listenPort), _node(node), _path(dbPath), - _sender((NetworkController::Sender *)0) + _sender((NetworkController::Sender *)0), + _mqc(mqc) { } @@ -489,7 +490,7 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender) _signingIdAddressString = signingId.address().toString(tmp); #ifdef ZT_CONTROLLER_USE_LIBPQ if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:")) - _db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort)); + _db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort, _mqc)); else // else use FileDB after endif #endif _db.reset(new FileDB(this,_signingId,_path.c_str())); diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index 718c97ff..269442a8 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -60,6 +60,8 @@ namespace ZeroTier { class Node; +struct MQConfig; + class EmbeddedNetworkController : public NetworkController { public: @@ -67,7 +69,7 @@ public: * @param node Parent node * @param dbPath Database path (file path or database credentials) */ - EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort); + EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc = NULL); virtual ~EmbeddedNetworkController(); virtual void init(const Identity &signingId,Sender *sender); @@ -164,6 +166,8 @@ private: std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus; std::mutex _memberStatus_l; + + MQConfig *_mqc; }; } // namespace ZeroTier diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 8dc56c9c..5192dad5 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -28,6 +28,7 @@ #include "PostgreSQL.hpp" #include "EmbeddedNetworkController.hpp" +#include "RabbitMQ.hpp" #include "../version.h" #include @@ -74,7 +75,7 @@ std::string join(const std::vector &elements, const char * const se using namespace ZeroTier; -PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc) +PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc) : DB(nc, myId, path) , _ready(0) , _connected(1) @@ -646,7 +647,32 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { } void PostgreSQL::_membersWatcher_RabbitMQ() { - + char buf[11] = {0}; + std::string qname = "member_"+ std::string(_myAddress.toString(buf)); + RabbitMQ rmq(_mqc, qname.c_str()); + try { + rmq.init(); + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + exit(11); + } + while (_run == 1) { + try { + std::string msg = rmq.consume(); + json tmp(json::parse(msg)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object() || newConfig.is_object()) { + _memberChanged(oldConfig,newConfig,(this->_ready>=2)); + } + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + break; + } catch(...) {} + } } void PostgreSQL::networksDbWatcher() @@ -714,7 +740,32 @@ void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { } void PostgreSQL::_networksWatcher_RabbitMQ() { - + char buf[11] = {0}; + std::string qname = "network_"+ std::string(_myAddress.toString(buf)); + RabbitMQ rmq(_mqc, qname.c_str()); + try { + rmq.init(); + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + exit(11); + } + while (_run == 1) { + try { + std::string msg = rmq.consume(); + json tmp(json::parse(msg)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + break; + } catch(...) {} + } } void PostgreSQL::commitThread() diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index e0dcdf06..779d47bd 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -40,13 +40,7 @@ extern "C" { namespace ZeroTier { -struct mq_config -{ - const char *host; - int port; - const char *username; - const char *password; -}; +struct MQConfig; /** * A controller database driver that talks to PostgreSQL @@ -57,7 +51,7 @@ struct mq_config class PostgreSQL : public DB { public: - PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc = NULL); + PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL); virtual ~PostgreSQL(); virtual bool waitForReady(); @@ -115,7 +109,7 @@ private: int _listenPort; - mq_config *_mqc; + MQConfig *_mqc; }; } diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp new file mode 100644 index 00000000..34efcde4 --- /dev/null +++ b/controller/RabbitMQ.cpp @@ -0,0 +1,91 @@ +#include "RabbitMQ.hpp" + +#include +#include +#include +#include + +namespace ZeroTier +{ + +RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName) + : _mqc(cfg) + , _qName(queueName) + , _socket(NULL) + , _status(0) +{ +} + +RabbitMQ::~RabbitMQ() +{ + amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS); + amqp_connection_close(_conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(_conn); +} + +void RabbitMQ::init() +{ + struct timeval tval; + memset(&tval, 0, sizeof(struct timeval)); + tval.tv_sec = 5; + + fprintf(stderr, "Initializing RabbitMQ %s\n", _qName); + _conn = amqp_new_connection(); + _socket = amqp_tcp_socket_new(_conn); + if (!_socket) { + throw std::runtime_error("Can't create socket for RabbitMQ"); + } + + _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval); + if (_status) { + throw std::runtime_error("Can't connect to RabbitMQ"); + } + + amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, + _mqc->username, _mqc->password); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("RabbitMQ Login Error"); + } + + static int chan = 0; + amqp_channel_open(_conn, ++chan); + r = amqp_get_rpc_reply(_conn); + if(r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error opening communication channel"); + } + _channel = chan; + + _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error declaring queue"); + } + + amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error conuming"); + } + fprintf(stderr, "RabbitMQ Init OK %s\n", _qName); +} + +std::string RabbitMQ::consume() +{ + amqp_rpc_reply_t res; + amqp_envelope_t envelope; + amqp_maybe_release_buffers(_conn); + + res = amqp_consume_message(_conn, &envelope, NULL, 0); + if (res.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error getting message"); + } + + std::string msg( + (const char*)envelope.message.body.bytes, + envelope.message.body.len + ); + amqp_destroy_envelope(&envelope); + return msg; +} + +} \ No newline at end of file diff --git a/controller/RabbitMQ.hpp b/controller/RabbitMQ.hpp new file mode 100644 index 00000000..74023b12 --- /dev/null +++ b/controller/RabbitMQ.hpp @@ -0,0 +1,75 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. + */ +#ifndef ZT_CONTROLLER_RABBITMQ_HPP +#define ZT_CONTROLLER_RABBITMQ_HPP + +namespace ZeroTier +{ +struct MQConfig { + const char *host; + int port; + const char *username; + const char *password; +}; +} + + +#ifdef ZT_CONTROLLER_USE_LIBPQ + +#include +#include +#include + +namespace ZeroTier +{ + +class RabbitMQ { +public: + RabbitMQ(MQConfig *cfg, const char *queueName); + ~RabbitMQ(); + + void init(); + + std::string consume(); + +private: + MQConfig *_mqc; + const char *_qName; + + amqp_socket_t *_socket; + amqp_connection_state_t _conn; + amqp_queue_declare_ok_t *_q; + int _status; + + int _channel; +}; + +} + +#endif // ZT_CONTROLLER_USE_LIBPQ + +#endif // ZT_CONTROLLER_RABBITMQ_HPP + diff --git a/ext/librabbitmq/centos_x64/lib/librabbitmq.a b/ext/librabbitmq/centos_x64/lib/librabbitmq.a index 8fc39d06..9495fa7b 100644 Binary files a/ext/librabbitmq/centos_x64/lib/librabbitmq.a and b/ext/librabbitmq/centos_x64/lib/librabbitmq.a differ diff --git a/objects.mk b/objects.mk index c15fe299..eb348dca 100644 --- a/objects.mk +++ b/objects.mk @@ -31,6 +31,7 @@ ONE_OBJS=\ controller/DB.o \ controller/FileDB.o \ controller/PostgreSQL.o \ + controller/RabbitMQ.o \ osdep/ManagedRoute.o \ osdep/Http.o \ osdep/OSUtils.o \ diff --git a/service/OneService.cpp b/service/OneService.cpp index b6673198..0fd31c64 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -99,6 +99,7 @@ extern "C" { using json = nlohmann::json; #include "../controller/EmbeddedNetworkController.hpp" +#include "../controller/RabbitMQ.hpp" #ifdef ZT_USE_TEST_TAP @@ -569,6 +570,8 @@ public: volatile bool _run; Mutex _run_m; + MQConfig *_mqc; + // end member variables ---------------------------------------------------- OneServiceImpl(const char *hp,unsigned int port) : @@ -604,6 +607,7 @@ public: ,_vaultPath("cubbyhole/zerotier") #endif ,_run(true) + ,_mqc(NULL) { _ports[0] = 0; _ports[1] = 0; @@ -678,6 +682,7 @@ public: delete _portMapper; #endif delete _controller; + delete _mqc; } virtual ReasonForTermination run() @@ -809,7 +814,7 @@ public: OSUtils::rmDashRf((_homePath + ZT_PATH_SEPARATOR_S "iddb.d").c_str()); // Network controller is now enabled by default for desktop and server - _controller = new EmbeddedNetworkController(_node,_controllerDbPath.c_str(),_ports[0]); + _controller = new EmbeddedNetworkController(_node,_controllerDbPath.c_str(),_ports[0], _mqc); _node->setNetconfMaster((void *)_controller); // Join existing networks in networks.d @@ -1073,6 +1078,16 @@ public: if (cdbp.length() > 0) _controllerDbPath = cdbp; + json &rmq = settings["rabbitmq"]; + if (rmq.is_object() && _mqc == NULL) { + fprintf(stderr, "Reading RabbitMQ Config\n"); + _mqc = new MQConfig; + _mqc->port = rmq["port"]; + _mqc->host = OSUtils::jsonString(rmq["host"], "").c_str(); + _mqc->username = OSUtils::jsonString(rmq["username"], "").c_str(); + _mqc->password = OSUtils::jsonString(rmq["password"], "").c_str(); + } + // Bind to wildcard instead of to specific interfaces (disables full tunnel capability) json &bind = settings["bind"]; if (bind.is_array()) { -- cgit v1.2.3 From 814104356085198fb6aa09f6f5e6f39406d2fc06 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Wed, 6 Mar 2019 16:16:49 -0800 Subject: finish up rabbitmq integration --- controller/PostgreSQL.cpp | 69 ++++++++++++++++++++++++++--------------------- controller/RabbitMQ.cpp | 4 +-- 2 files changed, 41 insertions(+), 32 deletions(-) (limited to 'controller/PostgreSQL.cpp') diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 5192dad5..9df0f440 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -542,7 +542,8 @@ void PostgreSQL::heartbeat() std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); std::string now = std::to_string(OSUtils::now()); std::string host_port = std::to_string(_listenPort); - const char *values[9] = { + std::string use_rabbitmq = (_mqc != NULL) ? "true" : "false"; + const char *values[10] = { controllerId, hostname, now.c_str(), @@ -551,16 +552,18 @@ void PostgreSQL::heartbeat() minor.c_str(), rev.c_str(), build.c_str(), - host_port.c_str() + host_port.c_str(), + use_rabbitmq.c_str() }; PGresult *res = PQexecParams(conn, - "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port) " - "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9) " + "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_rabbitmq) " + "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) " "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " - "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port", - 9, // number of parameters + "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, " + "use_rabbitmq = EXCLUDED.use_rabbitmq", + 10, // number of parameters NULL, // oid field. ignore values, // values for substitution NULL, // lengths in bytes of each value @@ -591,19 +594,9 @@ void PostgreSQL::membersDbWatcher() initializeMembers(conn); - char buf[11] = {0}; - std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); - PGresult *res = PQexec(conn, cmd.c_str()); - if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQfinish(conn); - exit(1); - } - - PQclear(res); res = NULL; - if (this->_mqc != NULL) { + PQfinish(conn); + conn = NULL; _membersWatcher_RabbitMQ(); } else { _membersWatcher_Postgres(conn); @@ -618,6 +611,18 @@ void PostgreSQL::membersDbWatcher() } void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { + char buf[11] = {0}; + std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); + PGresult *res = PQexec(conn, cmd.c_str()); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQfinish(conn); + exit(1); + } + + PQclear(res); res = NULL; + while(_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres."); @@ -659,6 +664,7 @@ void PostgreSQL::_membersWatcher_RabbitMQ() { while (_run == 1) { try { std::string msg = rmq.consume(); + // fprintf(stderr, "Got Member Update: %s\n", msg.c_str()); json tmp(json::parse(msg)); json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; @@ -686,19 +692,9 @@ void PostgreSQL::networksDbWatcher() initializeNetworks(conn); - char buf[11] = {0}; - std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf)); - PGresult *res = PQexec(conn, cmd.c_str()); - if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQfinish(conn); - exit(1); - } - - PQclear(res); res = NULL; - if (this->_mqc != NULL) { + PQfinish(conn); + conn = NULL; _networksWatcher_RabbitMQ(); } else { _networksWatcher_Postgres(conn); @@ -713,6 +709,18 @@ void PostgreSQL::networksDbWatcher() } void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { + char buf[11] = {0}; + std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf)); + PGresult *res = PQexec(conn, cmd.c_str()); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQfinish(conn); + exit(1); + } + + PQclear(res); res = NULL; + while(_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres."); @@ -752,6 +760,7 @@ void PostgreSQL::_networksWatcher_RabbitMQ() { while (_run == 1) { try { std::string msg = rmq.consume(); + // fprintf(stderr, "Got network update: %s\n", msg.c_str()); json tmp(json::parse(msg)); json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp index 34efcde4..e6b8c819 100644 --- a/controller/RabbitMQ.cpp +++ b/controller/RabbitMQ.cpp @@ -58,13 +58,13 @@ void RabbitMQ::init() _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table); r = amqp_get_rpc_reply(_conn); if (r.reply_type != AMQP_RESPONSE_NORMAL) { - throw std::runtime_error("Error declaring queue"); + throw std::runtime_error("Error declaring queue " + std::string(_qName)); } amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); r = amqp_get_rpc_reply(_conn); if (r.reply_type != AMQP_RESPONSE_NORMAL) { - throw std::runtime_error("Error conuming"); + throw std::runtime_error("Error consuming queue " + std::string(_qName)); } fprintf(stderr, "RabbitMQ Init OK %s\n", _qName); } -- cgit v1.2.3 From 993d850f697dbedd5c8967f7fab482af923df926 Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Fri, 8 Mar 2019 10:50:33 -0800 Subject: more logs --- controller/PostgreSQL.cpp | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) (limited to 'controller/PostgreSQL.cpp') diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 9df0f440..a856e4af 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -608,6 +608,7 @@ void PostgreSQL::membersDbWatcher() fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); exit(9); } + fprintf(stderr, "Exited membersDbWatcher\n"); } void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { @@ -675,9 +676,13 @@ void PostgreSQL::_membersWatcher_RabbitMQ() { _memberChanged(oldConfig,newConfig,(this->_ready>=2)); } } catch (std::runtime_error &e) { - fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what()); break; - } catch(...) {} + } catch(std::exception &e ) { + fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what()); + } catch(...) { + fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n"); + } } } @@ -706,6 +711,7 @@ void PostgreSQL::networksDbWatcher() fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); exit(8); } + fprintf(stderr, "Exited membersDbWatcher\n"); } void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { @@ -773,7 +779,11 @@ void PostgreSQL::_networksWatcher_RabbitMQ() { } catch (std::runtime_error &e) { fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); break; - } catch(...) {} + } catch (std::exception &e) { + fprintf(stderr, "RABBITMQ ERROR network watcher: %s\n", e.what()); + } catch(...) { + fprintf(stderr, "RABBITMQ ERROR network watcher: unknown error\n"); + } } } -- cgit v1.2.3