diff options
author | Grant Limberg <grant.limberg@zerotier.com> | 2019-04-18 14:57:06 -0700 |
---|---|---|
committer | Grant Limberg <grant.limberg@zerotier.com> | 2019-04-18 14:57:06 -0700 |
commit | 6a027c9c0abaf8dd46403ed94f4cce3378c0f826 (patch) | |
tree | 0ee5b28b3f3135fda0e7458ea86457cc83d5b584 | |
parent | 6014df2847087f0682cd2bdcd25b91d5c700b750 (diff) | |
download | infinitytier-6a027c9c0abaf8dd46403ed94f4cce3378c0f826.tar.gz infinitytier-6a027c9c0abaf8dd46403ed94f4cce3378c0f826.zip |
amqp_consume_message now has a timeout
RabbitMQ::consume() will return an empty string if the call to amqp_consume_message times out
-rw-r--r-- | controller/PostgreSQL.cpp | 6 | ||||
-rw-r--r-- | controller/RabbitMQ.cpp | 13 |
2 files changed, 17 insertions, 2 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index ad53bc54..9aa4fbaa 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -666,6 +666,9 @@ void PostgreSQL::_membersWatcher_RabbitMQ() { try { std::string msg = rmq.consume(); // fprintf(stderr, "Got Member Update: %s\n", msg.c_str()); + if (msg.empty()) { + continue; + } json tmp(json::parse(msg)); json &ov = tmp["old_val"]; json &nv = tmp["new_val"]; @@ -766,6 +769,9 @@ void PostgreSQL::_networksWatcher_RabbitMQ() { while (_run == 1) { try { std::string msg = rmq.consume(); + if (msg.empty()) { + continue; + } // fprintf(stderr, "Got network update: %s\n", msg.c_str()); json tmp(json::parse(msg)); json &ov = tmp["old_val"]; diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp index 0b8cec73..eec9745d 100644 --- a/controller/RabbitMQ.cpp +++ b/controller/RabbitMQ.cpp @@ -80,9 +80,18 @@ std::string RabbitMQ::consume() amqp_envelope_t envelope; amqp_maybe_release_buffers(_conn); - res = amqp_consume_message(_conn, &envelope, NULL, 0); + struct timeval timeout; + timeout.tv_sec = 1; + timeout.tv_usec = 0; + + res = amqp_consume_message(_conn, &envelope, &timeout, 0); if (res.reply_type != AMQP_RESPONSE_NORMAL) { - throw std::runtime_error("Error getting message"); + if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) { + // timeout waiting for message. Return empty string + return ""; + } else { + throw std::runtime_error("Error getting message"); + } } std::string msg( |