diff options
| author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2013-10-05 10:19:12 -0400 | 
|---|---|---|
| committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2013-10-05 10:19:12 -0400 | 
| commit | 4267e7da93a7394ad7059b6f71569642446f04a6 (patch) | |
| tree | 44b2a6fb57d924a3d48da47f9551b77f2978eedd /node/Topology.cpp | |
| parent | 0e43e5e8f2eddd5768873350d12103f2684bedbf (diff) | |
| download | infinitytier-4267e7da93a7394ad7059b6f71569642446f04a6.tar.gz infinitytier-4267e7da93a7394ad7059b6f71569642446f04a6.zip | |
Remove a whole bunch of now-unnecessary cruft from Topology and PacketDecoder.
Diffstat (limited to 'node/Topology.cpp')
| -rw-r--r-- | node/Topology.cpp | 213 | 
1 files changed, 49 insertions, 164 deletions
| diff --git a/node/Topology.cpp b/node/Topology.cpp index 128a24d2..8acbb027 100644 --- a/node/Topology.cpp +++ b/node/Topology.cpp @@ -54,22 +54,12 @@ Topology::Topology(const RuntimeEnvironment *renv,const char *dbpath)  	}  	Utils::lockDownFile(dbpath,false); // node.db caches secrets - -	_thread = Thread::start(this);  }  Topology::~Topology()  { -	{ -		Mutex::Lock _l(_peerDeepVerifyJobs_m); -		_peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob()); -		_peerDeepVerifyJobs.back().type = _PeerDeepVerifyJob::CLEAN_CACHE; -		_peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob()); -		_peerDeepVerifyJobs.back().type = _PeerDeepVerifyJob::EXIT_THREAD; -	} -	_peerDeepVerifyJobs_c.signal(); -	Thread::join(_thread); -	KISSDB_close(&_dbm); +	// Flush last changes to disk +	clean();  }  void Topology::setSupernodes(const std::map< Identity,std::vector<InetAddress> > &sn) @@ -83,10 +73,8 @@ void Topology::setSupernodes(const std::map< Identity,std::vector<InetAddress> >  	for(std::map< Identity,std::vector<InetAddress> >::const_iterator i(sn.begin());i!=sn.end();++i) {  		if (i->first != _r->identity) {  			SharedPtr<Peer> p(getPeer(i->first.address())); -			if ((!p)||(p->identity() != i->first)) { -				p = SharedPtr<Peer>(new Peer(_r->identity,i->first)); -				_reallyAddPeer(p); -			} +			if (!p) +				p = addPeer(SharedPtr<Peer>(new Peer(_r->identity,i->first)));  			for(std::vector<InetAddress>::const_iterator j(i->second.begin());j!=i->second.end();++j)  				p->setPathAddress(*j,true);  			_supernodePeers.push_back(p); @@ -97,22 +85,33 @@ void Topology::setSupernodes(const std::map< Identity,std::vector<InetAddress> >  	_amSupernode = (_supernodes.find(_r->identity) != _supernodes.end());  } -void Topology::addPeer(const SharedPtr<Peer> &candidate,void (*callback)(void *,const SharedPtr<Peer> &,Topology::PeerVerifyResult),void *arg) +SharedPtr<Peer> Topology::addPeer(const SharedPtr<Peer> &peer)  { -	if (candidate->address() != _r->identity.address()) { -		Mutex::Lock _l(_peerDeepVerifyJobs_m); -		_peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob()); -		_PeerDeepVerifyJob &job = _peerDeepVerifyJobs.back(); -		job.callback = callback; -		job.arg = arg; -		job.candidate = candidate; -		job.type = _PeerDeepVerifyJob::VERIFY_PEER; -		_peerDeepVerifyJobs_c.signal(); -	} else { -		TRACE("BUG: addPeer() caught and ignored attempt to add peer for self"); -		if (callback) -			callback(arg,candidate,PEER_VERIFY_REJECTED_DUPLICATE_TRIAGED); +	if (peer->address() == _r->identity.address()) { +		TRACE("BUG: addNewPeer() caught and ignored attempt to add peer for self"); +		throw std::logic_error("cannot add peer for self"); +	} + +	SharedPtr<Peer> actualPeer; +	{ +		Mutex::Lock _l(_activePeers_m); +		actualPeer = _activePeers.insert(std::pair< Address,SharedPtr<Peer> >(peer->address(),peer)).first->second;  	} + +	uint64_t atmp[ZT_ADDRESS_LENGTH]; +	actualPeer->address().copyTo(atmp,ZT_ADDRESS_LENGTH); + +	Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b; +	actualPeer->serialize(b); +	b.zeroUnused(); + +	_dbm_m.lock(); +	if (KISSDB_put(&_dbm,atmp,b.data())) { +		TRACE("error writing %s to peerdb",actualPeer->address().toString().c_str()); +	} else actualPeer->getAndResetDirty(); +	_dbm_m.unlock(); + +	return actualPeer;  }  SharedPtr<Peer> Topology::getPeer(const Address &zta) @@ -212,143 +211,29 @@ skip_and_try_next_supernode:  void Topology::clean()  { -	{ -		Mutex::Lock _l(_peerDeepVerifyJobs_m); -		_peerDeepVerifyJobs.push_back(_PeerDeepVerifyJob()); -		_peerDeepVerifyJobs.back().type = _PeerDeepVerifyJob::CLEAN_CACHE; -	} -	_peerDeepVerifyJobs_c.signal(); -} - -void Topology::threadMain() -	throw() -{ -	for(;;) { -		_peerDeepVerifyJobs_m.lock(); -		if (_peerDeepVerifyJobs.empty()) { -			_peerDeepVerifyJobs_m.unlock(); -			_peerDeepVerifyJobs_c.wait(); -			continue; -		} -		_PeerDeepVerifyJob job(_peerDeepVerifyJobs.front()); -		_peerDeepVerifyJobs.pop_front(); -		unsigned long queueRemaining = (unsigned long)_peerDeepVerifyJobs.size(); -		_peerDeepVerifyJobs_m.unlock(); - -		switch(job.type) { -			case _PeerDeepVerifyJob::VERIFY_PEER: -				/* TODO: We should really verify peers every time completely if this -				 * is a supernode, perhaps deferring the expensive part for new -				 * addresses. An attempt at claim jumping should also trigger a -				 * short duration ban of the originating IP address in most cases, -				 * since this means either malicious intent or broken software. */ -				TRACE("verifying peer: %s",job.candidate->identity().address().toString().c_str()); - -				if ((job.candidate->identity())&&(!job.candidate->identity().address().isReserved())&&(job.candidate->identity().locallyValidate())) { -					// Peer passes sniff test, so check to see if we've already got -					// one with the same address. - -					SharedPtr<Peer> existingPeer(getPeer(job.candidate->identity().address())); - -					if (existingPeer) { -						if (existingPeer->identity() == job.candidate->identity()) { -							// It's an *exact* duplicate, so return the existing peer -							if (job.callback) -								job.callback(job.arg,existingPeer,PEER_VERIFY_ACCEPTED_ALREADY_HAVE); -						} else if (queueRemaining > 3) { -							/* Prevents a CPU hog DOS attack, while allowing a very unlikely kind of -							 * DOS attack where someone knows someone else's address prior to their -							 * registering it and claim-jumps them and then floods with bad identities -							 * to hold their claim. Of the two, the latter would be infeasable -							 * without already having cracked the target's machine in which case -							 * the attacker has their private key anyway and can really steal their -							 * identity. So why bother.*/ -							TRACE("%s is duplicate, load too high, old won",job.candidate->identity().address().toString().c_str()); -							if (job.callback) -								job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_DUPLICATE_TRIAGED); -						} else { -							// It's different so deeply validate it first, then the -							// existing claimant, and toss the imposter. If both verify, the -							// one we already have wins. - -							if (!job.candidate->identity().locallyValidate()) { -								LOG("Topology: IMPOSTER %s rejected",job.candidate->identity().address().toString().c_str()); -								if (job.callback) -									job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_INVALID_IDENTITY); -							} else if (!existingPeer->identity().locallyValidate()) { -								LOG("Topology: previous IMPOSTER %s displaced by valid identity!",job.candidate->identity().address().toString().c_str()); -								_reallyAddPeer(job.candidate); -								if (job.callback) -									job.callback(job.arg,job.candidate,PEER_VERIFY_ACCEPTED_DISPLACED_INVALID_ADDRESS); -							} else { -								LOG("Topology: tie between apparently valid claims on %s, oldest won",job.candidate->identity().address().toString().c_str()); -								if (job.callback) -									job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_DUPLICATE); -							} -						} -					} else { -						TRACE("%s accepted as new",job.candidate->identity().address().toString().c_str()); -						_reallyAddPeer(job.candidate); -						if (job.callback) -							job.callback(job.arg,job.candidate,PEER_VERIFY_ACCEPTED_NEW); -					} -				} else { -					TRACE("%s rejected, identity failed initial checks",job.candidate->identity().address().toString().c_str()); -					if (job.callback) -						job.callback(job.arg,job.candidate,PEER_VERIFY_REJECTED_INVALID_IDENTITY); +	TRACE("cleaning caches and flushing modified peers to disk..."); + +	Mutex::Lock _l(_activePeers_m); +	for(std::map< Address,SharedPtr<Peer> >::iterator p(_activePeers.begin());p!=_activePeers.end();++p) { +		if (p->second->getAndResetDirty()) { +			try { +				uint64_t atmp[ZT_ADDRESS_LENGTH]; +				p->second->identity().address().copyTo(atmp,ZT_ADDRESS_LENGTH); + +				Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b; +				p->second->serialize(b); +				b.zeroUnused(); + +				_dbm_m.lock(); +				if (KISSDB_put(&_dbm,atmp,b.data())) { +					TRACE("error writing %s to peer.db",p->second->identity().address().toString().c_str());  				} -				break; -			case _PeerDeepVerifyJob::CLEAN_CACHE: -				TRACE("cleaning caches and flushing modified peers to disk..."); -				{ -					Mutex::Lock _l(_activePeers_m); -					for(std::map< Address,SharedPtr<Peer> >::iterator p(_activePeers.begin());p!=_activePeers.end();++p) { -						if (p->second->getAndResetDirty()) { -							try { -								uint64_t atmp[ZT_ADDRESS_LENGTH]; -								p->second->identity().address().copyTo(atmp,ZT_ADDRESS_LENGTH); -								Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b; -								p->second->serialize(b); -								b.zeroUnused(); -								_dbm_m.lock(); -								if (KISSDB_put(&_dbm,atmp,b.data())) { -									TRACE("error writing %s to peer.db",p->second->identity().address().toString().c_str()); -								} -								_dbm_m.unlock(); -							} catch ( ... ) { -								TRACE("unexpected exception flushing %s to peer.db",p->second->identity().address().toString().c_str()); -							} -						} -					} -				} -				break; -			case _PeerDeepVerifyJob::EXIT_THREAD: -				TRACE("thread terminating..."); -				return; +				_dbm_m.unlock(); +			} catch ( ... ) { +				TRACE("unexpected exception flushing %s to peer.db",p->second->identity().address().toString().c_str()); +			}  		}  	}  } -void Topology::_reallyAddPeer(const SharedPtr<Peer> &p) -{ -	{ -		Mutex::Lock _l(_activePeers_m); -		_activePeers[p->identity().address()] = p; -	} -	try { -		uint64_t atmp[ZT_ADDRESS_LENGTH]; -		p->address().copyTo(atmp,ZT_ADDRESS_LENGTH); -		Buffer<ZT_PEER_MAX_SERIALIZED_LENGTH> b; -		p->serialize(b); -		b.zeroUnused(); -		_dbm_m.lock(); -		if (KISSDB_put(&_dbm,atmp,b.data())) { -			TRACE("error writing %s to peerdb",p->address().toString().c_str()); -		} else p->getAndResetDirty(); -		_dbm_m.unlock(); -	} catch ( ... ) { -		TRACE("unexpected exception flushing to peerdb"); -	} -} -  } // namespace ZeroTier | 
