From 34db435e7a74ee8509777802e03927de2dd57627 Mon Sep 17 00:00:00 2001 From: sarthurdev <965089+sarthurdev@users.noreply.github.com> Date: Mon, 13 Jun 2022 01:45:06 +0200 Subject: firewall: T4147: Use named sets for firewall groups * Refactor nftables clean-up code * Adds policy route test for using firewall groups --- smoketest/scripts/cli/test_policy_route.py | 71 ++++++++++++++++++------------ 1 file changed, 44 insertions(+), 27 deletions(-) (limited to 'smoketest/scripts/cli/test_policy_route.py') diff --git a/smoketest/scripts/cli/test_policy_route.py b/smoketest/scripts/cli/test_policy_route.py index e2d70f289..534cfb082 100755 --- a/smoketest/scripts/cli/test_policy_route.py +++ b/smoketest/scripts/cli/test_policy_route.py @@ -47,6 +47,47 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase): self.cli_delete(['policy', 'route6']) self.cli_commit() + nftables_search = [ + ['set N_smoketest_network'], + ['set N_smoketest_network1'], + ['chain VYOS_PBR_smoketest'] + ] + + self.verify_nftables(nftables_search, 'ip filter', inverse=True) + + def verify_nftables(self, nftables_search, table, inverse=False): + nftables_output = cmd(f'sudo nft list table {table}') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(not matched if inverse else matched, msg=search) + + def test_pbr_group(self): + self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'network', '172.16.99.0/24']) + self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network1', 'network', '172.16.101.0/24']) + self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network1', 'include', 'smoketest_network']) + + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'source', 'group', 'network-group', 'smoketest_network']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'destination', 'group', 'network-group', 'smoketest_network1']) + self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'set', 'mark', mark]) + + self.cli_set(['interfaces', 'ethernet', interface, 'policy', 'route', 'smoketest']) + + self.cli_commit() + + nftables_search = [ + [f'iifname "{interface}"','jump VYOS_PBR_smoketest'], + ['ip daddr @N_smoketest_network1', 'ip saddr @N_smoketest_network'], + ] + + self.verify_nftables(nftables_search, 'ip mangle') + + self.cli_delete(['firewall']) + def test_pbr_mark(self): self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'source', 'address', '172.16.20.10']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'destination', 'address', '172.16.10.10']) @@ -63,15 +104,7 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase): ['ip daddr 172.16.10.10', 'ip saddr 172.16.20.10', 'meta mark set ' + mark_hex], ] - nftables_output = cmd('sudo nft list table ip mangle') - - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(matched) + self.verify_nftables(nftables_search, 'ip mangle') def test_pbr_table(self): self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'protocol', 'tcp']) @@ -97,15 +130,7 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase): ['tcp flags & (syn | ack) == syn', 'tcp dport { 8888 }', 'meta mark set ' + mark_hex] ] - nftables_output = cmd('sudo nft list table ip mangle') - - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(matched) + self.verify_nftables(nftables_search, 'ip mangle') # IPv6 @@ -114,15 +139,7 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase): ['meta l4proto { tcp, udp }', 'th dport { 8888 }', 'meta mark set ' + mark_hex] ] - nftables6_output = cmd('sudo nft list table ip6 mangle') - - for search in nftables6_search: - matched = False - for line in nftables6_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(matched) + self.verify_nftables(nftables6_search, 'ip6 mangle') # IP rule fwmark -> table -- cgit v1.2.3