summaryrefslogtreecommitdiff
path: root/python/vyos
diff options
context:
space:
mode:
authorChristian Breunig <christian@breunig.cc>2023-07-15 20:12:56 +0200
committerChristian Breunig <christian@breunig.cc>2023-07-15 20:13:12 +0200
commit5f77ccf91eb402c548fc91b2e080a4b2b86f4181 (patch)
tree9b926dedc07ef547ad8bbe539f89990249552414 /python/vyos
parent9285b9a571ee944daf6f17847a62f115146834a4 (diff)
downloadvyos-1x-5f77ccf91eb402c548fc91b2e080a4b2b86f4181.tar.gz
vyos-1x-5f77ccf91eb402c548fc91b2e080a4b2b86f4181.zip
T5195: vyos.util -> vyos.utils package refactoring part #2
Diffstat (limited to 'python/vyos')
-rw-r--r--python/vyos/configdep.py2
-rw-r--r--python/vyos/configdict.py2
-rw-r--r--python/vyos/ifconfig/vrrp.py19
-rw-r--r--python/vyos/remote.py5
-rw-r--r--python/vyos/template.py6
-rw-r--r--python/vyos/util.py347
-rw-r--r--python/vyos/utils/__init__.py3
-rw-r--r--python/vyos/utils/convert.py30
-rw-r--r--python/vyos/utils/kernel.py25
-rw-r--r--python/vyos/utils/list.py20
-rw-r--r--python/vyos/utils/misc.py66
-rw-r--r--python/vyos/utils/network.py24
-rw-r--r--python/vyos/utils/permission.py15
-rw-r--r--python/vyos/utils/system.py33
-rw-r--r--python/vyos/validate.py1
15 files changed, 231 insertions, 367 deletions
diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py
index d4b2cc78f..7a8559839 100644
--- a/python/vyos/configdep.py
+++ b/python/vyos/configdep.py
@@ -18,7 +18,7 @@ import json
import typing
from inspect import stack
-from vyos.util import load_as_module
+from vyos.utils.system import load_as_module
from vyos.defaults import directories
from vyos.configsource import VyOSError
from vyos import ConfigError
diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py
index 2411a04c4..f642d38f2 100644
--- a/python/vyos/configdict.py
+++ b/python/vyos/configdict.py
@@ -590,7 +590,7 @@ def get_accel_dict(config, base, chap_secrets):
Return a dictionary with the necessary interface config keys.
"""
- from vyos.util import get_half_cpus
+ from vyos.utils.system import get_half_cpus
from vyos.template import is_ipv4
dict = config.get_config_dict(base, key_mangling=('-', '_'),
diff --git a/python/vyos/ifconfig/vrrp.py b/python/vyos/ifconfig/vrrp.py
index 47aaadecd..fde903a53 100644
--- a/python/vyos/ifconfig/vrrp.py
+++ b/python/vyos/ifconfig/vrrp.py
@@ -1,4 +1,4 @@
-# Copyright 2019 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -21,8 +21,11 @@ from time import time
from time import sleep
from tabulate import tabulate
-from vyos import util
from vyos.configquery import ConfigTreeQuery
+from vyos.utils.convert import seconds_to_human
+from vyos.utils.file import read_file
+from vyos.utils.file import wait_for_file_write_complete
+from vyos.utils.process import process_running
class VRRPError(Exception):
pass
@@ -84,21 +87,21 @@ class VRRP(object):
def is_running(cls):
if not os.path.exists(cls.location['pid']):
return False
- return util.process_running(cls.location['pid'])
+ return process_running(cls.location['pid'])
@classmethod
def collect(cls, what):
fname = cls.location[what]
try:
# send signal to generate the configuration file
- pid = util.read_file(cls.location['pid'])
- util.wait_for_file_write_complete(fname,
+ pid = read_file(cls.location['pid'])
+ wait_for_file_write_complete(fname,
pre_hook=(lambda: os.kill(int(pid), cls._signal[what])),
timeout=30)
- return util.read_file(fname)
+ return read_file(fname)
except OSError:
- # raised by vyos.util.read_file
+ # raised by vyos.utils.file.read_file
raise VRRPNoData("VRRP data is not available (wait time exceeded)")
except FileNotFoundError:
raise VRRPNoData("VRRP data is not available (process not running or no active groups)")
@@ -145,7 +148,7 @@ class VRRP(object):
priority = data['effective_priority']
since = int(time() - float(data['last_transition']))
- last = util.seconds_to_human(since)
+ last = seconds_to_human(since)
groups.append([name, intf, vrid, state, priority, last])
diff --git a/python/vyos/remote.py b/python/vyos/remote.py
index 15939a523..ef8c23da4 100644
--- a/python/vyos/remote.py
+++ b/python/vyos/remote.py
@@ -33,14 +33,13 @@ from requests.adapters import HTTPAdapter
from requests.packages.urllib3 import PoolManager
from vyos.utils.io import ask_yes_no
-from vyos.util import begin
-from vyos.utils.process import cmd
from vyos.utils.io import make_incremental_progressbar
from vyos.utils.io import make_progressbar
from vyos.utils.io import print_error
+from vyos.utils.misc import begin
+from vyos.utils.process import cm
from vyos.version import get_version
-
CHUNK_SIZE = 8192
class InteractivePolicy(MissingHostKeyPolicy):
diff --git a/python/vyos/template.py b/python/vyos/template.py
index 3cf60cea9..7d1c3970f 100644
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -162,19 +162,19 @@ def force_to_list(value):
@register_filter('seconds_to_human')
def seconds_to_human(seconds, separator=""):
""" Convert seconds to human-readable values like 1d6h15m23s """
- from vyos.util import seconds_to_human
+ from vyos.utils.convert import seconds_to_human
return seconds_to_human(seconds, separator=separator)
@register_filter('bytes_to_human')
def bytes_to_human(bytes, initial_exponent=0, precision=2):
""" Convert bytes to human-readable values like 1.44M """
- from vyos.util import bytes_to_human
+ from vyos.utils.convert import bytes_to_human
return bytes_to_human(bytes, initial_exponent=initial_exponent, precision=precision)
@register_filter('human_to_bytes')
def human_to_bytes(value):
""" Convert a data amount with a unit suffix to bytes, like 2K to 2048 """
- from vyos.util import human_to_bytes
+ from vyos.utils.convert import human_to_bytes
return human_to_bytes(value)
@register_filter('ip_from_cidr')
diff --git a/python/vyos/util.py b/python/vyos/util.py
index 0d7985d54..63539e897 100644
--- a/python/vyos/util.py
+++ b/python/vyos/util.py
@@ -13,16 +13,6 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-import os
-import re
-import sys
-
-#
-# NOTE: Do not import full classes here, move your import to the function
-# where it is used so it is as local as possible to the execution
-#
-
-
def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False, mod=0):
""" Mangles dict keys according to a regex and replacement character.
Some libraries like Jinja2 do not like certain characters in dict keys.
@@ -34,6 +24,7 @@ def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_m
Returns: dict
"""
+ import re
from vyos.xml import is_tag
new_dict = {}
@@ -68,339 +59,3 @@ def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_m
def mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False):
return _mangle_dict_keys(data, regex, replacement, abs_path=abs_path, no_tag_node_value_mangle=no_tag_node_value_mangle, mod=0)
-
-def is_list_equal(first: list, second: list) -> bool:
- """ Check if 2 lists are equal and list not empty """
- if len(first) != len(second) or len(first) == 0:
- return False
- return sorted(first) == sorted(second)
-
-def is_listen_port_bind_service(port: int, service: str) -> bool:
- """Check if listen port bound to expected program name
- :param port: Bind port
- :param service: Program name
- :return: bool
-
- Example:
- % is_listen_port_bind_service(443, 'nginx')
- True
- % is_listen_port_bind_service(443, 'ocserv-main')
- False
- """
- from psutil import net_connections as connections
- from psutil import Process as process
- for connection in connections():
- addr = connection.laddr
- pid = connection.pid
- pid_name = process(pid).name()
- pid_port = addr.port
- if service == pid_name and port == pid_port:
- return True
- return False
-
-def seconds_to_human(s, separator=""):
- """ Converts number of seconds passed to a human-readable
- interval such as 1w4d18h35m59s
- """
- s = int(s)
-
- week = 60 * 60 * 24 * 7
- day = 60 * 60 * 24
- hour = 60 * 60
-
- remainder = 0
- result = ""
-
- weeks = s // week
- if weeks > 0:
- result = "{0}w".format(weeks)
- s = s % week
-
- days = s // day
- if days > 0:
- result = "{0}{1}{2}d".format(result, separator, days)
- s = s % day
-
- hours = s // hour
- if hours > 0:
- result = "{0}{1}{2}h".format(result, separator, hours)
- s = s % hour
-
- minutes = s // 60
- if minutes > 0:
- result = "{0}{1}{2}m".format(result, separator, minutes)
- s = s % 60
-
- seconds = s
- if seconds > 0:
- result = "{0}{1}{2}s".format(result, separator, seconds)
-
- return result
-
-def bytes_to_human(bytes, initial_exponent=0, precision=2):
- """ Converts a value in bytes to a human-readable size string like 640 KB
-
- The initial_exponent parameter is the exponent of 2,
- e.g. 10 (1024) for kilobytes, 20 (1024 * 1024) for megabytes.
- """
-
- if bytes == 0:
- return "0 B"
-
- from math import log2
-
- bytes = bytes * (2**initial_exponent)
-
- # log2 is a float, while range checking requires an int
- exponent = int(log2(bytes))
-
- if exponent < 10:
- value = bytes
- suffix = "B"
- elif exponent in range(10, 20):
- value = bytes / 1024
- suffix = "KB"
- elif exponent in range(20, 30):
- value = bytes / 1024**2
- suffix = "MB"
- elif exponent in range(30, 40):
- value = bytes / 1024**3
- suffix = "GB"
- else:
- value = bytes / 1024**4
- suffix = "TB"
- # Add a new case when the first machine with petabyte RAM
- # hits the market.
-
- size_string = "{0:.{1}f} {2}".format(value, precision, suffix)
- return size_string
-
-def human_to_bytes(value):
- """ Converts a data amount with a unit suffix to bytes, like 2K to 2048 """
-
- from re import match as re_match
-
- res = re_match(r'^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*$', value)
-
- if not res:
- raise ValueError(f"'{value}' is not a valid data amount")
- else:
- amount = float(res.group(1))
- unit = res.group(2).lower()
-
- if unit == 'b':
- res = amount
- elif (unit == 'k') or (unit == 'kb'):
- res = amount * 1024
- elif (unit == 'm') or (unit == 'mb'):
- res = amount * 1024**2
- elif (unit == 'g') or (unit == 'gb'):
- res = amount * 1024**3
- elif (unit == 't') or (unit == 'tb'):
- res = amount * 1024**4
- else:
- raise ValueError(f"Unsupported data unit '{unit}'")
-
- # There cannot be fractional bytes, so we convert them to integer.
- # However, truncating causes problems with conversion back to human unit,
- # so we round instead -- that seems to work well enough.
- return round(res)
-
-def get_cfg_group_id():
- from grp import getgrnam
- from vyos.defaults import cfg_group
-
- group_data = getgrnam(cfg_group)
- return group_data.gr_gid
-
-def wait_for_inotify(file_path, pre_hook=None, event_type=None, timeout=None, sleep_interval=0.1):
- """ Waits for an inotify event to occur """
- if not os.path.dirname(file_path):
- raise ValueError(
- "File path {} does not have a directory part (required for inotify watching)".format(file_path))
- if not os.path.basename(file_path):
- raise ValueError(
- "File path {} does not have a file part, do not know what to watch for".format(file_path))
-
- from inotify.adapters import Inotify
- from time import time
- from time import sleep
-
- time_start = time()
-
- i = Inotify()
- i.add_watch(os.path.dirname(file_path))
-
- if pre_hook:
- pre_hook()
-
- for event in i.event_gen(yield_nones=True):
- if (timeout is not None) and ((time() - time_start) > timeout):
- # If the function didn't return until this point,
- # the file failed to have been written to and closed within the timeout
- raise OSError("Waiting for file {} to be written has failed".format(file_path))
-
- # Most such events don't take much time, so it's better to check right away
- # and sleep later.
- if event is not None:
- (_, type_names, path, filename) = event
- if filename == os.path.basename(file_path):
- if event_type in type_names:
- return
- sleep(sleep_interval)
-
-def wait_for_file_write_complete(file_path, pre_hook=None, timeout=None, sleep_interval=0.1):
- """ Waits for a process to close a file after opening it in write mode. """
- wait_for_inotify(file_path,
- event_type='IN_CLOSE_WRITE', pre_hook=pre_hook, timeout=timeout, sleep_interval=sleep_interval)
-
-def is_admin() -> bool:
- """Look if current user is in sudo group"""
- from getpass import getuser
- from grp import getgrnam
- current_user = getuser()
- (_, _, _, admin_group_members) = getgrnam('sudo')
- return current_user in admin_group_members
-
-def mac2eui64(mac, prefix=None):
- """
- Convert a MAC address to a EUI64 address or, with prefix provided, a full
- IPv6 address.
- Thankfully copied from https://gist.github.com/wido/f5e32576bb57b5cc6f934e177a37a0d3
- """
- import re
- from ipaddress import ip_network
- # http://tools.ietf.org/html/rfc4291#section-2.5.1
- eui64 = re.sub(r'[.:-]', '', mac).lower()
- eui64 = eui64[0:6] + 'fffe' + eui64[6:]
- eui64 = hex(int(eui64[0:2], 16) ^ 2)[2:].zfill(2) + eui64[2:]
-
- if prefix is None:
- return ':'.join(re.findall(r'.{4}', eui64))
- else:
- try:
- net = ip_network(prefix, strict=False)
- euil = int('0x{0}'.format(eui64), 16)
- return str(net[euil])
- except: # pylint: disable=bare-except
- return
-
-def get_half_cpus():
- """ return 1/2 of the numbers of available CPUs """
- cpu = os.cpu_count()
- if cpu > 1:
- cpu /= 2
- return int(cpu)
-
-def check_kmod(k_mod):
- """ Common utility function to load required kernel modules on demand """
- from vyos import ConfigError
- from vyos.utils.process import call
- if isinstance(k_mod, str):
- k_mod = k_mod.split()
- for module in k_mod:
- if not os.path.exists(f'/sys/module/{module}'):
- if call(f'modprobe {module}') != 0:
- raise ConfigError(f'Loading Kernel module {module} failed')
-
-def find_device_file(device):
- """ Recurively search /dev for the given device file and return its full path.
- If no device file was found 'None' is returned """
- from fnmatch import fnmatch
-
- for root, dirs, files in os.walk('/dev'):
- for basename in files:
- if fnmatch(basename, device):
- return os.path.join(root, basename)
-
- return None
-
-def convert_data(data):
- """Convert multiple types of data to types usable in CLI
-
- Args:
- data (str | bytes | list | OrderedDict): input data
-
- Returns:
- str | list | dict: converted data
- """
- from base64 import b64encode
- from collections import OrderedDict
-
- if isinstance(data, str):
- return data
- if isinstance(data, bytes):
- try:
- return data.decode()
- except UnicodeDecodeError:
- return b64encode(data).decode()
- if isinstance(data, list):
- list_tmp = []
- for item in data:
- list_tmp.append(convert_data(item))
- return list_tmp
- if isinstance(data, OrderedDict):
- dict_tmp = {}
- for key, value in data.items():
- dict_tmp[key] = convert_data(value)
- return dict_tmp
-
-def begin(*args):
- """
- Evaluate arguments in order and return the result of the *last* argument.
- For combining multiple expressions in one statement. Useful for lambdas.
- """
- return args[-1]
-
-def begin0(*args):
- """
- Evaluate arguments in order and return the result of the *first* argument.
- For combining multiple expressions in one statement. Useful for lambdas.
- """
- return args[0]
-
-def install_into_config(conf, config_paths, override_prompt=True):
- # Allows op-mode scripts to install values if called from an active config session
- # config_paths: dict of config paths
- # override_prompt: if True, user will be prompted before existing nodes are overwritten
- if not config_paths:
- return None
-
- from vyos.config import Config
- from vyos.utils.io import ask_yes_no
- from vyos.utils.process import cmd
- if not Config().in_session():
- print('You are not in configure mode, commands to install manually from configure mode:')
- for path in config_paths:
- print(f'set {path}')
- return None
-
- count = 0
- failed = []
-
- for path in config_paths:
- if override_prompt and conf.exists(path) and not conf.is_multi(path):
- if not ask_yes_no(f'Config node "{node}" already exists. Do you want to overwrite it?'):
- continue
-
- try:
- cmd(f'/opt/vyatta/sbin/my_set {path}')
- count += 1
- except:
- failed.append(path)
-
- if failed:
- print(f'Failed to install {len(failed)} value(s). Commands to manually install:')
- for path in failed:
- print(f'set {path}')
-
- if count > 0:
- print(f'{count} value(s) installed. Use "compare" to see the pending changes, and "commit" to apply.')
-
-def load_as_module(name: str, path: str):
- import importlib.util
-
- spec = importlib.util.spec_from_file_location(name, path)
- mod = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(mod)
- return mod
diff --git a/python/vyos/utils/__init__.py b/python/vyos/utils/__init__.py
index abc9af5da..6cca4e935 100644
--- a/python/vyos/utils/__init__.py
+++ b/python/vyos/utils/__init__.py
@@ -19,6 +19,9 @@ from vyos.utils import convert
from vyos.utils import dict
from vyos.utils import file
from vyos.utils import io
+from vyos.utils import kernel
+from vyos.utils import list
+from vyos.utils import misc
from vyos.utils import network
from vyos.utils import permission
from vyos.utils import process
diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py
index 975c67e0a..ec2333ef0 100644
--- a/python/vyos/utils/convert.py
+++ b/python/vyos/utils/convert.py
@@ -143,3 +143,33 @@ def mac_to_eui64(mac, prefix=None):
return str(net[euil])
except: # pylint: disable=bare-except
return
+
+def convert_data(data):
+ """Convert multiple types of data to types usable in CLI
+
+ Args:
+ data (str | bytes | list | OrderedDict): input data
+
+ Returns:
+ str | list | dict: converted data
+ """
+ from base64 import b64encode
+ from collections import OrderedDict
+
+ if isinstance(data, str):
+ return data
+ if isinstance(data, bytes):
+ try:
+ return data.decode()
+ except UnicodeDecodeError:
+ return b64encode(data).decode()
+ if isinstance(data, list):
+ list_tmp = []
+ for item in data:
+ list_tmp.append(convert_data(item))
+ return list_tmp
+ if isinstance(data, OrderedDict):
+ dict_tmp = {}
+ for key, value in data.items():
+ dict_tmp[key] = convert_data(value)
+ return dict_tmp
diff --git a/python/vyos/utils/kernel.py b/python/vyos/utils/kernel.py
new file mode 100644
index 000000000..d950b8e75
--- /dev/null
+++ b/python/vyos/utils/kernel.py
@@ -0,0 +1,25 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+def check_kmod(k_mod):
+ """ Common utility function to load required kernel modules on demand """
+ from vyos import ConfigError
+ from vyos.utils.process import call
+ if isinstance(k_mod, str):
+ k_mod = k_mod.split()
+ for module in k_mod:
+ if not os.path.exists(f'/sys/module/{module}'):
+ if call(f'modprobe {module}') != 0:
+ raise ConfigError(f'Loading Kernel module {module} failed')
diff --git a/python/vyos/utils/list.py b/python/vyos/utils/list.py
new file mode 100644
index 000000000..63ef720ab
--- /dev/null
+++ b/python/vyos/utils/list.py
@@ -0,0 +1,20 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+def is_list_equal(first: list, second: list) -> bool:
+ """ Check if 2 lists are equal and list not empty """
+ if len(first) != len(second) or len(first) == 0:
+ return False
+ return sorted(first) == sorted(second)
diff --git a/python/vyos/utils/misc.py b/python/vyos/utils/misc.py
new file mode 100644
index 000000000..d82655914
--- /dev/null
+++ b/python/vyos/utils/misc.py
@@ -0,0 +1,66 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+def begin(*args):
+ """
+ Evaluate arguments in order and return the result of the *last* argument.
+ For combining multiple expressions in one statement. Useful for lambdas.
+ """
+ return args[-1]
+
+def begin0(*args):
+ """
+ Evaluate arguments in order and return the result of the *first* argument.
+ For combining multiple expressions in one statement. Useful for lambdas.
+ """
+ return args[0]
+
+def install_into_config(conf, config_paths, override_prompt=True):
+ # Allows op-mode scripts to install values if called from an active config session
+ # config_paths: dict of config paths
+ # override_prompt: if True, user will be prompted before existing nodes are overwritten
+ if not config_paths:
+ return None
+
+ from vyos.config import Config
+ from vyos.utils.io import ask_yes_no
+ from vyos.utils.process import cmd
+ if not Config().in_session():
+ print('You are not in configure mode, commands to install manually from configure mode:')
+ for path in config_paths:
+ print(f'set {path}')
+ return None
+
+ count = 0
+ failed = []
+
+ for path in config_paths:
+ if override_prompt and conf.exists(path) and not conf.is_multi(path):
+ if not ask_yes_no(f'Config node "{node}" already exists. Do you want to overwrite it?'):
+ continue
+
+ try:
+ cmd(f'/opt/vyatta/sbin/my_set {path}')
+ count += 1
+ except:
+ failed.append(path)
+
+ if failed:
+ print(f'Failed to install {len(failed)} value(s). Commands to manually install:')
+ for path in failed:
+ print(f'set {path}')
+
+ if count > 0:
+ print(f'{count} value(s) installed. Use "compare" to see the pending changes, and "commit" to apply.')
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
index 8db598f05..3786caf26 100644
--- a/python/vyos/utils/network.py
+++ b/python/vyos/utils/network.py
@@ -154,7 +154,6 @@ def mac2eui64(mac, prefix=None):
except: # pylint: disable=bare-except
return
-
def check_port_availability(ipaddress, port, protocol):
"""
Check if port is available and not used by any service
@@ -189,3 +188,26 @@ def check_port_availability(ipaddress, port, protocol):
return False
return True
+
+def is_listen_port_bind_service(port: int, service: str) -> bool:
+ """Check if listen port bound to expected program name
+ :param port: Bind port
+ :param service: Program name
+ :return: bool
+
+ Example:
+ % is_listen_port_bind_service(443, 'nginx')
+ True
+ % is_listen_port_bind_service(443, 'ocserv-main')
+ False
+ """
+ from psutil import net_connections as connections
+ from psutil import Process as process
+ for connection in connections():
+ addr = connection.laddr
+ pid = connection.pid
+ pid_name = process(pid).name()
+ pid_port = addr.port
+ if service == pid_name and port == pid_port:
+ return True
+ return False
diff --git a/python/vyos/utils/permission.py b/python/vyos/utils/permission.py
index 8c2d72b83..d938b494f 100644
--- a/python/vyos/utils/permission.py
+++ b/python/vyos/utils/permission.py
@@ -61,3 +61,18 @@ def chmod_755(path):
bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP | \
S_IROTH | S_IXOTH
chmod(path, bitmask)
+
+def is_admin() -> bool:
+ """Look if current user is in sudo group"""
+ from getpass import getuser
+ from grp import getgrnam
+ current_user = getuser()
+ (_, _, _, admin_group_members) = getgrnam('sudo')
+ return current_user in admin_group_members
+
+def get_cfg_group_id():
+ from grp import getgrnam
+ from vyos.defaults import cfg_group
+
+ group_data = getgrnam(cfg_group)
+ return group_data.gr_gid
diff --git a/python/vyos/utils/system.py b/python/vyos/utils/system.py
index 7102d5985..5d41c0c05 100644
--- a/python/vyos/utils/system.py
+++ b/python/vyos/utils/system.py
@@ -13,9 +13,9 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+import os
from subprocess import run
-
def sysctl_read(name: str) -> str:
"""Read and return current value of sysctl() option
@@ -28,7 +28,6 @@ def sysctl_read(name: str) -> str:
tmp = run(['sysctl', '-nb', name], capture_output=True)
return tmp.stdout.decode()
-
def sysctl_write(name: str, value: str | int) -> bool:
"""Change value via sysctl()
@@ -56,13 +55,12 @@ def sysctl_write(name: str, value: str | int) -> bool:
# False in other cases
return False
-
def sysctl_apply(sysctl_dict: dict[str, str], revert: bool = True) -> bool:
"""Apply sysctl values.
Args:
sysctl_dict (dict[str, str]): dictionary with sysctl keys with values
- revert (bool, optional): Revert to original values if new were not
+ revert (bool, optional): Revert to original values if new were not
applied. Defaults to True.
Returns:
@@ -80,3 +78,30 @@ def sysctl_apply(sysctl_dict: dict[str, str], revert: bool = True) -> bool:
return False
# everything applied
return True
+
+def get_half_cpus():
+ """ return 1/2 of the numbers of available CPUs """
+ cpu = os.cpu_count()
+ if cpu > 1:
+ cpu /= 2
+ return int(cpu)
+
+def find_device_file(device):
+ """ Recurively search /dev for the given device file and return its full path.
+ If no device file was found 'None' is returned """
+ from fnmatch import fnmatch
+
+ for root, dirs, files in os.walk('/dev'):
+ for basename in files:
+ if fnmatch(basename, device):
+ return os.path.join(root, basename)
+
+ return None
+
+def load_as_module(name: str, path: str):
+ import importlib.util
+
+ spec = importlib.util.spec_from_file_location(name, path)
+ mod = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(mod)
+ return mod
diff --git a/python/vyos/validate.py b/python/vyos/validate.py
index 7afbe81c9..567f4c972 100644
--- a/python/vyos/validate.py
+++ b/python/vyos/validate.py
@@ -102,6 +102,7 @@ def is_addr_assigned(ip_address, vrf=None) -> bool:
from netifaces import interfaces
from vyos.utils.network import get_interface_config
from vyos.utils.dict import dict_search
+
for interface in interfaces():
# Check if interface belongs to the requested VRF, if this is not the
# case there is no need to proceed with this data set - continue loop