summaryrefslogtreecommitdiff
path: root/controller/RabbitMQ.cpp
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2019-03-08 13:16:02 -0800
committerAdam Ierymenko <adam.ierymenko@gmail.com>2019-03-08 13:16:02 -0800
commitd7a6357393f25748e4256d6ccf1321d7532d02b9 (patch)
tree177ddac683b2a4c688da02f809f6e008fc5d22a4 /controller/RabbitMQ.cpp
parent6fbf21b4f0abb89049e724b3bd625458327b6a77 (diff)
parent993d850f697dbedd5c8967f7fab482af923df926 (diff)
downloadinfinitytier-d7a6357393f25748e4256d6ccf1321d7532d02b9.tar.gz
infinitytier-d7a6357393f25748e4256d6ccf1321d7532d02b9.zip
Merge branch 'dev' into edge
Diffstat (limited to 'controller/RabbitMQ.cpp')
-rw-r--r--controller/RabbitMQ.cpp94
1 files changed, 94 insertions, 0 deletions
diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp
new file mode 100644
index 00000000..f5a5c1d6
--- /dev/null
+++ b/controller/RabbitMQ.cpp
@@ -0,0 +1,94 @@
+#include "RabbitMQ.hpp"
+
+#include <amqp.h>
+#include <amqp_tcp_socket.h>
+#include <stdexcept>
+#include <cstring>
+
+namespace ZeroTier
+{
+
+RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
+ : _mqc(cfg)
+ , _qName(queueName)
+ , _socket(NULL)
+ , _status(0)
+{
+}
+
+RabbitMQ::~RabbitMQ()
+{
+ amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
+ amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
+ amqp_destroy_connection(_conn);
+}
+
+void RabbitMQ::init()
+{
+ struct timeval tval;
+ memset(&tval, 0, sizeof(struct timeval));
+ tval.tv_sec = 5;
+
+ fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
+ _conn = amqp_new_connection();
+ _socket = amqp_tcp_socket_new(_conn);
+ if (!_socket) {
+ throw std::runtime_error("Can't create socket for RabbitMQ");
+ }
+
+ _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
+ if (_status) {
+ throw std::runtime_error("Can't connect to RabbitMQ");
+ }
+
+ amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
+ _mqc->username, _mqc->password);
+ if (r.reply_type != AMQP_RESPONSE_NORMAL) {
+ throw std::runtime_error("RabbitMQ Login Error");
+ }
+
+ static int chan = 0;
+ {
+ Mutex::Lock l(_chan_m);
+ _channel = ++chan;
+ }
+ amqp_channel_open(_conn, _channel);
+ r = amqp_get_rpc_reply(_conn);
+ if(r.reply_type != AMQP_RESPONSE_NORMAL) {
+ throw std::runtime_error("Error opening communication channel");
+ }
+
+ _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
+ r = amqp_get_rpc_reply(_conn);
+ if (r.reply_type != AMQP_RESPONSE_NORMAL) {
+ throw std::runtime_error("Error declaring queue " + std::string(_qName));
+ }
+
+ amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
+ r = amqp_get_rpc_reply(_conn);
+ if (r.reply_type != AMQP_RESPONSE_NORMAL) {
+ throw std::runtime_error("Error consuming queue " + std::string(_qName));
+ }
+ fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
+}
+
+std::string RabbitMQ::consume()
+{
+ amqp_rpc_reply_t res;
+ amqp_envelope_t envelope;
+ amqp_maybe_release_buffers(_conn);
+
+ res = amqp_consume_message(_conn, &envelope, NULL, 0);
+ if (res.reply_type != AMQP_RESPONSE_NORMAL) {
+ throw std::runtime_error("Error getting message");
+ }
+
+ std::string msg(
+ (const char*)envelope.message.body.bytes,
+ envelope.message.body.len
+ );
+ amqp_destroy_envelope(&envelope);
+ return msg;
+}
+
+}