diff options
Diffstat (limited to 'smoketest/scripts/cli/test_nat.py')
-rwxr-xr-x | smoketest/scripts/cli/test_nat.py | 119 |
1 files changed, 69 insertions, 50 deletions
diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py index 75c628244..408facfb3 100755 --- a/smoketest/scripts/cli/test_nat.py +++ b/smoketest/scripts/cli/test_nat.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020 VyOS maintainers and contributors +# Copyright (C) 2020-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -14,7 +14,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os import jmespath import json import unittest @@ -29,10 +28,13 @@ src_path = base_path + ['source'] dst_path = base_path + ['destination'] class TestNAT(VyOSUnitTestSHIM.TestCase): - def setUp(self): + @classmethod + def setUpClass(cls): + super(TestNAT, cls).setUpClass() + # ensure we can also run this test on a live system - so lets clean # out the current configuration :) - self.cli_delete(base_path) + cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) @@ -57,36 +59,44 @@ class TestNAT(VyOSUnitTestSHIM.TestCase): self.cli_commit() - tmp = cmd('sudo nft -j list table nat') + tmp = cmd('sudo nft -j list chain ip nat POSTROUTING') data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) for idx in range(0, len(data_json)): - rule = str(rules[idx]) data = data_json[idx] - network = f'192.168.{rule}.0/24' - - self.assertEqual(data['chain'], 'POSTROUTING') - self.assertEqual(data['comment'], f'SRC-NAT-{rule}') - self.assertEqual(data['family'], 'ip') - self.assertEqual(data['table'], 'nat') - - iface = dict_search('match.right', data['expr'][0]) - direction = dict_search('match.left.payload.field', data['expr'][1]) - address = dict_search('match.right.prefix.addr', data['expr'][1]) - mask = dict_search('match.right.prefix.len', data['expr'][1]) + if idx == 0: + self.assertEqual(data['chain'], 'POSTROUTING') + self.assertEqual(data['family'], 'ip') + self.assertEqual(data['table'], 'nat') - if int(rule) < 200: - self.assertEqual(direction, 'saddr') - self.assertEqual(iface, outbound_iface_100) - # check for masquerade keyword - self.assertIn('masquerade', data['expr'][3]) + jump_target = dict_search('jump.target', data['expr'][1]) + self.assertEqual(jump_target,'VYOS_PRE_SNAT_HOOK') else: - self.assertEqual(direction, 'daddr') - self.assertEqual(iface, outbound_iface_200) - # check for return keyword due to 'exclude' - self.assertIn('return', data['expr'][3]) - - self.assertEqual(f'{address}/{mask}', network) + rule = str(rules[idx - 1]) + network = f'192.168.{rule}.0/24' + + self.assertEqual(data['chain'], 'POSTROUTING') + self.assertEqual(data['comment'], f'SRC-NAT-{rule}') + self.assertEqual(data['family'], 'ip') + self.assertEqual(data['table'], 'nat') + + iface = dict_search('match.right', data['expr'][0]) + direction = dict_search('match.left.payload.field', data['expr'][1]) + address = dict_search('match.right.prefix.addr', data['expr'][1]) + mask = dict_search('match.right.prefix.len', data['expr'][1]) + + if int(rule) < 200: + self.assertEqual(direction, 'saddr') + self.assertEqual(iface, outbound_iface_100) + # check for masquerade keyword + self.assertIn('masquerade', data['expr'][3]) + else: + self.assertEqual(direction, 'daddr') + self.assertEqual(iface, outbound_iface_200) + # check for return keyword due to 'exclude' + self.assertIn('return', data['expr'][3]) + + self.assertEqual(f'{address}/{mask}', network) def test_dnat(self): rules = ['100', '110', '120', '130', '200', '210', '220', '230'] @@ -109,33 +119,42 @@ class TestNAT(VyOSUnitTestSHIM.TestCase): self.cli_commit() - tmp = cmd('sudo nft -j list table nat') + tmp = cmd('sudo nft -j list chain ip nat PREROUTING') data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) for idx in range(0, len(data_json)): - rule = str(rules[idx]) data = data_json[idx] - port = int(f'10{rule}') - - self.assertEqual(data['chain'], 'PREROUTING') - self.assertEqual(data['comment'].split()[0], f'DST-NAT-{rule}') - self.assertEqual(data['family'], 'ip') - self.assertEqual(data['table'], 'nat') - - iface = dict_search('match.right', data['expr'][0]) - direction = dict_search('match.left.payload.field', data['expr'][1]) - protocol = dict_search('match.left.payload.protocol', data['expr'][1]) - dnat_addr = dict_search('dnat.addr', data['expr'][3]) - dnat_port = dict_search('dnat.port', data['expr'][3]) - - self.assertEqual(direction, 'sport') - self.assertEqual(dnat_addr, '192.0.2.1') - self.assertEqual(dnat_port, port) - if int(rule) < 200: - self.assertEqual(iface, inbound_iface_100) - self.assertEqual(protocol, inbound_proto_100) + if idx == 0: + self.assertEqual(data['chain'], 'PREROUTING') + self.assertEqual(data['family'], 'ip') + self.assertEqual(data['table'], 'nat') + + jump_target = dict_search('jump.target', data['expr'][1]) + self.assertEqual(jump_target,'VYOS_PRE_DNAT_HOOK') else: - self.assertEqual(iface, inbound_iface_200) + + rule = str(rules[idx - 1]) + port = int(f'10{rule}') + + self.assertEqual(data['chain'], 'PREROUTING') + self.assertEqual(data['comment'].split()[0], f'DST-NAT-{rule}') + self.assertEqual(data['family'], 'ip') + self.assertEqual(data['table'], 'nat') + + iface = dict_search('match.right', data['expr'][0]) + direction = dict_search('match.left.payload.field', data['expr'][1]) + protocol = dict_search('match.left.payload.protocol', data['expr'][1]) + dnat_addr = dict_search('dnat.addr', data['expr'][3]) + dnat_port = dict_search('dnat.port', data['expr'][3]) + + self.assertEqual(direction, 'sport') + self.assertEqual(dnat_addr, '192.0.2.1') + self.assertEqual(dnat_port, port) + if int(rule) < 200: + self.assertEqual(iface, inbound_iface_100) + self.assertEqual(protocol, inbound_proto_100) + else: + self.assertEqual(iface, inbound_iface_200) def test_snat_required_translation_address(self): # T2813: Ensure translation address is specified |