diff options
author | Christian Breunig <christian@breunig.cc> | 2023-07-15 20:12:56 +0200 |
---|---|---|
committer | Christian Breunig <christian@breunig.cc> | 2023-07-15 20:13:12 +0200 |
commit | 5f77ccf91eb402c548fc91b2e080a4b2b86f4181 (patch) | |
tree | 9b926dedc07ef547ad8bbe539f89990249552414 /python/vyos/utils | |
parent | 9285b9a571ee944daf6f17847a62f115146834a4 (diff) | |
download | vyos-1x-5f77ccf91eb402c548fc91b2e080a4b2b86f4181.tar.gz vyos-1x-5f77ccf91eb402c548fc91b2e080a4b2b86f4181.zip |
T5195: vyos.util -> vyos.utils package refactoring part #2
Diffstat (limited to 'python/vyos/utils')
-rw-r--r-- | python/vyos/utils/__init__.py | 3 | ||||
-rw-r--r-- | python/vyos/utils/convert.py | 30 | ||||
-rw-r--r-- | python/vyos/utils/kernel.py | 25 | ||||
-rw-r--r-- | python/vyos/utils/list.py | 20 | ||||
-rw-r--r-- | python/vyos/utils/misc.py | 66 | ||||
-rw-r--r-- | python/vyos/utils/network.py | 24 | ||||
-rw-r--r-- | python/vyos/utils/permission.py | 15 | ||||
-rw-r--r-- | python/vyos/utils/system.py | 33 |
8 files changed, 211 insertions, 5 deletions
diff --git a/python/vyos/utils/__init__.py b/python/vyos/utils/__init__.py index abc9af5da..6cca4e935 100644 --- a/python/vyos/utils/__init__.py +++ b/python/vyos/utils/__init__.py @@ -19,6 +19,9 @@ from vyos.utils import convert from vyos.utils import dict from vyos.utils import file from vyos.utils import io +from vyos.utils import kernel +from vyos.utils import list +from vyos.utils import misc from vyos.utils import network from vyos.utils import permission from vyos.utils import process diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py index 975c67e0a..ec2333ef0 100644 --- a/python/vyos/utils/convert.py +++ b/python/vyos/utils/convert.py @@ -143,3 +143,33 @@ def mac_to_eui64(mac, prefix=None): return str(net[euil]) except: # pylint: disable=bare-except return + +def convert_data(data): + """Convert multiple types of data to types usable in CLI + + Args: + data (str | bytes | list | OrderedDict): input data + + Returns: + str | list | dict: converted data + """ + from base64 import b64encode + from collections import OrderedDict + + if isinstance(data, str): + return data + if isinstance(data, bytes): + try: + return data.decode() + except UnicodeDecodeError: + return b64encode(data).decode() + if isinstance(data, list): + list_tmp = [] + for item in data: + list_tmp.append(convert_data(item)) + return list_tmp + if isinstance(data, OrderedDict): + dict_tmp = {} + for key, value in data.items(): + dict_tmp[key] = convert_data(value) + return dict_tmp diff --git a/python/vyos/utils/kernel.py b/python/vyos/utils/kernel.py new file mode 100644 index 000000000..d950b8e75 --- /dev/null +++ b/python/vyos/utils/kernel.py @@ -0,0 +1,25 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +def check_kmod(k_mod): + """ Common utility function to load required kernel modules on demand """ + from vyos import ConfigError + from vyos.utils.process import call + if isinstance(k_mod, str): + k_mod = k_mod.split() + for module in k_mod: + if not os.path.exists(f'/sys/module/{module}'): + if call(f'modprobe {module}') != 0: + raise ConfigError(f'Loading Kernel module {module} failed') diff --git a/python/vyos/utils/list.py b/python/vyos/utils/list.py new file mode 100644 index 000000000..63ef720ab --- /dev/null +++ b/python/vyos/utils/list.py @@ -0,0 +1,20 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +def is_list_equal(first: list, second: list) -> bool: + """ Check if 2 lists are equal and list not empty """ + if len(first) != len(second) or len(first) == 0: + return False + return sorted(first) == sorted(second) diff --git a/python/vyos/utils/misc.py b/python/vyos/utils/misc.py new file mode 100644 index 000000000..d82655914 --- /dev/null +++ b/python/vyos/utils/misc.py @@ -0,0 +1,66 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +def begin(*args): + """ + Evaluate arguments in order and return the result of the *last* argument. + For combining multiple expressions in one statement. Useful for lambdas. + """ + return args[-1] + +def begin0(*args): + """ + Evaluate arguments in order and return the result of the *first* argument. + For combining multiple expressions in one statement. Useful for lambdas. + """ + return args[0] + +def install_into_config(conf, config_paths, override_prompt=True): + # Allows op-mode scripts to install values if called from an active config session + # config_paths: dict of config paths + # override_prompt: if True, user will be prompted before existing nodes are overwritten + if not config_paths: + return None + + from vyos.config import Config + from vyos.utils.io import ask_yes_no + from vyos.utils.process import cmd + if not Config().in_session(): + print('You are not in configure mode, commands to install manually from configure mode:') + for path in config_paths: + print(f'set {path}') + return None + + count = 0 + failed = [] + + for path in config_paths: + if override_prompt and conf.exists(path) and not conf.is_multi(path): + if not ask_yes_no(f'Config node "{node}" already exists. Do you want to overwrite it?'): + continue + + try: + cmd(f'/opt/vyatta/sbin/my_set {path}') + count += 1 + except: + failed.append(path) + + if failed: + print(f'Failed to install {len(failed)} value(s). Commands to manually install:') + for path in failed: + print(f'set {path}') + + if count > 0: + print(f'{count} value(s) installed. Use "compare" to see the pending changes, and "commit" to apply.') diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py index 8db598f05..3786caf26 100644 --- a/python/vyos/utils/network.py +++ b/python/vyos/utils/network.py @@ -154,7 +154,6 @@ def mac2eui64(mac, prefix=None): except: # pylint: disable=bare-except return - def check_port_availability(ipaddress, port, protocol): """ Check if port is available and not used by any service @@ -189,3 +188,26 @@ def check_port_availability(ipaddress, port, protocol): return False return True + +def is_listen_port_bind_service(port: int, service: str) -> bool: + """Check if listen port bound to expected program name + :param port: Bind port + :param service: Program name + :return: bool + + Example: + % is_listen_port_bind_service(443, 'nginx') + True + % is_listen_port_bind_service(443, 'ocserv-main') + False + """ + from psutil import net_connections as connections + from psutil import Process as process + for connection in connections(): + addr = connection.laddr + pid = connection.pid + pid_name = process(pid).name() + pid_port = addr.port + if service == pid_name and port == pid_port: + return True + return False diff --git a/python/vyos/utils/permission.py b/python/vyos/utils/permission.py index 8c2d72b83..d938b494f 100644 --- a/python/vyos/utils/permission.py +++ b/python/vyos/utils/permission.py @@ -61,3 +61,18 @@ def chmod_755(path): bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP | \ S_IROTH | S_IXOTH chmod(path, bitmask) + +def is_admin() -> bool: + """Look if current user is in sudo group""" + from getpass import getuser + from grp import getgrnam + current_user = getuser() + (_, _, _, admin_group_members) = getgrnam('sudo') + return current_user in admin_group_members + +def get_cfg_group_id(): + from grp import getgrnam + from vyos.defaults import cfg_group + + group_data = getgrnam(cfg_group) + return group_data.gr_gid diff --git a/python/vyos/utils/system.py b/python/vyos/utils/system.py index 7102d5985..5d41c0c05 100644 --- a/python/vyos/utils/system.py +++ b/python/vyos/utils/system.py @@ -13,9 +13,9 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. +import os from subprocess import run - def sysctl_read(name: str) -> str: """Read and return current value of sysctl() option @@ -28,7 +28,6 @@ def sysctl_read(name: str) -> str: tmp = run(['sysctl', '-nb', name], capture_output=True) return tmp.stdout.decode() - def sysctl_write(name: str, value: str | int) -> bool: """Change value via sysctl() @@ -56,13 +55,12 @@ def sysctl_write(name: str, value: str | int) -> bool: # False in other cases return False - def sysctl_apply(sysctl_dict: dict[str, str], revert: bool = True) -> bool: """Apply sysctl values. Args: sysctl_dict (dict[str, str]): dictionary with sysctl keys with values - revert (bool, optional): Revert to original values if new were not + revert (bool, optional): Revert to original values if new were not applied. Defaults to True. Returns: @@ -80,3 +78,30 @@ def sysctl_apply(sysctl_dict: dict[str, str], revert: bool = True) -> bool: return False # everything applied return True + +def get_half_cpus(): + """ return 1/2 of the numbers of available CPUs """ + cpu = os.cpu_count() + if cpu > 1: + cpu /= 2 + return int(cpu) + +def find_device_file(device): + """ Recurively search /dev for the given device file and return its full path. + If no device file was found 'None' is returned """ + from fnmatch import fnmatch + + for root, dirs, files in os.walk('/dev'): + for basename in files: + if fnmatch(basename, device): + return os.path.join(root, basename) + + return None + +def load_as_module(name: str, path: str): + import importlib.util + + spec = importlib.util.spec_from_file_location(name, path) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod |