diff options
author | Adam Ierymenko <adam.ierymenko@gmail.com> | 2013-10-28 09:33:32 -0400 |
---|---|---|
committer | Adam Ierymenko <adam.ierymenko@gmail.com> | 2013-10-28 09:33:32 -0400 |
commit | 148619f0ba836ff894185663633556dd04700a60 (patch) | |
tree | 710ecbbb7ebc6a2b19d910e10a2ccd472b86c579 /mac-tap/tuntap/test | |
parent | 17778a36badcb9fdb9b3292e8ec32be3b836c811 (diff) | |
download | infinitytier-148619f0ba836ff894185663633556dd04700a60.tar.gz infinitytier-148619f0ba836ff894185663633556dd04700a60.zip |
Make tap build on OSX 10.9, though not in a way that is easy for users. Instead package binaries.
Diffstat (limited to 'mac-tap/tuntap/test')
-rw-r--r-- | mac-tap/tuntap/test/tuntap/__init__.py | 21 | ||||
-rw-r--r-- | mac-tap/tuntap/test/tuntap/char_dev_harness.py | 173 | ||||
-rw-r--r-- | mac-tap/tuntap/test/tuntap/interface_harness.py | 279 | ||||
-rw-r--r-- | mac-tap/tuntap/test/tuntap/ioctl.py | 31 | ||||
-rw-r--r-- | mac-tap/tuntap/test/tuntap/packet.py | 531 | ||||
-rw-r--r-- | mac-tap/tuntap/test/tuntap/packet_codec.py | 244 | ||||
-rw-r--r-- | mac-tap/tuntap/test/tuntap/packet_reader.py | 270 | ||||
-rw-r--r-- | mac-tap/tuntap/test/tuntap/route.py | 112 | ||||
-rw-r--r-- | mac-tap/tuntap/test/tuntap/sockaddr.py | 124 | ||||
-rw-r--r-- | mac-tap/tuntap/test/tuntap/test_char_dev.py | 86 | ||||
-rw-r--r-- | mac-tap/tuntap/test/tuntap/test_interface.py | 120 | ||||
-rw-r--r-- | mac-tap/tuntap/test/tuntap/test_ip.py | 218 | ||||
-rw-r--r-- | mac-tap/tuntap/test/tuntap/tun_tap_harness.py | 96 | ||||
-rw-r--r-- | mac-tap/tuntap/test/tuntap/tun_tap_test_case.py | 40 | ||||
-rw-r--r-- | mac-tap/tuntap/test/tuntap/tuntap_tests.py | 83 |
15 files changed, 0 insertions, 2428 deletions
diff --git a/mac-tap/tuntap/test/tuntap/__init__.py b/mac-tap/tuntap/test/tuntap/__init__.py deleted file mode 100644 index 664bdaae..00000000 --- a/mac-tap/tuntap/test/tuntap/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/mac-tap/tuntap/test/tuntap/char_dev_harness.py b/mac-tap/tuntap/test/tuntap/char_dev_harness.py deleted file mode 100644 index 515e50bf..00000000 --- a/mac-tap/tuntap/test/tuntap/char_dev_harness.py +++ /dev/null @@ -1,173 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import errno -import fcntl -import io -import os -import struct -from tuntap import ioctl - -class CharDevHarness(object): - """ - Base class for the tun and tap character device harnesses. Manages a single character - interface, keeps the file descriptor and handles I/O. - """ - - _MAX_CHAR_DEV = 16 - _MAX_PACKET_SIZE = 4096 - - def __init__(self, class_name, unit = None): - """ - Initializes the harness. - - Args: - class_name: Path name pattern. - unit: The character device number. - """ - self._class_name = class_name - self._unit = unit - self._dev = None - - def _openCharDev(self, unit): - """ - Opens the character device. - - Args: - unit: The character device number. - """ - assert not self._dev - - name = self._class_name % unit - self._dev = os.open(name, os.O_RDWR) - - def open(self): - """ - Opens the character device. - """ - if self._unit != None: - self._openCharDev(self._unit) - return - - # Try to open character devices in turn. - for i in xrange(0, self._MAX_CHAR_DEV): - try: - self._openCharDev(i) - self._unit = i - return - except OSError as e: - if e.errno != errno.EBUSY: - raise e - - # All devices busy. - raise OSError(errno.EBUSY) - - def close(self): - """ - Closes the character device. - """ - assert self._dev - os.close(self._dev) - self._dev = None - - def fileno(self): - assert self._dev - return self._dev - - def send(self, packet): - assert self._dev - os.write(self._dev, packet) - - def ioctl(self, cmd, format, arg): - """ - Performs an ioctl on the character device. - - Args: - cmd: the ioctl cmd identifier. - format: argument format. - arg: argument data tuple. - - Returns: - Output argument tuple. - """ - assert self._dev - return struct.unpack(format, fcntl.ioctl(self._dev, cmd, struct.pack(format, arg))) - - @property - def unit(self): - """ - Returns the interface unit, if known. - """ - return self._unit - - -class TunCharDevHarness(CharDevHarness): - """ - Character device harness for tun devices. - """ - - TUNSIFHEAD = ioctl.IOC(ioctl.OUT, 't', 96, 'i') - TUNGIFHEAD = ioctl.IOC(ioctl.IN, 't', 97, 'i') - - def __init__(self, unit = None): - """ - Initializes the harness. - - Args: - unit: Character device index - """ - super(TunCharDevHarness, self).__init__('/dev/tun%d', unit = unit) - - @property - def prependAF(self): - """ - Gets the AF prepending flag. - - Returns: - A flag indicating whether packets on the char dev are prefixed with the AF number. - """ - return self.ioctl(self.TUNGIFHEAD, 'i', (0))[0] - - @prependAF.setter - def prependAF(self, prependAF): - """ - Sets the AF prepending flag. - - Args: - prependAF: whether the packets on the char dev are prefixed with the AF number. - """ - self.ioctl(self.TUNSIFHEAD, 'i', (prependAF)) - - -class TapCharDevHarness(CharDevHarness): - """ - Character device harness for tap devices. - """ - - def __init__(self, unit = None): - """ - Initializes the harness. - - Args: - unit: Character device index - """ - super(TapCharDevHarness, self).__init__('/dev/tap%d', unit = unit) diff --git a/mac-tap/tuntap/test/tuntap/interface_harness.py b/mac-tap/tuntap/test/tuntap/interface_harness.py deleted file mode 100644 index fe4f1fc5..00000000 --- a/mac-tap/tuntap/test/tuntap/interface_harness.py +++ /dev/null @@ -1,279 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import ctypes -import ctypes.util -import errno -import fcntl -import socket -import struct - -from tuntap import ioctl -from tuntap.sockaddr import SockaddrDl, SockaddrIn, SockaddrIn6 - -libc = ctypes.CDLL(ctypes.util.find_library('c')) - -class struct_sockaddr(ctypes.Structure): - _fields_ = [ ('sa_len', ctypes.c_uint8), - ('sa_family', ctypes.c_uint8) ] - -class struct_ifaddrs(ctypes.Structure): - pass - -struct_ifaddrs._fields_ = [ ('ifa_next', ctypes.POINTER(struct_ifaddrs)), - ('ifa_name', ctypes.c_char_p), - ('ifa_flags', ctypes.c_uint), - ('ifa_addr', ctypes.POINTER(struct_sockaddr)), - ('ifa_netmask', ctypes.POINTER(struct_sockaddr)), - ('ifa_dstaddr', ctypes.POINTER(struct_sockaddr)), - ('ifa_data', ctypes.c_void_p) ] - -def decodeSockaddr(sockaddr): - if not sockaddr: - return None - - data = ctypes.string_at(sockaddr, max(sockaddr.contents.sa_len, 16)) - if sockaddr.contents.sa_family == SockaddrDl.AF_LINK: - return SockaddrDl.decode(data) - elif sockaddr.contents.sa_family == socket.AF_INET: - return SockaddrIn.decode(data) - elif sockaddr.contents.sa_family == socket.AF_INET6: - return SockaddrIn6.decode(data) - - return None - -def getIfAddrs(ifname): - ifaddrs = (ctypes.POINTER(struct_ifaddrs))() - assert not libc.getifaddrs(ctypes.byref(ifaddrs)) - - addrs = [] - try: - entry = ifaddrs - while entry: - ia = entry.contents - entry = ia.ifa_next - if ia.ifa_name != ifname: - continue - - addrs.append((decodeSockaddr(ia.ifa_addr), - decodeSockaddr(ia.ifa_netmask), - decodeSockaddr(ia.ifa_dstaddr))) - return addrs - finally: - libc.freeifaddrs(ifaddrs) - - -def ifNameToIndex(ifname): - libc.if_nametoindex.restype = ctypes.c_uint - index = libc.if_nametoindex(ifname) - if not index: - raise OSError(ctypes.get_errno) - return index - - -class Address(object): - """ - Wraps address parameters for an interface. - """ - - def __init__(self, af, local, remote, dst, mask): - self.af = af - self.local = local - self.remote = remote - self.dst = dst - self.mask = mask - - def __makeSaProperty(name): - def get(self): - addrmap = { socket.AF_INET: SockaddrIn, - socket.AF_INET6: SockaddrIn6 } - addr = getattr(self, name) - if self.af not in addrmap: - return None - if addr == None: - return addrmap[self.af](af = 0, addr = None) - return addrmap[self.af](af = self.af, addr = addr) - - return property(get) - - sa_local = __makeSaProperty('local') - sa_remote = __makeSaProperty('remote') - sa_dst = __makeSaProperty('dst') - sa_mask = __makeSaProperty('mask') - - -class InterfaceHarness(object): - """ - Base class for network interface harnesses. Provides helpers to configure the interface. - """ - - SIOCSIFFLAGS = ioctl.IOC(ioctl.OUT, 'i', 16, '16s16s') - SIOCGIFFLAGS = ioctl.IOC(ioctl.INOUT, 'i', 17, '16s16s') - - SIOCAIFADDR = ioctl.IOC(ioctl.OUT, 'i', 26, '16s16s16s16s') - SIOCAIFADDR_IN6 = ioctl.IOC(ioctl.OUT, 'i', 26, '16s28s28s28sIiiII') - SIOCSIFLLADDR = ioctl.IOC(ioctl.OUT, 'i', 60, '16s16s') - - IFF_UP = 0x1 - IFF_BROADCAST = 0x2 - IFF_DEBUG = 0x4 - IFF_LOOPBACK = 0x8 - IFF_POINTOPOINT = 0x10 - IFF_NOTRAILERS = 0x20 - IFF_RUNNING = 0x40 - IFF_NOARP = 0x80 - IFF_PROMISC = 0x100 - IFF_ALLMULTI = 0x200 - IFF_OACTIVE = 0x400 - IFF_SIMPLEX = 0x800 - IFF_LINK0 = 0x1000 - IFF_LINK1 = 0x2000 - IFF_LINK2 = 0x4000 - IFF_MULTICAST = 0x8000 - - def __init__(self, class_name, unit): - """ - Initializes the harness. - - Args: - class_name: Interface class name. - unit: The interface number. - """ - self._class_name = class_name - self._unit = unit - - def _ioctl(self, af, cmd, format, arg): - """ - Performs a socket ioctl. - - Args: - af: address family. - cmd: the ioctl cmd. - format: argument format description. - arg: argument data tuple. - - Returns: - Output data tuple. - """ - s = socket.socket(af, socket.SOCK_DGRAM) - try: - return struct.unpack(format, fcntl.ioctl(s, cmd, struct.pack(format, *arg))) - finally: - s.close() - - @property - def flags(self): - """ - Retrieves the interface flags. - - Returns: - The interface flags. - """ - return self._ioctl(socket.AF_INET, InterfaceHarness.SIOCGIFFLAGS, - '16sH', (self.name, 0))[1] - - @flags.setter - def flags(self, flags): - """ - Sets new interface flags. - - Args: - flags: new interface flags. - """ - self._ioctl(socket.AF_INET, InterfaceHarness.SIOCSIFFLAGS, - '16sH', (self.name, flags)) - - @property - def name(self): - """ - Gets the interface name. - - Returns: - Full interface name. - """ - return "%s%d" % (self._class_name, self._unit) - - @property - def index(self): - """ - Gets the interface index. - - Returns: - Interface index. - """ - return ifNameToIndex(self.name) - - def getAddrs(self, af = None): - def check(addr): - if addr and addr.af == af: - return addr - else: - return None - return filter(lambda (a, n, d): a != None, - map(lambda (a, n, d): (check(a), check(n), check(d)), getIfAddrs(self.name))) - - @property - def lladdr(self): - entry = self.getAddrs(SockaddrDl.AF_LINK) - if entry: - return entry[0][0] - return None - - @lladdr.setter - def lladdr(self, addr): - self._ioctl(socket.AF_INET, InterfaceHarness.SIOCSIFLLADDR, - '16sBB14s', (self.name, len(addr.addr), addr.af, addr.addr)) - - def addIfAddr(self, local, dst, mask): - """ - Set an interface address. - - Args: - local: local address. - dst: broadcast address or destination address, respectively. - mask: the netmask. - """ - self._ioctl(socket.AF_INET, InterfaceHarness.SIOCAIFADDR, - '16s16s16s16s', (self.name, local.encode(), dst.encode(), mask.encode())) - - def addIfAddr6(self, local, dst, mask): - """ - Set an INET6 address for the interface. - - Args: - local: local address. - dst: destination address. - mask: the netmask. - """ - # This sometimes fails on Tiger with ENOBUFS. Just retry... - ntries = 0 - while True: - try: - self._ioctl(socket.AF_INET6, InterfaceHarness.SIOCAIFADDR_IN6, - '16s28s28s28sIiiII', - (self.name, local.encode(), dst.encode(), mask.encode(), - 0, 0, 0, 0xffffffff, 0xffffffff)) - break - except IOError as e: - if e.errno != errno.ENOBUFS or ntries > 10: - raise e - ntries += 1 diff --git a/mac-tap/tuntap/test/tuntap/ioctl.py b/mac-tap/tuntap/test/tuntap/ioctl.py deleted file mode 100644 index 641ddf24..00000000 --- a/mac-tap/tuntap/test/tuntap/ioctl.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import struct - -VOID = 0x20000000 -IN = 0x40000000 -OUT = 0x80000000 -INOUT = IN | OUT - -def IOC(inout, group, num, format): - return inout | ((struct.calcsize(format) & 0x1fff) << 16) | (ord(group) << 8) | num diff --git a/mac-tap/tuntap/test/tuntap/packet.py b/mac-tap/tuntap/test/tuntap/packet.py deleted file mode 100644 index 6fbbb736..00000000 --- a/mac-tap/tuntap/test/tuntap/packet.py +++ /dev/null @@ -1,531 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import socket - -class BinStruct(object): - """ - Handles packing and unpacking of binary data. It is vaguely inspired by the struct module but - taylored for bit-granular fields. Also, it's probably not very fast :) - """ - - @staticmethod - def str2num(data, width): - if not data: - return 0 - len, rem = divmod(width, 8) - val = 0 - for i in range(len): - val = (val << 8) | ord(data[i]) - if rem: - val = (val << rem) | (ord(data[len]) & ((1 << rem) - 1)) - return val - - @staticmethod - def num2str(val, width): - result = bytearray((width + 7) / 8) - p, rem = divmod(width, 8) - if rem: - result[p] = chr(val & ((1 << rem) - 1)) - val >>= rem - while p > 0: - p -= 1 - result[p] = chr(val & 0xff) - val >>= 8 - return str(result) - - def __init__(self, format): - """ - Initializes a BinStruct object that can encode and decode the binary structure specified in - the format parameters. - - Args: - format: Specifies the format of the binary data. The syntax is - - (<width><type>)* - - where: - width is the width of a component in number of bits - type is indicates the type of the component and may be one of: - s: binary data - n: number - """ - self._format = [] - id = lambda x, width : x or 0 - typemap = { - 'n': (id, id), - 's': (BinStruct.num2str, BinStruct.str2num), - } - pos = 0 - self._width = 0 - while pos < len(format): - start = pos - while str.isdigit(format[pos]): - pos += 1 - width = int(format[start:pos]) - self._width += width - codec = typemap[format[pos]] - self._format.insert(0, (width, codec[0], codec[1])) - pos += 1 - - @property - def size(self): - return (self._width + 7) / 8 - - def pack(self, *values): - """ - Encodes the passed values according to this BinStruct's format definition. - - Args: - values: The values to encode. - Returns: - The encoded struct as a binary string. - """ - assert len(values) == len(self._format) - val = 0 - pos = len(self._format) - for value in values: - pos -= 1 - (width, decode, encode) = self._format[pos] - val = (val << width) | (encode(value, width) & ((1 << width) - 1)) - return BinStruct.num2str(val, self._width) - - def unpack(self, data): - """ - Decodes a binary string according to the format definition. - - Args: - data: The binary string to decode. - Returns: - A value tuple. - """ - assert len(data) >= self.size - val = BinStruct.str2num(data, self._width) - pos = len(self._format) - result = [ None for i in range(pos) ] - for (width, decode, encode) in self._format: - pos -= 1 - result[pos] = decode(val & ((1 << width) - 1), width) - val >>= width - return tuple(result) - - -class Packet(object): - """ - Base class for packet encoding and decoding. - """ - - def __init__(self, format, names, data = None, **initializer): - """ - Initializes the packet. - - Args: - format: Binary format description. - names: Names for the packet fields. - data: Optional binary packet to decode. - initializer: Optional initialization values for the packet fields. - """ - self._struct = BinStruct(format) - self._names = names - self.__dict__.update(dict.fromkeys(self._names, None)) - self.payload = None - - if isinstance(data, str): - self.decode(data) - elif isinstance(data, Packet): - self.update(data) - - self.__dict__.update(initializer) - - def __repr__(self): - return repr(dict(map(lambda x : (x, getattr(self, x)), self._names + ('payload',)))) - - def _payloadPos(self): - """ - Returns: The payload position in the data buffer. - """ - return self._struct.size - - def _decodePayload(self, data): - """ - Decodes the payload data. - - Args: - data: Payload data buffer. - Returns: - The payload object. - """ - return data - - def _encodePayload(self): - """ - Encodes the payload data. - - Args: - payload: Payload object. - Returns: - Encoded payload byte string. - """ - if issubclass(self.payload.__class__, Packet): - return self.payload.encode() - return str(self.payload) - - def _encodeFields(self, *fields): - """ - Takes a fields tuple and returns encoded field data. - - Args: - fields: Field values. - Returns: - Tuple of encoded fields. - """ - return self._struct.pack(*fields) - - def decode(self, data): - """ - Decode a binary packet. - - Args: - data: Binary packet data to decode. - """ - fields = self._struct.unpack(data) - assert len(fields) == len(self._names) - self.__dict__.update(dict(zip(self._names, fields))) - self.payload = self._decodePayload(data[self._payloadPos():]) - - def update(self, data): - """ - Update the packet from a dictionary. - - Args: - data: The dictionary to update from. - """ - self.__dict__.update(map(lambda x : (x, getattr(data, x)), self._names + ('payload',))) - if isinstance(self.payload, str): - self.payload = self._decodePayload(self.payload) - - - def encode(self): - """ - Encodes the packet into binary format. - - Returns: - The packet data. - """ - fields = map(lambda x : getattr(self, x), self._names) - return self._encodeFields(*fields) + self._encodePayload() - - @property - def headerLen(self): - """ - The size of the header according to the format. - - Returns: - The header length. - """ - return self._struct.size - - -class TunAFFrame(Packet): - - def __init__(self, data = None, **initializer): - super(TunAFFrame, self).__init__('32n', ('af',), data, **initializer) - - def _decodePayload(self, data): - if self.af == socket.AF_INET: - return IPv4Packet(data) - elif self.af == socket.AF_INET6: - return IPv6Packet(data) - return data - - -class EthernetFrame(Packet): - - TYPE_IPV4 = 0x0800 - TYPE_ARP = 0x0806 - TYPE_IPV6 = 0x86dd - - def __init__(self, data = None, **initializer): - super(EthernetFrame, self).__init__('48s48s16n', ('dst', 'src', 'type'), - data, **initializer) - - def _decodePayload(self, data): - if self.type == EthernetFrame.TYPE_IPV4: - return IPv4Packet(data) - elif self.type == EthernetFrame.TYPE_ARP: - return ARPPacket(data) - elif self.type == EthernetFrame.TYPE_IPV6: - return IPv6Packet(data) - return data - - -class ARPPacket(Packet): - - HTYPE_ETHERNET = 0x01 - HLEN_ETHERNET = 6 - PTYPE_IPV4 = 0x0800 - PLEN_IPV4 = 4 - OPER_REQUEST = 1 - OPER_REPLY = 2 - - def __init__(self, data = None, **initializer): - super(ARPPacket, self).__init__('16n16n8n8n16n48s32s48s32s', - ('htype', 'ptype', 'hlen', 'plen', 'oper', - 'sha', 'spa', 'tha', 'tpa'), - data, **initializer) - - -class IPv4Packet(Packet): - - PROTO_ICMP = 0x01 - PROTO_TCP = 0x06 - PROTO_UDP = 0x11 - - class UDPPseudoHeader(Packet): - - def __init__(self, data = None, **initializer): - super(IPv4Packet.UDPPseudoHeader, self).__init__('32s32s8s8n16n', - ('src', 'dst', - 'padding', 'proto', 'length'), - data, **initializer) - - - def __init__(self, data = None, **initializer): - super(IPv4Packet, self).__init__('4n4n6n2n16n16n2n14n8n8n16n32s32s', - ('version', 'hdrlen', 'dscp', 'ecn', - 'len', 'id', 'flags', 'fragoffset', - 'ttl', 'proto', 'checksum', 'src', 'dst'), - data, **initializer) - - def _payloadPos(self): - return self.hdrlen * 4 - - def _decodePayload(self, data): - if self.proto == IPv4Packet.PROTO_UDP: - return UDPPacket(data) - return data - - @staticmethod - def computeChecksum(data): - """ - Computes the IPv4 header checksum. - - Args: - Header in binary. - Returns: - The header checksum. - """ - sum = 0 - for i in range(0, len(data) - 1, 2): - sum += ord(data[i]) << 8 | ord(data[i + 1]) - if len(data) % 2 == 1: - sum += ord(data[-1]) << 8 | 0 - return ~((sum & 0xffff) + (sum >> 16)) - - def encode(self): - payload = self._encodePayload() - hdrlen = self.hdrlen or 5 - payloadlen = self.len or len(payload) - fields = [self.version or 4, hdrlen, self.dscp or 0, self.ecn or 0, - payloadlen + hdrlen * 4, self.id or 0, self.flags or 0, - self.fragoffset or 0, self.ttl or 255, self.proto, self.checksum or 0, - self.src, self.dst] - - # Need to compute UDP checksum here since it includes the IPv4 pseudo header. - if (self.proto == IPv4Packet.PROTO_UDP and - issubclass(self.payload.__class__, UDPPacket) and - self.payload.checksum == None): - - header = IPv4Packet.UDPPseudoHeader(src = self.src, dst = self.dst, - proto = IPv4Packet.PROTO_UDP, length = payloadlen, - payload = payload) - payload = UDPPacket(data = self.payload, - checksum = IPv4Packet.computeChecksum(header.encode())).encode() - - header = self._encodeFields(*tuple(fields)) - if self.checksum == None: - fields[10] = IPv4Packet.computeChecksum(header) - header = self._encodeFields(*tuple(fields)) - return header + payload - -class IPv6Packet(Packet): - - PROTO_ICMP = 1 - PROTO_TCP = 6 - PROTO_UDP = 17 - PROTO_ICMPV6 = 58 - - class UDPPseudoHeader(Packet): - - def __init__(self, data = None, **initializer): - super(IPv6Packet.UDPPseudoHeader, self).__init__('128s128s32n24s8n', - ('src', 'dst', - 'length', 'padding', 'proto'), - data, **initializer) - - - def __init__(self, data = None, **initializer): - super(IPv6Packet, self).__init__('4n8n20n16n8n8n128s128s', - ('version', 'traffic_class', 'flow_label', - 'len', 'proto', 'hop_limit', - 'src', 'dst'), - data, **initializer) - - def _decodePayload(self, data): - if self.proto == IPv6Packet.PROTO_UDP: - return UDPPacket(data) - elif self.proto == IPv6Packet.PROTO_ICMPV6: - return ICMPV6Packet(data) - return data - - def encode(self): - payload = self._encodePayload() - fields = [self.version or 6, self.traffic_class or 0, self.flow_label or 0, - self.len or len(payload), self.proto, self.hop_limit or 255, - self.src, self.dst] - - # Need to compute checksum for UDP, ICMPV6 here since it includes the IPv6 pseudo header. - checksummedProtos = { IPv6Packet.PROTO_UDP: UDPPacket, - IPv6Packet.PROTO_ICMPV6: ICMPV6Packet } - payloadClass = checksummedProtos.get(self.proto) - if (payloadClass != None and - issubclass(self.payload.__class__, payloadClass) and - self.payload.checksum == None): - - header = IPv6Packet.UDPPseudoHeader(src = self.src, dst = self.dst, length = fields[3], - proto = self.proto, payload = payload) - payload = payloadClass(data = self.payload, - checksum = IPv4Packet.computeChecksum(header.encode())).encode() - - return self._encodeFields(*tuple(fields)) + payload - - -class ICMPV6Packet(Packet): - - TYPE_NEIGHBOR_SOLICITATION = 135 - TYPE_NEIGHBOR_ADVERTISMENT = 136 - - def __init__(self, data = None, **initializer): - super(ICMPV6Packet, self).__init__('8n8n16n', - ('type', 'code', 'checksum'), - data, **initializer) - - def _decodePayload(self, data): - if self.type == ICMPV6Packet.TYPE_NEIGHBOR_SOLICITATION: - return ICMPV6NeighborSolicitation(data) - elif self.type == ICMPV6Packet.TYPE_NEIGHBOR_ADVERTISMENT: - return ICMPV6NeighborAdvertisement(data) - return data - - -class ICMPV6NeighborDiscoveryOption(Packet): - - TYPE_SOURCE_LINK_LAYER_ADDRESS = 1 - TYPE_TARGET_LINK_LAYER_ADDRESS = 2 - - def __init__(self, data = None, **initializer): - super(ICMPV6NeighborDiscoveryOption, self).__init__('8n8n', - ('type', 'length'), - data, **initializer) - - def encode(self): - payload = self._encodePayload() - length = self.length - if length == None: - length = (len(payload) + 2 + 7) / 8 - payload += '\x00' * (length * 8 - len(payload) - 2) - fields = [self.type, length] - header = self._encodeFields(*tuple(fields)) - return header + payload - - @staticmethod - def decodeOptions(data): - options = [] - while len(data) > 2: - type = ord(data[0]) - length = ord(data[1]) - if len(data) < length * 8: - break - options.append(ICMPV6NeighborDiscoveryOption(type = type, length = length, - payload = data[0:length * 8])) - data = data[length * 8:] - return options - - -class ICMPV6NeighborSolicitation(Packet): - - def __init__(self, data = None, **initializer): - super(ICMPV6NeighborSolicitation, self).__init__('32s128s', - ('reserved', 'target'), - data, **initializer) - self.target_lladdr = initializer.get('src_lladdr') - - def _decodePayload(self, data): - for option in ICMPV6NeighborDiscoveryOption.decodeOptions(data): - if option.type == ICMPV6NeighborDiscoveryOption.TYPE_SOURCE_LINK_LAYER_ADDRESS: - self.src_lladdr = option.payload - return None - - def _encodePayload(self): - if self.src_lladdr: - return ICMPV6NeighborDiscoveryOption( - type = ICMPV6NeighborDiscoveryOption.TYPE_SOURCE_LINK_LAYER_ADDRESS, - payload = self.src_lladdr).encode() - return '' - - -class ICMPV6NeighborAdvertisement(Packet): - - def __init__(self, data = None, **initializer): - super(ICMPV6NeighborAdvertisement, self).__init__('1n1n1n29s128s', - ('router', 'solicited', 'override', - 'reserved', 'target'), - data, **initializer) - self.target_lladdr = initializer.get('target_lladdr') - - def _decodePayload(self, data): - for option in ICMPV6NeighborDiscoveryOptions.decodeOptions(data): - if option.type == ICMPV6NeighborDiscoveryOption.TYPE_TARGET_LINK_LAYER_ADDRESS: - self.target_lladdr = option.payload - return None - - def _encodePayload(self): - if self.target_lladdr: - return ICMPV6NeighborDiscoveryOption( - type = ICMPV6NeighborDiscoveryOption.TYPE_TARGET_LINK_LAYER_ADDRESS, - payload = self.target_lladdr).encode() - return '' - - -class UDPPacket(Packet): - - def __init__(self, data = None, **initializer): - super(UDPPacket, self).__init__('16n16n16n16n', - ('src', 'dst', 'len', 'checksum'), - data, **initializer) - - def encode(self): - payload = self._encodePayload() - packetlen = self.len or (len(payload) + self.headerLen) - fields = [self.src, self.dst, packetlen, self.checksum or 0] - header = self._encodeFields(*tuple(fields)) - return header + payload diff --git a/mac-tap/tuntap/test/tuntap/packet_codec.py b/mac-tap/tuntap/test/tuntap/packet_codec.py deleted file mode 100644 index 09095966..00000000 --- a/mac-tap/tuntap/test/tuntap/packet_codec.py +++ /dev/null @@ -1,244 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import functools -import socket - -from tuntap.tun_tap_harness import TunHarness, TapHarness -from tuntap.packet import ( - ARPPacket, - EthernetFrame, - ICMPV6Packet, - ICMPV6NeighborAdvertisement, - ICMPV6NeighborSolicitation, - IPv4Packet, - IPv6Packet, - TunAFFrame, - UDPPacket -) -from tuntap.packet_reader import PacketReader, SelectPacketSource - -class PacketCodec(object): - """ - Helper for tests that wish to send and receive packets. This provides the interface to send and - receive packets at the IP/IPv6 level on both the network interface and char dev sides. - """ - - def __init__(self, af, listenAddress, newHarness, newPacketSource): - self._af = af - self._listenAddress = listenAddress - self._newHarness = newHarness - self._newPacketSource = newPacketSource - - def __str__(self): - af_map = { socket.AF_INET: 'IN', socket.AF_INET6: 'IN6' } - return '<%s<%s, %s>>' % (self.__class__.__name__, - af_map[self._af], - self._newPacketSource.__name__) - - def _decodePacket(self, packet): - return packet - - def _framePacket(self, payload): - return payload - - def _frameExpectation(self, expectation): - return expectation - - @property - def af(self): - return self._af - - @property - def addr(self): - if self._af == socket.AF_INET: - return self._harness.addr - elif self._af == socket.AF_INET6: - return self._harness.addr6 - assert False - - @property - def UDPPort(self): - return self._recvSock.getsockname()[1] - - def start(self): - self._harness = self._newHarness() - self._harness.start() - self._harness.up() - - self._sendSock = socket.socket(self.addr.af, socket.SOCK_DGRAM) - self._recvSock = socket.socket(self.addr.af, socket.SOCK_DGRAM) - self._recvSock.bind((self._listenAddress or self.addr.local, 0)) - - self._reader = PacketReader(source = self._newPacketSource(self._harness.char_dev.fileno()), - skip = True, - decode = lambda packet : self._decodePacket(packet)) - self._sockReader = PacketReader(source = SelectPacketSource(self._recvSock.fileno())) - - self._reader.start() - self._sockReader.start() - - def stop(self): - self._sockReader.stop() - self._reader.stop() - self._harness.stop() - self._sendSock.close() - self._recvSock.close() - - def sendUDP(self, payload, addr): - self._sendSock.sendto(payload, addr) - - def expectUDP(self, expectation): - self._sockReader.expect(expectation) - - def runUDP(self): - return self._sockReader.run() - - def sendPacket(self, payload): - self._harness.char_dev.send(self._framePacket(payload)) - - def expectPacket(self, expectation): - self._reader.expect(self._frameExpectation(expectation)) - - def runPacket(self): - return self._reader.run() - - -class TunPacketCodec(PacketCodec): - - def __init__(self, af, listenAddress, newPacketSource): - super(TunPacketCodec, self).__init__(af, listenAddress, TunHarness, newPacketSource) - - def _decodePacket(self, packet): - # Look at the first byte to figure out whether it's IPv4 or IPv6. - version = (ord(packet[0]) & 0xf0) >> 4 - if version == 4: - return IPv4Packet(packet) - elif version == 6: - return IPv6Packet(packet) - else: - return packet - - -class TunAFPacketCodec(PacketCodec): - - def __init__(self, af, listenAddress, newPacketSource): - super(TunAFPacketCodec, self).__init__(af, listenAddress, TunHarness, newPacketSource) - - def _decodePacket(self, packet): - return TunAFFrame(packet) - - def _framePacket(self, payload): - return TunAFFrame(af = self.addr.af, payload = payload).encode() - - def _frameExpectation(self, expectation): - return { 'af': self.addr.af, - 'payload': expectation } - - def start(self): - super(TunAFPacketCodec, self).start() - self._harness.char_dev.prependAF = 1 - - -class TapPacketCodec(PacketCodec): - - TYPE_MAP = { socket.AF_INET: EthernetFrame.TYPE_IPV4, - socket.AF_INET6: EthernetFrame.TYPE_IPV6 } - - ETHER_ADDR_ANY = '\xff\xff\xff\xff\xff\xff' - ETHER_ADDR_REMOTE = '\x11\x22\x33\x44\x55\x66' - - def __init__(self, af, listenAddress, newPacketSource): - super(TapPacketCodec, self).__init__(af, listenAddress, TapHarness, newPacketSource) - - def _decodePacket(self, packet): - return EthernetFrame(packet) - - def _framePacket(self, payload): - return EthernetFrame(src = TapPacketCodec.ETHER_ADDR_REMOTE, - dst = self._harness.interface.lladdr.addr, - type = TapPacketCodec.TYPE_MAP[self.addr.af], - payload = payload).encode() - - def _frameExpectation(self, expectation): - return { 'type': TapPacketCodec.TYPE_MAP[self.addr.af], - 'src': self._harness.interface.lladdr.addr, - 'payload': expectation } - - def _sendArpReply(self, packet): - reply = EthernetFrame(dst = packet.src, - src = TapPacketCodec.ETHER_ADDR_ANY, - type = EthernetFrame.TYPE_ARP, - payload = ARPPacket(htype = ARPPacket.HTYPE_ETHERNET, - ptype = ARPPacket.PTYPE_IPV4, - hlen = ARPPacket.HLEN_ETHERNET, - plen = ARPPacket.PLEN_IPV4, - oper = ARPPacket.OPER_REPLY, - sha = TapPacketCodec.ETHER_ADDR_REMOTE, - spa = packet.payload.tpa, - tha = packet.payload.sha, - tpa = packet.payload.spa)) - self._harness.char_dev.send(reply.encode()) - - def _sendNeighborAdvertisement(self, packet): - reply = EthernetFrame( - dst = packet.payload.payload.payload.src_lladdr, - src = TapPacketCodec.ETHER_ADDR_ANY, - type = EthernetFrame.TYPE_IPV6, - payload = IPv6Packet( - src = socket.inet_pton(self.addr.af, self.addr.remote), - dst = packet.payload.src, - proto = IPv6Packet.PROTO_ICMPV6, - payload = ICMPV6Packet( - type = ICMPV6Packet.TYPE_NEIGHBOR_ADVERTISMENT, - payload = ICMPV6NeighborAdvertisement( - solicited = 1, - override = 1, - target = socket.inet_pton(self.addr.af, self.addr.remote), - target_lladdr = TapPacketCodec.ETHER_ADDR_REMOTE)))) - self._harness.char_dev.send(reply.encode()) - - def start(self): - super(TapPacketCodec, self).start() - # Answer ARP resolution requests for the destination address. - self._reader.expect( - expectation = { 'type': EthernetFrame.TYPE_ARP, - 'payload': { 'htype': ARPPacket.HTYPE_ETHERNET, - 'ptype': ARPPacket.PTYPE_IPV4, - 'hlen': ARPPacket.HLEN_ETHERNET, - 'plen': ARPPacket.PLEN_IPV4, - 'oper': ARPPacket.OPER_REQUEST, - 'tpa': socket.inet_pton(self.addr.af, self.addr.remote) }}, - times = None, - action = functools.partial(TapPacketCodec._sendArpReply, self)) - # Answer Neighbor Solicitation requests for IPv6. - self._reader.expect( - expectation = { - 'type': EthernetFrame.TYPE_IPV6, - 'payload': { - 'proto': IPv6Packet.PROTO_ICMPV6, - 'payload': { - 'type': ICMPV6Packet.TYPE_NEIGHBOR_SOLICITATION, - 'payload': { - 'target': socket.inet_pton(self.addr.af, self.addr.remote) }}}}, - times = None, - action = functools.partial(TapPacketCodec._sendNeighborAdvertisement, self)) diff --git a/mac-tap/tuntap/test/tuntap/packet_reader.py b/mac-tap/tuntap/test/tuntap/packet_reader.py deleted file mode 100644 index b92d3ac8..00000000 --- a/mac-tap/tuntap/test/tuntap/packet_reader.py +++ /dev/null @@ -1,270 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import errno -import os -import Queue -import select -import signal -import socket -import pickle -import threading - -MAX_PACKET_SIZE = 4096 - -def handleEAgain(fn, *args, **kwargs): - """ - Wraps a function call in loop, restarting on EAGAIN. - """ - while True: - try: - return fn(*args, **kwargs) - except EnvironmentError as e: - if e.errno != errno.EAGAIN: - raise - except: - raise - - -class BlockingPacketSource(object): - """ - In order to be able to test blocking reads and not hang forever if the expected data never - arrives, we do the blocking read call in a forked subprocess that forwards the data read from - the fd over a domain socket. - """ - - def __init__(self, fd): - (self._rsock, wsock) = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM) - child = os.fork() - if child != 0: - wsock.close() - self._child = child - return - - self._rsock.close() - - # This is the read loop in the forked process and it won't quit until either the process - # gets killed or there is a read error. - try: - while True: - packet = handleEAgain(os.read, fd, MAX_PACKET_SIZE) - handleEAgain(wsock.send, pickle.dumps((0, packet))) - if len(packet) == 0: - break - except KeyboardInterrupt: - pass - except EnvironmentError as e: - handleEAgain(wsock.send, pickle.dumps((e.errno, ''))) - finally: - os.close(fd) - wsock.close() - os._exit(os.EX_OK) - - def read(self, killpipe): - (r, w, x) = select.select([self._rsock, killpipe], [], []) - if killpipe in r: - return None - if self._rsock in r: - try: - return handleEAgain(self._rsock.recv, MAX_PACKET_SIZE) - except EnvironmentError as e: - # If there's a read error on the subprocess, it'll close the socket. - if e.errno != errno.ECONNRESET: - raise e - return None - - def stop(self): - os.kill(self._child, signal.SIGINT) - os.waitpid(self._child, 0) - self._rsock.close() - - -class SelectPacketSource(object): - """ - Reads data from a file descriptor, waiting for input using select(). - """ - - def __init__(self, fd): - self._fd = fd - - def read(self, killpipe): - (r, w, x) = select.select([self._fd, killpipe], [], []) - if killpipe in r: - return None - if self._fd in r: - packet = handleEAgain(os.read, self._fd, MAX_PACKET_SIZE) - return pickle.dumps((0, packet)) - return None - - def stop(self): - pass - -class Expectation(object): - """ - Describes an expectation. Expectations are specified as dictionaries to match the packet - against. Entries may specify nested dictionaries for recursive matching and callables can be - used as predicates. Any other entry will be compared to the corresponding value in the packet. - """ - - def __init__(self, expectation, times, action): - self._expectation = expectation - self._times = times - self._action = action - - @property - def active(self): - return self._times == None or self.pending - - @property - def pending(self): - return self._times != None and self._times > 0 - - def check(self, packet): - #print 'Matching %s against %s' % (packet, self._expectation) - if self.active and Expectation._matches(packet, self._expectation): - if self._times: - self._times -= 1 - if callable(self._action): - self._action(packet) - return True - return False - - @staticmethod - def _matches(packet, expectation): - if isinstance(expectation, dict): - for (name, entry) in expectation.iteritems(): - try: - val = getattr(packet, name) - except AttributeError: - return False - if not Expectation._matches(val, entry): - return False - return True - elif callable(expectation): - return expectation(packet) - else: - return packet == expectation - - -class PacketReader(object): - """ - Takes care of reading packets and matching them against expectations. - """ - - def __init__(self, source, decode = str, skip = False): - """ - Initializes a new reader. - - Args: - source: packet source to read packets from. - decode: packet decoding function. - skip: whether non-matching packets are to be skipped. - """ - self._source = source - self._decode = decode - self._skip = skip - self._expectations = [] - self._packets = Queue.Queue() - self._shutdownPipe = os.pipe() - self._stop = threading.Event() - - def start(self): - self._readThread = threading.Thread(target = self) - self._readThread.start() - - def stop(self): - self._stop.set() - handleEAgain(os.write, self._shutdownPipe[1], 'stop') - self._readThread.join() - self._source.stop() - os.close(self._shutdownPipe[0]) - os.close(self._shutdownPipe[1]) - - def __call__(self): - """ - Reading service function, runs in a separate thread. - """ - try: - while True: - packet = handleEAgain(self._source.read, self._shutdownPipe[0]) - if not packet: - self._packets.put((0, '')) - break - self._packets.put(pickle.loads(packet)) - except EnvironmentError as e: - # The read() is racing against stop(), ignore these situations. - if e.errno == EIO and self._stop.isSet(): - self._packets.put((0, '')) - self._packets.put((e.errno, '')) - - def expect(self, expectation, times = 1, action = None): - """ - Adds an expectation for a packet to be received. - - Args: - expectation: Dictionary describing the expected packet. - times: Number of packets expected. None for unlimited. - action: A callback to run after the packet has been received. - """ - assert times != 0 - self._expectations.append(Expectation(expectation, times, action)) - - @property - def expectationsPending(self): - for e in self._expectations: - if e.pending: - return True - return False - - def run(self, timeout = 1): - """ - Runs the packet reader, waiting for all limited expectations to be met. - - Args: - timeout: Wait timeout in seconds. - """ - while self.expectationsPending: - try: - (code, payload) = self._packets.get(True, timeout) - except Queue.Empty: - # No packet received. - break - - if code != 0: - # read error, re-raise. - raise OSError((code, os.strerror(code))) - - if len(payload) == 0: - # EOF on read. - break - - # decode the packet and match it against expectation. - matches = False - for e in self._expectations: - if e.check(self._decode(payload)): - matches = True - break - if not matches and not self._skip: - return False - - return not self.expectationsPending - diff --git a/mac-tap/tuntap/test/tuntap/route.py b/mac-tap/tuntap/test/tuntap/route.py deleted file mode 100644 index b59707ec..00000000 --- a/mac-tap/tuntap/test/tuntap/route.py +++ /dev/null @@ -1,112 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import socket -import struct - -# from net/route.h -RTM_ADD = 0x1 # Add Route -RTM_DELETE = 0x2 # Delete Route -RTM_CHANGE = 0x3 # Change Metrics or flags -RTM_GET = 0x4 # Report Metrics -RTM_LOSING = 0x5 # Kernel Suspects Partitioning -RTM_REDIRECT = 0x6 # Told to use different route -RTM_MISS = 0x7 # Lookup failed on this address -RTM_LOCK = 0x8 # fix specified metrics -RTM_OLDADD = 0x9 # caused by SIOCADDRT -RTM_OLDDEL = 0xa # caused by SIOCDELRT -RTM_RESOLVE = 0xb # req to resolve dst to LL addr -RTM_NEWADDR = 0xc # address being added to iface -RTM_DELADDR = 0xd # address being removed from iface -RTM_IFINFO = 0xe # iface going up/down etc. -RTM_NEWMADDR = 0xf # mcast group membership being added to if -RTM_DELMADDR = 0x10 # mcast group membership being deleted - -RTF_UP = 0x1 # route usable -RTF_GATEWAY = 0x2 # destination is a gateway -RTF_HOST = 0x4 # host entry (net otherwise) -RTF_REJECT = 0x8 # host or net unreachable -RTF_DYNAMIC = 0x10 # created dynamically (by redirect) -RTF_MODIFIED = 0x20 # modified dynamically (by redirect) -RTF_DONE = 0x40 # message confirmed -RTF_DELCLONE = 0x80 # delete cloned route -RTF_CLONING = 0x100 # generate new routes on use -RTF_XRESOLVE = 0x200 # external daemon resolves name -RTF_LLINFO = 0x400 # generated by link layer (e.g. ARP) -RTF_STATIC = 0x800 # manually added -RTF_BLACKHOLE = 0x1000 # just discard pkts (during updates) -RTF_PROTO2 = 0x4000 # protocol specific routing flag -RTF_PROTO1 = 0x8000 # protocol specific routing flag - -RTF_PRCLONING = 0x10000 # protocol requires cloning -RTF_WASCLONED = 0x20000 # route generated through cloning -RTF_PROTO3 = 0x40000 # protocol specific routing flag -RTF_LOCAL = 0x200000 # route represents a local address -RTF_BROADCAST = 0x400000 # route represents a bcast address -RTF_MULTICAST = 0x800000 # route represents a mcast address -RTF_IFSCOPE = 0x1000000 # has valid interface scope -RTF_CONDEMNED = 0x2000000 # defunct; no longer modifiable - -RTA_DST = 0x1 # destination sockaddr present -RTA_GATEWAY = 0x2 # gateway sockaddr present -RTA_NETMASK = 0x4 # netmask sockaddr present -RTA_GENMASK = 0x8 # cloning mask sockaddr present -RTA_IFP = 0x10 # interface name sockaddr present -RTA_IFA = 0x20 # interface addr sockaddr present -RTA_AUTHOR = 0x40 # sockaddr for author of redirect -RTA_BRD = 0x80 # for NEWADDR, broadcast or p-p dest addr - -RTM_VERSION = 5 - -PF_ROUTE = 17 - -STRUCT_RTMSG = struct.Struct('HBBHiiHiiiI3Ii10I') - -def _sendRouteMsg(type, index = 0, flags = 0, addrs = {}): - def add_addr((addr_flags, payload), (addr, flag)): - if not addr: - return (addr_flags, payload) - - return (addr_flags | flag, payload + addr.encode()) - - (addr_flags, payload) = reduce(add_addr, - [ (addrs['dst'], RTA_DST), - (addrs['gateway'], RTA_GATEWAY), - (addrs['netmask'], RTA_NETMASK) ], - (0, '')) - msglen = STRUCT_RTMSG.size + len(payload) - data = STRUCT_RTMSG.pack(msglen, RTM_VERSION, type, index, flags, addr_flags, *((0,) * 19)) - - sock = socket.socket(PF_ROUTE, socket.SOCK_RAW) - try: - sock.send(data + payload) - finally: - sock.close() - -def addNet(dst = None, gateway = None, netmask = None, interface = None): - flags = RTF_STATIC | RTF_UP - if gateway: - flags |= RTF_GATEWAY - elif interface: - gateway = interface - _sendRouteMsg(type = RTM_ADD, flags = flags, - addrs = dict(dst = dst, gateway = gateway, netmask = netmask)) diff --git a/mac-tap/tuntap/test/tuntap/sockaddr.py b/mac-tap/tuntap/test/tuntap/sockaddr.py deleted file mode 100644 index 59edbfc0..00000000 --- a/mac-tap/tuntap/test/tuntap/sockaddr.py +++ /dev/null @@ -1,124 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import socket -import struct - -class SockaddrDl(object): - - AF_LINK = 18 - STRUCT = struct.Struct('BBH4B') - - def __init__(self, name, addr, type, index = 0, af = AF_LINK): - self.af = af - self.index = index - self.type = type - self.name = name - self.addr = addr - - def __repr__(self): - return 'SockaddrDl<%d, %d, %d, %s, %s>' % (self.af, self.index, self.type, - self.name, repr(self.addr)) - - def __eq__(self, other): - return (self.af == other.af and self.index == other.index and self.type == other.type and - self.name == other.name and self.addr == other.addr) - - def encode(self): - # It's important to make this size 12 at least to meet sizeof(struct sockaddr_dl), routing - # setup chokes if it's not. - datalen = max(len(self.name) + len(self.addr), 12) - namelen = datalen - len(self.addr) - data = SockaddrDl.STRUCT.pack(SockaddrDl.STRUCT.size + datalen, - self.af, self.index, self.type, - namelen, len(self.addr), 0) - return data + self.name + '\x00' * (namelen - len(self.name)) + self.addr - - @classmethod - def decode(self, data): - fields = SockaddrDl.STRUCT.unpack_from(data) - pname = SockaddrDl.STRUCT.size - paddr = pname + fields[4] - pend = paddr + fields[5] - return SockaddrDl(af = fields[1], index = fields[2], type = fields[3], - name = data[pname:paddr], addr = data[paddr:pend]) - - -class SockaddrIn(object): - """ - Python wrapper for struct sockaddr_in. - """ - - STRUCT = struct.Struct('BBH4s8x') - - def __init__(self, addr, port = 0, af = socket.AF_INET): - self.addr = addr or '0.0.0.0' - self.port = port - self.af = af - - def __repr__(self): - return 'SockaddrIn<%d, %d, %s>' % (self.af, self.port, self.addr) - - def __eq__(self, other): - return self.encode() == other.encode() - - def encode(self): - return SockaddrIn.STRUCT.pack(16, self.af, self.port, socket.inet_aton(self.addr)) - - @classmethod - def decode(cls, data): - t = SockaddrIn.STRUCT.unpack(data) - return SockaddrIn(addr = socket.inet_ntoa(t[3]), port = t[2], af = t[1]) - - -class SockaddrIn6(object): - """ - Python wrapper for struct sockaddr_in6. - """ - - STRUCT = struct.Struct('BBHI16sI') - - def __init__(self, addr, port = 0, af = socket.AF_INET6, flowinfo = 0, scopeid = 0): - self.addr = addr or '::0' - self.port = port - self.af = af - self.flowinfo = flowinfo - self.scopeid = scopeid - - def __repr__(self): - return 'SockaddrIn6<%d, %d, %s, %d, %d>' % (self.af, self.port, self.addr, - self.flowinfo, self.scopeid) - - def __eq__(self, other): - return self.encode() == other.encode() - - def encode(self): - return SockaddrIn6.STRUCT.pack(28, self.af, self.port, self.flowinfo, - socket.inet_pton(socket.AF_INET6, self.addr), self.scopeid) - - @classmethod - def decode(cls, data): - t = SockaddrIn6.STRUCT.unpack(data) - return SockaddrIn6(addr = socket.inet_ntop(socket.AF_INET6, t[4]), port = t[2], af = t[1], - flowinfo = t[3], scopeid = t[5]) - - diff --git a/mac-tap/tuntap/test/tuntap/test_char_dev.py b/mac-tap/tuntap/test/tuntap/test_char_dev.py deleted file mode 100644 index ae34bf7c..00000000 --- a/mac-tap/tuntap/test/tuntap/test_char_dev.py +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import errno -import os -from tuntap.char_dev_harness import TunCharDevHarness, TapCharDevHarness -from unittest import TestCase - -class TestCharDev(TestCase): - - def __init__(self, name, newHarness): - super(TestCharDev, self).__init__(name) - self._newHarness = newHarness - - def setUp(self): - self.char_dev = self._newHarness() - self.char_dev.open() - - def tearDown(self): - self.char_dev.close() - - def test_Open(self): - pass - - def test_OpenTwiceBusy(self): - second = self._newHarness(self.char_dev.unit) - try: - second.open() - second.close() - self.fail() - except OSError as e: - self.assertEqual(errno.EBUSY, e.errno) - - def test_ReadFails(self): - try: - os.read(self.char_dev.fileno(), 1) - self.fail() - except OSError as e: - self.assertEqual(errno.EIO, e.errno) - - def test_WriteFails(self): - try: - os.write(self.char_dev.fileno(), '') - self.fail() - except OSError as e: - self.assertEqual(errno.EIO, e.errno) - - -class TestTunCharDev(TestCharDev): - - def __init__(self, name): - super(TestTunCharDev, self).__init__(name, TunCharDevHarness) - - def test_AFPrepend(self): - self.assertFalse(self.char_dev.prependAF) - - self.char_dev.prependAF = 1 - self.assertTrue(self.char_dev.prependAF) - - self.char_dev.prependAF = 0 - self.assertFalse(self.char_dev.prependAF) - - -class TestTapCharDev(TestCharDev): - - def __init__(self, name): - super(TestTapCharDev, self).__init__(name, TapCharDevHarness) diff --git a/mac-tap/tuntap/test/tuntap/test_interface.py b/mac-tap/tuntap/test/tuntap/test_interface.py deleted file mode 100644 index 7cf19b20..00000000 --- a/mac-tap/tuntap/test/tuntap/test_interface.py +++ /dev/null @@ -1,120 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import errno -import socket -import unittest - -from tuntap.char_dev_harness import TunCharDevHarness, TapCharDevHarness -from tuntap.interface_harness import Address, InterfaceHarness -from tuntap.sockaddr import SockaddrDl, SockaddrIn, SockaddrIn6 -from tuntap.tun_tap_harness import TunHarness, TapHarness - -class TestInterface(unittest.TestCase): - - def __init__(self, name, harness): - super(TestInterface, self).__init__(name) - self.harness = harness - - def setUp(self): - self.harness.start() - - def tearDown(self): - self.harness.stop() - - def test_CloseWhileUp(self): - self.harness.interface.flags |= InterfaceHarness.IFF_UP - self.harness.char_dev.close() - self.harness.start() - - def test_UpDown(self): - self.harness.interface.flags |= InterfaceHarness.IFF_UP - self.assertEquals(InterfaceHarness.IFF_UP, - self.harness.interface.flags & InterfaceHarness.IFF_UP) - self.harness.interface.flags &= ~InterfaceHarness.IFF_UP - self.assertEquals(0, - self.harness.interface.flags & InterfaceHarness.IFF_UP) - - def test_NetmaskAFFix(self): - self.harness.interface.addIfAddr(local = self.harness.addr.sa_local, - dst = self.harness.addr.sa_dst, - mask = SockaddrIn(af = 0, addr = self.harness.addr.mask)) - for addr in self.harness.interface.getAddrs(socket.AF_INET): - if addr[1] == self.harness.addr.sa_mask: - return; - self.fail() - - def test_Address(self): - self.harness.interface.addIfAddr(local = self.harness.addr.sa_local, - dst = self.harness.addr.sa_dst, - mask = self.harness.addr.sa_mask) - for addr in self.harness.interface.getAddrs(socket.AF_INET): - if (addr[0] == self.harness.addr.sa_local and - addr[1] == self.harness.addr.sa_mask and - addr[2] == self.harness.addr.sa_dst): - return - self.fail() - - def test_Address6(self): - def compare(expected, actual): - return (expected or SockaddrIn6(af = 0, addr = None)) == actual - - self.harness.interface.addIfAddr6(local = self.harness.addr6.sa_local, - dst = self.harness.addr6.sa_dst, - mask = self.harness.addr6.sa_mask) - for addr in self.harness.interface.getAddrs(socket.AF_INET6): - if (compare(addr[0], self.harness.addr6.sa_local) and - compare(addr[1], self.harness.addr6.sa_mask) and - compare(addr[2], self.harness.addr6.sa_dst)): - return - self.fail() - - -class TestTunInterface(TestInterface): - - def __init__(self, name): - super(TestTunInterface, self).__init__(name, TunHarness()) - - def test_Flags(self): - self.assertEquals(InterfaceHarness.IFF_POINTOPOINT | - InterfaceHarness.IFF_RUNNING | - InterfaceHarness.IFF_SIMPLEX | - InterfaceHarness.IFF_MULTICAST, - self.harness.interface.flags) - - -class TestTapInterface(TestInterface): - - def __init__(self, name): - super(TestTapInterface, self).__init__(name, TapHarness()) - - def test_Flags(self): - self.assertEquals(InterfaceHarness.IFF_BROADCAST | - InterfaceHarness.IFF_RUNNING | - InterfaceHarness.IFF_SIMPLEX | - InterfaceHarness.IFF_MULTICAST, - self.harness.interface.flags) - - def test_SetLladdr(self): - addr = SockaddrDl(name = '', addr = '\x11\x22\x33\x44\x55\x66', type = 0) - self.harness.interface.lladdr = addr - self.assertEquals(addr.addr, self.harness.interface.lladdr.addr) diff --git a/mac-tap/tuntap/test/tuntap/test_ip.py b/mac-tap/tuntap/test/tuntap/test_ip.py deleted file mode 100644 index b910785c..00000000 --- a/mac-tap/tuntap/test/tuntap/test_ip.py +++ /dev/null @@ -1,218 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import functools -import socket -import struct -from unittest import TestCase - -from tuntap.packet import IPv4Packet, IPv6Packet, UDPPacket -from tuntap.packet_codec import TapPacketCodec -from tuntap.packet_reader import SelectPacketSource - -class TestIO(TestCase): - - def __init__(self, name, af, listenAddress, codec): - super(TestIO, self).__init__(name) - self._codec = codec(af, listenAddress); - - def __str__(self): - return '%s [%s]' % (super(TestIO, self).__str__(), str(self._codec)) - - def setUp(self): - super(TestIO, self).setUp() - self._codec.start() - - def tearDown(self): - self._codec.stop() - super(TestIO, self).tearDown() - - -class TestIp(TestIO): - - def __init__(self, name, codec): - super(TestIp, self).__init__(name, socket.AF_INET, None, codec) - - def test_Send(self): - payload = 'knock, knock!' - port = 12345 - self._codec.sendUDP(payload, (self._codec.addr.remote, port)) - self._codec.expectPacket( - { 'version': 4, - 'src': socket.inet_pton(self._codec.af, self._codec.addr.local), - 'dst': socket.inet_pton(self._codec.af, self._codec.addr.remote), - 'proto': IPv4Packet.PROTO_UDP, - 'payload': { 'dst': port, - 'payload': payload } }) - self.assertTrue(self._codec.runPacket()) - - def test_Recv(self): - srcport = 23456 - payload = 'who\'s there?' - packet = IPv4Packet(proto = IPv4Packet.PROTO_UDP, - src = socket.inet_pton(self._codec.af, self._codec.addr.remote), - dst = socket.inet_pton(self._codec.af, self._codec.addr.local), - payload = UDPPacket(src = srcport, - dst = self._codec.UDPPort, - payload = payload)) - self._codec.sendPacket(packet.encode()) - self._codec.expectUDP(payload) - self.assertTrue(self._codec.runUDP()) - - -class TestIp6(TestIO): - - def __init__(self, name, codec): - super(TestIp6, self).__init__(name, socket.AF_INET6, None, codec) - - def test_Send(self): - payload = 'knock, knock!' - port = 12345 - self._codec.sendUDP(payload, (self._codec.addr.remote, port)) - self._codec.expectPacket( - { 'version': 6, - 'src': socket.inet_pton(self._codec.af, self._codec.addr.local), - 'dst': socket.inet_pton(self._codec.af, self._codec.addr.remote), - 'proto': IPv6Packet.PROTO_UDP, - 'payload': { 'dst': port, - 'payload': payload } }) - self.assertTrue(self._codec.runPacket()) - - def test_Recv(self): - srcport = 23456 - payload = 'who\'s there?' - packet = IPv6Packet(proto = IPv6Packet.PROTO_UDP, - src = socket.inet_pton(self._codec.af, self._codec.addr.remote), - dst = socket.inet_pton(self._codec.af, self._codec.addr.local), - payload = UDPPacket(src = srcport, - dst = self._codec.UDPPort, - payload = payload)) - self._codec.sendPacket(packet.encode()) - self._codec.expectUDP(payload) - self.assertTrue(self._codec.runUDP()) - - -class TestMulticast(TestIO): - - MULTICAST_GROUP = '224.1.2.3' - - def __init__(self, name, codec): - super(TestMulticast, self).__init__(name, socket.AF_INET, TestMulticast.MULTICAST_GROUP, - codec) - - def setUp(self): - super(TestMulticast, self).setUp() - mreq = struct.pack('4s4s', - socket.inet_pton(self._codec.af, TestMulticast.MULTICAST_GROUP), - socket.inet_pton(self._codec.af, self._codec.addr.local)) - self._codec._recvSock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) - self._codec._sendSock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1) - self._codec._sendSock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, - socket.inet_pton(self._codec.af, self._codec.addr.local)) - - def test_Send(self): - payload = 'knock, knock!' - port = 12345 - self._codec.sendUDP(payload, (TestMulticast.MULTICAST_GROUP, port)) - self._codec.expectPacket( - { 'version': 4, - 'src': socket.inet_pton(self._codec.af, self._codec.addr.local), - 'dst': socket.inet_pton(self._codec.af, TestMulticast.MULTICAST_GROUP), - 'proto': IPv4Packet.PROTO_UDP, - 'payload': { 'dst': port, - 'payload': payload } }) - self.assertTrue(self._codec.runPacket()) - - def test_Recv(self): - srcport = 23456 - payload = 'who\'s there?' - packet = IPv4Packet(proto = IPv4Packet.PROTO_UDP, - src = socket.inet_pton(self._codec.af, self._codec.addr.remote), - dst = socket.inet_pton(self._codec.af, TestMulticast.MULTICAST_GROUP), - payload = UDPPacket(src = srcport, - dst = self._codec.UDPPort, - payload = payload)) - self._codec.sendPacket(packet.encode()) - self._codec.expectUDP(payload) - self.assertTrue(self._codec.runUDP()) - - -class TestMulticast6(TestIO): - - MULTICAST_GROUP = 'ff05::114' - - def __init__(self, name, codec): - super(TestMulticast6, self).__init__(name, socket.AF_INET6, TestMulticast6.MULTICAST_GROUP, - codec) - - def setUp(self): - super(TestMulticast6, self).setUp() - mreq = struct.pack('16sI', - socket.inet_pton(self._codec.af, TestMulticast6.MULTICAST_GROUP), - self._codec._harness.interface.index) - self._codec._recvSock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) - self._codec._sendSock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 1) - self._codec._sendSock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, - self._codec._harness.interface.index) - - def test_Send(self): - payload = 'knock, knock!' - port = 12345 - self._codec.sendUDP(payload, (TestMulticast6.MULTICAST_GROUP, port)) - self._codec.expectPacket( - { 'version': 6, - 'dst': socket.inet_pton(self._codec.af, TestMulticast6.MULTICAST_GROUP), - 'proto': IPv6Packet.PROTO_UDP, - 'payload': { 'dst': port, - 'payload': payload } }) - self.assertTrue(self._codec.runPacket()) - - def test_Recv(self): - srcport = 23456 - payload = 'who\'s there?' - packet = IPv6Packet(proto = IPv6Packet.PROTO_UDP, - src = socket.inet_pton(self._codec.af, self._codec.addr.remote), - dst = socket.inet_pton(self._codec.af, TestMulticast6.MULTICAST_GROUP), - payload = UDPPacket(src = srcport, - dst = self._codec.UDPPort, - payload = payload)) - self._codec.sendPacket(packet.encode()) - self._codec.expectUDP(payload) - self.assertTrue(self._codec.runUDP()) - - -class TestTapLladdr(TestIp): - - def __init__(self, name): - super(TestTapLladdr, self).__init__(name, - lambda af, addr: TapPacketCodec(af, addr, - SelectPacketSource)) - - def setUp(self): - super(TestTapLladdr, self).setUp() - - # Swap out the link-level address with a different address. - lladdr = self._codec._harness.interface.lladdr - mac_addr = list(lladdr.addr) - mac_addr[5] = chr(ord(mac_addr[5]) ^ 0xff) - lladdr.addr = ''.join(mac_addr) - self._codec._harness.interface.lladdr = lladdr diff --git a/mac-tap/tuntap/test/tuntap/tun_tap_harness.py b/mac-tap/tuntap/test/tuntap/tun_tap_harness.py deleted file mode 100644 index cb07638e..00000000 --- a/mac-tap/tuntap/test/tuntap/tun_tap_harness.py +++ /dev/null @@ -1,96 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import errno -import socket - -from tuntap.char_dev_harness import TunCharDevHarness, TapCharDevHarness -from tuntap.interface_harness import Address, InterfaceHarness -import tuntap.route - -class TunTapHarness(object): - - def __init__(self, name, newCharDevHarness, addr, addr6): - self._newCharDevHarness = newCharDevHarness - self.name = name - self.addr = addr - self.addr6 = addr6 - - def start(self): - self.char_dev = self._newCharDevHarness() - self.char_dev.open() - self.interface = InterfaceHarness(self.name, self.char_dev.unit) - - def up(self): - self.interface.addIfAddr(local = self.addr.sa_local, - dst = self.addr.sa_dst, - mask = self.addr.sa_mask) - self.interface.addIfAddr6(local = self.addr6.sa_local, - dst = self.addr6.sa_dst, - mask = self.addr6.sa_mask) - - # Lion automatically creates routes for IPv6 addresses, earlier versions don't. - try: - tuntap.route.addNet(dst = self.addr6.sa_remote, - netmask = self.addr6.sa_mask, - interface = self.interface.lladdr) - except IOError as e: - if e.errno != errno.EEXIST: - raise e - - self.interface.flags |= InterfaceHarness.IFF_UP - - def stop(self): - self.interface.flags &= ~InterfaceHarness.IFF_UP - self.char_dev.close() - - -class TunHarness(TunTapHarness): - - def __init__(self, - addr = Address(af = socket.AF_INET, - local = '10.0.0.1', - remote = '10.0.0.2', - dst = '10.0.0.2', - mask = '255.255.255.255'), - addr6 = Address(af = socket.AF_INET6, - local = 'fd00::1', - remote = 'fd00::2', - dst = 'fd00::2', - mask = 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')): - super(TunHarness, self).__init__('tun', TunCharDevHarness, addr, addr6) - - -class TapHarness(TunTapHarness): - - def __init__(self, - addr = Address(af = socket.AF_INET, - local = '10.0.0.1', - remote = '10.0.0.2', - dst = '10.255.255.255', - mask = '255.0.0.0'), - addr6 = Address(af = socket.AF_INET6, - local = 'fd00::1', - remote = 'fd00::2', - dst = None, - mask = 'ffff:ffff:ffff:ffff::0')): - super(TapHarness, self).__init__('tap', TapCharDevHarness, addr, addr6) diff --git a/mac-tap/tuntap/test/tuntap/tun_tap_test_case.py b/mac-tap/tuntap/test/tuntap/tun_tap_test_case.py deleted file mode 100644 index 28edc467..00000000 --- a/mac-tap/tuntap/test/tuntap/tun_tap_test_case.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -from unittest import TestCase - -class TunTapTestCase(TestCase): - - def __init__(self, name, harness): - super(TunTapTestCase, self).__init__(name) - self.harness = harness - - def __str__(self): - return '%s [%s]' % (super(TunTapTestCase, self).__str__(), - self.harness.__class__.__name__) - - def setUp(self): - self.harness.start() - self.harness.up() - - def tearDown(self): - self.harness.stop() diff --git a/mac-tap/tuntap/test/tuntap/tuntap_tests.py b/mac-tap/tuntap/test/tuntap/tuntap_tests.py deleted file mode 100644 index fb5a431c..00000000 --- a/mac-tap/tuntap/test/tuntap/tuntap_tests.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de> -# -# Redistribution and use in source and binary forms, with or without modification, are permitted -# provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, this list of -# conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, this list of -# conditions and the following disclaimer in the documentation and/or other materials provided -# with the distribution. -# 3. The name of the author may not be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, -# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A -# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, -# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED -# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import argparse -import itertools -import re -import sys -import unittest - -from tuntap.packet_codec import TunPacketCodec, TunAFPacketCodec, TapPacketCodec -from tuntap.packet_reader import BlockingPacketSource, SelectPacketSource - -from tuntap.test_char_dev import TestTunCharDev, TestTapCharDev -from tuntap.test_interface import TestTunInterface, TestTapInterface -from tuntap.test_ip import TestIp, TestIp6, TestMulticast, TestMulticast6, TestTapLladdr - -class FilteringTestSuite(unittest.TestSuite): - - def __init__(self, filter): - super(FilteringTestSuite, self).__init__() - self._matcher = re.compile(filter or '.*') - - def __iter__(self): - return itertools.ifilter(lambda test : self._matcher.search(str(test)), - super(FilteringTestSuite, self).__iter__()) - -def loadTestsFromTestCase(testCaseClass, *args, **kwargs): - testCaseNames = unittest.getTestCaseNames(testCaseClass, 'test_') - return unittest.TestSuite(map(lambda n : testCaseClass(n, *args, **kwargs), testCaseNames)) - -def main(argv): - # Parse the command line. - parser = argparse.ArgumentParser(description = 'Run tuntap unit tests.') - parser.add_argument('--tests', type = str, nargs = '?', default = None, - help = 'tests to run') - parser.add_argument('--verbosity', type = int, nargs = '?', default = 2, - help = 'verbosity level') - options = parser.parse_args(argv[1:]) - - # Gather tests and run them. - loader = unittest.TestLoader() - suite = FilteringTestSuite(options.tests) - suite.addTests(loadTestsFromTestCase(TestTunCharDev)) - suite.addTests(loadTestsFromTestCase(TestTapCharDev)) - suite.addTests(loadTestsFromTestCase(TestTunInterface)) - suite.addTests(loadTestsFromTestCase(TestTapInterface)) - - codecs = (TunPacketCodec, TunAFPacketCodec, TapPacketCodec) - sources = (SelectPacketSource, BlockingPacketSource) - tests = (TestIp, TestIp6, TestMulticast, TestMulticast6) - for (test, codec, source) in [ (test, codec, source) for test in tests - for codec in codecs - for source in sources ]: - suite.addTests(loadTestsFromTestCase(test, lambda af, addr: codec(af, addr, source))) - - suite.addTests(loadTestsFromTestCase(TestTapLladdr)) - - runner = unittest.TextTestRunner(stream = sys.stderr, - descriptions = True, - verbosity = options.verbosity) - runner.run(suite) - -if __name__ == '__main__': - main(sys.argv) |