diff options
author | Joseph Henry <josephjah@gmail.com> | 2018-05-01 16:32:15 -0700 |
---|---|---|
committer | Joseph Henry <josephjah@gmail.com> | 2018-05-01 16:32:15 -0700 |
commit | 6a2ba4baca326272c45930208b70cfedf8cb1638 (patch) | |
tree | 434403aecca63908909678bd234ef8b4ffb1d1e4 /osdep | |
parent | 836d897aecc193ec3477e67858237a3f97819024 (diff) | |
download | infinitytier-6a2ba4baca326272c45930208b70cfedf8cb1638.tar.gz infinitytier-6a2ba4baca326272c45930208b70cfedf8cb1638.zip |
Introduced basic multipath support
Diffstat (limited to 'osdep')
-rw-r--r-- | osdep/Binder.hpp | 15 | ||||
-rw-r--r-- | osdep/Phy.hpp | 195 |
2 files changed, 209 insertions, 1 deletions
diff --git a/osdep/Binder.hpp b/osdep/Binder.hpp index 93fad9f1..6e13836c 100644 --- a/osdep/Binder.hpp +++ b/osdep/Binder.hpp @@ -388,6 +388,7 @@ public: _bindings[_bindingCount].udpSock = udps; _bindings[_bindingCount].tcpListenSock = tcps; _bindings[_bindingCount].address = ii->first; + phy.setIfName(udps, (char*)ii->second.c_str(), ii->second.length()); ++_bindingCount; } } else { @@ -455,6 +456,20 @@ public: return false; } + /** + * Get a list of socket pointers for all bindings. + * + * @return A list of socket pointers for current bindings (NOTE(review): the loop runs over all ZT_BINDER_MAX_BINDINGS slots, not just the first _bindingCount, so entries for unused slots are included -- confirm callers expect this) + */ + inline std::vector<PhySocket*> getBoundSockets() + { + std::vector<PhySocket*> sockets; + for (int i=0; i<ZT_BINDER_MAX_BINDINGS; i++) { + sockets.push_back(_bindings[i].udpSock); + } + return sockets; + } + private: _Binding _bindings[ZT_BINDER_MAX_BINDINGS]; std::atomic<unsigned int> _bindingCount; diff --git a/osdep/Phy.hpp b/osdep/Phy.hpp index e359ccdd..d4f68681 100644 --- a/osdep/Phy.hpp +++ b/osdep/Phy.hpp @@ -27,6 +27,8 @@ #ifndef ZT_PHY_HPP #define ZT_PHY_HPP +#include "../osdep/OSUtils.hpp" + #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -86,6 +88,22 @@ namespace ZeroTier { */ typedef void PhySocket; +struct link_test_record +{ + link_test_record(PhySocket *_s, uint64_t _id, uint64_t _egress_time, uint32_t _length) : + s(_s), + id(_id), + egress_time(_egress_time), + length(_length) + { + // + } + PhySocket *s; + uint64_t id; + uint64_t egress_time; + uint32_t length; +}; + /** * Simple templated non-blocking sockets implementation * @@ -154,10 +172,17 @@ private: struct PhySocketImpl { + PhySocketImpl() : + throughput(0) + { + memset(ifname, 0, sizeof(ifname)); + } PhySocketType type; ZT_PHY_SOCKFD_TYPE sock; void *uptr; // user-settable pointer ZT_PHY_SOCKADDR_STORAGE_TYPE saddr; // remote for TCP_OUT and TCP_IN, local for TCP_LISTEN, RAW, and UDP + char ifname[16]; + uint64_t throughput; }; std::list<PhySocketImpl> _socks; @@ -173,6 +198,7 @@ private: bool _noDelay; bool _noCheck; + 
std::vector<struct link_test_record*> link_test_records; public: /** @@ -250,6 +276,173 @@ public: static inline void** getuptr(PhySocket *s) throw() { return &(reinterpret_cast<PhySocketImpl *>(s)->uptr); } /** + * @param s Socket object + * @param nameBuf Buffer to store name of interface which this Socket object is bound to + * @param buflen Length of buffer to copy name into (exactly buflen bytes are copied out of the 16-byte ifname field; the length is not clamped to that field's size) + */ + static inline void getIfName(PhySocket *s, char *nameBuf, int buflen) + { + memcpy(nameBuf, reinterpret_cast<PhySocketImpl *>(s)->ifname, buflen); + } + + /** + * @param s Socket object + * @param ifname Buffer containing name of interface that this Socket object is bound to + * @param len Length of name of interface (NOTE(review): len bytes are copied into the 16-byte ifname field with no bounds check) + */ + static inline void setIfName(PhySocket *s, char *ifname, int len) + { + memcpy(&(reinterpret_cast<PhySocketImpl *>(s)->ifname), ifname, len); + } + + /** + * Get result of most recent throughput test + * + * @param s Socket object (a null pointer yields 0) + */ + inline uint64_t getThroughput(PhySocket *s) + { + PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s)); + return sws ? sws->throughput : 0; + } + + /** + * Whether or not the socket object is in a closed state + * + * @param s Socket object + * @return true if socket is closed, false if otherwise + */ + inline bool isClosed(PhySocket *s) + { + PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s)); + return sws->type == ZT_PHY_SOCKET_CLOSED; + } + + /** + * Get state of socket object + * + * @param s Socket object + * @return State of socket + */ + inline int getState(PhySocket *s) + { + PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s)); + return sws->type; + } + + /** + * In the event that this socket is erased, we need a way to convey to the multipath logic + * that this path is no longer valid. 
+ * + * @param s Socket object + * @return Whether the state of this socket is within an acceptable range of values + */ + inline bool isValidState(PhySocket *s) + { + PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s)); + return sws->type >= ZT_PHY_SOCKET_CLOSED && sws->type <= ZT_PHY_SOCKET_UNIX_LISTEN; + } + + /** + * Send a datagram of a known size to a selected peer and record egress time. The peer + * shall eventually respond by echoing back a smaller datagram. + * + * @param s Socket object + * @param to Address of remote peer to receive link test packet (must be AF_INET or AF_INET6, otherwise nothing is sent and 0 is returned) + * @param data Buffer containing random packet data (its first 64 bits are read as the record ID) + * @param len Length of packet data buffer + * @return Number of bytes successfully written to socket (NOTE(review): the _WIN32/_WIN64 branch instead returns whether sendto() wrote exactly len bytes, and returns before any link_test_record is stored) + */ + inline int test_link_speed(PhySocket *s, const struct sockaddr *to, void *data, uint32_t len) { + if (!reinterpret_cast<PhySocketImpl *>(s)) { + return 0; + } + uint64_t *buf = (uint64_t*)data; + uint64_t id = buf[0]; + if (to->sa_family != AF_INET && to->sa_family != AF_INET6) { + return 0; + } + uint64_t egress_time = OSUtils::now(); + PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s)); +#if defined(_WIN32) || defined(_WIN64) + return ((long)::sendto(sws->sock,reinterpret_cast<const char *>(data),len,0,to,(to->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)) == (long)len); +#else + int w = ::sendto(sws->sock,data,len,0,to,(to->sa_family == AF_INET6) ? 
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); +#endif + if (w > 0) { + link_test_records.push_back(new link_test_record(s, id, egress_time, len)); + } + return w; + } + + /** + * Remove link speed test records which have timed-out and record a 0 bits/s measurement (NOTE(review): an erased record is not deleted here, and i still advances after the erase, so the record that shifts into slot i is not examined on this pass) + */ + inline void refresh_link_speed_records() + { + for(int i=0;i<link_test_records.size();i++) { + if(OSUtils::now() - link_test_records[i]->egress_time > ZT_LINK_TEST_TIMEOUT) { + PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(link_test_records[i]->s)); + if (sws) { + sws->throughput = 0; + } + link_test_records.erase(link_test_records.begin() + i); + } + } + } + + /** + * Upon receipt of a link speed test datagram we echo back only the identification portion + * + * @param s Socket object + * @param from Address of remote peer that sent this datagram + * @param data Buffer containing datagram's contents + * @param len Length of datagram + * @return Number of bytes successfully written to socket in response (NOTE(review): the _WIN32/_WIN64 branch sends all len bytes of data rather than only the ID, and returns whether exactly len bytes were written) + */ + inline int respond_to_link_test(PhySocket *s,const struct sockaddr *from,void *data,unsigned long len) { + PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s)); + uint64_t *id = (uint64_t*)data; +#if defined(_WIN32) || defined(_WIN64) + return ((long)::sendto(sws->sock,reinterpret_cast<const char *>(data),len,0,from,(from->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)) == (long)len); +#else + int w = ::sendto(sws->sock,id,sizeof(id[0]),0,from,(from->sa_family == AF_INET6) ? sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); +#endif + return w; + } + + /** + * Upon receipt of a response to our original link test datagram, correlate this new datagram with the record + * of the one we sent. Compute the transit time and update the throughput field of the relevant socket. This + * value will later be read by the path quality estimation logic located in Path.hpp. 
+ * + * @param s Socket object (its throughput field receives the computed value) + * @param from Address of remote peer that sent this datagram + * @param data Buffer containing datagram contents (ID of original link test datagram) + * @param len Length of datagram + * @return true if datagram corresponded to previous record, false if otherwise (NOTE(review): transit_time is 0 when now() equals the record's egress_time, which makes sz / transit_time a division by zero) + */ + inline bool handle_link_test_response(PhySocket *s,const struct sockaddr *from,void *data,unsigned long len) { + uint64_t *id = (uint64_t*)data; + for(int i=0;i<link_test_records.size();i++) { + if(link_test_records[i]->id == id[0]) { + float rtt = (OSUtils::now()-link_test_records[i]->egress_time) / (float)1000; // s + uint32_t sz = (link_test_records[i]->length) * 8; // bits + float transit_time = rtt / 2.0; + int64_t raw = sz / transit_time; + PhySocketImpl *sws = (reinterpret_cast<PhySocketImpl *>(s)); + if (sws) { + sws->throughput = raw; + } + delete link_test_records[i]; + link_test_records.erase(link_test_records.begin() + i); + return true; + } + } + return false; + } + + /** * Cause poll() to stop waiting immediately * * This can be used to reset the polling loop after changes that require @@ -985,7 +1178,7 @@ public: ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable if ((FD_ISSET(sock,&wfds))&&(FD_ISSET(sock,&_writefds))) { try { - _handler->phyOnUnixWritable((PhySocket *)&(*s),&(s->uptr),false); + _handler->phyOnUnixWritable((PhySocket *)&(*s),&(s->uptr)); } catch ( ... ) {} } if (FD_ISSET(sock,&rfds)) { |