summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2019-07-26 17:39:00 -0700
committerAdam Ierymenko <adam.ierymenko@gmail.com>2019-07-26 17:39:00 -0700
commitf6b080b8a2c071266270de23c7b99722b2cef21a (patch)
tree1fbfdc4d07b1061a58b1eb64a6baa47082d0acc2
parentc8c33db1d12359e38dbe0cae80143963ea0b8c8a (diff)
downloadinfinitytier-f6b080b8a2c071266270de23c7b99722b2cef21a.tar.gz
infinitytier-f6b080b8a2c071266270de23c7b99722b2cef21a.zip
Abstract out change listener from controller itself to permit DBs to shadow changes from other DBs.
-rw-r--r--controller/DB.cpp47
-rw-r--r--controller/DB.hpp47
-rw-r--r--controller/EmbeddedNetworkController.cpp12
-rw-r--r--controller/EmbeddedNetworkController.hpp9
-rw-r--r--controller/FileDB.cpp4
-rw-r--r--controller/FileDB.hpp2
-rw-r--r--controller/LFDB.cpp27
-rw-r--r--controller/LFDB.hpp4
-rw-r--r--controller/PostgreSQL.cpp4
-rw-r--r--controller/PostgreSQL.hpp6
10 files changed, 88 insertions, 74 deletions
diff --git a/controller/DB.cpp b/controller/DB.cpp
index bdd37c3b..bb734dc8 100644
--- a/controller/DB.cpp
+++ b/controller/DB.cpp
@@ -104,8 +104,7 @@ void DB::cleanMember(nlohmann::json &member)
member.erase("lastRequestMetaData");
}
-DB::DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) :
- _controller(nc),
+DB::DB(const Identity &myId,const char *path) :
_myId(myId),
_myAddress(myId.address()),
_path((path) ? path : "")
@@ -115,9 +114,7 @@ DB::DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path
_myAddressStr = tmp;
}
-DB::~DB()
-{
-}
+DB::~DB() {}
bool DB::get(const uint64_t networkId,nlohmann::json &network)
{
@@ -229,7 +226,7 @@ void DB::networks(std::vector<uint64_t> &networks)
networks.push_back(n->first);
}
-void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool push)
+void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized)
{
uint64_t memberId = 0;
uint64_t networkId = 0;
@@ -313,8 +310,12 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool pu
}
}
- if (push)
- _controller->onNetworkMemberUpdate(networkId,memberId);
+ if (initialized) {
+ std::lock_guard<std::mutex> ll(_changeListeners_l);
+ for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
+ (*i)->onNetworkMemberUpdate(networkId,memberId,memberConfig);
+ }
+ }
} else if (memberId) {
if (nw) {
std::lock_guard<std::mutex> l(nw->lock);
@@ -332,20 +333,24 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool pu
}
}
- if ((push)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId)))
- _controller->onNetworkMemberDeauthorize(networkId,memberId);
+ if ((initialized)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) {
+ std::lock_guard<std::mutex> ll(_changeListeners_l);
+ for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
+ (*i)->onNetworkMemberDeauthorize(networkId,memberId);
+ }
+ }
}
-void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool push)
+void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized)
{
if (networkConfig.is_object()) {
const std::string ids = networkConfig["id"];
- const uint64_t id = Utils::hexStrToU64(ids.c_str());
- if (id) {
+ const uint64_t networkId = Utils::hexStrToU64(ids.c_str());
+ if (networkId) {
std::shared_ptr<_Network> nw;
{
std::lock_guard<std::mutex> l(_networks_l);
- std::shared_ptr<_Network> &nw2 = _networks[id];
+ std::shared_ptr<_Network> &nw2 = _networks[networkId];
if (!nw2)
nw2.reset(new _Network);
nw = nw2;
@@ -354,15 +359,19 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool
std::lock_guard<std::mutex> l2(nw->lock);
nw->config = networkConfig;
}
- if (push)
- _controller->onNetworkUpdate(id);
+ if (initialized) {
+ std::lock_guard<std::mutex> ll(_changeListeners_l);
+ for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
+ (*i)->onNetworkUpdate(networkId,networkConfig);
+ }
+ }
}
} else if (old.is_object()) {
const std::string ids = old["id"];
- const uint64_t id = Utils::hexStrToU64(ids.c_str());
- if (id) {
+ const uint64_t networkId = Utils::hexStrToU64(ids.c_str());
+ if (networkId) {
std::lock_guard<std::mutex> l(_networks_l);
- _networks.erase(id);
+ _networks.erase(networkId);
}
}
}
diff --git a/controller/DB.hpp b/controller/DB.hpp
index e59230e1..f499d387 100644
--- a/controller/DB.hpp
+++ b/controller/DB.hpp
@@ -47,14 +47,22 @@
namespace ZeroTier
{
-class EmbeddedNetworkController;
-
/**
* Base class with common infrastructure for all controller DB implementations
*/
class DB
{
public:
+ class ChangeListener
+ {
+ public:
+ ChangeListener() {}
+ virtual ~ChangeListener() {}
+ virtual void onNetworkUpdate(uint64_t networkId,const nlohmann::json &network) {}
+ virtual void onNetworkMemberUpdate(uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {}
+ virtual void onNetworkMemberDeauthorize(uint64_t networkId,uint64_t memberId) {}
+ };
+
struct NetworkSummaryInfo
{
NetworkSummaryInfo() : authorizedMemberCount(0),totalMemberCount(0),mostRecentDeauthTime(0) {}
@@ -65,27 +73,12 @@ public:
int64_t mostRecentDeauthTime;
};
- /**
- * Ensure that all network fields are present
- */
static void initNetwork(nlohmann::json &network);
-
- /**
- * Ensure that all member fields are present
- */
static void initMember(nlohmann::json &member);
-
- /**
- * Remove old and temporary network fields
- */
static void cleanNetwork(nlohmann::json &network);
-
- /**
- * Remove old and temporary member fields
- */
static void cleanMember(nlohmann::json &member);
- DB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path);
+ DB(const Identity &myId,const char *path);
virtual ~DB();
virtual bool waitForReady() = 0;
@@ -101,19 +94,20 @@ public:
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member);
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info);
bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
-
bool summary(const uint64_t networkId,NetworkSummaryInfo &info);
-
void networks(std::vector<uint64_t> &networks);
virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0;
-
virtual void eraseNetwork(const uint64_t networkId) = 0;
-
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;
-
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress) = 0;
+ inline void addListener(DB::ChangeListener *const listener)
+ {
+ std::lock_guard<std::mutex> l(_changeListeners_l);
+ _changeListeners.push_back(listener);
+ }
+
protected:
struct _Network
{
@@ -127,18 +121,19 @@ protected:
std::mutex lock;
};
- void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool push);
- void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool push);
+ void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized);
+ void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized);
void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info);
- EmbeddedNetworkController *const _controller;
const Identity _myId;
const Address _myAddress;
const std::string _path;
std::string _myAddressStr;
+ std::vector<DB::ChangeListener *> _changeListeners;
std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks;
std::unordered_multimap< uint64_t,uint64_t > _networkByMember;
+ mutable std::mutex _changeListeners_l;
mutable std::mutex _networks_l;
};
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp
index 3ebdccf9..bf568527 100644
--- a/controller/EmbeddedNetworkController.cpp
+++ b/controller/EmbeddedNetworkController.cpp
@@ -496,7 +496,7 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender)
#ifdef ZT_CONTROLLER_USE_LIBPQ
if ((_path.length() > 9)&&(_path.substr(0,9) == "postgres:")) {
- _db.reset(new PostgreSQL(this,_signingId,_path.substr(9).c_str(), _listenPort, _mqc));
+ _db.reset(new PostgreSQL(_signingId,_path.substr(9).c_str(), _listenPort, _mqc));
} else {
#endif
@@ -521,7 +521,7 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender)
std::size_t pubHdrEnd = lfOwnerPublic.find_first_of("\n\r\t ");
if (pubHdrEnd != std::string::npos) {
lfOwnerPublic = lfOwnerPublic.substr(0,pubHdrEnd);
- _db.reset(new LFDB(this,_signingId,_path.c_str(),lfOwner.c_str(),lfOwnerPublic.c_str(),lfHost.c_str(),lfPort,storeOnlineState));
+ _db.reset(new LFDB(_signingId,_path.c_str(),lfOwner.c_str(),lfOwnerPublic.c_str(),lfHost.c_str(),lfPort,storeOnlineState));
}
}
}
@@ -530,7 +530,9 @@ void EmbeddedNetworkController::init(const Identity &signingId,Sender *sender)
}
}
if (!_db)
- _db.reset(new FileDB(this,_signingId,_path.c_str()));
+ _db.reset(new FileDB(_signingId,_path.c_str()));
+
+ _db->addListener(this);
#ifdef ZT_CONTROLLER_USE_LIBPQ
}
@@ -1188,7 +1190,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
}
}
-void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId)
+void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network)
{
// Send an update to all members of the network that are online
const int64_t now = OSUtils::now();
@@ -1199,7 +1201,7 @@ void EmbeddedNetworkController::onNetworkUpdate(const uint64_t networkId)
}
}
-void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId)
+void EmbeddedNetworkController::onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member)
{
// Push update to member if online
try {
diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp
index 6ce0b5cf..7bc37be2 100644
--- a/controller/EmbeddedNetworkController.hpp
+++ b/controller/EmbeddedNetworkController.hpp
@@ -58,7 +58,7 @@ class Node;
struct MQConfig;
-class EmbeddedNetworkController : public NetworkController
+class EmbeddedNetworkController : public NetworkController,public DB::ChangeListener
{
public:
/**
@@ -101,10 +101,9 @@ public:
void handleRemoteTrace(const ZT_RemoteTrace &rt);
- // Called on update via POST or by JSONDB on external update of network or network member records
- void onNetworkUpdate(const uint64_t networkId);
- void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId);
- void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId);
+ virtual void onNetworkUpdate(const uint64_t networkId,const nlohmann::json &network);
+ virtual void onNetworkMemberUpdate(const uint64_t networkId,const uint64_t memberId,const nlohmann::json &member);
+ virtual void onNetworkMemberDeauthorize(const uint64_t networkId,const uint64_t memberId);
private:
void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData);
diff --git a/controller/FileDB.cpp b/controller/FileDB.cpp
index eb2ec00d..7b997c49 100644
--- a/controller/FileDB.cpp
+++ b/controller/FileDB.cpp
@@ -29,8 +29,8 @@
namespace ZeroTier
{
-FileDB::FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path) :
- DB(nc,myId,path),
+FileDB::FileDB(const Identity &myId,const char *path) :
+ DB(myId,path),
_networksPath(_path + ZT_PATH_SEPARATOR_S + "network"),
_tracePath(_path + ZT_PATH_SEPARATOR_S + "trace"),
_onlineChanged(false),
diff --git a/controller/FileDB.hpp b/controller/FileDB.hpp
index 0a8b9d2e..5d55d0a4 100644
--- a/controller/FileDB.hpp
+++ b/controller/FileDB.hpp
@@ -35,7 +35,7 @@ namespace ZeroTier
class FileDB : public DB
{
public:
- FileDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path);
+ FileDB(const Identity &myId,const char *path);
virtual ~FileDB();
virtual bool waitForReady();
diff --git a/controller/LFDB.cpp b/controller/LFDB.cpp
index 5c5b687d..1f7f701a 100644
--- a/controller/LFDB.cpp
+++ b/controller/LFDB.cpp
@@ -37,9 +37,8 @@
namespace ZeroTier
{
-LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState) :
- DB(nc,myId,path),
- _nc(nc),
+LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState) :
+ DB(myId,path),
_myId(myId),
_lfOwnerPrivate((lfOwnerPrivate) ? lfOwnerPrivate : ""),
_lfOwnerPublic((lfOwnerPublic) ? lfOwnerPublic : ""),
@@ -54,7 +53,7 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
const uint64_t controllerAddressInt = _myId.address().toInt();
_myId.address().toString(controllerAddress);
std::string networksSelectorName("com.zerotier.controller.lfdb:"); networksSelectorName.append(controllerAddress); networksSelectorName.append("/network");
- std::string membersSelectorName("com.zerotier.controller.lfdb:"); membersSelectorName.append(controllerAddress); membersSelectorName.append("/network/member");
+ std::string membersSelectorName("com.zerotier.controller.lfdb:"); membersSelectorName.append(controllerAddress); membersSelectorName.append("/member");
httplib::Client htcli(_lfNodeHost.c_str(),_lfNodePort,600);
int64_t timeRangeStart = 0;
@@ -90,10 +89,10 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
for(auto ms=ns->second.members.begin();ms!=ns->second.members.end();++ms) {
if ((_storeOnlineState)&&(ms->second.lastOnlineDirty)&&(ms->second.lastOnlineAddress)) {
+ nlohmann::json newrec,selector0,selector1,selectors,ip;
char tmp[1024],tmp2[128];
OSUtils::ztsnprintf(tmp,sizeof(tmp),"com.zerotier.controller.lfdb:%s/network/%.16llx/online",controllerAddress,(unsigned long long)ns->first);
ms->second.lastOnlineAddress.toIpString(tmp2);
- nlohmann::json newrec,selector0,selector1,selectors;
selector0["Name"] = tmp;
selector0["Ordinal"] = ms->first;
selector1["Name"] = tmp2;
@@ -101,7 +100,21 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
selectors.push_back(selector0);
selectors.push_back(selector1);
newrec["Selectors"] = selectors;
- newrec["Value"] = tmp2;
+ const uint8_t *const rawip = (const uint8_t *)ms->second.lastOnlineAddress.rawIpData();
+ switch(ms->second.lastOnlineAddress) {
+ case AF_INET:
+ for(int j=0;j<4;++j)
+ ip.push_back((unsigned int)rawip[j]);
+ break;
+ case AF_INET6:
+ for(int j=0;j<16;++j)
+ ip.push_back((unsigned int)rawip[j]);
+ break;
+ default:
+ ip = tmp2; // should never happen since only IP transport is currently supported
+ break;
+ }
+ newrec["Value"] = ip;
newrec["OwnerPrivate"] = _lfOwnerPrivate;
newrec["MaskingKey"] = controllerAddress;
newrec["Timestamp"] = ms->second.lastOnlineTime;
@@ -112,7 +125,7 @@ LFDB::LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *
ms->second.lastOnlineDirty = false;
printf("SET member online %.16llx %.10llx %s\n",ns->first,ms->first,resp->body.c_str());
} else {
- fprintf(stderr,"ERROR: LFDB: %d from node (create/update member): %s" ZT_EOL_S,resp->status,resp->body.c_str());
+ fprintf(stderr,"ERROR: LFDB: %d from node (create/update member online status): %s" ZT_EOL_S,resp->status,resp->body.c_str());
}
} else {
fprintf(stderr,"ERROR: LFDB: node is offline" ZT_EOL_S);
diff --git a/controller/LFDB.hpp b/controller/LFDB.hpp
index 6659c0bf..73187462 100644
--- a/controller/LFDB.hpp
+++ b/controller/LFDB.hpp
@@ -43,7 +43,6 @@ class LFDB : public DB
{
public:
/**
- * @param nc Network controller
* @param myId Identity of controller node (with secret)
* @param path Base path for ZeroTier node itself
* @param lfOwnerPrivate LF owner private in PEM format
@@ -52,7 +51,7 @@ public:
* @param lfNodePort LF node http (not https) port
* @param storeOnlineState If true, store online/offline state and IP info in LF (a lot of data, only for private networks!)
*/
- LFDB(EmbeddedNetworkController *const nc,const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState);
+ LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState);
virtual ~LFDB();
virtual bool waitForReady();
@@ -63,7 +62,6 @@ public:
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);
protected:
- EmbeddedNetworkController *const _nc;
const Identity _myId;
std::string _lfOwnerPrivate,_lfOwnerPublic;
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index 594591bd..c6b9ada4 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -77,8 +77,8 @@ std::string join(const std::vector<std::string> &elements, const char * const se
using namespace ZeroTier;
-PostgreSQL::PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
- : DB(nc, myId, path)
+PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
+ : DB(myId, path)
, _ready(0)
, _connected(1)
, _run(1)
diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp
index 779d47bd..f35f89fc 100644
--- a/controller/PostgreSQL.hpp
+++ b/controller/PostgreSQL.hpp
@@ -51,7 +51,7 @@ struct MQConfig;
class PostgreSQL : public DB
{
public:
- PostgreSQL(EmbeddedNetworkController *const nc, const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
+ PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc = NULL);
virtual ~PostgreSQL();
virtual bool waitForReady();
@@ -78,7 +78,6 @@ private:
void _networksWatcher_Postgres(PGconn *conn);
void _networksWatcher_RabbitMQ();
-
void commitThread();
void onlineNotificationThread();
@@ -93,7 +92,6 @@ private:
BlockingQueue<nlohmann::json *> _commitQueue;
-
std::thread _heartbeatThread;
std::thread _membersDbWatcher;
std::thread _networksDbWatcher;
@@ -116,4 +114,4 @@ private:
#endif // ZT_CONTROLLER_LIBPQ_HPP
-#endif // ZT_CONTROLLER_USE_LIBPQ \ No newline at end of file
+#endif // ZT_CONTROLLER_USE_LIBPQ