summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2017-11-08 11:06:14 -0800
committerAdam Ierymenko <adam.ierymenko@gmail.com>2017-11-08 11:06:14 -0800
commit4166d8ca35ded34180d60b56105a853dd6b02ff4 (patch)
tree3464cfb6e36f67ec5a8fe6f59eae8cbc40759c40
parent53e7e950f187008939dd5021f9d5f635f995f022 (diff)
downloadinfinitytier-4166d8ca35ded34180d60b56105a853dd6b02ff4.tar.gz
infinitytier-4166d8ca35ded34180d60b56105a853dd6b02ff4.zip
Fix a deadlock and some more work on RethinkDB (for central) integration.
-rw-r--r--controller/DB.hpp4
-rw-r--r--controller/EmbeddedNetworkController.cpp31
-rw-r--r--controller/FileDB.cpp7
-rw-r--r--controller/FileDB.hpp4
-rw-r--r--controller/RethinkDB.cpp74
-rw-r--r--controller/RethinkDB.hpp10
-rw-r--r--node/Peer.cpp59
7 files changed, 106 insertions, 83 deletions
diff --git a/controller/DB.hpp b/controller/DB.hpp
index dfc8ac95..fe06c24d 100644
--- a/controller/DB.hpp
+++ b/controller/DB.hpp
@@ -78,12 +78,14 @@ public:
void networks(std::vector<uint64_t> &networks);
- virtual void save(const nlohmann::json &record) = 0;
+ virtual void save(nlohmann::json *orig,nlohmann::json &record) = 0;
virtual void eraseNetwork(const uint64_t networkId) = 0;
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId) = 0;
+ virtual void nodeIsOnline(const uint64_t memberId) = 0;
+
protected:
struct _Network
{
diff --git a/controller/EmbeddedNetworkController.cpp b/controller/EmbeddedNetworkController.cpp
index 5707e6e0..a2795d96 100644
--- a/controller/EmbeddedNetworkController.cpp
+++ b/controller/EmbeddedNetworkController.cpp
@@ -734,12 +734,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
member["nwid"] = nwids;
_removeMemberNonPersistedFields(member);
- if (member != origMember) {
- json &revj = member["revision"];
- member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL);
- _db->save(member);
- }
-
+ _db->save(&origMember,member);
_addMemberNonPersistedFields(nwid,address,member,now);
responseBody = OSUtils::jsonDump(member);
responseContentType = "application/json";
@@ -986,12 +981,7 @@ unsigned int EmbeddedNetworkController::handleControlPlaneHttpPOST(
network["nwid"] = nwids; // legacy
_removeNetworkNonPersistedFields(network);
- if (network != origNetwork) {
- json &revj = network["revision"];
- network["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL);
- _db->save(network);
- }
-
+ _db->save(&origNetwork,network);
ControllerDB::NetworkSummaryInfo ns;
_db->summary(nwid,ns);
_addNetworkNonPersistedFields(nwid,network,now,ns);
@@ -1116,7 +1106,7 @@ void EmbeddedNetworkController::handleRemoteTrace(const ZT_RemoteTrace &rt)
d["objtype"] = "trace";
d["ts"] = now;
d["nodeId"] = Utils::hex10(rt.origin,tmp);
- _db->save(d);
+ _db->save((nlohmann::json *)0,d);
} catch ( ... ) {
// drop invalid trace messages if an error occurs
}
@@ -1185,6 +1175,8 @@ void EmbeddedNetworkController::_request(
ms.lastRequestTime = now;
}
+ _db->nodeIsOnline(identity.address().toInt());
+
Utils::hex(nwid,nwids);
_db->get(nwid,network,identity.address().toInt(),member,ns);
if ((!network.is_object())||(network.size() == 0)) {
@@ -1299,11 +1291,7 @@ void EmbeddedNetworkController::_request(
} else {
// If they are not authorized, STOP!
_removeMemberNonPersistedFields(member);
- if (origMember != member) {
- json &revj = member["revision"];
- member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL);
- _db->save(member);
- }
+ _db->save(&origMember,member);
_sender->ncSendError(nwid,requestPacketId,identity.address(),NetworkController::NC_ERROR_ACCESS_DENIED);
return;
}
@@ -1666,12 +1654,7 @@ void EmbeddedNetworkController::_request(
}
_removeMemberNonPersistedFields(member);
- if (member != origMember) {
- json &revj = member["revision"];
- member["revision"] = (revj.is_number() ? ((uint64_t)revj + 1ULL) : 1ULL);
- _db->save(member);
- }
-
+ _db->save(&origMember,member);
_sender->ncSendConfig(nwid,requestPacketId,identity.address(),*(nc.get()),metaData.getUI(ZT_NETWORKCONFIG_REQUEST_METADATA_KEY_VERSION,0) < 6);
}
diff --git a/controller/FileDB.cpp b/controller/FileDB.cpp
index b48d5e87..646fa2fe 100644
--- a/controller/FileDB.cpp
+++ b/controller/FileDB.cpp
@@ -69,7 +69,7 @@ bool FileDB::waitForReady()
return true;
}
-void FileDB::save(const nlohmann::json &record)
+void FileDB::save(nlohmann::json *orig,nlohmann::json &record)
{
char p1[16384],p2[16384];
try {
@@ -126,4 +126,9 @@ void FileDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
{
}
+void FileDB::nodeIsOnline(const uint64_t memberId)
+{
+ // Nothing to do here right now in the filesystem store mode since we can just get this from the peer list
+}
+
} // namespace ZeroTier
diff --git a/controller/FileDB.hpp b/controller/FileDB.hpp
index fe9869b9..76a47936 100644
--- a/controller/FileDB.hpp
+++ b/controller/FileDB.hpp
@@ -32,12 +32,14 @@ public:
virtual bool waitForReady();
- virtual void save(const nlohmann::json &record);
+ virtual void save(nlohmann::json *orig,nlohmann::json &record);
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
+ virtual void nodeIsOnline(const uint64_t memberId);
+
protected:
std::string _networksPath;
};
diff --git a/controller/RethinkDB.cpp b/controller/RethinkDB.cpp
index d1012167..031bd516 100644
--- a/controller/RethinkDB.cpp
+++ b/controller/RethinkDB.cpp
@@ -215,6 +215,47 @@ RethinkDB::RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddres
});
}
+ _onlineNotificationThread = std::thread([this]() {
+ try {
+ std::unique_ptr<R::Connection> rdb;
+ while (_run == 1) {
+ try {
+ if (!rdb)
+ rdb = R::connect(this->_host,this->_port,this->_auth);
+ if (rdb) {
+ std::lock_guard<std::mutex> l(_lastOnline_l);
+ R::Array batch;
+ R::Object tmpobj;
+ for(auto i=_lastOnline.begin();i!=_lastOnline.end();++i) {
+ char nodeId[16];
+ Utils::hex10(i->first,nodeId);
+ tmpobj["id"] = nodeId;
+ tmpobj["ts"] = i->second;
+ batch.emplace_back(tmpobj);
+ if (batch.size() >= 256) {
+ R::db(this->_db).table("NodeLastOnline").insert(R::args(batch),R::optargs("conflict","update")).run(*rdb);
+ batch.clear();
+ }
+ }
+ if (batch.size() > 0)
+ R::db(this->_db).table("NodeLastOnline").insert(R::args(batch),R::optargs("conflict","update")).run(*rdb);
+ _lastOnline.clear();
+ }
+ } catch (std::exception &e) {
+ fprintf(stderr,"ERROR: controller RethinkDB (node status update): %s" ZT_EOL_S,e.what());
+ rdb.reset();
+ } catch (R::Error &e) {
+ fprintf(stderr,"ERROR: controller RethinkDB (node status update): %s" ZT_EOL_S,e.message.c_str());
+ rdb.reset();
+ } catch ( ... ) {
+ fprintf(stderr,"ERROR: controller RethinkDB (node status update): unknown exception" ZT_EOL_S);
+ rdb.reset();
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(250));
+ }
+ } catch ( ... ) {}
+ });
+
_heartbeatThread = std::thread([this]() {
try {
char tmp[1024];
@@ -251,9 +292,10 @@ RethinkDB::~RethinkDB()
_membersDbWatcher.join();
_networksDbWatcher.join();
_heartbeatThread.join();
+ _onlineNotificationThread.join();
}
-void RethinkDB::waitForReady()
+bool RethinkDB::waitForReady()
{
while (_ready > 0) {
if (!_waitNoticePrinted) {
@@ -263,12 +305,32 @@ void RethinkDB::waitForReady()
_readyLock.lock();
_readyLock.unlock();
}
+ return true;
}
-void RethinkDB::save(const nlohmann::json &record)
+void RethinkDB::save(nlohmann::json *orig,nlohmann::json &record)
{
+ if (!record.is_object()) // sanity check
+ return;
waitForReady();
- _commitQueue.post(new nlohmann::json(record));
+ if (orig) {
+ if (*orig != record) {
+ nlohmann::json *q = new nlohmann::json();
+ try {
+ record["revision"] = OSUtils::jsonInt(record["revision"],0ULL) + 1;
+ for(auto kv=record.begin();kv!=record.end();++kv) {
+ if ((kv.key() == "id")||(kv.key() == "nwid")||(kv.key() == "objtype")||((*q)[kv.key()] != kv.value()))
+ (*q)[kv.key()] = kv.value();
+ }
+ } catch ( ... ) {
+ delete q;
+ throw;
+ }
+ }
+ } else {
+ record["revision"] = 1;
+ _commitQueue.post(new nlohmann::json(record));
+ }
}
void RethinkDB::eraseNetwork(const uint64_t networkId)
@@ -295,6 +357,12 @@ void RethinkDB::eraseMember(const uint64_t networkId,const uint64_t memberId)
_commitQueue.post(tmp);
}
+void RethinkDB::nodeIsOnline(const uint64_t memberId)
+{
+ std::lock_guard<std::mutex> l(_lastOnline_l);
+ _lastOnline[memberId] = OSUtils::now();
+}
+
} // namespace ZeroTier
#endif // ZT_CONTROLLER_USE_RETHINKDB
diff --git a/controller/RethinkDB.hpp b/controller/RethinkDB.hpp
index 2309a25c..8c8b16a6 100644
--- a/controller/RethinkDB.hpp
+++ b/controller/RethinkDB.hpp
@@ -34,14 +34,16 @@ public:
RethinkDB(EmbeddedNetworkController *const nc,const Address &myAddress,const char *path);
virtual ~RethinkDB();
- virtual void waitForReady();
+ virtual bool waitForReady();
- virtual void save(const nlohmann::json &record);
+ virtual void save(nlohmann::json *orig,nlohmann::json &record);
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId,const uint64_t memberId);
+ virtual void nodeIsOnline(const uint64_t memberId);
+
protected:
std::string _host;
std::string _db;
@@ -56,6 +58,10 @@ protected:
BlockingQueue< nlohmann::json * > _commitQueue;
std::thread _commitThread[ZT_CONTROLLER_RETHINKDB_COMMIT_THREADS];
+ std::unordered_map< uint64_t,int64_t > _lastOnline;
+ mutable std::mutex _lastOnline_l;
+ std::thread _onlineNotificationThread;
+
std::thread _heartbeatThread;
mutable std::mutex _readyLock; // locked until ready
diff --git a/node/Peer.cpp b/node/Peer.cpp
index a3682a97..2d562f12 100644
--- a/node/Peer.cpp
+++ b/node/Peer.cpp
@@ -78,54 +78,6 @@ void Peer::received(
{
const int64_t now = RR->node->now();
-/*
-#ifdef ZT_ENABLE_CLUSTER
- bool isClusterSuboptimalPath = false;
- if ((RR->cluster)&&(hops == 0)) {
- // Note: findBetterEndpoint() is first since we still want to check
- // for a better endpoint even if we don't actually send a redirect.
- InetAddress redirectTo;
- if ( (verb != Packet::VERB_OK) && (verb != Packet::VERB_ERROR) && (verb != Packet::VERB_RENDEZVOUS) && (verb != Packet::VERB_PUSH_DIRECT_PATHS) && (RR->cluster->findBetterEndpoint(redirectTo,_id.address(),path->address(),false)) ) {
- if (_vProto >= 5) {
- // For newer peers we can send a more idiomatic verb: PUSH_DIRECT_PATHS.
- Packet outp(_id.address(),RR->identity.address(),Packet::VERB_PUSH_DIRECT_PATHS);
- outp.append((uint16_t)1); // count == 1
- outp.append((uint8_t)ZT_PUSH_DIRECT_PATHS_FLAG_CLUSTER_REDIRECT); // flags: cluster redirect
- outp.append((uint16_t)0); // no extensions
- if (redirectTo.ss_family == AF_INET) {
- outp.append((uint8_t)4);
- outp.append((uint8_t)6);
- outp.append(redirectTo.rawIpData(),4);
- } else {
- outp.append((uint8_t)6);
- outp.append((uint8_t)18);
- outp.append(redirectTo.rawIpData(),16);
- }
- outp.append((uint16_t)redirectTo.port());
- outp.armor(_key,true,path->nextOutgoingCounter());
- path->send(RR,tPtr,outp.data(),outp.size(),now);
- } else {
- // For older peers we use RENDEZVOUS to coax them into contacting us elsewhere.
- Packet outp(_id.address(),RR->identity.address(),Packet::VERB_RENDEZVOUS);
- outp.append((uint8_t)0); // no flags
- RR->identity.address().appendTo(outp);
- outp.append((uint16_t)redirectTo.port());
- if (redirectTo.ss_family == AF_INET) {
- outp.append((uint8_t)4);
- outp.append(redirectTo.rawIpData(),4);
- } else {
- outp.append((uint8_t)16);
- outp.append(redirectTo.rawIpData(),16);
- }
- outp.armor(_key,true,path->nextOutgoingCounter());
- path->send(RR,tPtr,outp.data(),outp.size(),now);
- }
- isClusterSuboptimalPath = true;
- }
- }
-#endif
-*/
-
_lastReceive = now;
switch (verb) {
case Packet::VERB_FRAME:
@@ -163,6 +115,7 @@ void Peer::received(
}
}
+ bool attemptToContact = false;
if ((!havePath)&&(RR->node->shouldUsePathForZeroTierTraffic(tPtr,_id.address(),path->localSocket(),path->address()))) {
Mutex::Lock _l(_paths_m);
@@ -201,13 +154,17 @@ void Peer::received(
_paths[replacePath].p = path;
_paths[replacePath].priority = 1;
} else {
- attemptToContactAt(tPtr,path->localSocket(),path->address(),now,true,path->nextOutgoingCounter());
- path->sent(now);
- RR->t->peerConfirmingUnknownPath(tPtr,networkId,*this,path,packetId,verb);
+ attemptToContact = true;
}
}
}
}
+
+ if (attemptToContact) {
+ attemptToContactAt(tPtr,path->localSocket(),path->address(),now,true,path->nextOutgoingCounter());
+ path->sent(now);
+ RR->t->peerConfirmingUnknownPath(tPtr,networkId,*this,path,packetId,verb);
+ }
}
// If we have a trust relationship periodically push a message enumerating