diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2013-10-05 10:19:12 -0400 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2013-10-05 10:19:12 -0400 |
commit | 4267e7da93a7394ad7059b6f71569642446f04a6 (patch) | |
tree | 44b2a6fb57d924a3d48da47f9551b77f2978eedd /node/Topology.cpp | |
parent | 0e43e5e8f2eddd5768873350d12103f2684bedbf (diff) | |
download | infinitytier-4267e7da93a7394ad7059b6f71569642446f04a6.tar.gz infinitytier-4267e7da93a7394ad7059b6f71569642446f04a6.zip |
Remove a whole bunch of now-unnecessary cruft from Topology and PacketDecoder.
Diffstat (limited to 'node/Topology.cpp')
-rw-r--r-- | node/Topology.cpp | 213 |
1 files changed, 49 insertions, 164 deletions
diff --git a/node/Topology.cpp b/node/Topology.cpp index 128a24d2..8acbb027 100644 --- a/node/Topology.cpp +++ b/node/Topology.cpp @@ -54,22 +54,12 @@ Topology::Topology(const RuntimeEnvironment *renv,const char *dbpath) } Utils::lockDownFile(dbpath,false); // node.db caches secrets - - _thread = Thread::start(this); } Topology::~Topology() { - { - Mutex::Lock _l(_peerDeepVerifyJobs_m); - _peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob()); - _peerDeepVerifyJobs.back().type = _PeerDeepVerifyJob::CLEAN_CACHE; - _peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob()); - _peerDeepVerifyJobs.back().type = _PeerDeepVerifyJob::EXIT_THREAD; - } - _peerDeepVerifyJobs_c.signal(); - Thread::join(_thread); - KISSDB_close(&_dbm); + // Flush last changes to disk + clean(); } void Topology::setSupernodes(const std::map< Identity,std::vector<InetAddress> > &sn) @@ -83,10 +73,8 @@ void Topology::setSupernodes(const std::map< Identity,std::vector<InetAddress> > for(std::map< Identity,std::vector<InetAddress> >::const_iterator i(sn.begin());i!=sn.end();++i) { if (i->first != _r->identity) { SharedPtr<Peer> p(getPeer(i->first.address())); - if ((!p)||(p->identity() != i->first)) { - p = SharedPtr<Peer>(new Peer(_r->identity,i->first)); - _reallyAddPeer(p); - } + if (!p) + p = addPeer(SharedPtr<Peer>(new Peer(_r->identity,i->first))); for(std::vector<InetAddress>::const_iterator j(i->second.begin());j!=i->second.end();++j) p->setPathAddress(*j,true); _supernodePeers.push_back(p); @@ -97,22 +85,33 @@ void Topology::setSupernodes(const std::map< Identity,std::vector<InetAddress> > _amSupernode = (_supernodes.find(_r->identity) != _supernodes.end()); } -void Topology::addPeer(const SharedPtr<Peer> &candidate,void (*callback)(void *,const SharedPtr<Peer> &,Topology::PeerVerifyResult),void *arg) +SharedPtr<Peer> Topology::addPeer(const SharedPtr<Peer> &peer) { - if (candidate->address() != _r->identity.address()) { - Mutex::Lock _l(_peerDeepVerifyJobs_m); - _peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob()); - _PeerDeepVerifyJob &job = _peerDeepVerifyJobs.back(); - job.callback = callback; - job.arg = arg; - job.candidate = candidate; - job.type = _PeerDeepVerifyJob::VERIFY_PEER; - _peerDeepVerifyJobs_c.signal(); - } else { - TRACE("BUG: addPeer() caught and ignored attempt to add peer for self"); - if (callback) - callback(arg,candidate,PEER_VERIFY_REJECTED_DUPLICATE_TRIAGED); + if (peer->address() == _r->identity.address()) { + TRACE("BUG: addNewPeer() caught and ignored attempt to add peer for self"); + throw std::logic_error("cannot add peer for self"); + } + + SharedPtr<Peer> actualPeer; + { + Mutex::Lock _l(_activePeers_m); + actualPeer = _activePeers.insert(std::pair< Address,SharedPtr<Peer> >(peer->address(),peer)).first->second; } + + uint64_t atmp[ZT_ADDRESS_LENGTH]; + actualPeer->address().copyTo(atmp,ZT_ADDRESS_LENGTH); + + Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b; + actualPeer->serialize(b); + b.zeroUnused(); + + _dbm_m.lock(); + if (KISSDB_put(&_dbm,atmp,b.data())) { + TRACE("error writing %s to peerdb",actualPeer->address().toString().c_str()); + } else actualPeer->getAndResetDirty(); + _dbm_m.unlock(); + + return actualPeer; } SharedPtr<Peer> Topology::getPeer(const Address &zta) @@ -212,143 +211,29 @@ skip_and_try_next_supernode: void Topology::clean() { - { - Mutex::Lock _l(_peerDeepVerifyJobs_m); - _peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob()); - _peerDeepVerifyJobs.back().type = _PeerDeepVerifyJob::CLEAN_CACHE; - } - _peerDeepVerifyJobs_c.signal(); -} - -void Topology::threadMain() - throw() -{ - for(;;) { - _peerDeepVerifyJobs_m.lock(); - if (_peerDeepVerifyJobs.empty()) { - _peerDeepVerifyJobs_m.unlock(); - _peerDeepVerifyJobs_c.wait(); - continue; - } - _PeerDeepVerifyJob job(_peerDeepVerifyJobs.front()); - _peerDeepVerifyJobs.pop_front(); - unsigned long queueRemaining = (unsigned long)_peerDeepVerifyJobs.size(); - _peerDeepVerifyJobs_m.unlock(); - - switch(job.type) { - case _PeerDeepVerifyJob::VERIFY_PEER: - /* TODO: We should really verify peers every time completely if this - * is a supernode, perhaps deferring the expensive part for new - * addresses. An attempt at claim jumping should also trigger a - * short duration ban of the originating IP address in most cases, - * since this means either malicious intent or broken software. */ - TRACE("verifying peer: %s",job.candidate->identity().address().toString().c_str()); - - if ((job.candidate->identity())&&(!job.candidate->identity().address().isReserved())&&(job.candidate->identity().locallyValidate())) { - // Peer passes sniff test, so check to see if we've already got - // one with the same address. - - SharedPtr<Peer> existingPeer(getPeer(job.candidate->identity().address())); - - if (existingPeer) { - if (existingPeer->identity() == job.candidate->identity()) { - // It's an *exact* duplicate, so return the existing peer - if (job.callback) - job.callback(job.arg,existingPeer,PEER_VERIFY_ACCEPTED_ALREADY_HAVE); - } else if (queueRemaining > 3) { - /* Prevents a CPU hog DOS attack, while allowing a very unlikely kind of - * DOS attack where someone knows someone else's address prior to their - * registering it and claim-jumps them and then floods with bad identities - * to hold their claim. Of the two, the latter would be infeasable - * without already having cracked the target's machine in which case - * the attacker has their private key anyway and can really steal their - * identity. So why bother.*/ - TRACE("%s is duplicate, load too high, old won",job.candidate->identity().address().toString().c_str()); - if (job.callback) - job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_DUPLICATE_TRIAGED); - } else { - // It's different so deeply validate it first, then the - // existing claimant, and toss the imposter. If both verify, the - // one we already have wins. - - if (!job.candidate->identity().locallyValidate()) { - LOG("Topology: IMPOSTER %s rejected",job.candidate->identity().address().toString().c_str()); - if (job.callback) - job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_INVALID_IDENTITY); - } else if (!existingPeer->identity().locallyValidate()) { - LOG("Topology: previous IMPOSTER %s displaced by valid identity!",job.candidate->identity().address().toString().c_str()); - _reallyAddPeer(job.candidate); - if (job.callback) - job.callback(job.arg,job.candidate,PEER_VERIFY_ACCEPTED_DISPLACED_INVALID_ADDRESS); - } else { - LOG("Topology: tie between apparently valid claims on %s, oldest won",job.candidate->identity().address().toString().c_str()); - if (job.callback) - job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_DUPLICATE); - } - } - } else { - TRACE("%s accepted as new",job.candidate->identity().address().toString().c_str()); - _reallyAddPeer(job.candidate); - if (job.callback) - job.callback(job.arg,job.candidate,PEER_VERIFY_ACCEPTED_NEW); - } - } else { - TRACE("%s rejected, identity failed initial checks",job.candidate->identity().address().toString().c_str()); - if (job.callback) - job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_INVALID_IDENTITY); + TRACE("cleaning caches and flushing modified peers to disk..."); + + Mutex::Lock _l(_activePeers_m); + for(std::map< Address,SharedPtr<Peer> >::iterator p(_activePeers.begin());p!=_activePeers.end();++p) { + if (p->second->getAndResetDirty()) { + try { + uint64_t atmp[ZT_ADDRESS_LENGTH]; + p->second->identity().address().copyTo(atmp,ZT_ADDRESS_LENGTH); + + Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b; + p->second->serialize(b); + b.zeroUnused(); + + _dbm_m.lock(); + if (KISSDB_put(&_dbm,atmp,b.data())) { + TRACE("error writing %s to peer.db",p->second->identity().address().toString().c_str()); } - break; - case _PeerDeepVerifyJob::CLEAN_CACHE: - TRACE("cleaning caches and flushing modified peers to disk..."); - { - Mutex::Lock _l(_activePeers_m); - for(std::map< Address,SharedPtr<Peer> >::iterator p(_activePeers.begin());p!=_activePeers.end();++p) { - if (p->second->getAndResetDirty()) { - try { - uint64_t atmp[ZT_ADDRESS_LENGTH]; - p->second->identity().address().copyTo(atmp,ZT_ADDRESS_LENGTH); - Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b; - p->second->serialize(b); - b.zeroUnused(); - _dbm_m.lock(); - if (KISSDB_put(&_dbm,atmp,b.data())) { - TRACE("error writing %s to peer.db",p->second->identity().address().toString().c_str()); - } - _dbm_m.unlock(); - } catch ( ... ) { - TRACE("unexpected exception flushing %s to peer.db",p->second->identity().address().toString().c_str()); - } - } - } - } - break; - case _PeerDeepVerifyJob::EXIT_THREAD: - TRACE("thread terminating..."); - return; + _dbm_m.unlock(); + } catch ( ... ) { + TRACE("unexpected exception flushing %s to peer.db",p->second->identity().address().toString().c_str()); + } } } } -void Topology::_reallyAddPeer(const SharedPtr<Peer> &p) -{ - { - Mutex::Lock _l(_activePeers_m); - _activePeers[p->identity().address()] = p; - } - try { - uint64_t atmp[ZT_ADDRESS_LENGTH]; - p->address().copyTo(atmp,ZT_ADDRESS_LENGTH); - Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b; - p->serialize(b); - b.zeroUnused(); - _dbm_m.lock(); - if (KISSDB_put(&_dbm,atmp,b.data())) { - TRACE("error writing %s to peerdb",p->address().toString().c_str()); - } else p->getAndResetDirty(); - _dbm_m.unlock(); - } catch ( ... ) { - TRACE("unexpected exception flushing to peerdb"); - } -} - } // namespace ZeroTier |