diff options
author | Grant Limberg <grant.limberg@zerotier.com> | 2018-11-30 10:37:27 -0800 |
---|---|---|
committer | Grant Limberg <grant.limberg@zerotier.com> | 2018-11-30 10:40:57 -0800 |
commit | ccb856749fba9e2fa6bc844f13527888e832e04f (patch) | |
tree | 1fbe9465efe8c681e9d87e1afe45a352ce4b4976 | |
parent | 9d2bc9bf4e2c0be7f763297c5509e995703747d0 (diff) | |
download | infinitytier-ccb856749fba9e2fa6bc844f13527888e832e04f.tar.gz infinitytier-ccb856749fba9e2fa6bc844f13527888e832e04f.zip |
back to plain old libpq in dev
-rw-r--r-- | controller/PostgreSQL.cpp | 1312 | ||||
-rw-r--r-- | controller/PostgreSQL.hpp | 11 | ||||
-rw-r--r-- | make-linux.mk | 2 | ||||
-rwxr-xr-x | update_controllers.sh | 2 |
4 files changed, 848 insertions, 479 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index c99974fd..4182e0f8 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -22,7 +22,7 @@ #include "EmbeddedNetworkController.hpp" #include "../version.h" -#include <pqxx/pqxx> +#include <libpq-fe.h> #include <sstream> using json = nlohmann::json; @@ -62,67 +62,6 @@ std::string join(const std::vector<std::string> &elements, const char * const se } -namespace ZeroTier { - -class _MemberNotificationReceiver : public pqxx::notification_receiver -{ -private: - ZeroTier::PostgreSQL *_pgsql; - -public: - _MemberNotificationReceiver(pqxx::connection_base &c, const std::string &channel, ZeroTier::PostgreSQL *pgsql) - : pqxx::notification_receiver(c, channel) - , _pgsql(pgsql) - {} - - virtual void operator()(const std::string &payload, int backend_pid) - { - try { - json tmp(json::parse(payload)); - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - json oldConfig, newConfig; - if (ov.is_object()) oldConfig = ov; - if (nv.is_object()) newConfig = nv; - if (oldConfig.is_object() || newConfig.is_object()) { - _pgsql->_memberChanged(oldConfig,newConfig,_pgsql->isReady()); - } - } catch (std::exception &e) { - fprintf(stderr, "Exception parsing member notification: %s\n", e.what()); - } - } -}; - -class _NetworkNotificationReceiver : public pqxx::notification_receiver -{ -private: - ZeroTier::PostgreSQL *_pgsql; -public: - _NetworkNotificationReceiver(pqxx::connection_base &c, const std::string &channel, ZeroTier::PostgreSQL *pgsql) - : pqxx::notification_receiver(c, channel) - , _pgsql(pgsql) - {} - - virtual void operator()(const std::string &payload, int backend_pid) - { - try { - json tmp(json::parse(payload)); - json &ov = tmp["old_val"]; - json &nv = tmp["new_val"]; - json oldConfig, newConfig; - if (ov.is_object()) oldConfig = ov; - if (nv.is_object()) newConfig = nv; - if (oldConfig.is_object()||newConfig.is_object()) { - _pgsql->_networkChanged(oldConfig,newConfig,_pgsql->isReady()); - } - } catch (std::exception &e) { - fprintf(stderr, "Exception parsing member notification: %s\n", e.what()); - } // ignore bad records - } -}; - -} // namespace ZeroTier - using namespace ZeroTier; PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path) @@ -132,8 +71,7 @@ PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId , _run(1) , _waitNoticePrinted(false) { - fprintf(stderr, "PostgreSQL Constructed"); - _connString = std::string(path); + _connString = std::string(path) + " application_name=controller_" +_myAddressStr; _readyLock.lock(); _heartbeatThread = std::thread(&PostgreSQL::heartbeat, this); @@ -235,101 +173,157 @@ void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, } } -void PostgreSQL::initializeNetworks(pqxx::connection &conn) +void PostgreSQL::initializeNetworks(PGconn *conn) { try { - if (!conn.is_open()) { - fprintf(stderr, "Bad Database Connection in initializeNetworks\n"); + if (PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); exit(1); } - pqxx::work w(conn); + const char *params[1] = { + _myAddressStr.c_str() + }; - pqxx::result r = w.exec("SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000 AS creation_time, capabilities, " - "enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000 AS last_modified, mtu, multicast_limit, name, private, remote_trace_level, " + PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, " + "enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, " "remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network " - "WHERE deleted = false AND controller_id = '" + w.esc(_myAddressStr) + "'"); - + "WHERE deleted = false AND controller_id = $1", + 1, + NULL, + params, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + fprintf(stderr, "Networks Initialization Failed: %s", PQerrorMessage(conn)); + PQclear(res); + exit(1); + } - for (pqxx::result::const_iterator row = r.begin(); row != r.end(); ++row) { + int numRows = PQntuples(res); + for (int i = 0; i < numRows; ++i) { json empty; json config; - std::string nwid = row["id"].as<std::string>(); - config["id"] = nwid; - config["nwid"] = nwid; + const char *nwidparam[1] = { + PQgetvalue(res, i, 0) + }; + + config["id"] = PQgetvalue(res, i, 0); + config["nwid"] = PQgetvalue(res, i, 0); try { - config["creationTime"] = row["creation_time"].as<uint64_t>(); - } catch(std::exception &e) { + config["creationTime"] = std::stoull(PQgetvalue(res, i, 1)); + } catch (std::exception &e) { config["creationTime"] = 0ULL; + //fprintf(stderr, "Error converting creation time: %s\n", PQgetvalue(res, i, 1)); } - config["capabilities"] = json::parse(row["capabilities"].as<std::string>()); - config["enableBroadcast"] = row["enable_broadcast"].as<bool>(); + config["capabilities"] = json::parse(PQgetvalue(res, i, 2)); + config["enableBroadcast"] = (strcmp(PQgetvalue(res, i, 3),"t")==0); try { - config["lastModified"] = row["last_modified"].as<uint64_t>(); + config["lastModified"] = std::stoull(PQgetvalue(res, i, 4)); } catch (std::exception &e) { config["lastModified"] = 0ULL; + //fprintf(stderr, "Error converting last modified: %s\n", PQgetvalue(res, i, 4)); } try { - config["mtu"] = row["mtu"].as<int>(); + config["mtu"] = std::stoi(PQgetvalue(res, i, 5)); } catch (std::exception &e) { config["mtu"] = 2800; } try { - config["multicastLimit"] = row["multicast_limit"].as<int>(); + config["multicastLimit"] = std::stoi(PQgetvalue(res, i, 6)); } catch (std::exception &e) { config["multicastLimit"] = 64; } - config["name"] = row["name"].as<std::string>(); - config["private"] = row["private"].as<bool>(); + config["name"] = PQgetvalue(res, i, 7); + config["private"] = (strcmp(PQgetvalue(res, i, 8),"t")==0); try { - config["remoteTraceLevel"] = row["remote_trace_level"].as<int>(); + config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9)); } catch (std::exception &e) { config["remoteTraceLevel"] = 0; } - config["remoteTraceTarget"] = (row["remote_trace_target"].is_null() ? nullptr : row["remote_trace_target"].as<std::string>()); + config["remoteTraceTarget"] = PQgetvalue(res, i, 10); try { - config["revision"] = row["revision"].as<uint64_t>(); + config["revision"] = std::stoull(PQgetvalue(res, i, 11)); } catch (std::exception &e) { config["revision"] = 0ULL; //fprintf(stderr, "Error converting revision: %s\n", PQgetvalue(res, i, 11)); } - config["rules"] = json::parse(row["rules"].as<std::string>()); - config["tags"] = json::parse(row["tags"].as<std::string>()); - config["v4AssignMode"] = json::parse(row["v4_assign_mode"].as<std::string>()); - config["v6AssignMode"] = json::parse(row["v6_assign_mode"].as<std::string>()); + config["rules"] = json::parse(PQgetvalue(res, i, 12)); + config["tags"] = json::parse(PQgetvalue(res, i, 13)); + config["v4AssignMode"] = json::parse(PQgetvalue(res, i, 14)); + config["v6AssignMode"] = json::parse(PQgetvalue(res, i, 15)); config["objtype"] = "network"; config["ipAssignmentPools"] = json::array(); config["routes"] = json::array(); - pqxx::work w2(conn); - pqxx::result res2 = w2.exec("SELECT host(ip_range_start) AS ip_range_start, host(ip_range_end) AS ip_range_end FROM ztc_network_assignment_pool WHERE network_id = '" + w2.esc(nwid) + "'"); - for(pqxx::result::const_iterator it = res2.begin(); it != res2.end(); ++it) { + PGresult *r2 = PQexecParams(conn, + "SELECT host(ip_range_start), host(ip_range_end) FROM ztc_network_assignment_pool WHERE network_id = $1", + 1, + NULL, + nwidparam, + NULL, + NULL, + 0); + + if (PQresultStatus(r2) != PGRES_TUPLES_OK) { + fprintf(stderr, "ERROR: Error retreiving IP pools for network: %s\n", PQresultErrorMessage(r2)); + PQclear(r2); + PQclear(res); + exit(1); + } + + int n = PQntuples(r2); + for (int j = 0; j < n; ++j) { json ip; - ip["ipRangeStart"] = it["ip_range_start"].as<std::string>(); - ip["ipRangeEnd"] = it["ip_range_end"].as<std::string>(); + ip["ipRangeStart"] = PQgetvalue(r2, j, 0); + ip["ipRangeEnd"] = PQgetvalue(r2, j, 1); + config["ipAssignmentPools"].push_back(ip); } - w2.commit(); - pqxx::work w3(conn); - pqxx::result res3 = w3.exec("SELECT host(address) AS address, bits, host(via) AS via FROM ztc_network_route WHERE network_id = '" + w3.esc(nwid) + "'"); - for(pqxx::result::const_iterator it = res3.begin(); it != res3.end(); ++it) { + PQclear(r2); + + r2 = PQexecParams(conn, + "SELECT host(address), bits, host(via) FROM ztc_network_route WHERE network_id = $1", + 1, + NULL, + nwidparam, + NULL, + NULL, + 0); + + if (PQresultStatus(r2) != PGRES_TUPLES_OK) { + fprintf(stderr, "ERROR: Error retreiving routes for network: %s\n", PQresultErrorMessage(r2)); + PQclear(r2); + PQclear(res); + exit(1); + } + + n = PQntuples(r2); + for (int j = 0; j < n; ++j) { + std::string addr = PQgetvalue(r2, j, 0); + std::string bits = PQgetvalue(r2, j, 1); + std::string via = PQgetvalue(r2, j, 2); json route; - route["target"] = it["address"].as<std::string>() + "/" + it["bits"].as<std::string>(); - if (route["via"].is_null()) { + route["target"] = addr + "/" + bits; + + if (via == "NULL") { route["via"] = nullptr; } else { - route["via"] = it["via"].as<std::string>(); + route["via"] = via; } config["routes"].push_back(route); } - w3.commit(); + PQclear(r2); + _networkChanged(empty, config, false); } - w.commit(); + PQclear(res); if (++this->_ready == 2) { if (_waitNoticePrinted) { @@ -343,115 +337,147 @@ void PostgreSQL::initializeNetworks(pqxx::connection &conn) } } -void PostgreSQL::initializeMembers(pqxx::connection &conn) +void PostgreSQL::initializeMembers(PGconn *conn) { try { - if (!conn.is_open()) { - fprintf(stderr, "Bad Database Connection in initializeMembers\n"); + if (PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); exit(1); } - pqxx::work w(conn); - pqxx::result res = w.exec( - "SELECT m.id AS id, m.network_id AS network_id, m.active_bridge AS active_bridge, " - " m.authorized AS authorized, m.capabilities AS capabilities, " - " EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000 AS creation_time, m.identity AS identity, " - " EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000 AS last_authorized_time, " - " EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000 AS last_deauthorized_time, " - " m.remote_trace_level AS remote_trace_level, m.remote_trace_target AS remote_trace_target, " - " m.tags AS tags, m.v_major AS v_major, m.v_minor AS v_minor, m.v_rev AS v_rev, " - " m.v_proto AS v_proto, m.no_auto_assign_ips AS no_auto_assign_ips, m.revision AS revision " + const char *params[1] = { + _myAddressStr.c_str() + }; + + PGresult *res = PQexecParams(conn, + "SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, " + " EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, " + " EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000, " + " m.remote_trace_level, m.remote_trace_target, m.tags, m.v_major, m.v_minor, m.v_rev, m.v_proto, " + " m.no_auto_assign_ips, m.revision " "FROM ztc_member m " "INNER JOIN ztc_network n " " ON n.id = m.network_id " - "WHERE n.controller_id = '" + w.esc(_myAddressStr) + "' AND m.deleted = false" - ); + "WHERE n.controller_id = $1 AND m.deleted = false", + 1, + NULL, + params, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn)); + PQclear(res); + exit(1); + } - for(pqxx::result::const_iterator row = res.begin(); row != res.end(); ++row) { + int numRows = PQntuples(res); + for (int i = 0; i < numRows; ++i) { json empty; json config; - std::string networkId = row["network_id"].as<std::string>(); - std::string memberId = row["id"].as<std::string>(); - + std::string memberId(PQgetvalue(res, i, 0)); + std::string networkId(PQgetvalue(res, i, 1)); + std::string ctime = PQgetvalue(res, i, 5); config["id"] = memberId; config["nwid"] = networkId; - config["activeBridge"] = row["active_bridge"].as<bool>(); - config["authorized"] = row["authorized"].as<bool>(); + config["activeBridge"] = (strcmp(PQgetvalue(res, i, 2), "t") == 0); + config["authorized"] = (strcmp(PQgetvalue(res, i, 3), "t") == 0); try { - config["capabilities"] = json::parse(row["capabilities"].as<std::string>()); - } catch(std::exception &e) { + config["capabilities"] = json::parse(PQgetvalue(res, i, 4)); + } catch (std::exception &e) { config["capabilities"] = json::array(); } try { - config["creationTime"] = row["creation_time"].as<uint64_t>(); - } catch(std::exception &e) { + config["creationTime"] = std::stoull(PQgetvalue(res, i, 5)); + } catch (std::exception &e) { config["creationTime"] = 0ULL; + //fprintf(stderr, "Error upding creation time (member): %s\n", PQgetvalue(res, i, 5)); } - config["identity"] = row["identity"].as<std::string>(); + config["identity"] = PQgetvalue(res, i, 6); try { - config["lastAuthorizedTime"] = row["last_authorized_time"].as<uint64_t>(); + config["lastAuthorizedTime"] = std::stoull(PQgetvalue(res, i, 7)); } catch(std::exception &e) { config["lastAuthorizedTime"] = 0ULL; + //fprintf(stderr, "Error updating last auth time (member): %s\n", PQgetvalue(res, i, 7)); } try { - config["lastDeauthorizedTime"] = row["last_deauthorized_time"].as<uint64_t>(); - } catch(std::exception &e) { + config["lastDeauthorizedTime"] = std::stoull(PQgetvalue(res, i, 8)); + } catch( std::exception &e) { config["lastDeauthorizedTime"] = 0ULL; + //fprintf(stderr, "Error updating last deauth time (member): %s\n", PQgetvalue(res, i, 8)); } try { - config["remoteTraceLevel"] = row["remote_trace_level"].as<int>(); - } catch(std::exception &e) { + config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9)); + } catch (std::exception &e) { config["remoteTraceLevel"] = 0; } - config["remoteTraceTarget"] = (row["remote_trace_target"].is_null() ? nullptr : row["remote_trace_target"].as<std::string>()); + config["remoteTraceTarget"] = PQgetvalue(res, i, 10); try { - config["tags"] = json::parse(row["tags"].as<std::string>()); - } catch(std::exception &e) { + config["tags"] = json::parse(PQgetvalue(res, i, 11)); + } catch (std::exception &e) { config["tags"] = json::array(); } try { - config["vMajor"] = row["v_major"].as<int>(); + config["vMajor"] = std::stoi(PQgetvalue(res, i, 12)); } catch(std::exception &e) { config["vMajor"] = -1; } try { - config["vMinor"] = row["v_minor"].as<int>(); + config["vMinor"] = std::stoi(PQgetvalue(res, i, 13)); } catch (std::exception &e) { config["vMinor"] = -1; } try { - config["vRev"] = row["v_rev"].as<int>(); + config["vRev"] = std::stoi(PQgetvalue(res, i, 14)); } catch (std::exception &e) { config["vRev"] = -1; } try { - config["vProto"] = row["v_proto"].as<int>(); + config["vProto"] = std::stoi(PQgetvalue(res, i, 15)); } catch (std::exception &e) { config["vProto"] = -1; } - config["noAutoAssignIps"] = row["no_auto_assign_ips"].as<bool>(); + config["noAutoAssignIps"] = (strcmp(PQgetvalue(res, i, 16), "t") == 0); try { - config["revision"] = row["revision"].as<uint64_t>(); + config["revision"] = std::stoull(PQgetvalue(res, i, 17)); } catch (std::exception &e) { config["revision"] = 0ULL; //fprintf(stderr, "Error updating revision (member): %s\n", PQgetvalue(res, i, 17)); } config["objtype"] = "member"; config["ipAssignments"] = json::array(); + const char *p2[2] = { + memberId.c_str(), + networkId.c_str() + }; + + PGresult *r2 = PQexecParams(conn, + "SELECT DISTINCT address FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", + 2, + NULL, + p2, + NULL, + NULL, + 0); + + if (PQresultStatus(r2) != PGRES_TUPLES_OK) { + fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn)); + PQclear(r2); + PQclear(res); + exit(1); + } - pqxx::work w2(conn); - pqxx::result r2 = w2.exec( - "SELECT address FROM ztc_member_ip_assignment WHERE member_id = '"+w2.esc(memberId)+"' AND network_id = '"+w2.esc(networkId)+"'" - ); - for(pqxx::result::const_iterator it = r2.begin(); it != r2.end(); ++it) { - config["ipAssignments"].push_back(it["address"].as<std::string>()); + int n = PQntuples(r2); + for (int j = 0; j < n; ++j) { + config["ipAssignments"].push_back(PQgetvalue(r2, j, 0)); } - w2.commit(); _memberChanged(empty, config, false); } - w.commit(); + + PQclear(res); if (++this->_ready == 2) { if (_waitNoticePrinted) { @@ -484,237 +510,503 @@ void PostgreSQL::heartbeat() const char *publicIdentity = publicId; const char *hostname = hostnameTmp; - fprintf(stderr, "Heartbeat connection opening\n"); - pqxx::connection conn(_connString); - if (!conn.is_open()) { - fprintf(stderr, "Connection to database failed: heartbeat\n"); + PGconn *conn = PQconnectdb(_connString.c_str()); + if (PQstatus(conn) == CONNECTION_BAD) { + fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); + PQfinish(conn); exit(1); } - fprintf(stderr, "Heartbeat connection opened\n"); - conn.prepare("heartbeat", - "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build) " - "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8) " - "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " - "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " - "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev" - ); - while (_run == 1) { - try { - pqxx::work w(conn); - pqxx::result res = w.prepared("heartbeat")(controllerId)(hostname) - (OSUtils::now())(publicIdentity)(ZEROTIER_ONE_VERSION_MAJOR) - (ZEROTIER_ONE_VERSION_MINOR)(ZEROTIER_ONE_VERSION_REVISION) - (ZEROTIER_ONE_VERSION_BUILD).exec(); - w.commit(); - } catch (std::exception &e) { - fprintf(stderr, "Error inserting heartbeat: %s\n", e.what()); - exit(1); + if(PQstatus(conn) != CONNECTION_OK) { + PQfinish(conn); + conn = PQconnectdb(_connString.c_str()); } + if (conn) { + std::string major = std::to_string(ZEROTIER_ONE_VERSION_MAJOR); + std::string minor = std::to_string(ZEROTIER_ONE_VERSION_MINOR); + std::string rev = std::to_string(ZEROTIER_ONE_VERSION_REVISION); + std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); + std::string now = std::to_string(OSUtils::now()); + const char *values[8] = { + controllerId, + hostname, + now.c_str(), + publicIdentity, + major.c_str(), + minor.c_str(), + rev.c_str(), + build.c_str() + }; + + PGresult *res = PQexecParams(conn, + "INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build) " + "VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8) " + "ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " + "public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " + "v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev", + 8, // number of parameters + NULL, // oid field. ignore + values, // values for substitution + NULL, // lengths in bytes of each value + NULL, // binary? + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "Heartbeat Update Failed: %s\n", PQresultErrorMessage(res)); + } + PQclear(res); + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } + + PQfinish(conn); + conn = NULL; } void PostgreSQL::membersDbWatcher() { - try { - pqxx::connection conn(_connString); - if (!conn.is_open()) { - fprintf(stderr, "Connection to database failed: membersDbWatcher\n"); - exit(1); + PGconn *conn = PQconnectdb(_connString.c_str()); + if (PQstatus(conn) == CONNECTION_BAD) { + fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); + PQfinish(conn); + exit(1); + } + + initializeMembers(conn); + + char buf[11] = {0}; + std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); + PGresult *res = PQexec(conn, cmd.c_str()); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQfinish(conn); + exit(1); + } + + PQclear(res); res = NULL; + + while(_run == 1) { + if (PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres."); + exit(-1); } + PGnotify *notify = NULL; + PQconsumeInput(conn); + while ((notify = PQnotifies(conn)) != NULL) { + //fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra); - initializeMembers(conn); + try { + json tmp(json::parse(notify->extra)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object() || newConfig.is_object()) { + _memberChanged(oldConfig,newConfig,(this->_ready>=2)); + } + } catch (...) {} // ignore bad records - char buf[11] = {0}; - std::string cmd = "member_" + std::string(_myAddress.toString(buf)); - _MemberNotificationReceiver receiver(conn, cmd, this); - while(_run == 1) { - conn.await_notification(5, 0); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + free(notify); } - conn.disconnect(); - } catch (std::exception &e) { - fprintf(stderr, "Exception in membersDbWatcher: %s\n", e.what()); - exit(1); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } + PQfinish(conn); + conn = NULL; } void PostgreSQL::networksDbWatcher() { - try { - pqxx::connection conn(_connString); - if (!conn.is_open()) { - fprintf(stderr, "Connection to database failed: networksDbWatcher\n"); - exit(1); - } + PGconn *conn = PQconnectdb(_connString.c_str()); + if (PQstatus(conn) == CONNECTION_BAD) { + fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); + PQfinish(conn); + exit(1); + } - initializeNetworks(conn); + initializeNetworks(conn); - char buf[11] = {0}; - std::string cmd = "network_" + std::string(_myAddress.toString(buf)); - _NetworkNotificationReceiver receiver(conn, cmd, this); - while(_run == 1) { - conn.await_notification(5, 0); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - } catch(std::exception &e) { - fprintf(stderr, "Exception in networksDbWatcher: %s\n", e.what()); + char buf[11] = {0}; + std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf)); + PGresult *res = PQexec(conn, cmd.c_str()); + if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQfinish(conn); exit(1); } + + PQclear(res); res = NULL; + + while(_run == 1) { + if (PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres."); + exit(-1); + } + PGnotify *notify = NULL; + PQconsumeInput(conn); + while ((notify = PQnotifies(conn)) != NULL) { + //fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra); + try { + json tmp(json::parse(notify->extra)); + json &ov = tmp["old_val"]; + json &nv = tmp["new_val"]; + json oldConfig, newConfig; + if (ov.is_object()) oldConfig = ov; + if (nv.is_object()) newConfig = nv; + if (oldConfig.is_object()||newConfig.is_object()) { + _networkChanged(oldConfig,newConfig,(this->_ready >= 2)); + } + } catch (...) {} // ignore bad records + free(notify); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + PQfinish(conn); + conn = NULL; } void PostgreSQL::commitThread() { - pqxx::connection conn(_connString); - if (!conn.is_open()) { - fprintf(stderr, "ERROR: Connection to database failed: commitThread\n"); + PGconn *conn = PQconnectdb(_connString.c_str()); + if (PQstatus(conn) == CONNECTION_BAD) { + fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn)); + PQfinish(conn); exit(1); } - conn.prepare("insert_member", - "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, " - "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, " - "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) " - "VALUES ($1, $2, $3, $4, $5, $6, " - "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), " - "$9, $10, (CASE WHEN $11='' THEN NULL ELSE $1 END), $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET " - "active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, " - "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, " - "last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, " - "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, " - "revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, " - "v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto"); - conn.prepare("delete_ip_assignments", - "DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2"); - conn.prepare("insert_ip_assignments", - "INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3)"); - - conn.prepare("update_network", - "UPDATE ztc_network SET controller_id = $2, capabilities = $3, enable_broadcast = $4, " - "last_updated = $5, mtu = $6, multicast_limit = $7, name = $8, private = $9, " - "remote_trace_level = $10, remote_trace_target = $11, rules = $12, rules_source = $13, " - "tags = $14, v4_assign_mode = $15, v6_assign_mode = $16 " - "WHERE id = $1"); - conn.prepare("delete_network_ip_pool", - "DELETE FROM ztc_network_assignment_pool WHERE network_id = $1"); - conn.prepare("insert_network_ip_pool", - "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) VALUES ($1, $2, $3)"); - conn.prepare("delete_network_route", "DELETE FROM ztc_network_route WHERE network_id = $1"); - conn.prepare("insert_network_route", "INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)"); - conn.prepare("delete_network", "UPDATE ztc_network SET deleted = true WHERE id = $1"); - conn.prepare("delete_member", "UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2"); - json *config = nullptr; while(_commitQueue.get(config)&(_run == 1)) { if (!config) { continue; } + if (PQstatus(conn) == CONNECTION_BAD) { + fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn)); + PQfinish(conn); + delete config; + exit(1); + } try { const std::string objtype = (*config)["objtype"]; if (objtype == "member") { - std::string memberId = (*config)["id"]; - std::string networkId = (*config)["nwid"]; - std::string target(""); - if (!(*config)["remoteTraceTarget"].is_null()) { - target = (*config)["remoteTraceTarget"]; - } - std::string identity = (*config)["identity"]; - try { + std::string memberId = (*config)["id"]; + std::string networkId = (*config)["nwid"]; + std::string identity = (*config)["identity"]; + std::string target = "NULL"; + + if (!(*config)["remoteTraceTarget"].is_null()) { + target = (*config)["remoteTraceTarget"]; + } + std::string caps = OSUtils::jsonDump((*config)["capabilities"], -1); + std::string lastAuthTime = std::to_string((long long)(*config)["lastAuthorizedTime"]); + std::string lastDeauthTime = std::to_string((long long)(*config)["lastDeauthorizedTime"]); + std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]); + std::string rev = std::to_string((unsigned long long)(*config)["revision"]); std::string tags = OSUtils::jsonDump((*config)["tags"], -1); + std::string vmajor = std::to_string((int)(*config)["vMajor"]); + std::string vminor = std::to_string((int)(*config)["vMinor"]); + std::string vrev = std::to_string((int)(*config)["vRev"]); + std::string vproto = std::to_string((int)(*config)["vProto"]); + const char *values[19] = { + memberId.c_str(), + networkId.c_str(), + ((*config)["activeBridge"] ? "true" : "false"), + ((*config)["authorized"] ? "true" : "false"), + caps.c_str(), + identity.c_str(), + lastAuthTime.c_str(), + lastDeauthTime.c_str(), + ((*config)["noAutoAssignIps"] ? "true" : "false"), + rtraceLevel.c_str(), + (target == "NULL") ? NULL : target.c_str(), + rev.c_str(), + tags.c_str(), + vmajor.c_str(), + vminor.c_str(), + vrev.c_str(), + vproto.c_str() + }; + + PGresult *res = PQexecParams(conn, + "INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, " + "identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, " + "remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) " + "VALUES ($1, $2, $3, $4, $5, $6, " + "TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), " + "$9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET " + "active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, " + "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, " + "last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, " + "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, " + "revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, " + "v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto", + 17, + NULL, + values, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error updating member: %s\n", PQresultErrorMessage(res)); + fprintf(stderr, "%s", OSUtils::jsonDump(*config, 2).c_str()); + PQclear(res); + delete config; + config = nullptr; + continue; + } - pqxx::work w(conn); - pqxx::result res = w.prepared("insert_member")(memberId)(networkId) - ((bool)(*config)["activeBridge"])((bool)(*config)["authorized"])(caps) - (identity)((long long)(*config)["lastAuthorizedTime"])((long long)(*config)["lastDeauthorizedTime"]) - ((bool)(*config)["noAutoAssignIps"])((int)(*config)["remoteTraceLevel"]) - (target, !target.empty()) - ((int)(*config)["revision"])(tags)((int)(*config)["vMajor"])((int)(*config)["vMinor"]) - ((int)(*config)["vRev"])((int)(*config)["vProto"]).exec(); - w.commit(); - } catch (std::exception &e) { - fprintf(stderr, "Exception upserting member: %s\n", e.what()); - delete config; - config = nullptr; - continue; - } + PQclear(res); - try { - pqxx::work w(conn); - pqxx::result res = w.prepared("delete_ip_assignments")(memberId)(networkId).exec(); - + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error beginning transaction: %s\n", PQresultErrorMessage(res)); + PQclear(res); + delete config; + config = nullptr; + continue; + } + + PQclear(res); + + const char *v2[2] = { + memberId.c_str(), + networkId.c_str() + }; + + res = PQexecParams(conn, + "DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", + 2, + NULL, + v2, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error updating IP address assignments: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQclear(PQexec(conn, "ROLLBACK"));; + delete config; + config = nullptr; + continue; + } + + PQclear(res); + + std::vector<std::string> assignments; for (auto i = (*config)["ipAssignments"].begin(); i != (*config)["ipAssignments"].end(); ++i) { std::string addr = *i; - pqxx::result res2 = w.prepared("insert_ip_assignments")(memberId)(networkId)(addr).exec(); + + if (std::find(assignments.begin(), assignments.end(), addr) != assignments.end()) { + continue; + } + + const char *v3[3] = { + memberId.c_str(), + networkId.c_str(), + addr.c_str() + }; + + res = PQexecParams(conn, + "INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3)", + 3, + NULL, + v3, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error setting IP addresses for member: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQclear(PQexec(conn, "ROLLBACK")); + break;; + } } - w.commit(); - } catch (std::exception &e) { - fprintf(stderr, "Error assigning member IP addresses: %s\n", e.what()); - delete config; - config = nullptr; - } - const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); - const uint64_t memberidInt = OSUtils::jsonIntHex((*config)["id"], 0ULL); - if (nwidInt && memberidInt) { - nlohmann::json nwOrig; - nlohmann::json memOrig; + res = PQexec(conn, "COMMIT"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error committing ip address data: %s\n", PQresultErrorMessage(res)); + } - nlohmann::json memNew(*config); - - get(nwidInt, nwOrig, memberidInt, memOrig); - - _memberChanged(memOrig, memNew, (this->_ready>=2)); - } else { - fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt); + PQclear(res); + + const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); + const uint64_t memberidInt = OSUtils::jsonIntHex((*config)["id"], 0ULL); + if (nwidInt && memberidInt) { + nlohmann::json nwOrig; + nlohmann::json memOrig; + + nlohmann::json memNew(*config); + + get(nwidInt, nwOrig, memberidInt, memOrig); + + _memberChanged(memOrig, memNew, (this->_ready>=2)); + } else { + fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt); + } + + } catch (std::exception &e) { + fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); } } else if (objtype == "network") { - std::string id = (*config)["id"]; - std::string controllerId = _myAddressStr.c_str(); - std::string name = (*config)["name"]; - std::string rulesSource = (*config)["rulesSource"]; - std::string caps = OSUtils::jsonDump((*config)["capabilitles"], -1); - std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]); - std::string rules = OSUtils::jsonDump((*config)["rules"], -1); - std::string tags = OSUtils::jsonDump((*config)["tags"], -1); - std::string v4mode = OSUtils::jsonDump((*config)["v4AssignMode"],-1); - std::string v6mode = OSUtils::jsonDump((*config)["v6AssignMode"], -1); - std::string target = ""; - if (!(*config)["remoteTraceTarget"].is_null()) { - target = (*config)["remoteTraceTarget"]; - } try { - pqxx::work w(conn); - pqxx::result res = w.prepared("update_network")(id)(controllerId)(caps)((bool)(*config)["enableBroadcast"]) - (OSUtils::now())((int)(*config)["mtu"])((int)(*config)["multicastLimit"])(name)((bool)(*config)["private"]) - ((int)(*config)["remoteTraceLevel"])(target, !target.empty()) - (rules)(rulesSource)(tags)(v4mode)(v6mode).exec(); - w.commit(); - } catch (std::exception &e) { - fprintf(stderr, "Error updating network config: %s\n", e.what()); - delete config; - config = nullptr; - continue; - } + std::string id = (*config)["id"]; + std::string controllerId = _myAddressStr.c_str(); + std::string name = (*config)["name"]; + std::string remoteTraceTarget("NULL"); + if (!(*config)["remoteTraceTarget"].is_null()) { + remoteTraceTarget = (*config)["remoteTraceTarget"]; + } + std::string rulesSource = (*config)["rulesSource"]; + std::string caps = OSUtils::jsonDump((*config)["capabilitles"], -1); + std::string now = std::to_string(OSUtils::now()); + std::string mtu = std::to_string((int)(*config)["mtu"]); + std::string mcastLimit = std::to_string((int)(*config)["multicastLimit"]); + std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]); + std::string rules = OSUtils::jsonDump((*config)["rules"], -1); + std::string tags = OSUtils::jsonDump((*config)["tags"], -1); + std::string v4mode = OSUtils::jsonDump((*config)["v4AssignMode"],-1); + std::string v6mode = OSUtils::jsonDump((*config)["v6AssignMode"], -1); + bool enableBroadcast = (*config)["enableBroadcast"]; + bool isPrivate = (*config)["private"]; + + const char *values[16] = { + id.c_str(), + controllerId.c_str(), + caps.c_str(), + enableBroadcast ? "true" : "false", + now.c_str(), + mtu.c_str(), + mcastLimit.c_str(), + name.c_str(), + isPrivate ? "true" : "false", + rtraceLevel.c_str(), + (remoteTraceTarget == "NULL" ? NULL : remoteTraceTarget.c_str()), + rules.c_str(), + rulesSource.c_str(), + tags.c_str(), + v4mode.c_str(), + v6mode.c_str(), + }; + + PGresult *res = PQexecParams(conn, + "UPDATE ztc_network SET controller_id = $2, capabilities = $3, enable_broadcast = $4, " + "last_updated = $5, mtu = $6, multicast_limit = $7, name = $8, private = $9, " + "remote_trace_level = $10, remote_trace_target = $11, rules = $12, rules_source = $13, " + "tags = $14, v4_assign_mode = $15, v6_assign_mode = $16 " + "WHERE id = $1", + 16, + NULL, + values, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error updating network record: %s\n", PQresultErrorMessage(res)); + PQclear(res); + delete config; + config = nullptr; + continue; + } - try { - pqxx::work w(conn); - pqxx::result res = w.prepared("delete_network_ip_pool")(id).exec(); + PQclear(res); + + res = PQexec(conn, "BEGIN"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error beginnning transaction: %s\n", PQresultErrorMessage(res)); + PQclear(res); + delete config; + config = nullptr; + continue; + } + + PQclear(res); + + const char *params[1] = { + id.c_str() + }; + res = PQexecParams(conn, + "DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", + 1, + NULL, + params, + NULL, + NULL, + 0); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQclear(PQexec(conn, "ROLLBACK")); + delete config; + config = nullptr; + continue; + } + + PQclear(res); auto pool = (*config)["ipAssignmentPools"]; + bool err = false; for (auto i = pool.begin(); i != pool.end(); ++i) { std::string start = (*i)["ipRangeStart"]; std::string end = (*i)["ipRangeEnd"]; - - pqxx::result r2 = w.prepared("insert_nework_ip_pool")(id)(start)(end).exec(); + const char *p[3] = { + id.c_str(), + start.c_str(), + end.c_str() + }; + + res = PQexecParams(conn, + "INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) " + "VALUES ($1, $2, $3)", + 3, + NULL, + p, + NULL, + NULL, + 0); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res)); + PQclear(res); + err = true; + break; + } + PQclear(res); + } + if (err) { + PQclear(PQexec(conn, "ROLLBACK")); + delete config; + config = nullptr; + continue; } + res = PQexecParams(conn, + "DELETE FROM ztc_network_route WHERE network_id = $1", + 1, + NULL, + params, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res)); + PQclear(res); + PQclear(PQexec(conn, "ROLLBACK")); + delete config; + config = nullptr; + continue; + } - pqxx::result res2 = w.prepared("delete_network_route")(id).exec(); auto routes = (*config)["routes"]; + err = false; for (auto i = routes.begin(); i != routes.end(); ++i) { std::string t = (*i)["target"]; std::vector<std::string> target; @@ -727,42 +1019,86 @@ void PostgreSQL::commitThread() continue; } std::string targetAddr = target[0]; - int targetBits = std::stoi(target[1]); - std::string via = ""; + std::string targetBits = target[1]; + std::string via = "NULL"; if (!(*i)["via"].is_null()) { via = (*i)["via"]; } - - pqxx::result res3 = w.prepared("insert_network_route")(id)(targetAddr)(targetBits) - (via, !via.empty()).exec(); + + const char *p[4] = { + id.c_str(), + targetAddr.c_str(), + targetBits.c_str(), + (via == "NULL" ? NULL : via.c_str()), + }; + + res = PQexecParams(conn, + "INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)", + 4, + NULL, + p, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res)); + PQclear(res); + err = true; + break; + } + PQclear(res); + } + if (err) { + PQclear(PQexec(conn, "ROLLBACK")); + delete config; + config = nullptr; + continue; } - w.commit(); + res = PQexec(conn, "COMMIT"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error committing network update: %s\n", PQresultErrorMessage(res)); + } + PQclear(res); - } catch (std::exception &e) { - fprintf(stderr, "Error updating network IP pool: %s\n", e.what()); - } + const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); + if (nwidInt) { + nlohmann::json nwOrig; + nlohmann::json nwNew(*config); - const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); - if (nwidInt) { - nlohmann::json nwOrig; - nlohmann::json nwNew(*config); + get(nwidInt, nwOrig); - get(nwidInt, nwOrig); + _networkChanged(nwOrig, nwNew, true); + } else { + fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt); + } - _networkChanged(nwOrig, nwNew, true); - } else { - fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt); + } catch (std::exception &e) { + fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); } } else if (objtype == "trace") { fprintf(stderr, "ERROR: Trace not yet implemented"); } else if (objtype == "_delete_network") { try { std::string networkId = (*config)["nwid"]; + const char *values[1] = { + networkId.c_str() + }; + PGresult * res = PQexecParams(conn, + "UPDATE ztc_network SET deleted = true WHERE id = $1", + 1, + NULL, + values, + NULL, + NULL, + 0); - pqxx::work w(conn); - pqxx::result res = w.prepared("delete_network")(networkId).exec(); - w.commit(); + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error deleting network: %s\n", PQresultErrorMessage(res)); + } + + PQclear(res); } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what()); } @@ -771,9 +1107,25 @@ void PostgreSQL::commitThread() std::string memberId = (*config)["id"]; std::string networkId = (*config)["nwid"]; - pqxx::work w(conn); - pqxx::result res = w.prepared("delete_member")(memberId)(networkId).exec(); - w.commit(); + const char *values[2] = { + memberId.c_str(), + networkId.c_str() + }; + + PGresult *res = PQexecParams(conn, + "UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2", + 2, + NULL, + values, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error deleting member: %s\n", PQresultErrorMessage(res)); + } + + PQclear(res); } catch (std::exception &e) { fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what()); } @@ -786,15 +1138,18 @@ void PostgreSQL::commitThread() delete config; config = nullptr; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } + + PQfinish(conn); } void PostgreSQL::onlineNotificationThread() { - pqxx::connection conn(_connString); - if(!conn.is_open()) { - fprintf(stderr, "Connection to database failed: onlineNotificationThread\n"); + PGconn *conn = PQconnectdb(_connString.c_str()); + if (PQstatus(conn) == CONNECTION_BAD) { + fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); + PQfinish(conn); exit(1); } _connected = 1; @@ -803,6 +1158,13 @@ void PostgreSQL::onlineNotificationThread() std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative; while (_run == 1) { + if (PQstatus(conn) != CONNECTION_OK) { + fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres."); + PQfinish(conn); + conn = PQconnectdb(_connString.c_str()); + continue; + } + // map used to send notifications to front end std::unordered_map<std::string, std::vector<std::string>> updateMap; @@ -812,144 +1174,158 @@ void PostgreSQL::onlineNotificationThread() lastOnline.swap(_lastOnline); } - try { - pqxx::work w(conn); - pqxx::pipeline p(w, "Member Update Pipeline"); - for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { - uint64_t nwid_i = i->first.first; - char nwidTmp[64]; - char memTmp[64]; - char ipTmp[64]; - OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i); - OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second); - - auto found = _networks.find(nwid_i); - if (found == _networks.end()) { - continue; // skip members trying to join non-existant networks - } - - lastOnlineCumulative[i->first] = i->second.first; - + PGresult *res = NULL; - std::string networkId(nwidTmp); - std::string memberId(memTmp); + for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { + uint64_t nwid_i = i->first.first; + char nwidTmp[64]; + char memTmp[64]; + char ipTmp[64]; + OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i); + OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second); - std::vector<std::string> &members = updateMap[networkId]; - members.push_back(memberId); + auto found = _networks.find(nwid_i); + if (found == _networks.end()) { + continue; // skip members trying to join non-existant networks + } - int64_t ts = i->second.first; - std::string ipAddr = i->second.second.toIpString(ipTmp); - std::string timestamp = std::to_string(ts); + lastOnlineCumulative[i->first] = i->second.first; + - std::stringstream ss; - ss << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES (" - << "'" << w.esc(networkId) << "', " - << "'" << w.esc(memberId) << "', " - << "'" << w.esc(ipAddr) << "', " - << timestamp << ") " - << "ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated"; - p.insert(ss.str()); - } - p.complete(); - w.commit(); - } catch (std::exception &e) { - fprintf(stderr, "Error updating member status: %s\n", e.what()); + std::string networkId(nwidTmp); + std::string memberId(memTmp); + + std::vector<std::string> &members = updateMap[networkId]; + members.push_back(memberId); + + int64_t ts = i->second.first; + std::string ipAddr = i->second.second.toIpString(ipTmp); + std::string timestamp = std::to_string(ts); + + const char *values[4] = { + networkId.c_str(), + memberId.c_str(), + (ipAddr.empty() ? NULL : ipAddr.c_str()), + timestamp.c_str(), + }; + + res = PQexecParams(conn, + "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ($1, $2, $3, TO_TIMESTAMP($4::double precision/1000)) " + "ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated", + 4, // number of parameters + NULL, // oid field. ignore + values, // values for substitution + NULL, // lengths in bytes of each value + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "Error on Member Status upsert: %s\n", PQresultErrorMessage(res)); + PQclear(res); + continue; + } + PQclear(res); } - try { - const int64_t now = OSUtils::now(); - if ((now - lastUpdatedNetworkStatus) > 10000) { - pqxx::work w(conn); - pqxx::pipeline p(w, "Network Update Pipeline"); - lastUpdatedNetworkStatus = now; + const int64_t now = OSUtils::now(); + if ((now - lastUpdatedNetworkStatus) > 10000) { + lastUpdatedNetworkStatus = now; - std::vector<std::pair<uint64_t, std::shared_ptr<_Network>>> networks; - { - std::lock_guard<std::mutex> l(_networks_l); - for (auto i = _networks.begin(); i != _networks.end(); ++i) { - networks.push_back(*i); - } + std::vector<std::pair<uint64_t, std::shared_ptr<_Network>>> networks; + { + std::lock_guard<std::mutex> l(_networks_l); + for (auto i = _networks.begin(); i != _networks.end(); ++i) { + networks.push_back(*i); } + } + + for (auto i = networks.begin(); i != networks.end(); ++i) { + char tmp[64]; + Utils::hex(i->first, tmp); + + std::string networkId(tmp); - for (auto i = networks.begin(); i != networks.end(); ++i) { - char tmp[64]; - Utils::hex(i->first, tmp); - - std::string networkId(tmp); - - std::vector<std::string> &_notUsed = updateMap[networkId]; - (void)_notUsed; - - uint64_t authMemberCount = 0; - uint64_t totalMemberCount = 0; - uint64_t onlineMemberCount = 0; - uint64_t bridgeCount = 0; - uint64_t ts = now; - { - std::lock_guard<std::mutex> l2(i->second->lock); - authMemberCount = i->second->authorizedMembers.size(); - totalMemberCount = i->second->members.size(); - bridgeCount = i->second->activeBridgeMembers.size(); - for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) { - auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first, m->first)); - if (lo != lastOnlineCumulative.end()) { - if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) { - ++onlineMemberCount; - } else { - lastOnlineCumulative.erase(lo); - } + std::vector<std::string> &_notUsed = updateMap[networkId]; + (void)_notUsed; + + uint64_t authMemberCount = 0; + uint64_t totalMemberCount = 0; + uint64_t onlineMemberCount = 0; + uint64_t bridgeCount = 0; + uint64_t ts = now; + { + std::lock_guard<std::mutex> l2(i->second->lock); + authMemberCount = i->second->authorizedMembers.size(); + totalMemberCount = i->second->members.size(); + bridgeCount = i->second->activeBridgeMembers.size(); + for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) { + auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first, m->first)); + if (lo != lastOnlineCumulative.end()) { + if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) { + ++onlineMemberCount; + } else { + lastOnlineCumulative.erase(lo); } } } + } - std::string bc = std::to_string(bridgeCount); - std::string amc = std::to_string(authMemberCount); - std::string omc = std::to_string(onlineMemberCount); - std::string tmc = std::to_string(totalMemberCount); - std::string timestamp = std::to_string(ts); - - std::stringstream ss; - ss << "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, " - << "online_member_count, total_member_count, last_modified) VALUES (" - << "'" << w.esc(networkId) << "', " - << bridgeCount << ", " - << authMemberCount << ", " - << onlineMemberCount << ", " - << totalMemberCount << ", " - << "TO_TIMESTAMP(" << ts << "::double precision/1000)) " - << "ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, " - << "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, " - << "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified"; - p.insert(ss.str()); + std::string bc = std::to_string(bridgeCount); + std::string amc = std::to_string(authMemberCount); + std::string omc = std::to_string(onlineMemberCount); + std::string tmc = std::to_string(totalMemberCount); + std::string timestamp = std::to_string(ts); + const char *values[6] = { + networkId.c_str(), + bc.c_str(), + amc.c_str(), + omc.c_str(), + tmc.c_str(), + timestamp.c_str() + }; + + res = PQexecParams(conn, "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, " + "online_member_count, total_member_count, last_modified) VALUES ($1, $2, $3, $4, $5, TO_TIMESTAMP($6::double precision/1000)) " + "ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, " + "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, " + "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified", + 6, + NULL, + values, + NULL, + NULL, + 0); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) { + fprintf(stderr, "ERROR: Error on Network Status upsert (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); + PQclear(res); + continue; } - p.complete(); - w.commit(); + PQclear(res); } - } catch (std::exception &e) { - fprintf(stderr, "Error updating network status: %s\n", e.what()); } - try { - pqxx::work w(conn); - pqxx::pipeline p(w, "Notification Sender"); - for (auto it = updateMap.begin(); it != updateMap.end(); ++it) { - std::string networkId = it->first; - std::vector<std::string> members = it->second; - std::stringstream queryBuilder; + // for (auto it = updateMap.begin(); it != updateMap.end(); ++it) { + // std::string networkId = it->first; + // std::vector<std::string> members = it->second; + // std::stringstream queryBuilder; - std::string membersStr = ::join(members, ","); + // std::string membersStr = ::join(members, ","); - queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'"; - std::string query = queryBuilder.str(); + // queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'"; + // std::string query = queryBuilder.str(); - p.insert(query); - } - p.complete(); - w.commit(); - } catch (std::exception &e) { - fprintf(stderr, "Error notifying webapp: %s\n", e.what()); - } - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + // PGresult *res = PQexec(conn,query.c_str()); + // if (PQresultStatus(res) != PGRES_COMMAND_OK) { + // fprintf(stderr, "ERROR: Error sending NOTIFY: %s\n", PQresultErrorMessage(res)); + // } + // PQclear(res); + // } + + std::this_thread::sleep_for(std::chrono::milliseconds(250)); } + fprintf(stderr, "%s: Fell out of run loop in onlineNotificationThread", _myAddressStr.c_str()); + PQfinish(conn); + exit(5); } #endif //ZT_CONTROLLER_USE_LIBPQ diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index 86aac80d..36fe8c9f 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -23,16 +23,12 @@ #include "DB.hpp" -#include <pqxx/pqxx> - extern "C" { typedef struct pg_conn PGconn; } namespace ZeroTier { -class _MemberNotificationReceiver; -class _NetworkNotificationReceiver; /** * A controller database driver that talks to PostgreSQL @@ -60,8 +56,8 @@ protected: }; private: - void initializeNetworks(pqxx::connection &conn); - void initializeMembers(pqxx::connection &conn); + void initializeNetworks(PGconn *conn); + void initializeMembers(PGconn *conn); void heartbeat(); void membersDbWatcher(); void networksDbWatcher(); @@ -85,9 +81,6 @@ private: mutable std::mutex _readyLock; std::atomic<int> _ready, _connected, _run; mutable volatile bool _waitNoticePrinted; - - friend class _MemberNotificationReceiver; - friend class _NetworkNotificationReceiver; }; } diff --git a/make-linux.mk b/make-linux.mk index f5c86aeb..66a6376a 100644 --- a/make-linux.mk +++ b/make-linux.mk @@ -319,7 +319,7 @@ official: FORCE make -j4 ZT_OFFICIAL=1 all central-controller: FORCE - make -j4 LDLIBS="-L/usr/pgsql-10/lib/ -lpqxx -lpq" CXXFLAGS="-I/usr/pgsql-10/include -fPIC" DEFS="-DZT_CONTROLLER_USE_LIBPQ" ZT_OFFICIAL=1 ZT_USE_X64_ASM_ED25519=1 one + make -j4 LDLIBS="-L/usr/pgsql-10/lib/ -lpq" CXXFLAGS="-I/usr/pgsql-10/include -fPIC" DEFS="-DZT_CONTROLLER_USE_LIBPQ" ZT_OFFICIAL=1 ZT_USE_X64_ASM_ED25519=1 one central-controller-docker: central-controller docker build -t gcr.io/zerotier-central/ztcentral-controller:${TIMESTAMP} -f docker/Dockerfile . diff --git a/update_controllers.sh b/update_controllers.sh index 8d50c5bf..5e11434d 100755 --- a/update_controllers.sh +++ b/update_controllers.sh @@ -8,7 +8,7 @@ fi TAG=$1 -CONTROLLERS=( 12ac4a1e71 159924d630 17d709436c 1c33c1ced0 1d71939404 1d71939404 565799d8f6 6ab565387a 8056c2e21c 8850338390 8bd5124fd6 93afae5963 9bee8941b5 9f77fc393e a09acf0233 a84ac5c10a abfd31bd47 af78bf9436 c7c8172af1 d5e5fb6537 e4da7455b2 e5cd7a9e1c ea9349aa9c ) +CONTROLLERS=`kubectl get pods -o=name | grep controller | sed "s/^.\{4\}//" | cut -d '-' -f 2` for c in ${CONTROLLERS[@]} do |