diff options
Diffstat (limited to 'src/op_mode/ipsec.py')
-rwxr-xr-x | src/op_mode/ipsec.py | 216 |
1 files changed, 211 insertions, 5 deletions
diff --git a/src/op_mode/ipsec.py b/src/op_mode/ipsec.py index 7f4fb72e5..db4948d7a 100755 --- a/src/op_mode/ipsec.py +++ b/src/op_mode/ipsec.py @@ -13,7 +13,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - import re import sys import typing @@ -24,6 +23,7 @@ from tabulate import tabulate from vyos.util import convert_data from vyos.util import seconds_to_human +from vyos.util import cmd from vyos.configquery import ConfigTreeQuery import vyos.opmode @@ -46,6 +46,25 @@ def _get_raw_data_sas(): except (vyos.ipsec.ViciInitiateError) as err: raise vyos.opmode.UnconfiguredSubsystem(err) + +def _get_output_swanctl_sas_from_list(ra_output_list: list) -> str: + """ + Template for output for VICI + Inserts \n after each IKE SA + :param ra_output_list: IKE SAs list + :type ra_output_list: list + :return: formatted string + :rtype: str + """ + output = ''; + for sa_val in ra_output_list: + for sa in sa_val.values(): + swanctl_output: str = cmd( + f'sudo swanctl -l --ike-id {sa["uniqueid"]}') + output = f'{output}{swanctl_output}\n\n' + return output + + def _get_formatted_output_sas(sas): sa_data = [] for sa in sas: @@ -444,6 +463,7 @@ def reset_peer(peer: str, tunnel: typing.Optional[str] = None): except (vyos.ipsec.ViciCommandError) as err: raise vyos.opmode.IncorrectValue(err) + def reset_all_peers(): sitetosite_list = _get_all_sitetosite_peers_name_list() if sitetosite_list: @@ -457,6 +477,7 @@ def reset_all_peers(): raise vyos.opmode.UnconfiguredSubsystem( 'VPN IPSec site-to-site is not configured, aborting') + def _get_ra_session_list_by_username(username: typing.Optional[str] = None): """ Return list of remote-access IKE_SAs uniqueids @@ -466,15 +487,15 @@ def _get_ra_session_list_by_username(username: typing.Optional[str] = None): :rtype: """ list_sa_id = [] - sa_list = vyos.ipsec.get_vici_sas() + sa_list = _get_raw_data_sas() for sa_val in sa_list: for sa in sa_val.values(): if 'remote-eap-id' in sa: if username: - if username == sa['remote-eap-id'].decode(): - list_sa_id.append(sa['uniqueid'].decode()) + if username == sa['remote-eap-id']: + list_sa_id.append(sa['uniqueid']) else: - list_sa_id.append(sa['uniqueid'].decode()) + list_sa_id.append(sa['uniqueid']) return list_sa_id @@ -556,6 +577,24 @@ def show_sa(raw: bool): return _get_formatted_output_sas(sa_data) +def _get_output_sas_detail(ra_output_list: list) -> str: + """ + Formate all IKE SAs detail output + :param ra_output_list: IKE SAs list + :type ra_output_list: list + :return: formatted RA IKE SAs detail output + :rtype: str + """ + return _get_output_swanctl_sas_from_list(ra_output_list) + + +def show_sa_detail(raw: bool): + sa_data = _get_raw_data_sas() + if raw: + return sa_data + return _get_output_sas_detail(sa_data) + + def show_connections(raw: bool): list_conns = _get_convert_data_connections() list_sas = _get_raw_data_sas() @@ -573,6 +612,173 @@ def show_connections_summary(raw: bool): return _get_raw_connections_summary(list_conns, list_sas) +def _get_ra_sessions(username: typing.Optional[str] = None) -> list: + """ + Return list of remote-access IKE_SAs from VICI by username. + If username unspecified, return all remote-access IKE_SAs + :param username: Username of RA connection + :type username: str + :return: list of ra remote-access IKE_SAs + :rtype: list + """ + list_sa = [] + sa_list = _get_raw_data_sas() + for conn in sa_list: + for sa in conn.values(): + if 'remote-eap-id' in sa: + if username: + if username == sa['remote-eap-id']: + list_sa.append(conn) + else: + list_sa.append(conn) + return list_sa + + +def _filter_ikesas(list_sa: list, filter_key: str, filter_value: str) -> list: + """ + Filter IKE SAs by specifice key + :param list_sa: list of IKE SAs + :type list_sa: list + :param filter_key: Filter Key + :type filter_key: str + :param filter_value: Filter Value + :type filter_value: str + :return: Filtered list of IKE SAs + :rtype: list + """ + filtered_sa_list = [] + for conn in list_sa: + for sa in conn.values(): + if sa[filter_key] and sa[filter_key] == filter_value: + filtered_sa_list.append(conn) + return filtered_sa_list + + +def _get_last_installed_childsa(sa: dict) -> str: + """ + Return name of last installed active Child SA + :param sa: Dictionary with Child SAs + :type sa: dict + :return: Name of the Last installed active Child SA + :rtype: str + """ + child_sa_name = None + child_sa_id = 0 + for sa_name, child_sa in sa['child-sas'].items(): + if child_sa['state'] == 'INSTALLED': + if child_sa_id == 0 or int(child_sa['uniqueid']) > child_sa_id: + child_sa_id = int(child_sa['uniqueid']) + child_sa_name = sa_name + return child_sa_name + + +def _get_formatted_ike_proposal(sa: dict) -> str: + """ + Return IKE proposal string in format + EncrALG-EncrKeySize/PFR/HASH/DH-GROUP + :param sa: IKE SA + :type sa: dict + :return: IKE proposal string + :rtype: str + """ + proposal = '' + proposal = f'{proposal}{sa["encr-alg"]}' if 'encr-alg' in sa else proposal + proposal = f'{proposal}-{sa["encr-keysize"]}' if 'encr-keysize' in sa else proposal + proposal = f'{proposal}/{sa["prf-alg"]}' if 'prf-alg' in sa else proposal + proposal = f'{proposal}/{sa["integ-alg"]}' if 'integ-alg' in sa else proposal + proposal = f'{proposal}/{sa["dh-group"]}' if 'dh-group' in sa else proposal + return proposal + + +def _get_formatted_ipsec_proposal(sa: dict) -> str: + """ + Return IPSec proposal string in format + Protocol: EncrALG-EncrKeySize/HASH/PFS + :param sa: Child SA + :type sa: dict + :return: IPSec proposal string + :rtype: str + """ + proposal = '' + proposal = f'{proposal}{sa["protocol"]}' if 'protocol' in sa else proposal + proposal = f'{proposal}:{sa["encr-alg"]}' if 'encr-alg' in sa else proposal + proposal = f'{proposal}-{sa["encr-keysize"]}' if 'encr-keysize' in sa else proposal + proposal = f'{proposal}/{sa["integ-alg"]}' if 'integ-alg' in sa else proposal + proposal = f'{proposal}/{sa["dh-group"]}' if 'dh-group' in sa else proposal + return proposal + + +def _get_output_ra_sas_detail(ra_output_list: list) -> str: + """ + Formate RA IKE SAs detail output + :param ra_output_list: IKE SAs list + :type ra_output_list: list + :return: formatted RA IKE SAs detail output + :rtype: str + """ + return _get_output_swanctl_sas_from_list(ra_output_list) + + +def _get_formatted_output_ra_summary(ra_output_list: list): + sa_data = [] + for conn in ra_output_list: + for sa in conn.values(): + sa_id = sa['uniqueid'] if 'uniqueid' in sa else '' + sa_username = sa['remote-eap-id'] if 'remote-eap-id' in sa else '' + sa_protocol = f'IKEv{sa["version"]}' if 'version' in sa else '' + sa_remotehost = sa['remote-host'] if 'remote-host' in sa else '' + sa_remoteid = sa['remote-id'] if 'remote-id' in sa else '' + sa_ike_proposal = _get_formatted_ike_proposal(sa) + sa_tunnel_ip = sa['remote-vips'] + child_sa_key = _get_last_installed_childsa(sa) + if child_sa_key: + child_sa = sa['child-sas'][child_sa_key] + sa_ipsec_proposal = _get_formatted_ipsec_proposal(child_sa) + sa_state = "UP" + sa_uptime = seconds_to_human(sa['established']) + else: + sa_ipsec_proposal = '' + sa_state = "DOWN" + sa_uptime = '' + sa_data.append( + [sa_id, sa_username, sa_protocol, sa_state, sa_uptime, + sa_tunnel_ip, + sa_remotehost, sa_remoteid, sa_ike_proposal, + sa_ipsec_proposal]) + + headers = ["Connection ID", "Username", "Protocol", "State", "Uptime", + "Tunnel IP", "Remote Host", "Remote ID", "IKE Proposal", + "IPSec Proposal"] + sa_data = sorted(sa_data, key=_alphanum_key) + output = tabulate(sa_data, headers) + return output + + +def show_ra_detail(raw: bool, username: typing.Optional[str] = None, + conn_id: typing.Optional[str] = None): + list_sa: list = _get_ra_sessions() + if username: + list_sa = _filter_ikesas(list_sa, 'remote-eap-id', username) + elif conn_id: + list_sa = _filter_ikesas(list_sa, 'uniqueid', conn_id) + if not list_sa: + raise vyos.opmode.IncorrectValue( + f'No active connections found, aborting') + if raw: + return list_sa + return _get_output_ra_sas_detail(list_sa) + + +def show_ra_summary(raw: bool): + list_sa: list = _get_ra_sessions() + if not list_sa: + raise vyos.opmode.IncorrectValue( + f'No active connections found, aborting') + if raw: + return list_sa + return _get_formatted_output_ra_summary(list_sa) + + if __name__ == '__main__': try: res = vyos.opmode.run(sys.modules[__name__]) |