diff options
Diffstat (limited to 'controller')
| -rw-r--r-- | controller/PostgreSQL.cpp | 1368 | ||||
| -rw-r--r-- | controller/PostgreSQL.hpp | 11 | 
2 files changed, 473 insertions, 906 deletions
| diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index ea04f551..1b97042c 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -22,7 +22,7 @@  #include "EmbeddedNetworkController.hpp"  #include "../version.h" -#include <libpq-fe.h> +#include <pqxx/pqxx>  #include <sstream>  using json = nlohmann::json; @@ -62,6 +62,67 @@ std::string join(const std::vector<std::string> &elements, const char * const se  } +namespace ZeroTier { + +class _MemberNotificationReceiver : public pqxx::notification_receiver +{ +private: +	ZeroTier::PostgreSQL *_pgsql; + +public: +	_MemberNotificationReceiver(pqxx::connection_base &c, const std::string &channel, ZeroTier::PostgreSQL *pgsql) +		: pqxx::notification_receiver(c, channel) +		, _pgsql(pgsql) +	{} + +	virtual void operator()(const std::string &payload, int backend_pid) +	{ +		try { +			json tmp(json::parse(payload)); +			json &ov = tmp["old_val"]; +			json &nv = tmp["new_val"]; +			json oldConfig, newConfig; +			if (ov.is_object()) oldConfig = ov; +			if (nv.is_object()) newConfig = nv; +			if (oldConfig.is_object() || newConfig.is_object()) { +				_pgsql->_memberChanged(oldConfig,newConfig,_pgsql->isReady()); +			} +		} catch (std::exception &e) { +			fprintf(stderr, "Exception parsing member notification: %s\n", e.what()); +		} +	} +}; + +class _NetworkNotificationReceiver : public pqxx::notification_receiver +{ +private: +	ZeroTier::PostgreSQL *_pgsql; +public: +	_NetworkNotificationReceiver(pqxx::connection_base &c, const std::string &channel, ZeroTier::PostgreSQL *pgsql) +		: pqxx::notification_receiver(c, channel) +		, _pgsql(pgsql) +	{} + +	virtual void operator()(const std::string &payload, int backend_pid) +	{ +		try { +			json tmp(json::parse(payload)); +			json &ov = tmp["old_val"]; +			json &nv = tmp["new_val"]; +			json oldConfig, newConfig; +			if (ov.is_object()) oldConfig = ov; +			if (nv.is_object()) newConfig = nv; +			if (oldConfig.is_object()||newConfig.is_object()) { +				_pgsql->_networkChanged(oldConfig,newConfig,_pgsql->isReady()); +			} +		} catch (std::exception &e) { +			fprintf(stderr, "Exception parsing member notification: %s\n", e.what()); +		} // ignore bad records +	} +}; + +} // namespace ZeroTier +  using namespace ZeroTier;  PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path) @@ -71,6 +132,7 @@ PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId      , _run(1)      , _waitNoticePrinted(false)  { +	fprintf(stderr, "PostgreSQL Constructed");  	_connString = std::string(path);  	_readyLock.lock(); @@ -173,157 +235,101 @@ void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId,  	}  } -void PostgreSQL::initializeNetworks(PGconn *conn) +void PostgreSQL::initializeNetworks(pqxx::connection &conn)  {  	try { -		if (PQstatus(conn) != CONNECTION_OK) { -			fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); +		if (!conn.is_open()) { +			fprintf(stderr, "Bad Database Connection in initializeNetworks\n");  			exit(1);  		} -		const char *params[1] = { -			_myAddressStr.c_str() -		}; +		pqxx::work w(conn); -		PGresult *res = PQexecParams(conn, "SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000, capabilities, " -			"enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000, mtu, multicast_limit, name, private, remote_trace_level, " +		pqxx::result r = w.exec("SELECT id, EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000 AS creation_time, capabilities, " +			"enable_broadcast, EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000 AS last_modified, mtu, multicast_limit, name, private, remote_trace_level, "  			"remote_trace_target, revision, rules, tags, v4_assign_mode, v6_assign_mode FROM ztc_network " -			"WHERE deleted = false AND controller_id = $1", -			1, -			NULL, -			params, -			NULL, -			NULL, -			0); -		 -		if (PQresultStatus(res) != PGRES_TUPLES_OK) { -			fprintf(stderr, "Networks Initialization Failed: %s", PQerrorMessage(conn)); -			PQclear(res); -			exit(1); -		} +			"WHERE deleted = false AND controller_id = '" + w.esc(_myAddressStr) + "'"); -		int numRows = PQntuples(res); -		for (int i = 0; i < numRows; ++i) { + +		for (pqxx::result::const_iterator row = r.begin(); row != r.end(); ++row) {  			json empty;  			json config; -			const char *nwidparam[1] = { -				PQgetvalue(res, i, 0) -			}; - -			config["id"] = PQgetvalue(res, i, 0); -			config["nwid"] = PQgetvalue(res, i, 0); +			std::string nwid = row["id"].as<std::string>(); +			config["id"] = nwid; +			config["nwid"] = nwid;  			try { -				config["creationTime"] = std::stoull(PQgetvalue(res, i, 1)); -			} catch (std::exception &e) { +				config["creationTime"] = row["creation_time"].as<uint64_t>(); +			} catch(std::exception &e) {  				config["creationTime"] = 0ULL; -				//fprintf(stderr, "Error converting creation time: %s\n", PQgetvalue(res, i, 1));  			} -			config["capabilities"] = json::parse(PQgetvalue(res, i, 2)); -			config["enableBroadcast"] = (strcmp(PQgetvalue(res, i, 3),"t")==0); +			config["capabilities"] = json::parse(row["capabilities"].as<std::string>()); +			config["enableBroadcast"] = row["enable_broadcast"].as<bool>();  			try { -				config["lastModified"] = std::stoull(PQgetvalue(res, i, 4)); +				config["lastModified"] = row["last_modified"].as<uint64_t>();  			} catch (std::exception &e) {  				config["lastModified"] = 0ULL; -				//fprintf(stderr, "Error converting last modified: %s\n", PQgetvalue(res, i, 4));  			}  			try { -				config["mtu"] = std::stoi(PQgetvalue(res, i, 5)); +				config["mtu"] = row["mtu"].as<int>();  			} catch (std::exception &e) {  				config["mtu"] = 2800;  			}  			try { -				config["multicastLimit"] = std::stoi(PQgetvalue(res, i, 6)); +				config["multicastLimit"] = row["multicast_limit"].as<int>();  			} catch (std::exception &e) {  				config["multicastLimit"] = 64;  			} -			config["name"] = PQgetvalue(res, i, 7); -			config["private"] = (strcmp(PQgetvalue(res, i, 8),"t")==0); +			config["name"] = row["name"].as<std::string>(); +			config["private"] = row["private"].as<bool>();  			try { -				config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9)); +				config["remoteTraceLevel"] = row["remote_trace_level"].as<int>();  			} catch (std::exception &e) {  				config["remoteTraceLevel"] = 0;  			} -			config["remoteTraceTarget"] = PQgetvalue(res, i, 10); +			config["remoteTraceTarget"] = (row["remote_trace_target"].is_null() ? nullptr : row["remote_trace_target"].as<std::string>());  			try { -				config["revision"] = std::stoull(PQgetvalue(res, i, 11)); +				config["revision"] = row["revision"].as<uint64_t>();  			} catch (std::exception &e) {  				config["revision"] = 0ULL;  				//fprintf(stderr, "Error converting revision: %s\n", PQgetvalue(res, i, 11));  			} -			config["rules"] = json::parse(PQgetvalue(res, i, 12)); -			config["tags"] = json::parse(PQgetvalue(res, i, 13)); -			config["v4AssignMode"] = json::parse(PQgetvalue(res, i, 14)); -			config["v6AssignMode"] = json::parse(PQgetvalue(res, i, 15)); +			config["rules"] = json::parse(row["rules"].as<std::string>()); +			config["tags"] = json::parse(row["tags"].as<std::string>()); +			config["v4AssignMode"] = json::parse(row["v4_assign_mode"].as<std::string>()); +			config["v6AssignMode"] = json::parse(row["v6_assign_mode"].as<std::string>());  			config["objtype"] = "network";  			config["ipAssignmentPools"] = json::array();  			config["routes"] = json::array(); -			PGresult *r2 = PQexecParams(conn, -				"SELECT host(ip_range_start), host(ip_range_end) FROM ztc_network_assignment_pool WHERE network_id = $1", -				1, -				NULL, -				nwidparam, -				NULL, -				NULL, -				0); -			 -			if (PQresultStatus(r2) != PGRES_TUPLES_OK) { -				fprintf(stderr, "ERROR: Error retreiving IP pools for network: %s\n", PQresultErrorMessage(r2)); -				PQclear(r2); -				PQclear(res); -				exit(1); -			} - -			int n = PQntuples(r2); -			for (int j = 0; j < n; ++j) { +			pqxx::work w2(conn); +			pqxx::result res2 = w2.exec("SELECT host(ip_range_start) AS ip_range_start, host(ip_range_end) AS ip_range_end FROM ztc_network_assignment_pool WHERE network_id = '" + w2.esc(nwid) + "'"); +			for(pqxx::result::const_iterator it = res2.begin(); it != res2.end(); ++it) {  				json ip; -				ip["ipRangeStart"] = PQgetvalue(r2, j, 0); -				ip["ipRangeEnd"] = PQgetvalue(r2, j, 1); - +				ip["ipRangeStart"] = it["ip_range_start"].as<std::string>(); +				ip["ipRangeEnd"] = it["ip_range_end"].as<std::string>();  				config["ipAssignmentPools"].push_back(ip);  			} +			w2.commit(); -			PQclear(r2); - -			r2 = PQexecParams(conn, -				"SELECT host(address), bits, host(via) FROM ztc_network_route WHERE network_id = $1", -				1, -				NULL, -				nwidparam, -				NULL, -				NULL, -				0); - -			if (PQresultStatus(r2) != PGRES_TUPLES_OK) { -				fprintf(stderr, "ERROR: Error retreiving routes for network: %s\n", PQresultErrorMessage(r2)); -				PQclear(r2); -				PQclear(res); -				exit(1); -			} - -			n = PQntuples(r2); -			for (int j = 0; j < n; ++j) { -				std::string addr = PQgetvalue(r2, j, 0); -				std::string bits = PQgetvalue(r2, j, 1); -				std::string via = PQgetvalue(r2, j, 2); +			pqxx::work w3(conn); +			pqxx::result res3 = w3.exec("SELECT host(address) AS address, bits, host(via) AS via FROM ztc_network_route WHERE network_id = '" + w3.esc(nwid) + "'"); +			for(pqxx::result::const_iterator it = res3.begin(); it != res3.end(); ++it) {  				json route; -				route["target"] = addr + "/" + bits; - -				if (via == "NULL") { +				route["target"] = it["address"].as<std::string>() + "/" + it["bits"].as<std::string>(); +				if (route["via"].is_null()) {  					route["via"] = nullptr;  				} else { -					route["via"] = via; +					route["via"] = it["via"].as<std::string>();  				}  				config["routes"].push_back(route);  			} +			w3.commit(); -			PQclear(r2); -			  			_networkChanged(empty, config, false);  		} -		PQclear(res); +		w.commit();  		if (++this->_ready == 2) {  			if (_waitNoticePrinted) { @@ -337,147 +343,115 @@ void PostgreSQL::initializeNetworks(PGconn *conn)  	}  } -void PostgreSQL::initializeMembers(PGconn *conn) +void PostgreSQL::initializeMembers(pqxx::connection &conn)  {  	try { -		if (PQstatus(conn) != CONNECTION_OK) { -			fprintf(stderr, "Bad Database Connection: %s", PQerrorMessage(conn)); +		if (!conn.is_open()) { +			fprintf(stderr, "Bad Database Connection in initializeMembers\n");  			exit(1);  		} -		const char *params[1] = { -			_myAddressStr.c_str() -		}; - -		PGresult *res = PQexecParams(conn, -			"SELECT m.id, m.network_id, m.active_bridge, m.authorized, m.capabilities, EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000, m.identity, " -			"	EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000, " -			"	EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000, " -			"	m.remote_trace_level, m.remote_trace_target, m.tags, m.v_major, m.v_minor, m.v_rev, m.v_proto, " -			"	m.no_auto_assign_ips, m.revision " +		pqxx::work w(conn); +		pqxx::result res = w.exec( +			"SELECT m.id AS id, m.network_id AS network_id, m.active_bridge AS active_bridge, " +			"   m.authorized AS authorized, m.capabilities AS capabilities, " +			"   EXTRACT(EPOCH FROM m.creation_time AT TIME ZONE 'UTC')*1000 AS creation_time, m.identity AS identity, " +			"	EXTRACT(EPOCH FROM m.last_authorized_time AT TIME ZONE 'UTC')*1000 AS last_authorized_time, " +			"	EXTRACT(EPOCH FROM m.last_deauthorized_time AT TIME ZONE 'UTC')*1000 AS last_deauthorized_time, " +			"	m.remote_trace_level AS remote_trace_level, m.remote_trace_target AS remote_trace_target, " +			"   m.tags AS tags, m.v_major AS v_major, m.v_minor AS v_minor, m.v_rev AS v_rev, " +			"   m.v_proto AS v_proto, m.no_auto_assign_ips AS no_auto_assign_ips, m.revision AS revision"  			"FROM ztc_member m "  			"INNER JOIN ztc_network n "  			"	ON n.id = m.network_id " -			"WHERE n.controller_id = $1 AND m.deleted = false", -			1, -			NULL, -			params, -			NULL, -			NULL, -			0); - -		if (PQresultStatus(res) != PGRES_TUPLES_OK) { -			fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn)); -			PQclear(res); -			exit(1); -		} +			"WHERE n.controller_id = '" + w.esc(_myAddressStr) + "' AND m.deleted = false" +		); -		int numRows = PQntuples(res); -		for (int i = 0; i < numRows; ++i) { +		for(pqxx::result::const_iterator row = res.begin(); row != res.end(); ++row) {  			json empty;  			json config; -			std::string memberId(PQgetvalue(res, i, 0)); -			std::string networkId(PQgetvalue(res, i, 1)); -			std::string ctime = PQgetvalue(res, i, 5); +			std::string networkId = row["network_id"].as<std::string>(); +			std::string memberId = row["id"].as<std::string>(); +  			config["id"] = memberId;  			config["nwid"] = networkId; -			config["activeBridge"] = (strcmp(PQgetvalue(res, i, 2), "t") == 0); -			config["authorized"] = (strcmp(PQgetvalue(res, i, 3), "t") == 0); +			config["activeBridge"] = row["active_bridge"].as<bool>(); +			config["authorized"] = row["authorized"].as<bool>();  			try { -				config["capabilities"] = json::parse(PQgetvalue(res, i, 4)); -			} catch (std::exception &e) { +				config["capabilities"] = json::parse(row["capabilities"].as<std::string>()); +			} catch(std::exception &e) {  				config["capabilities"] = json::array();  			}  			try { -				config["creationTime"] = std::stoull(PQgetvalue(res, i, 5)); -			} catch (std::exception &e) { +				config["creationTime"] = row["creation_time"].as<uint64_t>(); +			} catch(std::exception &e) {  				config["creationTime"] = 0ULL; -				//fprintf(stderr, "Error upding creation time (member): %s\n", PQgetvalue(res, i, 5));  			} -			config["identity"] = PQgetvalue(res, i, 6); +			config["identity"] = row["identity"].as<std::string>();  			try { -				config["lastAuthorizedTime"] = std::stoull(PQgetvalue(res, i, 7)); +				config["lastAuthorizedTime"] = row["last_authorized_time"].as<uint64_t>();  			} catch(std::exception &e) {  				config["lastAuthorizedTime"] = 0ULL; -				//fprintf(stderr, "Error updating last auth time (member): %s\n", PQgetvalue(res, i, 7));  			}  			try { -				config["lastDeauthorizedTime"] = std::stoull(PQgetvalue(res, i, 8)); -			} catch( std::exception &e) { +				config["lastDeauthorizedTime"] = row["last_deauthorized_time"].as<uint64_t>(); +			} catch(std::exception &e) {  				config["lastDeauthorizedTime"] = 0ULL; -				//fprintf(stderr, "Error updating last deauth time (member): %s\n", PQgetvalue(res, i, 8));  			}  			try { -				config["remoteTraceLevel"] = std::stoi(PQgetvalue(res, i, 9)); -			} catch (std::exception &e) { +				config["remoteTraceLevel"] = row["remote_trace_level"].as<int>(); +			} catch(std::exception &e) {  				config["remoteTraceLevel"] = 0;  			} -			config["remoteTraceTarget"] = PQgetvalue(res, i, 10); +			config["remoteTraceTarget"] = (row["remote_trace_target"].is_null() ? nullptr : row["remote_trace_target"].as<std::string>());  			try { -				config["tags"] = json::parse(PQgetvalue(res, i, 11)); -			} catch (std::exception &e) { +				config["tags"] = json::parse(row["tags"].as<std::string>()); +			} catch(std::exception &e) {  				config["tags"] = json::array();  			}  			try { -				config["vMajor"] = std::stoi(PQgetvalue(res, i, 12)); +				config["vMajor"] = row["v_major"].as<int>();  			} catch(std::exception &e) {  				config["vMajor"] = -1;  			}  			try { -				config["vMinor"] = std::stoi(PQgetvalue(res, i, 13)); +				config["vMinor"] = row["v_minor"].as<int>();  			} catch (std::exception &e) {  				config["vMinor"] = -1;  			}  			try { -				config["vRev"] = std::stoi(PQgetvalue(res, i, 14)); +				config["vRev"] = row["v_rev"].as<int>();  			} catch (std::exception &e) {  				config["vRev"] = -1;  			}  			try { -				config["vProto"] = std::stoi(PQgetvalue(res, i, 15)); +				config["vProto"] = row["v_proto"].as<int>();  			} catch (std::exception &e) {  				config["vProto"] = -1;  			} -			config["noAutoAssignIps"] = (strcmp(PQgetvalue(res, i, 16), "t") == 0); +			config["noAutoAssignIps"] = row["no_auto_assign_ips"].as<bool>();  			try { -				config["revision"] = std::stoull(PQgetvalue(res, i, 17)); +				config["revision"] = row["revision"].as<uint64_t>();  			} catch (std::exception &e) {  				config["revision"] = 0ULL;  				//fprintf(stderr, "Error updating revision (member): %s\n", PQgetvalue(res, i, 17));  			}  			config["objtype"] = "member";  			config["ipAssignments"] = json::array(); -			const char *p2[2] = { -				memberId.c_str(), -				networkId.c_str() -			}; - -			PGresult *r2 = PQexecParams(conn, -				"SELECT address FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", -				2, -				NULL, -				p2, -				NULL, -				NULL, -				0); - -			if (PQresultStatus(r2) != PGRES_TUPLES_OK) { -				fprintf(stderr, "Member Initialization Failed: %s", PQerrorMessage(conn)); -				PQclear(r2); -				PQclear(res); -				exit(1); -			} -			int n = PQntuples(r2); -			for (int j = 0; j < n; ++j) { -				config["ipAssignments"].push_back(PQgetvalue(r2, j, 0)); +			pqxx::work w2(conn); +			pqxx::result r2 = w2.exec( +				"SELECT address FROM ztc_member_ip_assignment WHERE member_id = '"+w2.esc(memberId)+"' AND network_id = '"+w2.esc(networkId)+"'" +			); +			for(pqxx::result::const_iterator it = r2.begin(); it != r2.end(); ++it) { +				config["ipAssignments"].push_back(it["address"].as<std::string>());  			} +			w2.commit();  			_memberChanged(empty, config, false);  		} - -		PQclear(res); +		w.commit();  		if (++this->_ready == 2) {  			if (_waitNoticePrinted) { @@ -510,497 +484,237 @@ void PostgreSQL::heartbeat()  	const char *publicIdentity = publicId;  	const char *hostname = hostnameTmp; -	PGconn *conn = PQconnectdb(_path.c_str()); -	if (PQstatus(conn) == CONNECTION_BAD) { -		fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); -		PQfinish(conn); +	fprintf(stderr, "Heartbeat connection opening"); +	pqxx::connection conn(_connString); +	if (!conn.is_open()) { +		fprintf(stderr, "Connection to database failed: heartbeat\n");  		exit(1);  	} +	fprintf(stderr, "Heartbeat connection opened"); +	conn.prepare("heartbeat",  +		"INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build) "  +		"VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8) " +		"ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " +		"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " +		"v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev" +	); +  	while (_run == 1) { -		if(PQstatus(conn) != CONNECTION_OK) { -			PQfinish(conn); -			conn = PQconnectdb(_path.c_str()); -		} -		if (conn) { -			std::string major = std::to_string(ZEROTIER_ONE_VERSION_MAJOR); -			std::string minor = std::to_string(ZEROTIER_ONE_VERSION_MINOR); -			std::string rev = std::to_string(ZEROTIER_ONE_VERSION_REVISION); -			std::string build = std::to_string(ZEROTIER_ONE_VERSION_BUILD); -			std::string now = std::to_string(OSUtils::now()); -			const char *values[8] = { -				controllerId, -				hostname, -				now.c_str(), -				publicIdentity, -				major.c_str(), -				minor.c_str(), -				rev.c_str(), -				build.c_str() -			}; - -			PGresult *res = PQexecParams(conn, -				"INSERT INTO ztc_controller (id, cluster_host, last_alive, public_identity, v_major, v_minor, v_rev, v_build) "  -				"VALUES ($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6, $7, $8) " -				"ON CONFLICT (id) DO UPDATE SET cluster_host = EXCLUDED.cluster_host, last_alive = EXCLUDED.last_alive, " -				"public_identity = EXCLUDED.public_identity, v_major = EXCLUDED.v_major, v_minor = EXCLUDED.v_minor, " -				"v_rev = EXCLUDED.v_rev, v_build = EXCLUDED.v_rev", -				8,       // number of parameters -				NULL,    // oid field.   ignore -				values,  // values for substitution -				NULL, // lengths in bytes of each value -				NULL,  // binary? -				0); - -			if (PQresultStatus(res) != PGRES_COMMAND_OK) { -				fprintf(stderr, "Heartbeat Update Failed: %s\n", PQresultErrorMessage(res)); -			} -			PQclear(res); +		try { +			pqxx::work w(conn); +			pqxx::result res = w.prepared("heartbeat")(controllerId)(hostname) +				(OSUtils::now())(publicIdentity)(ZEROTIER_ONE_VERSION_MAJOR) +				(ZEROTIER_ONE_VERSION_MINOR)(ZEROTIER_ONE_VERSION_REVISION) +				(ZEROTIER_ONE_VERSION_BUILD).exec(); +			w.commit(); +		} catch (std::exception &e) { +			fprintf(stderr, "Error inserting heartbeat: %s\n", e.what()); +			exit(1);  		} -  		std::this_thread::sleep_for(std::chrono::milliseconds(1000));  	} - -	PQfinish(conn); -	conn = NULL;  }  void PostgreSQL::membersDbWatcher()  { -	PGconn *conn = PQconnectdb(_path.c_str()); -	if (PQstatus(conn) == CONNECTION_BAD) { -		fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); -		PQfinish(conn); -		exit(1); -	} - -	initializeMembers(conn); - -	char buf[11] = {0}; -	std::string cmd = "LISTEN member_" + std::string(_myAddress.toString(buf)); -	PGresult *res = PQexec(conn, cmd.c_str()); -	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { -		fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); -		PQclear(res); -		PQfinish(conn); -		exit(1); -	} - -	PQclear(res); res = NULL; - -	while(_run == 1) { -		if (PQstatus(conn) != CONNECTION_OK) { -			fprintf(stderr, "ERROR: Member Watcher lost connection to Postgres."); -			exit(-1); +	try { +		pqxx::connection conn(_connString); +		if (!conn.is_open()) { +			fprintf(stderr, "Connection to database failed: membersDbWatcher\n"); +			exit(1);  		} -		PGnotify *notify = NULL; -		PQconsumeInput(conn); -		while ((notify = PQnotifies(conn)) != NULL) { -			//fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra); -			try { -				json tmp(json::parse(notify->extra)); -				json &ov = tmp["old_val"]; -				json &nv = tmp["new_val"]; -				json oldConfig, newConfig; -				if (ov.is_object()) oldConfig = ov; -				if (nv.is_object()) newConfig = nv; -				if (oldConfig.is_object() || newConfig.is_object()) { -					_memberChanged(oldConfig,newConfig,(this->_ready>=2)); -				} -			} catch (...) {} // ignore bad records +		initializeMembers(conn); -			free(notify); +		char buf[11] = {0}; +		std::string cmd = "member_" + std::string(_myAddress.toString(buf)); +		_MemberNotificationReceiver receiver(conn, cmd, this); +		while(_run == 1) { +			conn.await_notification(5, 0); +			std::this_thread::sleep_for(std::chrono::milliseconds(10));  		} -		std::this_thread::sleep_for(std::chrono::milliseconds(10)); +		conn.disconnect(); +	} catch (std::exception &e) { +		fprintf(stderr, "Exception in membersDbWatcher: %s\n", e.what()); +		exit(1);  	} -	PQfinish(conn); -	conn = NULL;  }  void PostgreSQL::networksDbWatcher()  { -	PGconn *conn = PQconnectdb(_path.c_str()); -	if (PQstatus(conn) == CONNECTION_BAD) { -		fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); -		PQfinish(conn); -		exit(1); -	} - -	initializeNetworks(conn); - -	char buf[11] = {0}; -	std::string cmd = "LISTEN network_" + std::string(_myAddress.toString(buf)); -	PGresult *res = PQexec(conn, cmd.c_str()); -	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { -		fprintf(stderr, "LISTEN command failed: %s\n", PQresultErrorMessage(res)); -		PQclear(res); -		PQfinish(conn); -		exit(1); -	} +	try { +		pqxx::connection conn(_connString); +		if (!conn.is_open()) { +			fprintf(stderr, "Connection to database failed: networksDbWatcher\n"); +			exit(1); +		} -	PQclear(res); res = NULL; +		initializeNetworks(conn); -	while(_run == 1) { -		if (PQstatus(conn) != CONNECTION_OK) { -			fprintf(stderr, "ERROR: Network Watcher lost connection to Postgres."); -			exit(-1); -		} -		PGnotify *notify = NULL; -		PQconsumeInput(conn); -		while ((notify = PQnotifies(conn)) != NULL) { -			//fprintf(stderr, "ASYNC NOTIFY of '%s' id:%s received\n", notify->relname, notify->extra); -			try { -				json tmp(json::parse(notify->extra)); -				json &ov = tmp["old_val"]; -				json &nv = tmp["new_val"]; -				json oldConfig, newConfig; -				if (ov.is_object()) oldConfig = ov; -				if (nv.is_object()) newConfig = nv; -				if (oldConfig.is_object()||newConfig.is_object()) { -					_networkChanged(oldConfig,newConfig,(this->_ready >= 2)); -				} -			} catch (...) {} // ignore bad records -			free(notify); +		char buf[11] = {0}; +		std::string cmd = "network_" + std::string(_myAddress.toString(buf)); +		_NetworkNotificationReceiver receiver(conn, cmd, this); +		while(_run == 1) { +			conn.await_notification(5, 0); +			std::this_thread::sleep_for(std::chrono::milliseconds(10));  		} -		std::this_thread::sleep_for(std::chrono::milliseconds(10)); +	} catch(std::exception &e) { +		fprintf(stderr, "Exception in networksDbWatcher: %s\n", e.what()); +		exit(1);  	} -	PQfinish(conn); -	conn = NULL;  }  void PostgreSQL::commitThread()  { -	PGconn *conn = PQconnectdb(_path.c_str()); -	if (PQstatus(conn) == CONNECTION_BAD) { -		fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn)); -		PQfinish(conn); +	pqxx::connection conn(_connString); +	if (!conn.is_open()) { +		fprintf(stderr, "ERROR: Connection to database failed: commitThread\n");  		exit(1);  	} +	conn.prepare("insert_member", +		"INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, " +		"identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, " +		"remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) " +		"VALUES ($1, $2, $3, $4, $5, $6, " +		"TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), " +		"$9, $10, (CASE WHEN $11='' THEN NULL ELSE $1 END), $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET " +		"active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, " +		"identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, " +		"last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, " +		"remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, " +		"revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, " +		"v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto"); +	conn.prepare("delete_ip_assignments", +		"DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2"); +	conn.prepare("insert_ip_assignments", +		"INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3)"); + +	conn.prepare("update_network",  +		"UPDATE ztc_network SET controller_id = $2, capabilities = $3, enable_broadcast = $4, " +		"last_updated = $5, mtu = $6, multicast_limit = $7, name = $8, private = $9, " +		"remote_trace_level = $10, remote_trace_target = $11, rules = $12, rules_source = $13, " +		"tags = $14, v4_assign_mode = $15, v6_assign_mode = $16 " +		"WHERE id = $1"); +	conn.prepare("delete_network_ip_pool", +		"DELETE FROM ztc_network_assignment_pool WHERE network_id = $1"); +	conn.prepare("insert_network_ip_pool", +		"INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) VALUES ($1, $2, $3)"); +	conn.prepare("delete_network_route", "DELETE FROM ztc_network_route WHERE network_id = $1"); +	conn.prepare("insert_network_route", "INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)"); +	conn.prepare("delete_network", "UPDATE ztc_network SET deleted = true WHERE id = $1"); +	conn.prepare("delete_member", "UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2"); +  	json *config = nullptr;  	while(_commitQueue.get(config)&(_run == 1)) {  		if (!config) {  			continue;  		} -		if (PQstatus(conn) == CONNECTION_BAD) { -			fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn)); -			PQfinish(conn); -			delete config; -			exit(1); -		}  		try {   			const std::string objtype = (*config)["objtype"];  			if (objtype == "member") { -				try { -					std::string memberId = (*config)["id"]; -					std::string networkId = (*config)["nwid"]; -					std::string identity = (*config)["identity"]; -					std::string target = "NULL"; - -					if (!(*config)["remoteTraceTarget"].is_null()) { -						target = (*config)["remoteTraceTarget"]; -					} +				std::string memberId = (*config)["id"]; +				std::string networkId = (*config)["nwid"]; +				std::string target(""); +				if (!(*config)["remoteTraceTarget"].is_null()) { +					target = (*config)["remoteTraceTarget"]; +				} +				std::string identity = (*config)["identity"]; +				try {  					std::string caps = OSUtils::jsonDump((*config)["capabilities"], -1); -					std::string lastAuthTime = std::to_string((long long)(*config)["lastAuthorizedTime"]); -					std::string lastDeauthTime = std::to_string((long long)(*config)["lastDeauthorizedTime"]); -					std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]); -					std::string rev = std::to_string((unsigned long long)(*config)["revision"]);  					std::string tags = OSUtils::jsonDump((*config)["tags"], -1); -					std::string vmajor = std::to_string((int)(*config)["vMajor"]); -					std::string vminor = std::to_string((int)(*config)["vMinor"]); -					std::string vrev = std::to_string((int)(*config)["vRev"]); -					std::string vproto = std::to_string((int)(*config)["vProto"]); -					const char *values[19] = { -						memberId.c_str(), -						networkId.c_str(), -						((*config)["activeBridge"] ? "true" : "false"), -						((*config)["authorized"] ? "true" : "false"), -						caps.c_str(), -						identity.c_str(), -						lastAuthTime.c_str(), -						lastDeauthTime.c_str(), -						((*config)["noAutoAssignIps"] ? "true" : "false"), -						rtraceLevel.c_str(), -						(target == "NULL") ? NULL : target.c_str(), -						rev.c_str(), -						tags.c_str(), -						vmajor.c_str(), -						vminor.c_str(), -						vrev.c_str(), -						vproto.c_str() -					}; - -					PGresult *res = PQexecParams(conn, -						"INSERT INTO ztc_member (id, network_id, active_bridge, authorized, capabilities, " -						"identity, last_authorized_time, last_deauthorized_time, no_auto_assign_ips, " -						"remote_trace_level, remote_trace_target, revision, tags, v_major, v_minor, v_rev, v_proto) " -						"VALUES ($1, $2, $3, $4, $5, $6, " -						"TO_TIMESTAMP($7::double precision/1000), TO_TIMESTAMP($8::double precision/1000), " -						"$9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (network_id, id) DO UPDATE SET " -						"active_bridge = EXCLUDED.active_bridge, authorized = EXCLUDED.authorized, capabilities = EXCLUDED.capabilities, " -						"identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, " -						"last_deauthorized_time = EXCLUDED.last_deauthorized_time, no_auto_assign_ips = EXCLUDED.no_auto_assign_ips, " -						"remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = EXCLUDED.remote_trace_target, " -						"revision = EXCLUDED.revision+1, tags = EXCLUDED.tags, v_major = EXCLUDED.v_major, " -						"v_minor = EXCLUDED.v_minor, v_rev = EXCLUDED.v_rev, v_proto = EXCLUDED.v_proto", -						17, -						NULL, -						values, -						NULL, -						NULL, -						0); -					 -					if (PQresultStatus(res) != PGRES_COMMAND_OK) { -						fprintf(stderr, "ERROR: Error updating member: %s\n", PQresultErrorMessage(res)); -						fprintf(stderr, "%s", OSUtils::jsonDump(*config, 2).c_str()); -						PQclear(res); -						delete config; -						config = nullptr; -						continue; -					} - -					PQclear(res); - -					res = PQexec(conn, "BEGIN"); -					if (PQresultStatus(res) != PGRES_COMMAND_OK) { -						fprintf(stderr, "ERROR: Error beginning transaction: %s\n", PQresultErrorMessage(res)); -						PQclear(res); -						delete config; -						config = nullptr; -						continue; -					} - -					PQclear(res); - -					const char *v2[2] = { -						memberId.c_str(), -						networkId.c_str() -					}; - -					res = PQexecParams(conn, -						"DELETE FROM ztc_member_ip_assignment WHERE member_id = $1 AND network_id = $2", -						2, -						NULL, -						v2, -						NULL, -						NULL, -						0); - -					if (PQresultStatus(res) != PGRES_COMMAND_OK) { -						fprintf(stderr, "ERROR: Error updating IP address assignments: %s\n", PQresultErrorMessage(res)); -						PQclear(res); -						PQclear(PQexec(conn, "ROLLBACK"));; -						delete config; -						config = nullptr; -						continue; -					} -					PQclear(res); +					pqxx::work w(conn); +					pqxx::result res = w.prepared("insert_member")(memberId)(networkId) +						((bool)(*config)["activeBridge"])((bool)(*config)["authorized"])(caps) +						(identity)((long long)(*config)["lastAuthorizedTime"])((long long)(*config)["lastDeauthorizedTime"]) +						((bool)(*config)["noAutoAssignIps"])((int)(*config)["remoteTraceLevel"]) +						(target, !target.empty()) +						((int)(*config)["revision"])(tags)((int)(*config)["vMajor"])((int)(*config)["vMinor"]) +						((int)(*config)["vRev"])((int)(*config)["vProto"]).exec(); +					w.commit(); +				} catch (std::exception &e) { +					fprintf(stderr, "Exception upserting member: %s\n", e.what()); +					delete config; +					config = nullptr; +					continue; +				} +				try { +					pqxx::work w(conn); +					pqxx::result res = w.prepared("delete_ip_assignments")(memberId)(networkId).exec(); +					  					for (auto i = (*config)["ipAssignments"].begin(); i != (*config)["ipAssignments"].end(); ++i) {  						std::string addr = *i; -						const char *v3[3] = { -							memberId.c_str(), -							networkId.c_str(), -							addr.c_str() -						}; - -						res = PQexecParams(conn, -							"INSERT INTO ztc_member_ip_assignment (member_id, network_id, address) VALUES ($1, $2, $3)", -							3, -							NULL, -							v3, -							NULL, -							NULL, -							0); -						 -						if (PQresultStatus(res) != PGRES_COMMAND_OK) { -							fprintf(stderr, "ERROR: Error setting IP addresses for member: %s\n", PQresultErrorMessage(res)); -							PQclear(res); -							PQclear(PQexec(conn, "ROLLBACK")); -							continue; -						} -					} - -					res = PQexec(conn, "COMMIT"); -					if (PQresultStatus(res) != PGRES_COMMAND_OK) { -						fprintf(stderr, "ERROR: Error committing ip address data: %s\n", PQresultErrorMessage(res)); +						pqxx::result res2 = w.prepared("insert_ip_assignments")(memberId)(networkId)(addr).exec();  					} +					w.commit(); +				} catch (std::exception &e) { +					fprintf(stderr, "Error assigning member IP addresses: %s\n", e.what()); +					delete config; +					config = nullptr; +				} -					PQclear(res); - -					const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); -					const uint64_t memberidInt = OSUtils::jsonIntHex((*config)["id"], 0ULL); -					if (nwidInt && memberidInt) { -						nlohmann::json nwOrig; -						nlohmann::json memOrig; - -						nlohmann::json memNew(*config); -						 -						get(nwidInt, nwOrig, memberidInt, memOrig); -				 -						_memberChanged(memOrig, memNew, (this->_ready>=2)); -					} else { -						fprintf(stderr, "Can't notify of change.  Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt); -					} +				const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); +				const uint64_t memberidInt = OSUtils::jsonIntHex((*config)["id"], 0ULL); +				if (nwidInt && memberidInt) { +					nlohmann::json nwOrig; +					nlohmann::json memOrig; -				} catch (std::exception &e) { -					fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); +					nlohmann::json memNew(*config); +					 +					get(nwidInt, nwOrig, memberidInt, memOrig); +			 +					_memberChanged(memOrig, memNew, (this->_ready>=2)); +				} else { +					fprintf(stderr, "Can't notify of change.  Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt);  				}  			} else if (objtype == "network") { +				std::string id = (*config)["id"]; +				std::string controllerId = _myAddressStr.c_str(); +				std::string name = (*config)["name"]; +				std::string rulesSource = (*config)["rulesSource"]; +				std::string caps = OSUtils::jsonDump((*config)["capabilitles"], -1); +				std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]); +				std::string rules = OSUtils::jsonDump((*config)["rules"], -1); +				std::string tags = OSUtils::jsonDump((*config)["tags"], -1); +				std::string v4mode = OSUtils::jsonDump((*config)["v4AssignMode"],-1); +				std::string v6mode = OSUtils::jsonDump((*config)["v6AssignMode"], -1); +				std::string target = ""; +				if (!(*config)["remoteTraceTarget"].is_null()) { +					target = (*config)["remoteTraceTarget"]; +				}  				try { -					std::string id = (*config)["id"]; -					std::string controllerId = _myAddressStr.c_str(); -					std::string name = (*config)["name"]; -					std::string remoteTraceTarget("NULL"); -					if (!(*config)["remoteTraceTarget"].is_null()) { -						remoteTraceTarget = (*config)["remoteTraceTarget"]; -					} -					std::string rulesSource = (*config)["rulesSource"]; -					std::string caps = OSUtils::jsonDump((*config)["capabilitles"], -1); -					std::string now = std::to_string(OSUtils::now()); -					std::string mtu = std::to_string((int)(*config)["mtu"]); -					std::string mcastLimit = std::to_string((int)(*config)["multicastLimit"]); -					std::string rtraceLevel = std::to_string((int)(*config)["remoteTraceLevel"]); -					std::string rules = OSUtils::jsonDump((*config)["rules"], -1); -					std::string tags = OSUtils::jsonDump((*config)["tags"], -1); -					std::string v4mode = OSUtils::jsonDump((*config)["v4AssignMode"],-1); -					std::string v6mode = OSUtils::jsonDump((*config)["v6AssignMode"], -1); -					bool enableBroadcast = (*config)["enableBroadcast"]; -					bool isPrivate = (*config)["private"]; - -					const char *values[16] = { -						id.c_str(), -						controllerId.c_str(), -						caps.c_str(), -						enableBroadcast ? "true" : "false", -						now.c_str(), -						mtu.c_str(), -						mcastLimit.c_str(), -						name.c_str(), -						isPrivate ? "true" : "false", -						rtraceLevel.c_str(), -						(remoteTraceTarget == "NULL" ? NULL : remoteTraceTarget.c_str()), -						rules.c_str(), -						rulesSource.c_str(), -						tags.c_str(), -						v4mode.c_str(), -						v6mode.c_str(), -					}; - -					PGresult *res = PQexecParams(conn, -						"UPDATE ztc_network SET controller_id = $2, capabilities = $3, enable_broadcast = $4, " -						"last_updated = $5, mtu = $6, multicast_limit = $7, name = $8, private = $9, " -						"remote_trace_level = $10, remote_trace_target = $11, rules = $12, rules_source = $13, " -						"tags = $14, v4_assign_mode = $15, v6_assign_mode = $16 " -						"WHERE id = $1", -						16, -						NULL, -						values, -						NULL, -						NULL, -						0); -					 -					if (PQresultStatus(res) != PGRES_COMMAND_OK) { -						fprintf(stderr, "ERROR: Error updating network record: %s\n", PQresultErrorMessage(res)); -						PQclear(res); -						delete config; -						config = nullptr; -						continue; -					} - -					PQclear(res); - -					res = PQexec(conn, "BEGIN"); -					if (PQresultStatus(res) != PGRES_COMMAND_OK) { -						fprintf(stderr, "ERROR: Error beginnning transaction: %s\n", PQresultErrorMessage(res)); -						PQclear(res); -						delete config; -						config = nullptr; -						continue; -					} - -					PQclear(res); - -					const char *params[1] = { -						id.c_str() -					}; -					res = PQexecParams(conn,  -						"DELETE FROM ztc_network_assignment_pool WHERE network_id = $1", -						1, -						NULL, -						params, -						NULL, -						NULL, -						0); -					if (PQresultStatus(res) != PGRES_COMMAND_OK) { -						fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res)); -						PQclear(res); -						PQclear(PQexec(conn, "ROLLBACK")); -						delete config; -						config = nullptr; -						continue; -					} +					pqxx::work w(conn); +					pqxx::result res = w.prepared("update_network")(id)(controllerId)(caps)((bool)(*config)["enableBroadcast"]) +						(OSUtils::now())((int)(*config)["mtu"])((int)(*config)["multicastLimit"])(name)((bool)(*config)["private"]) +						((int)(*config)["remoteTraceLevel"])(target, !target.empty()) +						(rules)(rulesSource)(tags)(v4mode)(v6mode).exec(); +					w.commit(); +				} catch (std::exception &e) { +					fprintf(stderr, "Error updating network config: %s\n", e.what()); +					delete config; +					config = nullptr; +					continue; +				} -					PQclear(res); +				try { +					pqxx::work w(conn); +					pqxx::result res = w.prepared("delete_network_ip_pool")(id).exec();  					auto pool = (*config)["ipAssignmentPools"]; -					bool err = false;  					for (auto i = pool.begin(); i != pool.end(); ++i) {  						std::string start = (*i)["ipRangeStart"];  						std::string end = (*i)["ipRangeEnd"]; -						const char *p[3] = { -							id.c_str(), -							start.c_str(), -							end.c_str() -						}; - -						res = PQexecParams(conn, -							"INSERT INTO ztc_network_assignment_pool (network_id, ip_range_start, ip_range_end) " -							"VALUES ($1, $2, $3)", -							3, -							NULL, -							p, -							NULL, -							NULL, -							0); -						if (PQresultStatus(res) != PGRES_COMMAND_OK) { -							fprintf(stderr, "ERROR: Error updating assignment pool: %s\n", PQresultErrorMessage(res)); -							PQclear(res); -							err = true; -							break; -						} -						PQclear(res); -					} -					if (err) { -						PQclear(PQexec(conn, "ROLLBACK")); -						delete config; -						config = nullptr; -						continue; -					} -					res = PQexecParams(conn,  -						"DELETE FROM ztc_network_route WHERE network_id = $1", -						1, -						NULL, -						params, -						NULL, -						NULL, -						0); - -					if (PQresultStatus(res) != PGRES_COMMAND_OK) { -						fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res)); -						PQclear(res); -						PQclear(PQexec(conn, "ROLLBACK")); -						delete config; -						config = nullptr; -						continue; +						pqxx::result r2 = w.prepared("insert_nework_ip_pool")(id)(start)(end).exec();  					} +					pqxx::result res2 = w.prepared("delete_network_route")(id).exec(); +  					auto routes = (*config)["routes"]; -					err = false;  					for (auto i = routes.begin(); i != routes.end(); ++i) {  						std::string t = (*i)["target"];  						std::vector<std::string> target; @@ -1013,86 +727,42 @@ void PostgreSQL::commitThread()  							continue;  						}  						std::string targetAddr = target[0]; -						std::string targetBits = target[1]; -						std::string via = "NULL"; +						int targetBits = std::stoi(target[1]); +						std::string via = "";  						if (!(*i)["via"].is_null()) {  							via = (*i)["via"];  						} - -						const char *p[4] = { -							id.c_str(), -							targetAddr.c_str(), -							targetBits.c_str(), -							(via == "NULL" ? NULL : via.c_str()), -						}; - -						res = PQexecParams(conn, -							"INSERT INTO ztc_network_route (network_id, address, bits, via) VALUES ($1, $2, $3, $4)", -							4, -							NULL, -							p, -							NULL, -							NULL, -							0); - -						if (PQresultStatus(res) != PGRES_COMMAND_OK) { -							fprintf(stderr, "ERROR: Error updating routes: %s\n", PQresultErrorMessage(res)); -							PQclear(res); -							err = true; -							break; -						} -						PQclear(res); -					} -					if (err) { -						PQclear(PQexec(conn, "ROLLBACK")); -						delete config; -						config = nullptr; -						continue; +						 +						pqxx::result res3 = w.prepared("insert_network_route")(id)(targetAddr)(targetBits) +							(via, !via.empty()).exec();  					} -					res = PQexec(conn, "COMMIT"); -					if (PQresultStatus(res) != PGRES_COMMAND_OK) { -						fprintf(stderr, "ERROR: Error committing network update: %s\n", PQresultErrorMessage(res)); -					} -					PQclear(res); +					w.commit(); -					const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); -					if (nwidInt) { -						nlohmann::json nwOrig; -						nlohmann::json nwNew(*config); +				} catch (std::exception &e) { +					fprintf(stderr, "Error updating network IP pool: %s\n", e.what()); +				} -						get(nwidInt, nwOrig); +				const uint64_t nwidInt = OSUtils::jsonIntHex((*config)["nwid"], 0ULL); +				if (nwidInt) { +					nlohmann::json nwOrig; +					nlohmann::json nwNew(*config); -						_networkChanged(nwOrig, nwNew, true); -					} else { -						fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt); -					} +					get(nwidInt, nwOrig); -				} catch (std::exception &e) { -					fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); +					_networkChanged(nwOrig, nwNew, true); +				} else { +					fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt);  				}  			} else if (objtype == "trace") {  				fprintf(stderr, "ERROR: Trace not yet implemented");  			} else if (objtype == "_delete_network") {  				try {  					std::string networkId = (*config)["nwid"]; -					const char *values[1] = { -						networkId.c_str() -					}; -					PGresult * res = PQexecParams(conn, -						"UPDATE ztc_network SET deleted = true WHERE id = $1", -						1, -						NULL, -						values, -						NULL, -						NULL, -						0); -					if (PQresultStatus(res) != PGRES_COMMAND_OK) { -						fprintf(stderr, "ERROR: Error deleting network: %s\n", PQresultErrorMessage(res)); -					} - -					PQclear(res); +					pqxx::work w(conn); +					pqxx::result res = w.prepared("delete_network")(networkId).exec(); +					w.commit();  				} catch (std::exception &e) {  					fprintf(stderr, "ERROR: Error deleting network: %s\n", e.what());  				} @@ -1101,25 +771,9 @@ void PostgreSQL::commitThread()  					std::string memberId = (*config)["id"];  					std::string networkId = (*config)["nwid"]; -					const char *values[2] = { -						memberId.c_str(), -						networkId.c_str() -					}; - -					PGresult *res = PQexecParams(conn, -						"UPDATE ztc_member SET hidden = true, deleted = true WHERE id = $1 AND network_id = $2", -						2, -						NULL, -						values, -						NULL, -						NULL, -						0); - -					if (PQresultStatus(res) != PGRES_COMMAND_OK) { -						fprintf(stderr, "ERROR: Error deleting member: %s\n", PQresultErrorMessage(res)); -					} - -					PQclear(res); +					pqxx::work w(conn); +					pqxx::result res = w.prepared("delete_member")(memberId)(networkId).exec(); +					w.commit();  				} catch (std::exception &e) {  					fprintf(stderr, "ERROR: Error deleting member: %s\n", e.what());  				} @@ -1132,18 +786,15 @@ void PostgreSQL::commitThread()  		delete config;  		config = nullptr; -		std::this_thread::sleep_for(std::chrono::milliseconds(10)); +		std::this_thread::sleep_for(std::chrono::milliseconds(1));  	} - -	PQfinish(conn);  }  void PostgreSQL::onlineNotificationThread()  { -	PGconn *conn = PQconnectdb(_path.c_str()); -	if (PQstatus(conn) == CONNECTION_BAD) { -		fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); -		PQfinish(conn); +	pqxx::connection conn(_connString); +	if(!conn.is_open()) { +		fprintf(stderr, "Connection to database failed: onlineNotificationThread\n");  		exit(1);  	}  	_connected = 1; @@ -1152,11 +803,6 @@ void PostgreSQL::onlineNotificationThread()  	std::unordered_map< std::pair<uint64_t,uint64_t>,int64_t,_PairHasher > lastOnlineCumulative;  	while (_run == 1) { -		if (PQstatus(conn) != CONNECTION_OK) { -			fprintf(stderr, "ERROR: Online Notification thread lost connection to Postgres."); -			exit(-1); -		} -  		// map used to send notifications to front end  		std::unordered_map<std::string, std::vector<std::string>> updateMap; @@ -1166,230 +812,144 @@ void PostgreSQL::onlineNotificationThread()  			lastOnline.swap(_lastOnline);  		} -		PGresult *res = NULL; -		int qCount = 0; - -		res = PQexec(conn, "BEGIN"); -		if (PQresultStatus(res) != PGRES_COMMAND_OK) { -			fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); -			PQclear(res); -			exit(1); -		} -		PQclear(res); - -		for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { -			uint64_t nwid_i = i->first.first; -			char nwidTmp[64]; -			char memTmp[64]; -			char ipTmp[64]; -			OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i); -			OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second); - -			auto found = _networks.find(nwid_i); -			if (found == _networks.end()) { -				continue; // skip members trying to join non-existant networks -			} - -			lastOnlineCumulative[i->first] = i->second.first; -			 - -			std::string networkId(nwidTmp); -			std::string memberId(memTmp); - -			std::vector<std::string> &members = updateMap[networkId]; -			members.push_back(memberId); - -			int64_t ts = i->second.first; -			std::string ipAddr = i->second.second.toIpString(ipTmp); -			std::string timestamp = std::to_string(ts); - -			const char *values[4] = { -				networkId.c_str(), -				memberId.c_str(), -				(ipAddr.empty() ? NULL : ipAddr.c_str()), -				timestamp.c_str(), -			}; - -			res = PQexecParams(conn, -				"INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES ($1, $2, $3, TO_TIMESTAMP($4::double precision/1000)) " -				"ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated", -				4,       // number of parameters -				NULL,    // oid field.   ignore -				values,  // values for substitution -				NULL, // lengths in bytes of each value -				NULL, -				0); - -			if (PQresultStatus(res) != PGRES_COMMAND_OK) { -				fprintf(stderr, "Error on Member Status upsert: %s\n", PQresultErrorMessage(res)); -				PQclear(res); -				PQclear(PQexec(conn, "ROLLBACK")); -				continue; -			} - -			PQclear(res); - -			if ((++qCount) == 1024) { -				res = PQexec(conn, "COMMIT"); -				if (PQresultStatus(res) != PGRES_COMMAND_OK) { -					fprintf(stderr, "ERROR: Error on commit (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); -					PQclear(res); -					PQclear(PQexec(conn, "ROLLBACK")); -					exit(1); -				} -				PQclear(res); - -				res = PQexec(conn, "BEGIN"); -				if (PQresultStatus(res) != PGRES_COMMAND_OK) { -					fprintf(stderr, "ERROR: Error on BEGIN (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); -					PQclear(res); -					exit(1); +		try { +			pqxx::work w(conn); +			pqxx::pipeline p(w, "Member Update Pipeline"); +			for (auto i=lastOnline.begin(); i != lastOnline.end(); ++i) { +				uint64_t nwid_i = i->first.first; +				char nwidTmp[64]; +				char memTmp[64]; +				char ipTmp[64]; +				OSUtils::ztsnprintf(nwidTmp,sizeof(nwidTmp), "%.16llx", nwid_i); +				OSUtils::ztsnprintf(memTmp,sizeof(memTmp), "%.10llx", i->first.second); + +				auto found = _networks.find(nwid_i); +				if (found == _networks.end()) { +					continue; // skip members trying to join non-existant networks  				} -				PQclear(res); -				qCount = 0; -			} -		} -		res = PQexec(conn, "COMMIT"); -		if (PQresultStatus(res) != PGRES_COMMAND_OK) { -			fprintf(stderr, "ERROR: Error on commit (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); -			PQclear(res); -			PQclear(PQexec(conn, "ROLLBACK")); -			exit(1); -		} -		PQclear(res); -		const int64_t now = OSUtils::now(); -		if ((now - lastUpdatedNetworkStatus) > 10000) { -			lastUpdatedNetworkStatus = now; +				lastOnlineCumulative[i->first] = i->second.first; +				 -			std::vector<std::pair<uint64_t, std::shared_ptr<_Network>>> networks; -			{ -				std::lock_guard<std::mutex> l(_networks_l); -				for (auto i = _networks.begin(); i != _networks.end(); ++i) { -					networks.push_back(*i); -				} -			} +				std::string networkId(nwidTmp); +				std::string memberId(memTmp); -			int nCount = 0; +				std::vector<std::string> &members = updateMap[networkId]; +				members.push_back(memberId); -			res = PQexec(conn, "BEGIN"); -			if (PQresultStatus(res) != PGRES_COMMAND_OK) { -				fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); -				PQclear(res); -				exit(1); -			} -			PQclear(res); -			for (auto i = networks.begin(); i != networks.end(); ++i) { -				char tmp[64]; -				Utils::hex(i->first, tmp); +				int64_t ts = i->second.first; +				std::string ipAddr = i->second.second.toIpString(ipTmp); +				std::string timestamp = std::to_string(ts); -				std::string networkId(tmp); +				std::stringstream ss; +				ss << "INSERT INTO ztc_member_status (network_id, member_id, address, last_updated) VALUES (" +				<< "'" << w.esc(networkId) << "', " +				<< "'" << w.esc(memberId) << "', " +				<< "'" << w.esc(ipAddr) << "', " +				<< timestamp << ") " +				<< "ON CONFLICT (network_id, member_id) DO UPDATE SET address = EXCLUDED.address, last_updated = EXCLUDED.last_updated"; +				p.insert(ss.str()); +			} +			p.complete(); +			w.commit(); +		} catch (std::exception &e) { +			fprintf(stderr, "Error updating member status: %s\n", e.what()); +		} -				std::vector<std::string> &_notUsed = updateMap[networkId]; -				(void)_notUsed; +		try { +			const int64_t now = OSUtils::now(); +			if ((now - lastUpdatedNetworkStatus) > 10000) { +				pqxx::work w(conn); +				pqxx::pipeline p(w, "Network Update Pipeline"); +				lastUpdatedNetworkStatus = now; -				uint64_t authMemberCount = 0; -				uint64_t totalMemberCount = 0; -				uint64_t onlineMemberCount = 0; -				uint64_t bridgeCount = 0; -				uint64_t ts = now; +				std::vector<std::pair<uint64_t, std::shared_ptr<_Network>>> networks;  				{ -					std::lock_guard<std::mutex> l2(i->second->lock); -					authMemberCount = i->second->authorizedMembers.size(); -					totalMemberCount = i->second->members.size(); -					bridgeCount = i->second->activeBridgeMembers.size(); -					for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) { -						auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first, m->first)); -						if (lo != lastOnlineCumulative.end()) { -							if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) { -								++onlineMemberCount; -							} else { -								lastOnlineCumulative.erase(lo); -							} -						} +					std::lock_guard<std::mutex> l(_networks_l); +					for (auto i = _networks.begin(); i != _networks.end(); ++i) { +						networks.push_back(*i);  					}  				} -				std::string bc = std::to_string(bridgeCount); -				std::string amc = std::to_string(authMemberCount); -				std::string omc = std::to_string(onlineMemberCount); -				std::string tmc = std::to_string(totalMemberCount); -				std::string timestamp = std::to_string(ts); -				const char *values[6] = { -					networkId.c_str(), -					bc.c_str(), -					amc.c_str(), -					omc.c_str(), -					tmc.c_str(), -					timestamp.c_str() -				}; - -				res = PQexecParams(conn, "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, " -					"online_member_count, total_member_count, last_modified) VALUES ($1, $2, $3, $4, $5, TO_TIMESTAMP($6::double precision/1000)) " -					"ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, " -					"authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, " -					"total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified", -					6, -					NULL, -					values, -					NULL, -					NULL, -					0); -				 -				if (PQresultStatus(res) != PGRES_COMMAND_OK) { -					fprintf(stderr, "ERROR: Error on Network Status upsert (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); -					PQclear(res); -					PQclear(PQexec(conn, "ROLLBACK")); -					exit(1); -				} - -				if ((++nCount) == 1024) { -					res = PQexec(conn, "COMMIT"); -					if (PQresultStatus(res) != PGRES_COMMAND_OK) { -						fprintf(stderr, "ERROR: Error on COMMIT (onlineNotificationThread): %s\n" , PQresultErrorMessage(res)); -						PQclear(res); -						PQclear(PQexec(conn, "ROLLBACK")); -						exit(1); -					} - -					res = PQexec(conn, "BEGIN"); -					if (PQresultStatus(res) != PGRES_COMMAND_OK) { -						fprintf(stderr, "ERROR: Error on BEGIN command (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); -						PQclear(res); -						exit(1); +				for (auto i = networks.begin(); i != networks.end(); ++i) { +					char tmp[64]; +					Utils::hex(i->first, tmp); + +					std::string networkId(tmp); + +					std::vector<std::string> &_notUsed = updateMap[networkId]; +					(void)_notUsed; + +					uint64_t authMemberCount = 0; +					uint64_t totalMemberCount = 0; +					uint64_t onlineMemberCount = 0; +					uint64_t bridgeCount = 0; +					uint64_t ts = now; +					{ +						std::lock_guard<std::mutex> l2(i->second->lock); +						authMemberCount = i->second->authorizedMembers.size(); +						totalMemberCount = i->second->members.size(); +						bridgeCount = i->second->activeBridgeMembers.size(); +						for (auto m=i->second->members.begin(); m != i->second->members.end(); ++m) { +							auto lo = lastOnlineCumulative.find(std::pair<uint64_t,uint64_t>(i->first, m->first)); +							if (lo != lastOnlineCumulative.end()) { +								if ((now - lo->second) <= (ZT_NETWORK_AUTOCONF_DELAY * 2)) { +									++onlineMemberCount; +								} else { +									lastOnlineCumulative.erase(lo); +								} +							} +						}  					} -					nCount = 0; +					std::string bc = std::to_string(bridgeCount); +					std::string amc = std::to_string(authMemberCount); +					std::string omc = std::to_string(onlineMemberCount); +					std::string tmc = std::to_string(totalMemberCount); +					std::string timestamp = std::to_string(ts); + +					std::stringstream ss; +					ss << "INSERT INTO ztc_network_status (network_id, bridge_count, authorized_member_count, " +					   << "online_member_count, total_member_count, last_modified) VALUES (" +					   << "'" << w.esc(networkId) << "', " +					   << bridgeCount << ", " +					   << authMemberCount << ", " +					   << onlineMemberCount << ", " +					   << totalMemberCount << ", " +					   << "TO_TIMESTAMP(" << ts << "::double precision/1000)) " +					   << "ON CONFLICT (network_id) DO UPDATE SET bridge_count = EXCLUDED.bridge_count, " +					   << "authorized_member_count = EXCLUDED.authorized_member_count, online_member_count = EXCLUDED.online_member_count, " +					   << "total_member_count = EXCLUDED.total_member_count, last_modified = EXCLUDED.last_modified";  +					p.insert(ss.str());  				} +				p.complete(); +				w.commit();  			} -			res = PQexec(conn, "COMMIT"); -			if (PQresultStatus(res) != PGRES_COMMAND_OK) { -				fprintf(stderr, "ERROR: Error on COMMIT (onlineNotificationThread): %s\n", PQresultErrorMessage(res)); -				PQclear(res); -				PQclear(PQexec(conn, "ROLLBACK")); -				exit(1); -			} +		} catch (std::exception &e) { +			fprintf(stderr, "Error updating network status: %s\n", e.what());  		} -		for (auto it = updateMap.begin(); it != updateMap.end(); ++it) { -			std::string networkId = it->first; -			std::vector<std::string> members = it->second; -			std::stringstream queryBuilder; +		try { +			pqxx::work w(conn); +			pqxx::pipeline p(w, "Notification Sender"); +			for (auto it = updateMap.begin(); it != updateMap.end(); ++it) { +				std::string networkId = it->first; +				std::vector<std::string> members = it->second; +				std::stringstream queryBuilder; -			std::string membersStr = ::join(members, ","); +				std::string membersStr = ::join(members, ","); -			queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'"; -			std::string query = queryBuilder.str(); +				queryBuilder << "NOTIFY controller, '" << networkId << ":" << membersStr << "'"; +				std::string query = queryBuilder.str(); -			PGresult *res = PQexec(conn,query.c_str()); -			if (PQresultStatus(res) != PGRES_COMMAND_OK) { -				fprintf(stderr, "ERROR: Error sending NOTIFY: %s\n", PQresultErrorMessage(res)); +				p.insert(query);  			} -			PQclear(res); +			p.complete(); +			w.commit(); +		} catch (std::exception &e) { +			fprintf(stderr, "Error notifying webapp: %s\n", e.what());  		} - -		std::this_thread::sleep_for(std::chrono::milliseconds(250)); +		std::this_thread::sleep_for(std::chrono::milliseconds(1));  	} -	PQfinish(conn);  }  #endif //ZT_CONTROLLER_USE_LIBPQ diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index 36fe8c9f..86aac80d 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -23,12 +23,16 @@  #include "DB.hpp" +#include <pqxx/pqxx> +  extern "C" {      typedef struct pg_conn PGconn;  }  namespace ZeroTier  { +class _MemberNotificationReceiver; +class _NetworkNotificationReceiver;  /**   * A controller database driver that talks to PostgreSQL @@ -56,8 +60,8 @@ protected:  	};  private: -    void initializeNetworks(PGconn *conn); -    void initializeMembers(PGconn *conn); +    void initializeNetworks(pqxx::connection &conn); +    void initializeMembers(pqxx::connection &conn);      void heartbeat();      void membersDbWatcher();      void networksDbWatcher(); @@ -81,6 +85,9 @@ private:      mutable std::mutex _readyLock;      std::atomic<int> _ready, _connected, _run;      mutable volatile bool _waitNoticePrinted; + +    friend class _MemberNotificationReceiver; +    friend class _NetworkNotificationReceiver;  };  } | 
