summaryrefslogtreecommitdiff
path: root/osdep
diff options
context:
space:
mode:
authorJoseph Henry <josephjah@gmail.com>2018-05-01 16:32:15 -0700
committerJoseph Henry <josephjah@gmail.com>2018-05-01 16:32:15 -0700
commit6a2ba4baca326272c45930208b70cfedf8cb1638 (patch)
tree434403aecca63908909678bd234ef8b4ffb1d1e4 /osdep
parent836d897aecc193ec3477e67858237a3f97819024 (diff)
downloadinfinitytier-6a2ba4baca326272c45930208b70cfedf8cb1638.tar.gz
infinitytier-6a2ba4baca326272c45930208b70cfedf8cb1638.zip
Introduced basic multipath support
Diffstat (limited to 'osdep')
-rw-r--r--osdep/Binder.hpp15
-rw-r--r--osdep/Phy.hpp195
2 files changed, 209 insertions, 1 deletions
diff --git a/osdep/Binder.hpp b/osdep/Binder.hpp
index 93fad9f1..6e13836c 100644
--- a/osdep/Binder.hpp
+++ b/osdep/Binder.hpp
@@ -388,6 +388,7 @@ public:
_bindings[_bindingCount].udpSock = udps;
_bindings[_bindingCount].tcpListenSock = tcps;
_bindings[_bindingCount].address = ii->first;
+ phy.setIfName(udps, (char*)ii->second.c_str(), ii->second.length());
++_bindingCount;
}
} else {
@@ -455,6 +456,20 @@ public:
return false;
}
+ /**
+  * Get a list of socket pointers for all active bindings.
+  *
+  * Only the first _bindingCount entries of _bindings are returned; the
+  * remaining slots of the fixed-size array are not in use and may hold
+  * stale or uninitialized pointers.
+  *
+  * NOTE(review): no lock is taken here -- confirm whether callers are
+  * expected to hold the binder's lock while bindings are being refreshed.
+  *
+  * @return UDP socket pointers for the current bindings (possibly empty)
+  */
+ inline std::vector<PhySocket*> getBoundSockets()
+ {
+     std::vector<PhySocket*> sockets;
+     // Read the atomic counter once so the bound used for reserve() and the loop agree.
+     unsigned int count = _bindingCount;
+     if (count > ZT_BINDER_MAX_BINDINGS) {
+         count = ZT_BINDER_MAX_BINDINGS;
+     }
+     sockets.reserve(count);
+     for (unsigned int i=0; i<count; ++i) {
+         sockets.push_back(_bindings[i].udpSock);
+     }
+     return sockets;
+ }
+
private:
_Binding _bindings[ZT_BINDER_MAX_BINDINGS];
std::atomic<unsigned int> _bindingCount;
diff --git a/osdep/Phy.hpp b/osdep/Phy.hpp
index e359ccdd..d4f68681 100644
--- a/osdep/Phy.hpp
+++ b/osdep/Phy.hpp
@@ -27,6 +27,8 @@
#ifndef ZT_PHY_HPP
#define ZT_PHY_HPP
+#include "../osdep/OSUtils.hpp"
+
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -86,6 +88,22 @@ namespace ZeroTier {
*/
typedef void PhySocket;
+/**
+ * Bookkeeping entry for one link speed test datagram that has been sent
+ * and is still awaiting its echoed response.
+ */
+struct link_test_record
+{
+    PhySocket *s;          // socket the test datagram was sent on
+    uint64_t id;           // identifier used to match the echoed response
+    uint64_t egress_time;  // timestamp taken just before the datagram was sent
+    uint32_t length;       // size of the test datagram in bytes
+
+    link_test_record(PhySocket *sock, uint64_t recordId, uint64_t sentAt, uint32_t datagramLength)
+        : s(sock), id(recordId), egress_time(sentAt), length(datagramLength) {}
+};
+
/**
* Simple templated non-blocking sockets implementation
*
@@ -154,10 +172,17 @@ private:
struct PhySocketImpl
{
+ PhySocketImpl() : // initializes only throughput and ifname; type, sock, uptr and saddr are not set here
+ throughput(0)
+ {
+ memset(ifname, 0, sizeof(ifname)); // start with an empty (all-zero) interface name
+ }
PhySocketType type;
ZT_PHY_SOCKFD_TYPE sock;
void *uptr; // user-settable pointer
ZT_PHY_SOCKADDR_STORAGE_TYPE saddr; // remote for TCP_OUT and TCP_IN, local for TCP_LISTEN, RAW, and UDP
+ char ifname[16]; // interface name; written by setIfName(), read by getIfName()
+ uint64_t throughput; // last link test result in bits/s; 0 until measured or after a test times out
};
std::list<PhySocketImpl> _socks;
@@ -173,6 +198,7 @@ private:
bool _noDelay;
bool _noCheck;
+ std::vector<struct link_test_record*> link_test_records;
public:
/**
@@ -250,6 +276,173 @@ public:
static inline void** getuptr(PhySocket *s) throw() { return &(reinterpret_cast<PhySocketImpl *>(s)->uptr); }
/**
+  * Copy the name of the interface this socket is bound to into a caller buffer.
+  *
+  * At most sizeof(ifname) bytes (or buflen bytes, if that is smaller) are
+  * copied, and the copied region always ends with a NUL byte. Nothing is
+  * written if s or nameBuf is NULL or buflen is not positive.
+  *
+  * @param s Socket object
+  * @param nameBuf Buffer to store name of interface which this Socket object is bound to
+  * @param buflen Length of buffer to copy name into
+  */
+ static inline void getIfName(PhySocket *s, char *nameBuf, int buflen)
+ {
+     if ((!s)||(!nameBuf)||(buflen <= 0)) {
+         return;
+     }
+     const PhySocketImpl *const sws = reinterpret_cast<const PhySocketImpl *>(s);
+     // Never read past the end of the stored name, whatever buflen the caller passes.
+     const size_t stored = sizeof(sws->ifname);
+     const size_t n = (static_cast<size_t>(buflen) < stored) ? static_cast<size_t>(buflen) : stored;
+     memcpy(nameBuf, sws->ifname, n);
+     // Guarantee a terminated string even if the stored name fills the buffer.
+     nameBuf[n - 1] = 0;
+ }
+
+ /**
+  * Record the name of the interface this socket is bound to.
+  *
+  * The previous name is cleared first. Names that do not fit in the
+  * socket's fixed-size name buffer are truncated so that the stored name is
+  * always NUL-terminated. A NULL ifname or non-positive len leaves the name empty.
+  *
+  * @param s Socket object
+  * @param ifname Buffer containing name of interface that this Socket object is bound to
+  * @param len Length of name of interface (not counting any terminator)
+  */
+ static inline void setIfName(PhySocket *s, char *ifname, int len)
+ {
+     if (!s) {
+         return;
+     }
+     PhySocketImpl *const sws = reinterpret_cast<PhySocketImpl *>(s);
+     // Wipe any earlier (possibly longer) name so no stale bytes survive.
+     memset(sws->ifname, 0, sizeof(sws->ifname));
+     if ((!ifname)||(len <= 0)) {
+         return;
+     }
+     // Reserve the last byte for the terminator; the caller-supplied length is not trusted.
+     const size_t room = sizeof(sws->ifname) - 1;
+     const size_t n = (static_cast<size_t>(len) < room) ? static_cast<size_t>(len) : room;
+     memcpy(sws->ifname, ifname, n);
+ }
+
+ /**
+  * Read the throughput value stored on a socket by the link test logic.
+  *
+  * @param s Socket object
+  * @return Stored throughput value, or 0 if s is NULL
+  */
+ inline uint64_t getThroughput(PhySocket *s)
+ {
+     PhySocketImpl *const impl = reinterpret_cast<PhySocketImpl *>(s);
+     if (!impl) {
+         return 0;
+     }
+     return impl->throughput;
+ }
+
+ /**
+  * Report whether a socket object is marked as closed.
+  *
+  * The socket pointer is dereferenced without a NULL check.
+  *
+  * @param s Socket object (must not be NULL)
+  * @return true if the socket's type is ZT_PHY_SOCKET_CLOSED, false otherwise
+  */
+ inline bool isClosed(PhySocket *s)
+ {
+     return (reinterpret_cast<PhySocketImpl *>(s)->type == ZT_PHY_SOCKET_CLOSED);
+ }
+
+ /**
+  * Get the state of a socket object.
+  *
+  * The socket pointer is dereferenced without a NULL check.
+  *
+  * @param s Socket object (must not be NULL)
+  * @return The socket's type field converted to int
+  */
+ inline int getState(PhySocket *s)
+ {
+     return reinterpret_cast<PhySocketImpl *>(s)->type;
+ }
+
+ /**
+  * In the event that this socket is erased, the multipath logic needs a way to
+  * learn that the path is no longer valid. This checks that the socket's type
+  * field still holds a value inside the range of known socket types.
+  *
+  * The socket pointer is dereferenced without a NULL check.
+  *
+  * @param s Socket object (must not be NULL)
+  * @return true if the type lies between ZT_PHY_SOCKET_CLOSED and ZT_PHY_SOCKET_UNIX_LISTEN inclusive
+  */
+ inline bool isValidState(PhySocket *s)
+ {
+     PhySocketImpl *const impl = reinterpret_cast<PhySocketImpl *>(s);
+     if (impl->type < ZT_PHY_SOCKET_CLOSED) {
+         return false;
+     }
+     return (impl->type <= ZT_PHY_SOCKET_UNIX_LISTEN);
+ }
+
+ /**
+  * Send a datagram of a known size to a selected peer and record egress time. The peer
+  * shall eventually respond by echoing back a smaller datagram.
+  *
+  * The first 8 bytes of the datagram are taken as the test ID that the echoed
+  * response is later matched against. A record is stored only when sendto()
+  * reports that bytes were written. The same code path is used on every
+  * platform, so tests are also recorded on Windows.
+  *
+  * @param s Socket object
+  * @param to Address of remote peer to receive link test packet (AF_INET or AF_INET6)
+  * @param data Buffer containing random packet data; its first 8 bytes are the test ID
+  * @param len Length of packet data buffer (must be at least 8)
+  * @return Number of bytes written to the socket, 0 if the arguments were rejected,
+  *         or the negative sendto() result on failure
+  */
+ inline int test_link_speed(PhySocket *s, const struct sockaddr *to, void *data, uint32_t len) {
+     PhySocketImpl *const sws = reinterpret_cast<PhySocketImpl *>(s);
+     if ((!sws)||(!to)||(!data)||(len < sizeof(uint64_t))) {
+         return 0;
+     }
+     if (to->sa_family != AF_INET && to->sa_family != AF_INET6) {
+         return 0;
+     }
+     // Copy the ID out with memcpy: the buffer is not guaranteed to be 8-byte aligned.
+     uint64_t id = 0;
+     memcpy(&id, data, sizeof(id));
+     const uint64_t egress_time = OSUtils::now();
+#if defined(_WIN32) || defined(_WIN64)
+     const int w = (int)::sendto(sws->sock,reinterpret_cast<const char *>(data),(int)len,0,to,(to->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
+#else
+     const int w = (int)::sendto(sws->sock,data,len,0,to,(to->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
+#endif
+     if (w > 0) {
+         // The record is released by handle_link_test_response() or refresh_link_speed_records().
+         link_test_records.push_back(new link_test_record(s, id, egress_time, len));
+     }
+     return w;
+ }
+
+ /**
+  * Remove link speed test records which have timed-out and record a 0 bits/s measurement
+  * on the socket each expired test was sent from.
+  *
+  * Every record is examined exactly once, and expired records are freed as
+  * well as removed from the list.
+  *
+  * NOTE(review): the socket pointer stored in a record is dereferenced here;
+  * confirm a socket cannot be destroyed while a test on it is outstanding.
+  */
+ inline void refresh_link_speed_records()
+ {
+     const uint64_t now = OSUtils::now();
+     std::vector<struct link_test_record*>::iterator r(link_test_records.begin());
+     while (r != link_test_records.end()) {
+         if ((now - (*r)->egress_time) > ZT_LINK_TEST_TIMEOUT) {
+             PhySocketImpl *sws = reinterpret_cast<PhySocketImpl *>((*r)->s);
+             if (sws) {
+                 sws->throughput = 0;
+             }
+             delete *r;
+             // erase() returns the element that followed the removed one, so nothing is skipped.
+             r = link_test_records.erase(r);
+         } else {
+             ++r;
+         }
+     }
+ }
+
+ /**
+  * Upon receipt of a link speed test datagram we echo back only the identification portion
+  * (the first 8 bytes of the datagram). The same behavior applies on every platform.
+  *
+  * @param s Socket object
+  * @param from Address of remote peer that sent this datagram (AF_INET or AF_INET6)
+  * @param data Buffer containing datagram's contents
+  * @param len Length of datagram (must be at least 8)
+  * @return Number of bytes written to the socket in response, 0 if the arguments were
+  *         rejected, or the negative sendto() result on failure
+  */
+ inline int respond_to_link_test(PhySocket *s,const struct sockaddr *from,void *data,unsigned long len) {
+     PhySocketImpl *const sws = reinterpret_cast<PhySocketImpl *>(s);
+     // A datagram shorter than the ID cannot be echoed without reading past its end.
+     if ((!sws)||(!from)||(!data)||(len < sizeof(uint64_t))) {
+         return 0;
+     }
+     if (from->sa_family != AF_INET && from->sa_family != AF_INET6) {
+         return 0;
+     }
+#if defined(_WIN32) || defined(_WIN64)
+     const int w = (int)::sendto(sws->sock,reinterpret_cast<const char *>(data),(int)sizeof(uint64_t),0,from,(from->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
+#else
+     const int w = (int)::sendto(sws->sock,data,sizeof(uint64_t),0,from,(from->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
+#endif
+     return w;
+ }
+
+ /**
+  * Upon receipt of a response to our original link test datagram, correlate this new datagram with the record
+  * of the one we sent. Compute the transit time and update the throughput field of the socket the response
+  * arrived on. The matched record is freed and removed.
+  *
+  * Throughput is computed as (bits sent) / (round trip time / 2). A round trip that
+  * measures as 0 ms is treated as 1 ms so the division is always defined.
+  *
+  * @param s Socket object
+  * @param from Address of remote peer that sent this datagram (currently unused)
+  * @param data Buffer containing datagram contents (ID of original link test datagram)
+  * @param len Length of datagram (must be at least 8)
+  * @return true if datagram corresponded to a previous record, false otherwise
+  */
+ inline bool handle_link_test_response(PhySocket *s,const struct sockaddr *from,void *data,unsigned long len) {
+     (void)from;
+     if ((!data)||(len < sizeof(uint64_t))) {
+         return false;
+     }
+     // Copy the ID out with memcpy: the buffer is not guaranteed to be 8-byte aligned.
+     uint64_t id = 0;
+     memcpy(&id, data, sizeof(id));
+     for (std::vector<struct link_test_record*>::iterator r(link_test_records.begin()); r != link_test_records.end(); ++r) {
+         if ((*r)->id != id) {
+             continue;
+         }
+         const uint64_t now = (uint64_t)OSUtils::now();
+         const uint64_t sent = (*r)->egress_time;
+         // Clamp to 1 ms so a same-tick response does not divide by zero.
+         const uint64_t rtt_ms = (now > sent) ? (now - sent) : 1;
+         const uint64_t bits = (uint64_t)((*r)->length) * 8; // 64-bit to avoid overflow
+         PhySocketImpl *sws = reinterpret_cast<PhySocketImpl *>(s);
+         if (sws) {
+             // bits / ((rtt_ms / 1000) / 2) == bits * 2000 / rtt_ms, in bits per second
+             sws->throughput = (bits * 2000) / rtt_ms;
+         }
+         delete *r;
+         link_test_records.erase(r);
+         return true;
+     }
+     return false;
+ }
+
+ /**
* Cause poll() to stop waiting immediately
*
* This can be used to reset the polling loop after changes that require
@@ -985,7 +1178,7 @@ public:
ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable
if ((FD_ISSET(sock,&wfds))&&(FD_ISSET(sock,&_writefds))) {
try {
- _handler->phyOnUnixWritable((PhySocket *)&(*s),&(s->uptr),false);
+ _handler->phyOnUnixWritable((PhySocket *)&(*s),&(s->uptr));
} catch ( ... ) {}
}
if (FD_ISSET(sock,&rfds)) {