diff options
Diffstat (limited to 'python/vyos/utils')
| -rw-r--r-- | python/vyos/utils/__init__.py | 30 | ||||
| -rw-r--r-- | python/vyos/utils/assertion.py | 81 | ||||
| -rw-r--r-- | python/vyos/utils/auth.py | 41 | ||||
| -rw-r--r-- | python/vyos/utils/boot.py | 35 | ||||
| -rw-r--r-- | python/vyos/utils/commit.py | 60 | ||||
| -rw-r--r-- | python/vyos/utils/convert.py | 52 | ||||
| -rw-r--r-- | python/vyos/utils/dict.py | 101 | ||||
| -rw-r--r-- | python/vyos/utils/file.py | 12 | ||||
| -rw-r--r-- | python/vyos/utils/kernel.py | 38 | ||||
| -rw-r--r-- | python/vyos/utils/list.py | 20 | ||||
| -rw-r--r-- | python/vyos/utils/misc.py | 66 | ||||
| -rw-r--r-- | python/vyos/utils/network.py | 417 | ||||
| -rw-r--r-- | python/vyos/utils/permission.py | 78 | ||||
| -rw-r--r-- | python/vyos/utils/process.py | 232 | ||||
| -rw-r--r-- | python/vyos/utils/system.py | 107 |
15 files changed, 1345 insertions, 25 deletions
diff --git a/python/vyos/utils/__init__.py b/python/vyos/utils/__init__.py index e69de29bb..12ef2d3b8 100644 --- a/python/vyos/utils/__init__.py +++ b/python/vyos/utils/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +from vyos.utils import assertion +from vyos.utils import auth +from vyos.utils import boot +from vyos.utils import commit +from vyos.utils import convert +from vyos.utils import dict +from vyos.utils import file +from vyos.utils import io +from vyos.utils import kernel +from vyos.utils import list +from vyos.utils import misc +from vyos.utils import network +from vyos.utils import permission +from vyos.utils import process +from vyos.utils import system diff --git a/python/vyos/utils/assertion.py b/python/vyos/utils/assertion.py new file mode 100644 index 000000000..1aaa54dff --- /dev/null +++ b/python/vyos/utils/assertion.py @@ -0,0 +1,81 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +def assert_boolean(b): + if int(b) not in (0, 1): + raise ValueError(f'Value {b} out of range') + +def assert_range(value, lower=0, count=3): + if int(value, 16) not in range(lower, lower+count): + raise ValueError("Value out of range") + +def assert_list(s, l): + if s not in l: + o = ' or '.join([f'"{n}"' for n in l]) + raise ValueError(f'state must be {o}, got {s}') + +def assert_number(n): + if not str(n).isnumeric(): + raise ValueError(f'{n} must be a number') + +def assert_positive(n, smaller=0): + assert_number(n) + if int(n) < smaller: + raise ValueError(f'{n} is smaller than {smaller}') + +def assert_mtu(mtu, ifname): + assert_number(mtu) + + import json + from vyos.utils.process import cmd + out = cmd(f'ip -j -d link show dev {ifname}') + # [{"ifindex":2,"ifname":"eth0","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"pfifo_fast","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"08:00:27:d9:5b:04","broadcast":"ff:ff:ff:ff:ff:ff","promiscuity":0,"min_mtu":46,"max_mtu":16110,"inet6_addr_gen_mode":"none","num_tx_queues":1,"num_rx_queues":1,"gso_max_size":65536,"gso_max_segs":65535}] + parsed = json.loads(out)[0] + min_mtu = int(parsed.get('min_mtu', '0')) + # cur_mtu = parsed.get('mtu',0), + max_mtu = int(parsed.get('max_mtu', '0')) + cur_mtu = int(mtu) + + if (min_mtu and cur_mtu < min_mtu) or cur_mtu < 68: + raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} < {min_mtu}') + if (max_mtu and cur_mtu > max_mtu) or cur_mtu > 65536: + raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} > {max_mtu}') + +def assert_mac(m): + split = m.split(':') + size = len(split) + + # a mac address consits out of 6 octets + if size != 6: + raise ValueError(f'wrong number of MAC octets ({size}): {m}') + + octets = [] + try: + for octet in split: + octets.append(int(octet, 16)) + except ValueError: + raise ValueError(f'invalid hex number "{octet}" in : {m}') + + # validate against the first mac address byte if it's a multicast + # address + if octets[0] & 1: + raise ValueError(f'{m} is a multicast MAC address') + + # overall mac address is not allowed to be 00:00:00:00:00:00 + if sum(octets) == 0: + raise ValueError('00:00:00:00:00:00 is not a valid MAC address') + + if octets[:5] == (0, 0, 94, 0, 1): + raise ValueError(f'{m} is a VRRP MAC address') diff --git a/python/vyos/utils/auth.py b/python/vyos/utils/auth.py new file mode 100644 index 000000000..a59858d72 --- /dev/null +++ b/python/vyos/utils/auth.py @@ -0,0 +1,41 @@ +# authutils -- miscelanneous functions for handling passwords and publis keys +# +# Copyright (C) 2018 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or modify it under the terms of +# the GNU Lesser General Public License as published by the Free Software Foundation; +# either version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# See the GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along with this library; +# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +import re + +from vyos.utils.process import cmd + + +def make_password_hash(password): + """ Makes a password hash for /etc/shadow using mkpasswd """ + + mkpassword = 'mkpasswd --method=sha-512 --stdin' + return cmd(mkpassword, input=password, timeout=5) + +def split_ssh_public_key(key_string, defaultname=""): + """ Splits an SSH public key into its components """ + + key_string = key_string.strip() + parts = re.split(r'\s+', key_string) + + if len(parts) == 3: + key_type, key_data, key_name = parts[0], parts[1], parts[2] + else: + key_type, key_data, key_name = parts[0], parts[1], defaultname + + if key_type not in ['ssh-rsa', 'ssh-dss', 'ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384', 'ecdsa-sha2-nistp521', 'ssh-ed25519']: + raise ValueError("Bad key type \'{0}\', must be one of must be one of ssh-rsa, ssh-dss, ecdsa-sha2-nistp<256|384|521> or ssh-ed25519".format(key_type)) + + return({"type": key_type, "data": key_data, "name": key_name}) diff --git a/python/vyos/utils/boot.py b/python/vyos/utils/boot.py new file mode 100644 index 000000000..3aecbec64 --- /dev/null +++ b/python/vyos/utils/boot.py @@ -0,0 +1,35 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +import os + +def boot_configuration_complete() -> bool: + """ Check if the boot config loader has completed + """ + from vyos.defaults import config_status + if os.path.isfile(config_status): + return True + return False + +def boot_configuration_success() -> bool: + from vyos.defaults import config_status + try: + with open(config_status) as f: + res = f.read().strip() + except FileNotFoundError: + return False + if int(res) == 0: + return True + return False diff --git a/python/vyos/utils/commit.py b/python/vyos/utils/commit.py new file mode 100644 index 000000000..105aed8c2 --- /dev/null +++ b/python/vyos/utils/commit.py @@ -0,0 +1,60 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +def commit_in_progress(): + """ Not to be used in normal op mode scripts! """ + + # The CStore backend locks the config by opening a file + # The file is not removed after commit, so just checking + # if it exists is insufficient, we need to know if it's open by anyone + + # There are two ways to check if any other process keeps a file open. + # The first one is to try opening it and see if the OS objects. + # That's faster but prone to race conditions and can be intrusive. + # The other one is to actually check if any process keeps it open. + # It's non-intrusive but needs root permissions, else you can't check + # processes of other users. + # + # Since this will be used in scripts that modify the config outside of the CLI + # framework, those knowingly have root permissions. + # For everything else, we add a safeguard. + from psutil import process_iter + from psutil import NoSuchProcess + from getpass import getuser + from vyos.defaults import commit_lock + + if getuser() != 'root': + raise OSError('This functions needs to be run as root to return correct results!') + + for proc in process_iter(): + try: + files = proc.open_files() + if files: + for f in files: + if f.path == commit_lock: + return True + except NoSuchProcess as err: + # Process died before we could examine it + pass + # Default case + return False + + +def wait_for_commit_lock(): + """ Not to be used in normal op mode scripts! """ + from time import sleep + # Very synchronous approach to multiprocessing + while commit_in_progress(): + sleep(1) diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py index 975c67e0a..9a8a1ff7d 100644 --- a/python/vyos/utils/convert.py +++ b/python/vyos/utils/convert.py @@ -143,3 +143,55 @@ def mac_to_eui64(mac, prefix=None): return str(net[euil]) except: # pylint: disable=bare-except return + + +def convert_data(data) -> dict | list | tuple | str | int | float | bool | None: + """Filter and convert multiple types of data to types usable in CLI/API + + WARNING: Must not be used for anything except formatting output for API or CLI + + On the output allowed everything supported in JSON. + + Args: + data (Any): input data + + Returns: + dict | list | tuple | str | int | float | bool | None: converted data + """ + from base64 import b64encode + + # return original data for types which do not require conversion + if isinstance(data, str | int | float | bool | None): + return data + + if isinstance(data, list): + list_tmp = [] + for item in data: + list_tmp.append(convert_data(item)) + return list_tmp + + if isinstance(data, tuple): + list_tmp = list(data) + tuple_tmp = tuple(convert_data(list_tmp)) + return tuple_tmp + + if isinstance(data, bytes | bytearray): + try: + return data.decode() + except UnicodeDecodeError: + return b64encode(data).decode() + + if isinstance(data, set | frozenset): + list_tmp = convert_data(list(data)) + return list_tmp + + if isinstance(data, dict): + dict_tmp = {} + for key, value in data.items(): + dict_tmp[key] = convert_data(value) + return dict_tmp + + # do not return anything for other types + # which cannot be converted to JSON + # for example: complex | range | memoryview + return diff --git a/python/vyos/utils/dict.py b/python/vyos/utils/dict.py index 7c93deef6..9484eacdd 100644 --- a/python/vyos/utils/dict.py +++ b/python/vyos/utils/dict.py @@ -13,7 +13,6 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. - def colon_separated_to_dict(data_string, uniquekeys=False): """ Converts a string containing newline-separated entries of colon-separated key-value pairs into a dict. @@ -65,7 +64,7 @@ def colon_separated_to_dict(data_string, uniquekeys=False): return data -def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False, mod=0): +def mangle_dict_keys(data, regex, replacement, abs_path=None, no_tag_node_value_mangle=False): """ Mangles dict keys according to a regex and replacement character. Some libraries like Jinja2 do not like certain characters in dict keys. This function can be used for replacing all offending characters @@ -73,44 +72,39 @@ def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_m Args: data (dict): Original dict to mangle + regex, replacement (str): arguments to re.sub(regex, replacement, ...) + abs_path (list): if data is a config dict and no_tag_node_value_mangle is True + then abs_path should be the absolute config path to the first + keys of data, non-inclusive + no_tag_node_value_mangle (bool): do not mangle keys of tag node values Returns: dict """ - from vyos.xml import is_tag - - new_dict = {} + import re + from vyos.xml_ref import is_tag_value - for key in data.keys(): - save_mod = mod - save_path = abs_path[:] + if abs_path is None: + abs_path = [] - abs_path.append(key) + new_dict = type(data)() - if not is_tag(abs_path): - new_key = re.sub(regex, replacement, key) + for k in data.keys(): + if no_tag_node_value_mangle and is_tag_value(abs_path + [k]): + new_key = k else: - if mod%2: - new_key = key - else: - new_key = re.sub(regex, replacement, key) - if no_tag_node_value_mangle: - mod += 1 + new_key = re.sub(regex, replacement, k) - value = data[key] + value = data[k] if isinstance(value, dict): - new_dict[new_key] = _mangle_dict_keys(value, regex, replacement, abs_path=abs_path, mod=mod, no_tag_node_value_mangle=no_tag_node_value_mangle) + new_dict[new_key] = mangle_dict_keys(value, regex, replacement, + abs_path=abs_path + [k], + no_tag_node_value_mangle=no_tag_node_value_mangle) else: new_dict[new_key] = value - mod = save_mod - abs_path = save_path[:] - return new_dict -def mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False): - return _mangle_dict_keys(data, regex, replacement, abs_path=abs_path, no_tag_node_value_mangle=no_tag_node_value_mangle, mod=0) - def _get_sub_dict(d, lpath): k = lpath[0] if k not in d.keys(): @@ -234,6 +228,27 @@ def dict_to_list(d, save_key_to=None): return collect +def dict_to_paths(d: dict) -> list: + """ Generator to return list of paths from dict of list[str]|str + """ + def func(d, path): + if isinstance(d, dict): + if not d: + yield path + for k, v in d.items(): + for r in func(v, path + [k]): + yield r + elif isinstance(d, list): + for i in d: + for r in func(i, path): + yield r + elif isinstance(d, str): + yield path + [d] + else: + raise ValueError('object is not a dict of strings/list of strings') + for r in func(d, []): + yield r + def check_mutually_exclusive_options(d, keys, required=False): """ Checks if a dict has at most one or only one of mutually exclusive keys. @@ -254,3 +269,39 @@ def check_mutually_exclusive_options(d, keys, required=False): if required and (len(present_keys) < 1): raise ValueError(f"At least one of the following options is required: {orig_keys}") + +class FixedDict(dict): + """ + FixedDict: A dictionnary not allowing new keys to be created after initialisation. + + >>> f = FixedDict(**{'count':1}) + >>> f['count'] = 2 + >>> f['king'] = 3 + File "...", line ..., in __setitem__ + raise ConfigError(f'Option "{k}" has no defined default') + """ + + from vyos import ConfigError + + def __init__(self, **options): + self._allowed = options.keys() + super().__init__(**options) + + def __setitem__(self, k, v): + """ + __setitem__ is a builtin which is called by python when setting dict values: + >>> d = dict() + >>> d['key'] = 'value' + >>> d + {'key': 'value'} + + is syntaxic sugar for + + >>> d = dict() + >>> d.__setitem__('key','value') + >>> d + {'key': 'value'} + """ + if k not in self._allowed: + raise ConfigError(f'Option "{k}" has no defined default') + super().__setitem__(k, v) diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py index 2560a35be..667a2464b 100644 --- a/python/vyos/utils/file.py +++ b/python/vyos/utils/file.py @@ -14,7 +14,19 @@ # License along with this library. If not, see <http://www.gnu.org/licenses/>. import os +from vyos.utils.permission import chown +def makedir(path, user=None, group=None): + if os.path.exists(path): + return + os.makedirs(path, mode=0o755) + chown(path, user, group) + +def file_is_persistent(path): + import re + location = r'^(/config|/opt/vyatta/etc/config)' + absolute = os.path.abspath(os.path.dirname(path)) + return re.match(location,absolute) def read_file(fname, defaultonfailure=None): """ diff --git a/python/vyos/utils/kernel.py b/python/vyos/utils/kernel.py new file mode 100644 index 000000000..1f3bbdffe --- /dev/null +++ b/python/vyos/utils/kernel.py @@ -0,0 +1,38 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +import os + +def check_kmod(k_mod): + """ Common utility function to load required kernel modules on demand """ + from vyos import ConfigError + from vyos.utils.process import call + if isinstance(k_mod, str): + k_mod = k_mod.split() + for module in k_mod: + if not os.path.exists(f'/sys/module/{module}'): + if call(f'modprobe {module}') != 0: + raise ConfigError(f'Loading Kernel module {module} failed') + +def unload_kmod(k_mod): + """ Common utility function to unload required kernel modules on demand """ + from vyos import ConfigError + from vyos.utils.process import call + if isinstance(k_mod, str): + k_mod = k_mod.split() + for module in k_mod: + if os.path.exists(f'/sys/module/{module}'): + if call(f'rmmod {module}') != 0: + raise ConfigError(f'Unloading Kernel module {module} failed') diff --git a/python/vyos/utils/list.py b/python/vyos/utils/list.py new file mode 100644 index 000000000..63ef720ab --- /dev/null +++ b/python/vyos/utils/list.py @@ -0,0 +1,20 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +def is_list_equal(first: list, second: list) -> bool: + """ Check if 2 lists are equal and list not empty """ + if len(first) != len(second) or len(first) == 0: + return False + return sorted(first) == sorted(second) diff --git a/python/vyos/utils/misc.py b/python/vyos/utils/misc.py new file mode 100644 index 000000000..d82655914 --- /dev/null +++ b/python/vyos/utils/misc.py @@ -0,0 +1,66 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +def begin(*args): + """ + Evaluate arguments in order and return the result of the *last* argument. + For combining multiple expressions in one statement. Useful for lambdas. + """ + return args[-1] + +def begin0(*args): + """ + Evaluate arguments in order and return the result of the *first* argument. + For combining multiple expressions in one statement. Useful for lambdas. + """ + return args[0] + +def install_into_config(conf, config_paths, override_prompt=True): + # Allows op-mode scripts to install values if called from an active config session + # config_paths: dict of config paths + # override_prompt: if True, user will be prompted before existing nodes are overwritten + if not config_paths: + return None + + from vyos.config import Config + from vyos.utils.io import ask_yes_no + from vyos.utils.process import cmd + if not Config().in_session(): + print('You are not in configure mode, commands to install manually from configure mode:') + for path in config_paths: + print(f'set {path}') + return None + + count = 0 + failed = [] + + for path in config_paths: + if override_prompt and conf.exists(path) and not conf.is_multi(path): + if not ask_yes_no(f'Config node "{node}" already exists. Do you want to overwrite it?'): + continue + + try: + cmd(f'/opt/vyatta/sbin/my_set {path}') + count += 1 + except: + failed.append(path) + + if failed: + print(f'Failed to install {len(failed)} value(s). Commands to manually install:') + for path in failed: + print(f'set {path}') + + if count > 0: + print(f'{count} value(s) installed. Use "compare" to see the pending changes, and "commit" to apply.') diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py new file mode 100644 index 000000000..2f181d8d9 --- /dev/null +++ b/python/vyos/utils/network.py @@ -0,0 +1,417 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +def _are_same_ip(one, two): + from socket import AF_INET + from socket import AF_INET6 + from socket import inet_pton + from vyos.template import is_ipv4 + # compare the binary representation of the IP + f_one = AF_INET if is_ipv4(one) else AF_INET6 + s_two = AF_INET if is_ipv4(two) else AF_INET6 + return inet_pton(f_one, one) == inet_pton(f_one, two) + +def get_protocol_by_name(protocol_name): + """Get protocol number by protocol name + + % get_protocol_by_name('tcp') + % 6 + """ + import socket + try: + protocol_number = socket.getprotobyname(protocol_name) + return protocol_number + except socket.error: + return protocol_name + +def interface_exists(interface) -> bool: + import os + return os.path.exists(f'/sys/class/net/{interface}') + +def interface_exists_in_netns(interface_name, netns): + from vyos.utils.process import rc_cmd + rc, out = rc_cmd(f'ip netns exec {netns} ip link show dev {interface_name}') + if rc == 0: + return True + return False + +def get_vrf_members(vrf: str) -> list: + """ + Get list of interface VRF members + :param vrf: str + :return: list + """ + import json + from vyos.utils.process import cmd + if not interface_exists(vrf): + raise ValueError(f'VRF "{vrf}" does not exist!') + output = cmd(f'ip --json --brief link show master {vrf}') + answer = json.loads(output) + interfaces = [] + for data in answer: + if 'ifname' in data: + interfaces.append(data.get('ifname')) + return interfaces + +def get_interface_vrf(interface): + """ Returns VRF of given interface """ + from vyos.utils.dict import dict_search + from vyos.utils.network import get_interface_config + tmp = get_interface_config(interface) + if dict_search('linkinfo.info_slave_kind', tmp) == 'vrf': + return tmp['master'] + return 'default' + +def get_interface_config(interface): + """ Returns the used encapsulation protocol for given interface. + If interface does not exist, None is returned. + """ + import os + if not os.path.exists(f'/sys/class/net/{interface}'): + return None + from json import loads + from vyos.utils.process import cmd + tmp = loads(cmd(f'ip --detail --json link show dev {interface}'))[0] + return tmp + +def get_interface_address(interface): + """ Returns the used encapsulation protocol for given interface. + If interface does not exist, None is returned. + """ + import os + if not os.path.exists(f'/sys/class/net/{interface}'): + return None + from json import loads + from vyos.utils.process import cmd + tmp = loads(cmd(f'ip --detail --json addr show dev {interface}'))[0] + return tmp + +def get_interface_namespace(iface): + """ + Returns wich netns the interface belongs to + """ + from json import loads + from vyos.utils.process import cmd + # Check if netns exist + tmp = loads(cmd(f'ip --json netns ls')) + if len(tmp) == 0: + return None + + for ns in tmp: + netns = f'{ns["name"]}' + # Search interface in each netns + data = loads(cmd(f'ip netns exec {netns} ip --json link show')) + for tmp in data: + if iface == tmp["ifname"]: + return netns + +def is_wwan_connected(interface): + """ Determine if a given WWAN interface, e.g. wwan0 is connected to the + carrier network or not """ + import json + from vyos.utils.process import cmd + + if not interface.startswith('wwan'): + raise ValueError(f'Specified interface "{interface}" is not a WWAN interface') + + # ModemManager is required for connection(s) - if service is not running, + # there won't be any connection at all! + if not is_systemd_service_active('ModemManager.service'): + return False + + modem = interface.lstrip('wwan') + + tmp = cmd(f'mmcli --modem {modem} --output-json') + tmp = json.loads(tmp) + + # return True/False if interface is in connected state + return dict_search('modem.generic.state', tmp) == 'connected' + +def get_bridge_fdb(interface): + """ Returns the forwarding database entries for a given interface """ + import os + if not os.path.exists(f'/sys/class/net/{interface}'): + return None + from json import loads + from vyos.utils.process import cmd + tmp = loads(cmd(f'bridge -j fdb show dev {interface}')) + return tmp + +def get_all_vrfs(): + """ Return a dictionary of all system wide known VRF instances """ + from json import loads + from vyos.utils.process import cmd + tmp = loads(cmd('ip --json vrf list')) + # Result is of type [{"name":"red","table":1000},{"name":"blue","table":2000}] + # so we will re-arrange it to a more nicer representation: + # {'red': {'table': 1000}, 'blue': {'table': 2000}} + data = {} + for entry in tmp: + name = entry.pop('name') + data[name] = entry + return data + +def mac2eui64(mac, prefix=None): + """ + Convert a MAC address to a EUI64 address or, with prefix provided, a full + IPv6 address. + Thankfully copied from https://gist.github.com/wido/f5e32576bb57b5cc6f934e177a37a0d3 + """ + import re + from ipaddress import ip_network + # http://tools.ietf.org/html/rfc4291#section-2.5.1 + eui64 = re.sub(r'[.:-]', '', mac).lower() + eui64 = eui64[0:6] + 'fffe' + eui64[6:] + eui64 = hex(int(eui64[0:2], 16) ^ 2)[2:].zfill(2) + eui64[2:] + + if prefix is None: + return ':'.join(re.findall(r'.{4}', eui64)) + else: + try: + net = ip_network(prefix, strict=False) + euil = int('0x{0}'.format(eui64), 16) + return str(net[euil]) + except: # pylint: disable=bare-except + return + +def check_port_availability(ipaddress, port, protocol): + """ + Check if port is available and not used by any service + Return False if a port is busy or IP address does not exists + Should be used carefully for services that can start listening + dynamically, because IP address may be dynamic too + """ + from socketserver import TCPServer, UDPServer + from ipaddress import ip_address + + # verify arguments + try: + ipaddress = ip_address(ipaddress).compressed + except: + raise ValueError(f'The {ipaddress} is not a valid IPv4 or IPv6 address') + if port not in range(1, 65536): + raise ValueError(f'The port number {port} is not in the 1-65535 range') + if protocol not in ['tcp', 'udp']: + raise ValueError(f'The protocol {protocol} is not supported. Only tcp and udp are allowed') + + # check port availability + try: + if protocol == 'tcp': + server = TCPServer((ipaddress, port), None, bind_and_activate=True) + if protocol == 'udp': + server = UDPServer((ipaddress, port), None, bind_and_activate=True) + server.server_close() + except Exception as e: + # errno.h: + #define EADDRINUSE 98 /* Address already in use */ + if e.errno == 98: + return False + + return True + +def is_listen_port_bind_service(port: int, service: str) -> bool: + """Check if listen port bound to expected program name + :param port: Bind port + :param service: Program name + :return: bool + + Example: + % is_listen_port_bind_service(443, 'nginx') + True + % is_listen_port_bind_service(443, 'ocserv-main') + False + """ + from psutil import net_connections as connections + from psutil import Process as process + for connection in connections(): + addr = connection.laddr + pid = connection.pid + pid_name = process(pid).name() + pid_port = addr.port + if service == pid_name and port == pid_port: + return True + return False + +def is_ipv6_link_local(addr): + """ Check if addrsss is an IPv6 link-local address. Returns True/False """ + from ipaddress import ip_interface + from vyos.template import is_ipv6 + addr = addr.split('%')[0] + if is_ipv6(addr): + if ip_interface(addr).is_link_local: + return True + + return False + +def is_addr_assigned(ip_address, vrf=None) -> bool: + """ Verify if the given IPv4/IPv6 address is assigned to any interface """ + from netifaces import interfaces + from vyos.utils.network import get_interface_config + from vyos.utils.dict import dict_search + + for interface in interfaces(): + # Check if interface belongs to the requested VRF, if this is not the + # case there is no need to proceed with this data set - continue loop + # with next element + tmp = get_interface_config(interface) + if dict_search('master', tmp) != vrf: + continue + + if is_intf_addr_assigned(interface, ip_address): + return True + + return False + +def is_intf_addr_assigned(intf, address) -> bool: + """ + Verify if the given IPv4/IPv6 address is assigned to specific interface. + It can check both a single IP address (e.g. 192.0.2.1 or a assigned CIDR + address 192.0.2.1/24. + """ + from vyos.template import is_ipv4 + + from netifaces import ifaddresses + from netifaces import AF_INET + from netifaces import AF_INET6 + + # check if the requested address type is configured at all + # { + # 17: [{'addr': '08:00:27:d9:5b:04', 'broadcast': 'ff:ff:ff:ff:ff:ff'}], + # 2: [{'addr': '10.0.2.15', 'netmask': '255.255.255.0', 'broadcast': '10.0.2.255'}], + # 10: [{'addr': 'fe80::a00:27ff:fed9:5b04%eth0', 'netmask': 'ffff:ffff:ffff:ffff::'}] + # } + try: + addresses = ifaddresses(intf) + except ValueError as e: + print(e) + return False + + # determine IP version (AF_INET or AF_INET6) depending on passed address + addr_type = AF_INET if is_ipv4(address) else AF_INET6 + + # Check every IP address on this interface for a match + netmask = None + if '/' in address: + address, netmask = address.split('/') + for ip in addresses.get(addr_type, []): + # ip can have the interface name in the 'addr' field, we need to remove it + # {'addr': 'fe80::a00:27ff:fec5:f821%eth2', 'netmask': 'ffff:ffff:ffff:ffff::'} + ip_addr = ip['addr'].split('%')[0] + + if not _are_same_ip(address, ip_addr): + continue + + # we do not have a netmask to compare against, they are the same + if not netmask: + return True + + prefixlen = '' + if is_ipv4(ip_addr): + prefixlen = sum([bin(int(_)).count('1') for _ in ip['netmask'].split('.')]) + else: + prefixlen = sum([bin(int(_,16)).count('1') for _ in ip['netmask'].split('/')[0].split(':') if _]) + + if str(prefixlen) == netmask: + return True + + return False + +def is_loopback_addr(addr): + """ Check if supplied IPv4/IPv6 address is a loopback address """ + from ipaddress import ip_address + return ip_address(addr).is_loopback + +def is_wireguard_key_pair(private_key: str, public_key:str) -> bool: + """ + Checks if public/private keys are keypair + :param private_key: Wireguard private key + :type private_key: str + :param public_key: Wireguard public key + :type public_key: str + :return: If public/private keys are keypair returns True else False + :rtype: bool + """ + from vyos.utils.process import cmd + gen_public_key = cmd('wg pubkey', input=private_key) + if gen_public_key == public_key: + return True + else: + return False + +def is_subnet_connected(subnet, primary=False): + """ + Verify is the given IPv4/IPv6 subnet is connected to any interface on this + system. + + primary check if the subnet is reachable via the primary IP address of this + interface, or in other words has a broadcast address configured. ISC DHCP + for instance will complain if it should listen on non broadcast interfaces. + + Return True/False + """ + from ipaddress import ip_address + from ipaddress import ip_network + + from netifaces import ifaddresses + from netifaces import interfaces + from netifaces import AF_INET + from netifaces import AF_INET6 + + from vyos.template import is_ipv6 + + # determine IP version (AF_INET or AF_INET6) depending on passed address + addr_type = AF_INET + if is_ipv6(subnet): + addr_type = AF_INET6 + + for interface in interfaces(): + # check if the requested address type is configured at all + if addr_type not in ifaddresses(interface).keys(): + continue + + # An interface can have multiple addresses, but some software components + # only support the primary address :( + if primary: + ip = ifaddresses(interface)[addr_type][0]['addr'] + if ip_address(ip) in ip_network(subnet): + return True + else: + # Check every assigned IP address if it is connected to the subnet + # in question + for ip in ifaddresses(interface)[addr_type]: + # remove interface extension (e.g. %eth0) that gets thrown on the end of _some_ addrs + addr = ip['addr'].split('%')[0] + if ip_address(addr) in ip_network(subnet): + return True + + return False + +def is_afi_configured(interface, afi): + """ Check if given address family is configured, or in other words - an IP + address is assigned to the interface. """ + from netifaces import ifaddresses + from netifaces import AF_INET + from netifaces import AF_INET6 + + if afi not in [AF_INET, AF_INET6]: + raise ValueError('Address family must be in [AF_INET, AF_INET6]') + + try: + addresses = ifaddresses(interface) + except ValueError as e: + print(e) + return False + + return afi in addresses diff --git a/python/vyos/utils/permission.py b/python/vyos/utils/permission.py new file mode 100644 index 000000000..d938b494f --- /dev/null +++ b/python/vyos/utils/permission.py @@ -0,0 +1,78 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +import os + +def chown(path, user, group): + """ change file/directory owner """ + from pwd import getpwnam + from grp import getgrnam + + if user is None or group is None: + return False + + # path may also be an open file descriptor + if not isinstance(path, int) and not os.path.exists(path): + return False + + uid = getpwnam(user).pw_uid + gid = getgrnam(group).gr_gid + os.chown(path, uid, gid) + return True + +def chmod(path, bitmask): + # path may also be an open file descriptor + if not isinstance(path, int) and not os.path.exists(path): + return + if bitmask is None: + return + os.chmod(path, bitmask) + +def chmod_600(path): + """ make file only read/writable by owner """ + from stat import S_IRUSR, S_IWUSR + + bitmask = S_IRUSR | S_IWUSR + chmod(path, bitmask) + +def chmod_750(path): + """ make file/directory only executable to user and group """ + from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP + + bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP + chmod(path, bitmask) + +def chmod_755(path): + """ make file executable by all """ + from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP, S_IROTH, S_IXOTH + + bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP | \ + S_IROTH | S_IXOTH + chmod(path, bitmask) + +def is_admin() -> bool: + """Look if current user is in sudo group""" + from getpass import getuser + from grp import getgrnam + current_user = getuser() + (_, _, _, admin_group_members) = getgrnam('sudo') + return current_user in admin_group_members + +def get_cfg_group_id(): + from grp import getgrnam + from vyos.defaults import cfg_group + + group_data = getgrnam(cfg_group) + return group_data.gr_gid diff --git a/python/vyos/utils/process.py b/python/vyos/utils/process.py new file mode 100644 index 000000000..e09c7d86d --- /dev/null +++ b/python/vyos/utils/process.py @@ -0,0 +1,232 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +import os + +from subprocess import Popen +from subprocess import PIPE +from subprocess import STDOUT +from subprocess import DEVNULL + +def popen(command, flag='', shell=None, input=None, timeout=None, env=None, + stdout=PIPE, stderr=PIPE, decode='utf-8'): + """ + popen is a wrapper helper aound subprocess.Popen + with it default setting it will return a tuple (out, err) + out: the output of the program run + err: the error code returned by the program + + it can be affected by the following flags: + shell: do not try to auto-detect if a shell is required + for example if a pipe (|) or redirection (>, >>) is used + input: data to sent to the child process via STDIN + the data should be bytes but string will be converted + timeout: time after which the command will be considered to have failed + env: mapping that defines the environment variables for the new process + stdout: define how the output of the program should be handled + - PIPE (default), sends stdout to the output + - DEVNULL, discard the output + stderr: define how the output of the program should be handled + - None (default), send/merge the data to/with stderr + - PIPE, popen will append it to output + - STDOUT, send the data to be merged with stdout + - DEVNULL, discard the output + decode: specify the expected text encoding (utf-8, ascii, ...) + the default is explicitely utf-8 which is python's own default + + usage: + get both stdout and stderr: popen('command', stdout=PIPE, stderr=STDOUT) + discard stdout and get stderr: popen('command', stdout=DEVNUL, stderr=PIPE) + """ + + # airbag must be left as an import in the function as otherwise we have a + # a circual import dependency + from vyos import debug + from vyos import airbag + + # log if the flag is set, otherwise log if command is set + if not debug.enabled(flag): + flag = 'command' + + cmd_msg = f"cmd '{command}'" + debug.message(cmd_msg, flag) + + use_shell = shell + stdin = None + if shell is None: + use_shell = False + if ' ' in command: + use_shell = True + if env: + use_shell = True + + if input: + stdin = PIPE + input = input.encode() if type(input) is str else input + + p = Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, + env=env, shell=use_shell) + + pipe = p.communicate(input, timeout) + + pipe_out = b'' + if stdout == PIPE: + pipe_out = pipe[0] + + pipe_err = b'' + if stderr == PIPE: + pipe_err = pipe[1] + + str_out = pipe_out.decode(decode).replace('\r\n', '\n').strip() + str_err = pipe_err.decode(decode).replace('\r\n', '\n').strip() + + out_msg = f"returned (out):\n{str_out}" + if str_out: + debug.message(out_msg, flag) + + if str_err: + from sys import stderr + err_msg = f"returned (err):\n{str_err}" + # this message will also be send to syslog via airbag + debug.message(err_msg, flag, destination=stderr) + + # should something go wrong, report this too via airbag + airbag.noteworthy(cmd_msg) + airbag.noteworthy(out_msg) + airbag.noteworthy(err_msg) + + return str_out, p.returncode + + +def run(command, flag='', shell=None, input=None, timeout=None, env=None, + stdout=DEVNULL, stderr=PIPE, decode='utf-8'): + """ + A wrapper around popen, which discard the stdout and + will return the error code of a command + """ + _, code = popen( + command, flag, + stdout=stdout, stderr=stderr, + input=input, timeout=timeout, + env=env, shell=shell, + decode=decode, + ) + return code + + +def cmd(command, flag='', shell=None, input=None, timeout=None, env=None, + stdout=PIPE, stderr=PIPE, decode='utf-8', raising=None, message='', + expect=[0]): + """ + A wrapper around popen, which returns the stdout and + will raise the error code of a command + + raising: specify which call should be used when raising + the class should only require a string as parameter + (default is OSError) with the error code + expect: a list of error codes to consider as normal + """ + decoded, code = popen( + command, flag, + stdout=stdout, stderr=stderr, + input=input, timeout=timeout, + env=env, shell=shell, + decode=decode, + ) + if code not in expect: + feedback = message + '\n' if message else '' + feedback += f'failed to run command: {command}\n' + feedback += f'returned: {decoded}\n' + feedback += f'exit code: {code}' + if raising is None: + # error code can be recovered with .errno + raise OSError(code, feedback) + else: + raise raising(feedback) + return decoded + + +def rc_cmd(command, flag='', shell=None, input=None, timeout=None, env=None, + stdout=PIPE, stderr=STDOUT, decode='utf-8'): + """ + A wrapper around popen, which returns the return code + of a command and stdout + + % rc_cmd('uname') + (0, 'Linux') + % rc_cmd('ip link show dev eth99') + (1, 'Device "eth99" does not exist.') + """ + out, code = popen( + command, flag, + stdout=stdout, stderr=stderr, + input=input, timeout=timeout, + env=env, shell=shell, + decode=decode, + ) + return code, out + +def call(command, flag='', shell=None, input=None, timeout=None, env=None, + stdout=None, stderr=None, decode='utf-8'): + """ + A wrapper around popen, which print the stdout and + will return the error code of a command + """ + out, code = popen( + command, flag, + stdout=stdout, stderr=stderr, + input=input, timeout=timeout, + env=env, shell=shell, + decode=decode, + ) + if out: + print(out) + return code + +def process_running(pid_file): + """ Checks if a process with PID in pid_file is running """ + from psutil import pid_exists + if not os.path.isfile(pid_file): + return False + with open(pid_file, 'r') as f: + pid = f.read().strip() + return pid_exists(int(pid)) + +def process_named_running(name, cmdline: str=None): + """ Checks if process with given name is running and returns its PID. + If Process is not running, return None + """ + from psutil import process_iter + for p in process_iter(['name', 'pid', 'cmdline']): + if cmdline: + if p.info['name'] == name and cmdline in p.info['cmdline']: + return p.info['pid'] + elif p.info['name'] == name: + return p.info['pid'] + return None + +def is_systemd_service_active(service): + """ Test is a specified systemd service is activated. + Returns True if service is active, false otherwise. + Copied from: https://unix.stackexchange.com/a/435317 """ + tmp = cmd(f'systemctl show --value -p ActiveState {service}') + return bool((tmp == 'active')) + +def is_systemd_service_running(service): + """ Test is a specified systemd service is actually running. + Returns True if service is running, false otherwise. + Copied from: https://unix.stackexchange.com/a/435317 """ + tmp = cmd(f'systemctl show --value -p SubState {service}') + return bool((tmp == 'running')) diff --git a/python/vyos/utils/system.py b/python/vyos/utils/system.py new file mode 100644 index 000000000..5d41c0c05 --- /dev/null +++ b/python/vyos/utils/system.py @@ -0,0 +1,107 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +import os +from subprocess import run + +def sysctl_read(name: str) -> str: + """Read and return current value of sysctl() option + + Args: + name (str): sysctl key name + + Returns: + str: sysctl key value + """ + tmp = run(['sysctl', '-nb', name], capture_output=True) + return tmp.stdout.decode() + +def sysctl_write(name: str, value: str | int) -> bool: + """Change value via sysctl() + + Args: + name (str): sysctl key name + value (str | int): sysctl key value + + Returns: + bool: True if changed, False otherwise + """ + # convert other types to string before comparison + if not isinstance(value, str): + value = str(value) + # do not change anything if a value is already configured + if sysctl_read(name) == value: + return True + # return False if sysctl call failed + if run(['sysctl', '-wq', f'{name}={value}']).returncode != 0: + return False + # compare old and new values + # sysctl may apply value, but its actual value will be + # different from requested + if sysctl_read(name) == value: + return True + # False in other cases + return False + +def sysctl_apply(sysctl_dict: dict[str, str], revert: bool = True) -> bool: + """Apply sysctl values. + + Args: + sysctl_dict (dict[str, str]): dictionary with sysctl keys with values + revert (bool, optional): Revert to original values if new were not + applied. Defaults to True. + + Returns: + bool: True if all params configured properly, False in other cases + """ + # get current values + sysctl_original: dict[str, str] = {} + for key_name in sysctl_dict.keys(): + sysctl_original[key_name] = sysctl_read(key_name) + # apply new values and revert in case one of them was not applied + for key_name, value in sysctl_dict.items(): + if not sysctl_write(key_name, value): + if revert: + sysctl_apply(sysctl_original, revert=False) + return False + # everything applied + return True + +def get_half_cpus(): + """ return 1/2 of the numbers of available CPUs """ + cpu = os.cpu_count() + if cpu > 1: + cpu /= 2 + return int(cpu) + +def find_device_file(device): + """ Recurively search /dev for the given device file and return its full path. + If no device file was found 'None' is returned """ + from fnmatch import fnmatch + + for root, dirs, files in os.walk('/dev'): + for basename in files: + if fnmatch(basename, device): + return os.path.join(root, basename) + + return None + +def load_as_module(name: str, path: str): + import importlib.util + + spec = importlib.util.spec_from_file_location(name, path) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod |
