path: root/node/Topology.cpp
author    Adam Ierymenko <adam.ierymenko@gmail.com>  2013-10-05 10:19:12 -0400
committer Adam Ierymenko <adam.ierymenko@gmail.com>  2013-10-05 10:19:12 -0400
commit    4267e7da93a7394ad7059b6f71569642446f04a6 (patch)
tree      44b2a6fb57d924a3d48da47f9551b77f2978eedd /node/Topology.cpp
parent    0e43e5e8f2eddd5768873350d12103f2684bedbf (diff)
download  infinitytier-4267e7da93a7394ad7059b6f71569642446f04a6.tar.gz
          infinitytier-4267e7da93a7394ad7059b6f71569642446f04a6.zip
Remove a whole bunch of now-unnecessary cruft from Topology and PacketDecoder.
Diffstat (limited to 'node/Topology.cpp')
-rw-r--r--  node/Topology.cpp | 213
 1 file changed, 49 insertions(+), 164 deletions(-)
diff --git a/node/Topology.cpp b/node/Topology.cpp
index 128a24d2..8acbb027 100644
--- a/node/Topology.cpp
+++ b/node/Topology.cpp
@@ -54,22 +54,12 @@ Topology::Topology(const RuntimeEnvironment *renv,const char *dbpath)
}
Utils::lockDownFile(dbpath,false); // node.db caches secrets
-
- _thread = Thread::start(this);
}
Topology::~Topology()
{
- {
- Mutex::Lock _l(_peerDeepVerifyJobs_m);
- _peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob());
- _peerDeepVerifyJobs.back().type = _PeerDeepVerifyJob::CLEAN_CACHE;
- _peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob());
- _peerDeepVerifyJobs.back().type = _PeerDeepVerifyJob::EXIT_THREAD;
- }
- _peerDeepVerifyJobs_c.signal();
- Thread::join(_thread);
- KISSDB_close(&_dbm);
+ // Flush last changes to disk
+ clean();
}
void Topology::setSupernodes(const std::map< Identity,std::vector<InetAddress> > &sn)
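With the verifier thread gone, shutdown reduces to a synchronous flush: the destructor simply calls clean() instead of queuing CLEAN_CACHE and EXIT_THREAD jobs and joining the worker. A minimal sketch of that flush-on-destruct pattern, with hypothetical names standing in for Topology and clean():

    // Sketch only: "Store" and "flush" are illustrative stand-ins, not the
    // real Topology API. Assumes no other thread touches the object during
    // destruction, which is what removing the worker thread buys here.
    class Store {
    public:
        ~Store() { flush(); } // last-chance synchronous write-out; nothing to signal or join
    private:
        void flush();         // persists any records still marked dirty
    };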
@@ -83,10 +73,8 @@ void Topology::setSupernodes(const std::map< Identity,std::vector<InetAddress> >
for(std::map< Identity,std::vector<InetAddress> >::const_iterator i(sn.begin());i!=sn.end();++i) {
if (i->first != _r->identity) {
SharedPtr<Peer> p(getPeer(i->first.address()));
- if ((!p)||(p->identity() != i->first)) {
- p = SharedPtr<Peer>(new Peer(_r->identity,i->first));
- _reallyAddPeer(p);
- }
+ if (!p)
+ p = addPeer(SharedPtr<Peer>(new Peer(_r->identity,i->first)));
for(std::vector<InetAddress>::const_iterator j(i->second.begin());j!=i->second.end();++j)
p->setPathAddress(*j,true);
_supernodePeers.push_back(p);
@@ -97,22 +85,33 @@ void Topology::setSupernodes(const std::map< Identity,std::vector<InetAddress> >
_amSupernode = (_supernodes.find(_r->identity) != _supernodes.end());
}
-void Topology::addPeer(const SharedPtr<Peer> &candidate,void (*callback)(void *,const SharedPtr<Peer> &,Topology::PeerVerifyResult),void *arg)
+SharedPtr<Peer> Topology::addPeer(const SharedPtr<Peer> &peer)
{
- if (candidate->address() != _r->identity.address()) {
- Mutex::Lock _l(_peerDeepVerifyJobs_m);
- _peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob());
- _PeerDeepVerifyJob &job = _peerDeepVerifyJobs.back();
- job.callback = callback;
- job.arg = arg;
- job.candidate = candidate;
- job.type = _PeerDeepVerifyJob::VERIFY_PEER;
- _peerDeepVerifyJobs_c.signal();
- } else {
- TRACE("BUG: addPeer() caught and ignored attempt to add peer for self");
- if (callback)
- callback(arg,candidate,PEER_VERIFY_REJECTED_DUPLICATE_TRIAGED);
+ if (peer->address() == _r->identity.address()) {
+ TRACE("BUG: addNewPeer() caught and ignored attempt to add peer for self");
+ throw std::logic_error("cannot add peer for self");
+ }
+
+ SharedPtr<Peer> actualPeer;
+ {
+ Mutex::Lock _l(_activePeers_m);
+ actualPeer = _activePeers.insert(std::pair< Address,SharedPtr<Peer> >(peer->address(),peer)).first->second;
}
+
+ uint64_t atmp[ZT_ADDRESS_LENGTH];
+ actualPeer->address().copyTo(atmp,ZT_ADDRESS_LENGTH);
+
+ Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b;
+ actualPeer->serialize(b);
+ b.zeroUnused();
+
+ _dbm_m.lock();
+ if (KISSDB_put(&_dbm,atmp,b.data())) {
+ TRACE("error writing %s to peerdb",actualPeer->address().toString().c_str());
+ } else actualPeer->getAndResetDirty();
+ _dbm_m.unlock();
+
+ return actualPeer;
}
SharedPtr<Peer> Topology::getPeer(const Address &zta)
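The rewritten addPeer() leans on a detail of std::map::insert(): inserting a duplicate key is a no-op that returns an iterator to the existing element, so actualPeer can be a different object than the candidate passed in, and callers must use the returned pointer. A self-contained sketch of that standard-library behavior:

    // Demonstrates the std::map::insert() semantics the new addPeer() relies
    // on: an existing entry wins and the freshly constructed value is dropped.
    #include <cstdio>
    #include <map>
    #include <string>

    int main()
    {
        std::map<int,std::string> m;
        m.insert(std::make_pair(5,std::string("existing")));
        std::pair<std::map<int,std::string>::iterator,bool> r(
            m.insert(std::make_pair(5,std::string("candidate"))));
        std::printf("%s %d\n",r.first->second.c_str(),(int)r.second); // prints: existing 0
        return 0;
    }

Note the new failure mode as well: where the old API reported a self-add through the callback, addPeer() now throws std::logic_error, so callers are expected to check the address before calling.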
@@ -212,143 +211,29 @@ skip_and_try_next_supernode:
void Topology::clean()
{
- {
- Mutex::Lock _l(_peerDeepVerifyJobs_m);
- _peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob());
- _peerDeepVerifyJobs.back().type = _PeerDeepVerifyJob::CLEAN_CACHE;
- }
- _peerDeepVerifyJobs_c.signal();
-}
-
-void Topology::threadMain()
- throw()
-{
- for(;;) {
- _peerDeepVerifyJobs_m.lock();
- if (_peerDeepVerifyJobs.empty()) {
- _peerDeepVerifyJobs_m.unlock();
- _peerDeepVerifyJobs_c.wait();
- continue;
- }
- _PeerDeepVerifyJob job(_peerDeepVerifyJobs.front());
- _peerDeepVerifyJobs.pop_front();
- unsigned long queueRemaining = (unsigned long)_peerDeepVerifyJobs.size();
- _peerDeepVerifyJobs_m.unlock();
-
- switch(job.type) {
- case _PeerDeepVerifyJob::VERIFY_PEER:
- /* TODO: We should really verify peers every time completely if this
- * is a supernode, perhaps deferring the expensive part for new
- * addresses. An attempt at claim jumping should also trigger a
- * short duration ban of the originating IP address in most cases,
- * since this means either malicious intent or broken software. */
- TRACE("verifying peer: %s",job.candidate->identity().address().toString().c_str());
-
- if ((job.candidate->identity())&&(!job.candidate->identity().address().isReserved())&&(job.candidate->identity().locallyValidate())) {
- // Peer passes sniff test, so check to see if we've already got
- // one with the same address.
-
- SharedPtr<Peer> existingPeer(getPeer(job.candidate->identity().address()));
-
- if (existingPeer) {
- if (existingPeer->identity() == job.candidate->identity()) {
- // It's an *exact* duplicate, so return the existing peer
- if (job.callback)
- job.callback(job.arg,existingPeer,PEER_VERIFY_ACCEPTED_ALREADY_HAVE);
- } else if (queueRemaining > 3) {
- /* Prevents a CPU hog DOS attack, while allowing a very unlikely kind of
- * DOS attack where someone knows someone else's address prior to their
- * registering it and claim-jumps them and then floods with bad identities
- * to hold their claim. Of the two, the latter would be infeasible
- * without already having cracked the target's machine in which case
- * the attacker has their private key anyway and can really steal their
- * identity. So why bother. */
- TRACE("%s is duplicate, load too high, old won",job.candidate->identity().address().toString().c_str());
- if (job.callback)
- job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_DUPLICATE_TRIAGED);
- } else {
- // It's different so deeply validate it first, then the
- // existing claimant, and toss the imposter. If both verify, the
- // one we already have wins.
-
- if (!job.candidate->identity().locallyValidate()) {
- LOG("Topology: IMPOSTER %s rejected",job.candidate->identity().address().toString().c_str());
- if (job.callback)
- job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_INVALID_IDENTITY);
- } else if (!existingPeer->identity().locallyValidate()) {
- LOG("Topology: previous IMPOSTER %s displaced by valid identity!",job.candidate->identity().address().toString().c_str());
- _reallyAddPeer(job.candidate);
- if (job.callback)
- job.callback(job.arg,job.candidate,PEER_VERIFY_ACCEPTED_DISPLACED_INVALID_ADDRESS);
- } else {
- LOG("Topology: tie between apparently valid claims on %s, oldest won",job.candidate->identity().address().toString().c_str());
- if (job.callback)
- job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_DUPLICATE);
- }
- }
- } else {
- TRACE("%s accepted as new",job.candidate->identity().address().toString().c_str());
- _reallyAddPeer(job.candidate);
- if (job.callback)
- job.callback(job.arg,job.candidate,PEER_VERIFY_ACCEPTED_NEW);
- }
- } else {
- TRACE("%s rejected, identity failed initial checks",job.candidate->identity().address().toString().c_str());
- if (job.callback)
- job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_INVALID_IDENTITY);
+ TRACE("cleaning caches and flushing modified peers to disk...");
+
+ Mutex::Lock _l(_activePeers_m);
+ for(std::map< Address,SharedPtr<Peer> >::iterator p(_activePeers.begin());p!=_activePeers.end();++p) {
+ if (p->second->getAndResetDirty()) {
+ try {
+ uint64_t atmp[ZT_ADDRESS_LENGTH];
+ p->second->identity().address().copyTo(atmp,ZT_ADDRESS_LENGTH);
+
+ Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b;
+ p->second->serialize(b);
+ b.zeroUnused();
+
+ _dbm_m.lock();
+ if (KISSDB_put(&_dbm,atmp,b.data())) {
+ TRACE("error writing %s to peer.db",p->second->identity().address().toString().c_str());
}
- break;
- case _PeerDeepVerifyJob::CLEAN_CACHE:
- TRACE("cleaning caches and flushing modified peers to disk...");
- {
- Mutex::Lock _l(_activePeers_m);
- for(std::map< Address,SharedPtr<Peer> >::iterator p(_activePeers.begin());p!=_activePeers.end();++p) {
- if (p->second->getAndResetDirty()) {
- try {
- uint64_t atmp[ZT_ADDRESS_LENGTH];
- p->second->identity().address().copyTo(atmp,ZT_ADDRESS_LENGTH);
- Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b;
- p->second->serialize(b);
- b.zeroUnused();
- _dbm_m.lock();
- if (KISSDB_put(&_dbm,atmp,b.data())) {
- TRACE("error writing %s to peer.db",p->second->identity().address().toString().c_str());
- }
- _dbm_m.unlock();
- } catch ( ... ) {
- TRACE("unexpected exception flushing %s to peer.db",p->second->identity().address().toString().c_str());
- }
- }
- }
- }
- break;
- case _PeerDeepVerifyJob::EXIT_THREAD:
- TRACE("thread terminating...");
- return;
+ _dbm_m.unlock();
+ } catch ( ... ) {
+ TRACE("unexpected exception flushing %s to peer.db",p->second->identity().address().toString().c_str());
+ }
}
}
}
-void Topology::_reallyAddPeer(const SharedPtr<Peer> &p)
-{
- {
- Mutex::Lock _l(_activePeers_m);
- _activePeers[p->identity().address()] = p;
- }
- try {
- uint64_t atmp[ZT_ADDRESS_LENGTH];
- p->address().copyTo(atmp,ZT_ADDRESS_LENGTH);
- Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b;
- p->serialize(b);
- b.zeroUnused();
- _dbm_m.lock();
- if (KISSDB_put(&_dbm,atmp,b.data())) {
- TRACE("error writing %s to peerdb",p->address().toString().c_str());
- } else p->getAndResetDirty();
- _dbm_m.unlock();
- } catch ( ... ) {
- TRACE("unexpected exception flushing to peerdb");
- }
-}
-
} // namespace ZeroTier
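clean() now does inline what the old CLEAN_CACHE job did on the worker thread: walk the peer map, check-and-clear each peer's dirty flag, and write changed peers to the database under _dbm_m. A minimal sketch of that dirty-flag flush pattern in isolation (Record and flushAll are hypothetical; the real Peer presumably synchronizes the flag internally, which this sketch does not):

    #include <map>

    // A record remembers whether it changed since the last flush; the flag
    // is cleared at the moment the flush loop decides to write it.
    class Record {
    public:
        Record() : _dirty(false) {}
        void modify() { _dirty = true; }
        bool getAndResetDirty() { bool d = _dirty; _dirty = false; return d; }
    private:
        bool _dirty;
    };

    void flushAll(std::map<int,Record> &records)
    {
        for(std::map<int,Record>::iterator r(records.begin());r!=records.end();++r) {
            if (r->second.getAndResetDirty()) {
                // ...serialize and KISSDB_put() under the database mutex here...
            }
        }
    }

One consequence of clearing the flag before the write is attempted: if the write then fails, the change stays unflushed until the record is dirtied again, which matches how the loop in clean() above behaves on a KISSDB_put() error.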