summaryrefslogtreecommitdiff
path: root/src/op_mode/nat.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/op_mode/nat.py')
-rwxr-xr-xsrc/op_mode/nat.py52
1 files changed, 31 insertions, 21 deletions
diff --git a/src/op_mode/nat.py b/src/op_mode/nat.py
index f899eb3dc..c92795745 100755
--- a/src/op_mode/nat.py
+++ b/src/op_mode/nat.py
@@ -18,23 +18,23 @@ import jmespath
import json
import sys
import xmltodict
+import typing
-from sys import exit
from tabulate import tabulate
-from vyos.configquery import ConfigTreeQuery
+import vyos.opmode
+from vyos.configquery import ConfigTreeQuery
from vyos.util import cmd
from vyos.util import dict_search
-import vyos.opmode
-
-
base = 'nat'
unconf_message = 'NAT is not configured'
+ArgDirection = typing.Literal['source', 'destination']
+ArgFamily = typing.Literal['inet', 'inet6']
-def _get_xml_translation(direction, family):
+def _get_xml_translation(direction, family, address=None):
"""
Get conntrack XML output --src-nat|--dst-nat
"""
@@ -42,7 +42,10 @@ def _get_xml_translation(direction, family):
opt = '--src-nat'
if direction == 'destination':
opt = '--dst-nat'
- return cmd(f'sudo conntrack --dump --family {family} {opt} --output xml')
+ tmp = f'conntrack --dump --family {family} {opt} --output xml'
+ if address:
+ tmp += f' --src {address}'
+ return cmd(tmp)
def _xml_to_dict(xml):
@@ -66,7 +69,7 @@ def _get_json_data(direction, family):
if direction == 'destination':
chain = 'PREROUTING'
family = 'ip6' if family == 'inet6' else 'ip'
- return cmd(f'sudo nft --json list chain {family} vyos_nat {chain}')
+ return cmd(f'nft --json list chain {family} vyos_nat {chain}')
def _get_raw_data_rules(direction, family):
@@ -82,11 +85,11 @@ def _get_raw_data_rules(direction, family):
return rules
-def _get_raw_translation(direction, family):
+def _get_raw_translation(direction, family, address=None):
"""
Return: dictionary
"""
- xml = _get_xml_translation(direction, family)
+ xml = _get_xml_translation(direction, family, address)
if len(xml) == 0:
output = {'conntrack':
{
@@ -231,7 +234,7 @@ def _get_formatted_output_statistics(data, direction):
return output
-def _get_formatted_translation(dict_data, nat_direction, family):
+def _get_formatted_translation(dict_data, nat_direction, family, verbose):
data_entries = []
if 'error' in dict_data['conntrack']:
return 'Entries not found'
@@ -269,14 +272,14 @@ def _get_formatted_translation(dict_data, nat_direction, family):
reply_src = f'{reply_src}:{reply_sport}' if reply_sport else reply_src
reply_dst = f'{reply_dst}:{reply_dport}' if reply_dport else reply_dst
state = meta['state'] if 'state' in meta else ''
- mark = meta['mark']
+ mark = meta.get('mark', '')
zone = meta['zone'] if 'zone' in meta else ''
if nat_direction == 'source':
- data_entries.append(
- [orig_src, reply_dst, proto, timeout, mark, zone])
+ tmp = [orig_src, reply_dst, proto, timeout, mark, zone]
+ data_entries.append(tmp)
elif nat_direction == 'destination':
- data_entries.append(
- [orig_dst, reply_src, proto, timeout, mark, zone])
+ tmp = [orig_dst, reply_src, proto, timeout, mark, zone]
+ data_entries.append(tmp)
headers = ["Pre-NAT", "Post-NAT", "Proto", "Timeout", "Mark", "Zone"]
output = tabulate(data_entries, headers, numalign="left")
@@ -297,7 +300,7 @@ def _verify(func):
@_verify
-def show_rules(raw: bool, direction: str, family: str):
+def show_rules(raw: bool, direction: ArgDirection, family: ArgFamily):
nat_rules = _get_raw_data_rules(direction, family)
if raw:
return nat_rules
@@ -306,7 +309,7 @@ def show_rules(raw: bool, direction: str, family: str):
@_verify
-def show_statistics(raw: bool, direction: str, family: str):
+def show_statistics(raw: bool, direction: ArgDirection, family: ArgFamily):
nat_statistics = _get_raw_data_rules(direction, family)
if raw:
return nat_statistics
@@ -315,13 +318,20 @@ def show_statistics(raw: bool, direction: str, family: str):
@_verify
-def show_translations(raw: bool, direction: str, family: str):
+def show_translations(raw: bool, direction: ArgDirection,
+ family: ArgFamily,
+ address: typing.Optional[str],
+ verbose: typing.Optional[bool]):
family = 'ipv6' if family == 'inet6' else 'ipv4'
- nat_translation = _get_raw_translation(direction, family)
+ nat_translation = _get_raw_translation(direction,
+ family=family,
+ address=address)
+
if raw:
return nat_translation
else:
- return _get_formatted_translation(nat_translation, direction, family)
+ return _get_formatted_translation(nat_translation, direction, family,
+ verbose)
if __name__ == '__main__':