summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli
diff options
context:
space:
mode:
authorsarthurdev <965089+sarthurdev@users.noreply.github.com>2022-09-20 14:19:23 +0200
committersarthurdev <965089+sarthurdev@users.noreply.github.com>2022-09-21 20:53:49 +0200
commit448d4f6db9cf6dfceffccf988301e5f4d04c9afa (patch)
tree8cdb965b4cabfcdf02c53e7046833d0cd5610df0 /smoketest/scripts/cli
parente9c233d65cfffccca131afb4cfb0bcaae0836c39 (diff)
downloadvyos-1x-448d4f6db9cf6dfceffccf988301e5f4d04c9afa.tar.gz
vyos-1x-448d4f6db9cf6dfceffccf988301e5f4d04c9afa.zip
nat: T4605: Refactor NAT to use python module for parsing rules
* Rename table to vyos_nat * Refactor tests to use `verify_nftables` format
Diffstat (limited to 'smoketest/scripts/cli')
-rwxr-xr-xsmoketest/scripts/cli/test_load_balancning_wan.py1
-rwxr-xr-xsmoketest/scripts/cli/test_nat.py141
2 files changed, 68 insertions, 74 deletions
diff --git a/smoketest/scripts/cli/test_load_balancning_wan.py b/smoketest/scripts/cli/test_load_balancning_wan.py
index 303dece86..23020b9b1 100755
--- a/smoketest/scripts/cli/test_load_balancning_wan.py
+++ b/smoketest/scripts/cli/test_load_balancning_wan.py
@@ -177,6 +177,7 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
}"""
nat_vyos_pre_snat_hook = """table ip nat {
chain VYOS_PRE_SNAT_HOOK {
+ type nat hook postrouting priority srcnat - 1; policy accept;
counter jump WANLOADBALANCE
return
}
diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py
index 408facfb3..5863409be 100755
--- a/smoketest/scripts/cli/test_nat.py
+++ b/smoketest/scripts/cli/test_nat.py
@@ -26,6 +26,7 @@ from vyos.util import dict_search
base_path = ['nat']
src_path = base_path + ['source']
dst_path = base_path + ['destination']
+static_path = base_path + ['static']
class TestNAT(VyOSUnitTestSHIM.TestCase):
@classmethod
@@ -40,10 +41,24 @@ class TestNAT(VyOSUnitTestSHIM.TestCase):
self.cli_delete(base_path)
self.cli_commit()
+ def verify_nftables(self, nftables_search, table, inverse=False, args=''):
+ nftables_output = cmd(f'sudo nft {args} list table {table}')
+
+ for search in nftables_search:
+ matched = False
+ for line in nftables_output.split("\n"):
+ if all(item in line for item in search):
+ matched = True
+ break
+ self.assertTrue(not matched if inverse else matched, msg=search)
+
def test_snat(self):
rules = ['100', '110', '120', '130', '200', '210', '220', '230']
outbound_iface_100 = 'eth0'
outbound_iface_200 = 'eth1'
+
+ nftables_search = ['jump VYOS_PRE_SNAT_HOOK']
+
for rule in rules:
network = f'192.168.{rule}.0/24'
# depending of rule order we check either for source address for NAT
@@ -52,51 +67,16 @@ class TestNAT(VyOSUnitTestSHIM.TestCase):
self.cli_set(src_path + ['rule', rule, 'source', 'address', network])
self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100])
self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
+ nftables_search.append([f'saddr {network}', f'oifname "{outbound_iface_100}"', 'masquerade'])
else:
self.cli_set(src_path + ['rule', rule, 'destination', 'address', network])
self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200])
self.cli_set(src_path + ['rule', rule, 'exclude'])
+ nftables_search.append([f'daddr {network}', f'oifname "{outbound_iface_200}"', 'return'])
self.cli_commit()
- tmp = cmd('sudo nft -j list chain ip nat POSTROUTING')
- data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
-
- for idx in range(0, len(data_json)):
- data = data_json[idx]
- if idx == 0:
- self.assertEqual(data['chain'], 'POSTROUTING')
- self.assertEqual(data['family'], 'ip')
- self.assertEqual(data['table'], 'nat')
-
- jump_target = dict_search('jump.target', data['expr'][1])
- self.assertEqual(jump_target,'VYOS_PRE_SNAT_HOOK')
- else:
- rule = str(rules[idx - 1])
- network = f'192.168.{rule}.0/24'
-
- self.assertEqual(data['chain'], 'POSTROUTING')
- self.assertEqual(data['comment'], f'SRC-NAT-{rule}')
- self.assertEqual(data['family'], 'ip')
- self.assertEqual(data['table'], 'nat')
-
- iface = dict_search('match.right', data['expr'][0])
- direction = dict_search('match.left.payload.field', data['expr'][1])
- address = dict_search('match.right.prefix.addr', data['expr'][1])
- mask = dict_search('match.right.prefix.len', data['expr'][1])
-
- if int(rule) < 200:
- self.assertEqual(direction, 'saddr')
- self.assertEqual(iface, outbound_iface_100)
- # check for masquerade keyword
- self.assertIn('masquerade', data['expr'][3])
- else:
- self.assertEqual(direction, 'daddr')
- self.assertEqual(iface, outbound_iface_200)
- # check for return keyword due to 'exclude'
- self.assertIn('return', data['expr'][3])
-
- self.assertEqual(f'{address}/{mask}', network)
+ self.verify_nftables(nftables_search, 'ip vyos_nat')
def test_dnat(self):
rules = ['100', '110', '120', '130', '200', '210', '220', '230']
@@ -105,56 +85,29 @@ class TestNAT(VyOSUnitTestSHIM.TestCase):
inbound_proto_100 = 'udp'
inbound_proto_200 = 'tcp'
+ nftables_search = ['jump VYOS_PRE_DNAT_HOOK']
+
for rule in rules:
port = f'10{rule}'
self.cli_set(dst_path + ['rule', rule, 'source', 'port', port])
self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1'])
self.cli_set(dst_path + ['rule', rule, 'translation', 'port', port])
+ rule_search = [f'dnat to 192.0.2.1:{port}']
if int(rule) < 200:
self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_100])
self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100])
+ rule_search.append(f'{inbound_proto_100} sport {port}')
+ rule_search.append(f'iifname "{inbound_iface_100}"')
else:
self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_200])
self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200])
+ rule_search.append(f'iifname "{inbound_iface_200}"')
- self.cli_commit()
-
- tmp = cmd('sudo nft -j list chain ip nat PREROUTING')
- data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
-
- for idx in range(0, len(data_json)):
- data = data_json[idx]
- if idx == 0:
- self.assertEqual(data['chain'], 'PREROUTING')
- self.assertEqual(data['family'], 'ip')
- self.assertEqual(data['table'], 'nat')
+ nftables_search.append(rule_search)
- jump_target = dict_search('jump.target', data['expr'][1])
- self.assertEqual(jump_target,'VYOS_PRE_DNAT_HOOK')
- else:
+ self.cli_commit()
- rule = str(rules[idx - 1])
- port = int(f'10{rule}')
-
- self.assertEqual(data['chain'], 'PREROUTING')
- self.assertEqual(data['comment'].split()[0], f'DST-NAT-{rule}')
- self.assertEqual(data['family'], 'ip')
- self.assertEqual(data['table'], 'nat')
-
- iface = dict_search('match.right', data['expr'][0])
- direction = dict_search('match.left.payload.field', data['expr'][1])
- protocol = dict_search('match.left.payload.protocol', data['expr'][1])
- dnat_addr = dict_search('dnat.addr', data['expr'][3])
- dnat_port = dict_search('dnat.port', data['expr'][3])
-
- self.assertEqual(direction, 'sport')
- self.assertEqual(dnat_addr, '192.0.2.1')
- self.assertEqual(dnat_port, port)
- if int(rule) < 200:
- self.assertEqual(iface, inbound_iface_100)
- self.assertEqual(protocol, inbound_proto_100)
- else:
- self.assertEqual(iface, inbound_iface_200)
+ self.verify_nftables(nftables_search, 'ip vyos_nat')
def test_snat_required_translation_address(self):
# T2813: Ensure translation address is specified
@@ -193,8 +146,48 @@ class TestNAT(VyOSUnitTestSHIM.TestCase):
# without any rule
self.cli_set(src_path)
self.cli_set(dst_path)
+ self.cli_set(static_path)
+ self.cli_commit()
+
+ def test_dnat_without_translation_address(self):
+ self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1'])
+ self.cli_set(dst_path + ['rule', '1', 'destination', 'port', '443'])
+ self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp'])
+ self.cli_set(dst_path + ['rule', '1', 'translation', 'port', '443'])
+
+ self.cli_commit()
+
+ nftables_search = [
+ ['iifname "eth1"', 'tcp dport 443', 'dnat to :443']
+ ]
+
+ self.verify_nftables(nftables_search, 'ip vyos_nat')
+
+ def test_static_nat(self):
+ dst_addr_1 = '10.0.1.1'
+ translate_addr_1 = '192.168.1.1'
+ dst_addr_2 = '203.0.113.0/24'
+ translate_addr_2 = '192.0.2.0/24'
+ ifname = 'eth0'
+
+ self.cli_set(static_path + ['rule', '10', 'destination', 'address', dst_addr_1])
+ self.cli_set(static_path + ['rule', '10', 'inbound-interface', ifname])
+ self.cli_set(static_path + ['rule', '10', 'translation', 'address', translate_addr_1])
+
+ self.cli_set(static_path + ['rule', '20', 'destination', 'address', dst_addr_2])
+ self.cli_set(static_path + ['rule', '20', 'inbound-interface', ifname])
+ self.cli_set(static_path + ['rule', '20', 'translation', 'address', translate_addr_2])
+
self.cli_commit()
+ nftables_search = [
+ [f'iifname "{ifname}"', f'ip daddr {dst_addr_1}', f'dnat to {translate_addr_1}'],
+ [f'oifname "{ifname}"', f'ip saddr {translate_addr_1}', f'snat to {dst_addr_1}'],
+ [f'iifname "{ifname}"', f'dnat ip prefix to ip daddr map {{ {dst_addr_2} : {translate_addr_2} }}'],
+ [f'oifname "{ifname}"', f'snat ip prefix to ip daddr map {{ {translate_addr_2} : {dst_addr_2} }}']
+ ]
+
+ self.verify_nftables(nftables_search, 'ip vyos_nat')
if __name__ == '__main__':
unittest.main(verbosity=2)