summaryrefslogtreecommitdiff
path: root/src/op_mode/cgnat.py
diff options
context:
space:
mode:
authorChristian Breunig <christian@breunig.cc>2024-05-21 19:51:18 +0200
committerGitHub <noreply@github.com>2024-05-21 19:51:18 +0200
commit108a42fc013073759c82caf7a6a8c79f417c2c81 (patch)
treeb2afe9b573a209c9c1aeb4cceb218fa00750f17d /src/op_mode/cgnat.py
parenta67cde68b553ee816162ccb7804c3f57664ed004 (diff)
parentc554c483817bfc6ef4f0175298d23355696f8665 (diff)
downloadvyos-1x-108a42fc013073759c82caf7a6a8c79f417c2c81.tar.gz
vyos-1x-108a42fc013073759c82caf7a6a8c79f417c2c81.zip
Merge pull request #3490 from sever-sever/T6366
T6366: CGNAT add ability to get external and internal allocations
Diffstat (limited to 'src/op_mode/cgnat.py')
-rwxr-xr-xsrc/op_mode/cgnat.py44
1 files changed, 33 insertions, 11 deletions
diff --git a/src/op_mode/cgnat.py b/src/op_mode/cgnat.py
index e58b15809..9ad8f92f9 100755
--- a/src/op_mode/cgnat.py
+++ b/src/op_mode/cgnat.py
@@ -16,6 +16,7 @@
import json
import sys
+import typing
from tabulate import tabulate
@@ -27,15 +28,11 @@ from vyos.utils.process import cmd
CGNAT_TABLE = 'cgnat'
-def _get_raw_data():
- """ Get CGNAT dictionary
- """
+def _get_raw_data(external_address: str = '', internal_address: str = '') -> list[dict]:
+ """Get CGNAT dictionary and filter by external or internal address if provided."""
cmd_output = cmd(f'nft --json list table ip {CGNAT_TABLE}')
data = json.loads(cmd_output)
- return data
-
-def _get_formatted_output(data):
elements = data['nftables'][2]['map']['elem']
allocations = []
for elem in elements:
@@ -44,23 +41,48 @@ def _get_formatted_output(data):
start_port = elem[1]['concat'][1]['range'][0]
end_port = elem[1]['concat'][1]['range'][1]
port_range = f'{start_port}-{end_port}'
- allocations.append((internal, external, port_range))
+ if (internal_address and internal != internal_address) or (
+ external_address and external != external_address
+ ):
+ continue
+
+ allocations.append(
+ {
+ 'internal_address': internal,
+ 'external_address': external,
+ 'port_range': port_range,
+ }
+ )
+
+ return allocations
+
+
+def _get_formatted_output(allocations: list[dict]) -> str:
+ # Convert the list of dictionaries to a list of tuples for tabulate
headers = ['Internal IP', 'External IP', 'Port range']
- output = tabulate(allocations, headers, numalign="left")
+ data = [
+ (alloc['internal_address'], alloc['external_address'], alloc['port_range'])
+ for alloc in allocations
+ ]
+ output = tabulate(data, headers, numalign="left")
return output
-def show_allocation(raw: bool):
+def show_allocation(
+ raw: bool,
+ external_address: typing.Optional[str],
+ internal_address: typing.Optional[str],
+) -> str:
config = ConfigTreeQuery()
if not config.exists('nat cgnat'):
raise vyos.opmode.UnconfiguredSubsystem('CGNAT is not configured')
if raw:
- return _get_raw_data()
+ return _get_raw_data(external_address, internal_address)
else:
- raw_data = _get_raw_data()
+ raw_data = _get_raw_data(external_address, internal_address)
return _get_formatted_output(raw_data)