diff options
author | Christian Poessinger <christian@poessinger.com> | 2022-09-22 07:58:28 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-22 07:58:28 +0200 |
commit | 7ba1f6444d1b7a8d25715623daf75f81521d9667 (patch) | |
tree | 8c391027eef25ae4ffc7e18be291c6df402937e6 /smoketest/scripts/cli | |
parent | f3e6fb5aab6f562dab49f559f31c58c0f86c03df (diff) | |
parent | c6bbe051574acf5ca1501e631d73ac06bdb17b30 (diff) | |
download | vyos-1x-7ba1f6444d1b7a8d25715623daf75f81521d9667.tar.gz vyos-1x-7ba1f6444d1b7a8d25715623daf75f81521d9667.zip |
Merge pull request #1552 from sarthurdev/nat_refactor
nat: nat66: T4605: T4706: Refactor NAT/NAT66 and use new table name
Diffstat (limited to 'smoketest/scripts/cli')
-rwxr-xr-x | smoketest/scripts/cli/test_load_balancning_wan.py | 1 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_nat.py | 141 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_nat66.py | 63 |
3 files changed, 94 insertions, 111 deletions
diff --git a/smoketest/scripts/cli/test_load_balancning_wan.py b/smoketest/scripts/cli/test_load_balancning_wan.py index 303dece86..23020b9b1 100755 --- a/smoketest/scripts/cli/test_load_balancning_wan.py +++ b/smoketest/scripts/cli/test_load_balancning_wan.py @@ -177,6 +177,7 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase): }""" nat_vyos_pre_snat_hook = """table ip nat { chain VYOS_PRE_SNAT_HOOK { + type nat hook postrouting priority srcnat - 1; policy accept; counter jump WANLOADBALANCE return } diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py index 408facfb3..f824838c0 100755 --- a/smoketest/scripts/cli/test_nat.py +++ b/smoketest/scripts/cli/test_nat.py @@ -26,6 +26,7 @@ from vyos.util import dict_search base_path = ['nat'] src_path = base_path + ['source'] dst_path = base_path + ['destination'] +static_path = base_path + ['static'] class TestNAT(VyOSUnitTestSHIM.TestCase): @classmethod @@ -40,10 +41,24 @@ class TestNAT(VyOSUnitTestSHIM.TestCase): self.cli_delete(base_path) self.cli_commit() + def verify_nftables(self, nftables_search, table, inverse=False, args=''): + nftables_output = cmd(f'sudo nft {args} list table {table}') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(not matched if inverse else matched, msg=search) + def test_snat(self): rules = ['100', '110', '120', '130', '200', '210', '220', '230'] outbound_iface_100 = 'eth0' outbound_iface_200 = 'eth1' + + nftables_search = ['jump VYOS_PRE_SNAT_HOOK'] + for rule in rules: network = f'192.168.{rule}.0/24' # depending of rule order we check either for source address for NAT @@ -52,51 +67,16 @@ class TestNAT(VyOSUnitTestSHIM.TestCase): self.cli_set(src_path + ['rule', rule, 'source', 'address', network]) self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100]) self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) + nftables_search.append([f'saddr {network}', f'oifname "{outbound_iface_100}"', 'masquerade']) else: self.cli_set(src_path + ['rule', rule, 'destination', 'address', network]) self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200]) self.cli_set(src_path + ['rule', rule, 'exclude']) + nftables_search.append([f'daddr {network}', f'oifname "{outbound_iface_200}"', 'return']) self.cli_commit() - tmp = cmd('sudo nft -j list chain ip nat POSTROUTING') - data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) - - for idx in range(0, len(data_json)): - data = data_json[idx] - if idx == 0: - self.assertEqual(data['chain'], 'POSTROUTING') - self.assertEqual(data['family'], 'ip') - self.assertEqual(data['table'], 'nat') - - jump_target = dict_search('jump.target', data['expr'][1]) - self.assertEqual(jump_target,'VYOS_PRE_SNAT_HOOK') - else: - rule = str(rules[idx - 1]) - network = f'192.168.{rule}.0/24' - - self.assertEqual(data['chain'], 'POSTROUTING') - self.assertEqual(data['comment'], f'SRC-NAT-{rule}') - self.assertEqual(data['family'], 'ip') - self.assertEqual(data['table'], 'nat') - - iface = dict_search('match.right', data['expr'][0]) - direction = dict_search('match.left.payload.field', data['expr'][1]) - address = dict_search('match.right.prefix.addr', data['expr'][1]) - mask = dict_search('match.right.prefix.len', data['expr'][1]) - - if int(rule) < 200: - self.assertEqual(direction, 'saddr') - self.assertEqual(iface, outbound_iface_100) - # check for masquerade keyword - self.assertIn('masquerade', data['expr'][3]) - else: - self.assertEqual(direction, 'daddr') - self.assertEqual(iface, outbound_iface_200) - # check for return keyword due to 'exclude' - self.assertIn('return', data['expr'][3]) - - self.assertEqual(f'{address}/{mask}', network) + self.verify_nftables(nftables_search, 'ip vyos_nat') def test_dnat(self): rules = ['100', '110', '120', '130', '200', '210', '220', '230'] @@ -105,56 +85,29 @@ class TestNAT(VyOSUnitTestSHIM.TestCase): inbound_proto_100 = 'udp' inbound_proto_200 = 'tcp' + nftables_search = ['jump VYOS_PRE_DNAT_HOOK'] + for rule in rules: port = f'10{rule}' self.cli_set(dst_path + ['rule', rule, 'source', 'port', port]) self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) self.cli_set(dst_path + ['rule', rule, 'translation', 'port', port]) + rule_search = [f'dnat to 192.0.2.1:{port}'] if int(rule) < 200: self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_100]) self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100]) + rule_search.append(f'{inbound_proto_100} sport {port}') + rule_search.append(f'iifname "{inbound_iface_100}"') else: self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_200]) self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200]) + rule_search.append(f'iifname "{inbound_iface_200}"') - self.cli_commit() - - tmp = cmd('sudo nft -j list chain ip nat PREROUTING') - data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) - - for idx in range(0, len(data_json)): - data = data_json[idx] - if idx == 0: - self.assertEqual(data['chain'], 'PREROUTING') - self.assertEqual(data['family'], 'ip') - self.assertEqual(data['table'], 'nat') + nftables_search.append(rule_search) - jump_target = dict_search('jump.target', data['expr'][1]) - self.assertEqual(jump_target,'VYOS_PRE_DNAT_HOOK') - else: + self.cli_commit() - rule = str(rules[idx - 1]) - port = int(f'10{rule}') - - self.assertEqual(data['chain'], 'PREROUTING') - self.assertEqual(data['comment'].split()[0], f'DST-NAT-{rule}') - self.assertEqual(data['family'], 'ip') - self.assertEqual(data['table'], 'nat') - - iface = dict_search('match.right', data['expr'][0]) - direction = dict_search('match.left.payload.field', data['expr'][1]) - protocol = dict_search('match.left.payload.protocol', data['expr'][1]) - dnat_addr = dict_search('dnat.addr', data['expr'][3]) - dnat_port = dict_search('dnat.port', data['expr'][3]) - - self.assertEqual(direction, 'sport') - self.assertEqual(dnat_addr, '192.0.2.1') - self.assertEqual(dnat_port, port) - if int(rule) < 200: - self.assertEqual(iface, inbound_iface_100) - self.assertEqual(protocol, inbound_proto_100) - else: - self.assertEqual(iface, inbound_iface_200) + self.verify_nftables(nftables_search, 'ip vyos_nat') def test_snat_required_translation_address(self): # T2813: Ensure translation address is specified @@ -193,8 +146,48 @@ class TestNAT(VyOSUnitTestSHIM.TestCase): # without any rule self.cli_set(src_path) self.cli_set(dst_path) + self.cli_set(static_path) + self.cli_commit() + + def test_dnat_without_translation_address(self): + self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1']) + self.cli_set(dst_path + ['rule', '1', 'destination', 'port', '443']) + self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp']) + self.cli_set(dst_path + ['rule', '1', 'translation', 'port', '443']) + + self.cli_commit() + + nftables_search = [ + ['iifname "eth1"', 'tcp dport 443', 'dnat to :443'] + ] + + self.verify_nftables(nftables_search, 'ip vyos_nat') + + def test_static_nat(self): + dst_addr_1 = '10.0.1.1' + translate_addr_1 = '192.168.1.1' + dst_addr_2 = '203.0.113.0/24' + translate_addr_2 = '192.0.2.0/24' + ifname = 'eth0' + + self.cli_set(static_path + ['rule', '10', 'destination', 'address', dst_addr_1]) + self.cli_set(static_path + ['rule', '10', 'inbound-interface', ifname]) + self.cli_set(static_path + ['rule', '10', 'translation', 'address', translate_addr_1]) + + self.cli_set(static_path + ['rule', '20', 'destination', 'address', dst_addr_2]) + self.cli_set(static_path + ['rule', '20', 'inbound-interface', ifname]) + self.cli_set(static_path + ['rule', '20', 'translation', 'address', translate_addr_2]) + self.cli_commit() + nftables_search = [ + [f'iifname "{ifname}"', f'ip daddr {dst_addr_1}', f'dnat to {translate_addr_1}'], + [f'oifname "{ifname}"', f'ip saddr {translate_addr_1}', f'snat to {dst_addr_1}'], + [f'iifname "{ifname}"', f'dnat ip prefix to ip daddr map {{ {dst_addr_2} : {translate_addr_2} }}'], + [f'oifname "{ifname}"', f'snat ip prefix to ip saddr map {{ {translate_addr_2} : {dst_addr_2} }}'] + ] + + self.verify_nftables(nftables_search, 'ip vyos_static_nat') if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_nat66.py b/smoketest/scripts/cli/test_nat66.py index 537b094a4..6cf7ca0a1 100755 --- a/smoketest/scripts/cli/test_nat66.py +++ b/smoketest/scripts/cli/test_nat66.py @@ -71,12 +71,12 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase): self.cli_commit() nftables_search = [ - ['oifname "eth1"', 'ip6 saddr fc00::/64', 'snat prefix to fc01::/64'], - ['oifname "eth1"', 'ip6 saddr fc00::/64', 'masquerade'], - ['oifname "eth1"', 'ip6 saddr fc00::/64', 'return'] + ['oifname "eth1"', f'ip6 saddr {source_prefix}', f'snat prefix to {translation_prefix}'], + ['oifname "eth1"', f'ip6 saddr {source_prefix}', 'masquerade'], + ['oifname "eth1"', f'ip6 saddr {source_prefix}', 'return'] ] - self.verify_nftables(nftables_search, 'ip6 nat') + self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_source_nat66_address(self): source_prefix = 'fc00::/64' @@ -88,25 +88,11 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase): # check validate() - outbound-interface must be defined self.cli_commit() - tmp = cmd('sudo nft -j list table ip6 nat') - data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) - - for idx in range(0, len(data_json)): - data = data_json[idx] - - self.assertEqual(data['chain'], 'POSTROUTING') - self.assertEqual(data['family'], 'ip6') - self.assertEqual(data['table'], 'nat') - - iface = dict_search('match.right', data['expr'][0]) - address = dict_search('match.right.prefix.addr', data['expr'][2]) - mask = dict_search('match.right.prefix.len', data['expr'][2]) - snat_address = dict_search('snat.addr', data['expr'][3]) + nftables_search = [ + ['oifname "eth1"', f'ip6 saddr {source_prefix}', f'snat to {translation_address}'] + ] - self.assertEqual(iface, 'eth1') - # check for translation address - self.assertEqual(snat_address, translation_address) - self.assertEqual(f'{address}/{mask}', source_prefix) + self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_destination_nat66(self): destination_address = 'fc00::1' @@ -129,7 +115,7 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase): ['iifname "eth1"', 'ip6 saddr fc02::1', 'ip6 daddr fc00::1', 'return'] ] - self.verify_nftables(nftables_search, 'ip6 nat') + self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_destination_nat66_protocol(self): translation_address = '2001:db8:1111::1' @@ -153,7 +139,7 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase): ['iifname "eth1"', 'tcp dport 4545', 'ip6 saddr 2001:db8:2222::/64', 'tcp sport 8080', 'dnat to 2001:db8:1111::1:5555'] ] - self.verify_nftables(nftables_search, 'ip6 nat') + self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_destination_nat66_prefix(self): destination_prefix = 'fc00::/64' @@ -165,22 +151,25 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase): # check validate() - outbound-interface must be defined self.cli_commit() - tmp = cmd('sudo nft -j list table ip6 nat') - data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) + nftables_search = [ + ['iifname "eth1"', f'ip6 daddr {destination_prefix}', f'dnat prefix to {translation_prefix}'] + ] + + self.verify_nftables(nftables_search, 'ip6 vyos_nat') - for idx in range(0, len(data_json)): - data = data_json[idx] + def test_destination_nat66_without_translation_address(self): + self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1']) + self.cli_set(dst_path + ['rule', '1', 'destination', 'port', '443']) + self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp']) + self.cli_set(dst_path + ['rule', '1', 'translation', 'port', '443']) - self.assertEqual(data['chain'], 'PREROUTING') - self.assertEqual(data['family'], 'ip6') - self.assertEqual(data['table'], 'nat') + self.cli_commit() - iface = dict_search('match.right', data['expr'][0]) - translation_address = dict_search('dnat.addr.prefix.addr', data['expr'][3]) - translation_mask = dict_search('dnat.addr.prefix.len', data['expr'][3]) + nftables_search = [ + ['iifname "eth1"', 'tcp dport 443', 'dnat to :443'] + ] - self.assertEqual(f'{translation_address}/{translation_mask}', translation_prefix) - self.assertEqual(iface, 'eth1') + self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_source_nat66_required_translation_prefix(self): # T2813: Ensure translation address is specified @@ -222,7 +211,7 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase): ['oifname "eth1"', 'ip6 saddr 2001:db8:2222::/64', 'tcp dport 9999', 'tcp sport 8080', 'snat to 2001:db8:1111::1:80'] ] - self.verify_nftables(nftables_search, 'ip6 nat') + self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_nat66_no_rules(self): # T3206: deleting all rules but keep the direction 'destination' or |