diff options
author | Grant Limberg <grant.limberg@zerotier.com> | 2018-09-04 14:00:02 -0700 |
---|---|---|
committer | Grant Limberg <grant.limberg@zerotier.com> | 2018-09-04 14:00:02 -0700 |
commit | cd657da4311f2b3fd2c8feb5322a23cafa8894a1 (patch) | |
tree | 474c759916c02302b2adb2b634d8b851d3c1cfdd | |
parent | 17af09a5cc52b8aabc6721cae588d8edd195e9d9 (diff) | |
download | infinitytier-cd657da4311f2b3fd2c8feb5322a23cafa8894a1.tar.gz infinitytier-cd657da4311f2b3fd2c8feb5322a23cafa8894a1.zip |
Online Update/Notify thread
-rw-r--r-- | controller/PostgreSQL.cpp | 271 |
1 files changed, 270 insertions, 1 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index fd8d4f18..14a9c285 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -153,7 +153,57 @@ void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, void PostgreSQL::initializeNetworks(PGconn *conn) { - // TODO: do stuff here + if (PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); + exit(1); + } + + const char *params[1] = { + _myAddressStr.c_str() + }; + + PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, " + "enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, " + "remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network " + "WHERE deleted = false AND controller_id = $1", + 1, + NULL, + params, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + fprintf(stderr, "Networks Initialization Failed: %s", PQerrorMessage(conn)); + PQclear(res); + exit(1); + } + + int numRows = PQntuples(res); + for (int i = 0; i < numRows; ++i) { + json empty; + json config; + config["nwid"] = PQgetvalue(res, i, 0); + config["creationTime"] = std::stoull(PQgetvalue(res, i, 1)); + config["capabilities"] = json::parse(PQgetvalue(res, i, 2)); + config["enableBroadcast"] = (strcmp(PQgetvalue(res, i, 3),"true")==0); + config["lastModified"] = std::stoull(PQgetvalue(res, i, 4)); + config["mtu"] = std::stoi(PQgetvalue(res, i, 5)); + config["multicastLimit"] = std::stoi(PQgetvalue(res, i, 6)); + config["name"] = PQgetvalue(res, i, 7); + config["private"] = (strcmp(PQgetvalue(res, i, 8),"true")==0); + config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9)); + config["remoteTraceTarget"] = PQgetvalue(res, i, 10); + config["revision"] = std::stoull(PQgetvalue(res, i, 11)); + config["rules"] = json::parse(PQgetvalue(res, i, 12)); + config["tags"] = json::parse(PQgetvalue(res, i, 13)); + config["v4AssignMode"] = json::parse(PQgetvalue(res, i, 14)); + config["v6AssignMode"] = json::parse(PQgetvalue(res, i, 15)); + + _networkChanged(empty, config, false); + } + + PQclear(res); if (++this->_ready == 2) { if (_waitNoticePrinted) { @@ -275,6 +325,8 @@ void PostgreSQL::membersDbWatcher() exit(1); } + PQclear(res); res = NULL; + while(_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres."); @@ -326,6 +378,8 @@ void PostgreSQL::networksDbWatcher() exit(1); } + PQclear(res); res = NULL; + while(_run == 1) { if (PQstatus(conn) != CONNECTION_OK) { fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres."); @@ -368,10 +422,225 @@ void PostgreSQL::commitThread() void PostgreSQL::onlineNotificationThread() { + PGconn *conn = PQconnectdb(_path.c_str()); + if (PQstatus(conn) == CONNECTION_BAD) { + fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); + PQfinish(conn); + exit(1); + } _connected = 1; + int64_t lastUpdatedNetworkStatus = 0; + std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative; while (_run == 1) { + if (PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres."); + exit(-1); + } + + std::unordered_map< std::pair<uint64_t,uint64_t>,std::pair<int64_t,InetAddress>,_PairHasher > lastOnline; + { + std::lock_guard<std::mutex> l(_lastOnline_l); + lastOnline.swap(_lastOnline); + } + + PGresult *res = NULL; + int qCount = 0; + + if (!lastOnline.empty()) { + fprintf(stderr, "Last Online Update\n"); + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); + PQclear(res); + exit(1); + } + PQclear(res); + } + + for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { + lastOnlineCumulative[i->first] = i->second.first; + char nwidTmp[64]; + char memTmp[64]; + char ipTmp[64]; + OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", i->first.first); + OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second); + + std::string networkId(nwidTmp); + std::string memberId(memTmp); + int64_t ts = i->second.first; + std::string ipAddr = i->second.second.toIpString(ipTmp); + + const char *values[4] = { + networkId.c_str(), + memberId.c_str(), + std::to_string(ts).c_str(), + ipAddr.c_str() + }; + + res = PQexecParams(conn, + "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ($1, $2, $3, $4)" + "ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated", + 8, // number of parameters + NULL, // oid field. ignore + values, // values for substitution + NULL, // lengths in bytes of each value + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "Error on Member Status upsert: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQexec(conn, "ROLLBACK"); + exit(1); + } + + PQclear(res); + + if ((++qCount) == 1024) { + res = PQexec(conn, "COMMIT"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error on commit (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQexec(conn, "ROLLBACK"); + exit(1); + } + PQclear(res); + + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error on BEGIN (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); + PQclear(res); + exit(1); + } + PQclear(res); + qCount = 0; + } + } + if (qCount > 0) { + fprintf(stderr, "qCount is %d\n", qCount); + res = PQexec(conn, "COMMIT"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error on commit (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQexec(conn, "ROLLBACK"); + exit(1); + } + PQclear(res); + } + + const int64_t now = OSUtils::now(); + if ((now - lastUpdatedNetworkStatus) > 10000) { + lastUpdatedNetworkStatus = now; + + std::vector<std::pair<uint64_t, std::shared_ptr<_Network>>> networks; + { + std::lock_guard<std::mutex> l(_networks_l); + for (auto i = _networks.begin(); i != _networks.end(); ++i) { + networks.push_back(*i); + } + } + + int nCount = 0; + if (!networks.empty()) { + fprintf(stderr, "Network update"); + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); + PQclear(res); + exit(1); + } + PQclear(res); + } + for (auto i = networks.begin(); i != networks.end(); ++i) { + char tmp[64]; + Utils::hex(i->first, tmp); + + std::string networkId(tmp); + uint64_t authMemberCount = 0; + uint64_t totalMemberCount = 0; + uint64_t onlineMemberCount = 0; + uint64_t bridgeCount = 0; + uint64_t ts = now; + { + std::lock_guard<std::mutex> l2(i->second->lock); + authMemberCount = i->second->authorizedMembers.size(); + totalMemberCount = i->second->members.size(); + bridgeCount = i->second->activeBridgeMembers.size(); + for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) { + auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first, m->first)); + if (lo != lastOnlineCumulative.end()) { + if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) { + ++onlineMemberCount; + } else { + lastOnlineCumulative.erase(lo); + } + } + } + } + + const char *values[6] = { + networkId.c_str(), + std::to_string(bridgeCount).c_str(), + std::to_string(authMemberCount).c_str(), + std::to_string(onlineMemberCount).c_str(), + std::to_string(totalMemberCount).c_str(), + std::to_string(ts).c_str() + }; + + res = PQexecParams(conn, "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, " + "online_member_count, total_member_count, last_modified) VALUES ($1, $2, $3, $4, $5, $6) " + "ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, " + "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCDLUDED.online_member_count, " + "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified", + 6, + NULL, + values, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error on Network Satus upsert (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQexec(conn, "ROLLBACK"); + exit(1); + } + + if ((++nCount) == 1024) { + res = PQexec(conn, "COMMIT"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error on COMMIT (onlineNotificationThread): %s\n" , PQresultErrorMessage(res)); + PQclear(res); + PQexec(conn, "ROLLBACK"); + exit(1); + } + + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); + PQclear(res); + exit(1); + } + + nCount = 0; + } + } + + if (nCount > 0) { + fprintf(stderr, "nCount is %d\n", nCount); + res = PQexec(conn, "COMMIT"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error on COMMIT (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQexec(conn, "ROLLBACK"); + exit(1); + } + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(250)); } + PQfinish(conn); } #endif //ZT_CONTROLLER_USE_LIBPQ
\ No newline at end of file |