diff options
author | Christian Breunig <christian@breunig.cc> | 2024-05-21 19:51:18 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-21 19:51:18 +0200 |
commit | 108a42fc013073759c82caf7a6a8c79f417c2c81 (patch) | |
tree | b2afe9b573a209c9c1aeb4cceb218fa00750f17d /src/op_mode/cgnat.py | |
parent | a67cde68b553ee816162ccb7804c3f57664ed004 (diff) | |
parent | c554c483817bfc6ef4f0175298d23355696f8665 (diff) | |
download | vyos-1x-108a42fc013073759c82caf7a6a8c79f417c2c81.tar.gz vyos-1x-108a42fc013073759c82caf7a6a8c79f417c2c81.zip |
Merge pull request #3490 from sever-sever/T6366
T6366: CGNAT add ability to get external and internal allocations
Diffstat (limited to 'src/op_mode/cgnat.py')
-rwxr-xr-x | src/op_mode/cgnat.py | 44 |
1 files changed, 33 insertions, 11 deletions
diff --git a/src/op_mode/cgnat.py b/src/op_mode/cgnat.py index e58b15809..9ad8f92f9 100755 --- a/src/op_mode/cgnat.py +++ b/src/op_mode/cgnat.py @@ -16,6 +16,7 @@ import json import sys +import typing from tabulate import tabulate @@ -27,15 +28,11 @@ from vyos.utils.process import cmd CGNAT_TABLE = 'cgnat' -def _get_raw_data(): - """ Get CGNAT dictionary - """ +def _get_raw_data(external_address: str = '', internal_address: str = '') -> list[dict]: + """Get CGNAT dictionary and filter by external or internal address if provided.""" cmd_output = cmd(f'nft --json list table ip {CGNAT_TABLE}') data = json.loads(cmd_output) - return data - -def _get_formatted_output(data): elements = data['nftables'][2]['map']['elem'] allocations = [] for elem in elements: @@ -44,23 +41,48 @@ def _get_formatted_output(data): start_port = elem[1]['concat'][1]['range'][0] end_port = elem[1]['concat'][1]['range'][1] port_range = f'{start_port}-{end_port}' - allocations.append((internal, external, port_range)) + if (internal_address and internal != internal_address) or ( + external_address and external != external_address + ): + continue + + allocations.append( + { + 'internal_address': internal, + 'external_address': external, + 'port_range': port_range, + } + ) + + return allocations + + +def _get_formatted_output(allocations: list[dict]) -> str: + # Convert the list of dictionaries to a list of tuples for tabulate headers = ['Internal IP', 'External IP', 'Port range'] - output = tabulate(allocations, headers, numalign="left") + data = [ + (alloc['internal_address'], alloc['external_address'], alloc['port_range']) + for alloc in allocations + ] + output = tabulate(data, headers, numalign="left") return output -def show_allocation(raw: bool): +def show_allocation( + raw: bool, + external_address: typing.Optional[str], + internal_address: typing.Optional[str], +) -> str: config = ConfigTreeQuery() if not config.exists('nat cgnat'): raise vyos.opmode.UnconfiguredSubsystem('CGNAT is not configured') if raw: - return _get_raw_data() + return _get_raw_data(external_address, internal_address) else: - raw_data = _get_raw_data() + raw_data = _get_raw_data(external_address, internal_address) return _get_formatted_output(raw_data) |