From 5f77ccf91eb402c548fc91b2e080a4b2b86f4181 Mon Sep 17 00:00:00 2001 From: Christian Breunig Date: Sat, 15 Jul 2023 20:12:56 +0200 Subject: T5195: vyos.util -> vyos.utils package refactoring part #2 --- python/vyos/util.py | 347 +--------------------------------------------------- 1 file changed, 1 insertion(+), 346 deletions(-) (limited to 'python/vyos/util.py') diff --git a/python/vyos/util.py b/python/vyos/util.py index 0d7985d54..63539e897 100644 --- a/python/vyos/util.py +++ b/python/vyos/util.py @@ -13,16 +13,6 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . -import os -import re -import sys - -# -# NOTE: Do not import full classes here, move your import to the function -# where it is used so it is as local as possible to the execution -# - - def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False, mod=0): """ Mangles dict keys according to a regex and replacement character. Some libraries like Jinja2 do not like certain characters in dict keys. @@ -34,6 +24,7 @@ def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_m Returns: dict """ + import re from vyos.xml import is_tag new_dict = {} @@ -68,339 +59,3 @@ def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_m def mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False): return _mangle_dict_keys(data, regex, replacement, abs_path=abs_path, no_tag_node_value_mangle=no_tag_node_value_mangle, mod=0) - -def is_list_equal(first: list, second: list) -> bool: - """ Check if 2 lists are equal and list not empty """ - if len(first) != len(second) or len(first) == 0: - return False - return sorted(first) == sorted(second) - -def is_listen_port_bind_service(port: int, service: str) -> bool: - """Check if listen port bound to expected program name - :param port: Bind port - :param service: Program name - :return: bool - - Example: - % is_listen_port_bind_service(443, 'nginx') - True - % is_listen_port_bind_service(443, 'ocserv-main') - False - """ - from psutil import net_connections as connections - from psutil import Process as process - for connection in connections(): - addr = connection.laddr - pid = connection.pid - pid_name = process(pid).name() - pid_port = addr.port - if service == pid_name and port == pid_port: - return True - return False - -def seconds_to_human(s, separator=""): - """ Converts number of seconds passed to a human-readable - interval such as 1w4d18h35m59s - """ - s = int(s) - - week = 60 * 60 * 24 * 7 - day = 60 * 60 * 24 - hour = 60 * 60 - - remainder = 0 - result = "" - - weeks = s // week - if weeks > 0: - result = "{0}w".format(weeks) - s = s % week - - days = s // day - if days > 0: - result = "{0}{1}{2}d".format(result, separator, days) - s = s % day - - hours = s // hour - if hours > 0: - result = "{0}{1}{2}h".format(result, separator, hours) - s = s % hour - - minutes = s // 60 - if minutes > 0: - result = "{0}{1}{2}m".format(result, separator, minutes) - s = s % 60 - - seconds = s - if seconds > 0: - result = "{0}{1}{2}s".format(result, separator, seconds) - - return result - -def bytes_to_human(bytes, initial_exponent=0, precision=2): - """ Converts a value in bytes to a human-readable size string like 640 KB - - The initial_exponent parameter is the exponent of 2, - e.g. 10 (1024) for kilobytes, 20 (1024 * 1024) for megabytes. - """ - - if bytes == 0: - return "0 B" - - from math import log2 - - bytes = bytes * (2**initial_exponent) - - # log2 is a float, while range checking requires an int - exponent = int(log2(bytes)) - - if exponent < 10: - value = bytes - suffix = "B" - elif exponent in range(10, 20): - value = bytes / 1024 - suffix = "KB" - elif exponent in range(20, 30): - value = bytes / 1024**2 - suffix = "MB" - elif exponent in range(30, 40): - value = bytes / 1024**3 - suffix = "GB" - else: - value = bytes / 1024**4 - suffix = "TB" - # Add a new case when the first machine with petabyte RAM - # hits the market. - - size_string = "{0:.{1}f} {2}".format(value, precision, suffix) - return size_string - -def human_to_bytes(value): - """ Converts a data amount with a unit suffix to bytes, like 2K to 2048 """ - - from re import match as re_match - - res = re_match(r'^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*$', value) - - if not res: - raise ValueError(f"'{value}' is not a valid data amount") - else: - amount = float(res.group(1)) - unit = res.group(2).lower() - - if unit == 'b': - res = amount - elif (unit == 'k') or (unit == 'kb'): - res = amount * 1024 - elif (unit == 'm') or (unit == 'mb'): - res = amount * 1024**2 - elif (unit == 'g') or (unit == 'gb'): - res = amount * 1024**3 - elif (unit == 't') or (unit == 'tb'): - res = amount * 1024**4 - else: - raise ValueError(f"Unsupported data unit '{unit}'") - - # There cannot be fractional bytes, so we convert them to integer. - # However, truncating causes problems with conversion back to human unit, - # so we round instead -- that seems to work well enough. - return round(res) - -def get_cfg_group_id(): - from grp import getgrnam - from vyos.defaults import cfg_group - - group_data = getgrnam(cfg_group) - return group_data.gr_gid - -def wait_for_inotify(file_path, pre_hook=None, event_type=None, timeout=None, sleep_interval=0.1): - """ Waits for an inotify event to occur """ - if not os.path.dirname(file_path): - raise ValueError( - "File path {} does not have a directory part (required for inotify watching)".format(file_path)) - if not os.path.basename(file_path): - raise ValueError( - "File path {} does not have a file part, do not know what to watch for".format(file_path)) - - from inotify.adapters import Inotify - from time import time - from time import sleep - - time_start = time() - - i = Inotify() - i.add_watch(os.path.dirname(file_path)) - - if pre_hook: - pre_hook() - - for event in i.event_gen(yield_nones=True): - if (timeout is not None) and ((time() - time_start) > timeout): - # If the function didn't return until this point, - # the file failed to have been written to and closed within the timeout - raise OSError("Waiting for file {} to be written has failed".format(file_path)) - - # Most such events don't take much time, so it's better to check right away - # and sleep later. - if event is not None: - (_, type_names, path, filename) = event - if filename == os.path.basename(file_path): - if event_type in type_names: - return - sleep(sleep_interval) - -def wait_for_file_write_complete(file_path, pre_hook=None, timeout=None, sleep_interval=0.1): - """ Waits for a process to close a file after opening it in write mode. """ - wait_for_inotify(file_path, - event_type='IN_CLOSE_WRITE', pre_hook=pre_hook, timeout=timeout, sleep_interval=sleep_interval) - -def is_admin() -> bool: - """Look if current user is in sudo group""" - from getpass import getuser - from grp import getgrnam - current_user = getuser() - (_, _, _, admin_group_members) = getgrnam('sudo') - return current_user in admin_group_members - -def mac2eui64(mac, prefix=None): - """ - Convert a MAC address to a EUI64 address or, with prefix provided, a full - IPv6 address. - Thankfully copied from https://gist.github.com/wido/f5e32576bb57b5cc6f934e177a37a0d3 - """ - import re - from ipaddress import ip_network - # http://tools.ietf.org/html/rfc4291#section-2.5.1 - eui64 = re.sub(r'[.:-]', '', mac).lower() - eui64 = eui64[0:6] + 'fffe' + eui64[6:] - eui64 = hex(int(eui64[0:2], 16) ^ 2)[2:].zfill(2) + eui64[2:] - - if prefix is None: - return ':'.join(re.findall(r'.{4}', eui64)) - else: - try: - net = ip_network(prefix, strict=False) - euil = int('0x{0}'.format(eui64), 16) - return str(net[euil]) - except: # pylint: disable=bare-except - return - -def get_half_cpus(): - """ return 1/2 of the numbers of available CPUs """ - cpu = os.cpu_count() - if cpu > 1: - cpu /= 2 - return int(cpu) - -def check_kmod(k_mod): - """ Common utility function to load required kernel modules on demand """ - from vyos import ConfigError - from vyos.utils.process import call - if isinstance(k_mod, str): - k_mod = k_mod.split() - for module in k_mod: - if not os.path.exists(f'/sys/module/{module}'): - if call(f'modprobe {module}') != 0: - raise ConfigError(f'Loading Kernel module {module} failed') - -def find_device_file(device): - """ Recurively search /dev for the given device file and return its full path. - If no device file was found 'None' is returned """ - from fnmatch import fnmatch - - for root, dirs, files in os.walk('/dev'): - for basename in files: - if fnmatch(basename, device): - return os.path.join(root, basename) - - return None - -def convert_data(data): - """Convert multiple types of data to types usable in CLI - - Args: - data (str | bytes | list | OrderedDict): input data - - Returns: - str | list | dict: converted data - """ - from base64 import b64encode - from collections import OrderedDict - - if isinstance(data, str): - return data - if isinstance(data, bytes): - try: - return data.decode() - except UnicodeDecodeError: - return b64encode(data).decode() - if isinstance(data, list): - list_tmp = [] - for item in data: - list_tmp.append(convert_data(item)) - return list_tmp - if isinstance(data, OrderedDict): - dict_tmp = {} - for key, value in data.items(): - dict_tmp[key] = convert_data(value) - return dict_tmp - -def begin(*args): - """ - Evaluate arguments in order and return the result of the *last* argument. - For combining multiple expressions in one statement. Useful for lambdas. - """ - return args[-1] - -def begin0(*args): - """ - Evaluate arguments in order and return the result of the *first* argument. - For combining multiple expressions in one statement. Useful for lambdas. - """ - return args[0] - -def install_into_config(conf, config_paths, override_prompt=True): - # Allows op-mode scripts to install values if called from an active config session - # config_paths: dict of config paths - # override_prompt: if True, user will be prompted before existing nodes are overwritten - if not config_paths: - return None - - from vyos.config import Config - from vyos.utils.io import ask_yes_no - from vyos.utils.process import cmd - if not Config().in_session(): - print('You are not in configure mode, commands to install manually from configure mode:') - for path in config_paths: - print(f'set {path}') - return None - - count = 0 - failed = [] - - for path in config_paths: - if override_prompt and conf.exists(path) and not conf.is_multi(path): - if not ask_yes_no(f'Config node "{node}" already exists. Do you want to overwrite it?'): - continue - - try: - cmd(f'/opt/vyatta/sbin/my_set {path}') - count += 1 - except: - failed.append(path) - - if failed: - print(f'Failed to install {len(failed)} value(s). Commands to manually install:') - for path in failed: - print(f'set {path}') - - if count > 0: - print(f'{count} value(s) installed. Use "compare" to see the pending changes, and "commit" to apply.') - -def load_as_module(name: str, path: str): - import importlib.util - - spec = importlib.util.spec_from_file_location(name, path) - mod = importlib.util.module_from_spec(spec) - spec.loader.exec_module(mod) - return mod -- cgit v1.2.3