diff options
author | Grant Limberg <grant.limberg@zerotier.com> | 2019-03-05 15:11:50 -0800 |
---|---|---|
committer | Grant Limberg <grant.limberg@zerotier.com> | 2019-03-05 15:11:50 -0800 |
commit | fcb4221f97a7545ac741b184c53c10c3ac2c06b3 (patch) | |
tree | 2ce08a4b2447a620c1225d19401f34dbd8929d18 /controller/PostgreSQL.cpp | |
parent | 1f13374a4f24c5398d4f1978b217db39aefdffad (diff) | |
download | infinitytier-fcb4221f97a7545ac741b184c53c10c3ac2c06b3.tar.gz infinitytier-fcb4221f97a7545ac741b184c53c10c3ac2c06b3.zip |
rabbitMQ implementation
Diffstat (limited to 'controller/PostgreSQL.cpp')
-rw-r--r-- | controller/PostgreSQL.cpp | 57 |
1 files changed, 54 insertions, 3 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 8dc56c9c..5192dad5 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -28,6 +28,7 @@ #include "PostgreSQL.hpp" #include "EmbeddedNetworkController.hpp" +#include "RabbitMQ.hpp" #include "../version.h" #include <libpq-fe.h> @@ -74,7 +75,7 @@ std::string join(const std::vector<std::string> &elements, const char * const se using namespace ZeroTier; -PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc) +PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc) : DB(nc, myId, path) , _ready(0) , _connected(1) @@ -646,7 +647,32 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { } void PostgreSQL::_membersWatcher_RabbitMQ() { - + char buf[11] = {0}; + std::string qname = "member_"+ std::string(_myAddress.toString(buf)); + RabbitMQ rmq(_mqc, qname.c_str()); + try { + rmq.init(); + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + exit(11); + } + while (_run == 1) { + try { + std::string msg = rmq.consume(); + json tmp(json::parse(msg)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object() || newConfig.is_object()) { + _memberChanged(oldConfig,newConfig,(this->_ready>=2)); + } + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + break; + } catch(...) {} + } } void PostgreSQL::networksDbWatcher() @@ -714,7 +740,32 @@ void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { } void PostgreSQL::_networksWatcher_RabbitMQ() { - + char buf[11] = {0}; + std::string qname = "network_"+ std::string(_myAddress.toString(buf)); + RabbitMQ rmq(_mqc, qname.c_str()); + try { + rmq.init(); + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + exit(11); + } + while (_run == 1) { + try { + std::string msg = rmq.consume(); + json tmp(json::parse(msg)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + break; + } catch(...) {} + } } void PostgreSQL::commitThread() |