diff options
Diffstat (limited to 'python/vyos/utils')
-rw-r--r-- | python/vyos/utils/__init__.py | 13 | ||||
-rw-r--r-- | python/vyos/utils/auth.py | 41 | ||||
-rw-r--r-- | python/vyos/utils/boot.py | 35 | ||||
-rw-r--r-- | python/vyos/utils/commit.py | 60 | ||||
-rw-r--r-- | python/vyos/utils/convert.py | 30 | ||||
-rw-r--r-- | python/vyos/utils/dict.py | 39 | ||||
-rw-r--r-- | python/vyos/utils/file.py | 12 | ||||
-rw-r--r-- | python/vyos/utils/kernel.py | 27 | ||||
-rw-r--r-- | python/vyos/utils/list.py | 20 | ||||
-rw-r--r-- | python/vyos/utils/misc.py | 66 | ||||
-rw-r--r-- | python/vyos/utils/network.py | 185 | ||||
-rw-r--r-- | python/vyos/utils/permission.py | 78 | ||||
-rw-r--r-- | python/vyos/utils/process.py | 232 | ||||
-rw-r--r-- | python/vyos/utils/system.py | 107 |
14 files changed, 942 insertions, 3 deletions
diff --git a/python/vyos/utils/__init__.py b/python/vyos/utils/__init__.py index 0d3998053..f2783113a 100644 --- a/python/vyos/utils/__init__.py +++ b/python/vyos/utils/__init__.py @@ -13,4 +13,17 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. +from vyos.utils import auth +from vyos.utils import boot +from vyos.utils import commit +from vyos.utils import convert +from vyos.utils import dict +from vyos.utils import file +from vyos.utils import io +from vyos.utils import kernel +from vyos.utils import list +from vyos.utils import misc from vyos.utils import network +from vyos.utils import permission +from vyos.utils import process +from vyos.utils import system diff --git a/python/vyos/utils/auth.py b/python/vyos/utils/auth.py new file mode 100644 index 000000000..a59858d72 --- /dev/null +++ b/python/vyos/utils/auth.py @@ -0,0 +1,41 @@ +# authutils -- miscelanneous functions for handling passwords and publis keys +# +# Copyright (C) 2018 VyOS maintainers and contributors +# +# This library is free software; you can redistribute it and/or modify it under the terms of +# the GNU Lesser General Public License as published by the Free Software Foundation; +# either version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; +# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# See the GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along with this library; +# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +import re + +from vyos.utils.process import cmd + + +def make_password_hash(password): + """ Makes a password hash for /etc/shadow using mkpasswd """ + + mkpassword = 'mkpasswd --method=sha-512 --stdin' + return cmd(mkpassword, input=password, timeout=5) + +def split_ssh_public_key(key_string, defaultname=""): + """ Splits an SSH public key into its components """ + + key_string = key_string.strip() + parts = re.split(r'\s+', key_string) + + if len(parts) == 3: + key_type, key_data, key_name = parts[0], parts[1], parts[2] + else: + key_type, key_data, key_name = parts[0], parts[1], defaultname + + if key_type not in ['ssh-rsa', 'ssh-dss', 'ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384', 'ecdsa-sha2-nistp521', 'ssh-ed25519']: + raise ValueError("Bad key type \'{0}\', must be one of must be one of ssh-rsa, ssh-dss, ecdsa-sha2-nistp<256|384|521> or ssh-ed25519".format(key_type)) + + return({"type": key_type, "data": key_data, "name": key_name}) diff --git a/python/vyos/utils/boot.py b/python/vyos/utils/boot.py new file mode 100644 index 000000000..3aecbec64 --- /dev/null +++ b/python/vyos/utils/boot.py @@ -0,0 +1,35 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +import os + +def boot_configuration_complete() -> bool: + """ Check if the boot config loader has completed + """ + from vyos.defaults import config_status + if os.path.isfile(config_status): + return True + return False + +def boot_configuration_success() -> bool: + from vyos.defaults import config_status + try: + with open(config_status) as f: + res = f.read().strip() + except FileNotFoundError: + return False + if int(res) == 0: + return True + return False diff --git a/python/vyos/utils/commit.py b/python/vyos/utils/commit.py new file mode 100644 index 000000000..105aed8c2 --- /dev/null +++ b/python/vyos/utils/commit.py @@ -0,0 +1,60 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +def commit_in_progress(): + """ Not to be used in normal op mode scripts! """ + + # The CStore backend locks the config by opening a file + # The file is not removed after commit, so just checking + # if it exists is insufficient, we need to know if it's open by anyone + + # There are two ways to check if any other process keeps a file open. + # The first one is to try opening it and see if the OS objects. + # That's faster but prone to race conditions and can be intrusive. + # The other one is to actually check if any process keeps it open. + # It's non-intrusive but needs root permissions, else you can't check + # processes of other users. + # + # Since this will be used in scripts that modify the config outside of the CLI + # framework, those knowingly have root permissions. + # For everything else, we add a safeguard. + from psutil import process_iter + from psutil import NoSuchProcess + from getpass import getuser + from vyos.defaults import commit_lock + + if getuser() != 'root': + raise OSError('This functions needs to be run as root to return correct results!') + + for proc in process_iter(): + try: + files = proc.open_files() + if files: + for f in files: + if f.path == commit_lock: + return True + except NoSuchProcess as err: + # Process died before we could examine it + pass + # Default case + return False + + +def wait_for_commit_lock(): + """ Not to be used in normal op mode scripts! """ + from time import sleep + # Very synchronous approach to multiprocessing + while commit_in_progress(): + sleep(1) diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py index 975c67e0a..ec2333ef0 100644 --- a/python/vyos/utils/convert.py +++ b/python/vyos/utils/convert.py @@ -143,3 +143,33 @@ def mac_to_eui64(mac, prefix=None): return str(net[euil]) except: # pylint: disable=bare-except return + +def convert_data(data): + """Convert multiple types of data to types usable in CLI + + Args: + data (str | bytes | list | OrderedDict): input data + + Returns: + str | list | dict: converted data + """ + from base64 import b64encode + from collections import OrderedDict + + if isinstance(data, str): + return data + if isinstance(data, bytes): + try: + return data.decode() + except UnicodeDecodeError: + return b64encode(data).decode() + if isinstance(data, list): + list_tmp = [] + for item in data: + list_tmp.append(convert_data(item)) + return list_tmp + if isinstance(data, OrderedDict): + dict_tmp = {} + for key, value in data.items(): + dict_tmp[key] = convert_data(value) + return dict_tmp diff --git a/python/vyos/utils/dict.py b/python/vyos/utils/dict.py index 28d32bb8d..9484eacdd 100644 --- a/python/vyos/utils/dict.py +++ b/python/vyos/utils/dict.py @@ -13,7 +13,6 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. - def colon_separated_to_dict(data_string, uniquekeys=False): """ Converts a string containing newline-separated entries of colon-separated key-value pairs into a dict. @@ -87,7 +86,7 @@ def mangle_dict_keys(data, regex, replacement, abs_path=None, no_tag_node_value_ if abs_path is None: abs_path = [] - new_dict = {} + new_dict = type(data)() for k in data.keys(): if no_tag_node_value_mangle and is_tag_value(abs_path + [k]): @@ -270,3 +269,39 @@ def check_mutually_exclusive_options(d, keys, required=False): if required and (len(present_keys) < 1): raise ValueError(f"At least one of the following options is required: {orig_keys}") + +class FixedDict(dict): + """ + FixedDict: A dictionnary not allowing new keys to be created after initialisation. + + >>> f = FixedDict(**{'count':1}) + >>> f['count'] = 2 + >>> f['king'] = 3 + File "...", line ..., in __setitem__ + raise ConfigError(f'Option "{k}" has no defined default') + """ + + from vyos import ConfigError + + def __init__(self, **options): + self._allowed = options.keys() + super().__init__(**options) + + def __setitem__(self, k, v): + """ + __setitem__ is a builtin which is called by python when setting dict values: + >>> d = dict() + >>> d['key'] = 'value' + >>> d + {'key': 'value'} + + is syntaxic sugar for + + >>> d = dict() + >>> d.__setitem__('key','value') + >>> d + {'key': 'value'} + """ + if k not in self._allowed: + raise ConfigError(f'Option "{k}" has no defined default') + super().__setitem__(k, v) diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py index 2560a35be..667a2464b 100644 --- a/python/vyos/utils/file.py +++ b/python/vyos/utils/file.py @@ -14,7 +14,19 @@ # License along with this library. If not, see <http://www.gnu.org/licenses/>. import os +from vyos.utils.permission import chown +def makedir(path, user=None, group=None): + if os.path.exists(path): + return + os.makedirs(path, mode=0o755) + chown(path, user, group) + +def file_is_persistent(path): + import re + location = r'^(/config|/opt/vyatta/etc/config)' + absolute = os.path.abspath(os.path.dirname(path)) + return re.match(location,absolute) def read_file(fname, defaultonfailure=None): """ diff --git a/python/vyos/utils/kernel.py b/python/vyos/utils/kernel.py new file mode 100644 index 000000000..0eb113174 --- /dev/null +++ b/python/vyos/utils/kernel.py @@ -0,0 +1,27 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +import os + +def check_kmod(k_mod): + """ Common utility function to load required kernel modules on demand """ + from vyos import ConfigError + from vyos.utils.process import call + if isinstance(k_mod, str): + k_mod = k_mod.split() + for module in k_mod: + if not os.path.exists(f'/sys/module/{module}'): + if call(f'modprobe {module}') != 0: + raise ConfigError(f'Loading Kernel module {module} failed') diff --git a/python/vyos/utils/list.py b/python/vyos/utils/list.py new file mode 100644 index 000000000..63ef720ab --- /dev/null +++ b/python/vyos/utils/list.py @@ -0,0 +1,20 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +def is_list_equal(first: list, second: list) -> bool: + """ Check if 2 lists are equal and list not empty """ + if len(first) != len(second) or len(first) == 0: + return False + return sorted(first) == sorted(second) diff --git a/python/vyos/utils/misc.py b/python/vyos/utils/misc.py new file mode 100644 index 000000000..d82655914 --- /dev/null +++ b/python/vyos/utils/misc.py @@ -0,0 +1,66 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +def begin(*args): + """ + Evaluate arguments in order and return the result of the *last* argument. + For combining multiple expressions in one statement. Useful for lambdas. + """ + return args[-1] + +def begin0(*args): + """ + Evaluate arguments in order and return the result of the *first* argument. + For combining multiple expressions in one statement. Useful for lambdas. + """ + return args[0] + +def install_into_config(conf, config_paths, override_prompt=True): + # Allows op-mode scripts to install values if called from an active config session + # config_paths: dict of config paths + # override_prompt: if True, user will be prompted before existing nodes are overwritten + if not config_paths: + return None + + from vyos.config import Config + from vyos.utils.io import ask_yes_no + from vyos.utils.process import cmd + if not Config().in_session(): + print('You are not in configure mode, commands to install manually from configure mode:') + for path in config_paths: + print(f'set {path}') + return None + + count = 0 + failed = [] + + for path in config_paths: + if override_prompt and conf.exists(path) and not conf.is_multi(path): + if not ask_yes_no(f'Config node "{node}" already exists. Do you want to overwrite it?'): + continue + + try: + cmd(f'/opt/vyatta/sbin/my_set {path}') + count += 1 + except: + failed.append(path) + + if failed: + print(f'Failed to install {len(failed)} value(s). Commands to manually install:') + for path in failed: + print(f'set {path}') + + if count > 0: + print(f'{count} value(s) installed. Use "compare" to see the pending changes, and "commit" to apply.') diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py index 72b7ca6da..3786caf26 100644 --- a/python/vyos/utils/network.py +++ b/python/vyos/utils/network.py @@ -15,7 +15,6 @@ import os - def get_protocol_by_name(protocol_name): """Get protocol number by protocol name @@ -28,3 +27,187 @@ def get_protocol_by_name(protocol_name): return protocol_number except socket.error: return protocol_name + +def interface_exists_in_netns(interface_name, netns): + from vyos.utils.process import rc_cmd + rc, out = rc_cmd(f'ip netns exec {netns} ip link show dev {interface_name}') + if rc == 0: + return True + return False + +def get_interface_vrf(interface): + """ Returns VRF of given interface """ + from vyos.utils.dict import dict_search + from vyos.utils.network import get_interface_config + tmp = get_interface_config(interface) + if dict_search('linkinfo.info_slave_kind', tmp) == 'vrf': + return tmp['master'] + return 'default' + +def get_interface_config(interface): + """ Returns the used encapsulation protocol for given interface. + If interface does not exist, None is returned. + """ + if not os.path.exists(f'/sys/class/net/{interface}'): + return None + from json import loads + from vyos.utils.process import cmd + tmp = loads(cmd(f'ip --detail --json link show dev {interface}'))[0] + return tmp + +def get_interface_address(interface): + """ Returns the used encapsulation protocol for given interface. + If interface does not exist, None is returned. + """ + if not os.path.exists(f'/sys/class/net/{interface}'): + return None + from json import loads + from vyos.utils.process import cmd + tmp = loads(cmd(f'ip --detail --json addr show dev {interface}'))[0] + return tmp + +def get_interface_namespace(iface): + """ + Returns wich netns the interface belongs to + """ + from json import loads + from vyos.utils.process import cmd + # Check if netns exist + tmp = loads(cmd(f'ip --json netns ls')) + if len(tmp) == 0: + return None + + for ns in tmp: + netns = f'{ns["name"]}' + # Search interface in each netns + data = loads(cmd(f'ip netns exec {netns} ip --json link show')) + for tmp in data: + if iface == tmp["ifname"]: + return netns + + +def is_wwan_connected(interface): + """ Determine if a given WWAN interface, e.g. wwan0 is connected to the + carrier network or not """ + import json + from vyos.utils.process import cmd + + if not interface.startswith('wwan'): + raise ValueError(f'Specified interface "{interface}" is not a WWAN interface') + + # ModemManager is required for connection(s) - if service is not running, + # there won't be any connection at all! + if not is_systemd_service_active('ModemManager.service'): + return False + + modem = interface.lstrip('wwan') + + tmp = cmd(f'mmcli --modem {modem} --output-json') + tmp = json.loads(tmp) + + # return True/False if interface is in connected state + return dict_search('modem.generic.state', tmp) == 'connected' + +def get_bridge_fdb(interface): + """ Returns the forwarding database entries for a given interface """ + if not os.path.exists(f'/sys/class/net/{interface}'): + return None + from json import loads + from vyos.utils.process import cmd + tmp = loads(cmd(f'bridge -j fdb show dev {interface}')) + return tmp + +def get_all_vrfs(): + """ Return a dictionary of all system wide known VRF instances """ + from json import loads + from vyos.utils.process import cmd + tmp = loads(cmd('ip --json vrf list')) + # Result is of type [{"name":"red","table":1000},{"name":"blue","table":2000}] + # so we will re-arrange it to a more nicer representation: + # {'red': {'table': 1000}, 'blue': {'table': 2000}} + data = {} + for entry in tmp: + name = entry.pop('name') + data[name] = entry + return data + +def mac2eui64(mac, prefix=None): + """ + Convert a MAC address to a EUI64 address or, with prefix provided, a full + IPv6 address. + Thankfully copied from https://gist.github.com/wido/f5e32576bb57b5cc6f934e177a37a0d3 + """ + import re + from ipaddress import ip_network + # http://tools.ietf.org/html/rfc4291#section-2.5.1 + eui64 = re.sub(r'[.:-]', '', mac).lower() + eui64 = eui64[0:6] + 'fffe' + eui64[6:] + eui64 = hex(int(eui64[0:2], 16) ^ 2)[2:].zfill(2) + eui64[2:] + + if prefix is None: + return ':'.join(re.findall(r'.{4}', eui64)) + else: + try: + net = ip_network(prefix, strict=False) + euil = int('0x{0}'.format(eui64), 16) + return str(net[euil]) + except: # pylint: disable=bare-except + return + +def check_port_availability(ipaddress, port, protocol): + """ + Check if port is available and not used by any service + Return False if a port is busy or IP address does not exists + Should be used carefully for services that can start listening + dynamically, because IP address may be dynamic too + """ + from socketserver import TCPServer, UDPServer + from ipaddress import ip_address + + # verify arguments + try: + ipaddress = ip_address(ipaddress).compressed + except: + raise ValueError(f'The {ipaddress} is not a valid IPv4 or IPv6 address') + if port not in range(1, 65536): + raise ValueError(f'The port number {port} is not in the 1-65535 range') + if protocol not in ['tcp', 'udp']: + raise ValueError(f'The protocol {protocol} is not supported. Only tcp and udp are allowed') + + # check port availability + try: + if protocol == 'tcp': + server = TCPServer((ipaddress, port), None, bind_and_activate=True) + if protocol == 'udp': + server = UDPServer((ipaddress, port), None, bind_and_activate=True) + server.server_close() + except Exception as e: + # errno.h: + #define EADDRINUSE 98 /* Address already in use */ + if e.errno == 98: + return False + + return True + +def is_listen_port_bind_service(port: int, service: str) -> bool: + """Check if listen port bound to expected program name + :param port: Bind port + :param service: Program name + :return: bool + + Example: + % is_listen_port_bind_service(443, 'nginx') + True + % is_listen_port_bind_service(443, 'ocserv-main') + False + """ + from psutil import net_connections as connections + from psutil import Process as process + for connection in connections(): + addr = connection.laddr + pid = connection.pid + pid_name = process(pid).name() + pid_port = addr.port + if service == pid_name and port == pid_port: + return True + return False diff --git a/python/vyos/utils/permission.py b/python/vyos/utils/permission.py new file mode 100644 index 000000000..d938b494f --- /dev/null +++ b/python/vyos/utils/permission.py @@ -0,0 +1,78 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +import os + +def chown(path, user, group): + """ change file/directory owner """ + from pwd import getpwnam + from grp import getgrnam + + if user is None or group is None: + return False + + # path may also be an open file descriptor + if not isinstance(path, int) and not os.path.exists(path): + return False + + uid = getpwnam(user).pw_uid + gid = getgrnam(group).gr_gid + os.chown(path, uid, gid) + return True + +def chmod(path, bitmask): + # path may also be an open file descriptor + if not isinstance(path, int) and not os.path.exists(path): + return + if bitmask is None: + return + os.chmod(path, bitmask) + +def chmod_600(path): + """ make file only read/writable by owner """ + from stat import S_IRUSR, S_IWUSR + + bitmask = S_IRUSR | S_IWUSR + chmod(path, bitmask) + +def chmod_750(path): + """ make file/directory only executable to user and group """ + from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP + + bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP + chmod(path, bitmask) + +def chmod_755(path): + """ make file executable by all """ + from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP, S_IROTH, S_IXOTH + + bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP | \ + S_IROTH | S_IXOTH + chmod(path, bitmask) + +def is_admin() -> bool: + """Look if current user is in sudo group""" + from getpass import getuser + from grp import getgrnam + current_user = getuser() + (_, _, _, admin_group_members) = getgrnam('sudo') + return current_user in admin_group_members + +def get_cfg_group_id(): + from grp import getgrnam + from vyos.defaults import cfg_group + + group_data = getgrnam(cfg_group) + return group_data.gr_gid diff --git a/python/vyos/utils/process.py b/python/vyos/utils/process.py new file mode 100644 index 000000000..911547995 --- /dev/null +++ b/python/vyos/utils/process.py @@ -0,0 +1,232 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +import os + +from subprocess import Popen +from subprocess import PIPE +from subprocess import STDOUT +from subprocess import DEVNULL + +def popen(command, flag='', shell=None, input=None, timeout=None, env=None, + stdout=PIPE, stderr=PIPE, decode='utf-8'): + """ + popen is a wrapper helper aound subprocess.Popen + with it default setting it will return a tuple (out, err) + out: the output of the program run + err: the error code returned by the program + + it can be affected by the following flags: + shell: do not try to auto-detect if a shell is required + for example if a pipe (|) or redirection (>, >>) is used + input: data to sent to the child process via STDIN + the data should be bytes but string will be converted + timeout: time after which the command will be considered to have failed + env: mapping that defines the environment variables for the new process + stdout: define how the output of the program should be handled + - PIPE (default), sends stdout to the output + - DEVNULL, discard the output + stderr: define how the output of the program should be handled + - None (default), send/merge the data to/with stderr + - PIPE, popen will append it to output + - STDOUT, send the data to be merged with stdout + - DEVNULL, discard the output + decode: specify the expected text encoding (utf-8, ascii, ...) + the default is explicitely utf-8 which is python's own default + + usage: + get both stdout and stderr: popen('command', stdout=PIPE, stderr=STDOUT) + discard stdout and get stderr: popen('command', stdout=DEVNUL, stderr=PIPE) + """ + + # airbag must be left as an import in the function as otherwise we have a + # a circual import dependency + from vyos import debug + from vyos import airbag + + # log if the flag is set, otherwise log if command is set + if not debug.enabled(flag): + flag = 'command' + + cmd_msg = f"cmd '{command}'" + debug.message(cmd_msg, flag) + + use_shell = shell + stdin = None + if shell is None: + use_shell = False + if ' ' in command: + use_shell = True + if env: + use_shell = True + + if input: + stdin = PIPE + input = input.encode() if type(input) is str else input + + p = Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, + env=env, shell=use_shell) + + pipe = p.communicate(input, timeout) + + pipe_out = b'' + if stdout == PIPE: + pipe_out = pipe[0] + + pipe_err = b'' + if stderr == PIPE: + pipe_err = pipe[1] + + str_out = pipe_out.decode(decode).replace('\r\n', '\n').strip() + str_err = pipe_err.decode(decode).replace('\r\n', '\n').strip() + + out_msg = f"returned (out):\n{str_out}" + if str_out: + debug.message(out_msg, flag) + + if str_err: + from sys import stderr + err_msg = f"returned (err):\n{str_err}" + # this message will also be send to syslog via airbag + debug.message(err_msg, flag, destination=stderr) + + # should something go wrong, report this too via airbag + airbag.noteworthy(cmd_msg) + airbag.noteworthy(out_msg) + airbag.noteworthy(err_msg) + + return str_out, p.returncode + + +def run(command, flag='', shell=None, input=None, timeout=None, env=None, + stdout=DEVNULL, stderr=PIPE, decode='utf-8'): + """ + A wrapper around popen, which discard the stdout and + will return the error code of a command + """ + _, code = popen( + command, flag, + stdout=stdout, stderr=stderr, + input=input, timeout=timeout, + env=env, shell=shell, + decode=decode, + ) + return code + + +def cmd(command, flag='', shell=None, input=None, timeout=None, env=None, + stdout=PIPE, stderr=PIPE, decode='utf-8', raising=None, message='', + expect=[0]): + """ + A wrapper around popen, which returns the stdout and + will raise the error code of a command + + raising: specify which call should be used when raising + the class should only require a string as parameter + (default is OSError) with the error code + expect: a list of error codes to consider as normal + """ + decoded, code = popen( + command, flag, + stdout=stdout, stderr=stderr, + input=input, timeout=timeout, + env=env, shell=shell, + decode=decode, + ) + if code not in expect: + feedback = message + '\n' if message else '' + feedback += f'failed to run command: {command}\n' + feedback += f'returned: {decoded}\n' + feedback += f'exit code: {code}' + if raising is None: + # error code can be recovered with .errno + raise OSError(code, feedback) + else: + raise raising(feedback) + return decoded + + +def rc_cmd(command, flag='', shell=None, input=None, timeout=None, env=None, + stdout=PIPE, stderr=STDOUT, decode='utf-8'): + """ + A wrapper around popen, which returns the return code + of a command and stdout + + % rc_cmd('uname') + (0, 'Linux') + % rc_cmd('ip link show dev eth99') + (1, 'Device "eth99" does not exist.') + """ + out, code = popen( + command, flag, + stdout=stdout, stderr=stderr, + input=input, timeout=timeout, + env=env, shell=shell, + decode=decode, + ) + return code, out + +def call(command, flag='', shell=None, input=None, timeout=None, env=None, + stdout=PIPE, stderr=PIPE, decode='utf-8'): + """ + A wrapper around popen, which print the stdout and + will return the error code of a command + """ + out, code = popen( + command, flag, + stdout=stdout, stderr=stderr, + input=input, timeout=timeout, + env=env, shell=shell, + decode=decode, + ) + if out: + print(out) + return code + +def process_running(pid_file): + """ Checks if a process with PID in pid_file is running """ + from psutil import pid_exists + if not os.path.isfile(pid_file): + return False + with open(pid_file, 'r') as f: + pid = f.read().strip() + return pid_exists(int(pid)) + +def process_named_running(name, cmdline: str=None): + """ Checks if process with given name is running and returns its PID. + If Process is not running, return None + """ + from psutil import process_iter + for p in process_iter(['name', 'pid', 'cmdline']): + if cmdline: + if p.info['name'] == name and cmdline in p.info['cmdline']: + return p.info['pid'] + elif p.info['name'] == name: + return p.info['pid'] + return None + +def is_systemd_service_active(service): + """ Test is a specified systemd service is activated. + Returns True if service is active, false otherwise. + Copied from: https://unix.stackexchange.com/a/435317 """ + tmp = cmd(f'systemctl show --value -p ActiveState {service}') + return bool((tmp == 'active')) + +def is_systemd_service_running(service): + """ Test is a specified systemd service is actually running. + Returns True if service is running, false otherwise. + Copied from: https://unix.stackexchange.com/a/435317 """ + tmp = cmd(f'systemctl show --value -p SubState {service}') + return bool((tmp == 'running')) diff --git a/python/vyos/utils/system.py b/python/vyos/utils/system.py new file mode 100644 index 000000000..5d41c0c05 --- /dev/null +++ b/python/vyos/utils/system.py @@ -0,0 +1,107 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +import os +from subprocess import run + +def sysctl_read(name: str) -> str: + """Read and return current value of sysctl() option + + Args: + name (str): sysctl key name + + Returns: + str: sysctl key value + """ + tmp = run(['sysctl', '-nb', name], capture_output=True) + return tmp.stdout.decode() + +def sysctl_write(name: str, value: str | int) -> bool: + """Change value via sysctl() + + Args: + name (str): sysctl key name + value (str | int): sysctl key value + + Returns: + bool: True if changed, False otherwise + """ + # convert other types to string before comparison + if not isinstance(value, str): + value = str(value) + # do not change anything if a value is already configured + if sysctl_read(name) == value: + return True + # return False if sysctl call failed + if run(['sysctl', '-wq', f'{name}={value}']).returncode != 0: + return False + # compare old and new values + # sysctl may apply value, but its actual value will be + # different from requested + if sysctl_read(name) == value: + return True + # False in other cases + return False + +def sysctl_apply(sysctl_dict: dict[str, str], revert: bool = True) -> bool: + """Apply sysctl values. + + Args: + sysctl_dict (dict[str, str]): dictionary with sysctl keys with values + revert (bool, optional): Revert to original values if new were not + applied. Defaults to True. + + Returns: + bool: True if all params configured properly, False in other cases + """ + # get current values + sysctl_original: dict[str, str] = {} + for key_name in sysctl_dict.keys(): + sysctl_original[key_name] = sysctl_read(key_name) + # apply new values and revert in case one of them was not applied + for key_name, value in sysctl_dict.items(): + if not sysctl_write(key_name, value): + if revert: + sysctl_apply(sysctl_original, revert=False) + return False + # everything applied + return True + +def get_half_cpus(): + """ return 1/2 of the numbers of available CPUs """ + cpu = os.cpu_count() + if cpu > 1: + cpu /= 2 + return int(cpu) + +def find_device_file(device): + """ Recurively search /dev for the given device file and return its full path. + If no device file was found 'None' is returned """ + from fnmatch import fnmatch + + for root, dirs, files in os.walk('/dev'): + for basename in files: + if fnmatch(basename, device): + return os.path.join(root, basename) + + return None + +def load_as_module(name: str, path: str): + import importlib.util + + spec = importlib.util.spec_from_file_location(name, path) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod |