diff options
| author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2019-03-08 13:16:02 -0800 |
|---|---|---|
| committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2019-03-08 13:16:02 -0800 |
| commit | d7a6357393f25748e4256d6ccf1321d7532d02b9 (patch) | |
| tree | 177ddac683b2a4c688da02f809f6e008fc5d22a4 /controller/RabbitMQ.cpp | |
| parent | 6fbf21b4f0abb89049e724b3bd625458327b6a77 (diff) | |
| parent | 993d850f697dbedd5c8967f7fab482af923df926 (diff) | |
| download | infinitytier-d7a6357393f25748e4256d6ccf1321d7532d02b9.tar.gz infinitytier-d7a6357393f25748e4256d6ccf1321d7532d02b9.zip | |
Merge branch 'dev' into edge
Diffstat (limited to 'controller/RabbitMQ.cpp')
| -rw-r--r-- | controller/RabbitMQ.cpp | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp new file mode 100644 index 00000000..f5a5c1d6 --- /dev/null +++ b/controller/RabbitMQ.cpp @@ -0,0 +1,94 @@ +#include "RabbitMQ.hpp" + +#include <amqp.h> +#include <amqp_tcp_socket.h> +#include <stdexcept> +#include <cstring> + +namespace ZeroTier +{ + +RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName) + : _mqc(cfg) + , _qName(queueName) + , _socket(NULL) + , _status(0) +{ +} + +RabbitMQ::~RabbitMQ() +{ + amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS); + amqp_connection_close(_conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(_conn); +} + +void RabbitMQ::init() +{ + struct timeval tval; + memset(&tval, 0, sizeof(struct timeval)); + tval.tv_sec = 5; + + fprintf(stderr, "Initializing RabbitMQ %s\n", _qName); + _conn = amqp_new_connection(); + _socket = amqp_tcp_socket_new(_conn); + if (!_socket) { + throw std::runtime_error("Can't create socket for RabbitMQ"); + } + + _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval); + if (_status) { + throw std::runtime_error("Can't connect to RabbitMQ"); + } + + amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, + _mqc->username, _mqc->password); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("RabbitMQ Login Error"); + } + + static int chan = 0; + { + Mutex::Lock l(_chan_m); + _channel = ++chan; + } + amqp_channel_open(_conn, _channel); + r = amqp_get_rpc_reply(_conn); + if(r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error opening communication channel"); + } + + _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error declaring queue " + std::string(_qName)); + } + + amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + r = amqp_get_rpc_reply(_conn); + if (r.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error consuming queue " + std::string(_qName)); + } + fprintf(stderr, "RabbitMQ Init OK %s\n", _qName); +} + +std::string RabbitMQ::consume() +{ + amqp_rpc_reply_t res; + amqp_envelope_t envelope; + amqp_maybe_release_buffers(_conn); + + res = amqp_consume_message(_conn, &envelope, NULL, 0); + if (res.reply_type != AMQP_RESPONSE_NORMAL) { + throw std::runtime_error("Error getting message"); + } + + std::string msg( + (const char*)envelope.message.body.bytes, + envelope.message.body.len + ); + amqp_destroy_envelope(&envelope); + return msg; +} + +} |
