diff options
author | Christian Poessinger <christian@poessinger.com> | 2022-12-23 17:30:22 +0100 |
---|---|---|
committer | Christian Poessinger <christian@poessinger.com> | 2022-12-23 17:36:34 +0100 |
commit | 5a089a033bf684583ad75d3c7b5687d72aabded9 (patch) | |
tree | 8a1212ba208193f623fdc4362c90bab5a1c3a6b0 /src | |
parent | 76d8a34d9efe9b420f9e698bba88b44ec5d8631b (diff) | |
download | vyos-1x-5a089a033bf684583ad75d3c7b5687d72aabded9.tar.gz vyos-1x-5a089a033bf684583ad75d3c7b5687d72aabded9.zip |
nat: T4545: implement missing functionality from old script to new op-mode script
Remaining functionality to filter NAT translations for a given address
got implemented to nat.py - with this cahnge we can drop the old files
show_nat*.py
Diffstat (limited to 'src')
-rwxr-xr-x | src/op_mode/nat.py | 40 | ||||
-rwxr-xr-x | src/op_mode/show_nat66_statistics.py | 63 | ||||
-rwxr-xr-x | src/op_mode/show_nat66_translations.py | 204 | ||||
-rwxr-xr-x | src/op_mode/show_nat_statistics.py | 63 | ||||
-rwxr-xr-x | src/op_mode/show_nat_translations.py | 216 |
5 files changed, 21 insertions, 565 deletions
diff --git a/src/op_mode/nat.py b/src/op_mode/nat.py index f899eb3dc..a46571bd5 100755 --- a/src/op_mode/nat.py +++ b/src/op_mode/nat.py @@ -18,23 +18,21 @@ import jmespath import json import sys import xmltodict +import typing -from sys import exit from tabulate import tabulate -from vyos.configquery import ConfigTreeQuery +import vyos.opmode +from vyos.configquery import ConfigTreeQuery from vyos.util import cmd from vyos.util import dict_search -import vyos.opmode - - base = 'nat' unconf_message = 'NAT is not configured' -def _get_xml_translation(direction, family): +def _get_xml_translation(direction, family, address=None): """ Get conntrack XML output --src-nat|--dst-nat """ @@ -42,7 +40,10 @@ def _get_xml_translation(direction, family): opt = '--src-nat' if direction == 'destination': opt = '--dst-nat' - return cmd(f'sudo conntrack --dump --family {family} {opt} --output xml') + tmp = f'conntrack --dump --family {family} {opt} --output xml' + if address: + tmp += f' --src {address}' + return cmd(tmp) def _xml_to_dict(xml): @@ -66,7 +67,7 @@ def _get_json_data(direction, family): if direction == 'destination': chain = 'PREROUTING' family = 'ip6' if family == 'inet6' else 'ip' - return cmd(f'sudo nft --json list chain {family} vyos_nat {chain}') + return cmd(f'nft --json list chain {family} vyos_nat {chain}') def _get_raw_data_rules(direction, family): @@ -82,11 +83,11 @@ def _get_raw_data_rules(direction, family): return rules -def _get_raw_translation(direction, family): +def _get_raw_translation(direction, family, address=None): """ Return: dictionary """ - xml = _get_xml_translation(direction, family) + xml = _get_xml_translation(direction, family, address) if len(xml) == 0: output = {'conntrack': { @@ -231,7 +232,7 @@ def _get_formatted_output_statistics(data, direction): return output -def _get_formatted_translation(dict_data, nat_direction, family): +def _get_formatted_translation(dict_data, nat_direction, family, verbose): data_entries = [] if 'error' in dict_data['conntrack']: return 'Entries not found' @@ -269,14 +270,14 @@ def _get_formatted_translation(dict_data, nat_direction, family): reply_src = f'{reply_src}:{reply_sport}' if reply_sport else reply_src reply_dst = f'{reply_dst}:{reply_dport}' if reply_dport else reply_dst state = meta['state'] if 'state' in meta else '' - mark = meta['mark'] + mark = meta.get('mark', '') zone = meta['zone'] if 'zone' in meta else '' if nat_direction == 'source': - data_entries.append( - [orig_src, reply_dst, proto, timeout, mark, zone]) + tmp = [orig_src, reply_dst, proto, timeout, mark, zone] + data_entries.append(tmp) elif nat_direction == 'destination': - data_entries.append( - [orig_dst, reply_src, proto, timeout, mark, zone]) + tmp = [orig_dst, reply_src, proto, timeout, mark, zone] + data_entries.append(tmp) headers = ["Pre-NAT", "Post-NAT", "Proto", "Timeout", "Mark", "Zone"] output = tabulate(data_entries, headers, numalign="left") @@ -315,13 +316,14 @@ def show_statistics(raw: bool, direction: str, family: str): @_verify -def show_translations(raw: bool, direction: str, family: str): +def show_translations(raw: bool, direction: str, family: str, address: typing.Optional[str]): family = 'ipv6' if family == 'inet6' else 'ipv4' - nat_translation = _get_raw_translation(direction, family) + nat_translation = _get_raw_translation(direction, family=family, address=address) + if raw: return nat_translation else: - return _get_formatted_translation(nat_translation, direction, family) + return _get_formatted_translation(nat_translation, direction, family, verbose) if __name__ == '__main__': diff --git a/src/op_mode/show_nat66_statistics.py b/src/op_mode/show_nat66_statistics.py deleted file mode 100755 index cb10aed9f..000000000 --- a/src/op_mode/show_nat66_statistics.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2018 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import jmespath -import json - -from argparse import ArgumentParser -from jinja2 import Template -from sys import exit -from vyos.util import cmd - -OUT_TMPL_SRC=""" -rule pkts bytes interface ----- ---- ----- --------- -{% for r in output %} -{% if r.comment %} -{% set packets = r.counter.packets %} -{% set bytes = r.counter.bytes %} -{% set interface = r.interface %} -{# remove rule comment prefix #} -{% set comment = r.comment | replace('SRC-NAT66-', '') | replace('DST-NAT66-', '') %} -{{ "%-4s" | format(comment) }} {{ "%9s" | format(packets) }} {{ "%12s" | format(bytes) }} {{ interface }} -{% endif %} -{% endfor %} -""" - -parser = ArgumentParser() -group = parser.add_mutually_exclusive_group() -group.add_argument("--source", help="Show statistics for configured source NAT rules", action="store_true") -group.add_argument("--destination", help="Show statistics for configured destination NAT rules", action="store_true") -args = parser.parse_args() - -if args.source or args.destination: - tmp = cmd('sudo nft -j list table ip6 vyos_nat') - tmp = json.loads(tmp) - - source = r"nftables[?rule.chain=='POSTROUTING'].rule.{chain: chain, handle: handle, comment: comment, counter: expr[].counter | [0], interface: expr[].match.right | [0] }" - destination = r"nftables[?rule.chain=='PREROUTING'].rule.{chain: chain, handle: handle, comment: comment, counter: expr[].counter | [0], interface: expr[].match.right | [0] }" - data = { - 'output' : jmespath.search(source if args.source else destination, tmp), - 'direction' : 'source' if args.source else 'destination' - } - - tmpl = Template(OUT_TMPL_SRC, lstrip_blocks=True) - print(tmpl.render(data)) - exit(0) -else: - parser.print_help() - exit(1) - diff --git a/src/op_mode/show_nat66_translations.py b/src/op_mode/show_nat66_translations.py deleted file mode 100755 index 045d64065..000000000 --- a/src/op_mode/show_nat66_translations.py +++ /dev/null @@ -1,204 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -''' -show nat translations -''' - -import os -import sys -import ipaddress -import argparse -import xmltodict - -from vyos.util import popen -from vyos.util import DEVNULL - -conntrack = '/usr/sbin/conntrack' - -verbose_format = "%-20s %-18s %-20s %-18s" -normal_format = "%-20s %-20s %-4s %-8s %s" - - -def headers(verbose, pipe): - if verbose: - return verbose_format % ('Pre-NAT src', 'Pre-NAT dst', 'Post-NAT src', 'Post-NAT dst') - return normal_format % ('Pre-NAT', 'Post-NAT', 'Prot', 'Timeout', 'Type' if pipe else '') - - -def command(srcdest, proto, ipaddr): - command = f'{conntrack} -o xml -L -f ipv6' - - if proto: - command += f' -p {proto}' - - if srcdest == 'source': - command += ' -n' - if ipaddr: - command += f' --orig-src {ipaddr}' - if srcdest == 'destination': - command += ' -g' - if ipaddr: - command += f' --orig-dst {ipaddr}' - - return command - - -def run(command): - xml, code = popen(command,stderr=DEVNULL) - if code: - sys.exit('conntrack failed') - return xml - - -def content(xmlfile): - xml = '' - with open(xmlfile,'r') as r: - xml += r.read() - return xml - - -def pipe(): - xml = '' - while True: - line = sys.stdin.readline() - xml += line - if '</conntrack>' in line: - break - - sys.stdin = open('/dev/tty') - return xml - - -def process(data, stats, protocol, pipe, verbose, flowtype=''): - if not data: - return - - parsed = xmltodict.parse(data) - - print(headers(verbose, pipe)) - - # to help the linter to detect typos - ORIGINAL = 'original' - REPLY = 'reply' - INDEPENDANT = 'independent' - SPORT = 'sport' - DPORT = 'dport' - SRC = 'src' - DST = 'dst' - - for rule in parsed['conntrack']['flow']: - src, dst, sport, dport, proto = {}, {}, {}, {}, {} - packet_count, byte_count = {}, {} - timeout, use = 0, 0 - - rule_type = rule.get('type', '') - - for meta in rule['meta']: - # print(meta) - direction = meta['@direction'] - - if direction in (ORIGINAL, REPLY): - if 'layer3' in meta: - l3 = meta['layer3'] - src[direction] = l3[SRC] - dst[direction] = l3[DST] - - if 'layer4' in meta: - l4 = meta['layer4'] - sp = l4.get(SPORT, '') - dp = l4.get(DPORT, '') - if sp: - sport[direction] = sp - if dp: - dport[direction] = dp - proto[direction] = l4.get('@protoname','') - - if stats and 'counters' in meta: - packet_count[direction] = meta['packets'] - byte_count[direction] = meta['bytes'] - continue - - if direction == INDEPENDANT: - timeout = meta['timeout'] - use = meta['use'] - continue - - in_src = '%s:%s' % (src[ORIGINAL], sport[ORIGINAL]) if ORIGINAL in sport else src[ORIGINAL] - in_dst = '%s:%s' % (dst[ORIGINAL], dport[ORIGINAL]) if ORIGINAL in dport else dst[ORIGINAL] - - # inverted the the perl code !!? - out_dst = '%s:%s' % (dst[REPLY], dport[REPLY]) if REPLY in dport else dst[REPLY] - out_src = '%s:%s' % (src[REPLY], sport[REPLY]) if REPLY in sport else src[REPLY] - - if flowtype == 'source': - v = ORIGINAL in sport and REPLY in dport - f = '%s:%s' % (src[ORIGINAL], sport[ORIGINAL]) if v else src[ORIGINAL] - t = '%s:%s' % (dst[REPLY], dport[REPLY]) if v else dst[REPLY] - else: - v = ORIGINAL in dport and REPLY in sport - f = '%s:%s' % (dst[ORIGINAL], dport[ORIGINAL]) if v else dst[ORIGINAL] - t = '%s:%s' % (src[REPLY], sport[REPLY]) if v else src[REPLY] - - # Thomas: I do not believe proto should be an option - p = proto.get('original', '') - if protocol and p != protocol: - continue - - if verbose: - msg = verbose_format % (in_src, in_dst, out_dst, out_src) - p = f'{p}: ' if p else '' - msg += f'\n {p}{f} ==> {t}' - msg += f' timeout: {timeout}' if timeout else '' - msg += f' use: {use} ' if use else '' - msg += f' type: {rule_type}' if rule_type else '' - print(msg) - else: - print(normal_format % (f, t, p, timeout, rule_type if rule_type else '')) - - if stats: - for direction in ('original', 'reply'): - if direction in packet_count: - print(' %-8s: packets %s, bytes %s' % direction, packet_count[direction], byte_count[direction]) - - -def main(): - parser = argparse.ArgumentParser(description=sys.modules[__name__].__doc__) - parser.add_argument('--verbose', help='provide more details about the flows', action='store_true') - parser.add_argument('--proto', help='filter by protocol', default='', type=str) - parser.add_argument('--file', help='read the conntrack xml from a file', type=str) - parser.add_argument('--stats', help='add usage statistics', action='store_true') - parser.add_argument('--type', help='NAT type (source, destination)', required=True, type=str) - parser.add_argument('--ipaddr', help='source ip address to filter on', type=ipaddress.ip_address) - parser.add_argument('--pipe', help='read conntrack xml data from stdin', action='store_true') - - arg = parser.parse_args() - - if arg.type not in ('source', 'destination'): - sys.exit('Unknown NAT type!') - - if arg.pipe: - process(pipe(), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type) - elif arg.file: - process(content(arg.file), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type) - else: - try: - process(run(command(arg.type, arg.proto, arg.ipaddr)), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type) - except: - pass - -if __name__ == '__main__': - main() diff --git a/src/op_mode/show_nat_statistics.py b/src/op_mode/show_nat_statistics.py deleted file mode 100755 index be41e083b..000000000 --- a/src/op_mode/show_nat_statistics.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2018 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import jmespath -import json - -from argparse import ArgumentParser -from jinja2 import Template -from sys import exit -from vyos.util import cmd - -OUT_TMPL_SRC=""" -rule pkts bytes interface ----- ---- ----- --------- -{% for r in output %} -{% if r.comment %} -{% set packets = r.counter.packets %} -{% set bytes = r.counter.bytes %} -{% set interface = r.interface %} -{# remove rule comment prefix #} -{% set comment = r.comment | replace('SRC-NAT-', '') | replace('DST-NAT-', '') | replace(' tcp_udp', '') %} -{{ "%-4s" | format(comment) }} {{ "%9s" | format(packets) }} {{ "%12s" | format(bytes) }} {{ interface }} -{% endif %} -{% endfor %} -""" - -parser = ArgumentParser() -group = parser.add_mutually_exclusive_group() -group.add_argument("--source", help="Show statistics for configured source NAT rules", action="store_true") -group.add_argument("--destination", help="Show statistics for configured destination NAT rules", action="store_true") -args = parser.parse_args() - -if args.source or args.destination: - tmp = cmd('sudo nft -j list table ip vyos_nat') - tmp = json.loads(tmp) - - source = r"nftables[?rule.chain=='POSTROUTING'].rule.{chain: chain, handle: handle, comment: comment, counter: expr[].counter | [0], interface: expr[].match.right | [0] }" - destination = r"nftables[?rule.chain=='PREROUTING'].rule.{chain: chain, handle: handle, comment: comment, counter: expr[].counter | [0], interface: expr[].match.right | [0] }" - data = { - 'output' : jmespath.search(source if args.source else destination, tmp), - 'direction' : 'source' if args.source else 'destination' - } - - tmpl = Template(OUT_TMPL_SRC, lstrip_blocks=True) - print(tmpl.render(data)) - exit(0) -else: - parser.print_help() - exit(1) - diff --git a/src/op_mode/show_nat_translations.py b/src/op_mode/show_nat_translations.py deleted file mode 100755 index 508845e23..000000000 --- a/src/op_mode/show_nat_translations.py +++ /dev/null @@ -1,216 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020-2022 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -''' -show nat translations -''' - -import os -import sys -import ipaddress -import argparse -import xmltodict - -from vyos.util import popen -from vyos.util import DEVNULL - -conntrack = '/usr/sbin/conntrack' - -verbose_format = "%-20s %-18s %-20s %-18s" -normal_format = "%-20s %-20s %-4s %-8s %s" - - -def headers(verbose, pipe): - if verbose: - return verbose_format % ('Pre-NAT src', 'Pre-NAT dst', 'Post-NAT src', 'Post-NAT dst') - return normal_format % ('Pre-NAT', 'Post-NAT', 'Prot', 'Timeout', 'Type' if pipe else '') - - -def command(srcdest, proto, ipaddr): - command = f'{conntrack} -o xml -L' - - if proto: - command += f' -p {proto}' - - if srcdest == 'source': - command += ' -n' - if ipaddr: - command += f' --orig-src {ipaddr}' - if srcdest == 'destination': - command += ' -g' - if ipaddr: - command += f' --orig-dst {ipaddr}' - - return command - - -def run(command): - xml, code = popen(command,stderr=DEVNULL) - if code: - sys.exit('conntrack failed') - return xml - - -def content(xmlfile): - xml = '' - with open(xmlfile,'r') as r: - xml += r.read() - return xml - - -def pipe(): - xml = '' - while True: - line = sys.stdin.readline() - xml += line - if '</conntrack>' in line: - break - - sys.stdin = open('/dev/tty') - return xml - - -def xml_to_dict(xml): - """ - Convert XML to dictionary - Return: dictionary - """ - parse = xmltodict.parse(xml) - # If only one NAT entry we must change dict T4499 - if 'meta' in parse['conntrack']['flow']: - return dict(conntrack={'flow': [parse['conntrack']['flow']]}) - return parse - - -def process(data, stats, protocol, pipe, verbose, flowtype=''): - if not data: - return - - parsed = xml_to_dict(data) - - print(headers(verbose, pipe)) - - # to help the linter to detect typos - ORIGINAL = 'original' - REPLY = 'reply' - INDEPENDANT = 'independent' - SPORT = 'sport' - DPORT = 'dport' - SRC = 'src' - DST = 'dst' - - for rule in parsed['conntrack']['flow']: - src, dst, sport, dport, proto = {}, {}, {}, {}, {} - packet_count, byte_count = {}, {} - timeout, use = 0, 0 - - rule_type = rule.get('type', '') - - for meta in rule['meta']: - # print(meta) - direction = meta['@direction'] - - if direction in (ORIGINAL, REPLY): - if 'layer3' in meta: - l3 = meta['layer3'] - src[direction] = l3[SRC] - dst[direction] = l3[DST] - - if 'layer4' in meta: - l4 = meta['layer4'] - sp = l4.get(SPORT, '') - dp = l4.get(DPORT, '') - if sp: - sport[direction] = sp - if dp: - dport[direction] = dp - proto[direction] = l4.get('@protoname','') - - if stats and 'counters' in meta: - packet_count[direction] = meta['packets'] - byte_count[direction] = meta['bytes'] - continue - - if direction == INDEPENDANT: - timeout = meta['timeout'] - use = meta['use'] - continue - - in_src = '%s:%s' % (src[ORIGINAL], sport[ORIGINAL]) if ORIGINAL in sport else src[ORIGINAL] - in_dst = '%s:%s' % (dst[ORIGINAL], dport[ORIGINAL]) if ORIGINAL in dport else dst[ORIGINAL] - - # inverted the the perl code !!? - out_dst = '%s:%s' % (dst[REPLY], dport[REPLY]) if REPLY in dport else dst[REPLY] - out_src = '%s:%s' % (src[REPLY], sport[REPLY]) if REPLY in sport else src[REPLY] - - if flowtype == 'source': - v = ORIGINAL in sport and REPLY in dport - f = '%s:%s' % (src[ORIGINAL], sport[ORIGINAL]) if v else src[ORIGINAL] - t = '%s:%s' % (dst[REPLY], dport[REPLY]) if v else dst[REPLY] - else: - v = ORIGINAL in dport and REPLY in sport - f = '%s:%s' % (dst[ORIGINAL], dport[ORIGINAL]) if v else dst[ORIGINAL] - t = '%s:%s' % (src[REPLY], sport[REPLY]) if v else src[REPLY] - - # Thomas: I do not believe proto should be an option - p = proto.get('original', '') - if protocol and p != protocol: - continue - - if verbose: - msg = verbose_format % (in_src, in_dst, out_dst, out_src) - p = f'{p}: ' if p else '' - msg += f'\n {p}{f} ==> {t}' - msg += f' timeout: {timeout}' if timeout else '' - msg += f' use: {use} ' if use else '' - msg += f' type: {rule_type}' if rule_type else '' - print(msg) - else: - print(normal_format % (f, t, p, timeout, rule_type if rule_type else '')) - - if stats: - for direction in ('original', 'reply'): - if direction in packet_count: - print(' %-8s: packets %s, bytes %s' % direction, packet_count[direction], byte_count[direction]) - - -def main(): - parser = argparse.ArgumentParser(description=sys.modules[__name__].__doc__) - parser.add_argument('--verbose', help='provide more details about the flows', action='store_true') - parser.add_argument('--proto', help='filter by protocol', default='', type=str) - parser.add_argument('--file', help='read the conntrack xml from a file', type=str) - parser.add_argument('--stats', help='add usage statistics', action='store_true') - parser.add_argument('--type', help='NAT type (source, destination)', required=True, type=str) - parser.add_argument('--ipaddr', help='source ip address to filter on', type=ipaddress.ip_address) - parser.add_argument('--pipe', help='read conntrack xml data from stdin', action='store_true') - - arg = parser.parse_args() - - if arg.type not in ('source', 'destination'): - sys.exit('Unknown NAT type!') - - if arg.pipe: - process(pipe(), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type) - elif arg.file: - process(content(arg.file), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type) - else: - try: - process(run(command(arg.type, arg.proto, arg.ipaddr)), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type) - except: - pass - -if __name__ == '__main__': - main() |