summaryrefslogtreecommitdiff
path: root/controller/PostgreSQL.cpp
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2019-08-06 10:42:54 -0500
committerAdam Ierymenko <adam.ierymenko@gmail.com>2019-08-06 10:42:54 -0500
commit00fb9c475e7bd68a12d6d581539862c31aeb2e74 (patch)
tree2f8423fff1914998fd858e1b4214a0394437974c /controller/PostgreSQL.cpp
parent3c776675b3824d4497d913386793efaece2ee7d1 (diff)
downloadinfinitytier-00fb9c475e7bd68a12d6d581539862c31aeb2e74.tar.gz
infinitytier-00fb9c475e7bd68a12d6d581539862c31aeb2e74.zip
More work on DB mirroring.
Diffstat (limited to 'controller/PostgreSQL.cpp')
-rw-r--r--controller/PostgreSQL.cpp71
1 files changed, 35 insertions, 36 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index 121d00df..13b7b8d8 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -81,7 +81,9 @@ std::string join(const std::vector<std::string> &elements, const char * const se
using namespace ZeroTier;
PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
- : DB(myId, path)
+ : DB()
+ , _myId(myId)
+ , _myAddress(myId.address())
, _ready(0)
, _connected(1)
, _run(1)
@@ -89,7 +91,9 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, M
, _listenPort(listenPort)
, _mqc(mqc)
{
- _connString = std::string(path) + " application_name=controller_" +_myAddressStr;
+ char myAddress[64];
+ _myAddressStr = myId.address().toString(myAddress);
+ _connString = std::string(path) + " application_name=controller_" + _myAddressStr;
// Database Schema Version Check
PGconn *conn = getPgConn();
@@ -165,8 +169,9 @@ bool PostgreSQL::isReady()
return ((_ready == 2)&&(_connected));
}
-void PostgreSQL::save(nlohmann::json &record)
+bool PostgreSQL::save(nlohmann::json &record,bool notifyListeners)
{
+ bool modified = false;
try {
if (!record.is_object())
return;
@@ -178,7 +183,8 @@ void PostgreSQL::save(nlohmann::json &record)
get(nwid,old);
if ((!old.is_object())||(old != record)) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
- _commitQueue.post(new nlohmann::json(record));
+ _commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners));
+ modified = true;
}
}
} else if (objtype == "member") {
@@ -189,7 +195,8 @@ void PostgreSQL::save(nlohmann::json &record)
get(nwid,network,id,old);
if ((!old.is_object())||(old != record)) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
- _commitQueue.post(new nlohmann::json(record));
+ _commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners));
+ modified = true;
}
}
}
@@ -210,6 +217,7 @@ void PostgreSQL::save(nlohmann::json &record)
} catch (...) {
fprintf(stderr, "Unknown error on PostgreSQL::save\n");
}
+ return modified;
}
void PostgreSQL::eraseNetwork(const uint64_t networkId)
@@ -217,38 +225,33 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId)
char tmp2[24];
waitForReady();
Utils::hex(networkId, tmp2);
- json *tmp = new json();
- (*tmp)["id"] = tmp2;
- (*tmp)["objtype"] = "_delete_network";
+ std::pair<nlohmann::json,bool> tmp;
+ tmp.first["id"] = tmp2;
+ tmp.first["objtype"] = "_delete_network";
+ tmp.second = true;
_commitQueue.post(tmp);
}
void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
{
char tmp2[24];
- json *tmp = new json();
+ std::pair<nlohmann::json,bool> tmp;
Utils::hex(networkId, tmp2);
- (*tmp)["nwid"] = tmp2;
+ tmp.first["nwid"] = tmp2;
Utils::hex(memberId, tmp2);
- (*tmp)["id"] = tmp2;
- (*tmp)["objtype"] = "_delete_member";
+ tmp.first["id"] = tmp2;
+ tmp.first["objtype"] = "_delete_member";
+ tmp.second = true;
_commitQueue.post(tmp);
}
void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress)
{
- {
- std::lock_guard<std::mutex> l(_lastOnline_l);
- std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)];
- i.first = OSUtils::now();
- if (physicalAddress) {
- i.second = physicalAddress;
- }
- }
- {
- std::lock_guard<std::mutex> l2(_changeListeners_l);
- for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i)
- (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress);
+ std::lock_guard<std::mutex> l(_lastOnline_l);
+ std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)];
+ i.first = OSUtils::now();
+ if (physicalAddress) {
+ i.second = physicalAddress;
}
}
@@ -868,18 +871,18 @@ void PostgreSQL::commitThread()
exit(1);
}
- json *config = nullptr;
- while(_commitQueue.get(config)&(_run == 1)) {
- if (!config) {
+ std::pair<nlohmann::json,bool> qitem;
+ while(_commitQueue.get(qitem)&(_run == 1)) {
+ if (!qitem.first.is_object()) {
continue;
}
if (PQstatus(conn) == CONNECTION_BAD) {
fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn));
PQfinish(conn);
- delete config;
exit(1);
}
- try {
+ try {
+ nlohmann::json *config = &(qitem.first);
const std::string objtype = (*config)["objtype"];
if (objtype == "member") {
try {
@@ -1034,10 +1037,10 @@ void PostgreSQL::commitThread()
nlohmann::json memOrig;
nlohmann::json memNew(*config);
-
+
get(nwidInt, nwOrig, memberidInt, memOrig);
- _memberChanged(memOrig, memNew, (this->_ready>=2));
+ _memberChanged(memOrig, memNew, qitem.second);
} else {
fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt);
}
@@ -1260,7 +1263,7 @@ void PostgreSQL::commitThread()
get(nwidInt, nwOrig);
- _networkChanged(nwOrig, nwNew, true);
+ _networkChanged(nwOrig, nwNew, qitem.second);
} else {
fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt);
}
@@ -1268,8 +1271,6 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) {
fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
}
- } else if (objtype == "trace") {
- fprintf(stderr, "ERROR: Trace not yet implemented");
} else if (objtype == "_delete_network") {
try {
std::string networkId = (*config)["nwid"];
@@ -1326,8 +1327,6 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) {
fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what());
}
- delete config;
- config = nullptr;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}