diff options
Diffstat (limited to 'src')
| -rwxr-xr-x | src/op_mode/nat.py | 40 | ||||
| -rwxr-xr-x | src/op_mode/show_nat66_statistics.py | 63 | ||||
| -rwxr-xr-x | src/op_mode/show_nat66_translations.py | 204 | ||||
| -rwxr-xr-x | src/op_mode/show_nat_statistics.py | 63 | ||||
| -rwxr-xr-x | src/op_mode/show_nat_translations.py | 216 | 
5 files changed, 21 insertions, 565 deletions
| diff --git a/src/op_mode/nat.py b/src/op_mode/nat.py index f899eb3dc..a46571bd5 100755 --- a/src/op_mode/nat.py +++ b/src/op_mode/nat.py @@ -18,23 +18,21 @@ import jmespath  import json  import sys  import xmltodict +import typing -from sys import exit  from tabulate import tabulate -from vyos.configquery import ConfigTreeQuery +import vyos.opmode +from vyos.configquery import ConfigTreeQuery  from vyos.util import cmd  from vyos.util import dict_search -import vyos.opmode - -  base = 'nat'  unconf_message = 'NAT is not configured' -def _get_xml_translation(direction, family): +def _get_xml_translation(direction, family, address=None):      """      Get conntrack XML output --src-nat|--dst-nat      """ @@ -42,7 +40,10 @@ def _get_xml_translation(direction, family):          opt = '--src-nat'      if direction == 'destination':          opt = '--dst-nat' -    return cmd(f'sudo conntrack --dump --family {family} {opt} --output xml') +    tmp = f'conntrack --dump --family {family} {opt} --output xml' +    if address: +        tmp += f' --src {address}' +    return cmd(tmp)  def _xml_to_dict(xml): @@ -66,7 +67,7 @@ def _get_json_data(direction, family):      if direction == 'destination':          chain = 'PREROUTING'      family = 'ip6' if family == 'inet6' else 'ip' -    return cmd(f'sudo nft --json list chain {family} vyos_nat {chain}') +    return cmd(f'nft --json list chain {family} vyos_nat {chain}')  def _get_raw_data_rules(direction, family): @@ -82,11 +83,11 @@ def _get_raw_data_rules(direction, family):      return rules -def _get_raw_translation(direction, family): +def _get_raw_translation(direction, family, address=None):      """      Return: dictionary      """ -    xml = _get_xml_translation(direction, family) +    xml = _get_xml_translation(direction, family, address)      if len(xml) == 0:          output = {'conntrack':              { @@ -231,7 +232,7 @@ def _get_formatted_output_statistics(data, direction):      return output -def _get_formatted_translation(dict_data, nat_direction, family): +def _get_formatted_translation(dict_data, nat_direction, family, verbose):      data_entries = []      if 'error' in dict_data['conntrack']:          return 'Entries not found' @@ -269,14 +270,14 @@ def _get_formatted_translation(dict_data, nat_direction, family):                  reply_src = f'{reply_src}:{reply_sport}' if reply_sport else reply_src                  reply_dst = f'{reply_dst}:{reply_dport}' if reply_dport else reply_dst                  state = meta['state'] if 'state' in meta else '' -                mark = meta['mark'] +                mark = meta.get('mark', '')                  zone = meta['zone'] if 'zone' in meta else ''                  if nat_direction == 'source': -                    data_entries.append( -                        [orig_src, reply_dst, proto, timeout, mark, zone]) +                    tmp = [orig_src, reply_dst, proto, timeout, mark, zone] +                    data_entries.append(tmp)                  elif nat_direction == 'destination': -                    data_entries.append( -                        [orig_dst, reply_src, proto, timeout, mark, zone]) +                    tmp = [orig_dst, reply_src, proto, timeout, mark, zone] +                    data_entries.append(tmp)      headers = ["Pre-NAT", "Post-NAT", "Proto", "Timeout", "Mark", "Zone"]      output = tabulate(data_entries, headers, numalign="left") @@ -315,13 +316,14 @@ def show_statistics(raw: bool, direction: str, family: str):  @_verify -def show_translations(raw: bool, direction: str, family: str): +def show_translations(raw: bool, direction: str, family: str, address: typing.Optional[str]):      family = 'ipv6' if family == 'inet6' else 'ipv4' -    nat_translation = _get_raw_translation(direction, family) +    nat_translation = _get_raw_translation(direction, family=family, address=address) +      if raw:          return nat_translation      else: -        return _get_formatted_translation(nat_translation, direction, family) +        return _get_formatted_translation(nat_translation, direction, family, verbose)  if __name__ == '__main__': diff --git a/src/op_mode/show_nat66_statistics.py b/src/op_mode/show_nat66_statistics.py deleted file mode 100755 index cb10aed9f..000000000 --- a/src/op_mode/show_nat66_statistics.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2018 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import jmespath -import json - -from argparse import ArgumentParser -from jinja2 import Template -from sys import exit -from vyos.util import cmd - -OUT_TMPL_SRC=""" -rule      pkts        bytes   interface -----      ----        -----   --------- -{% for r in output %} -{% if r.comment %} -{% set packets   = r.counter.packets %} -{% set bytes     = r.counter.bytes %} -{% set interface = r.interface %} -{# remove rule comment prefix #} -{% set comment   = r.comment | replace('SRC-NAT66-', '') | replace('DST-NAT66-', '') %} -{{ "%-4s" | format(comment) }} {{ "%9s" | format(packets) }} {{ "%12s" | format(bytes) }}   {{ interface }} -{% endif %} -{% endfor %} -""" - -parser = ArgumentParser() -group = parser.add_mutually_exclusive_group() -group.add_argument("--source", help="Show statistics for configured source NAT rules", action="store_true") -group.add_argument("--destination", help="Show statistics for configured destination NAT rules", action="store_true") -args = parser.parse_args() - -if args.source or args.destination: -    tmp = cmd('sudo nft -j list table ip6 vyos_nat') -    tmp = json.loads(tmp) - -    source = r"nftables[?rule.chain=='POSTROUTING'].rule.{chain: chain, handle: handle, comment: comment, counter: expr[].counter | [0], interface: expr[].match.right | [0] }" -    destination = r"nftables[?rule.chain=='PREROUTING'].rule.{chain: chain, handle: handle, comment: comment, counter: expr[].counter | [0], interface: expr[].match.right | [0] }" -    data = { -        'output' : jmespath.search(source if args.source else destination, tmp), -        'direction' : 'source' if args.source else 'destination' -    } - -    tmpl = Template(OUT_TMPL_SRC, lstrip_blocks=True) -    print(tmpl.render(data)) -    exit(0) -else: -    parser.print_help() -    exit(1) - diff --git a/src/op_mode/show_nat66_translations.py b/src/op_mode/show_nat66_translations.py deleted file mode 100755 index 045d64065..000000000 --- a/src/op_mode/show_nat66_translations.py +++ /dev/null @@ -1,204 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -''' -show nat translations -''' - -import os -import sys -import ipaddress -import argparse -import xmltodict - -from vyos.util import popen -from vyos.util import DEVNULL - -conntrack = '/usr/sbin/conntrack' - -verbose_format = "%-20s %-18s %-20s %-18s" -normal_format = "%-20s %-20s %-4s  %-8s %s" - - -def headers(verbose, pipe): -    if verbose: -        return verbose_format % ('Pre-NAT src', 'Pre-NAT dst', 'Post-NAT src', 'Post-NAT dst') -    return normal_format % ('Pre-NAT', 'Post-NAT', 'Prot', 'Timeout', 'Type' if pipe else '') - - -def command(srcdest, proto, ipaddr): -    command = f'{conntrack} -o xml -L -f ipv6' - -    if proto: -        command += f' -p {proto}' - -    if srcdest == 'source': -        command += ' -n' -        if ipaddr: -            command += f' --orig-src {ipaddr}' -    if srcdest == 'destination': -        command += ' -g' -        if ipaddr: -            command += f' --orig-dst {ipaddr}' - -    return command - - -def run(command): -    xml, code = popen(command,stderr=DEVNULL) -    if code: -        sys.exit('conntrack failed') -    return xml - - -def content(xmlfile): -    xml = '' -    with open(xmlfile,'r') as r: -        xml += r.read() -    return xml - - -def pipe(): -    xml = '' -    while True: -        line = sys.stdin.readline() -        xml += line -        if '</conntrack>' in line: -            break - -    sys.stdin = open('/dev/tty') -    return xml - - -def process(data, stats, protocol, pipe, verbose, flowtype=''): -    if not data: -        return - -    parsed = xmltodict.parse(data) - -    print(headers(verbose, pipe)) - -    # to help the linter to detect typos -    ORIGINAL = 'original' -    REPLY = 'reply' -    INDEPENDANT = 'independent' -    SPORT = 'sport' -    DPORT = 'dport' -    SRC = 'src' -    DST = 'dst' - -    for rule in parsed['conntrack']['flow']: -        src, dst, sport, dport, proto = {}, {}, {}, {}, {} -        packet_count, byte_count = {}, {} -        timeout, use = 0, 0 - -        rule_type = rule.get('type', '') - -        for meta in rule['meta']: -            # print(meta) -            direction = meta['@direction'] - -            if direction in (ORIGINAL, REPLY): -                if 'layer3' in meta: -                    l3 = meta['layer3'] -                    src[direction] = l3[SRC] -                    dst[direction] = l3[DST] - -                if 'layer4' in meta: -                    l4 = meta['layer4'] -                    sp = l4.get(SPORT, '') -                    dp = l4.get(DPORT, '') -                    if sp: -                        sport[direction] = sp -                    if dp: -                        dport[direction] = dp -                    proto[direction] = l4.get('@protoname','') - -                if stats and 'counters' in meta: -                    packet_count[direction] = meta['packets'] -                    byte_count[direction] = meta['bytes'] -                continue - -            if direction == INDEPENDANT: -                timeout = meta['timeout'] -                use = meta['use'] -                continue - -        in_src = '%s:%s' % (src[ORIGINAL], sport[ORIGINAL]) if ORIGINAL in sport else src[ORIGINAL] -        in_dst = '%s:%s' % (dst[ORIGINAL], dport[ORIGINAL]) if ORIGINAL in dport else dst[ORIGINAL] - -        # inverted the the perl code !!? -        out_dst = '%s:%s' % (dst[REPLY], dport[REPLY]) if REPLY in dport else dst[REPLY] -        out_src = '%s:%s' % (src[REPLY], sport[REPLY]) if REPLY in sport else src[REPLY] - -        if flowtype == 'source': -            v = ORIGINAL in sport and REPLY in dport -            f = '%s:%s' % (src[ORIGINAL], sport[ORIGINAL]) if v else src[ORIGINAL] -            t = '%s:%s' % (dst[REPLY], dport[REPLY]) if v else dst[REPLY] -        else: -            v = ORIGINAL in dport and REPLY in sport -            f = '%s:%s' % (dst[ORIGINAL], dport[ORIGINAL]) if v else dst[ORIGINAL] -            t = '%s:%s' % (src[REPLY], sport[REPLY]) if v else src[REPLY] - -        # Thomas: I do not believe proto should be an option -        p = proto.get('original', '') -        if protocol and p != protocol: -            continue - -        if verbose: -            msg = verbose_format % (in_src, in_dst, out_dst, out_src) -            p = f'{p}: ' if p else '' -            msg += f'\n  {p}{f} ==> {t}' -            msg += f' timeout: {timeout}' if timeout else '' -            msg += f' use: {use} ' if use else '' -            msg += f' type: {rule_type}' if rule_type else '' -            print(msg) -        else: -            print(normal_format % (f, t, p, timeout, rule_type if rule_type else '')) - -        if stats: -            for direction in ('original', 'reply'): -                if direction in packet_count: -                    print('  %-8s: packets %s, bytes %s' % direction, packet_count[direction], byte_count[direction]) - - -def main(): -    parser = argparse.ArgumentParser(description=sys.modules[__name__].__doc__) -    parser.add_argument('--verbose', help='provide more details about the flows', action='store_true') -    parser.add_argument('--proto', help='filter by protocol', default='', type=str) -    parser.add_argument('--file', help='read the conntrack xml from a file', type=str) -    parser.add_argument('--stats', help='add usage statistics', action='store_true') -    parser.add_argument('--type', help='NAT type (source, destination)', required=True, type=str) -    parser.add_argument('--ipaddr', help='source ip address to filter on', type=ipaddress.ip_address) -    parser.add_argument('--pipe', help='read conntrack xml data from stdin', action='store_true') - -    arg = parser.parse_args() - -    if arg.type not in ('source', 'destination'): -        sys.exit('Unknown NAT type!') - -    if arg.pipe: -        process(pipe(), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type) -    elif arg.file: -        process(content(arg.file), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type) -    else: -        try: -            process(run(command(arg.type, arg.proto, arg.ipaddr)), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type) -        except: -            pass - -if __name__ == '__main__': -    main() diff --git a/src/op_mode/show_nat_statistics.py b/src/op_mode/show_nat_statistics.py deleted file mode 100755 index be41e083b..000000000 --- a/src/op_mode/show_nat_statistics.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2018 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import jmespath -import json - -from argparse import ArgumentParser -from jinja2 import Template -from sys import exit -from vyos.util import cmd - -OUT_TMPL_SRC=""" -rule      pkts        bytes   interface -----      ----        -----   --------- -{% for r in output %} -{% if r.comment %} -{% set packets   = r.counter.packets %} -{% set bytes     = r.counter.bytes %} -{% set interface = r.interface %} -{# remove rule comment prefix #} -{% set comment   = r.comment | replace('SRC-NAT-', '') | replace('DST-NAT-', '') | replace(' tcp_udp', '') %} -{{ "%-4s" | format(comment) }} {{ "%9s" | format(packets) }} {{ "%12s" | format(bytes) }}   {{ interface }} -{% endif %} -{% endfor %} -""" - -parser = ArgumentParser() -group = parser.add_mutually_exclusive_group() -group.add_argument("--source", help="Show statistics for configured source NAT rules", action="store_true") -group.add_argument("--destination", help="Show statistics for configured destination NAT rules", action="store_true") -args = parser.parse_args() - -if args.source or args.destination: -    tmp = cmd('sudo nft -j list table ip vyos_nat') -    tmp = json.loads(tmp) - -    source = r"nftables[?rule.chain=='POSTROUTING'].rule.{chain: chain, handle: handle, comment: comment, counter: expr[].counter | [0], interface: expr[].match.right | [0] }" -    destination = r"nftables[?rule.chain=='PREROUTING'].rule.{chain: chain, handle: handle, comment: comment, counter: expr[].counter | [0], interface: expr[].match.right | [0] }" -    data = { -        'output' : jmespath.search(source if args.source else destination, tmp), -        'direction' : 'source' if args.source else 'destination' -    } - -    tmpl = Template(OUT_TMPL_SRC, lstrip_blocks=True) -    print(tmpl.render(data)) -    exit(0) -else: -    parser.print_help() -    exit(1) - diff --git a/src/op_mode/show_nat_translations.py b/src/op_mode/show_nat_translations.py deleted file mode 100755 index 508845e23..000000000 --- a/src/op_mode/show_nat_translations.py +++ /dev/null @@ -1,216 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2020-2022 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -''' -show nat translations -''' - -import os -import sys -import ipaddress -import argparse -import xmltodict - -from vyos.util import popen -from vyos.util import DEVNULL - -conntrack = '/usr/sbin/conntrack' - -verbose_format = "%-20s %-18s %-20s %-18s" -normal_format = "%-20s %-20s %-4s  %-8s %s" - - -def headers(verbose, pipe): -    if verbose: -        return verbose_format % ('Pre-NAT src', 'Pre-NAT dst', 'Post-NAT src', 'Post-NAT dst') -    return normal_format % ('Pre-NAT', 'Post-NAT', 'Prot', 'Timeout', 'Type' if pipe else '') - - -def command(srcdest, proto, ipaddr): -    command = f'{conntrack} -o xml -L' - -    if proto: -        command += f' -p {proto}' - -    if srcdest == 'source': -        command += ' -n' -        if ipaddr: -            command += f' --orig-src {ipaddr}' -    if srcdest == 'destination': -        command += ' -g' -        if ipaddr: -            command += f' --orig-dst {ipaddr}' - -    return command - - -def run(command): -    xml, code = popen(command,stderr=DEVNULL) -    if code: -        sys.exit('conntrack failed') -    return xml - - -def content(xmlfile): -    xml = '' -    with open(xmlfile,'r') as r: -        xml += r.read() -    return xml - - -def pipe(): -    xml = '' -    while True: -        line = sys.stdin.readline() -        xml += line -        if '</conntrack>' in line: -            break - -    sys.stdin = open('/dev/tty') -    return xml - - -def xml_to_dict(xml): -    """ -    Convert XML to dictionary -    Return: dictionary -    """ -    parse = xmltodict.parse(xml) -    # If only one NAT entry we must change dict T4499 -    if 'meta' in parse['conntrack']['flow']: -        return dict(conntrack={'flow': [parse['conntrack']['flow']]}) -    return parse - - -def process(data, stats, protocol, pipe, verbose, flowtype=''): -    if not data: -        return - -    parsed = xml_to_dict(data) - -    print(headers(verbose, pipe)) - -    # to help the linter to detect typos -    ORIGINAL = 'original' -    REPLY = 'reply' -    INDEPENDANT = 'independent' -    SPORT = 'sport' -    DPORT = 'dport' -    SRC = 'src' -    DST = 'dst' - -    for rule in parsed['conntrack']['flow']: -        src, dst, sport, dport, proto = {}, {}, {}, {}, {} -        packet_count, byte_count = {}, {} -        timeout, use = 0, 0 - -        rule_type = rule.get('type', '') - -        for meta in rule['meta']: -            # print(meta) -            direction = meta['@direction'] - -            if direction in (ORIGINAL, REPLY): -                if 'layer3' in meta: -                    l3 = meta['layer3'] -                    src[direction] = l3[SRC] -                    dst[direction] = l3[DST] - -                if 'layer4' in meta: -                    l4 = meta['layer4'] -                    sp = l4.get(SPORT, '') -                    dp = l4.get(DPORT, '') -                    if sp: -                        sport[direction] = sp -                    if dp: -                        dport[direction] = dp -                    proto[direction] = l4.get('@protoname','') - -                if stats and 'counters' in meta: -                    packet_count[direction] = meta['packets'] -                    byte_count[direction] = meta['bytes'] -                continue - -            if direction == INDEPENDANT: -                timeout = meta['timeout'] -                use = meta['use'] -                continue - -        in_src = '%s:%s' % (src[ORIGINAL], sport[ORIGINAL]) if ORIGINAL in sport else src[ORIGINAL] -        in_dst = '%s:%s' % (dst[ORIGINAL], dport[ORIGINAL]) if ORIGINAL in dport else dst[ORIGINAL] - -        # inverted the the perl code !!? -        out_dst = '%s:%s' % (dst[REPLY], dport[REPLY]) if REPLY in dport else dst[REPLY] -        out_src = '%s:%s' % (src[REPLY], sport[REPLY]) if REPLY in sport else src[REPLY] - -        if flowtype == 'source': -            v = ORIGINAL in sport and REPLY in dport -            f = '%s:%s' % (src[ORIGINAL], sport[ORIGINAL]) if v else src[ORIGINAL] -            t = '%s:%s' % (dst[REPLY], dport[REPLY]) if v else dst[REPLY] -        else: -            v = ORIGINAL in dport and REPLY in sport -            f = '%s:%s' % (dst[ORIGINAL], dport[ORIGINAL]) if v else dst[ORIGINAL] -            t = '%s:%s' % (src[REPLY], sport[REPLY]) if v else src[REPLY] - -        # Thomas: I do not believe proto should be an option -        p = proto.get('original', '') -        if protocol and p != protocol: -            continue - -        if verbose: -            msg = verbose_format % (in_src, in_dst, out_dst, out_src) -            p = f'{p}: ' if p else '' -            msg += f'\n  {p}{f} ==> {t}' -            msg += f' timeout: {timeout}' if timeout else '' -            msg += f' use: {use} ' if use else '' -            msg += f' type: {rule_type}' if rule_type else '' -            print(msg) -        else: -            print(normal_format % (f, t, p, timeout, rule_type if rule_type else '')) - -        if stats: -            for direction in ('original', 'reply'): -                if direction in packet_count: -                    print('  %-8s: packets %s, bytes %s' % direction, packet_count[direction], byte_count[direction]) - - -def main(): -    parser = argparse.ArgumentParser(description=sys.modules[__name__].__doc__) -    parser.add_argument('--verbose', help='provide more details about the flows', action='store_true') -    parser.add_argument('--proto', help='filter by protocol', default='', type=str) -    parser.add_argument('--file', help='read the conntrack xml from a file', type=str) -    parser.add_argument('--stats', help='add usage statistics', action='store_true') -    parser.add_argument('--type', help='NAT type (source, destination)', required=True, type=str) -    parser.add_argument('--ipaddr', help='source ip address to filter on', type=ipaddress.ip_address) -    parser.add_argument('--pipe', help='read conntrack xml data from stdin', action='store_true') - -    arg = parser.parse_args() - -    if arg.type not in ('source', 'destination'): -        sys.exit('Unknown NAT type!') - -    if arg.pipe: -        process(pipe(), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type) -    elif arg.file: -        process(content(arg.file), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type) -    else: -        try: -            process(run(command(arg.type, arg.proto, arg.ipaddr)), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type) -        except: -            pass - -if __name__ == '__main__': -    main() | 
