diff options
| -rw-r--r-- | smoketest/scripts/cli/base_vyostest_shim.py | 23 | ||||
| -rwxr-xr-x | smoketest/scripts/cli/test_firewall.py | 24 | ||||
| -rwxr-xr-x | smoketest/scripts/cli/test_nat.py | 13 | ||||
| -rwxr-xr-x | smoketest/scripts/cli/test_nat66.py | 13 | ||||
| -rwxr-xr-x | smoketest/scripts/cli/test_policy_route.py | 11 | ||||
| -rwxr-xr-x | smoketest/scripts/cli/test_system_conntrack.py | 12 | 
6 files changed, 23 insertions, 73 deletions
| diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py index 140642806..c49d3e76c 100644 --- a/smoketest/scripts/cli/base_vyostest_shim.py +++ b/smoketest/scripts/cli/base_vyostest_shim.py @@ -102,6 +102,29 @@ class VyOSUnitTestSHIM:              ssh_client.close()              return output, error +        # Verify nftables output +        def verify_nftables(self, nftables_search, table, inverse=False, args=''): +            nftables_output = cmd(f'sudo nft {args} list table {table}') + +            for search in nftables_search: +                matched = False +                for line in nftables_output.split("\n"): +                    if all(item in line for item in search): +                        matched = True +                        break +                self.assertTrue(not matched if inverse else matched, msg=search) + +        def verify_nftables_chain(self, nftables_search, table, chain, inverse=False, args=''): +            nftables_output = cmd(f'sudo nft {args} list chain {table} {chain}') + +            for search in nftables_search: +                matched = False +                for line in nftables_output.split("\n"): +                    if all(item in line for item in search): +                        matched = True +                        break +                self.assertTrue(not matched if inverse else matched, msg=search) +  # standard construction; typing suggestion: https://stackoverflow.com/a/70292317  def ignore_warning(warning: Type[Warning]):      import warnings diff --git a/smoketest/scripts/cli/test_firewall.py b/smoketest/scripts/cli/test_firewall.py index bc2848492..be5960bbd 100755 --- a/smoketest/scripts/cli/test_firewall.py +++ b/smoketest/scripts/cli/test_firewall.py @@ -22,7 +22,6 @@ from time import sleep  from base_vyostest_shim import VyOSUnitTestSHIM  from vyos.configsession import ConfigSessionError -from vyos.utils.process import cmd  from vyos.utils.process import run  sysfs_config = { @@ -67,28 +66,6 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):          self.verify_nftables(nftables_search, 'ip vyos_filter', inverse=True) -    def verify_nftables(self, nftables_search, table, inverse=False, args=''): -        nftables_output = cmd(f'sudo nft {args} list table {table}') - -        for search in nftables_search: -            matched = False -            for line in nftables_output.split("\n"): -                if all(item in line for item in search): -                    matched = True -                    break -            self.assertTrue(not matched if inverse else matched, msg=search) - -    def verify_nftables_chain(self, nftables_search, table, chain, inverse=False, args=''): -        nftables_output = cmd(f'sudo nft {args} list chain {table} {chain}') - -        for search in nftables_search: -            matched = False -            for line in nftables_output.split("\n"): -                if all(item in line for item in search): -                    matched = True -                    break -            self.assertTrue(not matched if inverse else matched, msg=search) -      def wait_for_domain_resolver(self, table, set_name, element, max_wait=10):          # Resolver no longer blocks commit, need to wait for daemon to populate set          count = 0 @@ -808,7 +785,6 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):              ['ct state related', 'accept']          ] -        nftables_output = cmd('sudo nft list table ip vyos_filter')          self.verify_nftables(nftables_search, 'ip vyos_filter')          self.verify_nftables(nftables_search_v6, 'ip6 vyos_filter') diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py index 1e6435df8..4f1c3cb4f 100755 --- a/smoketest/scripts/cli/test_nat.py +++ b/smoketest/scripts/cli/test_nat.py @@ -21,8 +21,6 @@ import unittest  from base_vyostest_shim import VyOSUnitTestSHIM  from vyos.configsession import ConfigSessionError -from vyos.utils.process import cmd -from vyos.utils.dict import dict_search  base_path = ['nat']  src_path = base_path + ['source'] @@ -47,17 +45,6 @@ class TestNAT(VyOSUnitTestSHIM.TestCase):          self.assertFalse(os.path.exists(nftables_nat_config))          self.assertFalse(os.path.exists(nftables_static_nat_conf)) -    def verify_nftables(self, nftables_search, table, inverse=False, args=''): -        nftables_output = cmd(f'sudo nft {args} list table {table}') - -        for search in nftables_search: -            matched = False -            for line in nftables_output.split("\n"): -                if all(item in line for item in search): -                    matched = True -                    break -            self.assertTrue(not matched if inverse else matched, msg=search) -      def wait_for_domain_resolver(self, table, set_name, element, max_wait=10):          # Resolver no longer blocks commit, need to wait for daemon to populate set          count = 0 diff --git a/smoketest/scripts/cli/test_nat66.py b/smoketest/scripts/cli/test_nat66.py index 0607f6616..400a895ff 100755 --- a/smoketest/scripts/cli/test_nat66.py +++ b/smoketest/scripts/cli/test_nat66.py @@ -22,8 +22,6 @@ import unittest  from base_vyostest_shim import VyOSUnitTestSHIM  from vyos.configsession import ConfigSessionError -from vyos.utils.process import cmd -from vyos.utils.dict import dict_search  base_path = ['nat66']  src_path = base_path + ['source'] @@ -42,17 +40,6 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):          self.cli_delete(base_path)          self.cli_commit() -    def verify_nftables(self, nftables_search, table, inverse=False, args=''): -        nftables_output = cmd(f'sudo nft {args} list table {table}') - -        for search in nftables_search: -            matched = False -            for line in nftables_output.split("\n"): -                if all(item in line for item in search): -                    matched = True -                    break -            self.assertTrue(not matched if inverse else matched, msg=search) -      def test_source_nat66(self):          source_prefix = 'fc00::/64'          translation_prefix = 'fc01::/64' diff --git a/smoketest/scripts/cli/test_policy_route.py b/smoketest/scripts/cli/test_policy_route.py index c0b7c1fe7..462fc24d0 100755 --- a/smoketest/scripts/cli/test_policy_route.py +++ b/smoketest/scripts/cli/test_policy_route.py @@ -68,17 +68,6 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase):          self.verify_rules(ip_rule_search, inverse=True) -    def verify_nftables(self, nftables_search, table, inverse=False): -        nftables_output = cmd(f'sudo nft list table {table}') - -        for search in nftables_search: -            matched = False -            for line in nftables_output.split("\n"): -                if all(item in line for item in search): -                    matched = True -                    break -            self.assertTrue(not matched if inverse else matched, msg=search) -      def verify_rules(self, rules_search, inverse=False):          rule_output = cmd('ip rule show') diff --git a/smoketest/scripts/cli/test_system_conntrack.py b/smoketest/scripts/cli/test_system_conntrack.py index ce237a6e7..f00626b3d 100755 --- a/smoketest/scripts/cli/test_system_conntrack.py +++ b/smoketest/scripts/cli/test_system_conntrack.py @@ -21,7 +21,6 @@ import unittest  from base_vyostest_shim import VyOSUnitTestSHIM  from vyos.firewall import find_nftables_rule -from vyos.utils.process import cmd  from vyos.utils.file import read_file  base_path = ['system', 'conntrack'] @@ -43,17 +42,6 @@ class TestSystemConntrack(VyOSUnitTestSHIM.TestCase):          self.cli_delete(base_path)          self.cli_commit() -    def verify_nftables(self, nftables_search, table, inverse=False, args=''): -        nftables_output = cmd(f'sudo nft {args} list table {table}') - -        for search in nftables_search: -            matched = False -            for line in nftables_output.split("\n"): -                if all(item in line for item in search): -                    matched = True -                    break -            self.assertTrue(not matched if inverse else matched, msg=search) -      def test_conntrack_options(self):          conntrack_config = {              'net.netfilter.nf_conntrack_expect_max' : { | 
