summaryrefslogtreecommitdiff
path: root/mac-tap/tuntap/test
diff options
context:
space:
mode:
authorAdam Ierymenko <adam.ierymenko@gmail.com>2013-10-28 09:33:32 -0400
committerAdam Ierymenko <adam.ierymenko@gmail.com>2013-10-28 09:33:32 -0400
commit148619f0ba836ff894185663633556dd04700a60 (patch)
tree710ecbbb7ebc6a2b19d910e10a2ccd472b86c579 /mac-tap/tuntap/test
parent17778a36badcb9fdb9b3292e8ec32be3b836c811 (diff)
downloadinfinitytier-148619f0ba836ff894185663633556dd04700a60.tar.gz
infinitytier-148619f0ba836ff894185663633556dd04700a60.zip
Make tap build on OSX 10.9, though not in a way that is easy for users. Instead package binaries.
Diffstat (limited to 'mac-tap/tuntap/test')
-rw-r--r--mac-tap/tuntap/test/tuntap/__init__.py21
-rw-r--r--mac-tap/tuntap/test/tuntap/char_dev_harness.py173
-rw-r--r--mac-tap/tuntap/test/tuntap/interface_harness.py279
-rw-r--r--mac-tap/tuntap/test/tuntap/ioctl.py31
-rw-r--r--mac-tap/tuntap/test/tuntap/packet.py531
-rw-r--r--mac-tap/tuntap/test/tuntap/packet_codec.py244
-rw-r--r--mac-tap/tuntap/test/tuntap/packet_reader.py270
-rw-r--r--mac-tap/tuntap/test/tuntap/route.py112
-rw-r--r--mac-tap/tuntap/test/tuntap/sockaddr.py124
-rw-r--r--mac-tap/tuntap/test/tuntap/test_char_dev.py86
-rw-r--r--mac-tap/tuntap/test/tuntap/test_interface.py120
-rw-r--r--mac-tap/tuntap/test/tuntap/test_ip.py218
-rw-r--r--mac-tap/tuntap/test/tuntap/tun_tap_harness.py96
-rw-r--r--mac-tap/tuntap/test/tuntap/tun_tap_test_case.py40
-rw-r--r--mac-tap/tuntap/test/tuntap/tuntap_tests.py83
15 files changed, 0 insertions, 2428 deletions
diff --git a/mac-tap/tuntap/test/tuntap/__init__.py b/mac-tap/tuntap/test/tuntap/__init__.py
deleted file mode 100644
index 664bdaae..00000000
--- a/mac-tap/tuntap/test/tuntap/__init__.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/mac-tap/tuntap/test/tuntap/char_dev_harness.py b/mac-tap/tuntap/test/tuntap/char_dev_harness.py
deleted file mode 100644
index 515e50bf..00000000
--- a/mac-tap/tuntap/test/tuntap/char_dev_harness.py
+++ /dev/null
@@ -1,173 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import errno
-import fcntl
-import io
-import os
-import struct
-from tuntap import ioctl
-
-class CharDevHarness(object):
- """
- Base class for the tun and tap character device harnesses. Manages a single character
- interface, keeps the file descriptor and handles I/O.
- """
-
- _MAX_CHAR_DEV = 16
- _MAX_PACKET_SIZE = 4096
-
- def __init__(self, class_name, unit = None):
- """
- Initializes the harness.
-
- Args:
- class_name: Path name pattern.
- unit: The character device number.
- """
- self._class_name = class_name
- self._unit = unit
- self._dev = None
-
- def _openCharDev(self, unit):
- """
- Opens the character device.
-
- Args:
- unit: The character device number.
- """
- assert not self._dev
-
- name = self._class_name % unit
- self._dev = os.open(name, os.O_RDWR)
-
- def open(self):
- """
- Opens the character device.
- """
- if self._unit != None:
- self._openCharDev(self._unit)
- return
-
- # Try to open character devices in turn.
- for i in xrange(0, self._MAX_CHAR_DEV):
- try:
- self._openCharDev(i)
- self._unit = i
- return
- except OSError as e:
- if e.errno != errno.EBUSY:
- raise e
-
- # All devices busy.
- raise OSError(errno.EBUSY)
-
- def close(self):
- """
- Closes the character device.
- """
- assert self._dev
- os.close(self._dev)
- self._dev = None
-
- def fileno(self):
- assert self._dev
- return self._dev
-
- def send(self, packet):
- assert self._dev
- os.write(self._dev, packet)
-
- def ioctl(self, cmd, format, arg):
- """
- Performs an ioctl on the character device.
-
- Args:
- cmd: the ioctl cmd identifier.
- format: argument format.
- arg: argument data tuple.
-
- Returns:
- Output argument tuple.
- """
- assert self._dev
- return struct.unpack(format, fcntl.ioctl(self._dev, cmd, struct.pack(format, arg)))
-
- @property
- def unit(self):
- """
- Returns the interface unit, if known.
- """
- return self._unit
-
-
-class TunCharDevHarness(CharDevHarness):
- """
- Character device harness for tun devices.
- """
-
- TUNSIFHEAD = ioctl.IOC(ioctl.OUT, 't', 96, 'i')
- TUNGIFHEAD = ioctl.IOC(ioctl.IN, 't', 97, 'i')
-
- def __init__(self, unit = None):
- """
- Initializes the harness.
-
- Args:
- unit: Character device index
- """
- super(TunCharDevHarness, self).__init__('/dev/tun%d', unit = unit)
-
- @property
- def prependAF(self):
- """
- Gets the AF prepending flag.
-
- Returns:
- A flag indicating whether packets on the char dev are prefixed with the AF number.
- """
- return self.ioctl(self.TUNGIFHEAD, 'i', (0))[0]
-
- @prependAF.setter
- def prependAF(self, prependAF):
- """
- Sets the AF prepending flag.
-
- Args:
- prependAF: whether the packets on the char dev are prefixed with the AF number.
- """
- self.ioctl(self.TUNSIFHEAD, 'i', (prependAF))
-
-
-class TapCharDevHarness(CharDevHarness):
- """
- Character device harness for tap devices.
- """
-
- def __init__(self, unit = None):
- """
- Initializes the harness.
-
- Args:
- unit: Character device index
- """
- super(TapCharDevHarness, self).__init__('/dev/tap%d', unit = unit)
diff --git a/mac-tap/tuntap/test/tuntap/interface_harness.py b/mac-tap/tuntap/test/tuntap/interface_harness.py
deleted file mode 100644
index fe4f1fc5..00000000
--- a/mac-tap/tuntap/test/tuntap/interface_harness.py
+++ /dev/null
@@ -1,279 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import ctypes
-import ctypes.util
-import errno
-import fcntl
-import socket
-import struct
-
-from tuntap import ioctl
-from tuntap.sockaddr import SockaddrDl, SockaddrIn, SockaddrIn6
-
-libc = ctypes.CDLL(ctypes.util.find_library('c'))
-
-class struct_sockaddr(ctypes.Structure):
- _fields_ = [ ('sa_len', ctypes.c_uint8),
- ('sa_family', ctypes.c_uint8) ]
-
-class struct_ifaddrs(ctypes.Structure):
- pass
-
-struct_ifaddrs._fields_ = [ ('ifa_next', ctypes.POINTER(struct_ifaddrs)),
- ('ifa_name', ctypes.c_char_p),
- ('ifa_flags', ctypes.c_uint),
- ('ifa_addr', ctypes.POINTER(struct_sockaddr)),
- ('ifa_netmask', ctypes.POINTER(struct_sockaddr)),
- ('ifa_dstaddr', ctypes.POINTER(struct_sockaddr)),
- ('ifa_data', ctypes.c_void_p) ]
-
-def decodeSockaddr(sockaddr):
- if not sockaddr:
- return None
-
- data = ctypes.string_at(sockaddr, max(sockaddr.contents.sa_len, 16))
- if sockaddr.contents.sa_family == SockaddrDl.AF_LINK:
- return SockaddrDl.decode(data)
- elif sockaddr.contents.sa_family == socket.AF_INET:
- return SockaddrIn.decode(data)
- elif sockaddr.contents.sa_family == socket.AF_INET6:
- return SockaddrIn6.decode(data)
-
- return None
-
-def getIfAddrs(ifname):
- ifaddrs = (ctypes.POINTER(struct_ifaddrs))()
- assert not libc.getifaddrs(ctypes.byref(ifaddrs))
-
- addrs = []
- try:
- entry = ifaddrs
- while entry:
- ia = entry.contents
- entry = ia.ifa_next
- if ia.ifa_name != ifname:
- continue
-
- addrs.append((decodeSockaddr(ia.ifa_addr),
- decodeSockaddr(ia.ifa_netmask),
- decodeSockaddr(ia.ifa_dstaddr)))
- return addrs
- finally:
- libc.freeifaddrs(ifaddrs)
-
-
-def ifNameToIndex(ifname):
- libc.if_nametoindex.restype = ctypes.c_uint
- index = libc.if_nametoindex(ifname)
- if not index:
- raise OSError(ctypes.get_errno)
- return index
-
-
-class Address(object):
- """
- Wraps address parameters for an interface.
- """
-
- def __init__(self, af, local, remote, dst, mask):
- self.af = af
- self.local = local
- self.remote = remote
- self.dst = dst
- self.mask = mask
-
- def __makeSaProperty(name):
- def get(self):
- addrmap = { socket.AF_INET: SockaddrIn,
- socket.AF_INET6: SockaddrIn6 }
- addr = getattr(self, name)
- if self.af not in addrmap:
- return None
- if addr == None:
- return addrmap[self.af](af = 0, addr = None)
- return addrmap[self.af](af = self.af, addr = addr)
-
- return property(get)
-
- sa_local = __makeSaProperty('local')
- sa_remote = __makeSaProperty('remote')
- sa_dst = __makeSaProperty('dst')
- sa_mask = __makeSaProperty('mask')
-
-
-class InterfaceHarness(object):
- """
- Base class for network interface harnesses. Provides helpers to configure the interface.
- """
-
- SIOCSIFFLAGS = ioctl.IOC(ioctl.OUT, 'i', 16, '16s16s')
- SIOCGIFFLAGS = ioctl.IOC(ioctl.INOUT, 'i', 17, '16s16s')
-
- SIOCAIFADDR = ioctl.IOC(ioctl.OUT, 'i', 26, '16s16s16s16s')
- SIOCAIFADDR_IN6 = ioctl.IOC(ioctl.OUT, 'i', 26, '16s28s28s28sIiiII')
- SIOCSIFLLADDR = ioctl.IOC(ioctl.OUT, 'i', 60, '16s16s')
-
- IFF_UP = 0x1
- IFF_BROADCAST = 0x2
- IFF_DEBUG = 0x4
- IFF_LOOPBACK = 0x8
- IFF_POINTOPOINT = 0x10
- IFF_NOTRAILERS = 0x20
- IFF_RUNNING = 0x40
- IFF_NOARP = 0x80
- IFF_PROMISC = 0x100
- IFF_ALLMULTI = 0x200
- IFF_OACTIVE = 0x400
- IFF_SIMPLEX = 0x800
- IFF_LINK0 = 0x1000
- IFF_LINK1 = 0x2000
- IFF_LINK2 = 0x4000
- IFF_MULTICAST = 0x8000
-
- def __init__(self, class_name, unit):
- """
- Initializes the harness.
-
- Args:
- class_name: Interface class name.
- unit: The interface number.
- """
- self._class_name = class_name
- self._unit = unit
-
- def _ioctl(self, af, cmd, format, arg):
- """
- Performs a socket ioctl.
-
- Args:
- af: address family.
- cmd: the ioctl cmd.
- format: argument format description.
- arg: argument data tuple.
-
- Returns:
- Output data tuple.
- """
- s = socket.socket(af, socket.SOCK_DGRAM)
- try:
- return struct.unpack(format, fcntl.ioctl(s, cmd, struct.pack(format, *arg)))
- finally:
- s.close()
-
- @property
- def flags(self):
- """
- Retrieves the interface flags.
-
- Returns:
- The interface flags.
- """
- return self._ioctl(socket.AF_INET, InterfaceHarness.SIOCGIFFLAGS,
- '16sH', (self.name, 0))[1]
-
- @flags.setter
- def flags(self, flags):
- """
- Sets new interface flags.
-
- Args:
- flags: new interface flags.
- """
- self._ioctl(socket.AF_INET, InterfaceHarness.SIOCSIFFLAGS,
- '16sH', (self.name, flags))
-
- @property
- def name(self):
- """
- Gets the interface name.
-
- Returns:
- Full interface name.
- """
- return "%s%d" % (self._class_name, self._unit)
-
- @property
- def index(self):
- """
- Gets the interface index.
-
- Returns:
- Interface index.
- """
- return ifNameToIndex(self.name)
-
- def getAddrs(self, af = None):
- def check(addr):
- if addr and addr.af == af:
- return addr
- else:
- return None
- return filter(lambda (a, n, d): a != None,
- map(lambda (a, n, d): (check(a), check(n), check(d)), getIfAddrs(self.name)))
-
- @property
- def lladdr(self):
- entry = self.getAddrs(SockaddrDl.AF_LINK)
- if entry:
- return entry[0][0]
- return None
-
- @lladdr.setter
- def lladdr(self, addr):
- self._ioctl(socket.AF_INET, InterfaceHarness.SIOCSIFLLADDR,
- '16sBB14s', (self.name, len(addr.addr), addr.af, addr.addr))
-
- def addIfAddr(self, local, dst, mask):
- """
- Set an interface address.
-
- Args:
- local: local address.
- dst: broadcast address or destination address, respectively.
- mask: the netmask.
- """
- self._ioctl(socket.AF_INET, InterfaceHarness.SIOCAIFADDR,
- '16s16s16s16s', (self.name, local.encode(), dst.encode(), mask.encode()))
-
- def addIfAddr6(self, local, dst, mask):
- """
- Set an INET6 address for the interface.
-
- Args:
- local: local address.
- dst: destination address.
- mask: the netmask.
- """
- # This sometimes fails on Tiger with ENOBUFS. Just retry...
- ntries = 0
- while True:
- try:
- self._ioctl(socket.AF_INET6, InterfaceHarness.SIOCAIFADDR_IN6,
- '16s28s28s28sIiiII',
- (self.name, local.encode(), dst.encode(), mask.encode(),
- 0, 0, 0, 0xffffffff, 0xffffffff))
- break
- except IOError as e:
- if e.errno != errno.ENOBUFS or ntries > 10:
- raise e
- ntries += 1
diff --git a/mac-tap/tuntap/test/tuntap/ioctl.py b/mac-tap/tuntap/test/tuntap/ioctl.py
deleted file mode 100644
index 641ddf24..00000000
--- a/mac-tap/tuntap/test/tuntap/ioctl.py
+++ /dev/null
@@ -1,31 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import struct
-
-VOID = 0x20000000
-IN = 0x40000000
-OUT = 0x80000000
-INOUT = IN | OUT
-
-def IOC(inout, group, num, format):
- return inout | ((struct.calcsize(format) & 0x1fff) << 16) | (ord(group) << 8) | num
diff --git a/mac-tap/tuntap/test/tuntap/packet.py b/mac-tap/tuntap/test/tuntap/packet.py
deleted file mode 100644
index 6fbbb736..00000000
--- a/mac-tap/tuntap/test/tuntap/packet.py
+++ /dev/null
@@ -1,531 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import socket
-
-class BinStruct(object):
- """
- Handles packing and unpacking of binary data. It is vaguely inspired by the struct module but
- taylored for bit-granular fields. Also, it's probably not very fast :)
- """
-
- @staticmethod
- def str2num(data, width):
- if not data:
- return 0
- len, rem = divmod(width, 8)
- val = 0
- for i in range(len):
- val = (val << 8) | ord(data[i])
- if rem:
- val = (val << rem) | (ord(data[len]) & ((1 << rem) - 1))
- return val
-
- @staticmethod
- def num2str(val, width):
- result = bytearray((width + 7) / 8)
- p, rem = divmod(width, 8)
- if rem:
- result[p] = chr(val & ((1 << rem) - 1))
- val >>= rem
- while p > 0:
- p -= 1
- result[p] = chr(val & 0xff)
- val >>= 8
- return str(result)
-
- def __init__(self, format):
- """
- Initializes a BinStruct object that can encode and decode the binary structure specified in
- the format parameters.
-
- Args:
- format: Specifies the format of the binary data. The syntax is
-
- (<width><type>)*
-
- where:
- width is the width of a component in number of bits
- type is indicates the type of the component and may be one of:
- s: binary data
- n: number
- """
- self._format = []
- id = lambda x, width : x or 0
- typemap = {
- 'n': (id, id),
- 's': (BinStruct.num2str, BinStruct.str2num),
- }
- pos = 0
- self._width = 0
- while pos < len(format):
- start = pos
- while str.isdigit(format[pos]):
- pos += 1
- width = int(format[start:pos])
- self._width += width
- codec = typemap[format[pos]]
- self._format.insert(0, (width, codec[0], codec[1]))
- pos += 1
-
- @property
- def size(self):
- return (self._width + 7) / 8
-
- def pack(self, *values):
- """
- Encodes the passed values according to this BinStruct's format definition.
-
- Args:
- values: The values to encode.
- Returns:
- The encoded struct as a binary string.
- """
- assert len(values) == len(self._format)
- val = 0
- pos = len(self._format)
- for value in values:
- pos -= 1
- (width, decode, encode) = self._format[pos]
- val = (val << width) | (encode(value, width) & ((1 << width) - 1))
- return BinStruct.num2str(val, self._width)
-
- def unpack(self, data):
- """
- Decodes a binary string according to the format definition.
-
- Args:
- data: The binary string to decode.
- Returns:
- A value tuple.
- """
- assert len(data) >= self.size
- val = BinStruct.str2num(data, self._width)
- pos = len(self._format)
- result = [ None for i in range(pos) ]
- for (width, decode, encode) in self._format:
- pos -= 1
- result[pos] = decode(val & ((1 << width) - 1), width)
- val >>= width
- return tuple(result)
-
-
-class Packet(object):
- """
- Base class for packet encoding and decoding.
- """
-
- def __init__(self, format, names, data = None, **initializer):
- """
- Initializes the packet.
-
- Args:
- format: Binary format description.
- names: Names for the packet fields.
- data: Optional binary packet to decode.
- initializer: Optional initialization values for the packet fields.
- """
- self._struct = BinStruct(format)
- self._names = names
- self.__dict__.update(dict.fromkeys(self._names, None))
- self.payload = None
-
- if isinstance(data, str):
- self.decode(data)
- elif isinstance(data, Packet):
- self.update(data)
-
- self.__dict__.update(initializer)
-
- def __repr__(self):
- return repr(dict(map(lambda x : (x, getattr(self, x)), self._names + ('payload',))))
-
- def _payloadPos(self):
- """
- Returns: The payload position in the data buffer.
- """
- return self._struct.size
-
- def _decodePayload(self, data):
- """
- Decodes the payload data.
-
- Args:
- data: Payload data buffer.
- Returns:
- The payload object.
- """
- return data
-
- def _encodePayload(self):
- """
- Encodes the payload data.
-
- Args:
- payload: Payload object.
- Returns:
- Encoded payload byte string.
- """
- if issubclass(self.payload.__class__, Packet):
- return self.payload.encode()
- return str(self.payload)
-
- def _encodeFields(self, *fields):
- """
- Takes a fields tuple and returns encoded field data.
-
- Args:
- fields: Field values.
- Returns:
- Tuple of encoded fields.
- """
- return self._struct.pack(*fields)
-
- def decode(self, data):
- """
- Decode a binary packet.
-
- Args:
- data: Binary packet data to decode.
- """
- fields = self._struct.unpack(data)
- assert len(fields) == len(self._names)
- self.__dict__.update(dict(zip(self._names, fields)))
- self.payload = self._decodePayload(data[self._payloadPos():])
-
- def update(self, data):
- """
- Update the packet from a dictionary.
-
- Args:
- data: The dictionary to update from.
- """
- self.__dict__.update(map(lambda x : (x, getattr(data, x)), self._names + ('payload',)))
- if isinstance(self.payload, str):
- self.payload = self._decodePayload(self.payload)
-
-
- def encode(self):
- """
- Encodes the packet into binary format.
-
- Returns:
- The packet data.
- """
- fields = map(lambda x : getattr(self, x), self._names)
- return self._encodeFields(*fields) + self._encodePayload()
-
- @property
- def headerLen(self):
- """
- The size of the header according to the format.
-
- Returns:
- The header length.
- """
- return self._struct.size
-
-
-class TunAFFrame(Packet):
-
- def __init__(self, data = None, **initializer):
- super(TunAFFrame, self).__init__('32n', ('af',), data, **initializer)
-
- def _decodePayload(self, data):
- if self.af == socket.AF_INET:
- return IPv4Packet(data)
- elif self.af == socket.AF_INET6:
- return IPv6Packet(data)
- return data
-
-
-class EthernetFrame(Packet):
-
- TYPE_IPV4 = 0x0800
- TYPE_ARP = 0x0806
- TYPE_IPV6 = 0x86dd
-
- def __init__(self, data = None, **initializer):
- super(EthernetFrame, self).__init__('48s48s16n', ('dst', 'src', 'type'),
- data, **initializer)
-
- def _decodePayload(self, data):
- if self.type == EthernetFrame.TYPE_IPV4:
- return IPv4Packet(data)
- elif self.type == EthernetFrame.TYPE_ARP:
- return ARPPacket(data)
- elif self.type == EthernetFrame.TYPE_IPV6:
- return IPv6Packet(data)
- return data
-
-
-class ARPPacket(Packet):
-
- HTYPE_ETHERNET = 0x01
- HLEN_ETHERNET = 6
- PTYPE_IPV4 = 0x0800
- PLEN_IPV4 = 4
- OPER_REQUEST = 1
- OPER_REPLY = 2
-
- def __init__(self, data = None, **initializer):
- super(ARPPacket, self).__init__('16n16n8n8n16n48s32s48s32s',
- ('htype', 'ptype', 'hlen', 'plen', 'oper',
- 'sha', 'spa', 'tha', 'tpa'),
- data, **initializer)
-
-
-class IPv4Packet(Packet):
-
- PROTO_ICMP = 0x01
- PROTO_TCP = 0x06
- PROTO_UDP = 0x11
-
- class UDPPseudoHeader(Packet):
-
- def __init__(self, data = None, **initializer):
- super(IPv4Packet.UDPPseudoHeader, self).__init__('32s32s8s8n16n',
- ('src', 'dst',
- 'padding', 'proto', 'length'),
- data, **initializer)
-
-
- def __init__(self, data = None, **initializer):
- super(IPv4Packet, self).__init__('4n4n6n2n16n16n2n14n8n8n16n32s32s',
- ('version', 'hdrlen', 'dscp', 'ecn',
- 'len', 'id', 'flags', 'fragoffset',
- 'ttl', 'proto', 'checksum', 'src', 'dst'),
- data, **initializer)
-
- def _payloadPos(self):
- return self.hdrlen * 4
-
- def _decodePayload(self, data):
- if self.proto == IPv4Packet.PROTO_UDP:
- return UDPPacket(data)
- return data
-
- @staticmethod
- def computeChecksum(data):
- """
- Computes the IPv4 header checksum.
-
- Args:
- Header in binary.
- Returns:
- The header checksum.
- """
- sum = 0
- for i in range(0, len(data) - 1, 2):
- sum += ord(data[i]) << 8 | ord(data[i + 1])
- if len(data) % 2 == 1:
- sum += ord(data[-1]) << 8 | 0
- return ~((sum & 0xffff) + (sum >> 16))
-
- def encode(self):
- payload = self._encodePayload()
- hdrlen = self.hdrlen or 5
- payloadlen = self.len or len(payload)
- fields = [self.version or 4, hdrlen, self.dscp or 0, self.ecn or 0,
- payloadlen + hdrlen * 4, self.id or 0, self.flags or 0,
- self.fragoffset or 0, self.ttl or 255, self.proto, self.checksum or 0,
- self.src, self.dst]
-
- # Need to compute UDP checksum here since it includes the IPv4 pseudo header.
- if (self.proto == IPv4Packet.PROTO_UDP and
- issubclass(self.payload.__class__, UDPPacket) and
- self.payload.checksum == None):
-
- header = IPv4Packet.UDPPseudoHeader(src = self.src, dst = self.dst,
- proto = IPv4Packet.PROTO_UDP, length = payloadlen,
- payload = payload)
- payload = UDPPacket(data = self.payload,
- checksum = IPv4Packet.computeChecksum(header.encode())).encode()
-
- header = self._encodeFields(*tuple(fields))
- if self.checksum == None:
- fields[10] = IPv4Packet.computeChecksum(header)
- header = self._encodeFields(*tuple(fields))
- return header + payload
-
-class IPv6Packet(Packet):
-
- PROTO_ICMP = 1
- PROTO_TCP = 6
- PROTO_UDP = 17
- PROTO_ICMPV6 = 58
-
- class UDPPseudoHeader(Packet):
-
- def __init__(self, data = None, **initializer):
- super(IPv6Packet.UDPPseudoHeader, self).__init__('128s128s32n24s8n',
- ('src', 'dst',
- 'length', 'padding', 'proto'),
- data, **initializer)
-
-
- def __init__(self, data = None, **initializer):
- super(IPv6Packet, self).__init__('4n8n20n16n8n8n128s128s',
- ('version', 'traffic_class', 'flow_label',
- 'len', 'proto', 'hop_limit',
- 'src', 'dst'),
- data, **initializer)
-
- def _decodePayload(self, data):
- if self.proto == IPv6Packet.PROTO_UDP:
- return UDPPacket(data)
- elif self.proto == IPv6Packet.PROTO_ICMPV6:
- return ICMPV6Packet(data)
- return data
-
- def encode(self):
- payload = self._encodePayload()
- fields = [self.version or 6, self.traffic_class or 0, self.flow_label or 0,
- self.len or len(payload), self.proto, self.hop_limit or 255,
- self.src, self.dst]
-
- # Need to compute checksum for UDP, ICMPV6 here since it includes the IPv6 pseudo header.
- checksummedProtos = { IPv6Packet.PROTO_UDP: UDPPacket,
- IPv6Packet.PROTO_ICMPV6: ICMPV6Packet }
- payloadClass = checksummedProtos.get(self.proto)
- if (payloadClass != None and
- issubclass(self.payload.__class__, payloadClass) and
- self.payload.checksum == None):
-
- header = IPv6Packet.UDPPseudoHeader(src = self.src, dst = self.dst, length = fields[3],
- proto = self.proto, payload = payload)
- payload = payloadClass(data = self.payload,
- checksum = IPv4Packet.computeChecksum(header.encode())).encode()
-
- return self._encodeFields(*tuple(fields)) + payload
-
-
-class ICMPV6Packet(Packet):
-
- TYPE_NEIGHBOR_SOLICITATION = 135
- TYPE_NEIGHBOR_ADVERTISMENT = 136
-
- def __init__(self, data = None, **initializer):
- super(ICMPV6Packet, self).__init__('8n8n16n',
- ('type', 'code', 'checksum'),
- data, **initializer)
-
- def _decodePayload(self, data):
- if self.type == ICMPV6Packet.TYPE_NEIGHBOR_SOLICITATION:
- return ICMPV6NeighborSolicitation(data)
- elif self.type == ICMPV6Packet.TYPE_NEIGHBOR_ADVERTISMENT:
- return ICMPV6NeighborAdvertisement(data)
- return data
-
-
-class ICMPV6NeighborDiscoveryOption(Packet):
-
- TYPE_SOURCE_LINK_LAYER_ADDRESS = 1
- TYPE_TARGET_LINK_LAYER_ADDRESS = 2
-
- def __init__(self, data = None, **initializer):
- super(ICMPV6NeighborDiscoveryOption, self).__init__('8n8n',
- ('type', 'length'),
- data, **initializer)
-
- def encode(self):
- payload = self._encodePayload()
- length = self.length
- if length == None:
- length = (len(payload) + 2 + 7) / 8
- payload += '\x00' * (length * 8 - len(payload) - 2)
- fields = [self.type, length]
- header = self._encodeFields(*tuple(fields))
- return header + payload
-
- @staticmethod
- def decodeOptions(data):
- options = []
- while len(data) > 2:
- type = ord(data[0])
- length = ord(data[1])
- if len(data) < length * 8:
- break
- options.append(ICMPV6NeighborDiscoveryOption(type = type, length = length,
- payload = data[0:length * 8]))
- data = data[length * 8:]
- return options
-
-
-class ICMPV6NeighborSolicitation(Packet):
-
- def __init__(self, data = None, **initializer):
- super(ICMPV6NeighborSolicitation, self).__init__('32s128s',
- ('reserved', 'target'),
- data, **initializer)
- self.target_lladdr = initializer.get('src_lladdr')
-
- def _decodePayload(self, data):
- for option in ICMPV6NeighborDiscoveryOption.decodeOptions(data):
- if option.type == ICMPV6NeighborDiscoveryOption.TYPE_SOURCE_LINK_LAYER_ADDRESS:
- self.src_lladdr = option.payload
- return None
-
- def _encodePayload(self):
- if self.src_lladdr:
- return ICMPV6NeighborDiscoveryOption(
- type = ICMPV6NeighborDiscoveryOption.TYPE_SOURCE_LINK_LAYER_ADDRESS,
- payload = self.src_lladdr).encode()
- return ''
-
-
-class ICMPV6NeighborAdvertisement(Packet):
-
- def __init__(self, data = None, **initializer):
- super(ICMPV6NeighborAdvertisement, self).__init__('1n1n1n29s128s',
- ('router', 'solicited', 'override',
- 'reserved', 'target'),
- data, **initializer)
- self.target_lladdr = initializer.get('target_lladdr')
-
- def _decodePayload(self, data):
- for option in ICMPV6NeighborDiscoveryOptions.decodeOptions(data):
- if option.type == ICMPV6NeighborDiscoveryOption.TYPE_TARGET_LINK_LAYER_ADDRESS:
- self.target_lladdr = option.payload
- return None
-
- def _encodePayload(self):
- if self.target_lladdr:
- return ICMPV6NeighborDiscoveryOption(
- type = ICMPV6NeighborDiscoveryOption.TYPE_TARGET_LINK_LAYER_ADDRESS,
- payload = self.target_lladdr).encode()
- return ''
-
-
-class UDPPacket(Packet):
-
- def __init__(self, data = None, **initializer):
- super(UDPPacket, self).__init__('16n16n16n16n',
- ('src', 'dst', 'len', 'checksum'),
- data, **initializer)
-
- def encode(self):
- payload = self._encodePayload()
- packetlen = self.len or (len(payload) + self.headerLen)
- fields = [self.src, self.dst, packetlen, self.checksum or 0]
- header = self._encodeFields(*tuple(fields))
- return header + payload
diff --git a/mac-tap/tuntap/test/tuntap/packet_codec.py b/mac-tap/tuntap/test/tuntap/packet_codec.py
deleted file mode 100644
index 09095966..00000000
--- a/mac-tap/tuntap/test/tuntap/packet_codec.py
+++ /dev/null
@@ -1,244 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import functools
-import socket
-
-from tuntap.tun_tap_harness import TunHarness, TapHarness
-from tuntap.packet import (
- ARPPacket,
- EthernetFrame,
- ICMPV6Packet,
- ICMPV6NeighborAdvertisement,
- ICMPV6NeighborSolicitation,
- IPv4Packet,
- IPv6Packet,
- TunAFFrame,
- UDPPacket
-)
-from tuntap.packet_reader import PacketReader, SelectPacketSource
-
-class PacketCodec(object):
- """
- Helper for tests that wish to send and receive packets. This provides the interface to send and
- receive packets at the IP/IPv6 level on both the network interface and char dev sides.
- """
-
- def __init__(self, af, listenAddress, newHarness, newPacketSource):
- self._af = af
- self._listenAddress = listenAddress
- self._newHarness = newHarness
- self._newPacketSource = newPacketSource
-
- def __str__(self):
- af_map = { socket.AF_INET: 'IN', socket.AF_INET6: 'IN6' }
- return '<%s<%s, %s>>' % (self.__class__.__name__,
- af_map[self._af],
- self._newPacketSource.__name__)
-
- def _decodePacket(self, packet):
- return packet
-
- def _framePacket(self, payload):
- return payload
-
- def _frameExpectation(self, expectation):
- return expectation
-
- @property
- def af(self):
- return self._af
-
- @property
- def addr(self):
- if self._af == socket.AF_INET:
- return self._harness.addr
- elif self._af == socket.AF_INET6:
- return self._harness.addr6
- assert False
-
- @property
- def UDPPort(self):
- return self._recvSock.getsockname()[1]
-
- def start(self):
- self._harness = self._newHarness()
- self._harness.start()
- self._harness.up()
-
- self._sendSock = socket.socket(self.addr.af, socket.SOCK_DGRAM)
- self._recvSock = socket.socket(self.addr.af, socket.SOCK_DGRAM)
- self._recvSock.bind((self._listenAddress or self.addr.local, 0))
-
- self._reader = PacketReader(source = self._newPacketSource(self._harness.char_dev.fileno()),
- skip = True,
- decode = lambda packet : self._decodePacket(packet))
- self._sockReader = PacketReader(source = SelectPacketSource(self._recvSock.fileno()))
-
- self._reader.start()
- self._sockReader.start()
-
- def stop(self):
- self._sockReader.stop()
- self._reader.stop()
- self._harness.stop()
- self._sendSock.close()
- self._recvSock.close()
-
- def sendUDP(self, payload, addr):
- self._sendSock.sendto(payload, addr)
-
- def expectUDP(self, expectation):
- self._sockReader.expect(expectation)
-
- def runUDP(self):
- return self._sockReader.run()
-
- def sendPacket(self, payload):
- self._harness.char_dev.send(self._framePacket(payload))
-
- def expectPacket(self, expectation):
- self._reader.expect(self._frameExpectation(expectation))
-
- def runPacket(self):
- return self._reader.run()
-
-
-class TunPacketCodec(PacketCodec):
-
- def __init__(self, af, listenAddress, newPacketSource):
- super(TunPacketCodec, self).__init__(af, listenAddress, TunHarness, newPacketSource)
-
- def _decodePacket(self, packet):
- # Look at the first byte to figure out whether it's IPv4 or IPv6.
- version = (ord(packet[0]) & 0xf0) >> 4
- if version == 4:
- return IPv4Packet(packet)
- elif version == 6:
- return IPv6Packet(packet)
- else:
- return packet
-
-
-class TunAFPacketCodec(PacketCodec):
-
- def __init__(self, af, listenAddress, newPacketSource):
- super(TunAFPacketCodec, self).__init__(af, listenAddress, TunHarness, newPacketSource)
-
- def _decodePacket(self, packet):
- return TunAFFrame(packet)
-
- def _framePacket(self, payload):
- return TunAFFrame(af = self.addr.af, payload = payload).encode()
-
- def _frameExpectation(self, expectation):
- return { 'af': self.addr.af,
- 'payload': expectation }
-
- def start(self):
- super(TunAFPacketCodec, self).start()
- self._harness.char_dev.prependAF = 1
-
-
-class TapPacketCodec(PacketCodec):
-
- TYPE_MAP = { socket.AF_INET: EthernetFrame.TYPE_IPV4,
- socket.AF_INET6: EthernetFrame.TYPE_IPV6 }
-
- ETHER_ADDR_ANY = '\xff\xff\xff\xff\xff\xff'
- ETHER_ADDR_REMOTE = '\x11\x22\x33\x44\x55\x66'
-
- def __init__(self, af, listenAddress, newPacketSource):
- super(TapPacketCodec, self).__init__(af, listenAddress, TapHarness, newPacketSource)
-
- def _decodePacket(self, packet):
- return EthernetFrame(packet)
-
- def _framePacket(self, payload):
- return EthernetFrame(src = TapPacketCodec.ETHER_ADDR_REMOTE,
- dst = self._harness.interface.lladdr.addr,
- type = TapPacketCodec.TYPE_MAP[self.addr.af],
- payload = payload).encode()
-
- def _frameExpectation(self, expectation):
- return { 'type': TapPacketCodec.TYPE_MAP[self.addr.af],
- 'src': self._harness.interface.lladdr.addr,
- 'payload': expectation }
-
- def _sendArpReply(self, packet):
- reply = EthernetFrame(dst = packet.src,
- src = TapPacketCodec.ETHER_ADDR_ANY,
- type = EthernetFrame.TYPE_ARP,
- payload = ARPPacket(htype = ARPPacket.HTYPE_ETHERNET,
- ptype = ARPPacket.PTYPE_IPV4,
- hlen = ARPPacket.HLEN_ETHERNET,
- plen = ARPPacket.PLEN_IPV4,
- oper = ARPPacket.OPER_REPLY,
- sha = TapPacketCodec.ETHER_ADDR_REMOTE,
- spa = packet.payload.tpa,
- tha = packet.payload.sha,
- tpa = packet.payload.spa))
- self._harness.char_dev.send(reply.encode())
-
- def _sendNeighborAdvertisement(self, packet):
- reply = EthernetFrame(
- dst = packet.payload.payload.payload.src_lladdr,
- src = TapPacketCodec.ETHER_ADDR_ANY,
- type = EthernetFrame.TYPE_IPV6,
- payload = IPv6Packet(
- src = socket.inet_pton(self.addr.af, self.addr.remote),
- dst = packet.payload.src,
- proto = IPv6Packet.PROTO_ICMPV6,
- payload = ICMPV6Packet(
- type = ICMPV6Packet.TYPE_NEIGHBOR_ADVERTISMENT,
- payload = ICMPV6NeighborAdvertisement(
- solicited = 1,
- override = 1,
- target = socket.inet_pton(self.addr.af, self.addr.remote),
- target_lladdr = TapPacketCodec.ETHER_ADDR_REMOTE))))
- self._harness.char_dev.send(reply.encode())
-
- def start(self):
- super(TapPacketCodec, self).start()
- # Answer ARP resolution requests for the destination address.
- self._reader.expect(
- expectation = { 'type': EthernetFrame.TYPE_ARP,
- 'payload': { 'htype': ARPPacket.HTYPE_ETHERNET,
- 'ptype': ARPPacket.PTYPE_IPV4,
- 'hlen': ARPPacket.HLEN_ETHERNET,
- 'plen': ARPPacket.PLEN_IPV4,
- 'oper': ARPPacket.OPER_REQUEST,
- 'tpa': socket.inet_pton(self.addr.af, self.addr.remote) }},
- times = None,
- action = functools.partial(TapPacketCodec._sendArpReply, self))
- # Answer Neighbor Solicitation requests for IPv6.
- self._reader.expect(
- expectation = {
- 'type': EthernetFrame.TYPE_IPV6,
- 'payload': {
- 'proto': IPv6Packet.PROTO_ICMPV6,
- 'payload': {
- 'type': ICMPV6Packet.TYPE_NEIGHBOR_SOLICITATION,
- 'payload': {
- 'target': socket.inet_pton(self.addr.af, self.addr.remote) }}}},
- times = None,
- action = functools.partial(TapPacketCodec._sendNeighborAdvertisement, self))
diff --git a/mac-tap/tuntap/test/tuntap/packet_reader.py b/mac-tap/tuntap/test/tuntap/packet_reader.py
deleted file mode 100644
index b92d3ac8..00000000
--- a/mac-tap/tuntap/test/tuntap/packet_reader.py
+++ /dev/null
@@ -1,270 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import errno
-import os
-import Queue
-import select
-import signal
-import socket
-import pickle
-import threading
-
-MAX_PACKET_SIZE = 4096
-
-def handleEAgain(fn, *args, **kwargs):
- """
- Wraps a function call in loop, restarting on EAGAIN.
- """
- while True:
- try:
- return fn(*args, **kwargs)
- except EnvironmentError as e:
- if e.errno != errno.EAGAIN:
- raise
- except:
- raise
-
-
-class BlockingPacketSource(object):
- """
- In order to be able to test blocking reads and not hang forever if the expected data never
- arrives, we do the blocking read call in a forked subprocess that forwards the data read from
- the fd over a domain socket.
- """
-
- def __init__(self, fd):
- (self._rsock, wsock) = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
- child = os.fork()
- if child != 0:
- wsock.close()
- self._child = child
- return
-
- self._rsock.close()
-
- # This is the read loop in the forked process and it won't quit until either the process
- # gets killed or there is a read error.
- try:
- while True:
- packet = handleEAgain(os.read, fd, MAX_PACKET_SIZE)
- handleEAgain(wsock.send, pickle.dumps((0, packet)))
- if len(packet) == 0:
- break
- except KeyboardInterrupt:
- pass
- except EnvironmentError as e:
- handleEAgain(wsock.send, pickle.dumps((e.errno, '')))
- finally:
- os.close(fd)
- wsock.close()
- os._exit(os.EX_OK)
-
- def read(self, killpipe):
- (r, w, x) = select.select([self._rsock, killpipe], [], [])
- if killpipe in r:
- return None
- if self._rsock in r:
- try:
- return handleEAgain(self._rsock.recv, MAX_PACKET_SIZE)
- except EnvironmentError as e:
- # If there's a read error on the subprocess, it'll close the socket.
- if e.errno != errno.ECONNRESET:
- raise e
- return None
-
- def stop(self):
- os.kill(self._child, signal.SIGINT)
- os.waitpid(self._child, 0)
- self._rsock.close()
-
-
-class SelectPacketSource(object):
- """
- Reads data from a file descriptor, waiting for input using select().
- """
-
- def __init__(self, fd):
- self._fd = fd
-
- def read(self, killpipe):
- (r, w, x) = select.select([self._fd, killpipe], [], [])
- if killpipe in r:
- return None
- if self._fd in r:
- packet = handleEAgain(os.read, self._fd, MAX_PACKET_SIZE)
- return pickle.dumps((0, packet))
- return None
-
- def stop(self):
- pass
-
-class Expectation(object):
- """
- Describes an expectation. Expectations are specified as dictionaries to match the packet
- against. Entries may specify nested dictionaries for recursive matching and callables can be
- used as predicates. Any other entry will be compared to the corresponding value in the packet.
- """
-
- def __init__(self, expectation, times, action):
- self._expectation = expectation
- self._times = times
- self._action = action
-
- @property
- def active(self):
- return self._times == None or self.pending
-
- @property
- def pending(self):
- return self._times != None and self._times > 0
-
- def check(self, packet):
- #print 'Matching %s against %s' % (packet, self._expectation)
- if self.active and Expectation._matches(packet, self._expectation):
- if self._times:
- self._times -= 1
- if callable(self._action):
- self._action(packet)
- return True
- return False
-
- @staticmethod
- def _matches(packet, expectation):
- if isinstance(expectation, dict):
- for (name, entry) in expectation.iteritems():
- try:
- val = getattr(packet, name)
- except AttributeError:
- return False
- if not Expectation._matches(val, entry):
- return False
- return True
- elif callable(expectation):
- return expectation(packet)
- else:
- return packet == expectation
-
-
-class PacketReader(object):
- """
- Takes care of reading packets and matching them against expectations.
- """
-
- def __init__(self, source, decode = str, skip = False):
- """
- Initializes a new reader.
-
- Args:
- source: packet source to read packets from.
- decode: packet decoding function.
- skip: whether non-matching packets are to be skipped.
- """
- self._source = source
- self._decode = decode
- self._skip = skip
- self._expectations = []
- self._packets = Queue.Queue()
- self._shutdownPipe = os.pipe()
- self._stop = threading.Event()
-
- def start(self):
- self._readThread = threading.Thread(target = self)
- self._readThread.start()
-
- def stop(self):
- self._stop.set()
- handleEAgain(os.write, self._shutdownPipe[1], 'stop')
- self._readThread.join()
- self._source.stop()
- os.close(self._shutdownPipe[0])
- os.close(self._shutdownPipe[1])
-
- def __call__(self):
- """
- Reading service function, runs in a separate thread.
- """
- try:
- while True:
- packet = handleEAgain(self._source.read, self._shutdownPipe[0])
- if not packet:
- self._packets.put((0, ''))
- break
- self._packets.put(pickle.loads(packet))
- except EnvironmentError as e:
- # The read() is racing against stop(), ignore these situations.
- if e.errno == EIO and self._stop.isSet():
- self._packets.put((0, ''))
- self._packets.put((e.errno, ''))
-
- def expect(self, expectation, times = 1, action = None):
- """
- Adds an expectation for a packet to be received.
-
- Args:
- expectation: Dictionary describing the expected packet.
- times: Number of packets expected. None for unlimited.
- action: A callback to run after the packet has been received.
- """
- assert times != 0
- self._expectations.append(Expectation(expectation, times, action))
-
- @property
- def expectationsPending(self):
- for e in self._expectations:
- if e.pending:
- return True
- return False
-
- def run(self, timeout = 1):
- """
- Runs the packet reader, waiting for all limited expectations to be met.
-
- Args:
- timeout: Wait timeout in seconds.
- """
- while self.expectationsPending:
- try:
- (code, payload) = self._packets.get(True, timeout)
- except Queue.Empty:
- # No packet received.
- break
-
- if code != 0:
- # read error, re-raise.
- raise OSError((code, os.strerror(code)))
-
- if len(payload) == 0:
- # EOF on read.
- break
-
- # decode the packet and match it against expectation.
- matches = False
- for e in self._expectations:
- if e.check(self._decode(payload)):
- matches = True
- break
- if not matches and not self._skip:
- return False
-
- return not self.expectationsPending
-
diff --git a/mac-tap/tuntap/test/tuntap/route.py b/mac-tap/tuntap/test/tuntap/route.py
deleted file mode 100644
index b59707ec..00000000
--- a/mac-tap/tuntap/test/tuntap/route.py
+++ /dev/null
@@ -1,112 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import socket
-import struct
-
-# from net/route.h
-RTM_ADD = 0x1 # Add Route
-RTM_DELETE = 0x2 # Delete Route
-RTM_CHANGE = 0x3 # Change Metrics or flags
-RTM_GET = 0x4 # Report Metrics
-RTM_LOSING = 0x5 # Kernel Suspects Partitioning
-RTM_REDIRECT = 0x6 # Told to use different route
-RTM_MISS = 0x7 # Lookup failed on this address
-RTM_LOCK = 0x8 # fix specified metrics
-RTM_OLDADD = 0x9 # caused by SIOCADDRT
-RTM_OLDDEL = 0xa # caused by SIOCDELRT
-RTM_RESOLVE = 0xb # req to resolve dst to LL addr
-RTM_NEWADDR = 0xc # address being added to iface
-RTM_DELADDR = 0xd # address being removed from iface
-RTM_IFINFO = 0xe # iface going up/down etc.
-RTM_NEWMADDR = 0xf # mcast group membership being added to if
-RTM_DELMADDR = 0x10 # mcast group membership being deleted
-
-RTF_UP = 0x1 # route usable
-RTF_GATEWAY = 0x2 # destination is a gateway
-RTF_HOST = 0x4 # host entry (net otherwise)
-RTF_REJECT = 0x8 # host or net unreachable
-RTF_DYNAMIC = 0x10 # created dynamically (by redirect)
-RTF_MODIFIED = 0x20 # modified dynamically (by redirect)
-RTF_DONE = 0x40 # message confirmed
-RTF_DELCLONE = 0x80 # delete cloned route
-RTF_CLONING = 0x100 # generate new routes on use
-RTF_XRESOLVE = 0x200 # external daemon resolves name
-RTF_LLINFO = 0x400 # generated by link layer (e.g. ARP)
-RTF_STATIC = 0x800 # manually added
-RTF_BLACKHOLE = 0x1000 # just discard pkts (during updates)
-RTF_PROTO2 = 0x4000 # protocol specific routing flag
-RTF_PROTO1 = 0x8000 # protocol specific routing flag
-
-RTF_PRCLONING = 0x10000 # protocol requires cloning
-RTF_WASCLONED = 0x20000 # route generated through cloning
-RTF_PROTO3 = 0x40000 # protocol specific routing flag
-RTF_LOCAL = 0x200000 # route represents a local address
-RTF_BROADCAST = 0x400000 # route represents a bcast address
-RTF_MULTICAST = 0x800000 # route represents a mcast address
-RTF_IFSCOPE = 0x1000000 # has valid interface scope
-RTF_CONDEMNED = 0x2000000 # defunct; no longer modifiable
-
-RTA_DST = 0x1 # destination sockaddr present
-RTA_GATEWAY = 0x2 # gateway sockaddr present
-RTA_NETMASK = 0x4 # netmask sockaddr present
-RTA_GENMASK = 0x8 # cloning mask sockaddr present
-RTA_IFP = 0x10 # interface name sockaddr present
-RTA_IFA = 0x20 # interface addr sockaddr present
-RTA_AUTHOR = 0x40 # sockaddr for author of redirect
-RTA_BRD = 0x80 # for NEWADDR, broadcast or p-p dest addr
-
-RTM_VERSION = 5
-
-PF_ROUTE = 17
-
-STRUCT_RTMSG = struct.Struct('HBBHiiHiiiI3Ii10I')
-
-def _sendRouteMsg(type, index = 0, flags = 0, addrs = {}):
- def add_addr((addr_flags, payload), (addr, flag)):
- if not addr:
- return (addr_flags, payload)
-
- return (addr_flags | flag, payload + addr.encode())
-
- (addr_flags, payload) = reduce(add_addr,
- [ (addrs['dst'], RTA_DST),
- (addrs['gateway'], RTA_GATEWAY),
- (addrs['netmask'], RTA_NETMASK) ],
- (0, ''))
- msglen = STRUCT_RTMSG.size + len(payload)
- data = STRUCT_RTMSG.pack(msglen, RTM_VERSION, type, index, flags, addr_flags, *((0,) * 19))
-
- sock = socket.socket(PF_ROUTE, socket.SOCK_RAW)
- try:
- sock.send(data + payload)
- finally:
- sock.close()
-
-def addNet(dst = None, gateway = None, netmask = None, interface = None):
- flags = RTF_STATIC | RTF_UP
- if gateway:
- flags |= RTF_GATEWAY
- elif interface:
- gateway = interface
- _sendRouteMsg(type = RTM_ADD, flags = flags,
- addrs = dict(dst = dst, gateway = gateway, netmask = netmask))
diff --git a/mac-tap/tuntap/test/tuntap/sockaddr.py b/mac-tap/tuntap/test/tuntap/sockaddr.py
deleted file mode 100644
index 59edbfc0..00000000
--- a/mac-tap/tuntap/test/tuntap/sockaddr.py
+++ /dev/null
@@ -1,124 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import socket
-import struct
-
-class SockaddrDl(object):
-
- AF_LINK = 18
- STRUCT = struct.Struct('BBH4B')
-
- def __init__(self, name, addr, type, index = 0, af = AF_LINK):
- self.af = af
- self.index = index
- self.type = type
- self.name = name
- self.addr = addr
-
- def __repr__(self):
- return 'SockaddrDl<%d, %d, %d, %s, %s>' % (self.af, self.index, self.type,
- self.name, repr(self.addr))
-
- def __eq__(self, other):
- return (self.af == other.af and self.index == other.index and self.type == other.type and
- self.name == other.name and self.addr == other.addr)
-
- def encode(self):
- # It's important to make this size 12 at least to meet sizeof(struct sockaddr_dl), routing
- # setup chokes if it's not.
- datalen = max(len(self.name) + len(self.addr), 12)
- namelen = datalen - len(self.addr)
- data = SockaddrDl.STRUCT.pack(SockaddrDl.STRUCT.size + datalen,
- self.af, self.index, self.type,
- namelen, len(self.addr), 0)
- return data + self.name + '\x00' * (namelen - len(self.name)) + self.addr
-
- @classmethod
- def decode(self, data):
- fields = SockaddrDl.STRUCT.unpack_from(data)
- pname = SockaddrDl.STRUCT.size
- paddr = pname + fields[4]
- pend = paddr + fields[5]
- return SockaddrDl(af = fields[1], index = fields[2], type = fields[3],
- name = data[pname:paddr], addr = data[paddr:pend])
-
-
-class SockaddrIn(object):
- """
- Python wrapper for struct sockaddr_in.
- """
-
- STRUCT = struct.Struct('BBH4s8x')
-
- def __init__(self, addr, port = 0, af = socket.AF_INET):
- self.addr = addr or '0.0.0.0'
- self.port = port
- self.af = af
-
- def __repr__(self):
- return 'SockaddrIn<%d, %d, %s>' % (self.af, self.port, self.addr)
-
- def __eq__(self, other):
- return self.encode() == other.encode()
-
- def encode(self):
- return SockaddrIn.STRUCT.pack(16, self.af, self.port, socket.inet_aton(self.addr))
-
- @classmethod
- def decode(cls, data):
- t = SockaddrIn.STRUCT.unpack(data)
- return SockaddrIn(addr = socket.inet_ntoa(t[3]), port = t[2], af = t[1])
-
-
-class SockaddrIn6(object):
- """
- Python wrapper for struct sockaddr_in6.
- """
-
- STRUCT = struct.Struct('BBHI16sI')
-
- def __init__(self, addr, port = 0, af = socket.AF_INET6, flowinfo = 0, scopeid = 0):
- self.addr = addr or '::0'
- self.port = port
- self.af = af
- self.flowinfo = flowinfo
- self.scopeid = scopeid
-
- def __repr__(self):
- return 'SockaddrIn6<%d, %d, %s, %d, %d>' % (self.af, self.port, self.addr,
- self.flowinfo, self.scopeid)
-
- def __eq__(self, other):
- return self.encode() == other.encode()
-
- def encode(self):
- return SockaddrIn6.STRUCT.pack(28, self.af, self.port, self.flowinfo,
- socket.inet_pton(socket.AF_INET6, self.addr), self.scopeid)
-
- @classmethod
- def decode(cls, data):
- t = SockaddrIn6.STRUCT.unpack(data)
- return SockaddrIn6(addr = socket.inet_ntop(socket.AF_INET6, t[4]), port = t[2], af = t[1],
- flowinfo = t[3], scopeid = t[5])
-
-
diff --git a/mac-tap/tuntap/test/tuntap/test_char_dev.py b/mac-tap/tuntap/test/tuntap/test_char_dev.py
deleted file mode 100644
index ae34bf7c..00000000
--- a/mac-tap/tuntap/test/tuntap/test_char_dev.py
+++ /dev/null
@@ -1,86 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import errno
-import os
-from tuntap.char_dev_harness import TunCharDevHarness, TapCharDevHarness
-from unittest import TestCase
-
-class TestCharDev(TestCase):
-
- def __init__(self, name, newHarness):
- super(TestCharDev, self).__init__(name)
- self._newHarness = newHarness
-
- def setUp(self):
- self.char_dev = self._newHarness()
- self.char_dev.open()
-
- def tearDown(self):
- self.char_dev.close()
-
- def test_Open(self):
- pass
-
- def test_OpenTwiceBusy(self):
- second = self._newHarness(self.char_dev.unit)
- try:
- second.open()
- second.close()
- self.fail()
- except OSError as e:
- self.assertEqual(errno.EBUSY, e.errno)
-
- def test_ReadFails(self):
- try:
- os.read(self.char_dev.fileno(), 1)
- self.fail()
- except OSError as e:
- self.assertEqual(errno.EIO, e.errno)
-
- def test_WriteFails(self):
- try:
- os.write(self.char_dev.fileno(), '')
- self.fail()
- except OSError as e:
- self.assertEqual(errno.EIO, e.errno)
-
-
-class TestTunCharDev(TestCharDev):
-
- def __init__(self, name):
- super(TestTunCharDev, self).__init__(name, TunCharDevHarness)
-
- def test_AFPrepend(self):
- self.assertFalse(self.char_dev.prependAF)
-
- self.char_dev.prependAF = 1
- self.assertTrue(self.char_dev.prependAF)
-
- self.char_dev.prependAF = 0
- self.assertFalse(self.char_dev.prependAF)
-
-
-class TestTapCharDev(TestCharDev):
-
- def __init__(self, name):
- super(TestTapCharDev, self).__init__(name, TapCharDevHarness)
diff --git a/mac-tap/tuntap/test/tuntap/test_interface.py b/mac-tap/tuntap/test/tuntap/test_interface.py
deleted file mode 100644
index 7cf19b20..00000000
--- a/mac-tap/tuntap/test/tuntap/test_interface.py
+++ /dev/null
@@ -1,120 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import errno
-import socket
-import unittest
-
-from tuntap.char_dev_harness import TunCharDevHarness, TapCharDevHarness
-from tuntap.interface_harness import Address, InterfaceHarness
-from tuntap.sockaddr import SockaddrDl, SockaddrIn, SockaddrIn6
-from tuntap.tun_tap_harness import TunHarness, TapHarness
-
-class TestInterface(unittest.TestCase):
-
- def __init__(self, name, harness):
- super(TestInterface, self).__init__(name)
- self.harness = harness
-
- def setUp(self):
- self.harness.start()
-
- def tearDown(self):
- self.harness.stop()
-
- def test_CloseWhileUp(self):
- self.harness.interface.flags |= InterfaceHarness.IFF_UP
- self.harness.char_dev.close()
- self.harness.start()
-
- def test_UpDown(self):
- self.harness.interface.flags |= InterfaceHarness.IFF_UP
- self.assertEquals(InterfaceHarness.IFF_UP,
- self.harness.interface.flags & InterfaceHarness.IFF_UP)
- self.harness.interface.flags &= ~InterfaceHarness.IFF_UP
- self.assertEquals(0,
- self.harness.interface.flags & InterfaceHarness.IFF_UP)
-
- def test_NetmaskAFFix(self):
- self.harness.interface.addIfAddr(local = self.harness.addr.sa_local,
- dst = self.harness.addr.sa_dst,
- mask = SockaddrIn(af = 0, addr = self.harness.addr.mask))
- for addr in self.harness.interface.getAddrs(socket.AF_INET):
- if addr[1] == self.harness.addr.sa_mask:
- return;
- self.fail()
-
- def test_Address(self):
- self.harness.interface.addIfAddr(local = self.harness.addr.sa_local,
- dst = self.harness.addr.sa_dst,
- mask = self.harness.addr.sa_mask)
- for addr in self.harness.interface.getAddrs(socket.AF_INET):
- if (addr[0] == self.harness.addr.sa_local and
- addr[1] == self.harness.addr.sa_mask and
- addr[2] == self.harness.addr.sa_dst):
- return
- self.fail()
-
- def test_Address6(self):
- def compare(expected, actual):
- return (expected or SockaddrIn6(af = 0, addr = None)) == actual
-
- self.harness.interface.addIfAddr6(local = self.harness.addr6.sa_local,
- dst = self.harness.addr6.sa_dst,
- mask = self.harness.addr6.sa_mask)
- for addr in self.harness.interface.getAddrs(socket.AF_INET6):
- if (compare(addr[0], self.harness.addr6.sa_local) and
- compare(addr[1], self.harness.addr6.sa_mask) and
- compare(addr[2], self.harness.addr6.sa_dst)):
- return
- self.fail()
-
-
-class TestTunInterface(TestInterface):
-
- def __init__(self, name):
- super(TestTunInterface, self).__init__(name, TunHarness())
-
- def test_Flags(self):
- self.assertEquals(InterfaceHarness.IFF_POINTOPOINT |
- InterfaceHarness.IFF_RUNNING |
- InterfaceHarness.IFF_SIMPLEX |
- InterfaceHarness.IFF_MULTICAST,
- self.harness.interface.flags)
-
-
-class TestTapInterface(TestInterface):
-
- def __init__(self, name):
- super(TestTapInterface, self).__init__(name, TapHarness())
-
- def test_Flags(self):
- self.assertEquals(InterfaceHarness.IFF_BROADCAST |
- InterfaceHarness.IFF_RUNNING |
- InterfaceHarness.IFF_SIMPLEX |
- InterfaceHarness.IFF_MULTICAST,
- self.harness.interface.flags)
-
- def test_SetLladdr(self):
- addr = SockaddrDl(name = '', addr = '\x11\x22\x33\x44\x55\x66', type = 0)
- self.harness.interface.lladdr = addr
- self.assertEquals(addr.addr, self.harness.interface.lladdr.addr)
diff --git a/mac-tap/tuntap/test/tuntap/test_ip.py b/mac-tap/tuntap/test/tuntap/test_ip.py
deleted file mode 100644
index b910785c..00000000
--- a/mac-tap/tuntap/test/tuntap/test_ip.py
+++ /dev/null
@@ -1,218 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import functools
-import socket
-import struct
-from unittest import TestCase
-
-from tuntap.packet import IPv4Packet, IPv6Packet, UDPPacket
-from tuntap.packet_codec import TapPacketCodec
-from tuntap.packet_reader import SelectPacketSource
-
-class TestIO(TestCase):
-
- def __init__(self, name, af, listenAddress, codec):
- super(TestIO, self).__init__(name)
- self._codec = codec(af, listenAddress);
-
- def __str__(self):
- return '%s [%s]' % (super(TestIO, self).__str__(), str(self._codec))
-
- def setUp(self):
- super(TestIO, self).setUp()
- self._codec.start()
-
- def tearDown(self):
- self._codec.stop()
- super(TestIO, self).tearDown()
-
-
-class TestIp(TestIO):
-
- def __init__(self, name, codec):
- super(TestIp, self).__init__(name, socket.AF_INET, None, codec)
-
- def test_Send(self):
- payload = 'knock, knock!'
- port = 12345
- self._codec.sendUDP(payload, (self._codec.addr.remote, port))
- self._codec.expectPacket(
- { 'version': 4,
- 'src': socket.inet_pton(self._codec.af, self._codec.addr.local),
- 'dst': socket.inet_pton(self._codec.af, self._codec.addr.remote),
- 'proto': IPv4Packet.PROTO_UDP,
- 'payload': { 'dst': port,
- 'payload': payload } })
- self.assertTrue(self._codec.runPacket())
-
- def test_Recv(self):
- srcport = 23456
- payload = 'who\'s there?'
- packet = IPv4Packet(proto = IPv4Packet.PROTO_UDP,
- src = socket.inet_pton(self._codec.af, self._codec.addr.remote),
- dst = socket.inet_pton(self._codec.af, self._codec.addr.local),
- payload = UDPPacket(src = srcport,
- dst = self._codec.UDPPort,
- payload = payload))
- self._codec.sendPacket(packet.encode())
- self._codec.expectUDP(payload)
- self.assertTrue(self._codec.runUDP())
-
-
-class TestIp6(TestIO):
-
- def __init__(self, name, codec):
- super(TestIp6, self).__init__(name, socket.AF_INET6, None, codec)
-
- def test_Send(self):
- payload = 'knock, knock!'
- port = 12345
- self._codec.sendUDP(payload, (self._codec.addr.remote, port))
- self._codec.expectPacket(
- { 'version': 6,
- 'src': socket.inet_pton(self._codec.af, self._codec.addr.local),
- 'dst': socket.inet_pton(self._codec.af, self._codec.addr.remote),
- 'proto': IPv6Packet.PROTO_UDP,
- 'payload': { 'dst': port,
- 'payload': payload } })
- self.assertTrue(self._codec.runPacket())
-
- def test_Recv(self):
- srcport = 23456
- payload = 'who\'s there?'
- packet = IPv6Packet(proto = IPv6Packet.PROTO_UDP,
- src = socket.inet_pton(self._codec.af, self._codec.addr.remote),
- dst = socket.inet_pton(self._codec.af, self._codec.addr.local),
- payload = UDPPacket(src = srcport,
- dst = self._codec.UDPPort,
- payload = payload))
- self._codec.sendPacket(packet.encode())
- self._codec.expectUDP(payload)
- self.assertTrue(self._codec.runUDP())
-
-
-class TestMulticast(TestIO):
-
- MULTICAST_GROUP = '224.1.2.3'
-
- def __init__(self, name, codec):
- super(TestMulticast, self).__init__(name, socket.AF_INET, TestMulticast.MULTICAST_GROUP,
- codec)
-
- def setUp(self):
- super(TestMulticast, self).setUp()
- mreq = struct.pack('4s4s',
- socket.inet_pton(self._codec.af, TestMulticast.MULTICAST_GROUP),
- socket.inet_pton(self._codec.af, self._codec.addr.local))
- self._codec._recvSock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
- self._codec._sendSock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
- self._codec._sendSock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
- socket.inet_pton(self._codec.af, self._codec.addr.local))
-
- def test_Send(self):
- payload = 'knock, knock!'
- port = 12345
- self._codec.sendUDP(payload, (TestMulticast.MULTICAST_GROUP, port))
- self._codec.expectPacket(
- { 'version': 4,
- 'src': socket.inet_pton(self._codec.af, self._codec.addr.local),
- 'dst': socket.inet_pton(self._codec.af, TestMulticast.MULTICAST_GROUP),
- 'proto': IPv4Packet.PROTO_UDP,
- 'payload': { 'dst': port,
- 'payload': payload } })
- self.assertTrue(self._codec.runPacket())
-
- def test_Recv(self):
- srcport = 23456
- payload = 'who\'s there?'
- packet = IPv4Packet(proto = IPv4Packet.PROTO_UDP,
- src = socket.inet_pton(self._codec.af, self._codec.addr.remote),
- dst = socket.inet_pton(self._codec.af, TestMulticast.MULTICAST_GROUP),
- payload = UDPPacket(src = srcport,
- dst = self._codec.UDPPort,
- payload = payload))
- self._codec.sendPacket(packet.encode())
- self._codec.expectUDP(payload)
- self.assertTrue(self._codec.runUDP())
-
-
-class TestMulticast6(TestIO):
-
- MULTICAST_GROUP = 'ff05::114'
-
- def __init__(self, name, codec):
- super(TestMulticast6, self).__init__(name, socket.AF_INET6, TestMulticast6.MULTICAST_GROUP,
- codec)
-
- def setUp(self):
- super(TestMulticast6, self).setUp()
- mreq = struct.pack('16sI',
- socket.inet_pton(self._codec.af, TestMulticast6.MULTICAST_GROUP),
- self._codec._harness.interface.index)
- self._codec._recvSock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
- self._codec._sendSock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 1)
- self._codec._sendSock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF,
- self._codec._harness.interface.index)
-
- def test_Send(self):
- payload = 'knock, knock!'
- port = 12345
- self._codec.sendUDP(payload, (TestMulticast6.MULTICAST_GROUP, port))
- self._codec.expectPacket(
- { 'version': 6,
- 'dst': socket.inet_pton(self._codec.af, TestMulticast6.MULTICAST_GROUP),
- 'proto': IPv6Packet.PROTO_UDP,
- 'payload': { 'dst': port,
- 'payload': payload } })
- self.assertTrue(self._codec.runPacket())
-
- def test_Recv(self):
- srcport = 23456
- payload = 'who\'s there?'
- packet = IPv6Packet(proto = IPv6Packet.PROTO_UDP,
- src = socket.inet_pton(self._codec.af, self._codec.addr.remote),
- dst = socket.inet_pton(self._codec.af, TestMulticast6.MULTICAST_GROUP),
- payload = UDPPacket(src = srcport,
- dst = self._codec.UDPPort,
- payload = payload))
- self._codec.sendPacket(packet.encode())
- self._codec.expectUDP(payload)
- self.assertTrue(self._codec.runUDP())
-
-
-class TestTapLladdr(TestIp):
-
- def __init__(self, name):
- super(TestTapLladdr, self).__init__(name,
- lambda af, addr: TapPacketCodec(af, addr,
- SelectPacketSource))
-
- def setUp(self):
- super(TestTapLladdr, self).setUp()
-
- # Swap out the link-level address with a different address.
- lladdr = self._codec._harness.interface.lladdr
- mac_addr = list(lladdr.addr)
- mac_addr[5] = chr(ord(mac_addr[5]) ^ 0xff)
- lladdr.addr = ''.join(mac_addr)
- self._codec._harness.interface.lladdr = lladdr
diff --git a/mac-tap/tuntap/test/tuntap/tun_tap_harness.py b/mac-tap/tuntap/test/tuntap/tun_tap_harness.py
deleted file mode 100644
index cb07638e..00000000
--- a/mac-tap/tuntap/test/tuntap/tun_tap_harness.py
+++ /dev/null
@@ -1,96 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import errno
-import socket
-
-from tuntap.char_dev_harness import TunCharDevHarness, TapCharDevHarness
-from tuntap.interface_harness import Address, InterfaceHarness
-import tuntap.route
-
-class TunTapHarness(object):
-
- def __init__(self, name, newCharDevHarness, addr, addr6):
- self._newCharDevHarness = newCharDevHarness
- self.name = name
- self.addr = addr
- self.addr6 = addr6
-
- def start(self):
- self.char_dev = self._newCharDevHarness()
- self.char_dev.open()
- self.interface = InterfaceHarness(self.name, self.char_dev.unit)
-
- def up(self):
- self.interface.addIfAddr(local = self.addr.sa_local,
- dst = self.addr.sa_dst,
- mask = self.addr.sa_mask)
- self.interface.addIfAddr6(local = self.addr6.sa_local,
- dst = self.addr6.sa_dst,
- mask = self.addr6.sa_mask)
-
- # Lion automatically creates routes for IPv6 addresses, earlier versions don't.
- try:
- tuntap.route.addNet(dst = self.addr6.sa_remote,
- netmask = self.addr6.sa_mask,
- interface = self.interface.lladdr)
- except IOError as e:
- if e.errno != errno.EEXIST:
- raise e
-
- self.interface.flags |= InterfaceHarness.IFF_UP
-
- def stop(self):
- self.interface.flags &= ~InterfaceHarness.IFF_UP
- self.char_dev.close()
-
-
-class TunHarness(TunTapHarness):
-
- def __init__(self,
- addr = Address(af = socket.AF_INET,
- local = '10.0.0.1',
- remote = '10.0.0.2',
- dst = '10.0.0.2',
- mask = '255.255.255.255'),
- addr6 = Address(af = socket.AF_INET6,
- local = 'fd00::1',
- remote = 'fd00::2',
- dst = 'fd00::2',
- mask = 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')):
- super(TunHarness, self).__init__('tun', TunCharDevHarness, addr, addr6)
-
-
-class TapHarness(TunTapHarness):
-
- def __init__(self,
- addr = Address(af = socket.AF_INET,
- local = '10.0.0.1',
- remote = '10.0.0.2',
- dst = '10.255.255.255',
- mask = '255.0.0.0'),
- addr6 = Address(af = socket.AF_INET6,
- local = 'fd00::1',
- remote = 'fd00::2',
- dst = None,
- mask = 'ffff:ffff:ffff:ffff::0')):
- super(TapHarness, self).__init__('tap', TapCharDevHarness, addr, addr6)
diff --git a/mac-tap/tuntap/test/tuntap/tun_tap_test_case.py b/mac-tap/tuntap/test/tuntap/tun_tap_test_case.py
deleted file mode 100644
index 28edc467..00000000
--- a/mac-tap/tuntap/test/tuntap/tun_tap_test_case.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-from unittest import TestCase
-
-class TunTapTestCase(TestCase):
-
- def __init__(self, name, harness):
- super(TunTapTestCase, self).__init__(name)
- self.harness = harness
-
- def __str__(self):
- return '%s [%s]' % (super(TunTapTestCase, self).__str__(),
- self.harness.__class__.__name__)
-
- def setUp(self):
- self.harness.start()
- self.harness.up()
-
- def tearDown(self):
- self.harness.stop()
diff --git a/mac-tap/tuntap/test/tuntap/tuntap_tests.py b/mac-tap/tuntap/test/tuntap/tuntap_tests.py
deleted file mode 100644
index fb5a431c..00000000
--- a/mac-tap/tuntap/test/tuntap/tuntap_tests.py
+++ /dev/null
@@ -1,83 +0,0 @@
-# Copyright (c) 2011 Mattias Nissler <mattias.nissler@gmx.de>
-#
-# Redistribution and use in source and binary forms, with or without modification, are permitted
-# provided that the following conditions are met:
-#
-# 1. Redistributions of source code must retain the above copyright notice, this list of
-# conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright notice, this list of
-# conditions and the following disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# 3. The name of the author may not be used to endorse or promote products derived from this
-# software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
-# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
-# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
-# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
-# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import argparse
-import itertools
-import re
-import sys
-import unittest
-
-from tuntap.packet_codec import TunPacketCodec, TunAFPacketCodec, TapPacketCodec
-from tuntap.packet_reader import BlockingPacketSource, SelectPacketSource
-
-from tuntap.test_char_dev import TestTunCharDev, TestTapCharDev
-from tuntap.test_interface import TestTunInterface, TestTapInterface
-from tuntap.test_ip import TestIp, TestIp6, TestMulticast, TestMulticast6, TestTapLladdr
-
-class FilteringTestSuite(unittest.TestSuite):
-
- def __init__(self, filter):
- super(FilteringTestSuite, self).__init__()
- self._matcher = re.compile(filter or '.*')
-
- def __iter__(self):
- return itertools.ifilter(lambda test : self._matcher.search(str(test)),
- super(FilteringTestSuite, self).__iter__())
-
-def loadTestsFromTestCase(testCaseClass, *args, **kwargs):
- testCaseNames = unittest.getTestCaseNames(testCaseClass, 'test_')
- return unittest.TestSuite(map(lambda n : testCaseClass(n, *args, **kwargs), testCaseNames))
-
-def main(argv):
- # Parse the command line.
- parser = argparse.ArgumentParser(description = 'Run tuntap unit tests.')
- parser.add_argument('--tests', type = str, nargs = '?', default = None,
- help = 'tests to run')
- parser.add_argument('--verbosity', type = int, nargs = '?', default = 2,
- help = 'verbosity level')
- options = parser.parse_args(argv[1:])
-
- # Gather tests and run them.
- loader = unittest.TestLoader()
- suite = FilteringTestSuite(options.tests)
- suite.addTests(loadTestsFromTestCase(TestTunCharDev))
- suite.addTests(loadTestsFromTestCase(TestTapCharDev))
- suite.addTests(loadTestsFromTestCase(TestTunInterface))
- suite.addTests(loadTestsFromTestCase(TestTapInterface))
-
- codecs = (TunPacketCodec, TunAFPacketCodec, TapPacketCodec)
- sources = (SelectPacketSource, BlockingPacketSource)
- tests = (TestIp, TestIp6, TestMulticast, TestMulticast6)
- for (test, codec, source) in [ (test, codec, source) for test in tests
- for codec in codecs
- for source in sources ]:
- suite.addTests(loadTestsFromTestCase(test, lambda af, addr: codec(af, addr, source)))
-
- suite.addTests(loadTestsFromTestCase(TestTapLladdr))
-
- runner = unittest.TextTestRunner(stream = sys.stderr,
- descriptions = True,
- verbosity = options.verbosity)
- runner.run(suite)
-
-if __name__ == '__main__':
- main(sys.argv)