From b251a183b30a5915fb8e2a8f7a194d75e65ccb34 Mon Sep 17 00:00:00 2001 From: aapostoliuk Date: Fri, 14 Apr 2023 12:54:35 +0300 Subject: ipsec: T5042: Rewritten 'show vpn ipsec remote-access' command Now 'show vpn ipsec remote-access' shows only IKEv2 Remote access VPN IPSec connections. Added option 'summary' that shows a summary table for these connections. Added option 'detail' that shows only RA SAs output of 'swanctl -l' Added options 'username' and 'connection-id' that filters output. Fixed output 'show vpn ipsec sa detail', the previous was 'show vpn ipsec sa verbose'. --- op-mode-definitions/vpn-ipsec.xml.in | 41 +++++-- src/op_mode/ipsec.py | 216 ++++++++++++++++++++++++++++++++++- src/op_mode/show_vpn_ra.py | 56 --------- 3 files changed, 244 insertions(+), 69 deletions(-) delete mode 100755 src/op_mode/show_vpn_ra.py diff --git a/op-mode-definitions/vpn-ipsec.xml.in b/op-mode-definitions/vpn-ipsec.xml.in index 5a7e6dd63..1eb5a3709 100644 --- a/op-mode-definitions/vpn-ipsec.xml.in +++ b/op-mode-definitions/vpn-ipsec.xml.in @@ -204,12 +204,37 @@ sudo ip xfrm policy list - - - Show active VPN server sessions - - ${vyos_op_scripts_dir}/show_vpn_ra.py - + + + Show active VPN server sessions + + + + + Show detail active IKEv2 RA sessions + + if systemctl is-active --quiet strongswan ; then sudo ${vyos_op_scripts_dir}/ipsec.py show_ra_detail; else echo "IPsec process not running" ; fi + + + + Show detail active IKEv2 RA sessions by connection-id + + if systemctl is-active --quiet strongswan ; then sudo ${vyos_op_scripts_dir}/ipsec.py show_ra_detail --conn_id="$6"; else echo "IPsec process not running" ; fi + + + + Show active IKEv2 RA sessions summary + + if systemctl is-active --quiet strongswan ; then sudo ${vyos_op_scripts_dir}/ipsec.py show_ra_summary; else echo "IPsec process not running" ; fi + + + + Show detail active IKEv2 RA sessions by username + + if systemctl is-active --quiet strongswan ; then sudo ${vyos_op_scripts_dir}/ipsec.py show_ra_detail --username="$6"; else echo "IPsec process not running" ; fi + + + Show all active IPsec Security Associations (SA) @@ -241,11 +266,11 @@ --> - + Show Verbose Detail on all active IPsec Security Associations (SA) - if systemctl is-active --quiet strongswan ; then sudo /usr/sbin/ipsec statusall ; else echo "IPsec process not running" ; fi + if systemctl is-active --quiet strongswan ; then sudo ${vyos_op_scripts_dir}/ipsec.py show_sa_detail ; else echo "IPsec process not running" ; fi if systemctl is-active --quiet strongswan ; then sudo ${vyos_op_scripts_dir}/ipsec.py show_sa ; else echo "IPsec process not running" ; fi diff --git a/src/op_mode/ipsec.py b/src/op_mode/ipsec.py index 7f4fb72e5..db4948d7a 100755 --- a/src/op_mode/ipsec.py +++ b/src/op_mode/ipsec.py @@ -13,7 +13,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . - import re import sys import typing @@ -24,6 +23,7 @@ from tabulate import tabulate from vyos.util import convert_data from vyos.util import seconds_to_human +from vyos.util import cmd from vyos.configquery import ConfigTreeQuery import vyos.opmode @@ -46,6 +46,25 @@ def _get_raw_data_sas(): except (vyos.ipsec.ViciInitiateError) as err: raise vyos.opmode.UnconfiguredSubsystem(err) + +def _get_output_swanctl_sas_from_list(ra_output_list: list) -> str: + """ + Template for output for VICI + Inserts \n after each IKE SA + :param ra_output_list: IKE SAs list + :type ra_output_list: list + :return: formatted string + :rtype: str + """ + output = ''; + for sa_val in ra_output_list: + for sa in sa_val.values(): + swanctl_output: str = cmd( + f'sudo swanctl -l --ike-id {sa["uniqueid"]}') + output = f'{output}{swanctl_output}\n\n' + return output + + def _get_formatted_output_sas(sas): sa_data = [] for sa in sas: @@ -444,6 +463,7 @@ def reset_peer(peer: str, tunnel: typing.Optional[str] = None): except (vyos.ipsec.ViciCommandError) as err: raise vyos.opmode.IncorrectValue(err) + def reset_all_peers(): sitetosite_list = _get_all_sitetosite_peers_name_list() if sitetosite_list: @@ -457,6 +477,7 @@ def reset_all_peers(): raise vyos.opmode.UnconfiguredSubsystem( 'VPN IPSec site-to-site is not configured, aborting') + def _get_ra_session_list_by_username(username: typing.Optional[str] = None): """ Return list of remote-access IKE_SAs uniqueids @@ -466,15 +487,15 @@ def _get_ra_session_list_by_username(username: typing.Optional[str] = None): :rtype: """ list_sa_id = [] - sa_list = vyos.ipsec.get_vici_sas() + sa_list = _get_raw_data_sas() for sa_val in sa_list: for sa in sa_val.values(): if 'remote-eap-id' in sa: if username: - if username == sa['remote-eap-id'].decode(): - list_sa_id.append(sa['uniqueid'].decode()) + if username == sa['remote-eap-id']: + list_sa_id.append(sa['uniqueid']) else: - list_sa_id.append(sa['uniqueid'].decode()) + list_sa_id.append(sa['uniqueid']) return list_sa_id @@ -556,6 +577,24 @@ def show_sa(raw: bool): return _get_formatted_output_sas(sa_data) +def _get_output_sas_detail(ra_output_list: list) -> str: + """ + Formate all IKE SAs detail output + :param ra_output_list: IKE SAs list + :type ra_output_list: list + :return: formatted RA IKE SAs detail output + :rtype: str + """ + return _get_output_swanctl_sas_from_list(ra_output_list) + + +def show_sa_detail(raw: bool): + sa_data = _get_raw_data_sas() + if raw: + return sa_data + return _get_output_sas_detail(sa_data) + + def show_connections(raw: bool): list_conns = _get_convert_data_connections() list_sas = _get_raw_data_sas() @@ -573,6 +612,173 @@ def show_connections_summary(raw: bool): return _get_raw_connections_summary(list_conns, list_sas) +def _get_ra_sessions(username: typing.Optional[str] = None) -> list: + """ + Return list of remote-access IKE_SAs from VICI by username. + If username unspecified, return all remote-access IKE_SAs + :param username: Username of RA connection + :type username: str + :return: list of ra remote-access IKE_SAs + :rtype: list + """ + list_sa = [] + sa_list = _get_raw_data_sas() + for conn in sa_list: + for sa in conn.values(): + if 'remote-eap-id' in sa: + if username: + if username == sa['remote-eap-id']: + list_sa.append(conn) + else: + list_sa.append(conn) + return list_sa + + +def _filter_ikesas(list_sa: list, filter_key: str, filter_value: str) -> list: + """ + Filter IKE SAs by specifice key + :param list_sa: list of IKE SAs + :type list_sa: list + :param filter_key: Filter Key + :type filter_key: str + :param filter_value: Filter Value + :type filter_value: str + :return: Filtered list of IKE SAs + :rtype: list + """ + filtered_sa_list = [] + for conn in list_sa: + for sa in conn.values(): + if sa[filter_key] and sa[filter_key] == filter_value: + filtered_sa_list.append(conn) + return filtered_sa_list + + +def _get_last_installed_childsa(sa: dict) -> str: + """ + Return name of last installed active Child SA + :param sa: Dictionary with Child SAs + :type sa: dict + :return: Name of the Last installed active Child SA + :rtype: str + """ + child_sa_name = None + child_sa_id = 0 + for sa_name, child_sa in sa['child-sas'].items(): + if child_sa['state'] == 'INSTALLED': + if child_sa_id == 0 or int(child_sa['uniqueid']) > child_sa_id: + child_sa_id = int(child_sa['uniqueid']) + child_sa_name = sa_name + return child_sa_name + + +def _get_formatted_ike_proposal(sa: dict) -> str: + """ + Return IKE proposal string in format + EncrALG-EncrKeySize/PFR/HASH/DH-GROUP + :param sa: IKE SA + :type sa: dict + :return: IKE proposal string + :rtype: str + """ + proposal = '' + proposal = f'{proposal}{sa["encr-alg"]}' if 'encr-alg' in sa else proposal + proposal = f'{proposal}-{sa["encr-keysize"]}' if 'encr-keysize' in sa else proposal + proposal = f'{proposal}/{sa["prf-alg"]}' if 'prf-alg' in sa else proposal + proposal = f'{proposal}/{sa["integ-alg"]}' if 'integ-alg' in sa else proposal + proposal = f'{proposal}/{sa["dh-group"]}' if 'dh-group' in sa else proposal + return proposal + + +def _get_formatted_ipsec_proposal(sa: dict) -> str: + """ + Return IPSec proposal string in format + Protocol: EncrALG-EncrKeySize/HASH/PFS + :param sa: Child SA + :type sa: dict + :return: IPSec proposal string + :rtype: str + """ + proposal = '' + proposal = f'{proposal}{sa["protocol"]}' if 'protocol' in sa else proposal + proposal = f'{proposal}:{sa["encr-alg"]}' if 'encr-alg' in sa else proposal + proposal = f'{proposal}-{sa["encr-keysize"]}' if 'encr-keysize' in sa else proposal + proposal = f'{proposal}/{sa["integ-alg"]}' if 'integ-alg' in sa else proposal + proposal = f'{proposal}/{sa["dh-group"]}' if 'dh-group' in sa else proposal + return proposal + + +def _get_output_ra_sas_detail(ra_output_list: list) -> str: + """ + Formate RA IKE SAs detail output + :param ra_output_list: IKE SAs list + :type ra_output_list: list + :return: formatted RA IKE SAs detail output + :rtype: str + """ + return _get_output_swanctl_sas_from_list(ra_output_list) + + +def _get_formatted_output_ra_summary(ra_output_list: list): + sa_data = [] + for conn in ra_output_list: + for sa in conn.values(): + sa_id = sa['uniqueid'] if 'uniqueid' in sa else '' + sa_username = sa['remote-eap-id'] if 'remote-eap-id' in sa else '' + sa_protocol = f'IKEv{sa["version"]}' if 'version' in sa else '' + sa_remotehost = sa['remote-host'] if 'remote-host' in sa else '' + sa_remoteid = sa['remote-id'] if 'remote-id' in sa else '' + sa_ike_proposal = _get_formatted_ike_proposal(sa) + sa_tunnel_ip = sa['remote-vips'] + child_sa_key = _get_last_installed_childsa(sa) + if child_sa_key: + child_sa = sa['child-sas'][child_sa_key] + sa_ipsec_proposal = _get_formatted_ipsec_proposal(child_sa) + sa_state = "UP" + sa_uptime = seconds_to_human(sa['established']) + else: + sa_ipsec_proposal = '' + sa_state = "DOWN" + sa_uptime = '' + sa_data.append( + [sa_id, sa_username, sa_protocol, sa_state, sa_uptime, + sa_tunnel_ip, + sa_remotehost, sa_remoteid, sa_ike_proposal, + sa_ipsec_proposal]) + + headers = ["Connection ID", "Username", "Protocol", "State", "Uptime", + "Tunnel IP", "Remote Host", "Remote ID", "IKE Proposal", + "IPSec Proposal"] + sa_data = sorted(sa_data, key=_alphanum_key) + output = tabulate(sa_data, headers) + return output + + +def show_ra_detail(raw: bool, username: typing.Optional[str] = None, + conn_id: typing.Optional[str] = None): + list_sa: list = _get_ra_sessions() + if username: + list_sa = _filter_ikesas(list_sa, 'remote-eap-id', username) + elif conn_id: + list_sa = _filter_ikesas(list_sa, 'uniqueid', conn_id) + if not list_sa: + raise vyos.opmode.IncorrectValue( + f'No active connections found, aborting') + if raw: + return list_sa + return _get_output_ra_sas_detail(list_sa) + + +def show_ra_summary(raw: bool): + list_sa: list = _get_ra_sessions() + if not list_sa: + raise vyos.opmode.IncorrectValue( + f'No active connections found, aborting') + if raw: + return list_sa + return _get_formatted_output_ra_summary(list_sa) + + if __name__ == '__main__': try: res = vyos.opmode.run(sys.modules[__name__]) diff --git a/src/op_mode/show_vpn_ra.py b/src/op_mode/show_vpn_ra.py deleted file mode 100755 index 73688c4ea..000000000 --- a/src/op_mode/show_vpn_ra.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2019 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import os -import sys -import re - -from vyos.util import popen - -# chech connection to pptp and l2tp daemon -def get_sessions(): - absent_pptp = False - absent_l2tp = False - pptp_cmd = "accel-cmd -p 2003 show sessions" - l2tp_cmd = "accel-cmd -p 2004 show sessions" - err_pattern = "^Connection.+failed$" - # This value for chack only output header without sessions. - len_def_header = 170 - - # Check pptp - output, err = popen(pptp_cmd, decode='utf-8') - if not err and len(output) > len_def_header and not re.search(err_pattern, output): - print(output) - else: - absent_pptp = True - - # Check l2tp - output, err = popen(l2tp_cmd, decode='utf-8') - if not err and len(output) > len_def_header and not re.search(err_pattern, output): - print(output) - else: - absent_l2tp = True - - if absent_l2tp and absent_pptp: - print("No active remote access VPN sessions") - - -def main(): - get_sessions() - - -if __name__ == '__main__': - main() -- cgit v1.2.3