From 281533f59e9aa3acb255894a58eaae35f0960d70 Mon Sep 17 00:00:00 2001 From: Christian Poessinger Date: Sun, 11 Oct 2020 17:37:09 +0200 Subject: smoketest: nat: extend snat and add dnat test cases --- smoketest/scripts/cli/test_nat.py | 123 ++++++++++++++++++++++++++++++++------ 1 file changed, 105 insertions(+), 18 deletions(-) (limited to 'smoketest') diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py index 0bb4761ea..b5bde743b 100755 --- a/smoketest/scripts/cli/test_nat.py +++ b/smoketest/scripts/cli/test_nat.py @@ -19,13 +19,14 @@ import jmespath import json import unittest -from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError from vyos.util import cmd +from vyos.util import vyos_dict_search base_path = ['nat'] -source_path = base_path + ['source'] - -snat_pattern = 'nftables[?rule].rule[?chain].{chain: chain, comment: comment, address: { network: expr[].match.right.prefix.addr | [0], prefix: expr[].match.right.prefix.len | [0]}}' +src_path = base_path + ['source'] +dst_path = base_path + ['destination'] class TestNAT(unittest.TestCase): def setUp(self): @@ -38,39 +39,125 @@ class TestNAT(unittest.TestCase): self.session.delete(base_path) self.session.commit() - def test_source_nat(self): - """ Configure and validate source NAT rule(s) """ + def test_snat(self): + """ Test source NAT (SNAT) rules """ + + rules = ['100', '110', '120', '130', '200', '210', '220', '230'] + outbound_iface_100 = 'eth0' + outbound_iface_200 = 'eth1' + for rule in rules: + network = f'192.168.{rule}.0/24' + # depending of rule order we check either for source address for NAT + # or configured destination address for NAT + if int(rule) < 200: + self.session.set(src_path + ['rule', rule, 'source', 'address', network]) + self.session.set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100]) + self.session.set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) + else: + self.session.set(src_path + ['rule', rule, 'destination', 'address', network]) + self.session.set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200]) + self.session.set(src_path + ['rule', rule, 'exclude']) + + self.session.commit() - network = '192.168.0.0/16' - self.session.set(source_path + ['rule', '1', 'destination', 'address', network]) - self.session.set(source_path + ['rule', '1', 'exclude']) + tmp = cmd('sudo nft -j list table nat') + data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) + + for idx in range(0, len(data_json)): + rule = str(rules[idx]) + data = data_json[idx] + network = f'192.168.{rule}.0/24' + + self.assertEqual(data['chain'], 'POSTROUTING') + self.assertEqual(data['comment'], f'SRC-NAT-{rule}') + self.assertEqual(data['family'], 'ip') + self.assertEqual(data['table'], 'nat') + + iface = vyos_dict_search('match.right', data['expr'][0]) + direction = vyos_dict_search('match.left.payload.field', data['expr'][1]) + address = vyos_dict_search('match.right.prefix.addr', data['expr'][1]) + mask = vyos_dict_search('match.right.prefix.len', data['expr'][1]) + + if int(rule) < 200: + self.assertEqual(direction, 'saddr') + self.assertEqual(iface, outbound_iface_100) + # check for masquerade keyword + self.assertIn('masquerade', data['expr'][3]) + else: + self.assertEqual(direction, 'daddr') + self.assertEqual(iface, outbound_iface_200) + # check for return keyword due to 'exclude' + self.assertIn('return', data['expr'][3]) + + self.assertEqual(f'{address}/{mask}', network) + + def test_dnat(self): + """ Test destination NAT (DNAT) rules """ + + rules = ['100', '110', '120', '130', '200', '210', '220', '230'] + inbound_iface_100 = 'eth0' + inbound_iface_200 = 'eth1' + inbound_proto_100 = 'udp' + inbound_proto_200 = 'tcp' + + for rule in rules: + port = f'10{rule}' + self.session.set(dst_path + ['rule', rule, 'source', 'port', port]) + self.session.set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) + self.session.set(dst_path + ['rule', rule, 'translation', 'port', port]) + if int(rule) < 200: + self.session.set(dst_path + ['rule', rule, 'protocol', inbound_proto_100]) + self.session.set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100]) + else: + self.session.set(dst_path + ['rule', rule, 'protocol', inbound_proto_200]) + self.session.set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200]) - self.session.set(source_path + ['rule', '1', 'outbound-interface', 'any']) self.session.commit() tmp = cmd('sudo nft -j list table nat') - nftable_json = json.loads(tmp) - condensed_json = jmespath.search(snat_pattern, nftable_json)[0] + data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) + + for idx in range(0, len(data_json)): + rule = str(rules[idx]) + data = data_json[idx] + port = int(f'10{rule}') + + self.assertEqual(data['chain'], 'PREROUTING') + self.assertEqual(data['comment'].split()[0], f'DST-NAT-{rule}') + self.assertEqual(data['family'], 'ip') + self.assertEqual(data['table'], 'nat') + + iface = vyos_dict_search('match.right', data['expr'][0]) + direction = vyos_dict_search('match.left.payload.field', data['expr'][1]) + protocol = vyos_dict_search('match.left.payload.protocol', data['expr'][1]) + dnat_addr = vyos_dict_search('dnat.addr', data['expr'][3]) + dnat_port = vyos_dict_search('dnat.port', data['expr'][3]) + + self.assertEqual(direction, 'sport') + self.assertEqual(dnat_addr, '192.0.2.1') + self.assertEqual(dnat_port, port) + if int(rule) < 200: + self.assertEqual(iface, inbound_iface_100) + self.assertEqual(protocol, inbound_proto_100) + else: + self.assertEqual(iface, inbound_iface_200) - self.assertEqual(condensed_json['comment'], 'SRC-NAT-1') - self.assertEqual(condensed_json['address']['network'], network.split('/')[0]) - self.assertEqual(str(condensed_json['address']['prefix']), network.split('/')[1]) def test_validation_logic(self): """ T2813: Ensure translation address is specified """ rule = '5' - self.session.set(source_path + ['rule', rule, 'source', 'address', '192.0.2.0/24']) + self.session.set(src_path + ['rule', rule, 'source', 'address', '192.0.2.0/24']) # check validate() - outbound-interface must be defined with self.assertRaises(ConfigSessionError): self.session.commit() - self.session.set(source_path + ['rule', rule, 'outbound-interface', 'eth0']) + self.session.set(src_path + ['rule', rule, 'outbound-interface', 'eth0']) # check validate() - translation address not specified with self.assertRaises(ConfigSessionError): self.session.commit() - self.session.set(source_path + ['rule', rule, 'translation', 'address', 'masquerade']) + self.session.set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) self.session.commit() if __name__ == '__main__': -- cgit v1.2.3