#!/usr/bin/env python3 # # Copyright (C) 2020-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import jmespath import json import os import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError base_path = ['nat'] src_path = base_path + ['source'] dst_path = base_path + ['destination'] static_path = base_path + ['static'] nftables_nat_config = '/run/nftables_nat.conf' nftables_static_nat_conf = '/run/nftables_static-nat-rules.nft' class TestNAT(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestNAT, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) self.cli_commit() self.assertFalse(os.path.exists(nftables_nat_config)) self.assertFalse(os.path.exists(nftables_static_nat_conf)) def wait_for_domain_resolver(self, table, set_name, element, max_wait=10): # Resolver no longer blocks commit, need to wait for daemon to populate set count = 0 while count < max_wait: code = run(f'sudo nft get element {table} {set_name} {{ {element} }}') if code == 0: return True count += 1 sleep(1) return False def test_snat(self): rules = ['100', '110', '120', '130', '200', '210', '220', '230'] outbound_iface_100 = 'eth0' outbound_iface_200 = 'eth1' nftables_search = ['jump VYOS_PRE_SNAT_HOOK'] for rule in rules: network = f'192.168.{rule}.0/24' # depending of rule order we check either for source address for NAT # or configured destination address for NAT if int(rule) < 200: self.cli_set(src_path + ['rule', rule, 'source', 'address', network]) self.cli_set(src_path + ['rule', rule, 'outbound-interface', 'name', outbound_iface_100]) self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) nftables_search.append([f'saddr {network}', f'oifname "{outbound_iface_100}"', 'masquerade']) else: self.cli_set(src_path + ['rule', rule, 'destination', 'address', network]) self.cli_set(src_path + ['rule', rule, 'outbound-interface', 'name', outbound_iface_200]) self.cli_set(src_path + ['rule', rule, 'exclude']) nftables_search.append([f'daddr {network}', f'oifname "{outbound_iface_200}"', 'return']) self.cli_commit() self.verify_nftables(nftables_search, 'ip vyos_nat') def test_snat_groups(self): address_group = 'smoketest_addr' address_group_member = '192.0.2.1' interface_group = 'smoketest_ifaces' interface_group_member = 'bond.99' self.cli_set(['firewall', 'group', 'address-group', address_group, 'address', address_group_member]) self.cli_set(['firewall', 'group', 'interface-group', interface_group, 'interface', interface_group_member]) self.cli_set(src_path + ['rule', '100', 'source', 'group', 'address-group', address_group]) self.cli_set(src_path + ['rule', '100', 'outbound-interface', 'group', interface_group]) self.cli_set(src_path + ['rule', '100', 'translation', 'address', 'masquerade']) self.cli_set(src_path + ['rule', '110', 'source', 'group', 'address-group', address_group]) self.cli_set(src_path + ['rule', '110', 'translation', 'address', '203.0.113.1']) self.cli_set(src_path + ['rule', '120', 'source', 'group', 'address-group', address_group]) self.cli_set(src_path + ['rule', '120', 'translation', 'address', '203.0.113.111/32']) self.cli_commit() nftables_search = [ [f'set A_{address_group}'], [f'elements = {{ {address_group_member} }}'], [f'ip saddr @A_{address_group}', f'oifname @I_{interface_group}', 'masquerade'], [f'ip saddr @A_{address_group}', 'snat to 203.0.113.1'], [f'ip saddr @A_{address_group}', 'snat prefix to 203.0.113.111/32'] ] self.verify_nftables(nftables_search, 'ip vyos_nat') self.cli_delete(['firewall']) def test_dnat(self): rules = ['100', '110', '120', '130', '200', '210', '220', '230'] inbound_iface_100 = 'eth0' inbound_iface_200 = 'eth1' inbound_proto_100 = 'udp' inbound_proto_200 = 'tcp' nftables_search = ['jump VYOS_PRE_DNAT_HOOK'] for rule in rules: port = f'10{rule}' self.cli_set(dst_path + ['rule', rule, 'source', 'port', port]) self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) self.cli_set(dst_path + ['rule', rule, 'translation', 'port', port]) rule_search = [f'dnat to 192.0.2.1:{port}'] if int(rule) < 200: self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_100]) self.cli_set(dst_path + ['rule', rule, 'inbound-interface', 'name', inbound_iface_100]) rule_search.append(f'{inbound_proto_100} sport {port}') rule_search.append(f'iifname "{inbound_iface_100}"') else: self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_200]) self.cli_set(dst_path + ['rule', rule, 'inbound-interface', 'name', inbound_iface_200]) rule_search.append(f'iifname "{inbound_iface_200}"') nftables_search.append(rule_search) self.cli_commit() self.verify_nftables(nftables_search, 'ip vyos_nat') def test_snat_required_translation_address(self): # T2813: Ensure translation address is specified rule = '5' self.cli_set(src_path + ['rule', rule, 'source', 'address', '192.0.2.0/24']) # check validate() - translation address not specified with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) self.cli_commit() def test_dnat_negated_addresses(self): # T3186: negated addresses are not accepted by nftables rule = '1000' self.cli_set(dst_path + ['rule', rule, 'destination', 'address', '!192.0.2.1']) self.cli_set(dst_path + ['rule', rule, 'destination', 'port', '53']) self.cli_set(dst_path + ['rule', rule, 'inbound-interface', 'name', 'eth0']) self.cli_set(dst_path + ['rule', rule, 'protocol', 'tcp_udp']) self.cli_set(dst_path + ['rule', rule, 'source', 'address', '!192.0.2.1']) self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) self.cli_set(dst_path + ['rule', rule, 'translation', 'port', '53']) self.cli_commit() def test_nat_no_rules(self): # T3206: deleting all rules but keep the direction 'destination' or # 'source' resulteds in KeyError: 'rule'. # # Test that both 'nat destination' and 'nat source' nodes can exist # without any rule self.cli_set(src_path) self.cli_set(dst_path) self.cli_set(static_path) self.cli_commit() def test_dnat_without_translation_address(self): self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'name', 'eth1']) self.cli_set(dst_path + ['rule', '1', 'destination', 'port', '443']) self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp']) self.cli_set(dst_path + ['rule', '1', 'packet-type', 'host']) self.cli_set(dst_path + ['rule', '1', 'translation', 'port', '443']) self.cli_commit() nftables_search = [ ['iifname "eth1"', 'tcp dport 443', 'pkttype host', 'dnat to :443'] ] self.verify_nftables(nftables_search, 'ip vyos_nat') def test_static_nat(self): dst_addr_1 = '10.0.1.1' translate_addr_1 = '192.168.1.1' dst_addr_2 = '203.0.113.0/24' translate_addr_2 = '192.0.2.0/24' ifname = 'eth0' self.cli_set(static_path + ['rule', '10', 'destination', 'address', dst_addr_1]) self.cli_set(static_path + ['rule', '10', 'inbound-interface', ifname]) self.cli_set(static_path + ['rule', '10', 'translation', 'address', translate_addr_1]) self.cli_set(static_path + ['rule', '20', 'destination', 'address', dst_addr_2]) self.cli_set(static_path + ['rule', '20', 'inbound-interface', ifname]) self.cli_set(static_path + ['rule', '20', 'translation', 'address', translate_addr_2]) self.cli_commit() nftables_search = [ [f'iifname "{ifname}"', f'ip daddr {dst_addr_1}', f'dnat to {translate_addr_1}'], [f'oifname "{ifname}"', f'ip saddr {translate_addr_1}', f'snat to {dst_addr_1}'], [f'iifname "{ifname}"', f'dnat ip prefix to ip daddr map {{ {dst_addr_2} : {translate_addr_2} }}'], [f'oifname "{ifname}"', f'snat ip prefix to ip saddr map {{ {translate_addr_2} : {dst_addr_2} }}'] ] self.verify_nftables(nftables_search, 'ip vyos_static_nat') def test_dnat_redirect(self): dst_addr_1 = '10.0.1.1' dest_port = '5122' protocol = 'tcp' redirected_port = '22' ifname = 'eth0' self.cli_set(dst_path + ['rule', '10', 'destination', 'address', dst_addr_1]) self.cli_set(dst_path + ['rule', '10', 'destination', 'port', dest_port]) self.cli_set(dst_path + ['rule', '10', 'protocol', protocol]) self.cli_set(dst_path + ['rule', '10', 'inbound-interface', 'name', ifname]) self.cli_set(dst_path + ['rule', '10', 'translation', 'redirect', 'port', redirected_port]) self.cli_set(dst_path + ['rule', '20', 'destination', 'address', dst_addr_1]) self.cli_set(dst_path + ['rule', '20', 'destination', 'port', dest_port]) self.cli_set(dst_path + ['rule', '20', 'protocol', protocol]) self.cli_set(dst_path + ['rule', '20', 'inbound-interface', 'name', ifname]) self.cli_set(dst_path + ['rule', '20', 'translation', 'redirect']) self.cli_commit() nftables_search = [ [f'iifname "{ifname}"', f'ip daddr {dst_addr_1}', f'{protocol} dport {dest_port}', f'redirect to :{redirected_port}'], [f'iifname "{ifname}"', f'ip daddr {dst_addr_1}', f'{protocol} dport {dest_port}', f'redirect'] ] self.verify_nftables(nftables_search, 'ip vyos_nat') def test_nat_balance(self): ifname = 'eth0' member_1 = '198.51.100.1' weight_1 = '10' member_2 = '198.51.100.2' weight_2 = '90' member_3 = '192.0.2.1' weight_3 = '35' member_4 = '192.0.2.2' weight_4 = '65' dst_port = '443' self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'name', ifname]) self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp']) self.cli_set(dst_path + ['rule', '1', 'destination', 'port', dst_port]) self.cli_set(dst_path + ['rule', '1', 'load-balance', 'hash', 'source-address']) self.cli_set(dst_path + ['rule', '1', 'load-balance', 'hash', 'source-port']) self.cli_set(dst_path + ['rule', '1', 'load-balance', 'hash', 'destination-address']) self.cli_set(dst_path + ['rule', '1', 'load-balance', 'hash', 'destination-port']) self.cli_set(dst_path + ['rule', '1', 'load-balance', 'backend', member_1, 'weight', weight_1]) self.cli_set(dst_path + ['rule', '1', 'load-balance', 'backend', member_2, 'weight', weight_2]) self.cli_set(src_path + ['rule', '1', 'outbound-interface', 'name', ifname]) self.cli_set(src_path + ['rule', '1', 'load-balance', 'hash', 'random']) self.cli_set(src_path + ['rule', '1', 'load-balance', 'backend', member_3, 'weight', weight_3]) self.cli_set(src_path + ['rule', '1', 'load-balance', 'backend', member_4, 'weight', weight_4]) self.cli_commit() nftables_search = [ [f'iifname "{ifname}"', f'tcp dport {dst_port}', f'dnat to jhash ip saddr . tcp sport . ip daddr . tcp dport mod 100 map', f'0-9 : {member_1}, 10-99 : {member_2}'], [f'oifname "{ifname}"', f'snat to numgen random mod 100 map', f'0-34 : {member_3}, 35-99 : {member_4}'] ] self.verify_nftables(nftables_search, 'ip vyos_nat') def test_snat_net_port_map(self): self.cli_set(src_path + ['rule', '10', 'protocol', 'tcp_udp']) self.cli_set(src_path + ['rule', '10', 'source', 'address', '100.64.0.0/25']) self.cli_set(src_path + ['rule', '10', 'translation', 'address', '203.0.113.0/25']) self.cli_set(src_path + ['rule', '10', 'translation', 'port', '1025-3072']) self.cli_set(src_path + ['rule', '20', 'protocol', 'tcp_udp']) self.cli_set(src_path + ['rule', '20', 'source', 'address', '100.64.0.128/25']) self.cli_set(src_path + ['rule', '20', 'translation', 'address', '203.0.113.128/25']) self.cli_set(src_path + ['rule', '20', 'translation', 'port', '1025-3072']) self.cli_commit() nftables_search = [ ['meta l4proto { tcp, udp }', 'snat ip prefix to ip saddr map { 100.64.0.0/25 : 203.0.113.0/25 . 1025-3072 }', 'comment "SRC-NAT-10"'], ['meta l4proto { tcp, udp }', 'snat ip prefix to ip saddr map { 100.64.0.128/25 : 203.0.113.128/25 . 1025-3072 }', 'comment "SRC-NAT-20"'] ] self.verify_nftables(nftables_search, 'ip vyos_nat') if __name__ == '__main__': unittest.main(verbosity=2)