From cbb6c944fea616547cec43f7f1ed6ea3cc4beb54 Mon Sep 17 00:00:00 2001 From: Christian Breunig Date: Tue, 22 Apr 2025 16:31:09 +0200 Subject: vyos.utils: T7122: fix IPv6 support in check_port_availability() Commit 4523e9c897b3 ("wireguard: T3763: Added check for listening port availability") added a function to check if a port is free to use or already occupied by a different running service. This has been done by trying to bind a socket to said given port. Unfortunately there is no support for IPv6 address-fdamily in both socketserver.TCPServer or socketserver.UDPServer. This must be done manually by deriving TCPServer and setting self.address_family for IPv6. The new implementation gets rid of both TCPServer and UDPServer and replaces it with a simple socket binding to a given IPv4/IPv6 address or any interface/ address if unspecified. In addition build time tests are added for the function to check for proper behavior during build time of vyos-1x. --- python/vyos/utils/network.py | 60 +++++++++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 20 deletions(-) (limited to 'python') diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py index 2f666f0ee..67d247fba 100644 --- a/python/vyos/utils/network.py +++ b/python/vyos/utils/network.py @@ -256,40 +256,60 @@ def mac2eui64(mac, prefix=None): except: # pylint: disable=bare-except return -def check_port_availability(ipaddress, port, protocol): +def check_port_availability(address: str=None, port: int=0, protocol: str='tcp') -> bool: """ - Check if port is available and not used by any service - Return False if a port is busy or IP address does not exists + Check if given port is available and not used by any service. + Should be used carefully for services that can start listening dynamically, because IP address may be dynamic too + + Args: + address: IPv4 or IPv6 address - if None, checks on all interfaces + port: TCP/UDP port number. + + + Returns: + False if a port is busy or IP address does not exists + True if a port is free and IP address exists """ - from socketserver import TCPServer, UDPServer + import socket from ipaddress import ip_address + # treat None as "any address" + address = address or '::' + # verify arguments try: - ipaddress = ip_address(ipaddress).compressed - except: - raise ValueError(f'The {ipaddress} is not a valid IPv4 or IPv6 address') + address = ip_address(address).compressed + except ValueError: + raise ValueError(f'{address} is not a valid IPv4 or IPv6 address') if port not in range(1, 65536): - raise ValueError(f'The port number {port} is not in the 1-65535 range') + raise ValueError(f'Port {port} is not in range 1-65535') if protocol not in ['tcp', 'udp']: - raise ValueError(f'The protocol {protocol} is not supported. Only tcp and udp are allowed') + raise ValueError(f'{protocol} is not supported - only tcp and udp are allowed') - # check port availability + protocol = socket.SOCK_STREAM if protocol == 'tcp' else socket.SOCK_DGRAM try: - if protocol == 'tcp': - server = TCPServer((ipaddress, port), None, bind_and_activate=True) - if protocol == 'udp': - server = UDPServer((ipaddress, port), None, bind_and_activate=True) - server.server_close() - except Exception as e: - # errno.h: - #define EADDRINUSE 98 /* Address already in use */ - if e.errno == 98: + addr_info = socket.getaddrinfo(address, port, socket.AF_UNSPEC, protocol) + except socket.gaierror as e: + print(f'Invalid address: {address}') + return False + + for family, socktype, proto, canonname, sockaddr in addr_info: + try: + with socket.socket(family, socktype, proto) as s: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(sockaddr) + # port is free to use + return True + except OSError: + # port is already in use return False - return True + # if we reach this point, no socket was tested and we assume the port is + # already in use - better safe then sorry + return False + def is_listen_port_bind_service(port: int, service: str) -> bool: """Check if listen port bound to expected program name -- cgit v1.2.3