summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/conf_mode/nat_cgnat.py110
-rwxr-xr-xsrc/op_mode/cgnat.py43
2 files changed, 98 insertions, 55 deletions
diff --git a/src/conf_mode/nat_cgnat.py b/src/conf_mode/nat_cgnat.py
index 9a20a3c54..5ad65de80 100755
--- a/src/conf_mode/nat_cgnat.py
+++ b/src/conf_mode/nat_cgnat.py
@@ -189,11 +189,6 @@ def verify(config):
if 'rule' not in config:
raise ConfigError(f'Rule must be defined!')
- # As PoC allow only one rule for CGNAT translations
- # one internal pool and one external pool
- if len(config['rule']) > 1:
- raise ConfigError(f'Only one rule is allowed for translations!')
-
for pool in ('external', 'internal'):
if pool not in config['pool']:
raise ConfigError(f'{pool} pool must be defined!')
@@ -208,6 +203,8 @@ def verify(config):
internal_pools_query = "keys(pool.internal)"
internal_pools: list = jmespath.search(internal_pools_query, config)
+ used_external_pools = {}
+ used_internal_pools = {}
for rule, rule_config in config['rule'].items():
if 'source' not in rule_config:
raise ConfigError(f'Rule "{rule}" source pool must be defined!')
@@ -217,57 +214,82 @@ def verify(config):
if 'translation' not in rule_config:
raise ConfigError(f'Rule "{rule}" translation pool must be defined!')
+ # Check if pool exists
internal_pool = rule_config['source']['pool']
if internal_pool not in internal_pools:
raise ConfigError(f'Internal pool "{internal_pool}" does not exist!')
-
external_pool = rule_config['translation']['pool']
if external_pool not in external_pools:
raise ConfigError(f'External pool "{external_pool}" does not exist!')
+ # Check pool duplication in different rules
+ if external_pool in used_external_pools:
+ raise ConfigError(
+ f'External pool "{external_pool}" is already used in rule '
+ f'{used_external_pools[external_pool]} and cannot be used in '
+ f'rule {rule}!'
+ )
+
+ if internal_pool in used_internal_pools:
+ raise ConfigError(
+ f'Internal pool "{internal_pool}" is already used in rule '
+ f'{used_internal_pools[internal_pool]} and cannot be used in '
+ f'rule {rule}!'
+ )
+
+ used_external_pools[external_pool] = rule
+ used_internal_pools[internal_pool] = rule
+
def generate(config):
if not config:
return None
- # first external pool as we allow only one as PoC
- ext_pool_name = jmespath.search("rule.*.translation | [0]", config).get('pool')
- int_pool_name = jmespath.search("rule.*.source | [0]", config).get('pool')
- ext_query = f'pool.external."{ext_pool_name}".range | keys(@)'
- int_query = f'pool.internal."{int_pool_name}".range'
- external_ranges = jmespath.search(ext_query, config)
- internal_ranges = [jmespath.search(int_query, config)]
-
- external_list_count = []
- external_list_hosts = []
- internal_list_count = []
- internal_list_hosts = []
- for ext_range in external_ranges:
- # External hosts count
- e_count = IPOperations(ext_range).get_ips_count()
- external_list_count.append(e_count)
- # External hosts list
- e_hosts = IPOperations(ext_range).convert_prefix_to_list_ips()
- external_list_hosts.extend(e_hosts)
- for int_range in internal_ranges:
- # Internal hosts count
- i_count = IPOperations(int_range).get_ips_count()
- internal_list_count.append(i_count)
- # Internal hosts list
- i_hosts = IPOperations(int_range).convert_prefix_to_list_ips()
- internal_list_hosts.extend(i_hosts)
-
- external_host_count = sum(external_list_count)
- internal_host_count = sum(internal_list_count)
- ports_per_user = int(
- jmespath.search(f'pool.external."{ext_pool_name}".per_user_limit.port', config)
- )
- external_port_range: str = jmespath.search(
- f'pool.external."{ext_pool_name}".external_port_range', config
- )
- proto_maps, other_maps = generate_port_rules(
- external_list_hosts, internal_list_hosts, ports_per_user, external_port_range
- )
+ proto_maps = []
+ other_maps = []
+
+ for rule, rule_config in config['rule'].items():
+ ext_pool_name: str = rule_config['translation']['pool']
+ int_pool_name: str = rule_config['source']['pool']
+
+ external_ranges: list = [range for range in config['pool']['external'][ext_pool_name]['range']]
+ internal_ranges: list = [range for range in config['pool']['internal'][int_pool_name]['range']]
+ external_list_hosts_count = []
+ external_list_hosts = []
+ internal_list_hosts_count = []
+ internal_list_hosts = []
+
+ for ext_range in external_ranges:
+ # External hosts count
+ e_count = IPOperations(ext_range).get_ips_count()
+ external_list_hosts_count.append(e_count)
+ # External hosts list
+ e_hosts = IPOperations(ext_range).convert_prefix_to_list_ips()
+ external_list_hosts.extend(e_hosts)
+
+ for int_range in internal_ranges:
+ # Internal hosts count
+ i_count = IPOperations(int_range).get_ips_count()
+ internal_list_hosts_count.append(i_count)
+ # Internal hosts list
+ i_hosts = IPOperations(int_range).convert_prefix_to_list_ips()
+ internal_list_hosts.extend(i_hosts)
+
+ external_host_count = sum(external_list_hosts_count)
+ internal_host_count = sum(internal_list_hosts_count)
+ ports_per_user = int(
+ jmespath.search(f'pool.external."{ext_pool_name}".per_user_limit.port', config)
+ )
+ external_port_range: str = jmespath.search(
+ f'pool.external."{ext_pool_name}".external_port_range', config
+ )
+
+ rule_proto_maps, rule_other_maps = generate_port_rules(
+ external_list_hosts, internal_list_hosts, ports_per_user, external_port_range
+ )
+
+ proto_maps.extend(rule_proto_maps)
+ other_maps.extend(rule_other_maps)
config['proto_map_elements'] = ', '.join(proto_maps)
config['other_map_elements'] = ', '.join(other_maps)
diff --git a/src/op_mode/cgnat.py b/src/op_mode/cgnat.py
index a98269a15..9ad8f92f9 100755
--- a/src/op_mode/cgnat.py
+++ b/src/op_mode/cgnat.py
@@ -28,15 +28,11 @@ from vyos.utils.process import cmd
CGNAT_TABLE = 'cgnat'
-def _get_raw_data():
- """ Get CGNAT dictionary
- """
+def _get_raw_data(external_address: str = '', internal_address: str = '') -> list[dict]:
+ """Get CGNAT dictionary and filter by external or internal address if provided."""
cmd_output = cmd(f'nft --json list table ip {CGNAT_TABLE}')
data = json.loads(cmd_output)
- return data
-
-def _get_formatted_output(data):
elements = data['nftables'][2]['map']['elem']
allocations = []
for elem in elements:
@@ -45,23 +41,48 @@ def _get_formatted_output(data):
start_port = elem[1]['concat'][1]['range'][0]
end_port = elem[1]['concat'][1]['range'][1]
port_range = f'{start_port}-{end_port}'
- allocations.append((internal, external, port_range))
+ if (internal_address and internal != internal_address) or (
+ external_address and external != external_address
+ ):
+ continue
+
+ allocations.append(
+ {
+ 'internal_address': internal,
+ 'external_address': external,
+ 'port_range': port_range,
+ }
+ )
+
+ return allocations
+
+
+def _get_formatted_output(allocations: list[dict]) -> str:
+ # Convert the list of dictionaries to a list of tuples for tabulate
headers = ['Internal IP', 'External IP', 'Port range']
- output = tabulate(allocations, headers, numalign="left")
+ data = [
+ (alloc['internal_address'], alloc['external_address'], alloc['port_range'])
+ for alloc in allocations
+ ]
+ output = tabulate(data, headers, numalign="left")
return output
-def show_allocation(raw: bool):
+def show_allocation(
+ raw: bool,
+ external_address: typing.Optional[str],
+ internal_address: typing.Optional[str],
+) -> str:
config = ConfigTreeQuery()
if not config.exists('nat cgnat'):
raise vyos.opmode.UnconfiguredSubsystem('CGNAT is not configured')
if raw:
- return _get_raw_data()
+ return _get_raw_data(external_address, internal_address)
else:
- raw_data = _get_raw_data()
+ raw_data = _get_raw_data(external_address, internal_address)
return _get_formatted_output(raw_data)