summaryrefslogtreecommitdiff
path: root/controller
diff options
context:
space:
mode:
Diffstat (limited to 'controller')
-rw-r--r--controller/EmbeddedNetworkController.cpp7
-rw-r--r--controller/EmbeddedNetworkController.hpp6
-rw-r--r--controller/PostgreSQL.cpp133
-rw-r--r--controller/PostgreSQL.hpp12
-rw-r--r--controller/RabbitMQ.cpp94
-rw-r--r--controller/RabbitMQ.hpp79
6 files changed, 309 insertions, 22 deletions
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp
index 8b3f1517..5ddcaa4b 100644
--- a/controller/EmbeddedNetworkController.cpp
+++ b/controller/EmbeddedNetworkController.cpp
@@ -464,12 +464,13 @@ static bool _parseRule(json &r,ZT_VirtualNetworkRule &rule)
} // anonymous namespace
-EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort) :
+EmbeddedNetworkController::EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc) :
_startTime(OSUtils::now()),
_listenPort(listenPort),
_node(node),
_path(dbPath),
- _sender((NetworkController::Sender *)0)
+ _sender((NetworkController::Sender *)0),
+ _mqc(mqc)
{
}
@@ -489,7 +490,7 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender)
_signingIdAddressString = signingId.address().toString(tmp);
#ifdef ZT_CONTROLLER_USE_LIBPQ
if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:"))
- _db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort));
+ _db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort, _mqc));
else // else use FileDB after endif
#endif
_db.reset(new FileDB(this,_signingId,_path.c_str()));
diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp
index 718c97ff..269442a8 100644
--- a/controller/EmbeddedNetworkController.hpp
+++ b/controller/EmbeddedNetworkController.hpp
@@ -60,6 +60,8 @@ namespace ZeroTier {
class Node;
+struct MQConfig;
+
class EmbeddedNetworkController : public NetworkController
{
public:
@@ -67,7 +69,7 @@ public:
* @param node Parent node
* @param dbPath Database path (file path or database credentials)
*/
- EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort);
+ EmbeddedNetworkController(Node *node,const char *dbPath, int listenPort, MQConfig *mqc = NULL);
virtual ~EmbeddedNetworkController();
virtual void init(const Identity &signingId,Sender *sender);
@@ -164,6 +166,8 @@ private:
std::unordered_map< _MemberStatusKey,_MemberStatus,_MemberStatusHash > _memberStatus;
std::mutex _memberStatus_l;
+
+ MQConfig *_mqc;
};
} // namespace ZeroTier
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index 6165d8c4..a856e4af 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -28,10 +28,13 @@
#include "PostgreSQL.hpp"
#include "EmbeddedNetworkController.hpp"
+#include "RabbitMQ.hpp"
#include "../version.h"
#include <libpq-fe.h>
#include <sstream>
+#include <amqp.h>
+#include <amqp_tcp_socket.h>
using json = nlohmann::json;
namespace {
@@ -72,13 +75,14 @@ std::string join(const std::vector<std::string> &elements, const char * const se
using namespace ZeroTier;
-PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort)
+PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
: DB(nc, myId, path)
, _ready(0)
, _connected(1)
, _run(1)
, _waitNoticePrinted(false)
, _listenPort(listenPort)
+ , _mqc(mqc)
{
_connString = std::string(path) + " application_name=controller_" +_myAddressStr;
@@ -538,7 +542,8 @@ void PostgreSQL::heartbeat()
std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD);
std::string now = std::to_string(OSUtils::now());
std::string host_port = std::to_string(_listenPort);
- const char *values[9] = {
+ std::string use_rabbitmq = (_mqc != NULL) ? "true" : "false";
+ const char *values[10] = {
controllerId,
hostname,
now.c_str(),
@@ -547,16 +552,18 @@ void PostgreSQL::heartbeat()
minor.c_str(),
rev.c_str(),
build.c_str(),
- host_port.c_str()
+ host_port.c_str(),
+ use_rabbitmq.c_str()
};
PGresult *res = PQexecParams(conn,
- "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port) "
- "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9) "
+ "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build, host_port, use_rabbitmq) "
+ "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8, $9, $10) "
"ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, "
"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
- "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port",
- 9, // number of parameters
+ "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
+ "use_rabbitmq = EXCLUDED.use_rabbitmq",
+ 10, // number of parameters
NULL, // oid field. ignore
values, // values for substitution
NULL, // lengths in bytes of each value
@@ -587,6 +594,24 @@ void PostgreSQL::membersDbWatcher()
initializeMembers(conn);
+ if (this->_mqc != NULL) {
+ PQfinish(conn);
+ conn = NULL;
+ _membersWatcher_RabbitMQ();
+ } else {
+ _membersWatcher_Postgres(conn);
+ PQfinish(conn);
+ conn = NULL;
+ }
+
+ if (_run == 1) {
+ fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
+ exit(9);
+ }
+ fprintf(stderr, "Exited membersDbWatcher\n");
+}
+
+void PostgreSQL::_membersWatcher_Postgres(PGconn *conn) {
char buf[11] = {0};
std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf));
PGresult *res = PQexec(conn, cmd.c_str());
@@ -625,11 +650,39 @@ void PostgreSQL::membersDbWatcher()
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
- PQfinish(conn);
- conn = NULL;
- if (_run == 1) {
- fprintf(stderr, "ERROR: %s membersDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
- exit(9);
+}
+
+void PostgreSQL::_membersWatcher_RabbitMQ() {
+ char buf[11] = {0};
+ std::string qname = "member_"+ std::string(_myAddress.toString(buf));
+ RabbitMQ rmq(_mqc, qname.c_str());
+ try {
+ rmq.init();
+ } catch (std::runtime_error &e) {
+ fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
+ exit(11);
+ }
+ while (_run == 1) {
+ try {
+ std::string msg = rmq.consume();
+ // fprintf(stderr, "Got Member Update: %s\n", msg.c_str());
+ json tmp(json::parse(msg));
+ json &ov = tmp["old_val"];
+ json &nv = tmp["new_val"];
+ json oldConfig, newConfig;
+ if (ov.is_object()) oldConfig = ov;
+ if (nv.is_object()) newConfig = nv;
+ if (oldConfig.is_object() || newConfig.is_object()) {
+ _memberChanged(oldConfig,newConfig,(this->_ready>=2));
+ }
+ } catch (std::runtime_error &e) {
+ fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what());
+ break;
+ } catch(std::exception &e ) {
+ fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what());
+ } catch(...) {
+ fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n");
+ }
}
}
@@ -644,6 +697,24 @@ void PostgreSQL::networksDbWatcher()
initializeNetworks(conn);
+ if (this->_mqc != NULL) {
+ PQfinish(conn);
+ conn = NULL;
+ _networksWatcher_RabbitMQ();
+ } else {
+ _networksWatcher_Postgres(conn);
+ PQfinish(conn);
+ conn = NULL;
+ }
+
+ if (_run == 1) {
+ fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
+ exit(8);
+ }
+ fprintf(stderr, "Exited membersDbWatcher\n");
+}
+
+void PostgreSQL::_networksWatcher_Postgres(PGconn *conn) {
char buf[11] = {0};
std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf));
PGresult *res = PQexec(conn, cmd.c_str());
@@ -680,11 +751,39 @@ void PostgreSQL::networksDbWatcher()
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
- PQfinish(conn);
- conn = NULL;
- if (_run == 1) {
- fprintf(stderr, "ERROR: %s networksDbWatcher should still be running! Exiting Controller.\n", _myAddressStr.c_str());
- exit(8);
+}
+
+void PostgreSQL::_networksWatcher_RabbitMQ() {
+ char buf[11] = {0};
+ std::string qname = "network_"+ std::string(_myAddress.toString(buf));
+ RabbitMQ rmq(_mqc, qname.c_str());
+ try {
+ rmq.init();
+ } catch (std::runtime_error &e) {
+ fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
+ exit(11);
+ }
+ while (_run == 1) {
+ try {
+ std::string msg = rmq.consume();
+ // fprintf(stderr, "Got network update: %s\n", msg.c_str());
+ json tmp(json::parse(msg));
+ json &ov = tmp["old_val"];
+ json &nv = tmp["new_val"];
+ json oldConfig, newConfig;
+ if (ov.is_object()) oldConfig = ov;
+ if (nv.is_object()) newConfig = nv;
+ if (oldConfig.is_object()||newConfig.is_object()) {
+ _networkChanged(oldConfig,newConfig,(this->_ready >= 2));
+ }
+ } catch (std::runtime_error &e) {
+ fprintf(stderr, "RABBITMQ ERROR: %s\n", e.what());
+ break;
+ } catch (std::exception &e) {
+ fprintf(stderr, "RABBITMQ ERROR network watcher: %s\n", e.what());
+ } catch(...) {
+ fprintf(stderr, "RABBITMQ ERROR network watcher: unknown error\n");
+ }
}
}
diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp
index 7932317b..779d47bd 100644
--- a/controller/PostgreSQL.hpp
+++ b/controller/PostgreSQL.hpp
@@ -40,6 +40,8 @@ extern "C" {
namespace ZeroTier
{
+struct MQConfig;
+
/**
* A controller database driver that talks to PostgreSQL
*
@@ -49,7 +51,7 @@ namespace ZeroTier
class PostgreSQL : public DB
{
public:
- PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort);
+ PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
virtual ~PostgreSQL();
virtual bool waitForReady();
@@ -70,7 +72,13 @@ private:
void initializeMembers(PGconn *conn);
void heartbeat();
void membersDbWatcher();
+ void _membersWatcher_Postgres(PGconn *conn);
+ void _membersWatcher_RabbitMQ();
void networksDbWatcher();
+ void _networksWatcher_Postgres(PGconn *conn);
+ void _networksWatcher_RabbitMQ();
+
+
void commitThread();
void onlineNotificationThread();
@@ -100,6 +108,8 @@ private:
mutable volatile bool _waitNoticePrinted;
int _listenPort;
+
+ MQConfig *_mqc;
};
}
diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp
new file mode 100644
index 00000000..f5a5c1d6
--- /dev/null
+++ b/controller/RabbitMQ.cpp
@@ -0,0 +1,94 @@
+#include "RabbitMQ.hpp"
+
+#include <amqp.h>
+#include <amqp_tcp_socket.h>
+#include <stdexcept>
+#include <cstring>
+
+namespace ZeroTier
+{
+
+RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
+ : _mqc(cfg)
+ , _qName(queueName)
+ , _socket(NULL)
+ , _status(0)
+{
+}
+
+RabbitMQ::~RabbitMQ()
+{
+ amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
+ amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
+ amqp_destroy_connection(_conn);
+}
+
+void RabbitMQ::init()
+{
+ struct timeval tval;
+ memset(&tval, 0, sizeof(struct timeval));
+ tval.tv_sec = 5;
+
+ fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
+ _conn = amqp_new_connection();
+ _socket = amqp_tcp_socket_new(_conn);
+ if (!_socket) {
+ throw std::runtime_error("Can't create socket for RabbitMQ");
+ }
+
+ _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
+ if (_status) {
+ throw std::runtime_error("Can't connect to RabbitMQ");
+ }
+
+ amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
+ _mqc->username, _mqc->password);
+ if (r.reply_type != AMQP_RESPONSE_NORMAL) {
+ throw std::runtime_error("RabbitMQ Login Error");
+ }
+
+ static int chan = 0;
+ {
+ Mutex::Lock l(_chan_m);
+ _channel = ++chan;
+ }
+ amqp_channel_open(_conn, _channel);
+ r = amqp_get_rpc_reply(_conn);
+ if(r.reply_type != AMQP_RESPONSE_NORMAL) {
+ throw std::runtime_error("Error opening communication channel");
+ }
+
+ _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
+ r = amqp_get_rpc_reply(_conn);
+ if (r.reply_type != AMQP_RESPONSE_NORMAL) {
+ throw std::runtime_error("Error declaring queue " + std::string(_qName));
+ }
+
+ amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
+ r = amqp_get_rpc_reply(_conn);
+ if (r.reply_type != AMQP_RESPONSE_NORMAL) {
+ throw std::runtime_error("Error consuming queue " + std::string(_qName));
+ }
+ fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
+}
+
+std::string RabbitMQ::consume()
+{
+ amqp_rpc_reply_t res;
+ amqp_envelope_t envelope;
+ amqp_maybe_release_buffers(_conn);
+
+ res = amqp_consume_message(_conn, &envelope, NULL, 0);
+ if (res.reply_type != AMQP_RESPONSE_NORMAL) {
+ throw std::runtime_error("Error getting message");
+ }
+
+ std::string msg(
+ (const char*)envelope.message.body.bytes,
+ envelope.message.body.len
+ );
+ amqp_destroy_envelope(&envelope);
+ return msg;
+}
+
+}
diff --git a/controller/RabbitMQ.hpp b/controller/RabbitMQ.hpp
new file mode 100644
index 00000000..d341681b
--- /dev/null
+++ b/controller/RabbitMQ.hpp
@@ -0,0 +1,79 @@
+/*
+ * ZeroTier One - Network Virtualization Everywhere
+ * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --
+ *
+ * You can be released from the requirements of the license by purchasing
+ * a commercial license. Buying such a license is mandatory as soon as you
+ * develop commercial closed-source software that incorporates or links
+ * directly against ZeroTier software without disclosing the source code
+ * of your own application.
+ */
+#ifndef ZT_CONTROLLER_RABBITMQ_HPP
+#define ZT_CONTROLLER_RABBITMQ_HPP
+
+namespace ZeroTier
+{
+struct MQConfig {
+ const char *host;
+ int port;
+ const char *username;
+ const char *password;
+};
+}
+
+#ifdef ZT_CONTROLLER_USE_LIBPQ
+
+#include "../node/Mutex.hpp"
+
+#include <amqp.h>
+#include <amqp_tcp_socket.h>
+#include <string>
+
+namespace ZeroTier
+{
+
+class RabbitMQ {
+public:
+ RabbitMQ(MQConfig *cfg, const char *queueName);
+ ~RabbitMQ();
+
+ void init();
+
+ std::string consume();
+
+private:
+ MQConfig *_mqc;
+ const char *_qName;
+
+ amqp_socket_t *_socket;
+ amqp_connection_state_t _conn;
+ amqp_queue_declare_ok_t *_q;
+ int _status;
+
+ int _channel;
+
+ Mutex _chan_m;
+
+};
+
+}
+
+#endif // ZT_CONTROLLER_USE_LIBPQ
+
+#endif // ZT_CONTROLLER_RABBITMQ_HPP
+