diff options
Diffstat (limited to 'controller/PostgreSQL.cpp')
-rw-r--r-- | controller/PostgreSQL.cpp | 71 |
1 files changed, 35 insertions, 36 deletions
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 121d00df..13b7b8d8 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -81,7 +81,9 @@ std::string join(const std::vector<std::string> &elements, const char * const se using namespace ZeroTier; PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc) - : DB(myId, path) + : DB() + , _myId(myId) + , _myAddress(myId.address()) , _ready(0) , _connected(1) , _run(1) @@ -89,7 +91,9 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, M , _listenPort(listenPort) , _mqc(mqc) { - _connString = std::string(path) + " application_name=controller_" +_myAddressStr; + char myAddress[64]; + _myAddressStr = myId.address().toString(myAddress); + _connString = std::string(path) + " application_name=controller_" + _myAddressStr; // Database Schema Version Check PGconn *conn = getPgConn(); @@ -165,8 +169,9 @@ bool PostgreSQL::isReady() return ((_ready == 2)&&(_connected)); } -void PostgreSQL::save(nlohmann::json &record) +bool PostgreSQL::save(nlohmann::json &record,bool notifyListeners) { + bool modified = false; try { if (!record.is_object()) return; @@ -178,7 +183,8 @@ void PostgreSQL::save(nlohmann::json &record) get(nwid,old); if ((!old.is_object())||(old != record)) { record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL; - _commitQueue.post(new nlohmann::json(record)); + _commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners)); + modified = true; } } } else if (objtype == "member") { @@ -189,7 +195,8 @@ void PostgreSQL::save(nlohmann::json &record) get(nwid,network,id,old); if ((!old.is_object())||(old != record)) { record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL; - _commitQueue.post(new nlohmann::json(record)); + _commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners)); + modified = true; } } } @@ -210,6 +217,7 @@ void PostgreSQL::save(nlohmann::json &record) } catch (...) { fprintf(stderr, "Unknown error on PostgreSQL::save\n"); } + return modified; } void PostgreSQL::eraseNetwork(const uint64_t networkId) @@ -217,38 +225,33 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId) char tmp2[24]; waitForReady(); Utils::hex(networkId, tmp2); - json *tmp = new json(); - (*tmp)["id"] = tmp2; - (*tmp)["objtype"] = "_delete_network"; + std::pair<nlohmann::json,bool> tmp; + tmp.first["id"] = tmp2; + tmp.first["objtype"] = "_delete_network"; + tmp.second = true; _commitQueue.post(tmp); } void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId) { char tmp2[24]; - json *tmp = new json(); + std::pair<nlohmann::json,bool> tmp; Utils::hex(networkId, tmp2); - (*tmp)["nwid"] = tmp2; + tmp.first["nwid"] = tmp2; Utils::hex(memberId, tmp2); - (*tmp)["id"] = tmp2; - (*tmp)["objtype"] = "_delete_member"; + tmp.first["id"] = tmp2; + tmp.first["objtype"] = "_delete_member"; + tmp.second = true; _commitQueue.post(tmp); } void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress) { - { - std::lock_guard<std::mutex> l(_lastOnline_l); - std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)]; - i.first = OSUtils::now(); - if (physicalAddress) { - i.second = physicalAddress; - } - } - { - std::lock_guard<std::mutex> l2(_changeListeners_l); - for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) - (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress); + std::lock_guard<std::mutex> l(_lastOnline_l); + std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)]; + i.first = OSUtils::now(); + if (physicalAddress) { + i.second = physicalAddress; } } @@ -868,18 +871,18 @@ void PostgreSQL::commitThread() exit(1); } - json *config = nullptr; - while(_commitQueue.get(config)&(_run == 1)) { - if (!config) { + std::pair<nlohmann::json,bool> qitem; + while(_commitQueue.get(qitem)&(_run == 1)) { + if (!qitem.first.is_object()) { continue; } if (PQstatus(conn) == CONNECTION_BAD) { fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn)); PQfinish(conn); - delete config; exit(1); } - try { + try { + nlohmann::json *config = &(qitem.first); const std::string objtype = (*config)["objtype"]; if (objtype == "member") { try { @@ -1034,10 +1037,10 @@ void PostgreSQL::commitThread() nlohmann::json memOrig; nlohmann::json memNew(*config); - + get(nwidInt, nwOrig, memberidInt, memOrig); - _memberChanged(memOrig, memNew, (this->_ready>=2)); + _memberChanged(memOrig, memNew, qitem.second); } else { fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt); } @@ -1260,7 +1263,7 @@ void PostgreSQL::commitThread() get(nwidInt, nwOrig); - _networkChanged(nwOrig, nwNew, true); + _networkChanged(nwOrig, nwNew, qitem.second); } else { fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt); } @@ -1268,8 +1271,6 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); } - } else if (objtype == "trace") { - fprintf(stderr, "ERROR: Trace not yet implemented"); } else if (objtype == "_delete_network") { try { std::string networkId = (*config)["nwid"]; @@ -1326,8 +1327,6 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what()); } - delete config; - config = nullptr; std::this_thread::sleep_for(std::chrono::milliseconds(10)); } |