summaryrefslogtreecommitdiff
path: root/controller
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2019-08-06 07:51:50 -0500
committerAdam Ierymenko <adam.ierymenko@gmail.com>2019-08-06 07:51:50 -0500
commit37d508ab969afaf16c2aee1838a225022de34177 (patch)
tree1db65edb3ce0123e6e025f8b96b3bbadee952fdb /controller
parentf4f8fef82ed336d3a50dced91462462fbe61d58e (diff)
downloadinfinitytier-37d508ab969afaf16c2aee1838a225022de34177.tar.gz
infinitytier-37d508ab969afaf16c2aee1838a225022de34177.zip
Refactoring in prep for mirroring
Diffstat (limited to 'controller')
-rw-r--r--controller/DB.cpp6
-rw-r--r--controller/DB.hpp11
-rw-r--r--controller/EmbeddedNetworkController.cpp6
-rw-r--r--controller/EmbeddedNetworkController.hpp6
-rw-r--r--controller/FileDB.cpp2
-rw-r--r--controller/LFDB.cpp2
-rw-r--r--controller/PostgreSQL.cpp29
-rw-r--r--controller/PostgreSQL.hpp89
-rw-r--r--controller/RabbitMQ.cpp134
-rw-r--r--controller/RabbitMQ.hpp32
10 files changed, 163 insertions, 154 deletions
diff --git a/controller/DB.cpp b/controller/DB.cpp
index bb734dc8..75adf53e 100644
--- a/controller/DB.cpp
+++ b/controller/DB.cpp
@@ -313,7 +313,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in
if (initialized) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
- (*i)->onNetworkMemberUpdate(networkId,memberId,memberConfig);
+ (*i)->onNetworkMemberUpdate(this,networkId,memberId,memberConfig);
}
}
} else if (memberId) {
@@ -336,7 +336,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in
if ((initialized)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
- (*i)->onNetworkMemberDeauthorize(networkId,memberId);
+ (*i)->onNetworkMemberDeauthorize(this,networkId,memberId);
}
}
}
@@ -362,7 +362,7 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool
if (initialized) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
- (*i)->onNetworkUpdate(networkId,networkConfig);
+ (*i)->onNetworkUpdate(this,networkId,networkConfig);
}
}
}
diff --git a/controller/DB.hpp b/controller/DB.hpp
index 85920eec..461f385e 100644
--- a/controller/DB.hpp
+++ b/controller/DB.hpp
@@ -58,10 +58,10 @@ public:
public:
ChangeListener() {}
virtual ~ChangeListener() {}
- virtual void onNetworkUpdate(uint64_t networkId,const nlohmann::json &network) {}
- virtual void onNetworkMemberUpdate(uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {}
- virtual void onNetworkMemberDeauthorize(uint64_t networkId,uint64_t memberId) {}
- virtual void onNetworkMemberOnline(uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress) {}
+ virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network) {}
+ virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {}
+ virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId) {}
+ virtual void onNetworkMemberOnline(const DB *db,uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress) {}
};
struct NetworkSummaryInfo
@@ -95,12 +95,15 @@ public:
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member);
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info);
bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
+
bool summary(const uint64_t networkId,NetworkSummaryInfo &info);
void networks(std::vector<uint64_t> &networks);
virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0;
+
virtual void eraseNetwork(const uint64_t networkId) = 0;
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;
+
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) = 0;
inline void addListener(DB::ChangeListener *const listener)
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp
index bf568527..80331578 100644
--- a/controller/EmbeddedNetworkController.cpp
+++ b/controller/EmbeddedNetworkController.cpp
@@ -1190,7 +1190,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
}
}
-void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network)
+void EmbeddedNetworkController::onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network)
{
// Send an update to all members of the network that are online
const int64_t now = OSUtils::now();
@@ -1201,7 +1201,7 @@ void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId,const n
}
}
-void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member)
+void EmbeddedNetworkController::onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
{
// Push update to member if online
try {
@@ -1212,7 +1212,7 @@ void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,c
} catch ( ... ) {}
}
-void EmbeddedNetworkController::onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId)
+void EmbeddedNetworkController::onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId)
{
const int64_t now = OSUtils::now();
Revocation rev((uint32_t)_node->prng(),networkId,0,now,ZT_REVOCATION_FLAG_FAST_PROPAGATE,Address(memberId),Revocation::CREDENTIAL_TYPE_COM);
diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp
index 7bc37be2..c0f14f8b 100644
--- a/controller/EmbeddedNetworkController.hpp
+++ b/controller/EmbeddedNetworkController.hpp
@@ -101,9 +101,9 @@ public:
void handleRemoteTrace(const ZT_RemoteTrace &rt);
- virtual void onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network);
- virtual void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member);
- virtual void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId);
+ virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network);
+ virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
+ virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId);
private:
void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData);
diff --git a/controller/FileDB.cpp b/controller/FileDB.cpp
index 484aefa5..acc8680e 100644
--- a/controller/FileDB.cpp
+++ b/controller/FileDB.cpp
@@ -178,7 +178,7 @@ void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const
{
std::lock_guard<std::mutex> l2(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i)
- (*i)->onNetworkMemberOnline(networkId,memberId,physicalAddress);
+ (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress);
}
}
diff --git a/controller/LFDB.cpp b/controller/LFDB.cpp
index a7bbf81d..9203a5a1 100644
--- a/controller/LFDB.cpp
+++ b/controller/LFDB.cpp
@@ -404,7 +404,7 @@ void LFDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const I
{
std::lock_guard<std::mutex> l2(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i)
- (*i)->onNetworkMemberOnline(networkId,memberId,physicalAddress);
+ (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress);
}
}
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index 709712b5..45be3e51 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -24,9 +24,11 @@
* of your own application.
*/
+#include "PostgreSQL.hpp"
+
#ifdef ZT_CONTROLLER_USE_LIBPQ
-#include "PostgreSQL.hpp"
+#include "../node/Constants.hpp"
#include "EmbeddedNetworkController.hpp"
#include "RabbitMQ.hpp"
#include "../version.h"
@@ -37,6 +39,7 @@
#include <amqp_tcp_socket.h>
using json = nlohmann::json;
+
namespace {
static const int DB_MINIMUM_VERSION = 5;
@@ -73,16 +76,16 @@ std::string join(const std::vector<std::string> &elements, const char * const se
}
}
-}
+} // anonymous namespace
using namespace ZeroTier;
PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
- : DB(myId, path)
- , _ready(0)
+ : DB(myId, path)
+ , _ready(0)
, _connected(1)
- , _run(1)
- , _waitNoticePrinted(false)
+ , _run(1)
+ , _waitNoticePrinted(false)
, _listenPort(listenPort)
, _mqc(mqc)
{
@@ -221,7 +224,7 @@ void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId,
{
std::lock_guard<std::mutex> l2(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i)
- (*i)->onNetworkMemberOnline(networkId,memberId,physicalAddress);
+ (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress);
}
}
@@ -602,8 +605,8 @@ void PostgreSQL::heartbeat()
"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, "
"v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev, host_port = EXCLUDED.host_port, "
"use_rabbitmq = EXCLUDED.use_rabbitmq",
- 10, // number of parameters
- NULL, // oid field. ignore
+ 10, // number of parameters
+ NULL, // oid field. ignore
values, // values for substitution
NULL, // lengths in bytes of each value
NULL, // binary?
@@ -724,7 +727,7 @@ void PostgreSQL::_membersWatcher_RabbitMQ() {
fprintf(stderr, "RABBITMQ ERROR member change: %s\n", e.what());
} catch(...) {
fprintf(stderr, "RABBITMQ ERROR member change: unknown error\n");
- }
+ }
}
}
@@ -1324,7 +1327,7 @@ void PostgreSQL::onlineNotificationThread()
int64_t lastUpdatedNetworkStatus = 0;
std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
-
+
while (_run == 1) {
if (PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
@@ -1438,7 +1441,8 @@ void PostgreSQL::onlineNotificationThread()
}
}
-PGconn *PostgreSQL::getPgConn(OverrideMode m) {
+PGconn *PostgreSQL::getPgConn(OverrideMode m)
+{
if (m == ALLOW_PGBOUNCER_OVERRIDE) {
char *connStr = getenv("PGBOUNCER_CONNSTR");
if (connStr != NULL) {
@@ -1452,4 +1456,5 @@ PGconn *PostgreSQL::getPgConn(OverrideMode m) {
return PQconnectdb(_connString.c_str());
}
+
#endif //ZT_CONTROLLER_USE_LIBPQ
diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp
index f35f89fc..fe69635d 100644
--- a/controller/PostgreSQL.hpp
+++ b/controller/PostgreSQL.hpp
@@ -24,6 +24,8 @@
* of your own application.
*/
+#define ZT_CONTROLLER_USE_LIBPQ
+
#ifdef ZT_CONTROLLER_USE_LIBPQ
#ifndef ZT_CONTROLLER_LIBPQ_HPP
@@ -34,11 +36,10 @@
#define ZT_CENTRAL_CONTROLLER_COMMIT_THREADS 4
extern "C" {
- typedef struct pg_conn PGconn;
+typedef struct pg_conn PGconn;
}
-namespace ZeroTier
-{
+namespace ZeroTier {
struct MQConfig;
@@ -51,66 +52,66 @@ struct MQConfig;
class PostgreSQL : public DB
{
public:
- PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
- virtual ~PostgreSQL();
+ PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
+ virtual ~PostgreSQL();
- virtual bool waitForReady();
- virtual bool isReady();
- virtual void save(nlohmann::json *orig, nlohmann::json &record);
- virtual void eraseNetwork(const uint64_t networkId);
- virtual void eraseMember(const uint64_t networkId, const uint64_t memberId);
- virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress);
+ virtual bool waitForReady();
+ virtual bool isReady();
+ virtual void save(nlohmann::json *orig, nlohmann::json &record);
+ virtual void eraseNetwork(const uint64_t networkId);
+ virtual void eraseMember(const uint64_t networkId, const uint64_t memberId);
+ virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress);
protected:
- struct _PairHasher
+ struct _PairHasher
{
inline std::size_t operator()(const std::pair<uint64_t,uint64_t> &p) const { return (std::size_t)(p.first ^ p.second); }
};
private:
- void initializeNetworks(PGconn *conn);
- void initializeMembers(PGconn *conn);
- void heartbeat();
- void membersDbWatcher();
- void _membersWatcher_Postgres(PGconn *conn);
- void _membersWatcher_RabbitMQ();
- void networksDbWatcher();
- void _networksWatcher_Postgres(PGconn *conn);
- void _networksWatcher_RabbitMQ();
-
- void commitThread();
- void onlineNotificationThread();
-
- enum OverrideMode {
- ALLOW_PGBOUNCER_OVERRIDE = 0,
- NO_OVERRIDE = 1
- };
+ void initializeNetworks(PGconn *conn);
+ void initializeMembers(PGconn *conn);
+ void heartbeat();
+ void membersDbWatcher();
+ void _membersWatcher_Postgres(PGconn *conn);
+ void _membersWatcher_RabbitMQ();
+ void networksDbWatcher();
+ void _networksWatcher_Postgres(PGconn *conn);
+ void _networksWatcher_RabbitMQ();
+
+ void commitThread();
+ void onlineNotificationThread();
+
+ enum OverrideMode {
+ ALLOW_PGBOUNCER_OVERRIDE = 0,
+ NO_OVERRIDE = 1
+ };
- PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE );
+ PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE );
- std::string _connString;
+ std::string _connString;
- BlockingQueue<nlohmann::json *> _commitQueue;
+ BlockingQueue<nlohmann::json *> _commitQueue;
- std::thread _heartbeatThread;
- std::thread _membersDbWatcher;
- std::thread _networksDbWatcher;
- std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS];
- std::thread _onlineNotificationThread;
+ std::thread _heartbeatThread;
+ std::thread _membersDbWatcher;
+ std::thread _networksDbWatcher;
+ std::thread _commitThread[ZT_CENTRAL_CONTROLLER_COMMIT_THREADS];
+ std::thread _onlineNotificationThread;
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > _lastOnline;
- mutable std::mutex _lastOnline_l;
- mutable std::mutex _readyLock;
- std::atomic<int> _ready, _connected, _run;
- mutable volatile bool _waitNoticePrinted;
+ mutable std::mutex _lastOnline_l;
+ mutable std::mutex _readyLock;
+ std::atomic<int> _ready, _connected, _run;
+ mutable volatile bool _waitNoticePrinted;
- int _listenPort;
+ int _listenPort;
- MQConfig *_mqc;
+ MQConfig *_mqc;
};
-}
+} // namespace ZeroTier
#endif // ZT_CONTROLLER_LIBPQ_HPP
diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp
index eec9745d..cf5c567d 100644
--- a/controller/RabbitMQ.cpp
+++ b/controller/RabbitMQ.cpp
@@ -11,95 +11,95 @@ namespace ZeroTier
{
RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
- : _mqc(cfg)
- , _qName(queueName)
- , _socket(NULL)
- , _status(0)
+ : _mqc(cfg)
+ , _qName(queueName)
+ , _socket(NULL)
+ , _status(0)
{
}
RabbitMQ::~RabbitMQ()
{
- amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
- amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
- amqp_destroy_connection(_conn);
+ amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
+ amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
+ amqp_destroy_connection(_conn);
}
void RabbitMQ::init()
{
- struct timeval tval;
- memset(&tval, 0, sizeof(struct timeval));
- tval.tv_sec = 5;
+ struct timeval tval;
+ memset(&tval, 0, sizeof(struct timeval));
+ tval.tv_sec = 5;
- fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
- _conn = amqp_new_connection();
- _socket = amqp_tcp_socket_new(_conn);
- if (!_socket) {
- throw std::runtime_error("Can't create socket for RabbitMQ");
- }
-
- _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
- if (_status) {
- throw std::runtime_error("Can't connect to RabbitMQ");
- }
-
- amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
- _mqc->username, _mqc->password);
- if (r.reply_type != AMQP_RESPONSE_NORMAL) {
- throw std::runtime_error("RabbitMQ Login Error");
- }
+ fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
+ _conn = amqp_new_connection();
+ _socket = amqp_tcp_socket_new(_conn);
+ if (!_socket) {
+ throw std::runtime_error("Can't create socket for RabbitMQ");
+ }
+
+ _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
+ if (_status) {
+ throw std::runtime_error("Can't connect to RabbitMQ");
+ }
+
+ amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
+ _mqc->username, _mqc->password);
+ if (r.reply_type != AMQP_RESPONSE_NORMAL) {
+ throw std::runtime_error("RabbitMQ Login Error");
+ }
- static int chan = 0;
+ static int chan = 0;
{
Mutex::Lock l(_chan_m);
- _channel = ++chan;
+ _channel = ++chan;
+ }
+ amqp_channel_open(_conn, _channel);
+ r = amqp_get_rpc_reply(_conn);
+ if(r.reply_type != AMQP_RESPONSE_NORMAL) {
+ throw std::runtime_error("Error opening communication channel");
+ }
+
+ _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
+ r = amqp_get_rpc_reply(_conn);
+ if (r.reply_type != AMQP_RESPONSE_NORMAL) {
+ throw std::runtime_error("Error declaring queue " + std::string(_qName));
}
- amqp_channel_open(_conn, _channel);
- r = amqp_get_rpc_reply(_conn);
- if(r.reply_type != AMQP_RESPONSE_NORMAL) {
- throw std::runtime_error("Error opening communication channel");
- }
-
- _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
- r = amqp_get_rpc_reply(_conn);
- if (r.reply_type != AMQP_RESPONSE_NORMAL) {
- throw std::runtime_error("Error declaring queue " + std::string(_qName));
- }
- amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
- r = amqp_get_rpc_reply(_conn);
- if (r.reply_type != AMQP_RESPONSE_NORMAL) {
- throw std::runtime_error("Error consuming queue " + std::string(_qName));
- }
- fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
+ amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
+ r = amqp_get_rpc_reply(_conn);
+ if (r.reply_type != AMQP_RESPONSE_NORMAL) {
+ throw std::runtime_error("Error consuming queue " + std::string(_qName));
+ }
+ fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
}
std::string RabbitMQ::consume()
{
- amqp_rpc_reply_t res;
- amqp_envelope_t envelope;
- amqp_maybe_release_buffers(_conn);
+ amqp_rpc_reply_t res;
+ amqp_envelope_t envelope;
+ amqp_maybe_release_buffers(_conn);
- struct timeval timeout;
- timeout.tv_sec = 1;
- timeout.tv_usec = 0;
+ struct timeval timeout;
+ timeout.tv_sec = 1;
+ timeout.tv_usec = 0;
- res = amqp_consume_message(_conn, &envelope, &timeout, 0);
- if (res.reply_type != AMQP_RESPONSE_NORMAL) {
- if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
- // timeout waiting for message. Return empty string
- return "";
- } else {
- throw std::runtime_error("Error getting message");
- }
- }
+ res = amqp_consume_message(_conn, &envelope, &timeout, 0);
+ if (res.reply_type != AMQP_RESPONSE_NORMAL) {
+ if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
+ // timeout waiting for message. Return empty string
+ return "";
+ } else {
+ throw std::runtime_error("Error getting message");
+ }
+ }
- std::string msg(
- (const char*)envelope.message.body.bytes,
- envelope.message.body.len
- );
- amqp_destroy_envelope(&envelope);
- return msg;
+ std::string msg(
+ (const char*)envelope.message.body.bytes,
+ envelope.message.body.len
+ );
+ amqp_destroy_envelope(&envelope);
+ return msg;
}
}
diff --git a/controller/RabbitMQ.hpp b/controller/RabbitMQ.hpp
index d341681b..6bac68da 100644
--- a/controller/RabbitMQ.hpp
+++ b/controller/RabbitMQ.hpp
@@ -23,16 +23,17 @@
* directly against ZeroTier software without disclosing the source code
* of your own application.
*/
+
#ifndef ZT_CONTROLLER_RABBITMQ_HPP
#define ZT_CONTROLLER_RABBITMQ_HPP
namespace ZeroTier
{
struct MQConfig {
- const char *host;
- int port;
- const char *username;
- const char *password;
+ const char *host;
+ int port;
+ const char *username;
+ const char *password;
};
}
@@ -49,26 +50,25 @@ namespace ZeroTier
class RabbitMQ {
public:
- RabbitMQ(MQConfig *cfg, const char *queueName);
- ~RabbitMQ();
+ RabbitMQ(MQConfig *cfg, const char *queueName);
+ ~RabbitMQ();
- void init();
+ void init();
- std::string consume();
+ std::string consume();
private:
- MQConfig *_mqc;
- const char *_qName;
+ MQConfig *_mqc;
+ const char *_qName;
- amqp_socket_t *_socket;
- amqp_connection_state_t _conn;
- amqp_queue_declare_ok_t *_q;
- int _status;
+ amqp_socket_t *_socket;
+ amqp_connection_state_t _conn;
+ amqp_queue_declare_ok_t *_q;
+ int _status;
- int _channel;
+ int _channel;
Mutex _chan_m;
-
};
}