diff options
author | Grant Limberg <grant.limberg@zerotier.com> | 2018-09-06 15:14:16 -0700 |
---|---|---|
committer | Grant Limberg <grant.limberg@zerotier.com> | 2018-09-06 15:14:16 -0700 |
commit | 0ec62154931a80459e3888b7b34d622f1e9304cc (patch) | |
tree | 7d37011508fd8b8872d2192ce97a6f5ccc1fdf3e | |
parent | de45bdb44817decfadc091e10143b7967a25be56 (diff) | |
download | infinitytier-0ec62154931a80459e3888b7b34d622f1e9304cc.tar.gz infinitytier-0ec62154931a80459e3888b7b34d622f1e9304cc.zip |
Send member/network change notifications via Postgres
-rw-r--r-- | controller/PostgreSQL.cpp | 46 |
1 files changed, 46 insertions, 0 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index b4b8d5f7..e0e25718 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -44,6 +44,21 @@ static const char *_timestr() return ts; } +std::string join(const std::vector<std::string> &elements, const char * const separator) +{ + switch(elements.size()) { + case 0: + return ""; + case 1: + return elements[0]; + default: + std::ostringstream os; + std::copy(elements.begin(), elements.end()-1, std::ostream_iterator<std::string>(os, separator)); + os << *elements.rbegin(); + return os.str(); + } +} + } using namespace ZeroTier; @@ -983,12 +998,16 @@ void PostgreSQL::onlineNotificationThread() int64_t lastUpdatedNetworkStatus = 0; std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative; + while (_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres."); exit(-1); } + // map used to send notifications to front end + std::unordered_map<std::string, std::vector<std::string>> updateMap; + std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline; { std::lock_guard<std::mutex> l(_lastOnline_l); @@ -1024,6 +1043,10 @@ void PostgreSQL::onlineNotificationThread() std::string networkId(nwidTmp); std::string memberId(memTmp); + + std::vector<std::string> &members = updateMap[networkId]; + members.push_back(memberId); + int64_t ts = i->second.first; std::string ipAddr = i->second.second.toIpString(ipTmp); @@ -1108,6 +1131,10 @@ void PostgreSQL::onlineNotificationThread() Utils::hex(i->first, tmp); std::string networkId(tmp); + + std::vector<std::string> &_notUsed = updateMap[networkId]; + (void)_notUsed; + uint64_t authMemberCount = 0; uint64_t totalMemberCount = 0; uint64_t onlineMemberCount = 0; @@ -1186,6 +1213,25 @@ void PostgreSQL::onlineNotificationThread() } } + for (auto it = updateMap.begin(); it != updateMap.end(); ++it) { + std::string networkId = it->first; + std::vector<std::string> members = it->second; + std::stringstream queryBuilder; + + std::string membersStr = ::join(members, ","); + + queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'"; + std::string query = queryBuilder.str(); + + fprintf(stderr, "%s\n", query.c_str()); + + PGresult *res = PQexec(conn,query.c_str()); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error sending NOTIFY: %s\n", PQresultErrorMessage(res)); + } + PQclear(res); + } + std::this_thread::sleep_for(std::chrono::milliseconds(250)); } PQfinish(conn); |