diff options
Diffstat (limited to 'python/vyos')
-rw-r--r-- | python/vyos/ipsec.py | 136 | ||||
-rw-r--r-- | python/vyos/utils/serial.py | 118 |
2 files changed, 218 insertions, 36 deletions
diff --git a/python/vyos/ipsec.py b/python/vyos/ipsec.py index 4603aab22..28f77565a 100644 --- a/python/vyos/ipsec.py +++ b/python/vyos/ipsec.py @@ -1,4 +1,4 @@ -# Copyright 2020-2023 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2020-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -13,31 +13,38 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. -#Package to communicate with Strongswan VICI +# Package to communicate with Strongswan VICI + class ViciInitiateError(Exception): """ - VICI can't initiate a session. + VICI can't initiate a session. """ + pass + + class ViciCommandError(Exception): """ - VICI can't execute a command by any reason. + VICI can't execute a command by any reason. """ + pass + def get_vici_sas(): from vici import Session as vici_session try: session = vici_session() except Exception: - raise ViciInitiateError("IPsec not initialized") + raise ViciInitiateError('IPsec not initialized') try: sas = list(session.list_sas()) return sas except Exception: - raise ViciCommandError(f'Failed to get SAs') + raise ViciCommandError('Failed to get SAs') + def get_vici_connections(): from vici import Session as vici_session @@ -45,18 +52,19 @@ def get_vici_connections(): try: session = vici_session() except Exception: - raise ViciInitiateError("IPsec not initialized") + raise ViciInitiateError('IPsec not initialized') try: connections = list(session.list_conns()) return connections except Exception: - raise ViciCommandError(f'Failed to get connections') + raise ViciCommandError('Failed to get connections') + def get_vici_sas_by_name(ike_name: str, tunnel: str) -> list: """ - Find sas by IKE_SA name and/or CHILD_SA name - and return list of OrdinaryDicts with SASs info - If tunnel is not None return value is list of OrdenaryDicts contained only + Find installed SAs by IKE_SA name and/or CHILD_SA name + and return list with SASs info. + If tunnel is not None return a list contained only CHILD_SAs wich names equal tunnel value. :param ike_name: IKE SA name :type ike_name: str @@ -70,7 +78,7 @@ def get_vici_sas_by_name(ike_name: str, tunnel: str) -> list: try: session = vici_session() except Exception: - raise ViciInitiateError("IPsec not initialized") + raise ViciInitiateError('IPsec not initialized') vici_dict = {} if ike_name: vici_dict['ike'] = ike_name @@ -80,7 +88,31 @@ def get_vici_sas_by_name(ike_name: str, tunnel: str) -> list: sas = list(session.list_sas(vici_dict)) return sas except Exception: - raise ViciCommandError(f'Failed to get SAs') + raise ViciCommandError('Failed to get SAs') + + +def get_vici_connection_by_name(ike_name: str) -> list: + """ + Find loaded SAs by IKE_SA name and return list with SASs info + :param ike_name: IKE SA name + :type ike_name: str + :return: list of Ordinary Dicts with SASs + :rtype: list + """ + from vici import Session as vici_session + + try: + session = vici_session() + except Exception: + raise ViciInitiateError('IPsec is not initialized') + vici_dict = {} + if ike_name: + vici_dict['ike'] = ike_name + try: + sas = list(session.list_conns(vici_dict)) + return sas + except Exception: + raise ViciCommandError('Failed to get SAs') def terminate_vici_ikeid_list(ike_id_list: list) -> None: @@ -94,19 +126,17 @@ def terminate_vici_ikeid_list(ike_id_list: list) -> None: try: session = vici_session() except Exception: - raise ViciInitiateError("IPsec not initialized") + raise ViciInitiateError('IPsec is not initialized') try: for ikeid in ike_id_list: - session_generator = session.terminate( - {'ike-id': ikeid, 'timeout': '-1'}) + session_generator = session.terminate({'ike-id': ikeid, 'timeout': '-1'}) # a dummy `for` loop is required because of requirements # from vici. Without a full iteration on the output, the # command to vici may not be executed completely for _ in session_generator: pass except Exception: - raise ViciCommandError( - f'Failed to terminate SA for IKE ids {ike_id_list}') + raise ViciCommandError(f'Failed to terminate SA for IKE ids {ike_id_list}') def terminate_vici_by_name(ike_name: str, child_name: str) -> None: @@ -123,9 +153,9 @@ def terminate_vici_by_name(ike_name: str, child_name: str) -> None: try: session = vici_session() except Exception: - raise ViciInitiateError("IPsec not initialized") + raise ViciInitiateError('IPsec is not initialized') try: - vici_dict: dict= {} + vici_dict: dict = {} if ike_name: vici_dict['ike'] = ike_name if child_name: @@ -138,16 +168,48 @@ def terminate_vici_by_name(ike_name: str, child_name: str) -> None: pass except Exception: if child_name: - raise ViciCommandError( - f'Failed to terminate SA for IPSEC {child_name}') + raise ViciCommandError(f'Failed to terminate SA for IPSEC {child_name}') else: - raise ViciCommandError( - f'Failed to terminate SA for IKE {ike_name}') + raise ViciCommandError(f'Failed to terminate SA for IKE {ike_name}') + + +def vici_initiate_all_child_sa_by_ike(ike_sa_name: str, child_sa_list: list) -> bool: + """ + Initiate IKE SA with scpecified CHILD_SAs in list + + Args: + ike_sa_name (str): an IKE SA connection name + child_sa_list (list): a list of child SA names + + Returns: + bool: a result of initiation command + """ + from vici import Session as vici_session + + try: + session = vici_session() + except Exception: + raise ViciInitiateError('IPsec is not initialized') + + try: + for child_sa_name in child_sa_list: + session_generator = session.initiate( + {'ike': ike_sa_name, 'child': child_sa_name, 'timeout': '-1'} + ) + # a dummy `for` loop is required because of requirements + # from vici. Without a full iteration on the output, the + # command to vici may not be executed completely + for _ in session_generator: + pass + return True + except Exception: + raise ViciCommandError(f'Failed to initiate SA for IKE {ike_sa_name}') -def vici_initiate(ike_sa_name: str, child_sa_name: str, src_addr: str, - dst_addr: str) -> bool: - """Initiate IKE SA connection with specific peer +def vici_initiate( + ike_sa_name: str, child_sa_name: str, src_addr: str, dst_addr: str +) -> bool: + """Initiate IKE SA with one child_sa connection with specific peer Args: ike_sa_name (str): an IKE SA connection name @@ -163,16 +225,18 @@ def vici_initiate(ike_sa_name: str, child_sa_name: str, src_addr: str, try: session = vici_session() except Exception: - raise ViciInitiateError("IPsec not initialized") + raise ViciInitiateError('IPsec is not initialized') try: - session_generator = session.initiate({ - 'ike': ike_sa_name, - 'child': child_sa_name, - 'timeout': '-1', - 'my-host': src_addr, - 'other-host': dst_addr - }) + session_generator = session.initiate( + { + 'ike': ike_sa_name, + 'child': child_sa_name, + 'timeout': '-1', + 'my-host': src_addr, + 'other-host': dst_addr, + } + ) # a dummy `for` loop is required because of requirements # from vici. Without a full iteration on the output, the # command to vici may not be executed completely @@ -180,4 +244,4 @@ def vici_initiate(ike_sa_name: str, child_sa_name: str, src_addr: str, pass return True except Exception: - raise ViciCommandError(f'Failed to initiate SA for IKE {ike_sa_name}')
\ No newline at end of file + raise ViciCommandError(f'Failed to initiate SA for IKE {ike_sa_name}') diff --git a/python/vyos/utils/serial.py b/python/vyos/utils/serial.py new file mode 100644 index 000000000..b646f881e --- /dev/null +++ b/python/vyos/utils/serial.py @@ -0,0 +1,118 @@ +# Copyright 2024 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +import os, re, json +from typing import List + +from vyos.base import Warning +from vyos.utils.io import ask_yes_no +from vyos.utils.process import cmd + +GLOB_GETTY_UNITS = 'serial-getty@*.service' +RE_GETTY_DEVICES = re.compile(r'.+@(.+).service$') + +SD_UNIT_PATH = '/run/systemd/system' +UTMP_PATH = '/run/utmp' + +def get_serial_units(include_devices=[]): + # Since we cannot depend on the current config for decommissioned ports, + # we just grab everything that systemd knows about. + tmp = cmd(f'systemctl list-units {GLOB_GETTY_UNITS} --all --output json --no-pager') + getty_units = json.loads(tmp) + for sdunit in getty_units: + m = RE_GETTY_DEVICES.search(sdunit['unit']) + if m is None: + Warning(f'Serial console unit name "{sdunit["unit"]}" is malformed and cannot be checked for activity!') + continue + + getty_device = m.group(1) + if include_devices and getty_device not in include_devices: + continue + + sdunit['device'] = getty_device + + return getty_units + +def get_authenticated_ports(units): + connected = [] + ports = [ x['device'] for x in units if 'device' in x ] + # + # utmpdump just gives us an easily parseable dump of currently logged-in sessions, for eg: + # $ utmpdump /run/utmp + # Utmp dump of /run/utmp + # [2] [00000] [~~ ] [reboot ] [~ ] [6.6.31-amd64-vyos ] [0.0.0.0 ] [2024-06-18T13:56:53,958484+00:00] + # [1] [00051] [~~ ] [runlevel] [~ ] [6.6.31-amd64-vyos ] [0.0.0.0 ] [2024-06-18T13:57:01,790808+00:00] + # [6] [03178] [tty1] [LOGIN ] [tty1 ] [ ] [0.0.0.0 ] [2024-06-18T13:57:31,015392+00:00] + # [7] [37151] [ts/0] [vyos ] [pts/0 ] [10.9.8.7 ] [10.9.8.7 ] [2024-07-04T13:42:08,760892+00:00] + # [8] [24812] [ts/1] [ ] [pts/1 ] [10.9.8.7 ] [10.9.8.7 ] [2024-06-20T18:10:07,309365+00:00] + # + # We can safely skip blank or LOGIN sessions with valid device names. + # + for line in cmd(f'utmpdump {UTMP_PATH}').splitlines(): + row = line.split('] [') + user_name = row[3].strip() + user_term = row[4].strip() + if user_name and user_name != 'LOGIN' and user_term in ports: + connected.append(user_term) + + return connected + +def restart_login_consoles(prompt_user=False, quiet=True, devices: List[str]=[]): + # restart_login_consoles() is called from both conf- and op-mode scripts, including + # the warning messages and user prompts common to both. + # + # The default case, called with no arguments, is a simple serial-getty restart & + # cleanup wrapper with no output or prompts that can be used from anywhere. + # + # quiet and prompt_user args have been split from an original "no_prompt", in + # order to support the completely silent default use case. "no_prompt" would + # only suppress the user interactive prompt. + # + # quiet intentionally does not suppress a vyos.base.Warning() for malformed + # device names in _get_serial_units(). + # + cmd('systemctl daemon-reload') + + units = get_serial_units(devices) + connected = get_authenticated_ports(units) + + if connected: + if not quiet: + Warning('There are user sessions connected via serial console that '\ + 'will be terminated when serial console settings are changed!') + if not prompt_user: + # This flag is used by conf_mode/system_console.py to reset things, if there's + # a problem, the user should issue a manual restart for serial-getty. + Warning('Please ensure all settings are committed and saved before issuing a ' \ + '"restart serial console" command to apply new configuration!') + if not prompt_user: + return False + if not ask_yes_no('Any uncommitted changes from these sessions will be lost\n' \ + 'and in-progress actions may be left in an inconsistent state.\n'\ + '\nContinue?'): + return False + + for unit in units: + if 'device' not in unit: + continue # malformed or filtered. + unit_name = unit['unit'] + unit_device = unit['device'] + if os.path.exists(os.path.join(SD_UNIT_PATH, unit_name)): + cmd(f'systemctl restart {unit_name}') + else: + # Deleted stubs don't need to be restarted, just shut them down. + cmd(f'systemctl stop {unit_name}') + + return True |