diff options
| -rw-r--r-- | python/vyos/utils/network.py | 60 | ||||
| -rwxr-xr-x | src/conf_mode/interfaces_wireguard.py | 2 | ||||
| -rw-r--r-- | src/conf_mode/load-balancing_haproxy.py | 2 | ||||
| -rw-r--r-- | src/tests/test_utils_network.py | 11 | 
4 files changed, 52 insertions, 23 deletions
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py index 2f666f0ee..67d247fba 100644 --- a/python/vyos/utils/network.py +++ b/python/vyos/utils/network.py @@ -256,40 +256,60 @@ def mac2eui64(mac, prefix=None):          except:  # pylint: disable=bare-except              return -def check_port_availability(ipaddress, port, protocol): +def check_port_availability(address: str=None, port: int=0, protocol: str='tcp') -> bool:      """ -    Check if port is available and not used by any service -    Return False if a port is busy or IP address does not exists +    Check if given port is available and not used by any service. +      Should be used carefully for services that can start listening      dynamically, because IP address may be dynamic too + +    Args: +      address: IPv4 or IPv6 address - if None, checks on all interfaces +      port:  TCP/UDP port number. + + +    Returns: +      False if a port is busy or IP address does not exists +      True if a port is free and IP address exists      """ -    from socketserver import TCPServer, UDPServer +    import socket      from ipaddress import ip_address +    # treat None as "any address" +    address = address or '::' +      # verify arguments      try: -        ipaddress = ip_address(ipaddress).compressed -    except: -        raise ValueError(f'The {ipaddress} is not a valid IPv4 or IPv6 address') +        address = ip_address(address).compressed +    except ValueError: +        raise ValueError(f'{address} is not a valid IPv4 or IPv6 address')      if port not in range(1, 65536): -        raise ValueError(f'The port number {port} is not in the 1-65535 range') +        raise ValueError(f'Port {port} is not in range 1-65535')      if protocol not in ['tcp', 'udp']: -        raise ValueError(f'The protocol {protocol} is not supported. Only tcp and udp are allowed') +        raise ValueError(f'{protocol} is not supported - only tcp and udp are allowed') -    # check port availability +    protocol = socket.SOCK_STREAM if protocol == 'tcp' else socket.SOCK_DGRAM      try: -        if protocol == 'tcp': -            server = TCPServer((ipaddress, port), None, bind_and_activate=True) -        if protocol == 'udp': -            server = UDPServer((ipaddress, port), None, bind_and_activate=True) -        server.server_close() -    except Exception as e: -        # errno.h: -        #define EADDRINUSE  98  /* Address already in use */ -        if e.errno == 98: +        addr_info = socket.getaddrinfo(address, port, socket.AF_UNSPEC, protocol) +    except socket.gaierror as e: +        print(f'Invalid address: {address}') +        return False + +    for family, socktype, proto, canonname, sockaddr in addr_info: +        try: +            with socket.socket(family, socktype, proto) as s: +                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +                s.bind(sockaddr) +                # port is free to use +                return True +        except OSError: +            # port is already in use              return False -    return True +    # if we reach this point, no socket was tested and we assume the port is +    # already in use - better safe then sorry +    return False +  def is_listen_port_bind_service(port: int, service: str) -> bool:      """Check if listen port bound to expected program name diff --git a/src/conf_mode/interfaces_wireguard.py b/src/conf_mode/interfaces_wireguard.py index 192937dba..3ca6ecdca 100755 --- a/src/conf_mode/interfaces_wireguard.py +++ b/src/conf_mode/interfaces_wireguard.py @@ -97,7 +97,7 @@ def verify(wireguard):      if 'port' in wireguard and 'port_changed' in wireguard:          listen_port = int(wireguard['port']) -        if check_port_availability('0.0.0.0', listen_port, 'udp') is not True: +        if check_port_availability(None, listen_port, protocol='udp') is not True:              raise ConfigError(f'UDP port {listen_port} is busy or unavailable and '                                 'cannot be used for the interface!') diff --git a/src/conf_mode/load-balancing_haproxy.py b/src/conf_mode/load-balancing_haproxy.py index 5fd1beec9..16c9300c2 100644 --- a/src/conf_mode/load-balancing_haproxy.py +++ b/src/conf_mode/load-balancing_haproxy.py @@ -72,7 +72,7 @@ def verify(lb):              raise ConfigError(f'"{front} service port" must be configured!')          # Check if bind address:port are used by another service -        tmp_address = front_config.get('address', '0.0.0.0') +        tmp_address = front_config.get('address', None)          tmp_port = front_config['port']          if check_port_availability(tmp_address, int(tmp_port), 'tcp') is not True and \                  not is_listen_port_bind_service(int(tmp_port), 'haproxy'): diff --git a/src/tests/test_utils_network.py b/src/tests/test_utils_network.py index d68dec16f..92fde447d 100644 --- a/src/tests/test_utils_network.py +++ b/src/tests/test_utils_network.py @@ -1,4 +1,4 @@ -# Copyright (C) 2020-2024 VyOS maintainers and contributors +# Copyright (C) 2020-2025 VyOS maintainers and contributors  #  # This program is free software; you can redistribute it and/or modify  # it under the terms of the GNU General Public License version 2 or later as @@ -43,3 +43,12 @@ class TestVyOSUtilsNetwork(TestCase):          self.assertFalse(vyos.utils.network.is_loopback_addr('::2'))          self.assertFalse(vyos.utils.network.is_loopback_addr('192.0.2.1')) + +    def test_check_port_availability(self): +        self.assertTrue(vyos.utils.network.check_port_availability('::1', 8080)) +        self.assertTrue(vyos.utils.network.check_port_availability('127.0.0.1', 8080)) +        self.assertTrue(vyos.utils.network.check_port_availability(None, 8080, protocol='udp')) +        # We do not have 192.0.2.1 configured on this system +        self.assertFalse(vyos.utils.network.check_port_availability('192.0.2.1', 443)) +        # We do not have 2001:db8::1 configured on this system +        self.assertFalse(vyos.utils.network.check_port_availability('2001:db8::1', 80, protocol='udp'))  | 
