From 0ec62154931a80459e3888b7b34d622f1e9304cc Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Thu, 6 Sep 2018 15:14:16 -0700 Subject: Send member/network change notifications via Postgres --- controller/PostgreSQL.cpp | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index b4b8d5f7..e0e25718 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -44,6 +44,21 @@ static const char *_timestr() return ts; } +std::string join(const std::vector &elements, const char * const separator) +{ + switch(elements.size()) { + case 0: + return ""; + case 1: + return elements[0]; + default: + std::ostringstream os; + std::copy(elements.begin(), elements.end()-1, std::ostream_iterator(os, separator)); + os << *elements.rbegin(); + return os.str(); + } +} + } using namespace ZeroTier; @@ -983,12 +998,16 @@ void PostgreSQL::onlineNotificationThread() int64_t lastUpdatedNetworkStatus = 0; std::unordered_map< std::pair,int64_t,_PairHasher > lastOnlineCumulative; + while (_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres."); exit(-1); } + // map used to send notifications to front end + std::unordered_map> updateMap; + std::unordered_map< std::pair,std::pair,_PairHasher > lastOnline; { std::lock_guard l(_lastOnline_l); @@ -1024,6 +1043,10 @@ void PostgreSQL::onlineNotificationThread() std::string networkId(nwidTmp); std::string memberId(memTmp); + + std::vector &members = updateMap[networkId]; + members.push_back(memberId); + int64_t ts = i->second.first; std::string ipAddr = i->second.second.toIpString(ipTmp); @@ -1108,6 +1131,10 @@ void PostgreSQL::onlineNotificationThread() Utils::hex(i->first, tmp); std::string networkId(tmp); + + std::vector &_notUsed = updateMap[networkId]; + (void)_notUsed; + uint64_t authMemberCount = 0; uint64_t totalMemberCount = 0; uint64_t onlineMemberCount = 0; @@ -1186,6 +1213,25 @@ void PostgreSQL::onlineNotificationThread() } } + for (auto it = updateMap.begin(); it != updateMap.end(); ++it) { + std::string networkId = it->first; + std::vector members = it->second; + std::stringstream queryBuilder; + + std::string membersStr = ::join(members, ","); + + queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'"; + std::string query = queryBuilder.str(); + + fprintf(stderr, "%s\n", query.c_str()); + + PGresult *res = PQexec(conn,query.c_str()); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error sending NOTIFY: %s\n", PQresultErrorMessage(res)); + } + PQclear(res); + } + std::this_thread::sleep_for(std::chrono::milliseconds(250)); } PQfinish(conn); -- cgit v1.2.3