summaryrefslogtreecommitdiff
path: root/controller/PostgreSQL.cpp
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2019-03-08 13:16:02 -0800
committerAdam Ierymenko <adam.ierymenko@gmail.com>2019-03-08 13:16:02 -0800
commitd7a6357393f25748e4256d6ccf1321d7532d02b9 (patch)
tree177ddac683b2a4c688da02f809f6e008fc5d22a4 /controller/PostgreSQL.cpp
parent6fbf21b4f0abb89049e724b3bd625458327b6a77 (diff)
parent993d850f697dbedd5c8967f7fab482af923df926 (diff)
downloadinfinitytier-d7a6357393f25748e4256d6ccf1321d7532d02b9.tar.gz
infinitytier-d7a6357393f25748e4256d6ccf1321d7532d02b9.zip
Merge branch 'dev' into edge
Diffstat (limited to 'controller/PostgreSQL.cpp')
-rw-r--r--controller/PostgreSQL.cpp133
1 files changed, 116 insertions, 17 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index 6165d8c4..a856e4af 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -28,10 +28,13 @@
#include "PostgreSQL.hpp"
#include "EmbeddedNetworkController.hpp"
+#include "RabbitMQ.hpp"
#include "../version.h"
#include <libpq-fe.h>
#include <sstream>
+#include <amqp.h>
+#include <amqp_tcp_socket.h>
using json = nlohmann::json;
namespace {
@@ -72,13 +75,14 @@ std::string join(const std::vector<std::string> &elements, const char * const se
using namespace ZeroTier;
-PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort)
+PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
: DB(nc, myId, path)
, _ready(0)
, _connected(1)
, _run(1)
, _waitNoticePrinted(false)
, _listenPort(listenPort)
+ , _mqc(mqc)
{
_connString = std::string(path) + " application_name=controller_" +_myAddressStr;
@@ -538,7 +542,8 @@ void PostgreSQL::heartbeat()
std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD);
std::string now = std::to_string(OSUtils::now());
std::string host_port = std::to_string(_listenPort);
- const char *values[9] = {
+ std::string use_rabbitmq = (_mqc != NULL) ? "true" : "false";
+ const char *values[10] = {
controllerId,
hostname,
now.c_str(),
@@ -547,16 +552,18 @@ void PostgreSQL::heartbeat()
minor.c_str(),
rev.c_str(),
build.c_str(),
- host_port.c_str()
+ host_port.c_str(),
+ use_rabbitmq.c_str()
};
PGresult *res = PQexecParams(conn,
- "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port) "
- "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9) "
+ "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_rabbitmq) "
+ "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) "
"ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
- "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port",
- 9, // number of parameters
+ "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
+ "use_rabbitmq = EXCLUDED.use_rabbitmq",
+ 10, // number of parameters
NULL, // oid field. ignore
values, // values for substitution
NULL, // lengths in bytes of each value
@@ -587,6 +594,24 @@ void PostgreSQL::membersDbWatcher()
initializeMembers(conn);
+ if (this->_mqc != NULL) {
+ PQfinish(conn);
+ conn = NULL;
+ _membersWatcher_RabbitMQ();
+ } else {
+ _membersWatcher_Postgres(conn);
+ PQfinish(conn);
+ conn = NULL;
+ }
+
+ if (_run == 1) {
+ fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
+ exit(9);
+ }
+ fprintf(stderr, "Exited membersDbWatcher\n");
+}
+
+void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
char buf[11] = {0};
std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf));
PGresult *res = PQexec(conn, cmd.c_str());
@@ -625,11 +650,39 @@ void PostgreSQL::membersDbWatcher()
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
- PQfinish(conn);
- conn = NULL;
- if (_run == 1) {
- fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
- exit(9);
+}
+
+void PostgreSQL::_membersWatcher_RabbitMQ() {
+ char buf[11] = {0};
+ std::string qname = "member_"+ std::string(_myAddress.toString(buf));
+ RabbitMQ rmq(_mqc, qname.c_str());
+ try {
+ rmq.init();
+ } catch (std::runtime_error &e) {
+ fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
+ exit(11);
+ }
+ while (_run == 1) {
+ try {
+ std::string msg = rmq.consume();
+ // fprintf(stderr, "Got Member Update: %s\n", msg.c_str());
+ json tmp(json::parse(msg));
+ json &ov = tmp["old_val"];
+ json &nv = tmp["new_val"];
+ json oldConfig, newConfig;
+ if (ov.is_object()) oldConfig = ov;
+ if (nv.is_object()) newConfig = nv;
+ if (oldConfig.is_object() || newConfig.is_object()) {
+ _memberChanged(oldConfig,newConfig,(this->_ready>=2));
+ }
+ } catch (std::runtime_error &e) {
+ fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what());
+ break;
+ } catch(std::exception &e ) {
+ fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what());
+ } catch(...) {
+ fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n");
+ }
}
}
@@ -644,6 +697,24 @@ void PostgreSQL::networksDbWatcher()
initializeNetworks(conn);
+ if (this->_mqc != NULL) {
+ PQfinish(conn);
+ conn = NULL;
+ _networksWatcher_RabbitMQ();
+ } else {
+ _networksWatcher_Postgres(conn);
+ PQfinish(conn);
+ conn = NULL;
+ }
+
+ if (_run == 1) {
+ fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
+ exit(8);
+ }
+ fprintf(stderr, "Exited membersDbWatcher\n");
+}
+
+void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) {
char buf[11] = {0};
std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf));
PGresult *res = PQexec(conn, cmd.c_str());
@@ -680,11 +751,39 @@ void PostgreSQL::networksDbWatcher()
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
- PQfinish(conn);
- conn = NULL;
- if (_run == 1) {
- fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
- exit(8);
+}
+
+void PostgreSQL::_networksWatcher_RabbitMQ() {
+ char buf[11] = {0};
+ std::string qname = "network_"+ std::string(_myAddress.toString(buf));
+ RabbitMQ rmq(_mqc, qname.c_str());
+ try {
+ rmq.init();
+ } catch (std::runtime_error &e) {
+ fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
+ exit(11);
+ }
+ while (_run == 1) {
+ try {
+ std::string msg = rmq.consume();
+ // fprintf(stderr, "Got network update: %s\n", msg.c_str());
+ json tmp(json::parse(msg));
+ json &ov = tmp["old_val"];
+ json &nv = tmp["new_val"];
+ json oldConfig, newConfig;
+ if (ov.is_object()) oldConfig = ov;
+ if (nv.is_object()) newConfig = nv;
+ if (oldConfig.is_object()||newConfig.is_object()) {
+ _networkChanged(oldConfig,newConfig,(this->_ready >= 2));
+ }
+ } catch (std::runtime_error &e) {
+ fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
+ break;
+ } catch (std::exception &e) {
+ fprintf(stderr, "RABBITMQ ERROR network watcher: %s\n", e.what());
+ } catch(...) {
+ fprintf(stderr, "RABBITMQ ERROR network watcher: unknown error\n");
+ }
}
}