diff options
Diffstat (limited to 'src')
-rwxr-xr-x | src/conf_mode/nat_cgnat.py | 110 | ||||
-rwxr-xr-x | src/op_mode/cgnat.py | 43 |
2 files changed, 98 insertions, 55 deletions
diff --git a/src/conf_mode/nat_cgnat.py b/src/conf_mode/nat_cgnat.py index 9a20a3c54..5ad65de80 100755 --- a/src/conf_mode/nat_cgnat.py +++ b/src/conf_mode/nat_cgnat.py @@ -189,11 +189,6 @@ def verify(config): if 'rule' not in config: raise ConfigError(f'Rule must be defined!') - # As PoC allow only one rule for CGNAT translations - # one internal pool and one external pool - if len(config['rule']) > 1: - raise ConfigError(f'Only one rule is allowed for translations!') - for pool in ('external', 'internal'): if pool not in config['pool']: raise ConfigError(f'{pool} pool must be defined!') @@ -208,6 +203,8 @@ def verify(config): internal_pools_query = "keys(pool.internal)" internal_pools: list = jmespath.search(internal_pools_query, config) + used_external_pools = {} + used_internal_pools = {} for rule, rule_config in config['rule'].items(): if 'source' not in rule_config: raise ConfigError(f'Rule "{rule}" source pool must be defined!') @@ -217,57 +214,82 @@ def verify(config): if 'translation' not in rule_config: raise ConfigError(f'Rule "{rule}" translation pool must be defined!') + # Check if pool exists internal_pool = rule_config['source']['pool'] if internal_pool not in internal_pools: raise ConfigError(f'Internal pool "{internal_pool}" does not exist!') - external_pool = rule_config['translation']['pool'] if external_pool not in external_pools: raise ConfigError(f'External pool "{external_pool}" does not exist!') + # Check pool duplication in different rules + if external_pool in used_external_pools: + raise ConfigError( + f'External pool "{external_pool}" is already used in rule ' + f'{used_external_pools[external_pool]} and cannot be used in ' + f'rule {rule}!' + ) + + if internal_pool in used_internal_pools: + raise ConfigError( + f'Internal pool "{internal_pool}" is already used in rule ' + f'{used_internal_pools[internal_pool]} and cannot be used in ' + f'rule {rule}!' + ) + + used_external_pools[external_pool] = rule + used_internal_pools[internal_pool] = rule + def generate(config): if not config: return None - # first external pool as we allow only one as PoC - ext_pool_name = jmespath.search("rule.*.translation | [0]", config).get('pool') - int_pool_name = jmespath.search("rule.*.source | [0]", config).get('pool') - ext_query = f'pool.external."{ext_pool_name}".range | keys(@)' - int_query = f'pool.internal."{int_pool_name}".range' - external_ranges = jmespath.search(ext_query, config) - internal_ranges = [jmespath.search(int_query, config)] - - external_list_count = [] - external_list_hosts = [] - internal_list_count = [] - internal_list_hosts = [] - for ext_range in external_ranges: - # External hosts count - e_count = IPOperations(ext_range).get_ips_count() - external_list_count.append(e_count) - # External hosts list - e_hosts = IPOperations(ext_range).convert_prefix_to_list_ips() - external_list_hosts.extend(e_hosts) - for int_range in internal_ranges: - # Internal hosts count - i_count = IPOperations(int_range).get_ips_count() - internal_list_count.append(i_count) - # Internal hosts list - i_hosts = IPOperations(int_range).convert_prefix_to_list_ips() - internal_list_hosts.extend(i_hosts) - - external_host_count = sum(external_list_count) - internal_host_count = sum(internal_list_count) - ports_per_user = int( - jmespath.search(f'pool.external."{ext_pool_name}".per_user_limit.port', config) - ) - external_port_range: str = jmespath.search( - f'pool.external."{ext_pool_name}".external_port_range', config - ) - proto_maps, other_maps = generate_port_rules( - external_list_hosts, internal_list_hosts, ports_per_user, external_port_range - ) + proto_maps = [] + other_maps = [] + + for rule, rule_config in config['rule'].items(): + ext_pool_name: str = rule_config['translation']['pool'] + int_pool_name: str = rule_config['source']['pool'] + + external_ranges: list = [range for range in config['pool']['external'][ext_pool_name]['range']] + internal_ranges: list = [range for range in config['pool']['internal'][int_pool_name]['range']] + external_list_hosts_count = [] + external_list_hosts = [] + internal_list_hosts_count = [] + internal_list_hosts = [] + + for ext_range in external_ranges: + # External hosts count + e_count = IPOperations(ext_range).get_ips_count() + external_list_hosts_count.append(e_count) + # External hosts list + e_hosts = IPOperations(ext_range).convert_prefix_to_list_ips() + external_list_hosts.extend(e_hosts) + + for int_range in internal_ranges: + # Internal hosts count + i_count = IPOperations(int_range).get_ips_count() + internal_list_hosts_count.append(i_count) + # Internal hosts list + i_hosts = IPOperations(int_range).convert_prefix_to_list_ips() + internal_list_hosts.extend(i_hosts) + + external_host_count = sum(external_list_hosts_count) + internal_host_count = sum(internal_list_hosts_count) + ports_per_user = int( + jmespath.search(f'pool.external."{ext_pool_name}".per_user_limit.port', config) + ) + external_port_range: str = jmespath.search( + f'pool.external."{ext_pool_name}".external_port_range', config + ) + + rule_proto_maps, rule_other_maps = generate_port_rules( + external_list_hosts, internal_list_hosts, ports_per_user, external_port_range + ) + + proto_maps.extend(rule_proto_maps) + other_maps.extend(rule_other_maps) config['proto_map_elements'] = ', '.join(proto_maps) config['other_map_elements'] = ', '.join(other_maps) diff --git a/src/op_mode/cgnat.py b/src/op_mode/cgnat.py index a98269a15..9ad8f92f9 100755 --- a/src/op_mode/cgnat.py +++ b/src/op_mode/cgnat.py @@ -28,15 +28,11 @@ from vyos.utils.process import cmd CGNAT_TABLE = 'cgnat' -def _get_raw_data(): - """ Get CGNAT dictionary - """ +def _get_raw_data(external_address: str = '', internal_address: str = '') -> list[dict]: + """Get CGNAT dictionary and filter by external or internal address if provided.""" cmd_output = cmd(f'nft --json list table ip {CGNAT_TABLE}') data = json.loads(cmd_output) - return data - -def _get_formatted_output(data): elements = data['nftables'][2]['map']['elem'] allocations = [] for elem in elements: @@ -45,23 +41,48 @@ def _get_formatted_output(data): start_port = elem[1]['concat'][1]['range'][0] end_port = elem[1]['concat'][1]['range'][1] port_range = f'{start_port}-{end_port}' - allocations.append((internal, external, port_range)) + if (internal_address and internal != internal_address) or ( + external_address and external != external_address + ): + continue + + allocations.append( + { + 'internal_address': internal, + 'external_address': external, + 'port_range': port_range, + } + ) + + return allocations + + +def _get_formatted_output(allocations: list[dict]) -> str: + # Convert the list of dictionaries to a list of tuples for tabulate headers = ['Internal IP', 'External IP', 'Port range'] - output = tabulate(allocations, headers, numalign="left") + data = [ + (alloc['internal_address'], alloc['external_address'], alloc['port_range']) + for alloc in allocations + ] + output = tabulate(data, headers, numalign="left") return output -def show_allocation(raw: bool): +def show_allocation( + raw: bool, + external_address: typing.Optional[str], + internal_address: typing.Optional[str], +) -> str: config = ConfigTreeQuery() if not config.exists('nat cgnat'): raise vyos.opmode.UnconfiguredSubsystem('CGNAT is not configured') if raw: - return _get_raw_data() + return _get_raw_data(external_address, internal_address) else: - raw_data = _get_raw_data() + raw_data = _get_raw_data(external_address, internal_address) return _get_formatted_output(raw_data) |