diff options
Diffstat (limited to 'controller/PostgreSQL.cpp')
-rw-r--r-- | controller/PostgreSQL.cpp | 133 |
1 files changed, 116 insertions, 17 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 6165d8c4..a856e4af 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -28,10 +28,13 @@ #include "PostgreSQL.hpp" #include "EmbeddedNetworkController.hpp" +#include "RabbitMQ.hpp" #include "../version.h" #include <libpq-fe.h> #include <sstream> +#include <amqp.h> +#include <amqp_tcp_socket.h> using json = nlohmann::json; namespace { @@ -72,13 +75,14 @@ std::string join(const std::vector<std::string> &elements, const char * const se using namespace ZeroTier; -PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort) +PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc) : DB(nc, myId, path) , _ready(0) , _connected(1) , _run(1) , _waitNoticePrinted(false) , _listenPort(listenPort) + , _mqc(mqc) { _connString = std::string(path) + " application_name=controller_" +_myAddressStr; @@ -538,7 +542,8 @@ void PostgreSQL::heartbeat() std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); std::string now = std::to_string(OSUtils::now()); std::string host_port = std::to_string(_listenPort); - const char *values[9] = { + std::string use_rabbitmq = (_mqc != NULL) ? "true" : "false"; + const char *values[10] = { controllerId, hostname, now.c_str(), @@ -547,16 +552,18 @@ void PostgreSQL::heartbeat() minor.c_str(), rev.c_str(), build.c_str(), - host_port.c_str() + host_port.c_str(), + use_rabbitmq.c_str() }; PGresult *res = PQexecParams(conn, - "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port) " - "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9) " + "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_rabbitmq) " + "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) " "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " - "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port", - 9, // number of parameters + "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, " + "use_rabbitmq = EXCLUDED.use_rabbitmq", + 10, // number of parameters NULL, // oid field. ignore values, // values for substitution NULL, // lengths in bytes of each value @@ -587,6 +594,24 @@ void PostgreSQL::membersDbWatcher() initializeMembers(conn); + if (this->_mqc != NULL) { + PQfinish(conn); + conn = NULL; + _membersWatcher_RabbitMQ(); + } else { + _membersWatcher_Postgres(conn); + PQfinish(conn); + conn = NULL; + } + + if (_run == 1) { + fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + exit(9); + } + fprintf(stderr, "Exited membersDbWatcher\n"); +} + +void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { char buf[11] = {0}; std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); PGresult *res = PQexec(conn, cmd.c_str()); @@ -625,11 +650,39 @@ void PostgreSQL::membersDbWatcher() } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - PQfinish(conn); - conn = NULL; - if (_run == 1) { - fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); - exit(9); +} + +void PostgreSQL::_membersWatcher_RabbitMQ() { + char buf[11] = {0}; + std::string qname = "member_"+ std::string(_myAddress.toString(buf)); + RabbitMQ rmq(_mqc, qname.c_str()); + try { + rmq.init(); + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + exit(11); + } + while (_run == 1) { + try { + std::string msg = rmq.consume(); + // fprintf(stderr, "Got Member Update: %s\n", msg.c_str()); + json tmp(json::parse(msg)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object() || newConfig.is_object()) { + _memberChanged(oldConfig,newConfig,(this->_ready>=2)); + } + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what()); + break; + } catch(std::exception &e ) { + fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what()); + } catch(...) { + fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n"); + } } } @@ -644,6 +697,24 @@ void PostgreSQL::networksDbWatcher() initializeNetworks(conn); + if (this->_mqc != NULL) { + PQfinish(conn); + conn = NULL; + _networksWatcher_RabbitMQ(); + } else { + _networksWatcher_Postgres(conn); + PQfinish(conn); + conn = NULL; + } + + if (_run == 1) { + fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + exit(8); + } + fprintf(stderr, "Exited membersDbWatcher\n"); +} + +void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { char buf[11] = {0}; std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf)); PGresult *res = PQexec(conn, cmd.c_str()); @@ -680,11 +751,39 @@ void PostgreSQL::networksDbWatcher() } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - PQfinish(conn); - conn = NULL; - if (_run == 1) { - fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); - exit(8); +} + +void PostgreSQL::_networksWatcher_RabbitMQ() { + char buf[11] = {0}; + std::string qname = "network_"+ std::string(_myAddress.toString(buf)); + RabbitMQ rmq(_mqc, qname.c_str()); + try { + rmq.init(); + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + exit(11); + } + while (_run == 1) { + try { + std::string msg = rmq.consume(); + // fprintf(stderr, "Got network update: %s\n", msg.c_str()); + json tmp(json::parse(msg)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + break; + } catch (std::exception &e) { + fprintf(stderr, "RABBITMQ ERROR network watcher: %s\n", e.what()); + } catch(...) { + fprintf(stderr, "RABBITMQ ERROR network watcher: unknown error\n"); + } } } |