diff options
Diffstat (limited to 'smoketest/scripts/cli/test_system_conntrack.py')
-rwxr-xr-x | smoketest/scripts/cli/test_system_conntrack.py | 143 |
1 files changed, 126 insertions, 17 deletions
diff --git a/smoketest/scripts/cli/test_system_conntrack.py b/smoketest/scripts/cli/test_system_conntrack.py index 2a89aa98b..0dbc97d49 100755 --- a/smoketest/scripts/cli/test_system_conntrack.py +++ b/smoketest/scripts/cli/test_system_conntrack.py @@ -35,6 +35,17 @@ class TestSystemConntrack(VyOSUnitTestSHIM.TestCase): self.cli_delete(base_path) self.cli_commit() + def verify_nftables(self, nftables_search, table, inverse=False, args=''): + nftables_output = cmd(f'sudo nft {args} list table {table}') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(not matched if inverse else matched, msg=search) + def test_conntrack_options(self): conntrack_config = { 'net.netfilter.nf_conntrack_expect_max' : { @@ -151,27 +162,34 @@ class TestSystemConntrack(VyOSUnitTestSHIM.TestCase): def test_conntrack_module_enable(self): # conntrack helper modules are disabled by default modules = { - 'ftp' : { - 'driver' : ['nf_nat_ftp', 'nf_conntrack_ftp'], + 'ftp': { + 'driver': ['nf_nat_ftp', 'nf_conntrack_ftp'], + 'nftables': ['ct helper set "ftp_tcp"'] }, - 'h323' : { - 'driver' : ['nf_nat_h323', 'nf_conntrack_h323'], + 'h323': { + 'driver': ['nf_nat_h323', 'nf_conntrack_h323'], + 'nftables': ['ct helper set "ras_udp"', + 'ct helper set "q931_tcp"'] }, - 'nfs' : { - 'nftables' : ['ct helper set "rpc_tcp"', - 'ct helper set "rpc_udp"'] + 'nfs': { + 'nftables': ['ct helper set "rpc_tcp"', + 'ct helper set "rpc_udp"'] }, - 'pptp' : { - 'driver' : ['nf_nat_pptp', 'nf_conntrack_pptp'], + 'pptp': { + 'driver': ['nf_nat_pptp', 'nf_conntrack_pptp'], + 'nftables': ['ct helper set "pptp_tcp"'] }, - 'sip' : { - 'driver' : ['nf_nat_sip', 'nf_conntrack_sip'], + 'sip': { + 'driver': ['nf_nat_sip', 'nf_conntrack_sip'], + 'nftables': ['ct helper set "sip_tcp"', + 'ct helper set "sip_udp"'] }, - 'sqlnet' : { - 'nftables' : ['ct helper set "tns_tcp"'] + 'sqlnet': { + 'nftables': ['ct helper set "tns_tcp"'] }, - 'tftp' : { - 'driver' : ['nf_nat_tftp', 'nf_conntrack_tftp'], + 'tftp': { + 'driver': ['nf_nat_tftp', 'nf_conntrack_tftp'], + 'nftables': ['ct helper set "tftp_udp"'] }, } @@ -189,7 +207,7 @@ class TestSystemConntrack(VyOSUnitTestSHIM.TestCase): self.assertTrue(os.path.isdir(f'/sys/module/{driver}')) if 'nftables' in module_options: for rule in module_options['nftables']: - self.assertTrue(find_nftables_rule('raw', 'VYOS_CT_HELPER', [rule]) != None) + self.assertTrue(find_nftables_rule('ip vyos_conntrack', 'VYOS_CT_HELPER', [rule]) != None) # unload modules for module in modules: @@ -205,7 +223,7 @@ class TestSystemConntrack(VyOSUnitTestSHIM.TestCase): self.assertFalse(os.path.isdir(f'/sys/module/{driver}')) if 'nftables' in module_options: for rule in module_options['nftables']: - self.assertTrue(find_nftables_rule('raw', 'VYOS_CT_HELPER', [rule]) == None) + self.assertTrue(find_nftables_rule('ip vyos_conntrack', 'VYOS_CT_HELPER', [rule]) == None) def test_conntrack_hash_size(self): hash_size = '65536' @@ -232,5 +250,96 @@ class TestSystemConntrack(VyOSUnitTestSHIM.TestCase): tmp = read_file('/etc/modprobe.d/vyatta_nf_conntrack.conf') self.assertIn(hash_size_default, tmp) + def test_conntrack_ignore(self): + address_group = 'conntracktest' + address_group_member = '192.168.0.1' + ipv6_address_group = 'conntracktest6' + ipv6_address_group_member = 'dead:beef::1' + + self.cli_set(['firewall', 'group', 'address-group', address_group, 'address', address_group_member]) + self.cli_set(['firewall', 'group', 'ipv6-address-group', ipv6_address_group, 'address', ipv6_address_group_member]) + + self.cli_set(base_path + ['ignore', 'ipv4', 'rule', '1', 'source', 'address', '192.0.2.1']) + self.cli_set(base_path + ['ignore', 'ipv4', 'rule', '1', 'destination', 'address', '192.0.2.2']) + self.cli_set(base_path + ['ignore', 'ipv4', 'rule', '1', 'destination', 'port', '22']) + self.cli_set(base_path + ['ignore', 'ipv4', 'rule', '1', 'protocol', 'tcp']) + self.cli_set(base_path + ['ignore', 'ipv4', 'rule', '1', 'tcp', 'flags', 'syn']) + + self.cli_set(base_path + ['ignore', 'ipv4', 'rule', '2', 'source', 'address', '192.0.2.1']) + self.cli_set(base_path + ['ignore', 'ipv4', 'rule', '2', 'destination', 'group', 'address-group', address_group]) + + self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '11', 'source', 'address', 'fe80::1']) + self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '11', 'destination', 'address', 'fe80::2']) + self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '11', 'destination', 'port', '22']) + self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '11', 'protocol', 'tcp']) + + self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '12', 'source', 'address', 'fe80::1']) + self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '12', 'destination', 'group', 'address-group', ipv6_address_group]) + + self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '13', 'source', 'address', 'fe80::1']) + self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '13', 'destination', 'address', '!fe80::3']) + + self.cli_commit() + + nftables_search = [ + ['ip saddr 192.0.2.1', 'ip daddr 192.0.2.2', 'tcp dport 22', 'tcp flags & syn == syn', 'notrack'], + ['ip saddr 192.0.2.1', 'ip daddr @A_conntracktest', 'notrack'] + ] + + nftables6_search = [ + ['ip6 saddr fe80::1', 'ip6 daddr fe80::2', 'tcp dport 22', 'notrack'], + ['ip6 saddr fe80::1', 'ip6 daddr @A6_conntracktest6', 'notrack'], + ['ip6 saddr fe80::1', 'ip6 daddr != fe80::3', 'notrack'] + ] + + self.verify_nftables(nftables_search, 'ip vyos_conntrack') + self.verify_nftables(nftables6_search, 'ip6 vyos_conntrack') + + self.cli_delete(['firewall']) + + def test_conntrack_timeout_custom(self): + + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'source', 'address', '192.0.2.1']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'destination', 'address', '192.0.2.2']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'destination', 'port', '22']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'protocol', 'tcp', 'syn-sent', '77']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'protocol', 'tcp', 'close', '88']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'protocol', 'tcp', 'established', '99']) + + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '2', 'inbound-interface', 'eth1']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '2', 'source', 'address', '198.51.100.1']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '2', 'protocol', 'udp', 'unreplied', '55']) + + self.cli_set(base_path + ['timeout', 'custom', 'ipv6', 'rule', '1', 'source', 'address', '2001:db8::1']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv6', 'rule', '1', 'inbound-interface', 'eth2']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv6', 'rule', '1', 'protocol', 'tcp', 'time-wait', '22']) + self.cli_set(base_path + ['timeout', 'custom', 'ipv6', 'rule', '1', 'protocol', 'tcp', 'last-ack', '33']) + + self.cli_commit() + + nftables_search = [ + ['ct timeout ct-timeout-1 {'], + ['protocol tcp'], + ['policy = { syn_sent : 77, established : 99, close : 88 }'], + ['ct timeout ct-timeout-2 {'], + ['protocol udp'], + ['policy = { unreplied : 55 }'], + ['chain VYOS_CT_TIMEOUT {'], + ['ip saddr 192.0.2.1', 'ip daddr 192.0.2.2', 'tcp dport 22', 'ct timeout set "ct-timeout-1"'], + ['iifname "eth1"', 'meta l4proto udp', 'ip saddr 198.51.100.1', 'ct timeout set "ct-timeout-2"'] + ] + + nftables6_search = [ + ['ct timeout ct-timeout-1 {'], + ['protocol tcp'], + ['policy = { last_ack : 33, time_wait : 22 }'], + ['chain VYOS_CT_TIMEOUT {'], + ['iifname "eth2"', 'meta l4proto tcp', 'ip6 saddr 2001:db8::1', 'ct timeout set "ct-timeout-1"'] + ] + + self.verify_nftables(nftables_search, 'ip vyos_conntrack') + self.verify_nftables(nftables6_search, 'ip6 vyos_conntrack') + + self.cli_delete(['firewall']) if __name__ == '__main__': unittest.main(verbosity=2) |