summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli
diff options
context:
space:
mode:
authorChristian Poessinger <christian@poessinger.com>2022-09-22 07:58:28 +0200
committerGitHub <noreply@github.com>2022-09-22 07:58:28 +0200
commit7ba1f6444d1b7a8d25715623daf75f81521d9667 (patch)
tree8c391027eef25ae4ffc7e18be291c6df402937e6 /smoketest/scripts/cli
parentf3e6fb5aab6f562dab49f559f31c58c0f86c03df (diff)
parentc6bbe051574acf5ca1501e631d73ac06bdb17b30 (diff)
downloadvyos-1x-7ba1f6444d1b7a8d25715623daf75f81521d9667.tar.gz
vyos-1x-7ba1f6444d1b7a8d25715623daf75f81521d9667.zip
Merge pull request #1552 from sarthurdev/nat_refactor
nat: nat66: T4605: T4706: Refactor NAT/NAT66 and use new table name
Diffstat (limited to 'smoketest/scripts/cli')
-rwxr-xr-xsmoketest/scripts/cli/test_load_balancning_wan.py1
-rwxr-xr-xsmoketest/scripts/cli/test_nat.py141
-rwxr-xr-xsmoketest/scripts/cli/test_nat66.py63
3 files changed, 94 insertions, 111 deletions
diff --git a/smoketest/scripts/cli/test_load_balancning_wan.py b/smoketest/scripts/cli/test_load_balancning_wan.py
index 303dece86..23020b9b1 100755
--- a/smoketest/scripts/cli/test_load_balancning_wan.py
+++ b/smoketest/scripts/cli/test_load_balancning_wan.py
@@ -177,6 +177,7 @@ class TestLoadBalancingWan(VyOSUnitTestSHIM.TestCase):
}"""
nat_vyos_pre_snat_hook = """table ip nat {
chain VYOS_PRE_SNAT_HOOK {
+ type nat hook postrouting priority srcnat - 1; policy accept;
counter jump WANLOADBALANCE
return
}
diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py
index 408facfb3..f824838c0 100755
--- a/smoketest/scripts/cli/test_nat.py
+++ b/smoketest/scripts/cli/test_nat.py
@@ -26,6 +26,7 @@ from vyos.util import dict_search
base_path = ['nat']
src_path = base_path + ['source']
dst_path = base_path + ['destination']
+static_path = base_path + ['static']
class TestNAT(VyOSUnitTestSHIM.TestCase):
@classmethod
@@ -40,10 +41,24 @@ class TestNAT(VyOSUnitTestSHIM.TestCase):
self.cli_delete(base_path)
self.cli_commit()
+ def verify_nftables(self, nftables_search, table, inverse=False, args=''):
+ nftables_output = cmd(f'sudo nft {args} list table {table}')
+
+ for search in nftables_search:
+ matched = False
+ for line in nftables_output.split("\n"):
+ if all(item in line for item in search):
+ matched = True
+ break
+ self.assertTrue(not matched if inverse else matched, msg=search)
+
def test_snat(self):
rules = ['100', '110', '120', '130', '200', '210', '220', '230']
outbound_iface_100 = 'eth0'
outbound_iface_200 = 'eth1'
+
+ nftables_search = ['jump VYOS_PRE_SNAT_HOOK']
+
for rule in rules:
network = f'192.168.{rule}.0/24'
# depending of rule order we check either for source address for NAT
@@ -52,51 +67,16 @@ class TestNAT(VyOSUnitTestSHIM.TestCase):
self.cli_set(src_path + ['rule', rule, 'source', 'address', network])
self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100])
self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
+ nftables_search.append([f'saddr {network}', f'oifname "{outbound_iface_100}"', 'masquerade'])
else:
self.cli_set(src_path + ['rule', rule, 'destination', 'address', network])
self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200])
self.cli_set(src_path + ['rule', rule, 'exclude'])
+ nftables_search.append([f'daddr {network}', f'oifname "{outbound_iface_200}"', 'return'])
self.cli_commit()
- tmp = cmd('sudo nft -j list chain ip nat POSTROUTING')
- data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
-
- for idx in range(0, len(data_json)):
- data = data_json[idx]
- if idx == 0:
- self.assertEqual(data['chain'], 'POSTROUTING')
- self.assertEqual(data['family'], 'ip')
- self.assertEqual(data['table'], 'nat')
-
- jump_target = dict_search('jump.target', data['expr'][1])
- self.assertEqual(jump_target,'VYOS_PRE_SNAT_HOOK')
- else:
- rule = str(rules[idx - 1])
- network = f'192.168.{rule}.0/24'
-
- self.assertEqual(data['chain'], 'POSTROUTING')
- self.assertEqual(data['comment'], f'SRC-NAT-{rule}')
- self.assertEqual(data['family'], 'ip')
- self.assertEqual(data['table'], 'nat')
-
- iface = dict_search('match.right', data['expr'][0])
- direction = dict_search('match.left.payload.field', data['expr'][1])
- address = dict_search('match.right.prefix.addr', data['expr'][1])
- mask = dict_search('match.right.prefix.len', data['expr'][1])
-
- if int(rule) < 200:
- self.assertEqual(direction, 'saddr')
- self.assertEqual(iface, outbound_iface_100)
- # check for masquerade keyword
- self.assertIn('masquerade', data['expr'][3])
- else:
- self.assertEqual(direction, 'daddr')
- self.assertEqual(iface, outbound_iface_200)
- # check for return keyword due to 'exclude'
- self.assertIn('return', data['expr'][3])
-
- self.assertEqual(f'{address}/{mask}', network)
+ self.verify_nftables(nftables_search, 'ip vyos_nat')
def test_dnat(self):
rules = ['100', '110', '120', '130', '200', '210', '220', '230']
@@ -105,56 +85,29 @@ class TestNAT(VyOSUnitTestSHIM.TestCase):
inbound_proto_100 = 'udp'
inbound_proto_200 = 'tcp'
+ nftables_search = ['jump VYOS_PRE_DNAT_HOOK']
+
for rule in rules:
port = f'10{rule}'
self.cli_set(dst_path + ['rule', rule, 'source', 'port', port])
self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1'])
self.cli_set(dst_path + ['rule', rule, 'translation', 'port', port])
+ rule_search = [f'dnat to 192.0.2.1:{port}']
if int(rule) < 200:
self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_100])
self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100])
+ rule_search.append(f'{inbound_proto_100} sport {port}')
+ rule_search.append(f'iifname "{inbound_iface_100}"')
else:
self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_200])
self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200])
+ rule_search.append(f'iifname "{inbound_iface_200}"')
- self.cli_commit()
-
- tmp = cmd('sudo nft -j list chain ip nat PREROUTING')
- data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
-
- for idx in range(0, len(data_json)):
- data = data_json[idx]
- if idx == 0:
- self.assertEqual(data['chain'], 'PREROUTING')
- self.assertEqual(data['family'], 'ip')
- self.assertEqual(data['table'], 'nat')
+ nftables_search.append(rule_search)
- jump_target = dict_search('jump.target', data['expr'][1])
- self.assertEqual(jump_target,'VYOS_PRE_DNAT_HOOK')
- else:
+ self.cli_commit()
- rule = str(rules[idx - 1])
- port = int(f'10{rule}')
-
- self.assertEqual(data['chain'], 'PREROUTING')
- self.assertEqual(data['comment'].split()[0], f'DST-NAT-{rule}')
- self.assertEqual(data['family'], 'ip')
- self.assertEqual(data['table'], 'nat')
-
- iface = dict_search('match.right', data['expr'][0])
- direction = dict_search('match.left.payload.field', data['expr'][1])
- protocol = dict_search('match.left.payload.protocol', data['expr'][1])
- dnat_addr = dict_search('dnat.addr', data['expr'][3])
- dnat_port = dict_search('dnat.port', data['expr'][3])
-
- self.assertEqual(direction, 'sport')
- self.assertEqual(dnat_addr, '192.0.2.1')
- self.assertEqual(dnat_port, port)
- if int(rule) < 200:
- self.assertEqual(iface, inbound_iface_100)
- self.assertEqual(protocol, inbound_proto_100)
- else:
- self.assertEqual(iface, inbound_iface_200)
+ self.verify_nftables(nftables_search, 'ip vyos_nat')
def test_snat_required_translation_address(self):
# T2813: Ensure translation address is specified
@@ -193,8 +146,48 @@ class TestNAT(VyOSUnitTestSHIM.TestCase):
# without any rule
self.cli_set(src_path)
self.cli_set(dst_path)
+ self.cli_set(static_path)
+ self.cli_commit()
+
+ def test_dnat_without_translation_address(self):
+ self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1'])
+ self.cli_set(dst_path + ['rule', '1', 'destination', 'port', '443'])
+ self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp'])
+ self.cli_set(dst_path + ['rule', '1', 'translation', 'port', '443'])
+
+ self.cli_commit()
+
+ nftables_search = [
+ ['iifname "eth1"', 'tcp dport 443', 'dnat to :443']
+ ]
+
+ self.verify_nftables(nftables_search, 'ip vyos_nat')
+
+ def test_static_nat(self):
+ dst_addr_1 = '10.0.1.1'
+ translate_addr_1 = '192.168.1.1'
+ dst_addr_2 = '203.0.113.0/24'
+ translate_addr_2 = '192.0.2.0/24'
+ ifname = 'eth0'
+
+ self.cli_set(static_path + ['rule', '10', 'destination', 'address', dst_addr_1])
+ self.cli_set(static_path + ['rule', '10', 'inbound-interface', ifname])
+ self.cli_set(static_path + ['rule', '10', 'translation', 'address', translate_addr_1])
+
+ self.cli_set(static_path + ['rule', '20', 'destination', 'address', dst_addr_2])
+ self.cli_set(static_path + ['rule', '20', 'inbound-interface', ifname])
+ self.cli_set(static_path + ['rule', '20', 'translation', 'address', translate_addr_2])
+
self.cli_commit()
+ nftables_search = [
+ [f'iifname "{ifname}"', f'ip daddr {dst_addr_1}', f'dnat to {translate_addr_1}'],
+ [f'oifname "{ifname}"', f'ip saddr {translate_addr_1}', f'snat to {dst_addr_1}'],
+ [f'iifname "{ifname}"', f'dnat ip prefix to ip daddr map {{ {dst_addr_2} : {translate_addr_2} }}'],
+ [f'oifname "{ifname}"', f'snat ip prefix to ip saddr map {{ {translate_addr_2} : {dst_addr_2} }}']
+ ]
+
+ self.verify_nftables(nftables_search, 'ip vyos_static_nat')
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_nat66.py b/smoketest/scripts/cli/test_nat66.py
index 537b094a4..6cf7ca0a1 100755
--- a/smoketest/scripts/cli/test_nat66.py
+++ b/smoketest/scripts/cli/test_nat66.py
@@ -71,12 +71,12 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):
self.cli_commit()
nftables_search = [
- ['oifname "eth1"', 'ip6 saddr fc00::/64', 'snat prefix to fc01::/64'],
- ['oifname "eth1"', 'ip6 saddr fc00::/64', 'masquerade'],
- ['oifname "eth1"', 'ip6 saddr fc00::/64', 'return']
+ ['oifname "eth1"', f'ip6 saddr {source_prefix}', f'snat prefix to {translation_prefix}'],
+ ['oifname "eth1"', f'ip6 saddr {source_prefix}', 'masquerade'],
+ ['oifname "eth1"', f'ip6 saddr {source_prefix}', 'return']
]
- self.verify_nftables(nftables_search, 'ip6 nat')
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
def test_source_nat66_address(self):
source_prefix = 'fc00::/64'
@@ -88,25 +88,11 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):
# check validate() - outbound-interface must be defined
self.cli_commit()
- tmp = cmd('sudo nft -j list table ip6 nat')
- data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
-
- for idx in range(0, len(data_json)):
- data = data_json[idx]
-
- self.assertEqual(data['chain'], 'POSTROUTING')
- self.assertEqual(data['family'], 'ip6')
- self.assertEqual(data['table'], 'nat')
-
- iface = dict_search('match.right', data['expr'][0])
- address = dict_search('match.right.prefix.addr', data['expr'][2])
- mask = dict_search('match.right.prefix.len', data['expr'][2])
- snat_address = dict_search('snat.addr', data['expr'][3])
+ nftables_search = [
+ ['oifname "eth1"', f'ip6 saddr {source_prefix}', f'snat to {translation_address}']
+ ]
- self.assertEqual(iface, 'eth1')
- # check for translation address
- self.assertEqual(snat_address, translation_address)
- self.assertEqual(f'{address}/{mask}', source_prefix)
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
def test_destination_nat66(self):
destination_address = 'fc00::1'
@@ -129,7 +115,7 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):
['iifname "eth1"', 'ip6 saddr fc02::1', 'ip6 daddr fc00::1', 'return']
]
- self.verify_nftables(nftables_search, 'ip6 nat')
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
def test_destination_nat66_protocol(self):
translation_address = '2001:db8:1111::1'
@@ -153,7 +139,7 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):
['iifname "eth1"', 'tcp dport 4545', 'ip6 saddr 2001:db8:2222::/64', 'tcp sport 8080', 'dnat to 2001:db8:1111::1:5555']
]
- self.verify_nftables(nftables_search, 'ip6 nat')
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
def test_destination_nat66_prefix(self):
destination_prefix = 'fc00::/64'
@@ -165,22 +151,25 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):
# check validate() - outbound-interface must be defined
self.cli_commit()
- tmp = cmd('sudo nft -j list table ip6 nat')
- data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
+ nftables_search = [
+ ['iifname "eth1"', f'ip6 daddr {destination_prefix}', f'dnat prefix to {translation_prefix}']
+ ]
+
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
- for idx in range(0, len(data_json)):
- data = data_json[idx]
+ def test_destination_nat66_without_translation_address(self):
+ self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1'])
+ self.cli_set(dst_path + ['rule', '1', 'destination', 'port', '443'])
+ self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp'])
+ self.cli_set(dst_path + ['rule', '1', 'translation', 'port', '443'])
- self.assertEqual(data['chain'], 'PREROUTING')
- self.assertEqual(data['family'], 'ip6')
- self.assertEqual(data['table'], 'nat')
+ self.cli_commit()
- iface = dict_search('match.right', data['expr'][0])
- translation_address = dict_search('dnat.addr.prefix.addr', data['expr'][3])
- translation_mask = dict_search('dnat.addr.prefix.len', data['expr'][3])
+ nftables_search = [
+ ['iifname "eth1"', 'tcp dport 443', 'dnat to :443']
+ ]
- self.assertEqual(f'{translation_address}/{translation_mask}', translation_prefix)
- self.assertEqual(iface, 'eth1')
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
def test_source_nat66_required_translation_prefix(self):
# T2813: Ensure translation address is specified
@@ -222,7 +211,7 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):
['oifname "eth1"', 'ip6 saddr 2001:db8:2222::/64', 'tcp dport 9999', 'tcp sport 8080', 'snat to 2001:db8:1111::1:80']
]
- self.verify_nftables(nftables_search, 'ip6 nat')
+ self.verify_nftables(nftables_search, 'ip6 vyos_nat')
def test_nat66_no_rules(self):
# T3206: deleting all rules but keep the direction 'destination' or