diff options
-rw-r--r-- | controller/DB.cpp | 44 | ||||
-rw-r--r-- | controller/DB.hpp | 24 | ||||
-rw-r--r-- | controller/DBMirrorSet.cpp | 106 | ||||
-rw-r--r-- | controller/DBMirrorSet.hpp | 25 | ||||
-rw-r--r-- | controller/EmbeddedNetworkController.cpp | 6 | ||||
-rw-r--r-- | controller/EmbeddedNetworkController.hpp | 6 | ||||
-rw-r--r-- | controller/FileDB.cpp | 26 | ||||
-rw-r--r-- | controller/FileDB.hpp | 5 | ||||
-rw-r--r-- | controller/LFDB.cpp | 37 | ||||
-rw-r--r-- | controller/LFDB.hpp | 4 | ||||
-rw-r--r-- | controller/PostgreSQL.cpp | 71 | ||||
-rw-r--r-- | controller/PostgreSQL.hpp | 7 |
12 files changed, 210 insertions, 151 deletions
diff --git a/controller/DB.cpp b/controller/DB.cpp index 75adf53e..a4440f38 100644 --- a/controller/DB.cpp +++ b/controller/DB.cpp @@ -104,16 +104,7 @@ void DB::cleanMember(nlohmann::json &member) member.erase("lastRequestMetaData"); } -DB::DB(const Identity &myId,const char *path) : - _myId(myId), - _myAddress(myId.address()), - _path((path) ? path : "") -{ - char tmp[32]; - _myAddress.toString(tmp); - _myAddressStr = tmp; -} - +DB::DB() {} DB::~DB() {} bool DB::get(const uint64_t networkId,nlohmann::json &network) @@ -199,34 +190,15 @@ bool DB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohma return true; } -bool DB::summary(const uint64_t networkId,NetworkSummaryInfo &info) -{ - waitForReady(); - std::shared_ptr<_Network> nw; - { - std::lock_guard<std::mutex> l(_networks_l); - auto nwi = _networks.find(networkId); - if (nwi == _networks.end()) - return false; - nw = nwi->second; - } - { - std::lock_guard<std::mutex> l2(nw->lock); - _fillSummaryInfo(nw,info); - } - return true; -} - -void DB::networks(std::vector<uint64_t> &networks) +void DB::networks(std::set<uint64_t> &networks) { waitForReady(); std::lock_guard<std::mutex> l(_networks_l); - networks.reserve(_networks.size() + 1); for(auto n=_networks.begin();n!=_networks.end();++n) - networks.push_back(n->first); + networks.insert(n->first); } -void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized) +void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool notifyListeners) { uint64_t memberId = 0; uint64_t networkId = 0; @@ -310,7 +282,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in } } - if (initialized) { + if (notifyListeners) { std::lock_guard<std::mutex> ll(_changeListeners_l); for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { (*i)->onNetworkMemberUpdate(this,networkId,memberId,memberConfig); @@ -333,7 +305,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in } } - if ((initialized)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) { + if ((notifyListeners)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) { std::lock_guard<std::mutex> ll(_changeListeners_l); for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { (*i)->onNetworkMemberDeauthorize(this,networkId,memberId); @@ -341,7 +313,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in } } -void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized) +void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool notifyListeners) { if (networkConfig.is_object()) { const std::string ids = networkConfig["id"]; @@ -359,7 +331,7 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool std::lock_guard<std::mutex> l2(nw->lock); nw->config = networkConfig; } - if (initialized) { + if (notifyListeners) { std::lock_guard<std::mutex> ll(_changeListeners_l); for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { (*i)->onNetworkUpdate(this,networkId,networkConfig); diff --git a/controller/DB.hpp b/controller/DB.hpp index 732a6e25..e391538e 100644 --- a/controller/DB.hpp +++ b/controller/DB.hpp @@ -43,6 +43,7 @@ #include <vector> #include <atomic> #include <mutex> +#include <set> #include "../ext/json/json.hpp" @@ -60,10 +61,9 @@ public: public: ChangeListener() {} virtual ~ChangeListener() {} - virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network) {} - virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {} - virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId) {} - virtual void onNetworkMemberOnline(const DB *db,uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress) {} + virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network) {} + virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {} + virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId) {} }; struct NetworkSummaryInfo @@ -81,7 +81,7 @@ public: static void cleanNetwork(nlohmann::json &network); static void cleanMember(nlohmann::json &member); - DB(const Identity &myId,const char *path); + DB(); virtual ~DB(); virtual bool waitForReady() = 0; @@ -98,10 +98,9 @@ public: bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info); bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members); - bool summary(const uint64_t networkId,NetworkSummaryInfo &info); - void networks(std::vector<uint64_t> &networks); + void networks(std::set<uint64_t> &networks); - virtual void save(nlohmann::json &record) = 0; + virtual bool save(nlohmann::json &record,bool notifyListeners) = 0; virtual void eraseNetwork(const uint64_t networkId) = 0; virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0; @@ -127,15 +126,10 @@ protected: std::mutex lock; }; - void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized); - void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized); + void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool notifyListeners); + void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool notifyListeners); void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info); - const Identity _myId; - const Address _myAddress; - const std::string _path; - std::string _myAddressStr; - std::vector<DB::ChangeListener *> _changeListeners; std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks; std::unordered_multimap< uint64_t,uint64_t > _networkByMember; diff --git a/controller/DBMirrorSet.cpp b/controller/DBMirrorSet.cpp index bccdefdd..63c0f58b 100644 --- a/controller/DBMirrorSet.cpp +++ b/controller/DBMirrorSet.cpp @@ -28,7 +28,8 @@ namespace ZeroTier { -DBMirrorSet::DBMirrorSet() +DBMirrorSet::DBMirrorSet(DB::ChangeListener *listener) : + _listener(listener) { } @@ -36,6 +37,65 @@ DBMirrorSet::~DBMirrorSet() { } +bool DBMirrorSet::hasNetwork(const uint64_t networkId) const +{ + std::lock_guard<std::mutex> l(_dbs_l); + for(auto d=_dbs.begin();d!=_dbs.end();++d) { + if ((*d)->hasNetwork(networkId)) + return true; + } + return false; +} + +bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network) +{ + std::lock_guard<std::mutex> l(_dbs_l); + for(auto d=_dbs.begin();d!=_dbs.end();++d) { + if (get(networkId,network)) { + return true; + } + } + return false; +} + +bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member) +{ + std::lock_guard<std::mutex> l(_dbs_l); + for(auto d=_dbs.begin();d!=_dbs.end();++d) { + if (get(networkId,network,memberId,member)) + return true; + } + return false; +} + +bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,DB::NetworkSummaryInfo &info) +{ + std::lock_guard<std::mutex> l(_dbs_l); + for(auto d=_dbs.begin();d!=_dbs.end();++d) { + if (get(networkId,network,memberId,member,info)) + return true; + } + return false; +} + +bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members) +{ + std::lock_guard<std::mutex> l(_dbs_l); + for(auto d=_dbs.begin();d!=_dbs.end();++d) { + if (get(networkId,network,members)) + return true; + } + return false; +} + +void DBMirrorSet::networks(std::set<uint64_t> &networks) +{ + std::lock_guard<std::mutex> l(_dbs_l); + for(auto d=_dbs.begin();d!=_dbs.end();++d) { + (*d)->networks(networks); + } +} + bool DBMirrorSet::waitForReady() { bool r = false; @@ -56,11 +116,21 @@ bool DBMirrorSet::isReady() return true; } -void DBMirrorSet::save(nlohmann::json &record) +bool DBMirrorSet::save(nlohmann::json &record,bool notifyListeners) { std::lock_guard<std::mutex> l(_dbs_l); - for(auto d=_dbs.begin();d!=_dbs.end();++d) { - (*d)->save(record); + if (notifyListeners) { + for(auto d=_dbs.begin();d!=_dbs.end();++d) { + if ((*d)->save(record,notifyListeners)) + return true; + } + return false; + } else { + bool modified = false; + for(auto d=_dbs.begin();d!=_dbs.end();++d) { + modified |= (*d)->save(record,notifyListeners); + } + return modified; } } @@ -88,25 +158,39 @@ void DBMirrorSet::nodeIsOnline(const uint64_t networkId,const uint64_t memberId, } } -void DBMirrorSet::onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network) +void DBMirrorSet::onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network) { + bool modified = false; + nlohmann::json record(network); std::lock_guard<std::mutex> l(_dbs_l); for(auto d=_dbs.begin();d!=_dbs.end();++d) { if (d->get() != db) { + modified |= (*d)->save(record,false); } } + if (modified) { + _listener->onNetworkUpdate(this,networkId,network); + } } -void DBMirrorSet::onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) -{ -} - -void DBMirrorSet::onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId) +void DBMirrorSet::onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) { + bool modified = false; + nlohmann::json record(member); + std::lock_guard<std::mutex> l(_dbs_l); + for(auto d=_dbs.begin();d!=_dbs.end();++d) { + if (d->get() != db) { + modified |= (*d)->save(record,false); + } + } + if (modified) { + _listener->onNetworkMemberUpdate(this,networkId,memberId,member); + } } -void DBMirrorSet::onNetworkMemberOnline(const DB *db,uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress) +void DBMirrorSet::onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId) { + _listener->onNetworkMemberDeauthorize(this,networkId,memberId); } } // namespace ZeroTier diff --git a/controller/DBMirrorSet.hpp b/controller/DBMirrorSet.hpp index 1af0018e..aee598af 100644 --- a/controller/DBMirrorSet.hpp +++ b/controller/DBMirrorSet.hpp @@ -32,37 +32,48 @@ #include <vector> #include <memory> #include <mutex> +#include <set> namespace ZeroTier { class DBMirrorSet : public DB::ChangeListener { public: - DBMirrorSet(); + DBMirrorSet(DB::ChangeListener *listener); virtual ~DBMirrorSet(); + bool hasNetwork(const uint64_t networkId) const; + + bool get(const uint64_t networkId,nlohmann::json &network); + bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member); + bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,DB::NetworkSummaryInfo &info); + bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members); + + void networks(std::set<uint64_t> &networks); + bool waitForReady(); bool isReady(); - void save(nlohmann::json &record); + bool save(nlohmann::json &record,bool notifyListeners); void eraseNetwork(const uint64_t networkId); void eraseMember(const uint64_t networkId,const uint64_t memberId); void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress); // These are called by various DB instances when changes occur. - virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network); - virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member); - virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId); - virtual void onNetworkMemberOnline(const DB *db,uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress); + virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network); + virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member); + virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId); inline void addDB(const std::shared_ptr<DB> &db) { + db->addListener(this); std::lock_guard<std::mutex> l(_dbs_l); _dbs.push_back(db); } private: + DB::ChangeListener *const _listener; std::vector< std::shared_ptr< DB > > _dbs; - std::mutex _dbs_l; + mutable std::mutex _dbs_l; }; } // namespace ZeroTier diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index d9c6364b..64acda0f 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -1188,7 +1188,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt) } } -void EmbeddedNetworkController::onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network) +void EmbeddedNetworkController::onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network) { // Send an update to all members of the network that are online const int64_t now = OSUtils::now(); @@ -1199,7 +1199,7 @@ void EmbeddedNetworkController::onNetworkUpdate(const DB *db,uint64_t networkId, } } -void EmbeddedNetworkController::onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) +void EmbeddedNetworkController::onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) { // Push update to member if online try { @@ -1210,7 +1210,7 @@ void EmbeddedNetworkController::onNetworkMemberUpdate(const DB *db,uint64_t netw } catch ( ... ) {} } -void EmbeddedNetworkController::onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId) +void EmbeddedNetworkController::onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId) { const int64_t now = OSUtils::now(); Revocation rev((uint32_t)_node->prng(),networkId,0,now,ZT_REVOCATION_FLAG_FAST_PROPAGATE,Address(memberId),Revocation::CREDENTIAL_TYPE_COM); diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index c0f14f8b..85223dc4 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -101,9 +101,9 @@ public: void handleRemoteTrace(const ZT_RemoteTrace &rt); - virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network); - virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member); - virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId); + virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network); + virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member); + virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId); private: void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData); diff --git a/controller/FileDB.cpp b/controller/FileDB.cpp index 66b3d2c2..1dc2498a 100644 --- a/controller/FileDB.cpp +++ b/controller/FileDB.cpp @@ -29,8 +29,9 @@ namespace ZeroTier { -FileDB::FileDB(const Identity &myId,const char *path) : - DB(myId,path), +FileDB::FileDB(const char *path) : + DB(), + _path(path), _networksPath(_path + ZT_PATH_SEPARATOR_S + "network"), _tracePath(_path + ZT_PATH_SEPARATOR_S + "trace"), _running(true) @@ -85,9 +86,10 @@ FileDB::~FileDB() bool FileDB::waitForReady() { return true; } bool FileDB::isReady() { return true; } -void FileDB::save(nlohmann::json &record) +bool FileDB::save(nlohmann::json &record,bool notifyListeners) { char p1[4096],p2[4096],pb[4096]; + bool modified = false; try { const std::string objtype = record["objtype"]; if (objtype == "network") { @@ -101,7 +103,8 @@ void FileDB::save(nlohmann::json &record) OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid); if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1); - _networkChanged(old,record,true); + _networkChanged(old,record,notifyListeners); + modified = true; } } @@ -123,12 +126,14 @@ void FileDB::save(nlohmann::json &record) if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1))) fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1); } - _memberChanged(old,record,true); + _memberChanged(old,record,notifyListeners); + modified = true; } } } } catch ( ... ) {} // drop invalid records missing fields + return modified; } void FileDB::eraseNetwork(const uint64_t networkId) @@ -163,15 +168,8 @@ void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const char mid[32],atmp[64]; OSUtils::ztsnprintf(mid,sizeof(mid),"%.10llx",(unsigned long long)memberId); physicalAddress.toString(atmp); - { - std::lock_guard<std::mutex> l(this->_online_l); - this->_online[networkId][memberId][OSUtils::now()] = physicalAddress; - } - { - std::lock_guard<std::mutex> l2(_changeListeners_l); - for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) - (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress); - } + std::lock_guard<std::mutex> l(this->_online_l); + this->_online[networkId][memberId][OSUtils::now()] = physicalAddress; } } // namespace ZeroTier diff --git a/controller/FileDB.hpp b/controller/FileDB.hpp index deef8854..8aa2c18e 100644 --- a/controller/FileDB.hpp +++ b/controller/FileDB.hpp @@ -35,17 +35,18 @@ namespace ZeroTier class FileDB : public DB { public: - FileDB(const Identity &myId,const char *path); + FileDB(const char *path); virtual ~FileDB(); virtual bool waitForReady(); virtual bool isReady(); - virtual void save(nlohmann::json &record); + virtual bool save(nlohmann::json &record,bool notifyListeners); virtual void eraseNetwork(const uint64_t networkId); virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress); protected: + std::string _path; std::string _networksPath; std::string _tracePath; std::thread _onlineUpdateThread; diff --git a/controller/LFDB.cpp b/controller/LFDB.cpp index 5bf0aaf7..5dd6d082 100644 --- a/controller/LFDB.cpp +++ b/controller/LFDB.cpp @@ -38,7 +38,7 @@ namespace ZeroTier { LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState) : - DB(myId,path), + DB(), _myId(myId), _lfOwnerPrivate((lfOwnerPrivate) ? lfOwnerPrivate : ""), _lfOwnerPublic((lfOwnerPublic) ? lfOwnerPublic : ""), @@ -335,8 +335,9 @@ bool LFDB::isReady() return (_ready.load()); } -void LFDB::save(nlohmann::json &record) +bool LFDB::save(nlohmann::json &record,bool notifyListeners) { + bool modified = false; const std::string objtype = record["objtype"]; if (objtype == "network") { const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL); @@ -345,11 +346,12 @@ void LFDB::save(nlohmann::json &record) get(nwid,old); if ((!old.is_object())||(old != record)) { record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL; - _networkChanged(old,record,true); + _networkChanged(old,record,notifyListeners); { std::lock_guard<std::mutex> l(_state_l); _state[nwid].dirty = true; } + modified = true; } } } else if (objtype == "member") { @@ -360,14 +362,16 @@ void LFDB::save(nlohmann::json &record) get(nwid,network,id,old); if ((!old.is_object())||(old != record)) { record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL; - _memberChanged(old,record,true); + _memberChanged(old,record,notifyListeners); { std::lock_guard<std::mutex> l(_state_l); _state[nwid].members[id].dirty = true; } + modified = true; } } } + return modified; } void LFDB::eraseNetwork(const uint64_t networkId) @@ -382,24 +386,17 @@ void LFDB::eraseMember(const uint64_t networkId,const uint64_t memberId) void LFDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) { - { - std::lock_guard<std::mutex> l(_state_l); - auto nw = _state.find(networkId); - if (nw != _state.end()) { - auto m = nw->second.members.find(memberId); - if (m != nw->second.members.end()) { - m->second.lastOnlineTime = OSUtils::now(); - if (physicalAddress) - m->second.lastOnlineAddress = physicalAddress; - m->second.lastOnlineDirty = true; - } + std::lock_guard<std::mutex> l(_state_l); + auto nw = _state.find(networkId); + if (nw != _state.end()) { + auto m = nw->second.members.find(memberId); + if (m != nw->second.members.end()) { + m->second.lastOnlineTime = OSUtils::now(); + if (physicalAddress) + m->second.lastOnlineAddress = physicalAddress; + m->second.lastOnlineDirty = true; } } - { - std::lock_guard<std::mutex> l2(_changeListeners_l); - for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) - (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress); - } } } // namespace ZeroTier diff --git a/controller/LFDB.hpp b/controller/LFDB.hpp index 646da65d..bcd6cdd0 100644 --- a/controller/LFDB.hpp +++ b/controller/LFDB.hpp @@ -43,7 +43,7 @@ class LFDB : public DB { public: /** - * @param myId Identity of controller node (with secret) + * @param myId This controller's identity * @param path Base path for ZeroTier node itself * @param lfOwnerPrivate LF owner private in PEM format * @param lfOwnerPublic LF owner public in @base62 format @@ -56,7 +56,7 @@ public: virtual bool waitForReady(); virtual bool isReady(); - virtual void save(nlohmann::json &record); + virtual bool save(nlohmann::json &record,bool notifyListeners); virtual void eraseNetwork(const uint64_t networkId); virtual void eraseMember(const uint64_t networkId,const uint64_t memberId); virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress); diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 121d00df..13b7b8d8 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -81,7 +81,9 @@ std::string join(const std::vector<std::string> &elements, const char * const se using namespace ZeroTier; PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc) - : DB(myId, path) + : DB() + , _myId(myId) + , _myAddress(myId.address()) , _ready(0) , _connected(1) , _run(1) @@ -89,7 +91,9 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, M , _listenPort(listenPort) , _mqc(mqc) { - _connString = std::string(path) + " application_name=controller_" +_myAddressStr; + char myAddress[64]; + _myAddressStr = myId.address().toString(myAddress); + _connString = std::string(path) + " application_name=controller_" + _myAddressStr; // Database Schema Version Check PGconn *conn = getPgConn(); @@ -165,8 +169,9 @@ bool PostgreSQL::isReady() return ((_ready == 2)&&(_connected)); } -void PostgreSQL::save(nlohmann::json &record) +bool PostgreSQL::save(nlohmann::json &record,bool notifyListeners) { + bool modified = false; try { if (!record.is_object()) return; @@ -178,7 +183,8 @@ void PostgreSQL::save(nlohmann::json &record) get(nwid,old); if ((!old.is_object())||(old != record)) { record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL; - _commitQueue.post(new nlohmann::json(record)); + _commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners)); + modified = true; } } } else if (objtype == "member") { @@ -189,7 +195,8 @@ void PostgreSQL::save(nlohmann::json &record) get(nwid,network,id,old); if ((!old.is_object())||(old != record)) { record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL; - _commitQueue.post(new nlohmann::json(record)); + _commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners)); + modified = true; } } } @@ -210,6 +217,7 @@ void PostgreSQL::save(nlohmann::json &record) } catch (...) { fprintf(stderr, "Unknown error on PostgreSQL::save\n"); } + return modified; } void PostgreSQL::eraseNetwork(const uint64_t networkId) @@ -217,38 +225,33 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId) char tmp2[24]; waitForReady(); Utils::hex(networkId, tmp2); - json *tmp = new json(); - (*tmp)["id"] = tmp2; - (*tmp)["objtype"] = "_delete_network"; + std::pair<nlohmann::json,bool> tmp; + tmp.first["id"] = tmp2; + tmp.first["objtype"] = "_delete_network"; + tmp.second = true; _commitQueue.post(tmp); } void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId) { char tmp2[24]; - json *tmp = new json(); + std::pair<nlohmann::json,bool> tmp; Utils::hex(networkId, tmp2); - (*tmp)["nwid"] = tmp2; + tmp.first["nwid"] = tmp2; Utils::hex(memberId, tmp2); - (*tmp)["id"] = tmp2; - (*tmp)["objtype"] = "_delete_member"; + tmp.first["id"] = tmp2; + tmp.first["objtype"] = "_delete_member"; + tmp.second = true; _commitQueue.post(tmp); } void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress) { - { - std::lock_guard<std::mutex> l(_lastOnline_l); - std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)]; - i.first = OSUtils::now(); - if (physicalAddress) { - i.second = physicalAddress; - } - } - { - std::lock_guard<std::mutex> l2(_changeListeners_l); - for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) - (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress); + std::lock_guard<std::mutex> l(_lastOnline_l); + std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)]; + i.first = OSUtils::now(); + if (physicalAddress) { + i.second = physicalAddress; } } @@ -868,18 +871,18 @@ void PostgreSQL::commitThread() exit(1); } - json *config = nullptr; - while(_commitQueue.get(config)&(_run == 1)) { - if (!config) { + std::pair<nlohmann::json,bool> qitem; + while(_commitQueue.get(qitem)&(_run == 1)) { + if (!qitem.first.is_object()) { continue; } if (PQstatus(conn) == CONNECTION_BAD) { fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn)); PQfinish(conn); - delete config; exit(1); } - try { + try { + nlohmann::json *config = &(qitem.first); const std::string objtype = (*config)["objtype"]; if (objtype == "member") { try { @@ -1034,10 +1037,10 @@ void PostgreSQL::commitThread() nlohmann::json memOrig; nlohmann::json memNew(*config); - + get(nwidInt, nwOrig, memberidInt, memOrig); - _memberChanged(memOrig, memNew, (this->_ready>=2)); + _memberChanged(memOrig, memNew, qitem.second); } else { fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt); } @@ -1260,7 +1263,7 @@ void PostgreSQL::commitThread() get(nwidInt, nwOrig); - _networkChanged(nwOrig, nwNew, true); + _networkChanged(nwOrig, nwNew, qitem.second); } else { fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt); } @@ -1268,8 +1271,6 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error updating member: %s\n", e.what()); } - } else if (objtype == "trace") { - fprintf(stderr, "ERROR: Trace not yet implemented"); } else if (objtype == "_delete_network") { try { std::string networkId = (*config)["nwid"]; @@ -1326,8 +1327,6 @@ void PostgreSQL::commitThread() } catch (std::exception &e) { fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what()); } - delete config; - config = nullptr; std::this_thread::sleep_for(std::chrono::milliseconds(10)); } diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index ce6fb242..6b0ea996 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -55,7 +55,7 @@ public: virtual bool waitForReady(); virtual bool isReady(); - virtual void save(nlohmann::json &record); + virtual bool save(nlohmann::json &record,bool notifyListeners); virtual void eraseNetwork(const uint64_t networkId); virtual void eraseMember(const uint64_t networkId, const uint64_t memberId); virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress); @@ -87,9 +87,12 @@ private: PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE ); + const Identity _myId; + const Address _myAddress; + std::string _myAddressStr; std::string _connString; - BlockingQueue<nlohmann::json *> _commitQueue; + BlockingQueue< std::pair<nlohmann::json,bool> > _commitQueue; std::thread _heartbeatThread; std::thread _membersDbWatcher; |