diff options
| author | Christian Breunig <christian@breunig.cc> | 2023-04-21 22:04:09 +0200 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-04-21 22:04:09 +0200 | 
| commit | 66bc0d231aa177cc5c44d17d1fef0d0d2d873773 (patch) | |
| tree | b3bfe8f36026b6ddb6537d3ae027071c1f0e1c16 /src/op_mode/ipsec.py | |
| parent | 19c5040eaa097a033ae0d3dc757d0e44c8690197 (diff) | |
| parent | b251a183b30a5915fb8e2a8f7a194d75e65ccb34 (diff) | |
| download | vyos-1x-66bc0d231aa177cc5c44d17d1fef0d0d2d873773.tar.gz vyos-1x-66bc0d231aa177cc5c44d17d1fef0d0d2d873773.zip | |
Merge pull request #1957 from aapostoliuk/T5042-sagitta
ipsec: T5042: Rewritten 'show vpn ipsec remote-access' command
Diffstat (limited to 'src/op_mode/ipsec.py')
| -rwxr-xr-x | src/op_mode/ipsec.py | 216 | 
1 files changed, 211 insertions, 5 deletions
| diff --git a/src/op_mode/ipsec.py b/src/op_mode/ipsec.py index 7f4fb72e5..db4948d7a 100755 --- a/src/op_mode/ipsec.py +++ b/src/op_mode/ipsec.py @@ -13,7 +13,6 @@  #  # You should have received a copy of the GNU General Public License  # along with this program.  If not, see <http://www.gnu.org/licenses/>. -  import re  import sys  import typing @@ -24,6 +23,7 @@ from tabulate import tabulate  from vyos.util import convert_data  from vyos.util import seconds_to_human +from vyos.util import cmd  from vyos.configquery import ConfigTreeQuery  import vyos.opmode @@ -46,6 +46,25 @@ def _get_raw_data_sas():      except (vyos.ipsec.ViciInitiateError) as err:          raise vyos.opmode.UnconfiguredSubsystem(err) + +def _get_output_swanctl_sas_from_list(ra_output_list: list) -> str: +    """ +    Template for output for VICI +    Inserts \n after each IKE SA +    :param ra_output_list: IKE SAs list +    :type ra_output_list: list +    :return: formatted string +    :rtype: str +    """ +    output = ''; +    for sa_val in ra_output_list: +        for sa in sa_val.values(): +            swanctl_output: str = cmd( +                f'sudo swanctl -l --ike-id {sa["uniqueid"]}') +        output = f'{output}{swanctl_output}\n\n' +    return output + +  def _get_formatted_output_sas(sas):      sa_data = []      for sa in sas: @@ -444,6 +463,7 @@ def reset_peer(peer: str, tunnel: typing.Optional[str] = None):      except (vyos.ipsec.ViciCommandError) as err:          raise vyos.opmode.IncorrectValue(err) +  def reset_all_peers():      sitetosite_list = _get_all_sitetosite_peers_name_list()      if sitetosite_list: @@ -457,6 +477,7 @@ def reset_all_peers():          raise vyos.opmode.UnconfiguredSubsystem(              'VPN IPSec site-to-site is not configured, aborting') +  def _get_ra_session_list_by_username(username: typing.Optional[str] = None):      """      Return list of remote-access IKE_SAs uniqueids @@ -466,15 +487,15 @@ def _get_ra_session_list_by_username(username: typing.Optional[str] = None):      :rtype:      """      list_sa_id = [] -    sa_list = vyos.ipsec.get_vici_sas() +    sa_list = _get_raw_data_sas()      for sa_val in sa_list:          for sa in sa_val.values():              if 'remote-eap-id' in sa:                  if username: -                    if username == sa['remote-eap-id'].decode(): -                        list_sa_id.append(sa['uniqueid'].decode()) +                    if username == sa['remote-eap-id']: +                        list_sa_id.append(sa['uniqueid'])                  else: -                    list_sa_id.append(sa['uniqueid'].decode()) +                    list_sa_id.append(sa['uniqueid'])      return list_sa_id @@ -556,6 +577,24 @@ def show_sa(raw: bool):      return _get_formatted_output_sas(sa_data) +def _get_output_sas_detail(ra_output_list: list) -> str: +    """ +    Formate all IKE SAs detail output +    :param ra_output_list: IKE SAs list +    :type ra_output_list: list +    :return: formatted RA IKE SAs detail output +    :rtype: str +    """ +    return _get_output_swanctl_sas_from_list(ra_output_list) + + +def show_sa_detail(raw: bool): +    sa_data = _get_raw_data_sas() +    if raw: +        return sa_data +    return _get_output_sas_detail(sa_data) + +  def show_connections(raw: bool):      list_conns = _get_convert_data_connections()      list_sas = _get_raw_data_sas() @@ -573,6 +612,173 @@ def show_connections_summary(raw: bool):          return _get_raw_connections_summary(list_conns, list_sas) +def _get_ra_sessions(username: typing.Optional[str] = None) -> list: +    """ +    Return list of remote-access IKE_SAs from VICI by username. +    If username unspecified, return all remote-access IKE_SAs +    :param username: Username of RA connection +    :type username: str +    :return: list of ra remote-access IKE_SAs +    :rtype: list +    """ +    list_sa = [] +    sa_list = _get_raw_data_sas() +    for conn in sa_list: +        for sa in conn.values(): +            if 'remote-eap-id' in sa: +                if username: +                    if username == sa['remote-eap-id']: +                        list_sa.append(conn) +                else: +                    list_sa.append(conn) +    return list_sa + + +def _filter_ikesas(list_sa: list, filter_key: str, filter_value: str) -> list: +    """ +    Filter IKE SAs by specifice key +    :param list_sa: list of IKE SAs +    :type list_sa: list +    :param filter_key: Filter Key +    :type filter_key: str +    :param filter_value: Filter Value +    :type filter_value: str +    :return: Filtered list of IKE SAs +    :rtype: list +    """ +    filtered_sa_list = [] +    for conn in list_sa: +        for sa in conn.values(): +            if sa[filter_key] and sa[filter_key] == filter_value: +                filtered_sa_list.append(conn) +    return filtered_sa_list + + +def _get_last_installed_childsa(sa: dict) -> str: +    """ +    Return name of last installed active Child SA +    :param sa: Dictionary with Child SAs +    :type sa: dict +    :return: Name of the Last installed active Child SA +    :rtype: str +    """ +    child_sa_name = None +    child_sa_id = 0 +    for sa_name, child_sa in sa['child-sas'].items(): +        if child_sa['state'] == 'INSTALLED': +            if child_sa_id == 0 or int(child_sa['uniqueid']) > child_sa_id: +                child_sa_id = int(child_sa['uniqueid']) +                child_sa_name = sa_name +    return child_sa_name + + +def _get_formatted_ike_proposal(sa: dict) -> str: +    """ +    Return IKE proposal string in format +    EncrALG-EncrKeySize/PFR/HASH/DH-GROUP +    :param sa: IKE SA +    :type sa: dict +    :return: IKE proposal string +    :rtype: str +    """ +    proposal = '' +    proposal = f'{proposal}{sa["encr-alg"]}' if 'encr-alg' in sa else proposal +    proposal = f'{proposal}-{sa["encr-keysize"]}' if 'encr-keysize' in sa else proposal +    proposal = f'{proposal}/{sa["prf-alg"]}' if 'prf-alg' in sa else proposal +    proposal = f'{proposal}/{sa["integ-alg"]}' if 'integ-alg' in sa else proposal +    proposal = f'{proposal}/{sa["dh-group"]}' if 'dh-group' in sa else proposal +    return proposal + + +def _get_formatted_ipsec_proposal(sa: dict) -> str: +    """ +    Return IPSec proposal string in format +    Protocol: EncrALG-EncrKeySize/HASH/PFS +    :param sa: Child SA +    :type sa: dict +    :return: IPSec proposal string +    :rtype: str +    """ +    proposal = '' +    proposal = f'{proposal}{sa["protocol"]}' if 'protocol' in sa else proposal +    proposal = f'{proposal}:{sa["encr-alg"]}' if 'encr-alg' in sa else proposal +    proposal = f'{proposal}-{sa["encr-keysize"]}' if 'encr-keysize' in sa else proposal +    proposal = f'{proposal}/{sa["integ-alg"]}' if 'integ-alg' in sa else proposal +    proposal = f'{proposal}/{sa["dh-group"]}' if 'dh-group' in sa else proposal +    return proposal + + +def _get_output_ra_sas_detail(ra_output_list: list) -> str: +    """ +    Formate RA IKE SAs detail output +    :param ra_output_list: IKE SAs list +    :type ra_output_list: list +    :return: formatted RA IKE SAs detail output +    :rtype: str +    """ +    return _get_output_swanctl_sas_from_list(ra_output_list) + + +def _get_formatted_output_ra_summary(ra_output_list: list): +    sa_data = [] +    for conn in ra_output_list: +        for sa in conn.values(): +            sa_id = sa['uniqueid'] if 'uniqueid' in sa else '' +            sa_username = sa['remote-eap-id'] if 'remote-eap-id' in sa else '' +            sa_protocol = f'IKEv{sa["version"]}' if 'version' in sa else '' +            sa_remotehost = sa['remote-host'] if 'remote-host' in sa else '' +            sa_remoteid = sa['remote-id'] if 'remote-id' in sa else '' +            sa_ike_proposal = _get_formatted_ike_proposal(sa) +            sa_tunnel_ip = sa['remote-vips'] +            child_sa_key = _get_last_installed_childsa(sa) +            if child_sa_key: +                child_sa = sa['child-sas'][child_sa_key] +                sa_ipsec_proposal = _get_formatted_ipsec_proposal(child_sa) +                sa_state = "UP" +                sa_uptime = seconds_to_human(sa['established']) +            else: +                sa_ipsec_proposal = '' +                sa_state = "DOWN" +                sa_uptime = '' +            sa_data.append( +                [sa_id, sa_username, sa_protocol, sa_state, sa_uptime, +                 sa_tunnel_ip, +                 sa_remotehost, sa_remoteid, sa_ike_proposal, +                 sa_ipsec_proposal]) + +    headers = ["Connection ID", "Username", "Protocol", "State", "Uptime", +               "Tunnel IP", "Remote Host", "Remote ID", "IKE Proposal", +               "IPSec Proposal"] +    sa_data = sorted(sa_data, key=_alphanum_key) +    output = tabulate(sa_data, headers) +    return output + + +def show_ra_detail(raw: bool, username: typing.Optional[str] = None, +                   conn_id: typing.Optional[str] = None): +    list_sa: list = _get_ra_sessions() +    if username: +        list_sa = _filter_ikesas(list_sa, 'remote-eap-id', username) +    elif conn_id: +        list_sa = _filter_ikesas(list_sa, 'uniqueid', conn_id) +    if not list_sa: +        raise vyos.opmode.IncorrectValue( +            f'No active connections found, aborting') +    if raw: +        return list_sa +    return _get_output_ra_sas_detail(list_sa) + + +def show_ra_summary(raw: bool): +    list_sa: list = _get_ra_sessions() +    if not list_sa: +        raise vyos.opmode.IncorrectValue( +            f'No active connections found, aborting') +    if raw: +        return list_sa +    return _get_formatted_output_ra_summary(list_sa) + +  if __name__ == '__main__':      try:          res = vyos.opmode.run(sys.modules[__name__]) | 
