summaryrefslogtreecommitdiff
path: root/src/op_mode/show_nat_translations.py
diff options
context:
space:
mode:
authorChristian Poessinger <christian@poessinger.com>2022-12-23 17:30:22 +0100
committerChristian Poessinger <christian@poessinger.com>2022-12-23 17:36:34 +0100
commit5a089a033bf684583ad75d3c7b5687d72aabded9 (patch)
tree8a1212ba208193f623fdc4362c90bab5a1c3a6b0 /src/op_mode/show_nat_translations.py
parent76d8a34d9efe9b420f9e698bba88b44ec5d8631b (diff)
downloadvyos-1x-5a089a033bf684583ad75d3c7b5687d72aabded9.tar.gz
vyos-1x-5a089a033bf684583ad75d3c7b5687d72aabded9.zip
nat: T4545: implement missing functionality from old script to new op-mode script
Remaining functionality to filter NAT translations for a given address got implemented to nat.py - with this cahnge we can drop the old files show_nat*.py
Diffstat (limited to 'src/op_mode/show_nat_translations.py')
-rwxr-xr-xsrc/op_mode/show_nat_translations.py216
1 files changed, 0 insertions, 216 deletions
diff --git a/src/op_mode/show_nat_translations.py b/src/op_mode/show_nat_translations.py
deleted file mode 100755
index 508845e23..000000000
--- a/src/op_mode/show_nat_translations.py
+++ /dev/null
@@ -1,216 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020-2022 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-'''
-show nat translations
-'''
-
-import os
-import sys
-import ipaddress
-import argparse
-import xmltodict
-
-from vyos.util import popen
-from vyos.util import DEVNULL
-
-conntrack = '/usr/sbin/conntrack'
-
-verbose_format = "%-20s %-18s %-20s %-18s"
-normal_format = "%-20s %-20s %-4s %-8s %s"
-
-
-def headers(verbose, pipe):
- if verbose:
- return verbose_format % ('Pre-NAT src', 'Pre-NAT dst', 'Post-NAT src', 'Post-NAT dst')
- return normal_format % ('Pre-NAT', 'Post-NAT', 'Prot', 'Timeout', 'Type' if pipe else '')
-
-
-def command(srcdest, proto, ipaddr):
- command = f'{conntrack} -o xml -L'
-
- if proto:
- command += f' -p {proto}'
-
- if srcdest == 'source':
- command += ' -n'
- if ipaddr:
- command += f' --orig-src {ipaddr}'
- if srcdest == 'destination':
- command += ' -g'
- if ipaddr:
- command += f' --orig-dst {ipaddr}'
-
- return command
-
-
-def run(command):
- xml, code = popen(command,stderr=DEVNULL)
- if code:
- sys.exit('conntrack failed')
- return xml
-
-
-def content(xmlfile):
- xml = ''
- with open(xmlfile,'r') as r:
- xml += r.read()
- return xml
-
-
-def pipe():
- xml = ''
- while True:
- line = sys.stdin.readline()
- xml += line
- if '</conntrack>' in line:
- break
-
- sys.stdin = open('/dev/tty')
- return xml
-
-
-def xml_to_dict(xml):
- """
- Convert XML to dictionary
- Return: dictionary
- """
- parse = xmltodict.parse(xml)
- # If only one NAT entry we must change dict T4499
- if 'meta' in parse['conntrack']['flow']:
- return dict(conntrack={'flow': [parse['conntrack']['flow']]})
- return parse
-
-
-def process(data, stats, protocol, pipe, verbose, flowtype=''):
- if not data:
- return
-
- parsed = xml_to_dict(data)
-
- print(headers(verbose, pipe))
-
- # to help the linter to detect typos
- ORIGINAL = 'original'
- REPLY = 'reply'
- INDEPENDANT = 'independent'
- SPORT = 'sport'
- DPORT = 'dport'
- SRC = 'src'
- DST = 'dst'
-
- for rule in parsed['conntrack']['flow']:
- src, dst, sport, dport, proto = {}, {}, {}, {}, {}
- packet_count, byte_count = {}, {}
- timeout, use = 0, 0
-
- rule_type = rule.get('type', '')
-
- for meta in rule['meta']:
- # print(meta)
- direction = meta['@direction']
-
- if direction in (ORIGINAL, REPLY):
- if 'layer3' in meta:
- l3 = meta['layer3']
- src[direction] = l3[SRC]
- dst[direction] = l3[DST]
-
- if 'layer4' in meta:
- l4 = meta['layer4']
- sp = l4.get(SPORT, '')
- dp = l4.get(DPORT, '')
- if sp:
- sport[direction] = sp
- if dp:
- dport[direction] = dp
- proto[direction] = l4.get('@protoname','')
-
- if stats and 'counters' in meta:
- packet_count[direction] = meta['packets']
- byte_count[direction] = meta['bytes']
- continue
-
- if direction == INDEPENDANT:
- timeout = meta['timeout']
- use = meta['use']
- continue
-
- in_src = '%s:%s' % (src[ORIGINAL], sport[ORIGINAL]) if ORIGINAL in sport else src[ORIGINAL]
- in_dst = '%s:%s' % (dst[ORIGINAL], dport[ORIGINAL]) if ORIGINAL in dport else dst[ORIGINAL]
-
- # inverted the the perl code !!?
- out_dst = '%s:%s' % (dst[REPLY], dport[REPLY]) if REPLY in dport else dst[REPLY]
- out_src = '%s:%s' % (src[REPLY], sport[REPLY]) if REPLY in sport else src[REPLY]
-
- if flowtype == 'source':
- v = ORIGINAL in sport and REPLY in dport
- f = '%s:%s' % (src[ORIGINAL], sport[ORIGINAL]) if v else src[ORIGINAL]
- t = '%s:%s' % (dst[REPLY], dport[REPLY]) if v else dst[REPLY]
- else:
- v = ORIGINAL in dport and REPLY in sport
- f = '%s:%s' % (dst[ORIGINAL], dport[ORIGINAL]) if v else dst[ORIGINAL]
- t = '%s:%s' % (src[REPLY], sport[REPLY]) if v else src[REPLY]
-
- # Thomas: I do not believe proto should be an option
- p = proto.get('original', '')
- if protocol and p != protocol:
- continue
-
- if verbose:
- msg = verbose_format % (in_src, in_dst, out_dst, out_src)
- p = f'{p}: ' if p else ''
- msg += f'\n {p}{f} ==> {t}'
- msg += f' timeout: {timeout}' if timeout else ''
- msg += f' use: {use} ' if use else ''
- msg += f' type: {rule_type}' if rule_type else ''
- print(msg)
- else:
- print(normal_format % (f, t, p, timeout, rule_type if rule_type else ''))
-
- if stats:
- for direction in ('original', 'reply'):
- if direction in packet_count:
- print(' %-8s: packets %s, bytes %s' % direction, packet_count[direction], byte_count[direction])
-
-
-def main():
- parser = argparse.ArgumentParser(description=sys.modules[__name__].__doc__)
- parser.add_argument('--verbose', help='provide more details about the flows', action='store_true')
- parser.add_argument('--proto', help='filter by protocol', default='', type=str)
- parser.add_argument('--file', help='read the conntrack xml from a file', type=str)
- parser.add_argument('--stats', help='add usage statistics', action='store_true')
- parser.add_argument('--type', help='NAT type (source, destination)', required=True, type=str)
- parser.add_argument('--ipaddr', help='source ip address to filter on', type=ipaddress.ip_address)
- parser.add_argument('--pipe', help='read conntrack xml data from stdin', action='store_true')
-
- arg = parser.parse_args()
-
- if arg.type not in ('source', 'destination'):
- sys.exit('Unknown NAT type!')
-
- if arg.pipe:
- process(pipe(), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type)
- elif arg.file:
- process(content(arg.file), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type)
- else:
- try:
- process(run(command(arg.type, arg.proto, arg.ipaddr)), arg.stats, arg.proto, arg.pipe, arg.verbose, arg.type)
- except:
- pass
-
-if __name__ == '__main__':
- main()