diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2019-07-26 17:39:00 -0700 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2019-07-26 17:39:00 -0700 |
commit | f6b080b8a2c071266270de23c7b99722b2cef21a (patch) | |
tree | 1fbfdc4d07b1061a58b1eb64a6baa47082d0acc2 | |
parent | c8c33db1d12359e38dbe0cae80143963ea0b8c8a (diff) | |
download | infinitytier-f6b080b8a2c071266270de23c7b99722b2cef21a.tar.gz infinitytier-f6b080b8a2c071266270de23c7b99722b2cef21a.zip |
Abstract out change listener from controller itself to permit DBs to shadow changes from other DBs.
-rw-r--r-- | controller/DB.cpp | 47 | ||||
-rw-r--r-- | controller/DB.hpp | 47 | ||||
-rw-r--r-- | controller/EmbeddedNetworkController.cpp | 12 | ||||
-rw-r--r-- | controller/EmbeddedNetworkController.hpp | 9 | ||||
-rw-r--r-- | controller/FileDB.cpp | 4 | ||||
-rw-r--r-- | controller/FileDB.hpp | 2 | ||||
-rw-r--r-- | controller/LFDB.cpp | 27 | ||||
-rw-r--r-- | controller/LFDB.hpp | 4 | ||||
-rw-r--r-- | controller/PostgreSQL.cpp | 4 | ||||
-rw-r--r-- | controller/PostgreSQL.hpp | 6 |
10 files changed, 88 insertions, 74 deletions
diff --git a/controller/DB.cpp b/controller/DB.cpp index bdd37c3b..bb734dc8 100644 --- a/controller/DB.cpp +++ b/controller/DB.cpp @@ -104,8 +104,7 @@ void DB::cleanMember(nlohmann::json &member) member.erase("lastRequestMetaData"); } -DB::DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) : - _controller(nc), +DB::DB(const Identity &myId,const char *path) : _myId(myId), _myAddress(myId.address()), _path((path) ? path : "") @@ -115,9 +114,7 @@ DB::DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path _myAddressStr = tmp; } -DB::~DB() -{ -} +DB::~DB() {} bool DB::get(const uint64_t networkId,nlohmann::json &network) { @@ -229,7 +226,7 @@ void DB::networks(std::vector<uint64_t> &networks) networks.push_back(n->first); } -void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool push) +void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized) { uint64_t memberId = 0; uint64_t networkId = 0; @@ -313,8 +310,12 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool pu } } - if (push) - _controller->onNetworkMemberUpdate(networkId,memberId); + if (initialized) { + std::lock_guard<std::mutex> ll(_changeListeners_l); + for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { + (*i)->onNetworkMemberUpdate(networkId,memberId,memberConfig); + } + } } else if (memberId) { if (nw) { std::lock_guard<std::mutex> l(nw->lock); @@ -332,20 +333,24 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool pu } } - if ((push)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) - _controller->onNetworkMemberDeauthorize(networkId,memberId); + if ((initialized)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) { + std::lock_guard<std::mutex> ll(_changeListeners_l); + for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { + (*i)->onNetworkMemberDeauthorize(networkId,memberId); + } + } } -void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool push) +void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized) { if (networkConfig.is_object()) { const std::string ids = networkConfig["id"]; - const uint64_t id = Utils::hexStrToU64(ids.c_str()); - if (id) { + const uint64_t networkId = Utils::hexStrToU64(ids.c_str()); + if (networkId) { std::shared_ptr<_Network> nw; { std::lock_guard<std::mutex> l(_networks_l); - std::shared_ptr<_Network> &nw2 = _networks[id]; + std::shared_ptr<_Network> &nw2 = _networks[networkId]; if (!nw2) nw2.reset(new _Network); nw = nw2; @@ -354,15 +359,19 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool std::lock_guard<std::mutex> l2(nw->lock); nw->config = networkConfig; } - if (push) - _controller->onNetworkUpdate(id); + if (initialized) { + std::lock_guard<std::mutex> ll(_changeListeners_l); + for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) { + (*i)->onNetworkUpdate(networkId,networkConfig); + } + } } } else if (old.is_object()) { const std::string ids = old["id"]; - const uint64_t id = Utils::hexStrToU64(ids.c_str()); - if (id) { + const uint64_t networkId = Utils::hexStrToU64(ids.c_str()); + if (networkId) { std::lock_guard<std::mutex> l(_networks_l); - _networks.erase(id); + _networks.erase(networkId); } } } diff --git a/controller/DB.hpp b/controller/DB.hpp index e59230e1..f499d387 100644 --- a/controller/DB.hpp +++ b/controller/DB.hpp @@ -47,14 +47,22 @@ namespace ZeroTier { -class EmbeddedNetworkController; - /** * Base class with common infrastructure for all controller DB implementations */ class DB { public: + class ChangeListener + { + public: + ChangeListener() {} + virtual ~ChangeListener() {} + virtual void onNetworkUpdate(uint64_t networkId,const nlohmann::json &network) {} + virtual void onNetworkMemberUpdate(uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {} + virtual void onNetworkMemberDeauthorize(uint64_t networkId,uint64_t memberId) {} + }; + struct NetworkSummaryInfo { NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {} @@ -65,27 +73,12 @@ public: int64_t mostRecentDeauthTime; }; - /** - * Ensure that all network fields are present - */ static void initNetwork(nlohmann::json &network); - - /** - * Ensure that all member fields are present - */ static void initMember(nlohmann::json &member); - - /** - * Remove old and temporary network fields - */ static void cleanNetwork(nlohmann::json &network); - - /** - * Remove old and temporary member fields - */ static void cleanMember(nlohmann::json &member); - DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path); + DB(const Identity &myId,const char *path); virtual ~DB(); virtual bool waitForReady() = 0; @@ -101,19 +94,20 @@ public: bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member); bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info); bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members); - bool summary(const uint64_t networkId,NetworkSummaryInfo &info); - void networks(std::vector<uint64_t> &networks); virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0; - virtual void eraseNetwork(const uint64_t networkId) = 0; - virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0; - virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) = 0; + inline void addListener(DB::ChangeListener *const listener) + { + std::lock_guard<std::mutex> l(_changeListeners_l); + _changeListeners.push_back(listener); + } + protected: struct _Network { @@ -127,18 +121,19 @@ protected: std::mutex lock; }; - void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool push); - void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool push); + void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized); + void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized); void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info); - EmbeddedNetworkController *const _controller; const Identity _myId; const Address _myAddress; const std::string _path; std::string _myAddressStr; + std::vector<DB::ChangeListener *> _changeListeners; std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks; std::unordered_multimap< uint64_t,uint64_t > _networkByMember; + mutable std::mutex _changeListeners_l; mutable std::mutex _networks_l; }; diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp index 3ebdccf9..bf568527 100644 --- a/controller/EmbeddedNetworkController.cpp +++ b/controller/EmbeddedNetworkController.cpp @@ -496,7 +496,7 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender) #ifdef ZT_CONTROLLER_USE_LIBPQ if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:")) { - _db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort, _mqc)); + _db.reset(new PostgreSQL(_signingId,_path.substr(9).c_str(), _listenPort, _mqc)); } else { #endif @@ -521,7 +521,7 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender) std::size_t pubHdrEnd = lfOwnerPublic.find_first_of("\n\r\t "); if (pubHdrEnd != std::string::npos) { lfOwnerPublic = lfOwnerPublic.substr(0,pubHdrEnd); - _db.reset(new LFDB(this,_signingId,_path.c_str(),lfOwner.c_str(),lfOwnerPublic.c_str(),lfHost.c_str(),lfPort,storeOnlineState)); + _db.reset(new LFDB(_signingId,_path.c_str(),lfOwner.c_str(),lfOwnerPublic.c_str(),lfHost.c_str(),lfPort,storeOnlineState)); } } } @@ -530,7 +530,9 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender) } } if (!_db) - _db.reset(new FileDB(this,_signingId,_path.c_str())); + _db.reset(new FileDB(_signingId,_path.c_str())); + + _db->addListener(this); #ifdef ZT_CONTROLLER_USE_LIBPQ } @@ -1188,7 +1190,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt) } } -void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId) +void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network) { // Send an update to all members of the network that are online const int64_t now = OSUtils::now(); @@ -1199,7 +1201,7 @@ void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId) } } -void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId) +void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member) { // Push update to member if online try { diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp index 6ce0b5cf..7bc37be2 100644 --- a/controller/EmbeddedNetworkController.hpp +++ b/controller/EmbeddedNetworkController.hpp @@ -58,7 +58,7 @@ class Node; struct MQConfig; -class EmbeddedNetworkController : public NetworkController +class EmbeddedNetworkController : public NetworkController,public DB::ChangeListener { public: /** @@ -101,10 +101,9 @@ public: void handleRemoteTrace(const ZT_RemoteTrace &rt); - // Called on update via POST or by JSONDB on external update of network or network member records - void onNetworkUpdate(const uint64_t networkId); - void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId); - void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId); + virtual void onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network); + virtual void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member); + virtual void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId); private: void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData); diff --git a/controller/FileDB.cpp b/controller/FileDB.cpp index eb2ec00d..7b997c49 100644 --- a/controller/FileDB.cpp +++ b/controller/FileDB.cpp @@ -29,8 +29,8 @@ namespace ZeroTier { -FileDB::FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) : - DB(nc,myId,path), +FileDB::FileDB(const Identity &myId,const char *path) : + DB(myId,path), _networksPath(_path + ZT_PATH_SEPARATOR_S + "network"), _tracePath(_path + ZT_PATH_SEPARATOR_S + "trace"), _onlineChanged(false), diff --git a/controller/FileDB.hpp b/controller/FileDB.hpp index 0a8b9d2e..5d55d0a4 100644 --- a/controller/FileDB.hpp +++ b/controller/FileDB.hpp @@ -35,7 +35,7 @@ namespace ZeroTier class FileDB : public DB { public: - FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path); + FileDB(const Identity &myId,const char *path); virtual ~FileDB(); virtual bool waitForReady(); diff --git a/controller/LFDB.cpp b/controller/LFDB.cpp index 5c5b687d..1f7f701a 100644 --- a/controller/LFDB.cpp +++ b/controller/LFDB.cpp @@ -37,9 +37,8 @@ namespace ZeroTier { -LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState) : - DB(nc,myId,path), - _nc(nc), +LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState) : + DB(myId,path), _myId(myId), _lfOwnerPrivate((lfOwnerPrivate) ? lfOwnerPrivate : ""), _lfOwnerPublic((lfOwnerPublic) ? lfOwnerPublic : ""), @@ -54,7 +53,7 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char * const uint64_t controllerAddressInt = _myId.address().toInt(); _myId.address().toString(controllerAddress); std::string networksSelectorName("com.zerotier.controller.lfdb:"); networksSelectorName.append(controllerAddress); networksSelectorName.append("/network"); - std::string membersSelectorName("com.zerotier.controller.lfdb:"); membersSelectorName.append(controllerAddress); membersSelectorName.append("/network/member"); + std::string membersSelectorName("com.zerotier.controller.lfdb:"); membersSelectorName.append(controllerAddress); membersSelectorName.append("/member"); httplib::Client htcli(_lfNodeHost.c_str(),_lfNodePort,600); int64_t timeRangeStart = 0; @@ -90,10 +89,10 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char * for(auto ms=ns->second.members.begin();ms!=ns->second.members.end();++ms) { if ((_storeOnlineState)&&(ms->second.lastOnlineDirty)&&(ms->second.lastOnlineAddress)) { + nlohmann::json newrec,selector0,selector1,selectors,ip; char tmp[1024],tmp2[128]; OSUtils::ztsnprintf(tmp,sizeof(tmp),"com.zerotier.controller.lfdb:%s/network/%.16llx/online",controllerAddress,(unsigned long long)ns->first); ms->second.lastOnlineAddress.toIpString(tmp2); - nlohmann::json newrec,selector0,selector1,selectors; selector0["Name"] = tmp; selector0["Ordinal"] = ms->first; selector1["Name"] = tmp2; @@ -101,7 +100,21 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char * selectors.push_back(selector0); selectors.push_back(selector1); newrec["Selectors"] = selectors; - newrec["Value"] = tmp2; + const uint8_t *const rawip = (const uint8_t *)ms->second.lastOnlineAddress.rawIpData(); + switch(ms->second.lastOnlineAddress) { + case AF_INET: + for(int j=0;j<4;++j) + ip.push_back((unsigned int)rawip[j]); + break; + case AF_INET6: + for(int j=0;j<16;++j) + ip.push_back((unsigned int)rawip[j]); + break; + default: + ip = tmp2; // should never happen since only IP transport is currently supported + break; + } + newrec["Value"] = ip; newrec["OwnerPrivate"] = _lfOwnerPrivate; newrec["MaskingKey"] = controllerAddress; newrec["Timestamp"] = ms->second.lastOnlineTime; @@ -112,7 +125,7 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char * ms->second.lastOnlineDirty = false; printf("SET member online %.16llx %.10llx %s\n",ns->first,ms->first,resp->body.c_str()); } else { - fprintf(stderr,"ERROR: LFDB: %d from node (create/update member): %s" ZT_EOL_S,resp->status,resp->body.c_str()); + fprintf(stderr,"ERROR: LFDB: %d from node (create/update member online status): %s" ZT_EOL_S,resp->status,resp->body.c_str()); } } else { fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S); diff --git a/controller/LFDB.hpp b/controller/LFDB.hpp index 6659c0bf..73187462 100644 --- a/controller/LFDB.hpp +++ b/controller/LFDB.hpp @@ -43,7 +43,6 @@ class LFDB : public DB { public: /** - * @param nc Network controller * @param myId Identity of controller node (with secret) * @param path Base path for ZeroTier node itself * @param lfOwnerPrivate LF owner private in PEM format @@ -52,7 +51,7 @@ public: * @param lfNodePort LF node http (not https) port * @param storeOnlineState If true, store online/offline state and IP info in LF (a lot of data, only for private networks!) */ - LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState); + LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState); virtual ~LFDB(); virtual bool waitForReady(); @@ -63,7 +62,6 @@ public: virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress); protected: - EmbeddedNetworkController *const _nc; const Identity _myId; std::string _lfOwnerPrivate,_lfOwnerPublic; diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp index 594591bd..c6b9ada4 100644 --- a/controller/PostgreSQL.cpp +++ b/controller/PostgreSQL.cpp @@ -77,8 +77,8 @@ std::string join(const std::vector<std::string> &elements, const char * const se using namespace ZeroTier; -PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc) - : DB(nc, myId, path) +PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc) + : DB(myId, path) , _ready(0) , _connected(1) , _run(1) diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp index 779d47bd..f35f89fc 100644 --- a/controller/PostgreSQL.hpp +++ b/controller/PostgreSQL.hpp @@ -51,7 +51,7 @@ struct MQConfig; class PostgreSQL : public DB { public: - PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL); + PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL); virtual ~PostgreSQL(); virtual bool waitForReady(); @@ -78,7 +78,6 @@ private: void _networksWatcher_Postgres(PGconn *conn); void _networksWatcher_RabbitMQ(); - void commitThread(); void onlineNotificationThread(); @@ -93,7 +92,6 @@ private: BlockingQueue<nlohmann::json *> _commitQueue; - std::thread _heartbeatThread; std::thread _membersDbWatcher; std::thread _networksDbWatcher; @@ -116,4 +114,4 @@ private: #endif // ZT_CONTROLLER_LIBPQ_HPP -#endif // ZT_CONTROLLER_USE_LIBPQ
\ No newline at end of file +#endif // ZT_CONTROLLER_USE_LIBPQ |