summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrant Limberg <grant.limberg@zerotier.com>2019-04-04 12:40:49 -0700
committerGrant Limberg <grant.limberg@zerotier.com>2019-04-04 12:40:49 -0700
commit55a9e6e05ebd18ca6d9b4f1ac482b980173d832d (patch)
tree586e17398f1f2f51ac35a5399fd9f95691812616
parent02f0eead1ce979297f370be1a66b4defdefc37a8 (diff)
downloadinfinitytier-55a9e6e05ebd18ca6d9b4f1ac482b980173d832d.tar.gz
infinitytier-55a9e6e05ebd18ca6d9b4f1ac482b980173d832d.zip
Queue up inserts in onlineNotificationThread() into a multiple insert statement
-rw-r--r--controller/PostgreSQL.cpp144
1 files changed, 99 insertions, 45 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index a856e4af..aa87331c 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -1298,6 +1298,10 @@ void PostgreSQL::onlineNotificationThread()
PGresult *res = NULL;
+ std::stringstream memberUpdate;
+ memberUpdate << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ";
+ bool firstRun = true;
+ bool memberAdded = false;
for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) {
uint64_t nwid_i = i->first.first;
char nwidTmp[64];
@@ -1311,41 +1315,63 @@ void PostgreSQL::onlineNotificationThread()
continue; // skip members trying to join non-existant networks
}
- lastOnlineCumulative[i->first] = i->second.first;
-
-
- std::string networkId(nwidTmp);
- std::string memberId(memTmp);
-
std::vector<std::string> &members = updateMap[networkId];
members.push_back(memberId);
- int64_t ts = i->second.first;
- std::string ipAddr = i->second.second.toIpString(ipTmp);
- std::string timestamp = std::to_string(ts);
-
- const char *values[4] = {
+ lastOnlineCumulative[i->first] = i->second.first;
+ std::string networkId(nwidTmp);
+ std::string memberId(memTmp);
+
+ const char *qvals[2] = {
networkId.c_str(),
- memberId.c_str(),
- (ipAddr.empty() ? NULL : ipAddr.c_str()),
- timestamp.c_str(),
+ memberId.c_str()
};
res = PQexecParams(conn,
- "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ($1, $2, $3, TO_TIMESTAMP($4::double precision/1000)) "
- "ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated",
- 4, // number of parameters
- NULL, // oid field. ignore
- values, // values for substitution
- NULL, // lengths in bytes of each value
+ "SELECT id, network_id FROM ztc_member WHERE network_id = $1 AND id = $2",
+ 2,
+ NULL,
+ qvals,
+ NULL,
NULL,
0);
- if (PQresultStatus(res) != PGRES_COMMAND_OK) {
- fprintf(stderr, "Error on Member Status upsert: %s\n", PQresultErrorMessage(res));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK) {
+ fprintf(stderr, "Member count failed: %s", PQerrorMessage(conn));
PQclear(res);
continue;
}
+
+ int nrows = PQntuples(res);
+ PQclear(res);
+
+ if (nrows == 1) {
+ int64_t ts = i->second.first;
+ std::string ipAddr = i->second.second.toIpString(ipTmp);
+ std::string timestamp = std::to_string(ts);
+
+ if (firstRun) {
+ firstRun = false;
+ } else {
+ memberUpdate << ", ";
+ }
+
+ memberUpdate << "('" << networkId << "', '" << memberId << '", ' << ipAddr << "', TO_TIMESTAMP(" << timestamp << "::double precision/1000))";
+ memberAdded = true;
+ } else if (nrows > 1) {
+ fprintf(stderr, "nrows > 1?!?");
+ continue;
+ } else {
+ continue;
+ }
+ }
+ memberUpdate << " ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated;";
+
+ if (memberAdded) {
+ res = PQexec(conn, memberUpdate.str().c_str());
+ if (res != PGRES_COMMAND_OK) {
+ fprintf(stderr, "Multiple insert failed: %s", PQerrorMessage(conn));
+ }
PQclear(res);
}
@@ -1361,6 +1387,10 @@ void PostgreSQL::onlineNotificationThread()
}
}
+ std::stringstream networkUpdate;
+ networkUpdate << "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, online_member_count, total_member_count, last_modified) VALUES ";
+ bool nwFirstRun = true;
+ bool networkAdded = false;
for (auto i = networks.begin(); i != networks.end(); ++i) {
char tmp[64];
Utils::hex(i->first, tmp);
@@ -1392,37 +1422,61 @@ void PostgreSQL::onlineNotificationThread()
}
}
- std::string bc = std::to_string(bridgeCount);
- std::string amc = std::to_string(authMemberCount);
- std::string omc = std::to_string(onlineMemberCount);
- std::string tmc = std::to_string(totalMemberCount);
- std::string timestamp = std::to_string(ts);
- const char *values[6] = {
- networkId.c_str(),
- bc.c_str(),
- amc.c_str(),
- omc.c_str(),
- tmc.c_str(),
- timestamp.c_str()
+ char *nvals[1] = {
+ networkId.c_str();
};
- res = PQexecParams(conn, "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, "
- "online_member_count, total_member_count, last_modified) VALUES ($1, $2, $3, $4, $5, TO_TIMESTAMP($6::double precision/1000)) "
- "ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, "
- "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, "
- "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified",
- 6,
+ res = PQExecParams(conn,
+ "SELECT id FROM ztc_network WHERE id = $1",
+ 1,
NULL,
- values,
+ nvals,
NULL,
NULL,
0);
-
- if (PQresultStatus(res) != PGRES_COMMAND_OK) {
- fprintf(stderr, "ERROR: Error on Network Status upsert (onlineNotificationThread): %s\n", PQresultErrorMessage(res));
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK) {
+ fprintf(stderr, "Network lookup failed: %s", PQerrorMessage(conn));
PQclear(res);
continue;
}
+
+ int nrows = PQntuples(res);
+ PQclear(res);
+
+ if (nrows == 1) {
+ std::string bc = std::to_string(bridgeCount);
+ std::string amc = std::to_string(authMemberCount);
+ std::string omc = std::to_string(onlineMemberCount);
+ std::string tmc = std::to_string(totalMemberCount);
+ std::string timestamp = std::to_string(ts);
+
+ if (nwFirstRun) {
+ nwFirstRun = false;
+ } else {
+ networkUpdate << ", ";
+ }
+
+ networkUpdate << "('" << networkId << "', " << bc << ", " << amc << ", " << omc << ", " << tmc << ", "
+ << "TO_TIMESTAMP(" << timestamp << "::double precision/1000))";
+
+ networkAdded = true;
+
+ } else if (nrows > 1) {
+ fprintf(stderr, "Number of networks > 1?!?!?");
+ continue;
+ } else {
+ continue;
+ }
+ }
+ networkUpdate << " ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, "
+ << "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, "
+ << "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified";
+ if (networkAdded) {
+ res = PQExec(conn, networkUpdate.str().c_str());
+ if (res != PGRES_COMMAND_OK) {
+ fprintf(stderr, "Error during multiple network upsert: %s", PQresultErrorMessage(res));
+ }
PQclear(res);
}
}
@@ -1444,7 +1498,7 @@ void PostgreSQL::onlineNotificationThread()
// PQclear(res);
// }
- std::this_thread::sleep_for(std::chrono::milliseconds(250));
+ std::this_thread::sleep_for(std::chrono::milliseconds(0));
}
fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread\n", _myAddressStr.c_str());
PQfinish(conn);