summaryrefslogtreecommitdiff
path: root/src/op_mode/nat.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/op_mode/nat.py')
-rwxr-xr-xsrc/op_mode/nat.py62
1 files changed, 46 insertions, 16 deletions
diff --git a/src/op_mode/nat.py b/src/op_mode/nat.py
index 845dbbb2c..cf06de0e9 100755
--- a/src/op_mode/nat.py
+++ b/src/op_mode/nat.py
@@ -18,17 +18,21 @@ import jmespath
import json
import sys
import xmltodict
+import typing
-from sys import exit
from tabulate import tabulate
+import vyos.opmode
+
+from vyos.configquery import ConfigTreeQuery
from vyos.util import cmd
from vyos.util import dict_search
-import vyos.opmode
+base = 'nat'
+unconf_message = 'NAT is not configured'
-def _get_xml_translation(direction, family):
+def _get_xml_translation(direction, family, address=None):
"""
Get conntrack XML output --src-nat|--dst-nat
"""
@@ -36,7 +40,10 @@ def _get_xml_translation(direction, family):
opt = '--src-nat'
if direction == 'destination':
opt = '--dst-nat'
- return cmd(f'sudo conntrack --dump --family {family} {opt} --output xml')
+ tmp = f'conntrack --dump --family {family} {opt} --output xml'
+ if address:
+ tmp += f' --src {address}'
+ return cmd(tmp)
def _xml_to_dict(xml):
@@ -60,7 +67,7 @@ def _get_json_data(direction, family):
if direction == 'destination':
chain = 'PREROUTING'
family = 'ip6' if family == 'inet6' else 'ip'
- return cmd(f'sudo nft --json list chain {family} vyos_nat {chain}')
+ return cmd(f'nft --json list chain {family} vyos_nat {chain}')
def _get_raw_data_rules(direction, family):
@@ -76,11 +83,11 @@ def _get_raw_data_rules(direction, family):
return rules
-def _get_raw_translation(direction, family):
+def _get_raw_translation(direction, family, address=None):
"""
Return: dictionary
"""
- xml = _get_xml_translation(direction, family)
+ xml = _get_xml_translation(direction, family, address)
if len(xml) == 0:
output = {'conntrack':
{
@@ -225,7 +232,7 @@ def _get_formatted_output_statistics(data, direction):
return output
-def _get_formatted_translation(dict_data, nat_direction, family):
+def _get_formatted_translation(dict_data, nat_direction, family, verbose):
data_entries = []
if 'error' in dict_data['conntrack']:
return 'Entries not found'
@@ -263,20 +270,34 @@ def _get_formatted_translation(dict_data, nat_direction, family):
reply_src = f'{reply_src}:{reply_sport}' if reply_sport else reply_src
reply_dst = f'{reply_dst}:{reply_dport}' if reply_dport else reply_dst
state = meta['state'] if 'state' in meta else ''
- mark = meta['mark']
+ mark = meta.get('mark', '')
zone = meta['zone'] if 'zone' in meta else ''
if nat_direction == 'source':
- data_entries.append(
- [orig_src, reply_dst, proto, timeout, mark, zone])
+ tmp = [orig_src, reply_dst, proto, timeout, mark, zone]
+ data_entries.append(tmp)
elif nat_direction == 'destination':
- data_entries.append(
- [orig_dst, reply_src, proto, timeout, mark, zone])
+ tmp = [orig_dst, reply_src, proto, timeout, mark, zone]
+ data_entries.append(tmp)
headers = ["Pre-NAT", "Post-NAT", "Proto", "Timeout", "Mark", "Zone"]
output = tabulate(data_entries, headers, numalign="left")
return output
+def _verify(func):
+ """Decorator checks if NAT config exists"""
+ from functools import wraps
+
+ @wraps(func)
+ def _wrapper(*args, **kwargs):
+ config = ConfigTreeQuery()
+ if not config.exists(base):
+ raise vyos.opmode.UnconfiguredSubsystem(unconf_message)
+ return func(*args, **kwargs)
+ return _wrapper
+
+
+@_verify
def show_rules(raw: bool, direction: str, family: str):
nat_rules = _get_raw_data_rules(direction, family)
if raw:
@@ -285,6 +306,7 @@ def show_rules(raw: bool, direction: str, family: str):
return _get_formatted_output_rules(nat_rules, direction, family)
+@_verify
def show_statistics(raw: bool, direction: str, family: str):
nat_statistics = _get_raw_data_rules(direction, family)
if raw:
@@ -293,13 +315,21 @@ def show_statistics(raw: bool, direction: str, family: str):
return _get_formatted_output_statistics(nat_statistics, direction)
-def show_translations(raw: bool, direction: str, family: str):
+@_verify
+def show_translations(raw: bool, direction:
+ str, family: str,
+ address: typing.Optional[str],
+ verbose: typing.Optional[bool]):
family = 'ipv6' if family == 'inet6' else 'ipv4'
- nat_translation = _get_raw_translation(direction, family)
+ nat_translation = _get_raw_translation(direction,
+ family=family,
+ address=address)
+
if raw:
return nat_translation
else:
- return _get_formatted_translation(nat_translation, direction, family)
+ return _get_formatted_translation(nat_translation, direction, family,
+ verbose)
if __name__ == '__main__':