diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/vyos/utils/__init__.py | 1 | ||||
-rw-r--r-- | python/vyos/utils/locking.py | 115 | ||||
-rw-r--r-- | python/vyos/utils/strip_config.py | 210 |
3 files changed, 326 insertions, 0 deletions
diff --git a/python/vyos/utils/__init__.py b/python/vyos/utils/__init__.py index 90620071b..3759b2125 100644 --- a/python/vyos/utils/__init__.py +++ b/python/vyos/utils/__init__.py @@ -25,6 +25,7 @@ from vyos.utils import file from vyos.utils import io from vyos.utils import kernel from vyos.utils import list +from vyos.utils import locking from vyos.utils import misc from vyos.utils import network from vyos.utils import permission diff --git a/python/vyos/utils/locking.py b/python/vyos/utils/locking.py new file mode 100644 index 000000000..63cb1a816 --- /dev/null +++ b/python/vyos/utils/locking.py @@ -0,0 +1,115 @@ +# Copyright 2024 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +import fcntl +import re +import time +from pathlib import Path + + +class LockTimeoutError(Exception): + """Custom exception raised when lock acquisition times out.""" + + pass + + +class InvalidLockNameError(Exception): + """Custom exception raised when the lock name is invalid.""" + + pass + + +class Lock: + """Lock class to acquire and release a lock file""" + + def __init__(self, lock_name: str) -> None: + """Lock class constructor + + Args: + lock_name (str): Name of the lock file + + Raises: + InvalidLockNameError: If the lock name is invalid + """ + # Validate lock name + if not re.match(r'^[a-zA-Z0-9_\-]+$', lock_name): + raise InvalidLockNameError(f'Invalid lock name: {lock_name}') + + self.__lock_dir = Path('/run/vyos/lock') + self.__lock_dir.mkdir(parents=True, exist_ok=True) + + self.__lock_file_path: Path = self.__lock_dir / f'{lock_name}.lock' + self.__lock_file = None + + self._is_locked = False + + def __del__(self) -> None: + """Ensure the lock file is removed when the object is deleted""" + self.release() + + @property + def is_locked(self) -> bool: + """Check if the lock is acquired + + Returns: + bool: True if the lock is acquired, False otherwise + """ + return self._is_locked + + def __unlink_lockfile(self) -> None: + """Remove the lock file if it is not currently locked.""" + try: + with self.__lock_file_path.open('w') as f: + fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB) + self.__lock_file_path.unlink(missing_ok=True) + except IOError: + # If we cannot acquire the lock, it means another process has it, so we do nothing. + pass + + def acquire(self, timeout: int = 0) -> None: + """Acquire a lock file + + Args: + timeout (int, optional): A time to wait for lock. Defaults to 0. + + Raises: + LockTimeoutError: If lock could not be acquired within timeout + """ + start_time: float = time.time() + while True: + try: + self.__lock_file = self.__lock_file_path.open('w') + fcntl.flock(self.__lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB) + self._is_locked = True + return + except IOError: + if timeout > 0 and (time.time() - start_time) >= timeout: + if self.__lock_file: + self.__lock_file.close() + raise LockTimeoutError( + f'Could not acquire lock within {timeout} seconds' + ) + time.sleep(0.1) + + def release(self) -> None: + """Release a lock file""" + if self.__lock_file and self._is_locked: + try: + fcntl.flock(self.__lock_file, fcntl.LOCK_UN) + self._is_locked = False + finally: + self.__lock_file.close() + self.__lock_file = None + self.__unlink_lockfile() diff --git a/python/vyos/utils/strip_config.py b/python/vyos/utils/strip_config.py new file mode 100644 index 000000000..7a9c78c9f --- /dev/null +++ b/python/vyos/utils/strip_config.py @@ -0,0 +1,210 @@ +#!/usr/bin/python3 +# +# Copyright 2024 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +# XXX: these functions assume that the config is at the top level, +# and aren't capable of anonymizing config subtress. +# They shouldn't be used as a basis for a strip-private filter +# until we figure out if we can pass the config path information to the filter. + +import copy + +import vyos.configtree + + +def __anonymize_password(v): + return "<PASSWORD REDACTED>" + +def __anonymize_key(v): + return "<KEY DATA REDACTED>" + +def __anonymize_data(v): + return "<DATA REDACTED>" + +__secret_paths = [ + # System user password hashes + {"base_path": ['system', 'login', 'user'], "secret_path": ["authentication", "encrypted-password"], "func": __anonymize_password}, + + # PKI data + {"base_path": ["pki", "ca"], "secret_path": ["private", "key"], "func": __anonymize_key}, + {"base_path": ["pki", "ca"], "secret_path": ["certificate"], "func": __anonymize_key}, + {"base_path": ["pki", "ca"], "secret_path": ["crl"], "func": __anonymize_key}, + {"base_path": ["pki", "certificate"], "secret_path": ["private", "key"], "func": __anonymize_key}, + {"base_path": ["pki", "certificate"], "secret_path": ["certificate"], "func": __anonymize_key}, + {"base_path": ["pki", "certificate"], "secret_path": ["acme", "email"], "func": __anonymize_data}, + {"base_path": ["pki", "key-pair"], "secret_path": ["private", "key"], "func": __anonymize_key}, + {"base_path": ["pki", "key-pair"], "secret_path": ["public", "key"], "func": __anonymize_key}, + {"base_path": ["pki", "openssh"], "secret_path": ["private", "key"], "func": __anonymize_key}, + {"base_path": ["pki", "openssh"], "secret_path": ["public", "key"], "func": __anonymize_key}, + {"base_path": ["pki", "openvpn", "shared-secret"], "secret_path": ["key"], "func": __anonymize_key}, + {"base_path": ["pki", "dh"], "secret_path": ["parameters"], "func": __anonymize_key}, + + # IPsec pre-shared secrets + {"base_path": ['vpn', 'ipsec', 'authentication', 'psk'], "secret_path": ["secret"], "func": __anonymize_password}, + + # IPsec x509 passphrases + {"base_path": ['vpn', 'ipsec', 'site-to-site', 'peer'], "secret_path": ['authentication', 'x509'], "func": __anonymize_password}, + + # IPsec remote-access secrets and passwords + {"base_path": ["vpn", "ipsec", "remote-access", "connection"], "secret_path": ["authentication", "pre-shared-secret"], "func": __anonymize_password}, + # Passwords in remote-access IPsec local users have their own fixup + # due to deeper nesting. + + # PPTP passwords + {"base_path": ['vpn', 'pptp', 'remote-access', 'authentication', 'local-users', 'username'], "secret_path": ['password'], "func": __anonymize_password}, + + # L2TP passwords + {"base_path": ['vpn', 'l2tp', 'remote-access', 'authentication', 'local-users', 'username'], "secret_path": ['password'], "func": __anonymize_password}, + {"path": ['vpn', 'l2tp', 'remote-access', 'ipsec-settings', 'authentication', 'pre-shared-secret'], "func": __anonymize_password}, + + # SSTP passwords + {"base_path": ['vpn', 'sstp', 'remote-access', 'authentication', 'local-users', 'username'], "secret_path": ['password'], "func": __anonymize_password}, + + # OpenConnect passwords + {"base_path": ['vpn', 'openconnect', 'authentication', 'local-users', 'username'], "secret_path": ['password'], "func": __anonymize_password}, + + # PPPoE server passwords + {"base_path": ['service', 'pppoe-server', 'authentication', 'local-users', 'username'], "secret_path": ['password'], "func": __anonymize_password}, + + # RADIUS PSKs for VPN services + {"base_path": ["vpn", "sstp", "authentication", "radius", "server"], "secret_path": ["key"], "func": __anonymize_password}, + {"base_path": ["vpn", "l2tp", "authentication", "radius", "server"], "secret_path": ["key"], "func": __anonymize_password}, + {"base_path": ["vpn", "pptp", "authentication", "radius", "server"], "secret_path": ["key"], "func": __anonymize_password}, + {"base_path": ["vpn", "openconnect", "authentication", "radius", "server"], "secret_path": ["key"], "func": __anonymize_password}, + {"base_path": ["service", "ipoe-server", "authentication", "radius", "server"], "secret_path": ["key"], "func": __anonymize_password}, + {"base_path": ["service", "pppoe-server", "authentication", "radius", "server"], "secret_path": ["key"], "func": __anonymize_password}, + + # VRRP passwords + {"base_path": ['high-availability', 'vrrp', 'group'], "secret_path": ['authentication', 'password'], "func": __anonymize_password}, + + # BGP neighbor and peer group passwords + {"base_path": ['protocols', 'bgp', 'neighbor'], "secret_path": ["password"], "func": __anonymize_password}, + {"base_path": ['protocols', 'bgp', 'peer-group'], "secret_path": ["password"], "func": __anonymize_password}, + + # WireGuard private keys + {"base_path": ["interfaces", "wireguard"], "secret_path": ["private-key"], "func": __anonymize_password}, + + # NHRP passwords + {"base_path": ["protocols", "nhrp", "tunnel"], "secret_path": ["cisco-authentication"], "func": __anonymize_password}, + + # RIP passwords + {"base_path": ["protocols", "rip", "interface"], "secret_path": ["authentication", "plaintext-password"], "func": __anonymize_password}, + + # IS-IS passwords + {"path": ["protocols", "isis", "area-password", "plaintext-password"], "func": __anonymize_password}, + {"base_path": ["protocols", "isis", "interface"], "secret_path": ["password", "plaintext-password"], "func": __anonymize_password}, + + # HTTP API servers + {"base_path": ["service", "https", "api", "keys", "id"], "secret_path": ["key"], "func": __anonymize_password}, + + # Telegraf + {"path": ["service", "monitoring", "telegraf", "prometheus-client", "authentication", "password"], "func": __anonymize_password}, + {"path": ["service", "monitoring", "telegraf", "influxdb", "authentication", "token"], "func": __anonymize_password}, + {"path": ["service", "monitoring", "telegraf", "azure-data-explorer", "authentication", "client-secret"], "func": __anonymize_password}, + {"path": ["service", "monitoring", "telegraf", "splunk", "authentication", "token"], "func": __anonymize_password}, + + # SNMPv3 passwords + {"base_path": ["service", "snmp", "v3", "user"], "secret_path": ["privacy", "encrypted-password"], "func": __anonymize_password}, + {"base_path": ["service", "snmp", "v3", "user"], "secret_path": ["privacy", "plaintext-password"], "func": __anonymize_password}, + {"base_path": ["service", "snmp", "v3", "user"], "secret_path": ["auth", "encrypted-password"], "func": __anonymize_password}, + {"base_path": ["service", "snmp", "v3", "user"], "secret_path": ["auth", "encrypted-password"], "func": __anonymize_password}, +] + +def __prepare_secret_paths(config_tree, secret_paths): + """ Generate a list of secret paths for the current system, + adjusted for variable parts such as VRFs and remote access IPsec instances + """ + + # Fixup for remote-access IPsec local users that are nested under two tag nodes + # We generate the list of their paths dynamically + ipsec_ra_base = {"base_path": ["vpn", "ipsec", "remote-access", "connection"], "func": __anonymize_password} + if config_tree.exists(ipsec_ra_base["base_path"]): + for conn in config_tree.list_nodes(ipsec_ra_base["base_path"]): + if config_tree.exists(ipsec_ra_base["base_path"] + [conn] + ["authentication", "local-users", "username"]): + for u in config_tree.list_nodes(ipsec_ra_base["base_path"] + [conn] + ["authentication", "local-users", "username"]): + p = copy.copy(ipsec_ra_base) + p["base_path"] = p["base_path"] + [conn] + ["authentication", "local-users", "username"] + p["secret_path"] = ["password"] + secret_paths.append(p) + + # Fixup for VRFs that may contain routing protocols and other nodes nested under them + vrf_paths = [] + vrf_base_path = ["vrf", "name"] + if config_tree.exists(vrf_base_path): + for v in config_tree.list_nodes(vrf_base_path): + vrf_secret_paths = copy.deepcopy(secret_paths) + for sp in vrf_secret_paths: + if "base_path" in sp: + sp["base_path"] = vrf_base_path + [v] + sp["base_path"] + elif "path" in sp: + sp["path"] = vrf_base_path + [v] + sp["path"] + vrf_paths.append(sp) + + secret_paths = secret_paths + vrf_paths + + # Fixup for user SSH keys, that are nested under a tag node + #ssh_key_base_path = {"base_path": ['system', 'login', 'user'], "secret_path": ["authentication", "encrypted-password"], "func": __anonymize_password}, + user_base_path = ['system', 'login', 'user'] + ssh_key_paths = [] + if config_tree.exists(user_base_path): + for u in config_tree.list_nodes(user_base_path): + kp = {"base_path": user_base_path + [u, "authentication", "public-keys"], "secret_path": ["key"], "func": __anonymize_key} + ssh_key_paths.append(kp) + + secret_paths = secret_paths + ssh_key_paths + + # Fixup for OSPF passwords and keys that are nested under OSPF interfaces + ospf_base_path = ["protocols", "ospf", "interface"] + ospf_paths = [] + if config_tree.exists(ospf_base_path): + for i in config_tree.list_nodes(ospf_base_path): + # Plaintext password, there can be only one + opp = {"path": ospf_base_path + [i, "authentication", "plaintext-password"], "func": __anonymize_password} + md5kp = {"base_path": ospf_base_path + [i, "authentication", "md5", "key-id"], "secret_path": ["md5-key"], "func": __anonymize_password} + ospf_paths.append(opp) + ospf_paths.append(md5kp) + + secret_paths = secret_paths + ospf_paths + + return secret_paths + +def __strip_private(ct, secret_paths): + for sp in secret_paths: + if "base_path" in sp: + if ct.exists(sp["base_path"]): + for n in ct.list_nodes(sp["base_path"]): + if ct.exists(sp["base_path"] + [n] + sp["secret_path"]): + secret = ct.return_value(sp["base_path"] + [n] + sp["secret_path"]) + ct.set(sp["base_path"] + [n] + sp["secret_path"], value=sp["func"](secret)) + elif "path" in sp: + if ct.exists(sp["path"]): + secret = ct.return_value(sp["path"]) + ct.set(sp["path"], value=sp["func"](secret)) + else: + raise ValueError("Malformed secret path dict, has neither base_path nor path in it ") + + return ct.to_string() + +def strip_config_source(config_source): + config_tree = vyos.configtree.ConfigTree(config_source) + secret_paths = __prepare_secret_paths(config_tree, __secret_paths) + stripped_config = __strip_private(config_tree, secret_paths) + + return stripped_config + +def strip_config_tree(config_tree): + secret_paths = __prepare_secret_paths(config_tree, __secret_paths) + return __strip_private(config_tree, secret_paths) |