summaryrefslogtreecommitdiff
path: root/python/vyos/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/vyos/util.py')
-rw-r--r--python/vyos/util.py87
1 files changed, 73 insertions, 14 deletions
diff --git a/python/vyos/util.py b/python/vyos/util.py
index 05643a223..849b27d3b 100644
--- a/python/vyos/util.py
+++ b/python/vyos/util.py
@@ -562,12 +562,13 @@ def commit_in_progress():
# Since this will be used in scripts that modify the config outside of the CLI
# framework, those knowingly have root permissions.
# For everything else, we add a safeguard.
- from psutil import process_iter, NoSuchProcess
+ from psutil import process_iter
+ from psutil import NoSuchProcess
+ from getpass import getuser
from vyos.defaults import commit_lock
- idu = cmd('/usr/bin/id -u')
- if idu != '0':
- raise OSError("This functions needs root permissions to return correct results")
+ if getuser() != 'root':
+ raise OSError('This functions needs to be run as root to return correct results!')
for proc in process_iter():
try:
@@ -691,21 +692,21 @@ def find_device_file(device):
return None
-def dict_search(path, my_dict):
- """ Traverse Python dictionary (my_dict) delimited by dot (.).
+def dict_search(path, dict_object):
+ """ Traverse Python dictionary (dict_object) delimited by dot (.).
Return value of key if found, None otherwise.
- This is faster implementation then jmespath.search('foo.bar', my_dict)"""
- if not isinstance(my_dict, dict) or not path:
+ This is faster implementation then jmespath.search('foo.bar', dict_object)"""
+ if not isinstance(dict_object, dict) or not path:
return None
parts = path.split('.')
inside = parts[:-1]
if not inside:
- if path not in my_dict:
+ if path not in dict_object:
return None
- return my_dict[path]
- c = my_dict
+ return dict_object[path]
+ c = dict_object
for p in parts[:-1]:
c = c.get(p, {})
return c.get(parts[-1], None)
@@ -723,6 +724,23 @@ def dict_search_args(dict_object, *path):
dict_object = dict_object[item]
return dict_object
+def dict_search_recursive(dict_object, key):
+ """ Traverse a dictionary recurisvely and return the value of the key
+ we are looking for.
+
+ Thankfully copied from https://stackoverflow.com/a/19871956
+ """
+ if isinstance(dict_object, list):
+ for i in dict_object:
+ for x in dict_search_recursive(i, key):
+ yield x
+ elif isinstance(dict_object, dict):
+ if key in dict_object:
+ yield dict_object[key]
+ for j in dict_object.values():
+ for x in dict_search_recursive(j, key):
+ yield x
+
def get_interface_config(interface):
""" Returns the used encapsulation protocol for given interface.
If interface does not exist, None is returned.
@@ -805,8 +823,49 @@ def make_incremental_progressbar(increment: float):
while True:
yield
+def is_systemd_service_active(service):
+ """ Test is a specified systemd service is activated.
+ Returns True if service is active, false otherwise.
+ Copied from: https://unix.stackexchange.com/a/435317 """
+ tmp = cmd(f'systemctl show --value -p ActiveState {service}')
+ return bool((tmp == 'active'))
+
def is_systemd_service_running(service):
""" Test is a specified systemd service is actually running.
- Returns True if service is running, false otherwise. """
- tmp = run(f'systemctl is-active --quiet {service}')
- return bool((tmp == 0))
+ Returns True if service is running, false otherwise.
+ Copied from: https://unix.stackexchange.com/a/435317 """
+ tmp = cmd(f'systemctl show --value -p SubState {service}')
+ return bool((tmp == 'running'))
+
+def check_port_availability(ipaddress, port, protocol):
+ """
+ Check if port is available and not used by any service
+ Return False if a port is busy or IP address does not exists
+ Should be used carefully for services that can start listening
+ dynamically, because IP address may be dynamic too
+ """
+ from socketserver import TCPServer, UDPServer
+ from ipaddress import ip_address
+
+ # verify arguments
+ try:
+ ipaddress = ip_address(ipaddress).compressed
+ except:
+ raise ValueError(f'The {ipaddress} is not a valid IPv4 or IPv6 address')
+ if port not in range(1, 65536):
+ raise ValueError(f'The port number {port} is not in the 1-65535 range')
+ if protocol not in ['tcp', 'udp']:
+ raise ValueError(
+ f'The protocol {protocol} is not supported. Only tcp and udp are allowed'
+ )
+
+ # check port availability
+ try:
+ if protocol == 'tcp':
+ server = TCPServer((ipaddress, port), None, bind_and_activate=True)
+ if protocol == 'udp':
+ server = UDPServer((ipaddress, port), None, bind_and_activate=True)
+ server.server_close()
+ return True
+ except:
+ return False