summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrant Limberg <grant.limberg@zerotier.com>2019-04-18 14:57:06 -0700
committerGrant Limberg <grant.limberg@zerotier.com>2019-04-18 14:57:06 -0700
commit6a027c9c0abaf8dd46403ed94f4cce3378c0f826 (patch)
tree0ee5b28b3f3135fda0e7458ea86457cc83d5b584
parent6014df2847087f0682cd2bdcd25b91d5c700b750 (diff)
downloadinfinitytier-6a027c9c0abaf8dd46403ed94f4cce3378c0f826.tar.gz
infinitytier-6a027c9c0abaf8dd46403ed94f4cce3378c0f826.zip
amqp_consume_message now has a timeout
RabbitMQ::consume() will return an empty string if the call to amqp_consume_message times out
-rw-r--r--controller/PostgreSQL.cpp6
-rw-r--r--controller/RabbitMQ.cpp13
2 files changed, 17 insertions, 2 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index ad53bc54..9aa4fbaa 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -666,6 +666,9 @@ void PostgreSQL::_membersWatcher_RabbitMQ() {
try {
std::string msg = rmq.consume();
// fprintf(stderr, "Got Member Update: %s\n", msg.c_str());
+ if (msg.empty()) {
+ continue;
+ }
json tmp(json::parse(msg));
json &ov = tmp["old_val"];
json &nv = tmp["new_val"];
@@ -766,6 +769,9 @@ void PostgreSQL::_networksWatcher_RabbitMQ() {
while (_run == 1) {
try {
std::string msg = rmq.consume();
+ if (msg.empty()) {
+ continue;
+ }
// fprintf(stderr, "Got network update: %s\n", msg.c_str());
json tmp(json::parse(msg));
json &ov = tmp["old_val"];
diff --git a/controller/RabbitMQ.cpp b/controller/RabbitMQ.cpp
index 0b8cec73..eec9745d 100644
--- a/controller/RabbitMQ.cpp
+++ b/controller/RabbitMQ.cpp
@@ -80,9 +80,18 @@ std::string RabbitMQ::consume()
amqp_envelope_t envelope;
amqp_maybe_release_buffers(_conn);
- res = amqp_consume_message(_conn, &envelope, NULL, 0);
+ struct timeval timeout;
+ timeout.tv_sec = 1;
+ timeout.tv_usec = 0;
+
+ res = amqp_consume_message(_conn, &envelope, &timeout, 0);
if (res.reply_type != AMQP_RESPONSE_NORMAL) {
- throw std::runtime_error("Error getting message");
+ if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
+ // timeout waiting for message. Return empty string
+ return "";
+ } else {
+ throw std::runtime_error("Error getting message");
+ }
}
std::string msg(