summaryrefslogtreecommitdiff
path: root/src/op_mode/ipsec.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/op_mode/ipsec.py')
-rwxr-xr-xsrc/op_mode/ipsec.py141
1 files changed, 132 insertions, 9 deletions
diff --git a/src/op_mode/ipsec.py b/src/op_mode/ipsec.py
index 8e76f4cc0..7f4fb72e5 100755
--- a/src/op_mode/ipsec.py
+++ b/src/op_mode/ipsec.py
@@ -24,6 +24,7 @@ from tabulate import tabulate
from vyos.util import convert_data
from vyos.util import seconds_to_human
+from vyos.configquery import ConfigTreeQuery
import vyos.opmode
import vyos.ipsec
@@ -401,30 +402,152 @@ def _get_childsa_id_list(ike_sas: list) -> list:
return list_childsa_id
+def _get_all_sitetosite_peers_name_list() -> list:
+ """
+ Return site-to-site peers configuration
+ :return: site-to-site peers configuration
+ :rtype: list
+ """
+ conf: ConfigTreeQuery = ConfigTreeQuery()
+ config_path = ['vpn', 'ipsec', 'site-to-site', 'peer']
+ peers_config = conf.get_config_dict(config_path, key_mangling=('-', '_'),
+ get_first_key=True,
+ no_tag_node_value_mangle=True)
+ peers_list: list = []
+ for name in peers_config:
+ peers_list.append(name)
+ return peers_list
+
+
def reset_peer(peer: str, tunnel: typing.Optional[str] = None):
# Convert tunnel to Strongwan format of CHILD_SA
+ tunnel_sw = None
if tunnel:
if tunnel.isnumeric():
- tunnel = f'{peer}-tunnel-{tunnel}'
+ tunnel_sw = f'{peer}-tunnel-{tunnel}'
elif tunnel == 'vti':
- tunnel = f'{peer}-vti'
+ tunnel_sw = f'{peer}-vti'
try:
- sa_list: list = vyos.ipsec.get_vici_sas_by_name(peer, tunnel)
-
+ sa_list: list = vyos.ipsec.get_vici_sas_by_name(peer, tunnel_sw)
if not sa_list:
- raise vyos.opmode.IncorrectValue('Peer not found, aborting')
+ raise vyos.opmode.IncorrectValue(
+ f'Peer\'s {peer} SA(s) not found, aborting')
if tunnel and sa_list:
childsa_id_list: list = _get_childsa_id_list(sa_list)
if not childsa_id_list:
raise vyos.opmode.IncorrectValue(
- 'Peer or tunnel(s) not found, aborting')
- vyos.ipsec.terminate_vici_by_name(peer, tunnel)
- print('Peer reset result: success')
+ f'Peer {peer} tunnel {tunnel} SA(s) not found, aborting')
+ vyos.ipsec.terminate_vici_by_name(peer, tunnel_sw)
+ print(f'Peer {peer} reset result: success')
except (vyos.ipsec.ViciInitiateError) as err:
raise vyos.opmode.UnconfiguredSubsystem(err)
- except (vyos.ipsec.ViciInitiateError) as err:
+ except (vyos.ipsec.ViciCommandError) as err:
raise vyos.opmode.IncorrectValue(err)
+def reset_all_peers():
+ sitetosite_list = _get_all_sitetosite_peers_name_list()
+ if sitetosite_list:
+ for peer_name in sitetosite_list:
+ try:
+ reset_peer(peer_name)
+ except (vyos.opmode.IncorrectValue) as err:
+ print(err)
+ print('Peers reset result: success')
+ else:
+ raise vyos.opmode.UnconfiguredSubsystem(
+ 'VPN IPSec site-to-site is not configured, aborting')
+
+def _get_ra_session_list_by_username(username: typing.Optional[str] = None):
+ """
+ Return list of remote-access IKE_SAs uniqueids
+ :param username:
+ :type username:
+ :return:
+ :rtype:
+ """
+ list_sa_id = []
+ sa_list = vyos.ipsec.get_vici_sas()
+ for sa_val in sa_list:
+ for sa in sa_val.values():
+ if 'remote-eap-id' in sa:
+ if username:
+ if username == sa['remote-eap-id'].decode():
+ list_sa_id.append(sa['uniqueid'].decode())
+ else:
+ list_sa_id.append(sa['uniqueid'].decode())
+ return list_sa_id
+
+
+def reset_ra(username: typing.Optional[str] = None):
+ #Reset remote-access ipsec sessions
+ if username:
+ list_sa_id = _get_ra_session_list_by_username(username)
+ else:
+ list_sa_id = _get_ra_session_list_by_username()
+ if list_sa_id:
+ vyos.ipsec.terminate_vici_ikeid_list(list_sa_id)
+
+
+def reset_profile_dst(profile: str, tunnel: str, nbma_dst: str):
+ if profile and tunnel and nbma_dst:
+ ike_sa_name = f'dmvpn-{profile}-{tunnel}'
+ try:
+ # Get IKE SAs
+ sa_list = convert_data(
+ vyos.ipsec.get_vici_sas_by_name(ike_sa_name, None))
+ if not sa_list:
+ raise vyos.opmode.IncorrectValue(
+ f'SA(s) for profile {profile} tunnel {tunnel} not found, aborting')
+ sa_nbma_list = list([x for x in sa_list if
+ ike_sa_name in x and x[ike_sa_name][
+ 'remote-host'] == nbma_dst])
+ if not sa_nbma_list:
+ raise vyos.opmode.IncorrectValue(
+ f'SA(s) for profile {profile} tunnel {tunnel} remote-host {nbma_dst} not found, aborting')
+ # terminate IKE SAs
+ vyos.ipsec.terminate_vici_ikeid_list(list(
+ [x[ike_sa_name]['uniqueid'] for x in sa_nbma_list if
+ ike_sa_name in x]))
+ # initiate IKE SAs
+ for ike in sa_nbma_list:
+ if ike_sa_name in ike:
+ vyos.ipsec.vici_initiate(ike_sa_name, 'dmvpn',
+ ike[ike_sa_name]['local-host'],
+ ike[ike_sa_name]['remote-host'])
+ print(
+ f'Profile {profile} tunnel {tunnel} remote-host {nbma_dst} reset result: success')
+ except (vyos.ipsec.ViciInitiateError) as err:
+ raise vyos.opmode.UnconfiguredSubsystem(err)
+ except (vyos.ipsec.ViciCommandError) as err:
+ raise vyos.opmode.IncorrectValue(err)
+
+
+def reset_profile_all(profile: str, tunnel: str):
+ if profile and tunnel:
+ ike_sa_name = f'dmvpn-{profile}-{tunnel}'
+ try:
+ # Get IKE SAs
+ sa_list: list = convert_data(
+ vyos.ipsec.get_vici_sas_by_name(ike_sa_name, None))
+ if not sa_list:
+ raise vyos.opmode.IncorrectValue(
+ f'SA(s) for profile {profile} tunnel {tunnel} not found, aborting')
+ # terminate IKE SAs
+ vyos.ipsec.terminate_vici_by_name(ike_sa_name, None)
+ # initiate IKE SAs
+ for ike in sa_list:
+ if ike_sa_name in ike:
+ vyos.ipsec.vici_initiate(ike_sa_name, 'dmvpn',
+ ike[ike_sa_name]['local-host'],
+ ike[ike_sa_name]['remote-host'])
+ print(
+ f'Profile {profile} tunnel {tunnel} remote-host {ike[ike_sa_name]["remote-host"]} reset result: success')
+ print(f'Profile {profile} tunnel {tunnel} reset result: success')
+ except (vyos.ipsec.ViciInitiateError) as err:
+ raise vyos.opmode.UnconfiguredSubsystem(err)
+ except (vyos.ipsec.ViciCommandError) as err:
+ raise vyos.opmode.IncorrectValue(err)
+
def show_sa(raw: bool):
sa_data = _get_raw_data_sas()