diff options
-rw-r--r-- | controller/PostgreSQL.cpp | 69 | ||||
-rw-r--r-- | controller/RabbitMQ.cpp | 4 |
2 files changed, 41 insertions, 32 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 5192dad5..9df0f440 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -542,7 +542,8 @@ void PostgreSQL::heartbeat() std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); std::string now = std::to_string(OSUtils::now()); std::string host_port = std::to_string(_listenPort); - const char *values[9] = { + std::string use_rabbitmq = (_mqc != NULL) ? "true" : "false"; + const char *values[10] = { controllerId, hostname, now.c_str(), @@ -551,16 +552,18 @@ void PostgreSQL::heartbeat() minor.c_str(), rev.c_str(), build.c_str(), - host_port.c_str() + host_port.c_str(), + use_rabbitmq.c_str() }; PGresult *res = PQexecParams(conn, - "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port) " - "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9) " + "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_rabbitmq) " + "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) " "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " - "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port", - 9, // number of parameters + "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, " + "use_rabbitmq = EXCLUDED.use_rabbitmq", + 10, // number of parameters NULL, // oid field. ignore values, // values for substitution NULL, // lengths in bytes of each value @@ -591,19 +594,9 @@ void PostgreSQL::membersDbWatcher() initializeMembers(conn); - char buf[11] = {0}; - std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); - PGresult *res = PQexec(conn, cmd.c_str()); - if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQfinish(conn); - exit(1); - } - - PQclear(res); res = NULL; - if (this->_mqc != NULL) { + PQfinish(conn); + conn = NULL; _membersWatcher_RabbitMQ(); } else { _membersWatcher_Postgres(conn); @@ -618,6 +611,18 @@ void PostgreSQL::membersDbWatcher() } void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { + char buf[11] = {0}; + std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); + PGresult *res = PQexec(conn, cmd.c_str()); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQfinish(conn); + exit(1); + } + + PQclear(res); res = NULL; + while(_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres."); @@ -659,6 +664,7 @@ void PostgreSQL::_membersWatcher_RabbitMQ() { while (_run == 1) { try { std::string msg = rmq.consume(); + // fprintf(stderr, "Got Member Update: %s\n", msg.c_str()); json tmp(json::parse(msg)); json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; @@ -686,19 +692,9 @@ void PostgreSQL::networksDbWatcher() initializeNetworks(conn); - char buf[11] = {0}; - std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf)); - PGresult *res = PQexec(conn, cmd.c_str()); - if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); - PQclear(res); - PQfinish(conn); - exit(1); - } - - PQclear(res); res = NULL; - if (this->_mqc != NULL) { + PQfinish(conn); + conn = NULL; _networksWatcher_RabbitMQ(); } else { _networksWatcher_Postgres(conn); @@ -713,6 +709,18 @@ void PostgreSQL::networksDbWatcher() } void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { + char buf[11] = {0}; + std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf)); + PGresult *res = PQexec(conn, cmd.c_str()); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQfinish(conn); + exit(1); + } + + PQclear(res); res = NULL; + while(_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres."); @@ -752,6 +760,7 @@ void PostgreSQL::_networksWatcher_RabbitMQ() { while (_run == 1) { try { std::string msg = rmq.consume(); + // fprintf(stderr, "Got network update: %s\n", msg.c_str()); json tmp(json::parse(msg)); json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp index 34efcde4..e6b8c819 100644 --- a/controller/RabbitMQ.cpp +++ b/controller/RabbitMQ.cpp @@ -58,13 +58,13 @@ void RabbitMQ::init() _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table); r = amqp_get_rpc_reply(_conn); if (r.reply_type != AMQP_RESPONSE_NORMAL) { - throw std::runtime_error("Error declaring queue"); + throw std::runtime_error("Error declaring queue " + std::string(_qName)); } amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); r = amqp_get_rpc_reply(_conn); if (r.reply_type != AMQP_RESPONSE_NORMAL) { - throw std::runtime_error("Error conuming"); + throw std::runtime_error("Error consuming queue " + std::string(_qName)); } fprintf(stderr, "RabbitMQ Init OK %s\n", _qName); } |