summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrant Limberg <grant.limberg@zerotier.com>2018-09-06 15:14:16 -0700
committerGrant Limberg <grant.limberg@zerotier.com>2018-09-06 15:14:16 -0700
commit0ec62154931a80459e3888b7b34d622f1e9304cc (patch)
tree7d37011508fd8b8872d2192ce97a6f5ccc1fdf3e
parentde45bdb44817decfadc091e10143b7967a25be56 (diff)
downloadinfinitytier-0ec62154931a80459e3888b7b34d622f1e9304cc.tar.gz
infinitytier-0ec62154931a80459e3888b7b34d622f1e9304cc.zip
Send member/network change notifications via Postgres
-rw-r--r--controller/PostgreSQL.cpp46
1 files changed, 46 insertions, 0 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index b4b8d5f7..e0e25718 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -44,6 +44,21 @@ static const char *_timestr()
return ts;
}
+std::string join(const std::vector<std::string> &elements, const char * const separator)
+{
+ switch(elements.size()) {
+ case 0:
+ return "";
+ case 1:
+ return elements[0];
+ default:
+ std::ostringstream os;
+ std::copy(elements.begin(), elements.end()-1, std::ostream_iterator<std::string>(os, separator));
+ os << *elements.rbegin();
+ return os.str();
+ }
+}
+
}
using namespace ZeroTier;
@@ -983,12 +998,16 @@ void PostgreSQL::onlineNotificationThread()
int64_t lastUpdatedNetworkStatus = 0;
std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;
+
while (_run == 1) {
if (PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres.");
exit(-1);
}
+ // map used to send notifications to front end
+ std::unordered_map<std::string, std::vector<std::string>> updateMap;
+
std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline;
{
std::lock_guard<std::mutex> l(_lastOnline_l);
@@ -1024,6 +1043,10 @@ void PostgreSQL::onlineNotificationThread()
std::string networkId(nwidTmp);
std::string memberId(memTmp);
+
+ std::vector<std::string> &members = updateMap[networkId];
+ members.push_back(memberId);
+
int64_t ts = i->second.first;
std::string ipAddr = i->second.second.toIpString(ipTmp);
@@ -1108,6 +1131,10 @@ void PostgreSQL::onlineNotificationThread()
Utils::hex(i->first, tmp);
std::string networkId(tmp);
+
+ std::vector<std::string> &_notUsed = updateMap[networkId];
+ (void)_notUsed;
+
uint64_t authMemberCount = 0;
uint64_t totalMemberCount = 0;
uint64_t onlineMemberCount = 0;
@@ -1186,6 +1213,25 @@ void PostgreSQL::onlineNotificationThread()
}
}
+ for (auto it = updateMap.begin(); it != updateMap.end(); ++it) {
+ std::string networkId = it->first;
+ std::vector<std::string> members = it->second;
+ std::stringstream queryBuilder;
+
+ std::string membersStr = ::join(members, ",");
+
+ queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'";
+ std::string query = queryBuilder.str();
+
+ fprintf(stderr, "%s\n", query.c_str());
+
+ PGresult *res = PQexec(conn,query.c_str());
+ if (PQresultStatus(res) != PGRES_COMMAND_OK) {
+ fprintf(stderr, "ERROR: Error sending NOTIFY: %s\n", PQresultErrorMessage(res));
+ }
+ PQclear(res);
+ }
+
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
PQfinish(conn);