summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--controller/DB.cpp44
-rw-r--r--controller/DB.hpp24
-rw-r--r--controller/DBMirrorSet.cpp106
-rw-r--r--controller/DBMirrorSet.hpp25
-rw-r--r--controller/EmbeddedNetworkController.cpp6
-rw-r--r--controller/EmbeddedNetworkController.hpp6
-rw-r--r--controller/FileDB.cpp26
-rw-r--r--controller/FileDB.hpp5
-rw-r--r--controller/LFDB.cpp37
-rw-r--r--controller/LFDB.hpp4
-rw-r--r--controller/PostgreSQL.cpp71
-rw-r--r--controller/PostgreSQL.hpp7
12 files changed, 210 insertions, 151 deletions
diff --git a/controller/DB.cpp b/controller/DB.cpp
index 75adf53e..a4440f38 100644
--- a/controller/DB.cpp
+++ b/controller/DB.cpp
@@ -104,16 +104,7 @@ void DB::cleanMember(nlohmann::json &member)
member.erase("lastRequestMetaData");
}
-DB::DB(const Identity &myId,const char *path) :
- _myId(myId),
- _myAddress(myId.address()),
- _path((path) ? path : "")
-{
- char tmp[32];
- _myAddress.toString(tmp);
- _myAddressStr = tmp;
-}
-
+DB::DB() {}
DB::~DB() {}
bool DB::get(const uint64_t networkId,nlohmann::json &network)
@@ -199,34 +190,15 @@ bool DB::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohma
return true;
}
-bool DB::summary(const uint64_t networkId,NetworkSummaryInfo &info)
-{
- waitForReady();
- std::shared_ptr<_Network> nw;
- {
- std::lock_guard<std::mutex> l(_networks_l);
- auto nwi = _networks.find(networkId);
- if (nwi == _networks.end())
- return false;
- nw = nwi->second;
- }
- {
- std::lock_guard<std::mutex> l2(nw->lock);
- _fillSummaryInfo(nw,info);
- }
- return true;
-}
-
-void DB::networks(std::vector<uint64_t> &networks)
+void DB::networks(std::set<uint64_t> &networks)
{
waitForReady();
std::lock_guard<std::mutex> l(_networks_l);
- networks.reserve(_networks.size() + 1);
for(auto n=_networks.begin();n!=_networks.end();++n)
- networks.push_back(n->first);
+ networks.insert(n->first);
}
-void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized)
+void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool notifyListeners)
{
uint64_t memberId = 0;
uint64_t networkId = 0;
@@ -310,7 +282,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in
}
}
- if (initialized) {
+ if (notifyListeners) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkMemberUpdate(this,networkId,memberId,memberConfig);
@@ -333,7 +305,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in
}
}
- if ((initialized)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) {
+ if ((notifyListeners)&&((wasAuth)&&(!isAuth)&&(networkId)&&(memberId))) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkMemberDeauthorize(this,networkId,memberId);
@@ -341,7 +313,7 @@ void DB::_memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool in
}
}
-void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized)
+void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool notifyListeners)
{
if (networkConfig.is_object()) {
const std::string ids = networkConfig["id"];
@@ -359,7 +331,7 @@ void DB::_networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool
std::lock_guard<std::mutex> l2(nw->lock);
nw->config = networkConfig;
}
- if (initialized) {
+ if (notifyListeners) {
std::lock_guard<std::mutex> ll(_changeListeners_l);
for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i) {
(*i)->onNetworkUpdate(this,networkId,networkConfig);
diff --git a/controller/DB.hpp b/controller/DB.hpp
index 732a6e25..e391538e 100644
--- a/controller/DB.hpp
+++ b/controller/DB.hpp
@@ -43,6 +43,7 @@
#include <vector>
#include <atomic>
#include <mutex>
+#include <set>
#include "../ext/json/json.hpp"
@@ -60,10 +61,9 @@ public:
public:
ChangeListener() {}
virtual ~ChangeListener() {}
- virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network) {}
- virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {}
- virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId) {}
- virtual void onNetworkMemberOnline(const DB *db,uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress) {}
+ virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network) {}
+ virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member) {}
+ virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId) {}
};
struct NetworkSummaryInfo
@@ -81,7 +81,7 @@ public:
static void cleanNetwork(nlohmann::json &network);
static void cleanMember(nlohmann::json &member);
- DB(const Identity &myId,const char *path);
+ DB();
virtual ~DB();
virtual bool waitForReady() = 0;
@@ -98,10 +98,9 @@ public:
bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,NetworkSummaryInfo &info);
bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
- bool summary(const uint64_t networkId,NetworkSummaryInfo &info);
- void networks(std::vector<uint64_t> &networks);
+ void networks(std::set<uint64_t> &networks);
- virtual void save(nlohmann::json &record) = 0;
+ virtual bool save(nlohmann::json &record,bool notifyListeners) = 0;
virtual void eraseNetwork(const uint64_t networkId) = 0;
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;
@@ -127,15 +126,10 @@ protected:
std::mutex lock;
};
- void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool initialized);
- void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool initialized);
+ void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool notifyListeners);
+ void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool notifyListeners);
void _fillSummaryInfo(const std::shared_ptr<_Network> &nw,NetworkSummaryInfo &info);
- const Identity _myId;
- const Address _myAddress;
- const std::string _path;
- std::string _myAddressStr;
-
std::vector<DB::ChangeListener *> _changeListeners;
std::unordered_map< uint64_t,std::shared_ptr<_Network> > _networks;
std::unordered_multimap< uint64_t,uint64_t > _networkByMember;
diff --git a/controller/DBMirrorSet.cpp b/controller/DBMirrorSet.cpp
index bccdefdd..63c0f58b 100644
--- a/controller/DBMirrorSet.cpp
+++ b/controller/DBMirrorSet.cpp
@@ -28,7 +28,8 @@
namespace ZeroTier {
-DBMirrorSet::DBMirrorSet()
+DBMirrorSet::DBMirrorSet(DB::ChangeListener *listener) :
+ _listener(listener)
{
}
@@ -36,6 +37,65 @@ DBMirrorSet::~DBMirrorSet()
{
}
+bool DBMirrorSet::hasNetwork(const uint64_t networkId) const
+{
+ std::lock_guard<std::mutex> l(_dbs_l);
+ for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+ if ((*d)->hasNetwork(networkId))
+ return true;
+ }
+ return false;
+}
+
+bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network)
+{
+ std::lock_guard<std::mutex> l(_dbs_l);
+ for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+ if (get(networkId,network)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member)
+{
+ std::lock_guard<std::mutex> l(_dbs_l);
+ for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+ if (get(networkId,network,memberId,member))
+ return true;
+ }
+ return false;
+}
+
+bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,DB::NetworkSummaryInfo &info)
+{
+ std::lock_guard<std::mutex> l(_dbs_l);
+ for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+ if (get(networkId,network,memberId,member,info))
+ return true;
+ }
+ return false;
+}
+
+bool DBMirrorSet::get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members)
+{
+ std::lock_guard<std::mutex> l(_dbs_l);
+ for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+ if (get(networkId,network,members))
+ return true;
+ }
+ return false;
+}
+
+void DBMirrorSet::networks(std::set<uint64_t> &networks)
+{
+ std::lock_guard<std::mutex> l(_dbs_l);
+ for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+ (*d)->networks(networks);
+ }
+}
+
bool DBMirrorSet::waitForReady()
{
bool r = false;
@@ -56,11 +116,21 @@ bool DBMirrorSet::isReady()
return true;
}
-void DBMirrorSet::save(nlohmann::json &record)
+bool DBMirrorSet::save(nlohmann::json &record,bool notifyListeners)
{
std::lock_guard<std::mutex> l(_dbs_l);
- for(auto d=_dbs.begin();d!=_dbs.end();++d) {
- (*d)->save(record);
+ if (notifyListeners) {
+ for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+ if ((*d)->save(record,notifyListeners))
+ return true;
+ }
+ return false;
+ } else {
+ bool modified = false;
+ for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+ modified |= (*d)->save(record,notifyListeners);
+ }
+ return modified;
}
}
@@ -88,25 +158,39 @@ void DBMirrorSet::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,
}
}
-void DBMirrorSet::onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network)
+void DBMirrorSet::onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network)
{
+ bool modified = false;
+ nlohmann::json record(network);
std::lock_guard<std::mutex> l(_dbs_l);
for(auto d=_dbs.begin();d!=_dbs.end();++d) {
if (d->get() != db) {
+ modified |= (*d)->save(record,false);
}
}
+ if (modified) {
+ _listener->onNetworkUpdate(this,networkId,network);
+ }
}
-void DBMirrorSet::onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
-{
-}
-
-void DBMirrorSet::onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId)
+void DBMirrorSet::onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
{
+ bool modified = false;
+ nlohmann::json record(member);
+ std::lock_guard<std::mutex> l(_dbs_l);
+ for(auto d=_dbs.begin();d!=_dbs.end();++d) {
+ if (d->get() != db) {
+ modified |= (*d)->save(record,false);
+ }
+ }
+ if (modified) {
+ _listener->onNetworkMemberUpdate(this,networkId,memberId,member);
+ }
}
-void DBMirrorSet::onNetworkMemberOnline(const DB *db,uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress)
+void DBMirrorSet::onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId)
{
+ _listener->onNetworkMemberDeauthorize(this,networkId,memberId);
}
} // namespace ZeroTier
diff --git a/controller/DBMirrorSet.hpp b/controller/DBMirrorSet.hpp
index 1af0018e..aee598af 100644
--- a/controller/DBMirrorSet.hpp
+++ b/controller/DBMirrorSet.hpp
@@ -32,37 +32,48 @@
#include <vector>
#include <memory>
#include <mutex>
+#include <set>
namespace ZeroTier {
class DBMirrorSet : public DB::ChangeListener
{
public:
- DBMirrorSet();
+ DBMirrorSet(DB::ChangeListener *listener);
virtual ~DBMirrorSet();
+ bool hasNetwork(const uint64_t networkId) const;
+
+ bool get(const uint64_t networkId,nlohmann::json &network);
+ bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member);
+ bool get(const uint64_t networkId,nlohmann::json &network,const uint64_t memberId,nlohmann::json &member,DB::NetworkSummaryInfo &info);
+ bool get(const uint64_t networkId,nlohmann::json &network,std::vector<nlohmann::json> &members);
+
+ void networks(std::set<uint64_t> &networks);
+
bool waitForReady();
bool isReady();
- void save(nlohmann::json &record);
+ bool save(nlohmann::json &record,bool notifyListeners);
void eraseNetwork(const uint64_t networkId);
void eraseMember(const uint64_t networkId,const uint64_t memberId);
void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);
// These are called by various DB instances when changes occur.
- virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network);
- virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
- virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId);
- virtual void onNetworkMemberOnline(const DB *db,uint64_t networkId,uint64_t memberId,const InetAddress &physicalAddress);
+ virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network);
+ virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
+ virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId);
inline void addDB(const std::shared_ptr<DB> &db)
{
+ db->addListener(this);
std::lock_guard<std::mutex> l(_dbs_l);
_dbs.push_back(db);
}
private:
+ DB::ChangeListener *const _listener;
std::vector< std::shared_ptr< DB > > _dbs;
- std::mutex _dbs_l;
+ mutable std::mutex _dbs_l;
};
} // namespace ZeroTier
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp
index d9c6364b..64acda0f 100644
--- a/controller/EmbeddedNetworkController.cpp
+++ b/controller/EmbeddedNetworkController.cpp
@@ -1188,7 +1188,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
}
}
-void EmbeddedNetworkController::onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network)
+void EmbeddedNetworkController::onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network)
{
// Send an update to all members of the network that are online
const int64_t now = OSUtils::now();
@@ -1199,7 +1199,7 @@ void EmbeddedNetworkController::onNetworkUpdate(const DB *db,uint64_t networkId,
}
}
-void EmbeddedNetworkController::onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
+void EmbeddedNetworkController::onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member)
{
// Push update to member if online
try {
@@ -1210,7 +1210,7 @@ void EmbeddedNetworkController::onNetworkMemberUpdate(const DB *db,uint64_t netw
} catch ( ... ) {}
}
-void EmbeddedNetworkController::onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId)
+void EmbeddedNetworkController::onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId)
{
const int64_t now = OSUtils::now();
Revocation rev((uint32_t)_node->prng(),networkId,0,now,ZT_REVOCATION_FLAG_FAST_PROPAGATE,Address(memberId),Revocation::CREDENTIAL_TYPE_COM);
diff --git a/controller/EmbeddedNetworkController.hpp b/controller/EmbeddedNetworkController.hpp
index c0f14f8b..85223dc4 100644
--- a/controller/EmbeddedNetworkController.hpp
+++ b/controller/EmbeddedNetworkController.hpp
@@ -101,9 +101,9 @@ public:
void handleRemoteTrace(const ZT_RemoteTrace &rt);
- virtual void onNetworkUpdate(const DB *db,uint64_t networkId,const nlohmann::json &network);
- virtual void onNetworkMemberUpdate(const DB *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
- virtual void onNetworkMemberDeauthorize(const DB *db,uint64_t networkId,uint64_t memberId);
+ virtual void onNetworkUpdate(const void *db,uint64_t networkId,const nlohmann::json &network);
+ virtual void onNetworkMemberUpdate(const void *db,uint64_t networkId,uint64_t memberId,const nlohmann::json &member);
+ virtual void onNetworkMemberDeauthorize(const void *db,uint64_t networkId,uint64_t memberId);
private:
void _request(uint64_t nwid,const InetAddress &fromAddr,uint64_t requestPacketId,const Identity &identity,const Dictionary<ZT_NETWORKCONFIG_METADATA_DICT_CAPACITY> &metaData);
diff --git a/controller/FileDB.cpp b/controller/FileDB.cpp
index 66b3d2c2..1dc2498a 100644
--- a/controller/FileDB.cpp
+++ b/controller/FileDB.cpp
@@ -29,8 +29,9 @@
namespace ZeroTier
{
-FileDB::FileDB(const Identity &myId,const char *path) :
- DB(myId,path),
+FileDB::FileDB(const char *path) :
+ DB(),
+ _path(path),
_networksPath(_path + ZT_PATH_SEPARATOR_S + "network"),
_tracePath(_path + ZT_PATH_SEPARATOR_S + "trace"),
_running(true)
@@ -85,9 +86,10 @@ FileDB::~FileDB()
bool FileDB::waitForReady() { return true; }
bool FileDB::isReady() { return true; }
-void FileDB::save(nlohmann::json &record)
+bool FileDB::save(nlohmann::json &record,bool notifyListeners)
{
char p1[4096],p2[4096],pb[4096];
+ bool modified = false;
try {
const std::string objtype = record["objtype"];
if (objtype == "network") {
@@ -101,7 +103,8 @@ void FileDB::save(nlohmann::json &record)
OSUtils::ztsnprintf(p1,sizeof(p1),"%s" ZT_PATH_SEPARATOR_S "%.16llx.json",_networksPath.c_str(),nwid);
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
- _networkChanged(old,record,true);
+ _networkChanged(old,record,notifyListeners);
+ modified = true;
}
}
@@ -123,12 +126,14 @@ void FileDB::save(nlohmann::json &record)
if (!OSUtils::writeFile(p1,OSUtils::jsonDump(record,-1)))
fprintf(stderr,"WARNING: controller unable to write to path: %s" ZT_EOL_S,p1);
}
- _memberChanged(old,record,true);
+ _memberChanged(old,record,notifyListeners);
+ modified = true;
}
}
}
} catch ( ... ) {} // drop invalid records missing fields
+ return modified;
}
void FileDB::eraseNetwork(const uint64_t networkId)
@@ -163,15 +168,8 @@ void FileDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const
char mid[32],atmp[64];
OSUtils::ztsnprintf(mid,sizeof(mid),"%.10llx",(unsigned long long)memberId);
physicalAddress.toString(atmp);
- {
- std::lock_guard<std::mutex> l(this->_online_l);
- this->_online[networkId][memberId][OSUtils::now()] = physicalAddress;
- }
- {
- std::lock_guard<std::mutex> l2(_changeListeners_l);
- for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i)
- (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress);
- }
+ std::lock_guard<std::mutex> l(this->_online_l);
+ this->_online[networkId][memberId][OSUtils::now()] = physicalAddress;
}
} // namespace ZeroTier
diff --git a/controller/FileDB.hpp b/controller/FileDB.hpp
index deef8854..8aa2c18e 100644
--- a/controller/FileDB.hpp
+++ b/controller/FileDB.hpp
@@ -35,17 +35,18 @@ namespace ZeroTier
class FileDB : public DB
{
public:
- FileDB(const Identity &myId,const char *path);
+ FileDB(const char *path);
virtual ~FileDB();
virtual bool waitForReady();
virtual bool isReady();
- virtual void save(nlohmann::json &record);
+ virtual bool save(nlohmann::json &record,bool notifyListeners);
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);
protected:
+ std::string _path;
std::string _networksPath;
std::string _tracePath;
std::thread _onlineUpdateThread;
diff --git a/controller/LFDB.cpp b/controller/LFDB.cpp
index 5bf0aaf7..5dd6d082 100644
--- a/controller/LFDB.cpp
+++ b/controller/LFDB.cpp
@@ -38,7 +38,7 @@ namespace ZeroTier
{
LFDB::LFDB(const Identity &myId,const char *path,const char *lfOwnerPrivate,const char *lfOwnerPublic,const char *lfNodeHost,int lfNodePort,bool storeOnlineState) :
- DB(myId,path),
+ DB(),
_myId(myId),
_lfOwnerPrivate((lfOwnerPrivate) ? lfOwnerPrivate : ""),
_lfOwnerPublic((lfOwnerPublic) ? lfOwnerPublic : ""),
@@ -335,8 +335,9 @@ bool LFDB::isReady()
return (_ready.load());
}
-void LFDB::save(nlohmann::json &record)
+bool LFDB::save(nlohmann::json &record,bool notifyListeners)
{
+ bool modified = false;
const std::string objtype = record["objtype"];
if (objtype == "network") {
const uint64_t nwid = OSUtils::jsonIntHex(record["id"],0ULL);
@@ -345,11 +346,12 @@ void LFDB::save(nlohmann::json &record)
get(nwid,old);
if ((!old.is_object())||(old != record)) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
- _networkChanged(old,record,true);
+ _networkChanged(old,record,notifyListeners);
{
std::lock_guard<std::mutex> l(_state_l);
_state[nwid].dirty = true;
}
+ modified = true;
}
}
} else if (objtype == "member") {
@@ -360,14 +362,16 @@ void LFDB::save(nlohmann::json &record)
get(nwid,network,id,old);
if ((!old.is_object())||(old != record)) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
- _memberChanged(old,record,true);
+ _memberChanged(old,record,notifyListeners);
{
std::lock_guard<std::mutex> l(_state_l);
_state[nwid].members[id].dirty = true;
}
+ modified = true;
}
}
}
+ return modified;
}
void LFDB::eraseNetwork(const uint64_t networkId)
@@ -382,24 +386,17 @@ void LFDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
void LFDB::nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress)
{
- {
- std::lock_guard<std::mutex> l(_state_l);
- auto nw = _state.find(networkId);
- if (nw != _state.end()) {
- auto m = nw->second.members.find(memberId);
- if (m != nw->second.members.end()) {
- m->second.lastOnlineTime = OSUtils::now();
- if (physicalAddress)
- m->second.lastOnlineAddress = physicalAddress;
- m->second.lastOnlineDirty = true;
- }
+ std::lock_guard<std::mutex> l(_state_l);
+ auto nw = _state.find(networkId);
+ if (nw != _state.end()) {
+ auto m = nw->second.members.find(memberId);
+ if (m != nw->second.members.end()) {
+ m->second.lastOnlineTime = OSUtils::now();
+ if (physicalAddress)
+ m->second.lastOnlineAddress = physicalAddress;
+ m->second.lastOnlineDirty = true;
}
}
- {
- std::lock_guard<std::mutex> l2(_changeListeners_l);
- for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i)
- (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress);
- }
}
} // namespace ZeroTier
diff --git a/controller/LFDB.hpp b/controller/LFDB.hpp
index 646da65d..bcd6cdd0 100644
--- a/controller/LFDB.hpp
+++ b/controller/LFDB.hpp
@@ -43,7 +43,7 @@ class LFDB : public DB
{
public:
/**
- * @param myId Identity of controller node (with secret)
+ * @param myId This controller's identity
* @param path Base path for ZeroTier node itself
* @param lfOwnerPrivate LF owner private in PEM format
* @param lfOwnerPublic LF owner public in @base62 format
@@ -56,7 +56,7 @@ public:
virtual bool waitForReady();
virtual bool isReady();
- virtual void save(nlohmann::json &record);
+ virtual bool save(nlohmann::json &record,bool notifyListeners);
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t networkId,const uint64_t memberId,const InetAddress &physicalAddress);
diff --git a/controller/PostgreSQL.cpp b/controller/PostgreSQL.cpp
index 121d00df..13b7b8d8 100644
--- a/controller/PostgreSQL.cpp
+++ b/controller/PostgreSQL.cpp
@@ -81,7 +81,9 @@ std::string join(const std::vector<std::string> &elements, const char * const se
using namespace ZeroTier;
PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, MQConfig *mqc)
- : DB(myId, path)
+ : DB()
+ , _myId(myId)
+ , _myAddress(myId.address())
, _ready(0)
, _connected(1)
, _run(1)
@@ -89,7 +91,9 @@ PostgreSQL::PostgreSQL(const Identity &myId, const char *path, int listenPort, M
, _listenPort(listenPort)
, _mqc(mqc)
{
- _connString = std::string(path) + " application_name=controller_" +_myAddressStr;
+ char myAddress[64];
+ _myAddressStr = myId.address().toString(myAddress);
+ _connString = std::string(path) + " application_name=controller_" + _myAddressStr;
// Database Schema Version Check
PGconn *conn = getPgConn();
@@ -165,8 +169,9 @@ bool PostgreSQL::isReady()
return ((_ready == 2)&&(_connected));
}
-void PostgreSQL::save(nlohmann::json &record)
+bool PostgreSQL::save(nlohmann::json &record,bool notifyListeners)
{
+ bool modified = false;
try {
if (!record.is_object())
return;
@@ -178,7 +183,8 @@ void PostgreSQL::save(nlohmann::json &record)
get(nwid,old);
if ((!old.is_object())||(old != record)) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
- _commitQueue.post(new nlohmann::json(record));
+ _commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners));
+ modified = true;
}
}
} else if (objtype == "member") {
@@ -189,7 +195,8 @@ void PostgreSQL::save(nlohmann::json &record)
get(nwid,network,id,old);
if ((!old.is_object())||(old != record)) {
record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1ULL;
- _commitQueue.post(new nlohmann::json(record));
+ _commitQueue.post(std::pair<nlohmann::json,bool>(record,notifyListeners));
+ modified = true;
}
}
}
@@ -210,6 +217,7 @@ void PostgreSQL::save(nlohmann::json &record)
} catch (...) {
fprintf(stderr, "Unknown error on PostgreSQL::save\n");
}
+ return modified;
}
void PostgreSQL::eraseNetwork(const uint64_t networkId)
@@ -217,38 +225,33 @@ void PostgreSQL::eraseNetwork(const uint64_t networkId)
char tmp2[24];
waitForReady();
Utils::hex(networkId, tmp2);
- json *tmp = new json();
- (*tmp)["id"] = tmp2;
- (*tmp)["objtype"] = "_delete_network";
+ std::pair<nlohmann::json,bool> tmp;
+ tmp.first["id"] = tmp2;
+ tmp.first["objtype"] = "_delete_network";
+ tmp.second = true;
_commitQueue.post(tmp);
}
void PostgreSQL::eraseMember(const uint64_t networkId, const uint64_t memberId)
{
char tmp2[24];
- json *tmp = new json();
+ std::pair<nlohmann::json,bool> tmp;
Utils::hex(networkId, tmp2);
- (*tmp)["nwid"] = tmp2;
+ tmp.first["nwid"] = tmp2;
Utils::hex(memberId, tmp2);
- (*tmp)["id"] = tmp2;
- (*tmp)["objtype"] = "_delete_member";
+ tmp.first["id"] = tmp2;
+ tmp.first["objtype"] = "_delete_member";
+ tmp.second = true;
_commitQueue.post(tmp);
}
void PostgreSQL::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress)
{
- {
- std::lock_guard<std::mutex> l(_lastOnline_l);
- std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)];
- i.first = OSUtils::now();
- if (physicalAddress) {
- i.second = physicalAddress;
- }
- }
- {
- std::lock_guard<std::mutex> l2(_changeListeners_l);
- for(auto i=_changeListeners.begin();i!=_changeListeners.end();++i)
- (*i)->onNetworkMemberOnline(this,networkId,memberId,physicalAddress);
+ std::lock_guard<std::mutex> l(_lastOnline_l);
+ std::pair<int64_t, InetAddress> &i = _lastOnline[std::pair<uint64_t,uint64_t>(networkId, memberId)];
+ i.first = OSUtils::now();
+ if (physicalAddress) {
+ i.second = physicalAddress;
}
}
@@ -868,18 +871,18 @@ void PostgreSQL::commitThread()
exit(1);
}
- json *config = nullptr;
- while(_commitQueue.get(config)&(_run == 1)) {
- if (!config) {
+ std::pair<nlohmann::json,bool> qitem;
+ while(_commitQueue.get(qitem)&(_run == 1)) {
+ if (!qitem.first.is_object()) {
continue;
}
if (PQstatus(conn) == CONNECTION_BAD) {
fprintf(stderr, "ERROR: Connection to database failed: %s\n", PQerrorMessage(conn));
PQfinish(conn);
- delete config;
exit(1);
}
- try {
+ try {
+ nlohmann::json *config = &(qitem.first);
const std::string objtype = (*config)["objtype"];
if (objtype == "member") {
try {
@@ -1034,10 +1037,10 @@ void PostgreSQL::commitThread()
nlohmann::json memOrig;
nlohmann::json memNew(*config);
-
+
get(nwidInt, nwOrig, memberidInt, memOrig);
- _memberChanged(memOrig, memNew, (this->_ready>=2));
+ _memberChanged(memOrig, memNew, qitem.second);
} else {
fprintf(stderr, "Can't notify of change. Error parsing nwid or memberid: %lu-%lu\n", nwidInt, memberidInt);
}
@@ -1260,7 +1263,7 @@ void PostgreSQL::commitThread()
get(nwidInt, nwOrig);
- _networkChanged(nwOrig, nwNew, true);
+ _networkChanged(nwOrig, nwNew, qitem.second);
} else {
fprintf(stderr, "Can't notify network changed: %lu\n", nwidInt);
}
@@ -1268,8 +1271,6 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) {
fprintf(stderr, "ERROR: Error updating member: %s\n", e.what());
}
- } else if (objtype == "trace") {
- fprintf(stderr, "ERROR: Trace not yet implemented");
} else if (objtype == "_delete_network") {
try {
std::string networkId = (*config)["nwid"];
@@ -1326,8 +1327,6 @@ void PostgreSQL::commitThread()
} catch (std::exception &e) {
fprintf(stderr, "ERROR: Error getting objtype: %s\n", e.what());
}
- delete config;
- config = nullptr;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
diff --git a/controller/PostgreSQL.hpp b/controller/PostgreSQL.hpp
index ce6fb242..6b0ea996 100644
--- a/controller/PostgreSQL.hpp
+++ b/controller/PostgreSQL.hpp
@@ -55,7 +55,7 @@ public:
virtual bool waitForReady();
virtual bool isReady();
- virtual void save(nlohmann::json &record);
+ virtual bool save(nlohmann::json &record,bool notifyListeners);
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId, const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress &physicalAddress);
@@ -87,9 +87,12 @@ private:
PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE );
+ const Identity _myId;
+ const Address _myAddress;
+ std::string _myAddressStr;
std::string _connString;
- BlockingQueue<nlohmann::json *> _commitQueue;
+ BlockingQueue< std::pair<nlohmann::json,bool> > _commitQueue;
std::thread _heartbeatThread;
std::thread _membersDbWatcher;