diff options
Diffstat (limited to 'controller')
| -rw-r--r-- | controller/EmbeddedNetworkController.cpp | 7 | ||||
| -rw-r--r-- | controller/EmbeddedNetworkController.hpp | 6 | ||||
| -rw-r--r-- | controller/PostgreSQL.cpp | 133 | ||||
| -rw-r--r-- | controller/PostgreSQL.hpp | 12 | ||||
| -rw-r--r-- | controller/RabbitMQ.cpp | 94 | ||||
| -rw-r--r-- | controller/RabbitMQ.hpp | 79 |
6 files changed, 309 insertions, 22 deletions
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 8b3f1517..5ddcaa4b 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -464,12 +464,13 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule) } // anonymous namespace -EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort) : +EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc) : _startTime(OSUtils::now()), _listenPort(listenPort), _node(node), _path(dbPath), - _sender((NetworkController::Sender *)0) + _sender((NetworkController::Sender *)0), + _mqc(mqc) { } @@ -489,7 +490,7 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender) _signingIdAddressString = signingId.address().toString(tmp); #ifdef ZT_CONTROLLER_USE_LIBPQ if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:")) - _db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort)); + _db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort, _mqc)); else // else use FileDB after endif #endif _db.reset(new FileDB(this,_signingId,_path.c_str())); diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index 718c97ff..269442a8 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -60,6 +60,8 @@ namespace ZeroTier { class Node; +struct MQConfig; + class EmbeddedNetworkController : public NetworkController { public: @@ -67,7 +69,7 @@ public: * @param node Parent node * @param dbPath Database path (file path or database credentials) */ - EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort); + EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc = NULL); virtual ~EmbeddedNetworkController(); virtual void init(const Identity &signingId,Sender *sender); @@ -164,6 +166,8 @@ private: std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus; std::mutex _memberStatus_l; + + MQConfig *_mqc; }; } // namespace ZeroTier diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 6165d8c4..a856e4af 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -28,10 +28,13 @@ #include "PostgreSQL.hpp" #include "EmbeddedNetworkController.hpp" +#include "RabbitMQ.hpp" #include "../version.h" #include <libpq-fe.h> #include <sstream> +#include <amqp.h> +#include <amqp_tcp_socket.h> using json = nlohmann::json; namespace { @@ -72,13 +75,14 @@ std::string join(const std::vector<std::string> &elements, const char * const se using namespace ZeroTier; -PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort) +PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc) : DB(nc, myId, path) , _ready(0) , _connected(1) , _run(1) , _waitNoticePrinted(false) , _listenPort(listenPort) + , _mqc(mqc) { _connString = std::string(path) + " application_name=controller_" +_myAddressStr; @@ -538,7 +542,8 @@ void PostgreSQL::heartbeat() std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); std::string now = std::to_string(OSUtils::now()); std::string host_port = std::to_string(_listenPort); - const char *values[9] = { + std::string use_rabbitmq = (_mqc != NULL) ? "true" : "false"; + const char *values[10] = { controllerId, hostname, now.c_str(), @@ -547,16 +552,18 @@ void PostgreSQL::heartbeat() minor.c_str(), rev.c_str(), build.c_str(), - host_port.c_str() + host_port.c_str(), + use_rabbitmq.c_str() }; PGresult *res = PQexecParams(conn, - "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port) " - "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9) " + "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_rabbitmq) " + "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) " "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " - "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port", - 9, // number of parameters + "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, " + "use_rabbitmq = EXCLUDED.use_rabbitmq", + 10, // number of parameters NULL, // oid field. ignore values, // values for substitution NULL, // lengths in bytes of each value @@ -587,6 +594,24 @@ void PostgreSQL::membersDbWatcher() initializeMembers(conn); + if (this->_mqc != NULL) { + PQfinish(conn); + conn = NULL; + _membersWatcher_RabbitMQ(); + } else { + _membersWatcher_Postgres(conn); + PQfinish(conn); + conn = NULL; + } + + if (_run == 1) { + fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + exit(9); + } + fprintf(stderr, "Exited membersDbWatcher\n"); +} + +void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) { char buf[11] = {0}; std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); PGresult *res = PQexec(conn, cmd.c_str()); @@ -625,11 +650,39 @@ void PostgreSQL::membersDbWatcher() } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - PQfinish(conn); - conn = NULL; - if (_run == 1) { - fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); - exit(9); +} + +void PostgreSQL::_membersWatcher_RabbitMQ() { + char buf[11] = {0}; + std::string qname = "member_"+ std::string(_myAddress.toString(buf)); + RabbitMQ rmq(_mqc, qname.c_str()); + try { + rmq.init(); + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + exit(11); + } + while (_run == 1) { + try { + std::string msg = rmq.consume(); + // fprintf(stderr, "Got Member Update: %s\n", msg.c_str()); + json tmp(json::parse(msg)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object() || newConfig.is_object()) { + _memberChanged(oldConfig,newConfig,(this->_ready>=2)); + } + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what()); + break; + } catch(std::exception &e ) { + fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what()); + } catch(...) { + fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n"); + } } } @@ -644,6 +697,24 @@ void PostgreSQL::networksDbWatcher() initializeNetworks(conn); + if (this->_mqc != NULL) { + PQfinish(conn); + conn = NULL; + _networksWatcher_RabbitMQ(); + } else { + _networksWatcher_Postgres(conn); + PQfinish(conn); + conn = NULL; + } + + if (_run == 1) { + fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); + exit(8); + } + fprintf(stderr, "Exited membersDbWatcher\n"); +} + +void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) { char buf[11] = {0}; std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf)); PGresult *res = PQexec(conn, cmd.c_str()); @@ -680,11 +751,39 @@ void PostgreSQL::networksDbWatcher() } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - PQfinish(conn); - conn = NULL; - if (_run == 1) { - fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str()); - exit(8); +} + +void PostgreSQL::_networksWatcher_RabbitMQ() { + char buf[11] = {0}; + std::string qname = "network_"+ std::string(_myAddress.toString(buf)); + RabbitMQ rmq(_mqc, qname.c_str()); + try { + rmq.init(); + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + exit(11); + } + while (_run == 1) { + try { + std::string msg = rmq.consume(); + // fprintf(stderr, "Got network update: %s\n", msg.c_str()); + json tmp(json::parse(msg)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (std::runtime_error &e) { + fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what()); + break; + } catch (std::exception &e) { + fprintf(stderr, "RABBITMQ ERROR network watcher: %s\n", e.what()); + } catch(...) { + fprintf(stderr, "RABBITMQ ERROR network watcher: unknown error\n"); + } } } diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index 7932317b..779d47bd 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -40,6 +40,8 @@ extern "C" { namespace ZeroTier { +struct MQConfig; + /** * A controller database driver that talks to PostgreSQL * @@ -49,7 +51,7 @@ namespace ZeroTier class PostgreSQL : public DB { public: - PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort); + PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL); virtual ~PostgreSQL(); virtual bool waitForReady(); @@ -70,7 +72,13 @@ private: void initializeMembers(PGconn *conn); void heartbeat(); void membersDbWatcher(); + void _membersWatcher_Postgres(PGconn *conn); + void _membersWatcher_RabbitMQ(); void networksDbWatcher(); + void _networksWatcher_Postgres(PGconn *conn); + void _networksWatcher_RabbitMQ(); + + void commitThread(); void onlineNotificationThread(); @@ -100,6 +108,8 @@ private: mutable volatile bool _waitNoticePrinted; int _listenPort; + + MQConfig *_mqc; }; } diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp new file mode 100644 index 00000000..f5a5c1d6 --- /dev/null +++ b/controller/RabbitMQ.cpp @@ -0,0 +1,94 @@ +#include "RabbitMQ.hpp" + +#include <amqp.h> +#include <amqp_tcp_socket.h> +#include <stdexcept> +#include <cstring> + +namespace ZeroTier +{ + +RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName) + : _mqc(cfg) + , _qName(queueName) + , _socket(NULL) + , _status(0) +{ +} + +RabbitMQ::~RabbitMQ() +{ + amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS); + amqp_connection_close(_conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(_conn); +} + +void RabbitMQ::init() +{ + struct timeval tval; + memset(&tval, 0, sizeof(struct timeval)); + tval.tv_sec = 5; + + fprintf(stderr, "Initializing RabbitMQ %s\n", _qName); + _conn = amqp_new_connection(); + _socket = amqp_tcp_socket_new(_conn); + if (!_socket) { + throw std::runtime_error("Can't create socket for RabbitMQ"); + } + + _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval); + if (_status) { + throw std::runtime_error("Can't connect to RabbitMQ"); + } + + amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, + _mqc->username, _mqc->password); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("RabbitMQ Login Error"); + } + + static int chan = 0; + { + Mutex::Lock l(_chan_m); + _channel = ++chan; + } + amqp_channel_open(_conn, _channel); + r = amqp_get_rpc_reply(_conn); + if(r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error opening communication channel"); + } + + _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error declaring queue " + std::string(_qName)); + } + + amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error consuming queue " + std::string(_qName)); + } + fprintf(stderr, "RabbitMQ Init OK %s\n", _qName); +} + +std::string RabbitMQ::consume() +{ + amqp_rpc_reply_t res; + amqp_envelope_t envelope; + amqp_maybe_release_buffers(_conn); + + res = amqp_consume_message(_conn, &envelope, NULL, 0); + if (res.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error getting message"); + } + + std::string msg( + (const char*)envelope.message.body.bytes, + envelope.message.body.len + ); + amqp_destroy_envelope(&envelope); + return msg; +} + +} diff --git a/controller/RabbitMQ.hpp b/controller/RabbitMQ.hpp new file mode 100644 index 00000000..d341681b --- /dev/null +++ b/controller/RabbitMQ.hpp @@ -0,0 +1,79 @@ +/* + * ZeroTier One - Network Virtualization Everywhere + * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * -- + * + * You can be released from the requirements of the license by purchasing + * a commercial license. Buying such a license is mandatory as soon as you + * develop commercial closed-source software that incorporates or links + * directly against ZeroTier software without disclosing the source code + * of your own application. + */ +#ifndef ZT_CONTROLLER_RABBITMQ_HPP +#define ZT_CONTROLLER_RABBITMQ_HPP + +namespace ZeroTier +{ +struct MQConfig { + const char *host; + int port; + const char *username; + const char *password; +}; +} + +#ifdef ZT_CONTROLLER_USE_LIBPQ + +#include "../node/Mutex.hpp" + +#include <amqp.h> +#include <amqp_tcp_socket.h> +#include <string> + +namespace ZeroTier +{ + +class RabbitMQ { +public: + RabbitMQ(MQConfig *cfg, const char *queueName); + ~RabbitMQ(); + + void init(); + + std::string consume(); + +private: + MQConfig *_mqc; + const char *_qName; + + amqp_socket_t *_socket; + amqp_connection_state_t _conn; + amqp_queue_declare_ok_t *_q; + int _status; + + int _channel; + + Mutex _chan_m; + +}; + +} + +#endif // ZT_CONTROLLER_USE_LIBPQ + +#endif // ZT_CONTROLLER_RABBITMQ_HPP + |
