From 5f77ccf91eb402c548fc91b2e080a4b2b86f4181 Mon Sep 17 00:00:00 2001 From: Christian Breunig Date: Sat, 15 Jul 2023 20:12:56 +0200 Subject: T5195: vyos.util -> vyos.utils package refactoring part #2 --- python/vyos/configdep.py | 2 +- python/vyos/configdict.py | 2 +- python/vyos/ifconfig/vrrp.py | 19 ++- python/vyos/remote.py | 5 +- python/vyos/template.py | 6 +- python/vyos/util.py | 347 +--------------------------------------- python/vyos/utils/__init__.py | 3 + python/vyos/utils/convert.py | 30 ++++ python/vyos/utils/kernel.py | 25 +++ python/vyos/utils/list.py | 20 +++ python/vyos/utils/misc.py | 66 ++++++++ python/vyos/utils/network.py | 24 ++- python/vyos/utils/permission.py | 15 ++ python/vyos/utils/system.py | 33 +++- python/vyos/validate.py | 1 + 15 files changed, 231 insertions(+), 367 deletions(-) create mode 100644 python/vyos/utils/kernel.py create mode 100644 python/vyos/utils/list.py create mode 100644 python/vyos/utils/misc.py (limited to 'python') diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py index d4b2cc78f..7a8559839 100644 --- a/python/vyos/configdep.py +++ b/python/vyos/configdep.py @@ -18,7 +18,7 @@ import json import typing from inspect import stack -from vyos.util import load_as_module +from vyos.utils.system import load_as_module from vyos.defaults import directories from vyos.configsource import VyOSError from vyos import ConfigError diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py index 2411a04c4..f642d38f2 100644 --- a/python/vyos/configdict.py +++ b/python/vyos/configdict.py @@ -590,7 +590,7 @@ def get_accel_dict(config, base, chap_secrets): Return a dictionary with the necessary interface config keys. """ - from vyos.util import get_half_cpus + from vyos.utils.system import get_half_cpus from vyos.template import is_ipv4 dict = config.get_config_dict(base, key_mangling=('-', '_'), diff --git a/python/vyos/ifconfig/vrrp.py b/python/vyos/ifconfig/vrrp.py index 47aaadecd..fde903a53 100644 --- a/python/vyos/ifconfig/vrrp.py +++ b/python/vyos/ifconfig/vrrp.py @@ -1,4 +1,4 @@ -# Copyright 2019 VyOS maintainers and contributors +# Copyright 2019-2023 VyOS maintainers and contributors # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -21,8 +21,11 @@ from time import time from time import sleep from tabulate import tabulate -from vyos import util from vyos.configquery import ConfigTreeQuery +from vyos.utils.convert import seconds_to_human +from vyos.utils.file import read_file +from vyos.utils.file import wait_for_file_write_complete +from vyos.utils.process import process_running class VRRPError(Exception): pass @@ -84,21 +87,21 @@ class VRRP(object): def is_running(cls): if not os.path.exists(cls.location['pid']): return False - return util.process_running(cls.location['pid']) + return process_running(cls.location['pid']) @classmethod def collect(cls, what): fname = cls.location[what] try: # send signal to generate the configuration file - pid = util.read_file(cls.location['pid']) - util.wait_for_file_write_complete(fname, + pid = read_file(cls.location['pid']) + wait_for_file_write_complete(fname, pre_hook=(lambda: os.kill(int(pid), cls._signal[what])), timeout=30) - return util.read_file(fname) + return read_file(fname) except OSError: - # raised by vyos.util.read_file + # raised by vyos.utils.file.read_file raise VRRPNoData("VRRP data is not available (wait time exceeded)") except FileNotFoundError: raise VRRPNoData("VRRP data is not available (process not running or no active groups)") @@ -145,7 +148,7 @@ class VRRP(object): priority = data['effective_priority'] since = int(time() - float(data['last_transition'])) - last = util.seconds_to_human(since) + last = seconds_to_human(since) groups.append([name, intf, vrid, state, priority, last]) diff --git a/python/vyos/remote.py b/python/vyos/remote.py index 15939a523..ef8c23da4 100644 --- a/python/vyos/remote.py +++ b/python/vyos/remote.py @@ -33,14 +33,13 @@ from requests.adapters import HTTPAdapter from requests.packages.urllib3 import PoolManager from vyos.utils.io import ask_yes_no -from vyos.util import begin -from vyos.utils.process import cmd from vyos.utils.io import make_incremental_progressbar from vyos.utils.io import make_progressbar from vyos.utils.io import print_error +from vyos.utils.misc import begin +from vyos.utils.process import cm from vyos.version import get_version - CHUNK_SIZE = 8192 class InteractivePolicy(MissingHostKeyPolicy): diff --git a/python/vyos/template.py b/python/vyos/template.py index 3cf60cea9..7d1c3970f 100644 --- a/python/vyos/template.py +++ b/python/vyos/template.py @@ -162,19 +162,19 @@ def force_to_list(value): @register_filter('seconds_to_human') def seconds_to_human(seconds, separator=""): """ Convert seconds to human-readable values like 1d6h15m23s """ - from vyos.util import seconds_to_human + from vyos.utils.convert import seconds_to_human return seconds_to_human(seconds, separator=separator) @register_filter('bytes_to_human') def bytes_to_human(bytes, initial_exponent=0, precision=2): """ Convert bytes to human-readable values like 1.44M """ - from vyos.util import bytes_to_human + from vyos.utils.convert import bytes_to_human return bytes_to_human(bytes, initial_exponent=initial_exponent, precision=precision) @register_filter('human_to_bytes') def human_to_bytes(value): """ Convert a data amount with a unit suffix to bytes, like 2K to 2048 """ - from vyos.util import human_to_bytes + from vyos.utils.convert import human_to_bytes return human_to_bytes(value) @register_filter('ip_from_cidr') diff --git a/python/vyos/util.py b/python/vyos/util.py index 0d7985d54..63539e897 100644 --- a/python/vyos/util.py +++ b/python/vyos/util.py @@ -13,16 +13,6 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . -import os -import re -import sys - -# -# NOTE: Do not import full classes here, move your import to the function -# where it is used so it is as local as possible to the execution -# - - def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False, mod=0): """ Mangles dict keys according to a regex and replacement character. Some libraries like Jinja2 do not like certain characters in dict keys. @@ -34,6 +24,7 @@ def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_m Returns: dict """ + import re from vyos.xml import is_tag new_dict = {} @@ -68,339 +59,3 @@ def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_m def mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False): return _mangle_dict_keys(data, regex, replacement, abs_path=abs_path, no_tag_node_value_mangle=no_tag_node_value_mangle, mod=0) - -def is_list_equal(first: list, second: list) -> bool: - """ Check if 2 lists are equal and list not empty """ - if len(first) != len(second) or len(first) == 0: - return False - return sorted(first) == sorted(second) - -def is_listen_port_bind_service(port: int, service: str) -> bool: - """Check if listen port bound to expected program name - :param port: Bind port - :param service: Program name - :return: bool - - Example: - % is_listen_port_bind_service(443, 'nginx') - True - % is_listen_port_bind_service(443, 'ocserv-main') - False - """ - from psutil import net_connections as connections - from psutil import Process as process - for connection in connections(): - addr = connection.laddr - pid = connection.pid - pid_name = process(pid).name() - pid_port = addr.port - if service == pid_name and port == pid_port: - return True - return False - -def seconds_to_human(s, separator=""): - """ Converts number of seconds passed to a human-readable - interval such as 1w4d18h35m59s - """ - s = int(s) - - week = 60 * 60 * 24 * 7 - day = 60 * 60 * 24 - hour = 60 * 60 - - remainder = 0 - result = "" - - weeks = s // week - if weeks > 0: - result = "{0}w".format(weeks) - s = s % week - - days = s // day - if days > 0: - result = "{0}{1}{2}d".format(result, separator, days) - s = s % day - - hours = s // hour - if hours > 0: - result = "{0}{1}{2}h".format(result, separator, hours) - s = s % hour - - minutes = s // 60 - if minutes > 0: - result = "{0}{1}{2}m".format(result, separator, minutes) - s = s % 60 - - seconds = s - if seconds > 0: - result = "{0}{1}{2}s".format(result, separator, seconds) - - return result - -def bytes_to_human(bytes, initial_exponent=0, precision=2): - """ Converts a value in bytes to a human-readable size string like 640 KB - - The initial_exponent parameter is the exponent of 2, - e.g. 10 (1024) for kilobytes, 20 (1024 * 1024) for megabytes. - """ - - if bytes == 0: - return "0 B" - - from math import log2 - - bytes = bytes * (2**initial_exponent) - - # log2 is a float, while range checking requires an int - exponent = int(log2(bytes)) - - if exponent < 10: - value = bytes - suffix = "B" - elif exponent in range(10, 20): - value = bytes / 1024 - suffix = "KB" - elif exponent in range(20, 30): - value = bytes / 1024**2 - suffix = "MB" - elif exponent in range(30, 40): - value = bytes / 1024**3 - suffix = "GB" - else: - value = bytes / 1024**4 - suffix = "TB" - # Add a new case when the first machine with petabyte RAM - # hits the market. - - size_string = "{0:.{1}f} {2}".format(value, precision, suffix) - return size_string - -def human_to_bytes(value): - """ Converts a data amount with a unit suffix to bytes, like 2K to 2048 """ - - from re import match as re_match - - res = re_match(r'^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*$', value) - - if not res: - raise ValueError(f"'{value}' is not a valid data amount") - else: - amount = float(res.group(1)) - unit = res.group(2).lower() - - if unit == 'b': - res = amount - elif (unit == 'k') or (unit == 'kb'): - res = amount * 1024 - elif (unit == 'm') or (unit == 'mb'): - res = amount * 1024**2 - elif (unit == 'g') or (unit == 'gb'): - res = amount * 1024**3 - elif (unit == 't') or (unit == 'tb'): - res = amount * 1024**4 - else: - raise ValueError(f"Unsupported data unit '{unit}'") - - # There cannot be fractional bytes, so we convert them to integer. - # However, truncating causes problems with conversion back to human unit, - # so we round instead -- that seems to work well enough. - return round(res) - -def get_cfg_group_id(): - from grp import getgrnam - from vyos.defaults import cfg_group - - group_data = getgrnam(cfg_group) - return group_data.gr_gid - -def wait_for_inotify(file_path, pre_hook=None, event_type=None, timeout=None, sleep_interval=0.1): - """ Waits for an inotify event to occur """ - if not os.path.dirname(file_path): - raise ValueError( - "File path {} does not have a directory part (required for inotify watching)".format(file_path)) - if not os.path.basename(file_path): - raise ValueError( - "File path {} does not have a file part, do not know what to watch for".format(file_path)) - - from inotify.adapters import Inotify - from time import time - from time import sleep - - time_start = time() - - i = Inotify() - i.add_watch(os.path.dirname(file_path)) - - if pre_hook: - pre_hook() - - for event in i.event_gen(yield_nones=True): - if (timeout is not None) and ((time() - time_start) > timeout): - # If the function didn't return until this point, - # the file failed to have been written to and closed within the timeout - raise OSError("Waiting for file {} to be written has failed".format(file_path)) - - # Most such events don't take much time, so it's better to check right away - # and sleep later. - if event is not None: - (_, type_names, path, filename) = event - if filename == os.path.basename(file_path): - if event_type in type_names: - return - sleep(sleep_interval) - -def wait_for_file_write_complete(file_path, pre_hook=None, timeout=None, sleep_interval=0.1): - """ Waits for a process to close a file after opening it in write mode. """ - wait_for_inotify(file_path, - event_type='IN_CLOSE_WRITE', pre_hook=pre_hook, timeout=timeout, sleep_interval=sleep_interval) - -def is_admin() -> bool: - """Look if current user is in sudo group""" - from getpass import getuser - from grp import getgrnam - current_user = getuser() - (_, _, _, admin_group_members) = getgrnam('sudo') - return current_user in admin_group_members - -def mac2eui64(mac, prefix=None): - """ - Convert a MAC address to a EUI64 address or, with prefix provided, a full - IPv6 address. - Thankfully copied from https://gist.github.com/wido/f5e32576bb57b5cc6f934e177a37a0d3 - """ - import re - from ipaddress import ip_network - # http://tools.ietf.org/html/rfc4291#section-2.5.1 - eui64 = re.sub(r'[.:-]', '', mac).lower() - eui64 = eui64[0:6] + 'fffe' + eui64[6:] - eui64 = hex(int(eui64[0:2], 16) ^ 2)[2:].zfill(2) + eui64[2:] - - if prefix is None: - return ':'.join(re.findall(r'.{4}', eui64)) - else: - try: - net = ip_network(prefix, strict=False) - euil = int('0x{0}'.format(eui64), 16) - return str(net[euil]) - except: # pylint: disable=bare-except - return - -def get_half_cpus(): - """ return 1/2 of the numbers of available CPUs """ - cpu = os.cpu_count() - if cpu > 1: - cpu /= 2 - return int(cpu) - -def check_kmod(k_mod): - """ Common utility function to load required kernel modules on demand """ - from vyos import ConfigError - from vyos.utils.process import call - if isinstance(k_mod, str): - k_mod = k_mod.split() - for module in k_mod: - if not os.path.exists(f'/sys/module/{module}'): - if call(f'modprobe {module}') != 0: - raise ConfigError(f'Loading Kernel module {module} failed') - -def find_device_file(device): - """ Recurively search /dev for the given device file and return its full path. - If no device file was found 'None' is returned """ - from fnmatch import fnmatch - - for root, dirs, files in os.walk('/dev'): - for basename in files: - if fnmatch(basename, device): - return os.path.join(root, basename) - - return None - -def convert_data(data): - """Convert multiple types of data to types usable in CLI - - Args: - data (str | bytes | list | OrderedDict): input data - - Returns: - str | list | dict: converted data - """ - from base64 import b64encode - from collections import OrderedDict - - if isinstance(data, str): - return data - if isinstance(data, bytes): - try: - return data.decode() - except UnicodeDecodeError: - return b64encode(data).decode() - if isinstance(data, list): - list_tmp = [] - for item in data: - list_tmp.append(convert_data(item)) - return list_tmp - if isinstance(data, OrderedDict): - dict_tmp = {} - for key, value in data.items(): - dict_tmp[key] = convert_data(value) - return dict_tmp - -def begin(*args): - """ - Evaluate arguments in order and return the result of the *last* argument. - For combining multiple expressions in one statement. Useful for lambdas. - """ - return args[-1] - -def begin0(*args): - """ - Evaluate arguments in order and return the result of the *first* argument. - For combining multiple expressions in one statement. Useful for lambdas. - """ - return args[0] - -def install_into_config(conf, config_paths, override_prompt=True): - # Allows op-mode scripts to install values if called from an active config session - # config_paths: dict of config paths - # override_prompt: if True, user will be prompted before existing nodes are overwritten - if not config_paths: - return None - - from vyos.config import Config - from vyos.utils.io import ask_yes_no - from vyos.utils.process import cmd - if not Config().in_session(): - print('You are not in configure mode, commands to install manually from configure mode:') - for path in config_paths: - print(f'set {path}') - return None - - count = 0 - failed = [] - - for path in config_paths: - if override_prompt and conf.exists(path) and not conf.is_multi(path): - if not ask_yes_no(f'Config node "{node}" already exists. Do you want to overwrite it?'): - continue - - try: - cmd(f'/opt/vyatta/sbin/my_set {path}') - count += 1 - except: - failed.append(path) - - if failed: - print(f'Failed to install {len(failed)} value(s). Commands to manually install:') - for path in failed: - print(f'set {path}') - - if count > 0: - print(f'{count} value(s) installed. Use "compare" to see the pending changes, and "commit" to apply.') - -def load_as_module(name: str, path: str): - import importlib.util - - spec = importlib.util.spec_from_file_location(name, path) - mod = importlib.util.module_from_spec(spec) - spec.loader.exec_module(mod) - return mod diff --git a/python/vyos/utils/__init__.py b/python/vyos/utils/__init__.py index abc9af5da..6cca4e935 100644 --- a/python/vyos/utils/__init__.py +++ b/python/vyos/utils/__init__.py @@ -19,6 +19,9 @@ from vyos.utils import convert from vyos.utils import dict from vyos.utils import file from vyos.utils import io +from vyos.utils import kernel +from vyos.utils import list +from vyos.utils import misc from vyos.utils import network from vyos.utils import permission from vyos.utils import process diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py index 975c67e0a..ec2333ef0 100644 --- a/python/vyos/utils/convert.py +++ b/python/vyos/utils/convert.py @@ -143,3 +143,33 @@ def mac_to_eui64(mac, prefix=None): return str(net[euil]) except: # pylint: disable=bare-except return + +def convert_data(data): + """Convert multiple types of data to types usable in CLI + + Args: + data (str | bytes | list | OrderedDict): input data + + Returns: + str | list | dict: converted data + """ + from base64 import b64encode + from collections import OrderedDict + + if isinstance(data, str): + return data + if isinstance(data, bytes): + try: + return data.decode() + except UnicodeDecodeError: + return b64encode(data).decode() + if isinstance(data, list): + list_tmp = [] + for item in data: + list_tmp.append(convert_data(item)) + return list_tmp + if isinstance(data, OrderedDict): + dict_tmp = {} + for key, value in data.items(): + dict_tmp[key] = convert_data(value) + return dict_tmp diff --git a/python/vyos/utils/kernel.py b/python/vyos/utils/kernel.py new file mode 100644 index 000000000..d950b8e75 --- /dev/null +++ b/python/vyos/utils/kernel.py @@ -0,0 +1,25 @@ +# Copyright 2023 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . + +def check_kmod(k_mod): + """ Common utility function to load required kernel modules on demand """ + from vyos import ConfigError + from vyos.utils.process import call + if isinstance(k_mod, str): + k_mod = k_mod.split() + for module in k_mod: + if not os.path.exists(f'/sys/module/{module}'): + if call(f'modprobe {module}') != 0: + raise ConfigError(f'Loading Kernel module {module} failed') diff --git a/python/vyos/utils/list.py b/python/vyos/utils/list.py new file mode 100644 index 000000000..63ef720ab --- /dev/null +++ b/python/vyos/utils/list.py @@ -0,0 +1,20 @@ +# Copyright 2023 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . + +def is_list_equal(first: list, second: list) -> bool: + """ Check if 2 lists are equal and list not empty """ + if len(first) != len(second) or len(first) == 0: + return False + return sorted(first) == sorted(second) diff --git a/python/vyos/utils/misc.py b/python/vyos/utils/misc.py new file mode 100644 index 000000000..d82655914 --- /dev/null +++ b/python/vyos/utils/misc.py @@ -0,0 +1,66 @@ +# Copyright 2023 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . + +def begin(*args): + """ + Evaluate arguments in order and return the result of the *last* argument. + For combining multiple expressions in one statement. Useful for lambdas. + """ + return args[-1] + +def begin0(*args): + """ + Evaluate arguments in order and return the result of the *first* argument. + For combining multiple expressions in one statement. Useful for lambdas. + """ + return args[0] + +def install_into_config(conf, config_paths, override_prompt=True): + # Allows op-mode scripts to install values if called from an active config session + # config_paths: dict of config paths + # override_prompt: if True, user will be prompted before existing nodes are overwritten + if not config_paths: + return None + + from vyos.config import Config + from vyos.utils.io import ask_yes_no + from vyos.utils.process import cmd + if not Config().in_session(): + print('You are not in configure mode, commands to install manually from configure mode:') + for path in config_paths: + print(f'set {path}') + return None + + count = 0 + failed = [] + + for path in config_paths: + if override_prompt and conf.exists(path) and not conf.is_multi(path): + if not ask_yes_no(f'Config node "{node}" already exists. Do you want to overwrite it?'): + continue + + try: + cmd(f'/opt/vyatta/sbin/my_set {path}') + count += 1 + except: + failed.append(path) + + if failed: + print(f'Failed to install {len(failed)} value(s). Commands to manually install:') + for path in failed: + print(f'set {path}') + + if count > 0: + print(f'{count} value(s) installed. Use "compare" to see the pending changes, and "commit" to apply.') diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py index 8db598f05..3786caf26 100644 --- a/python/vyos/utils/network.py +++ b/python/vyos/utils/network.py @@ -154,7 +154,6 @@ def mac2eui64(mac, prefix=None): except: # pylint: disable=bare-except return - def check_port_availability(ipaddress, port, protocol): """ Check if port is available and not used by any service @@ -189,3 +188,26 @@ def check_port_availability(ipaddress, port, protocol): return False return True + +def is_listen_port_bind_service(port: int, service: str) -> bool: + """Check if listen port bound to expected program name + :param port: Bind port + :param service: Program name + :return: bool + + Example: + % is_listen_port_bind_service(443, 'nginx') + True + % is_listen_port_bind_service(443, 'ocserv-main') + False + """ + from psutil import net_connections as connections + from psutil import Process as process + for connection in connections(): + addr = connection.laddr + pid = connection.pid + pid_name = process(pid).name() + pid_port = addr.port + if service == pid_name and port == pid_port: + return True + return False diff --git a/python/vyos/utils/permission.py b/python/vyos/utils/permission.py index 8c2d72b83..d938b494f 100644 --- a/python/vyos/utils/permission.py +++ b/python/vyos/utils/permission.py @@ -61,3 +61,18 @@ def chmod_755(path): bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP | \ S_IROTH | S_IXOTH chmod(path, bitmask) + +def is_admin() -> bool: + """Look if current user is in sudo group""" + from getpass import getuser + from grp import getgrnam + current_user = getuser() + (_, _, _, admin_group_members) = getgrnam('sudo') + return current_user in admin_group_members + +def get_cfg_group_id(): + from grp import getgrnam + from vyos.defaults import cfg_group + + group_data = getgrnam(cfg_group) + return group_data.gr_gid diff --git a/python/vyos/utils/system.py b/python/vyos/utils/system.py index 7102d5985..5d41c0c05 100644 --- a/python/vyos/utils/system.py +++ b/python/vyos/utils/system.py @@ -13,9 +13,9 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . +import os from subprocess import run - def sysctl_read(name: str) -> str: """Read and return current value of sysctl() option @@ -28,7 +28,6 @@ def sysctl_read(name: str) -> str: tmp = run(['sysctl', '-nb', name], capture_output=True) return tmp.stdout.decode() - def sysctl_write(name: str, value: str | int) -> bool: """Change value via sysctl() @@ -56,13 +55,12 @@ def sysctl_write(name: str, value: str | int) -> bool: # False in other cases return False - def sysctl_apply(sysctl_dict: dict[str, str], revert: bool = True) -> bool: """Apply sysctl values. Args: sysctl_dict (dict[str, str]): dictionary with sysctl keys with values - revert (bool, optional): Revert to original values if new were not + revert (bool, optional): Revert to original values if new were not applied. Defaults to True. Returns: @@ -80,3 +78,30 @@ def sysctl_apply(sysctl_dict: dict[str, str], revert: bool = True) -> bool: return False # everything applied return True + +def get_half_cpus(): + """ return 1/2 of the numbers of available CPUs """ + cpu = os.cpu_count() + if cpu > 1: + cpu /= 2 + return int(cpu) + +def find_device_file(device): + """ Recurively search /dev for the given device file and return its full path. + If no device file was found 'None' is returned """ + from fnmatch import fnmatch + + for root, dirs, files in os.walk('/dev'): + for basename in files: + if fnmatch(basename, device): + return os.path.join(root, basename) + + return None + +def load_as_module(name: str, path: str): + import importlib.util + + spec = importlib.util.spec_from_file_location(name, path) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod diff --git a/python/vyos/validate.py b/python/vyos/validate.py index 7afbe81c9..567f4c972 100644 --- a/python/vyos/validate.py +++ b/python/vyos/validate.py @@ -102,6 +102,7 @@ def is_addr_assigned(ip_address, vrf=None) -> bool: from netifaces import interfaces from vyos.utils.network import get_interface_config from vyos.utils.dict import dict_search + for interface in interfaces(): # Check if interface belongs to the requested VRF, if this is not the # case there is no need to proceed with this data set - continue loop -- cgit v1.2.3