diff options
author | Grant Limberg <grant.limberg@zerotier.com> | 2019-03-05 15:11:50 -0800 |
---|---|---|
committer | Grant Limberg <grant.limberg@zerotier.com> | 2019-03-05 15:11:50 -0800 |
commit | fcb4221f97a7545ac741b184c53c10c3ac2c06b3 (patch) | |
tree | 2ce08a4b2447a620c1225d19401f34dbd8929d18 /controller | |
parent | 1f13374a4f24c5398d4f1978b217db39aefdffad (diff) | |
download | infinitytier-fcb4221f97a7545ac741b184c53c10c3ac2c06b3.tar.gz infinitytier-fcb4221f97a7545ac741b184c53c10c3ac2c06b3.zip |
rabbitMQ implementation
Diffstat (limited to 'controller')
-rw-r--r-- | controller/EmbeddedNetworkController.cpp | 7 | ||||
-rw-r--r-- | controller/EmbeddedNetworkController.hpp | 6 | ||||
-rw-r--r-- | controller/PostgreSQL.cpp | 57 | ||||
-rw-r--r-- | controller/PostgreSQL.hpp | 12 | ||||
-rw-r--r-- | controller/RabbitMQ.cpp | 91 | ||||
-rw-r--r-- | controller/RabbitMQ.hpp | 75 |
6 files changed, 232 insertions, 16 deletions
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 8b3f1517..5ddcaa4b 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -464,12 +464,13 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule) } // anonymous namespace -EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort) : +EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc) : _startTime(OSUtils::now()), _listenPort(listenPort), _node(node), _path(dbPath), - _sender((NetworkController::Sender *)0) + _sender((NetworkController::Sender *)0), + _mqc(mqc) { } @@ -489,7 +490,7 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender) _signingIdAddressString = signingId.address().toString(tmp); #ifdef ZT_CONTROLLER_USE_LIBPQ if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:")) - _db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort)); + _db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort, _mqc)); else // else use FileDB after endif #endif _db.reset(new FileDB(this,_signingId,_path.c_str())); diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index 718c97ff..269442a8 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -60,6 +60,8 @@ namespace ZeroTier { class Node; +struct MQConfig; + class EmbeddedNetworkController : public NetworkController { public: @@ -67,7 +69,7 @@ public: * @param node Parent node * @param dbPath Database path (file path or database credentials) */ - EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort); + EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc = NULL); virtual ~EmbeddedNetworkController(); virtual void init(const Identity &signingId,Sender *sender); @@ -164,6 +166,8 @@ private: std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus; std::mutex _memberStatus_l; + + MQConfig *_mqc; }; } // namespace ZeroTier diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 8dc56c9c..5192dad5 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -28,6 +28,7 @@ #include "PostgreSQL.hpp" #include "EmbeddedNetworkController.hpp" +#include "RabbitMQ.hpp" #include "../version.h" #include <libpq-fe.h> @@ -74,7 +75,7 @@ std::string join(const std::vector<std::string> &elements, const char * const se using namespace ZeroTier; -PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc) +PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc) : DB(nc, myId, path) , _ready(0) , _connected(1) @@ -646,7 +647,32 @@ void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { } void PostgreSQL::_membersWatcher_RabbitMQ() { - + char buf[11] = {0}; + std::string qname = "member_"+ std::string(_myAddress.toString(buf)); + RabbitMQ rmq(_mqc, qname.c_str()); + try { + rmq.init(); + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + exit(11); + } + while (_run == 1) { + try { + std::string msg = rmq.consume(); + json tmp(json::parse(msg)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object() || newConfig.is_object()) { + _memberChanged(oldConfig,newConfig,(this->_ready>=2)); + } + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + break; + } catch(...) {} + } } void PostgreSQL::networksDbWatcher() @@ -714,7 +740,32 @@ void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { } void PostgreSQL::_networksWatcher_RabbitMQ() { - + char buf[11] = {0}; + std::string qname = "network_"+ std::string(_myAddress.toString(buf)); + RabbitMQ rmq(_mqc, qname.c_str()); + try { + rmq.init(); + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + exit(11); + } + while (_run == 1) { + try { + std::string msg = rmq.consume(); + json tmp(json::parse(msg)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + break; + } catch(...) {} + } } void PostgreSQL::commitThread() diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index e0dcdf06..779d47bd 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -40,13 +40,7 @@ extern "C" { namespace ZeroTier { -struct mq_config -{ - const char *host; - int port; - const char *username; - const char *password; -}; +struct MQConfig; /** * A controller database driver that talks to PostgreSQL @@ -57,7 +51,7 @@ struct mq_config class PostgreSQL : public DB { public: - PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, mq_config *mqc = NULL); + PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL); virtual ~PostgreSQL(); virtual bool waitForReady(); @@ -115,7 +109,7 @@ private: int _listenPort; - mq_config *_mqc; + MQConfig *_mqc; }; } diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp new file mode 100644 index 00000000..34efcde4 --- /dev/null +++ b/controller/RabbitMQ.cpp @@ -0,0 +1,91 @@ +#include "RabbitMQ.hpp" + +#include <amqp.h> +#include <amqp_tcp_socket.h> +#include <stdexcept> +#include <cstring> + +namespace ZeroTier +{ + +RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName) + : _mqc(cfg) + , _qName(queueName) + , _socket(NULL) + , _status(0) +{ +} + +RabbitMQ::~RabbitMQ() +{ + amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS); + amqp_connection_close(_conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(_conn); +} + +void RabbitMQ::init() +{ + struct timeval tval; + memset(&tval, 0, sizeof(struct timeval)); + tval.tv_sec = 5; + + fprintf(stderr, "Initializing RabbitMQ %s\n", _qName); + _conn = amqp_new_connection(); + _socket = amqp_tcp_socket_new(_conn); + if (!_socket) { + throw std::runtime_error("Can't create socket for RabbitMQ"); + } + + _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval); + if (_status) { + throw std::runtime_error("Can't connect to RabbitMQ"); + } + + amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, + _mqc->username, _mqc->password); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("RabbitMQ Login Error"); + } + + static int chan = 0; + amqp_channel_open(_conn, ++chan); + r = amqp_get_rpc_reply(_conn); + if(r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error opening communication channel"); + } + _channel = chan; + + _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error declaring queue"); + } + + amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error conuming"); + } + fprintf(stderr, "RabbitMQ Init OK %s\n", _qName); +} + +std::string RabbitMQ::consume() +{ + amqp_rpc_reply_t res; + amqp_envelope_t envelope; + amqp_maybe_release_buffers(_conn); + + res = amqp_consume_message(_conn, &envelope, NULL, 0); + if (res.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error getting message"); + } + + std::string msg( + (const char*)envelope.message.body.bytes, + envelope.message.body.len + ); + amqp_destroy_envelope(&envelope); + return msg; +} + +}
\ No newline at end of file diff --git a/controller/RabbitMQ.hpp b/controller/RabbitMQ.hpp new file mode 100644 index 00000000..74023b12 --- /dev/null +++ b/controller/RabbitMQ.hpp @@ -0,0 +1,75 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. + */ +#ifndef ZT_CONTROLLER_RABBITMQ_HPP +#define ZT_CONTROLLER_RABBITMQ_HPP + +namespace ZeroTier +{ +struct MQConfig { + const char *host; + int port; + const char *username; + const char *password; +}; +} + + +#ifdef ZT_CONTROLLER_USE_LIBPQ + +#include <amqp.h> +#include <amqp_tcp_socket.h> +#include <string> + +namespace ZeroTier +{ + +class RabbitMQ { +public: + RabbitMQ(MQConfig *cfg, const char *queueName); + ~RabbitMQ(); + + void init(); + + std::string consume(); + +private: + MQConfig *_mqc; + const char *_qName; + + amqp_socket_t *_socket; + amqp_connection_state_t _conn; + amqp_queue_declare_ok_t *_q; + int _status; + + int _channel; +}; + +} + +#endif // ZT_CONTROLLER_USE_LIBPQ + +#endif // ZT_CONTROLLER_RABBITMQ_HPP + |