From f260c2839c73afa9898547398e1911c585904132 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 31 Jul 2013 17:24:59 -0400 Subject: Local service plugin stuff... work in progress. --- node/Service.cpp | 192 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 node/Service.cpp (limited to 'node/Service.cpp') diff --git a/node/Service.cpp b/node/Service.cpp new file mode 100644 index 00000000..e0d06792 --- /dev/null +++ b/node/Service.cpp @@ -0,0 +1,192 @@ +/* + * ZeroTier One - Global Peer to Peer Ethernet + * Copyright (C) 2012-2013 ZeroTier Networks LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * -- + * + * ZeroTier may be used and distributed under the terms of the GPLv3, which + * are available at: http://www.gnu.org/licenses/gpl-3.0.html + * + * If you would like to embed ZeroTier into a commercial application or + * redistribute it in a modified binary form, please contact ZeroTier Networks + * LLC. Start here: http://www.zerotier.com/ + */ + +#include "Constants.hpp" + +#ifndef __WINDOWS__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Service.hpp" +#include "RuntimeEnvironment.hpp" +#include "Utils.hpp" +#include "Logger.hpp" + +namespace ZeroTier { + +Service::Service(const RuntimeEnvironment *renv,const char *name,const char *path,void (*handler)(void *,Service &,const Dictionary &),void *arg) : + _r(renv), + _path(path), + _name(name), + _arg(arg), + _handler(handler), + _pid(-1), + _childStdin(0), + _childStdout(0), + _childStderr(0), + _run(true) +{ + start(); +} + +Service::~Service() +{ + _run = false; + long pid = _pid; + if (pid > 0) { + int st = 0; + ::kill(pid,SIGTERM); + for(int i=0;i<20;++i) { + if (waitpid(pid,&st,WNOHANG) == pid) { + pid = 0; + break; + } + Thread::sleep(100); + } + if (pid > 0) { + ::kill(pid,SIGKILL); + waitpid(pid,&st,0); + } + } + join(); +} + +bool Service::send(const Dictionary &msg) +{ + if (_childStdin <= 0) + return false; + + std::string mser = msg.toString(); + if (mser.length() > ZT_SERVICE_MAX_MESSAGE_SIZE) + return false; + + // This can technically block. We'll fix this if it ends up being a + // problem. + uint32_t len = Utils::hton((uint32_t)mser.length()); + if (write(_childStdin,&len,4) != 4) + return false; + if ((int)write(_childStdin,mser.data(),mser.length()) != (int)mser.length()) + return false; + + return true; +} + +void Service::main() + throw() +{ + fd_set readfds,writefds,exceptfds; + struct timeval tv; + + while (_run) { + if (_pid <= 0) { + LOG("launching service %s...",_name.c_str()); + + int in[2],out[2],err[2]; + pipe(in); + pipe(out); + pipe(err); + + long pid = fork(); + if (pid < 0) { + LOG("service %s terminating: could not fork!",_name.c_str()); + return; + } else if (pid) { + close(in[1]); + close(out[0]); + close(err[0]); + Thread::sleep(500); // give child time to start + _childStdin = in[1]; + _childStdout = out[0]; + _childStderr = err[0]; + } else { + dup2(in[0],STDIN_FILENO); + dup2(out[1],STDOUT_FILENO); + dup2(err[1],STDERR_FILENO); + execl(_path.c_str(),_path.c_str(),_r->homePath.c_str(),(const char *)0); + exit(-1); + } + } else { + int st = 0; + if (waitpid(_pid,&st,WNOHANG) == _pid) { + if (_childStdin > 0) close(_childStdin); + _childStdin = 0; + if (_childStdout > 0) close(_childStdout); + if (_childStderr > 0) close(_childStderr); + _pid = 0; + + if (!_run) + return; + + LOG("service %s exited with exit code: %d, delaying 1s to attempt relaunch",_name.c_str(),st); + + Thread::sleep(1000); // wait to relaunch + continue; + } + } + + FD_ZERO(&readfds); + FD_ZERO(&writefds); + FD_ZERO(&exceptfds); + + FD_SET(_childStdout,&readfds); + FD_SET(_childStderr,&readfds); + + tv.tv_sec = 1; + tv.tv_usec = 0; + select(std::max(_childStdout,_childStderr)+1,&readfds,&writefds,&exceptfds,&tv); + + if (!_run) { + if (_childStdin > 0) close(_childStdin); + _childStdin = 0; + if (_childStdout > 0) close(_childStdout); + if (_childStderr > 0) close(_childStderr); + return; + } + + if ((_childStderr > 0)&&(FD_ISSET(_childStderr,&readfds))) { + } + + if ((_childStdout > 0)&&(FD_ISSET(_childStdout,&readfds))) { + } + } +} + +} // namespace ZeroTier + +#endif // __WINDOWS__ + -- cgit v1.2.3 From ee9a811b81c5deefd459960936fc9f416c7aa8d2 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 1 Aug 2013 10:11:59 -0400 Subject: Netconf service code, interacts with our MySQL database. --- netconf-plugin/Makefile | 6 ++ netconf-plugin/netconf.cpp | 196 ++++++++++++++++++++++++++++++++++++++++++--- node/Service.cpp | 43 ++++++++++ 3 files changed, 232 insertions(+), 13 deletions(-) (limited to 'node/Service.cpp') diff --git a/netconf-plugin/Makefile b/netconf-plugin/Makefile index e69de29b..bd6b052c 100644 --- a/netconf-plugin/Makefile +++ b/netconf-plugin/Makefile @@ -0,0 +1,6 @@ +all: + gcc -O6 -c ../ext/lz4/lz4hc.c ../ext/lz4/lz4.c + g++ -DZT_OSNAME="linux" -DZT_ARCH="x86_64" -I/usr/include/mysql -I../ext/bin/libcrypto/include -O -o netconf.service netconf.cpp ../node/Utils.cpp ../node/Identity.cpp ../node/EllipticCurveKeyPair.cpp ../node/Salsa20.cpp ../node/HMAC.cpp lz4.o lz4hc.o ../ext/bin/libcrypto/linux-x86_64/libcrypto.a -lmysqlpp + +clean: + rm -f *.o netconf.service diff --git a/netconf-plugin/netconf.cpp b/netconf-plugin/netconf.cpp index 57d3653b..a92ff8a0 100644 --- a/netconf-plugin/netconf.cpp +++ b/netconf-plugin/netconf.cpp @@ -52,8 +52,12 @@ #include #include -#include #include +#include +#include +#include +#include +#include #include #include @@ -62,32 +66,35 @@ #include #include -#include +#include #include "../node/Dictionary.hpp" +#include "../node/Identity.hpp" +#include "../node/Utils.hpp" using namespace ZeroTier; using namespace mysqlpp; static Connection *dbCon = (Connection *)0; +static char mysqlHost[64],mysqlPort[64],mysqlDatabase[64],mysqlUser[64],mysqlPassword[64]; static void connectOrReconnect() { - if (dbCon) - delete dbCon; - dbCon = new Connection(mysqlDatabase,mysqlHost,mysqlUser,mysqlPassword,(unsigned int)strtol(mysqlPort,(char **)0,10)); - if (dbCon->connected()) - break; - else { - fprintf(stderr,"Unable to connect to database server.\n"); - usleep(1000); + for(;;) { + if (dbCon) + delete dbCon; + dbCon = new Connection(mysqlDatabase,mysqlHost,mysqlUser,mysqlPassword,(unsigned int)strtol(mysqlPort,(char **)0,10)); + if (dbCon->connected()) + break; + else { + fprintf(stderr,"Unable to connect to database server.\n"); + usleep(1000); + } } } int main(int argc,char **argv) { - char mysqlHost[64],mysqlPort[64],mysqlDatabase[64],mysqlUser[64],mysqlPassword[64]; - { char *ee = getenv("ZT_NETCONF_MYSQL_HOST"); if (!ee) { @@ -96,7 +103,7 @@ int main(int argc,char **argv) } strcpy(mysqlHost,ee); ee = getenv("ZT_NETCONF_MYSQL_PORT"); - if (ee == null) + if (!ee) strcpy(mysqlPort,"3306"); else strcpy(mysqlPort,ee); ee = getenv("ZT_NETCONF_MYSQL_DATABASE"); @@ -119,9 +126,172 @@ int main(int argc,char **argv) strcpy(mysqlPassword,ee); } + char buf[4096]; + std::string dictBuf; + connectOrReconnect(); for(;;) { + if (read(STDIN_FILENO,buf,4) != 4) { + fprintf(stderr,"Error reading frame size from stdin\n"); + return -1; + } + unsigned int fsize = (unsigned int)ntohl(*((const uint32_t *)buf)); + while (dictBuf.length() < fsize) { + int n = (int)read(STDIN_FILENO,buf,std::min((int)sizeof(buf),(int)(fsize - dictBuf.length()))); + for(int i=0;iconnected()) connectOrReconnect(); + + try { + const std::string &command = msg.get("command"); + if (command == "config") { // NETWORK_CONFIG_REQUEST packet + Identity peerIdentity(msg.get("peerIdentity")); + uint64_t nwid = strtoull(msg.get("nwid").c_str(),(char **)0,16); + Dictionary meta; + if (msg.contains("meta")) + meta.fromString(msg.get("meta")); + + // Do quick signature check / sanity check + if (!peerIdentity.locallyValidate(false)) { + fprintf(stderr,"identity failed signature check: %s",peerIdentity.toString(false).c_str()); + continue; + } + + // Save identity if unknown + { + Query q = dbCon->query(); + q << "SELECT identity,identityValidated FROM Node WHERE id = " << peerIdentity.address().toInt(); + StoreQueryResult rs = q.store(); + if (rs.num_rows() > 0) { + if (rs[0]["identity"] != peerIdentity.toString(false)) { + // TODO: handle collisions... + continue; + } else if ((int)rs[0]["identityValidated"] == 0) { + // TODO: launch background validation + } + } else { + q = dbCon->query(); + uint64_t now = Utils::now(); + q << "INSERT INTO Node (id,creationTime,lastSeen,identity) VALUES (" << peerIdentity.address().toInt() << "," << now << "," << now << "," << peerIdentity.toString(false) << ")"; + if (!q.exec()) { + fprintf(stderr,"Error inserting Node row for peer %s, aborting netconf request",peerIdentity.address().toString().c_str()); + continue; + } + // TODO: launch background validation + } + } + + bool isOpen = false; + { + Query q = dbCon->query(); + q << "SELECT isOpen FROM Network WHERE id = " << nwid; + StoreQueryResult rs = q.store(); + if (rs.num_rows() > 0) + isOpen = ((int)rs[0]["isOpen"] > 0); + } + + Dictionary netconf; + + netconf["peer"] = peerIdentity.address().toString(); + sprintf(buf,"%.16llx",(unsigned long long)nwid); + netconf["nwid"] = buf; + netconf["isOpen"] = (isOpen ? "1" : "0"); + + if (!isOpen) { + // TODO: handle closed networks, look up private membership, + // generate signed cert. + } + + std::string ipv4Static,ipv6Static; + + { + // Check for IPv4 static assignments + Query q = dbCon->query(); + q << "SELECT INET_NTOA(ip) AS ip,netmaskBits FROM IPv4Static WHERE Node_id = " << peerIdentity.address().toInt() << " AND Network_id = " << nwid; + StoreQueryResult rs = q.store(); + if (rs.num_rows() > 0) { + for(int i=0;iquery(); + q << "SELECT ipNet,netmaskBits FROM IPv4AutoAssign WHERE Network_id = " << nwid; + rs = q.store(); + if (rs.num_rows() > 0) { + for(int aaRow=0;aaRow> netmaskBits); + tryIp |= ipNet; + + for(int k=0;k<100000;++k) { + Query q2 = dbCon->query(); + q2 << "INSERT INTO IPv4Static (Network_id,Node_id,ip,netmaskBits) VALUES (" << nwid << "," << peerIdentity.address().toInt() << "," << tryIp << "," << netmaskBits << ")"; + if (q2.exec()) { + sprintf(buf,"%u.%u.%u.%u",(unsigned int)((tryIp >> 24) & 0xff),(unsigned int)((tryIp >> 16) & 0xff),(unsigned int)((tryIp >> 8) & 0xff),(unsigned int)(tryIp & 0xff)); + if (ipv4Static.length()) + ipv4Static.push_back(','); + ipv4Static.append(buf); + ipv4Static.push_back('/'); + sprintf(buf,"%u",netmaskBits); + ipv4Static.append(buf); + break; + } else { // insert will fail if IP is in use due to uniqueness constraints in DB + ++tryIp; + if ((tryIp & 0xff) == 0) + tryIp |= 1; + tryIp &= (0xffffffff >> netmaskBits); + tryIp |= ipNet; + } + } + + if (ipv4Static.length()) + break; + } + } + } + } + + if (ipv4Static.length()) + netconf["ipv4Static"] = ipv4Static; + if (ipv6Static.length()) + netconf["ipv6Static"] = ipv6Static; + + Dictionary resp; + resp["peer"] = peerIdentity.address().toString(); + resp["nwid"] = msg.get("nwid"); + resp["requestId"] = msg.get("requestId"); + resp["netconf"] = netconf.toString(); + std::string respm = resp.toString(); + uint32_t respml = (uint32_t)htonl((uint32_t)respm.length()); + write(STDOUT_FILENO,&respml,4); + write(STDOUT_FILENO,respm.data(),respm.length()); + } + } catch (std::exception &exc) { + fprintf(stderr,"unexpected exception handling message: %s",exc.what()); + } catch ( ... ) { + fprintf(stderr,"unexpected exception handling message: unknown exception"); + } } } diff --git a/node/Service.cpp b/node/Service.cpp index e0d06792..88a6d15c 100644 --- a/node/Service.cpp +++ b/node/Service.cpp @@ -109,9 +109,14 @@ bool Service::send(const Dictionary &msg) void Service::main() throw() { + char buf[4096]; fd_set readfds,writefds,exceptfds; struct timeval tv; + std::string stderrBuf; + std::string stdoutBuf; + unsigned int stdoutExpecting = 0; + while (_run) { if (_pid <= 0) { LOG("launching service %s...",_name.c_str()); @@ -133,10 +138,15 @@ void Service::main() _childStdin = in[1]; _childStdout = out[0]; _childStderr = err[0]; + fcntl(_childStdout,F_SETFL,O_NONBLOCK); + fcntl(_childStderr,F_SETFL,O_NONBLOCK); } else { dup2(in[0],STDIN_FILENO); dup2(out[1],STDOUT_FILENO); dup2(err[1],STDERR_FILENO); + close(in[1]); + close(out[0]); + close(err[0]); execl(_path.c_str(),_path.c_str(),_r->homePath.c_str(),(const char *)0); exit(-1); } @@ -179,9 +189,42 @@ void Service::main() } if ((_childStderr > 0)&&(FD_ISSET(_childStderr,&readfds))) { + int n = (int)read(_childStderr,buf,sizeof(buf)); + for(int i=0;i 0)&&(FD_ISSET(_childStdout,&readfds))) { + int n = (int)read(_childStdout,buf,sizeof(buf)); + for(int i=0;i ZT_SERVICE_MAX_MESSAGE_SIZE) { + LOG("message size overrun from service %s: %u bytes -- restarting service",_name.c_str(),stdoutExpecting); + stdoutExpecting = 0; + kill(_pid,SIGKILL); + break; + } + } + } } } } -- cgit v1.2.3 From 2a6b74746edbfde0b0f1468cdf153263670c908a Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 2 Aug 2013 14:25:23 -0400 Subject: Netconf service itself works, time to integrate. --- netconf-service/Makefile | 5 +- netconf-service/netconf-test.cpp | 17 ++++++ netconf-service/netconf.cpp | 108 +++++++++++++++++++++++++-------------- node/Identity.hpp | 4 +- node/Service.cpp | 22 +++++--- node/Thread.cpp | 2 +- 6 files changed, 107 insertions(+), 51 deletions(-) (limited to 'node/Service.cpp') diff --git a/netconf-service/Makefile b/netconf-service/Makefile index bd6b052c..af737a97 100644 --- a/netconf-service/Makefile +++ b/netconf-service/Makefile @@ -1,6 +1,7 @@ all: gcc -O6 -c ../ext/lz4/lz4hc.c ../ext/lz4/lz4.c - g++ -DZT_OSNAME="linux" -DZT_ARCH="x86_64" -I/usr/include/mysql -I../ext/bin/libcrypto/include -O -o netconf.service netconf.cpp ../node/Utils.cpp ../node/Identity.cpp ../node/EllipticCurveKeyPair.cpp ../node/Salsa20.cpp ../node/HMAC.cpp lz4.o lz4hc.o ../ext/bin/libcrypto/linux-x86_64/libcrypto.a -lmysqlpp + g++ -DZT_OSNAME="linux" -DZT_ARCH="x86_64" -I/usr/include/mysql -I../ext/bin/libcrypto/include -O -pthread -o netconf.service netconf.cpp ../node/Utils.cpp ../node/Identity.cpp ../node/EllipticCurveKeyPair.cpp ../node/Salsa20.cpp ../node/HMAC.cpp lz4.o lz4hc.o ../ext/bin/libcrypto/linux-x86_64/libcrypto.a -lmysqlpp + g++ -DZT_OSNAME="linux" -DZT_ARCH="x86_64" -I/usr/include/mysql -I../ext/bin/libcrypto/include -O -pthread -o netconf-test netconf-test.cpp ../node/Utils.cpp ../node/Identity.cpp ../node/EllipticCurveKeyPair.cpp ../node/Salsa20.cpp ../node/HMAC.cpp ../node/Logger.cpp ../node/Service.cpp ../node/Thread.cpp lz4.o lz4hc.o ../ext/bin/libcrypto/linux-x86_64/libcrypto.a clean: - rm -f *.o netconf.service + rm -f *.o netconf.service netconf-test diff --git a/netconf-service/netconf-test.cpp b/netconf-service/netconf-test.cpp index aadaa51d..6c7ef9f2 100644 --- a/netconf-service/netconf-test.cpp +++ b/netconf-service/netconf-test.cpp @@ -42,11 +42,13 @@ #include "../node/Identity.hpp" #include "../node/RuntimeEnvironment.hpp" #include "../node/Logger.hpp" +#include "../node/Thread.hpp" using namespace ZeroTier; static void svcHandler(void *arg,Service &svc,const Dictionary &msg) { + std::cout << msg.toString(); } int main(int argc,char **argv) @@ -59,8 +61,23 @@ int main(int argc,char **argv) std::vector population; for(;;) { + Identity id; if ((population.empty())||(rand() < (RAND_MAX / 4))) { + id.generate(); + population.push_back(id); + std::cout << "Testing with new identity: " << id.address().toString() << std::endl; } else { + id = population[rand() % population.size()]; + Thread::sleep(1000); + std::cout << "Testing with existing identity: " << id.address().toString() << std::endl; } + + Dictionary request; + request["type"] = "netconf-request"; + request["peerId"] = id.toString(false); + request["nwid"] = "6c92786fee000001"; + request["requestId"] = "12345"; + + svc.send(request); } } diff --git a/netconf-service/netconf.cpp b/netconf-service/netconf.cpp index a92ff8a0..01f33120 100644 --- a/netconf-service/netconf.cpp +++ b/netconf-service/netconf.cpp @@ -55,6 +55,7 @@ #include #include #include +#include #include #include #include @@ -71,24 +72,34 @@ #include "../node/Dictionary.hpp" #include "../node/Identity.hpp" #include "../node/Utils.hpp" +#include "../node/Mutex.hpp" using namespace ZeroTier; using namespace mysqlpp; +static Mutex stdoutWriteLock; static Connection *dbCon = (Connection *)0; static char mysqlHost[64],mysqlPort[64],mysqlDatabase[64],mysqlUser[64],mysqlPassword[64]; static void connectOrReconnect() { for(;;) { - if (dbCon) - delete dbCon; - dbCon = new Connection(mysqlDatabase,mysqlHost,mysqlUser,mysqlPassword,(unsigned int)strtol(mysqlPort,(char **)0,10)); - if (dbCon->connected()) - break; - else { - fprintf(stderr,"Unable to connect to database server.\n"); - usleep(1000); + delete dbCon; + try { + dbCon = new Connection(mysqlDatabase,mysqlHost,mysqlUser,mysqlPassword,(unsigned int)strtol(mysqlPort,(char **)0,10)); + if (dbCon->connected()) { + fprintf(stderr,"(re?)-connected to mysql server successfully\n"); + break; + } else { + fprintf(stderr,"unable to connect to database server (connection closed), trying again in 1s...\n"); + usleep(1000000); + } + } catch (std::exception &exc) { + fprintf(stderr,"unable to connect to database server (%s), trying again in 1s...\n",exc.what()); + usleep(1000000); + } catch ( ... ) { + fprintf(stderr,"unable to connect to database server (unknown exception), trying again in 1s...\n"); + usleep(1000000); } } } @@ -98,7 +109,7 @@ int main(int argc,char **argv) { char *ee = getenv("ZT_NETCONF_MYSQL_HOST"); if (!ee) { - fprintf(stderr,"Missing environment variable: ZT_NETCONF_MYSQL_HOST\n"); + fprintf(stderr,"missing environment variable: ZT_NETCONF_MYSQL_HOST\n"); return -1; } strcpy(mysqlHost,ee); @@ -108,57 +119,66 @@ int main(int argc,char **argv) else strcpy(mysqlPort,ee); ee = getenv("ZT_NETCONF_MYSQL_DATABASE"); if (!ee) { - fprintf(stderr,"Missing environment variable: ZT_NETCONF_MYSQL_DATABASE\n"); + fprintf(stderr,"missing environment variable: ZT_NETCONF_MYSQL_DATABASE\n"); return -1; } strcpy(mysqlDatabase,ee); ee = getenv("ZT_NETCONF_MYSQL_USER"); if (!ee) { - fprintf(stderr,"Missing environment variable: ZT_NETCONF_MYSQL_USER\n"); + fprintf(stderr,"missing environment variable: ZT_NETCONF_MYSQL_USER\n"); return -1; } strcpy(mysqlUser,ee); ee = getenv("ZT_NETCONF_MYSQL_PASSWORD"); if (!ee) { - fprintf(stderr,"Missing environment variable: ZT_NETCONF_MYSQL_PASSWORD\n"); + fprintf(stderr,"missing environment variable: ZT_NETCONF_MYSQL_PASSWORD\n"); return -1; } strcpy(mysqlPassword,ee); } - char buf[4096]; + char buf[131072]; std::string dictBuf; connectOrReconnect(); for(;;) { - if (read(STDIN_FILENO,buf,4) != 4) { - fprintf(stderr,"Error reading frame size from stdin\n"); - return -1; + for(int l=0;l<4;) { + int n = (int)read(STDIN_FILENO,buf + l,4 - l); + if (n < 0) { + fprintf(stderr,"error reading frame size from stdin: %s\n",strerror(errno)); + return -1; + } + l += n; } unsigned int fsize = (unsigned int)ntohl(*((const uint32_t *)buf)); + while (dictBuf.length() < fsize) { int n = (int)read(STDIN_FILENO,buf,std::min((int)sizeof(buf),(int)(fsize - dictBuf.length()))); + if (n < 0) { + fprintf(stderr,"error reading frame from stdin: %s\n",strerror(errno)); + return -1; + } for(int i=0;iconnected()) connectOrReconnect(); try { - const std::string &command = msg.get("command"); - if (command == "config") { // NETWORK_CONFIG_REQUEST packet - Identity peerIdentity(msg.get("peerIdentity")); - uint64_t nwid = strtoull(msg.get("nwid").c_str(),(char **)0,16); + const std::string &reqType = request.get("type"); + if (reqType == "netconf-request") { // NETWORK_CONFIG_REQUEST packet + Identity peerIdentity(request.get("peerId")); + uint64_t nwid = strtoull(request.get("nwid").c_str(),(char **)0,16); Dictionary meta; - if (msg.contains("meta")) - meta.fromString(msg.get("meta")); + if (request.contains("meta")) + meta.fromString(request.get("meta")); // Do quick signature check / sanity check if (!peerIdentity.locallyValidate(false)) { - fprintf(stderr,"identity failed signature check: %s",peerIdentity.toString(false).c_str()); + fprintf(stderr,"identity failed signature check: %s\n",peerIdentity.toString(false).c_str()); continue; } @@ -176,16 +196,22 @@ int main(int argc,char **argv) } } else { q = dbCon->query(); - uint64_t now = Utils::now(); - q << "INSERT INTO Node (id,creationTime,lastSeen,identity) VALUES (" << peerIdentity.address().toInt() << "," << now << "," << now << "," << peerIdentity.toString(false) << ")"; + q << "INSERT INTO Node (id,creationTime,lastSeen,identity) VALUES (" << peerIdentity.address().toInt() << "," << Utils::now() << ",0," << quote << peerIdentity.toString(false) << ")"; if (!q.exec()) { - fprintf(stderr,"Error inserting Node row for peer %s, aborting netconf request",peerIdentity.address().toString().c_str()); + fprintf(stderr,"error inserting Node row for peer %s, aborting netconf request\n",peerIdentity.address().toString().c_str()); continue; } // TODO: launch background validation } } + // Update lastSeen + { + Query q = dbCon->query(); + q << "UPDATE Node SET lastSeen = " << Utils::now() << " WHERE id = " << peerIdentity.address().toInt(); + q.exec(); + } + bool isOpen = false; { Query q = dbCon->query(); @@ -278,20 +304,26 @@ int main(int argc,char **argv) if (ipv6Static.length()) netconf["ipv6Static"] = ipv6Static; - Dictionary resp; - resp["peer"] = peerIdentity.address().toString(); - resp["nwid"] = msg.get("nwid"); - resp["requestId"] = msg.get("requestId"); - resp["netconf"] = netconf.toString(); - std::string respm = resp.toString(); - uint32_t respml = (uint32_t)htonl((uint32_t)respm.length()); - write(STDOUT_FILENO,&respml,4); - write(STDOUT_FILENO,respm.data(),respm.length()); + { + Dictionary response; + response["peer"] = peerIdentity.address().toString(); + response["nwid"] = request.get("nwid"); + response["type"] = "netconf-response"; + response["requestId"] = request.get("requestId"); + response["netconf"] = netconf.toString(); + std::string respm = response.toString(); + uint32_t respml = (uint32_t)htonl((uint32_t)respm.length()); + + stdoutWriteLock.lock(); + write(STDOUT_FILENO,&respml,4); + write(STDOUT_FILENO,respm.data(),respm.length()); + stdoutWriteLock.unlock(); + } } } catch (std::exception &exc) { - fprintf(stderr,"unexpected exception handling message: %s",exc.what()); + fprintf(stderr,"unexpected exception handling message: %s\n",exc.what()); } catch ( ... ) { - fprintf(stderr,"unexpected exception handling message: unknown exception"); + fprintf(stderr,"unexpected exception handling message: unknown exception\n"); } } } diff --git a/node/Identity.hpp b/node/Identity.hpp index a9f78c8a..1cce4fb0 100644 --- a/node/Identity.hpp +++ b/node/Identity.hpp @@ -104,7 +104,7 @@ public: _keyPair((EllipticCurveKeyPair *)0) { if (!fromString(str)) - throw std::invalid_argument("invalid string-serialized identity"); + throw std::invalid_argument(std::string("invalid string-serialized identity: ") + str); } Identity(const std::string &str) @@ -112,7 +112,7 @@ public: _keyPair((EllipticCurveKeyPair *)0) { if (!fromString(str)) - throw std::invalid_argument("invalid string-serialized identity"); + throw std::invalid_argument(std::string("invalid string-serialized identity: ") + str); } template diff --git a/node/Service.cpp b/node/Service.cpp index 88a6d15c..c614e4e4 100644 --- a/node/Service.cpp +++ b/node/Service.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -109,7 +110,7 @@ bool Service::send(const Dictionary &msg) void Service::main() throw() { - char buf[4096]; + char buf[131072]; fd_set readfds,writefds,exceptfds; struct timeval tv; @@ -126,27 +127,30 @@ void Service::main() pipe(out); pipe(err); - long pid = fork(); + long pid = vfork(); if (pid < 0) { LOG("service %s terminating: could not fork!",_name.c_str()); return; } else if (pid) { - close(in[1]); - close(out[0]); - close(err[0]); + // Parent + close(in[0]); + close(out[1]); + close(err[1]); Thread::sleep(500); // give child time to start _childStdin = in[1]; _childStdout = out[0]; _childStderr = err[0]; fcntl(_childStdout,F_SETFL,O_NONBLOCK); fcntl(_childStderr,F_SETFL,O_NONBLOCK); + _pid = pid; } else { - dup2(in[0],STDIN_FILENO); - dup2(out[1],STDOUT_FILENO); - dup2(err[1],STDERR_FILENO); + // Child close(in[1]); close(out[0]); close(err[0]); + dup2(in[0],STDIN_FILENO); + dup2(out[1],STDOUT_FILENO); + dup2(err[1],STDERR_FILENO); execl(_path.c_str(),_path.c_str(),_r->homePath.c_str(),(const char *)0); exit(-1); } @@ -169,6 +173,8 @@ void Service::main() } } + // If we've made it here, _pid is running last we checked. + FD_ZERO(&readfds); FD_ZERO(&writefds); FD_ZERO(&exceptfds); diff --git a/node/Thread.cpp b/node/Thread.cpp index 71cfcaea..f650f6fc 100644 --- a/node/Thread.cpp +++ b/node/Thread.cpp @@ -75,7 +75,7 @@ void Thread::join() void Thread::sleep(unsigned long ms) { - usleep(ms); + usleep(ms * 1000); } void Thread::__intl_run() -- cgit v1.2.3