summaryrefslogtreecommitdiff
path: root/controller/PostgreSQL.cpp
diff options
context:
space:
mode:
authorGrant Limberg <grant.limberg@zerotier.com>2019-03-05 15:11:50 -0800
committerGrant Limberg <grant.limberg@zerotier.com>2019-03-05 15:11:50 -0800
commitfcb4221f97a7545ac741b184c53c10c3ac2c06b3 (patch)
tree2ce08a4b2447a620c1225d19401f34dbd8929d18 /controller/PostgreSQL.cpp
parent1f13374a4f24c5398d4f1978b217db39aefdffad (diff)
downloadinfinitytier-fcb4221f97a7545ac741b184c53c10c3ac2c06b3.tar.gz
infinitytier-fcb4221f97a7545ac741b184c53c10c3ac2c06b3.zip
rabbitMQ implementation
Diffstat (limited to 'controller/PostgreSQL.cpp')
-rw-r--r--controller/PostgreSQL.cpp57
1 files changed, 54 insertions, 3 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index 8dc56c9c..5192dad5 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -28,6 +28,7 @@
#include "PostgreSQL.hpp"
#include "EmbeddedNetworkController.hpp"
+#include "RabbitMQ.hpp"
#include "../version.h"
#include <libpq-fe.h>
@@ -74,7 +75,7 @@ std::string join(const std::vector<std::string> &elements, const char * const se
using namespace ZeroTier;
-PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc)
+PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
: DB(nc, myId, path)
, _ready(0)
, _connected(1)
@@ -646,7 +647,32 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
}
void PostgreSQL::_membersWatcher_RabbitMQ() {
-
+ char buf[11] = {0};
+ std::string qname = "member_"+ std::string(_myAddress.toString(buf));
+ RabbitMQ rmq(_mqc, qname.c_str());
+ try {
+ rmq.init();
+ } catch (std::runtime_error &e) {
+ fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
+ exit(11);
+ }
+ while (_run == 1) {
+ try {
+ std::string msg = rmq.consume();
+ json tmp(json::parse(msg));
+ json &ov = tmp["old_val"];
+ json &nv = tmp["new_val"];
+ json oldConfig, newConfig;
+ if (ov.is_object()) oldConfig = ov;
+ if (nv.is_object()) newConfig = nv;
+ if (oldConfig.is_object() || newConfig.is_object()) {
+ _memberChanged(oldConfig,newConfig,(this->_ready>=2));
+ }
+ } catch (std::runtime_error &e) {
+ fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
+ break;
+ } catch(...) {}
+ }
}
void PostgreSQL::networksDbWatcher()
@@ -714,7 +740,32 @@ void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) {
}
void PostgreSQL::_networksWatcher_RabbitMQ() {
-
+ char buf[11] = {0};
+ std::string qname = "network_"+ std::string(_myAddress.toString(buf));
+ RabbitMQ rmq(_mqc, qname.c_str());
+ try {
+ rmq.init();
+ } catch (std::runtime_error &e) {
+ fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
+ exit(11);
+ }
+ while (_run == 1) {
+ try {
+ std::string msg = rmq.consume();
+ json tmp(json::parse(msg));
+ json &ov = tmp["old_val"];
+ json &nv = tmp["new_val"];
+ json oldConfig, newConfig;
+ if (ov.is_object()) oldConfig = ov;
+ if (nv.is_object()) newConfig = nv;
+ if (oldConfig.is_object()||newConfig.is_object()) {
+ _networkChanged(oldConfig,newConfig,(this->_ready >= 2));
+ }
+ } catch (std::runtime_error &e) {
+ fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
+ break;
+ } catch(...) {}
+ }
}
void PostgreSQL::commitThread()