diff options
author | Grant Limberg <grant.limberg@zerotier.com> | 2019-04-04 12:40:49 -0700 |
---|---|---|
committer | Grant Limberg <grant.limberg@zerotier.com> | 2019-04-04 12:40:49 -0700 |
commit | 55a9e6e05ebd18ca6d9b4f1ac482b980173d832d (patch) | |
tree | 586e17398f1f2f51ac35a5399fd9f95691812616 | |
parent | 02f0eead1ce979297f370be1a66b4defdefc37a8 (diff) | |
download | infinitytier-55a9e6e05ebd18ca6d9b4f1ac482b980173d832d.tar.gz infinitytier-55a9e6e05ebd18ca6d9b4f1ac482b980173d832d.zip |
Queue up inserts in onlineNotificationThread() into a multiple insert statement
-rw-r--r-- | controller/PostgreSQL.cpp | 144 |
1 files changed, 99 insertions, 45 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index a856e4af..aa87331c 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -1298,6 +1298,10 @@ void PostgreSQL::onlineNotificationThread() PGresult *res = NULL; + std::stringstream memberUpdate; + memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES "; + bool firstRun = true; + bool memberAdded = false; for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { uint64_t nwid_i = i->first.first; char nwidTmp[64]; @@ -1311,41 +1315,63 @@ void PostgreSQL::onlineNotificationThread() continue; // skip members trying to join non-existant networks } - lastOnlineCumulative[i->first] = i->second.first; - - - std::string networkId(nwidTmp); - std::string memberId(memTmp); - std::vector<std::string> &members = updateMap[networkId]; members.push_back(memberId); - int64_t ts = i->second.first; - std::string ipAddr = i->second.second.toIpString(ipTmp); - std::string timestamp = std::to_string(ts); - - const char *values[4] = { + lastOnlineCumulative[i->first] = i->second.first; + std::string networkId(nwidTmp); + std::string memberId(memTmp); + + const char *qvals[2] = { networkId.c_str(), - memberId.c_str(), - (ipAddr.empty() ? NULL : ipAddr.c_str()), - timestamp.c_str(), + memberId.c_str() }; res = PQexecParams(conn, - "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ($1, $2, $3, TO_TIMESTAMP($4::double precision/1000)) " - "ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated", - 4, // number of parameters - NULL, // oid field. ignore - values, // values for substitution - NULL, // lengths in bytes of each value + "SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2", + 2, + NULL, + qvals, + NULL, NULL, 0); - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "Error on Member Status upsert: %s\n", PQresultErrorMessage(res)); + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + fprintf(stderr, "Member count failed: %s", PQerrorMessage(conn)); PQclear(res); continue; } + + int nrows = PQntuples(res); + PQclear(res); + + if (nrows == 1) { + int64_t ts = i->second.first; + std::string ipAddr = i->second.second.toIpString(ipTmp); + std::string timestamp = std::to_string(ts); + + if (firstRun) { + firstRun = false; + } else { + memberUpdate << ", "; + } + + memberUpdate << "('" << networkId << "', '" << memberId << '", ' << ipAddr << "', TO_TIMESTAMP(" << timestamp << "::double precision/1000))"; + memberAdded = true; + } else if (nrows > 1) { + fprintf(stderr, "nrows > 1?!?"); + continue; + } else { + continue; + } + } + memberUpdate << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated;"; + + if (memberAdded) { + res = PQexec(conn, memberUpdate.str().c_str()); + if (res != PGRES_COMMAND_OK) { + fprintf(stderr, "Multiple insert failed: %s", PQerrorMessage(conn)); + } PQclear(res); } @@ -1361,6 +1387,10 @@ void PostgreSQL::onlineNotificationThread() } } + std::stringstream networkUpdate; + networkUpdate << "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, online_member_count, total_member_count, last_modified) VALUES "; + bool nwFirstRun = true; + bool networkAdded = false; for (auto i = networks.begin(); i != networks.end(); ++i) { char tmp[64]; Utils::hex(i->first, tmp); @@ -1392,37 +1422,61 @@ void PostgreSQL::onlineNotificationThread() } } - std::string bc = std::to_string(bridgeCount); - std::string amc = std::to_string(authMemberCount); - std::string omc = std::to_string(onlineMemberCount); - std::string tmc = std::to_string(totalMemberCount); - std::string timestamp = std::to_string(ts); - const char *values[6] = { - networkId.c_str(), - bc.c_str(), - amc.c_str(), - omc.c_str(), - tmc.c_str(), - timestamp.c_str() + char *nvals[1] = { + networkId.c_str(); }; - res = PQexecParams(conn, "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, " - "online_member_count, total_member_count, last_modified) VALUES ($1, $2, $3, $4, $5, TO_TIMESTAMP($6::double precision/1000)) " - "ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, " - "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, " - "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified", - 6, + res = PQExecParams(conn, + "SELECT id FROM ztc_network WHERE id = $1", + 1, NULL, - values, + nvals, NULL, NULL, 0); - - if (PQresultStatus(res) != PGRES_COMMAND_OK) { - fprintf(stderr, "ERROR: Error on Network Status upsert (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + fprintf(stderr, "Network lookup failed: %s", PQerrorMessage(conn)); PQclear(res); continue; } + + int nrows = PQntuples(res); + PQclear(res); + + if (nrows == 1) { + std::string bc = std::to_string(bridgeCount); + std::string amc = std::to_string(authMemberCount); + std::string omc = std::to_string(onlineMemberCount); + std::string tmc = std::to_string(totalMemberCount); + std::string timestamp = std::to_string(ts); + + if (nwFirstRun) { + nwFirstRun = false; + } else { + networkUpdate << ", "; + } + + networkUpdate << "('" << networkId << "', " << bc << ", " << amc << ", " << omc << ", " << tmc << ", " + << "TO_TIMESTAMP(" << timestamp << "::double precision/1000))"; + + networkAdded = true; + + } else if (nrows > 1) { + fprintf(stderr, "Number of networks > 1?!?!?"); + continue; + } else { + continue; + } + } + networkUpdate << " ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, " + << "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, " + << "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified"; + if (networkAdded) { + res = PQExec(conn, networkUpdate.str().c_str()); + if (res != PGRES_COMMAND_OK) { + fprintf(stderr, "Error during multiple network upsert: %s", PQresultErrorMessage(res)); + } PQclear(res); } } @@ -1444,7 +1498,7 @@ void PostgreSQL::onlineNotificationThread() // PQclear(res); // } - std::this_thread::sleep_for(std::chrono::milliseconds(250)); + std::this_thread::sleep_for(std::chrono::milliseconds(0)); } fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str()); PQfinish(conn); |