diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2016-04-18 10:21:38 -0700 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2016-04-18 10:21:38 -0700 |
commit | 02c3727ccdf27bf9ce77877f382d300a47531810 (patch) | |
tree | ae2d280da87499f377a108b9ef9fb0b87c97b2b0 /service/ClusterGeoIpService.cpp | |
parent | af471af8ef2179f07a81d52ca90351a04dd8bf56 (diff) | |
download | infinitytier-02c3727ccdf27bf9ce77877f382d300a47531810.tar.gz infinitytier-02c3727ccdf27bf9ce77877f382d300a47531810.zip |
.
Diffstat (limited to 'service/ClusterGeoIpService.cpp')
-rw-r--r-- | service/ClusterGeoIpService.cpp | 275 |
1 files changed, 144 insertions, 131 deletions
diff --git a/service/ClusterGeoIpService.cpp b/service/ClusterGeoIpService.cpp index e9a71ba1..c1483cac 100644 --- a/service/ClusterGeoIpService.cpp +++ b/service/ClusterGeoIpService.cpp @@ -18,168 +18,181 @@ #ifdef ZT_ENABLE_CLUSTER -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <stdint.h> -#include <unistd.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <sys/wait.h> -#include <signal.h> -#include <errno.h> - -#include <iostream> +#include <math.h> + +#include <cmath> #include "ClusterGeoIpService.hpp" + #include "../node/Utils.hpp" +#include "../node/InetAddress.hpp" #include "../osdep/OSUtils.hpp" -// 120 days -#define ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL 10368000000ULL +#define ZT_CLUSTERGEOIPSERVICE_FILE_MODIFICATION_CHECK_EVERY 10000 namespace ZeroTier { -ClusterGeoIpService::ClusterGeoIpService(const char *pathToExe) : - _pathToExe(pathToExe), - _sOutputFd(-1), - _sInputFd(-1), - _sPid(0), - _run(true) +ClusterGeoIpService::ClusterGeoIpService() : + _pathToCsv(), + _ipStartColumn(-1), + _ipEndColumn(-1), + _latitudeColumn(-1), + _longitudeColumn(-1), + _lastFileCheckTime(0), + _csvModificationTime(0), + _csvFileSize(0) { - _thread = Thread::start(this); } ClusterGeoIpService::~ClusterGeoIpService() { - _run = false; - long p = _sPid; - if (p > 0) { - ::kill(p,SIGTERM); - Thread::sleep(500); - ::kill(p,SIGKILL); - } - Thread::join(_thread); } bool ClusterGeoIpService::locate(const InetAddress &ip,int &x,int &y,int &z) { - InetAddress ipNoPort(ip); - ipNoPort.setPort(0); // we index cache by IP only - const uint64_t now = OSUtils::now(); - - bool r = false; - { - Mutex::Lock _l(_cache_m); - std::map< InetAddress,_CE >::iterator c(_cache.find(ipNoPort)); - if (c != _cache.end()) { - x = c->second.x; - y = c->second.y; - z = c->second.z; - if ((now - c->second.ts) < ZT_CLUSTERGEOIPSERVICE_INTERNAL_CACHE_TTL) - return true; - else r = true; // return true but refresh as well - } + Mutex::Lock _l(_lock); + + if ((_pathToCsv.length() > 0)&&((OSUtils::now() - _lastFileCheckTime) > ZT_CLUSTERGEOIPSERVICE_FILE_MODIFICATION_CHECK_EVERY)) { + _lastFileCheckTime = OSUtils::now(); + if ((_csvFileSize != OSUtils::getFileSize(_pathToCsv.c_str()))||(_csvModificationTime != OSUtils::getLastModified(_pathToCsv.c_str()))) + _load(_pathToCsv.c_str(),_ipStartColumn,_ipEndColumn,_latitudeColumn,_longitudeColumn); } - { - Mutex::Lock _l(_sOutputLock); - if (_sOutputFd >= 0) { - std::string ips(ipNoPort.toIpString()); - ips.push_back('\n'); - //fprintf(stderr,"ClusterGeoIpService: << %s",ips.c_str()); - ::write(_sOutputFd,ips.data(),ips.length()); + /* We search by looking up the upper bound of the sorted vXdb vectors + * and then iterating down for a matching IP range. We stop when we hit + * the beginning or an entry whose start and end are before the IP we + * are searching. */ + + if ((ip.ss_family == AF_INET)&&(_v4db.size() > 0)) { + _V4E key; + key.start = Utils::ntoh((uint32_t)(reinterpret_cast<const struct sockaddr_in *>(&ip)->sin_addr.s_addr)); + std::vector<_V4E>::const_iterator i(std::upper_bound(_v4db.begin(),_v4db.end(),key)); + while (i != _v4db.begin()) { + --i; + if ((key->start >= i->start)&&(key->start <= i->end)) { + x = i->x; + y = i->y; + z = i->z; + return true; + } else if ((key->start > i->start)&&(key->start > i->end)) + break; + } + } else if ((ip.ss_family == AF_INET6)&&(_v6db.size() > 0)) { + _V6E key; + memcpy(key.start,reinterpret_cast<const struct sockaddr_in6 *>(&ip)->sin6_addr.s6_addr,16); + std::vector<_V6E>::const_iterator i(std::upper_bound(_v6db.begin(),_v6db.end(),key)); + while (i != _v6db.begin()) { + --i; + const int s_vs_s = memcmp(key->start,i->start,16); + const int s_vs_e = memcmp(key->start,i->end,16); + if ((s_vs_s >= 0)&&(s_vs_e <= 0)) { + x = i->x; + y = i->y; + z = i->z; + return true; + } else if ((s_vs_s > 0)&&(s_vs_e > 0)) + break; } } - return r; + return false; } -void ClusterGeoIpService::threadMain() - throw() +static void _parseLine(const char *line,std::vector<_V4E> &v4db,std::vector<_V6E> &v6db,int ipStartColumn,int ipEndColumn,int latitudeColumn,int longitudeColumn) { - char linebuf[65536]; - char buf[65536]; - long n,lineptr; - - while (_run) { - { - Mutex::Lock _l(_sOutputLock); - - _sOutputFd = -1; - _sInputFd = -1; - _sPid = 0; - - int stdinfds[2] = { 0,0 }; // sub-process's stdin, our output - int stdoutfds[2] = { 0,0 }; // sub-process's stdout, our input - ::pipe(stdinfds); - ::pipe(stdoutfds); - - long p = (long)::vfork(); - if (p < 0) { - Thread::sleep(500); - continue; - } else if (p == 0) { - ::close(stdinfds[1]); - ::close(stdoutfds[0]); - ::dup2(stdinfds[0],STDIN_FILENO); - ::dup2(stdoutfds[1],STDOUT_FILENO); - ::execl(_pathToExe.c_str(),_pathToExe.c_str(),(const char *)0); - ::exit(1); - } else { - ::close(stdinfds[0]); - ::close(stdoutfds[1]); - _sOutputFd = stdinfds[1]; - _sInputFd = stdoutfds[0]; - _sPid = p; + std::vector<std::string> ls(Utils::split(line,",\t","\\","\"'")); + if ( ((ipStartColumn >= 0)&&(ipStartColumn < (int)ls.size()))&& + ((ipEndColumn >= 0)&&(ipEndColumn < (int)ls.size()))&& + ((latitudeColumn >= 0)&&(latitudeColumn < (int)ls.size()))&& + ((longitudeColumn >= 0)&&(longitudeColumn < (int)ls.size())) ) { + InetAddress ipStart(ls[ipStartColumn].c_str(),0); + InetAddress ipEnd(ls[ipEndColumn].c_str(),0); + const double lat = strtod(ls[latitudeColumn].c_str(),(char **)0); + const double lon = strtod(ls[longitudeColumn].c_str(),(char **)0); + + if ((ipStart.ss_family == ipEnd.ss_family)&&(ipStart)&&(ipEnd)&&(std::isfinite(lat))&&(std::isfinite(lon))) { + const double latRadians = lat * 0.01745329251994; // PI / 180 + const double lonRadians = lon * 0.01745329251994; // PI / 180 + const double cosLat = cos(latRadians); + const int x = (int)round((-6371.0) * cosLat * Math.cos(lonRadians)); // 6371 == Earth's approximate radius in kilometers + const int y = (int)round(6371.0 * sin(latRadians)); + const int z = (int)round(6371.0 * cosLat * Math.sin(lonRadians)); + + if (ipStart.ss_family == AF_INET) { + v4db.push_back(_V4E()); + v4db.back().start = Utils::ntoh((uint32_t)(reinterpret_cast<const struct sockaddr_in *>(&ipStart)->sin_addr.s_addr)); + v4db.back().end = Utils::ntoh((uint32_t)(reinterpret_cast<const struct sockaddr_in *>(&ipEnd)->sin_addr.s_addr)); + v4db.back().x = x; + v4db.back().y = y; + v4db.back().z = z; + } else if (ipStart.ss_family == AF_INET6) { + v6db.push_back(_V6E()); + memcpy(v6db.back().start,reinterpret_cast<const struct sockaddr_in6 *>(&ipStart)->sin6_addr.s6_addr,16); + memcpy(v6db.back().end,reinterpret_cast<const struct sockaddr_in6 *>(&ipEnd)->sin6_addr.s6_addr,16); + v6db.back().x = x; + v6db.back().y = y; + v6db.back().z = z; } } + } +} - lineptr = 0; - while (_run) { - n = ::read(_sInputFd,buf,sizeof(buf)); - if (n <= 0) { - if (errno == EINTR) - continue; - else break; - } - for(long i=0;i<n;++i) { - if (lineptr > (long)sizeof(linebuf)) - lineptr = 0; - if ((buf[i] == '\n')||(buf[i] == '\r')) { +long ClusterGeoIpService::_load(const char *pathToCsv,int ipStartColumn,int ipEndColumn,int latitudeColumn,int longitudeColumn) +{ + // assumes _lock is locked + + FILE *f = fopen(pathToCsv,"rb"); + if (!f) + return -1; + + std::vector<_V4E> v4db; + std::vector<_V6E> v6db; + + char buf[4096]; + char linebuf[1024]; + unsigned int lineptr = 0; + for(;;) { + int n = (int)fread(buf,1,sizeof(buf),f); + if (n <= 0) + break; + for(int i=0;i<n;++i) { + if ((buf[i] == '\r')||(buf[i] == '\n')||(buf[i] == (char)0)) { + if (lineptr) { linebuf[lineptr] = (char)0; - if (lineptr > 0) { - //fprintf(stderr,"ClusterGeoIpService: >> %s\n",linebuf); - try { - std::vector<std::string> result(Utils::split(linebuf,",","","")); - if ((result.size() >= 7)&&(result[1] == "1")) { - InetAddress rip(result[0],0); - if ((rip.ss_family == AF_INET)||(rip.ss_family == AF_INET6)) { - _CE ce; - ce.ts = OSUtils::now(); - ce.x = (int)::strtol(result[4].c_str(),(char **)0,10); - ce.y = (int)::strtol(result[5].c_str(),(char **)0,10); - ce.z = (int)::strtol(result[6].c_str(),(char **)0,10); - //fprintf(stderr,"ClusterGeoIpService: %s is at %d,%d,%d\n",rip.toIpString().c_str(),ce.x,ce.y,ce.z); - { - Mutex::Lock _l2(_cache_m); - _cache[rip] = ce; - } - } - } - } catch ( ... ) {} - } - lineptr = 0; - } else linebuf[lineptr++] = buf[i]; - } + _parseLine(linebuf,v4db,v6db,ipStartColumn,ipEndColumn,latitudeColumn,longitudeColumn); + } + lineptr = 0; + } else if (lineptr < (unsigned int)sizeof(linebuf)) + linebuf[lineptr++] = buf[i]; } + } + if (lineptr) { + linebuf[lineptr] = (char)0; + _parseLine(linebuf,v4db,v6db,ipStartColumn,ipEndColumn,latitudeColumn,longitudeColumn); + } + + fclose(f); + + if ((v4db.size() > 0)||(v6db.size() > 0)) { + std::sort(v4db.begin(),v4db.end()); + std::sort(v6db.begin(),v6db.end()); + + _pathToCsv = pathToCsv; + _ipStartColumn = ipStartColumn; + _ipEndColumn = ipEndColumn; + _latitudeColumn = latitudeColumn; + _longitudeColumn = longitudeColumn; + + _lastFileCheckTime = OSUtils::now(); + _csvModificationTime = OSUtils::getLastModified(pathToCsv); + _csvFileSize = OSUtils::getFileSize(pathToCsv); + + _v4db.swap(v4db); + _v6db.swap(v6db); - ::close(_sOutputFd); - ::close(_sInputFd); - ::kill(_sPid,SIGTERM); - Thread::sleep(250); - ::kill(_sPid,SIGKILL); - ::waitpid(_sPid,(int *)0,0); + return (long)(_v4db.size() + _v6db.size()); + } else { + return 0; } } |