From 4523e9c897b3fa8d12c1b16c830c01820fee5583 Mon Sep 17 00:00:00 2001
From: zsdc <taras@vyos.io>
Date: Thu, 26 Aug 2021 18:15:36 +0300
Subject: wireguard: T3763: Added check for listening port availability

Each wireguard interface requires a unique port for in and out
connections. This commit adds the new `vyos.util` function -
`check_port_availability`, and uses it to be sure that a port
that is planned to be used for wireguard interface is truly
available and not used by any other services (not only other
wireguard interfaces).
---
 python/vyos/util.py | 39 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 39 insertions(+)

(limited to 'python/vyos')

diff --git a/python/vyos/util.py b/python/vyos/util.py
index 8af46a6ee..fc2834a97 100644
--- a/python/vyos/util.py
+++ b/python/vyos/util.py
@@ -819,3 +819,42 @@ def is_systemd_service_running(service):
     Copied from: https://unix.stackexchange.com/a/435317 """
     tmp = cmd(f'systemctl show --value -p SubState {service}')
     return bool((tmp == 'running'))
+
+def check_port_availability(ipaddress, port, protocol):
+    """
+    Check if port is available and not used by any service
+    Return False if a port is busy or IP address does not exists
+    Should be used carefully for services that can start listening
+    dynamically, because IP address may be dynamic too
+    """
+    from socketserver import TCPServer, UDPServer
+    from ipaddress import ip_address
+
+    # verify arguments
+    try:
+        ipaddress = ip_address(ipaddress).compressed
+    except:
+        print(f'The {ipaddress} is not a valid IPv4 or IPv6 address')
+        return
+    if port not in range(1, 65536):
+        print(f'The port number {port} is not in the 1-65535 range')
+        return
+    if protocol not in ['tcp', 'udp']:
+        print(
+            f'The protocol {protocol} is not supported. Only tcp and udp are allowed'
+        )
+        return
+
+    # check port availability
+    try:
+        if protocol == 'tcp':
+            server = TCPServer((ipaddress, port), None, bind_and_activate=True)
+        if protocol == 'udp':
+            server = UDPServer((ipaddress, port), None, bind_and_activate=True)
+        server.server_close()
+        return True
+    except:
+        print(
+            f'The {protocol} port {port} on the {ipaddress} is busy or unavailable'
+        )
+        return False
-- 
cgit v1.2.3