From bc9ccaeda54279022b73a806fa8aa77c523fbecc Mon Sep 17 00:00:00 2001
From: sarthurdev <965089+sarthurdev@users.noreply.github.com>
Date: Tue, 27 Feb 2024 21:29:08 +0100
Subject: smoketest: T5160: Deduplicate nftables verify functions to testcase
 class, remove obsolete imports

---
 smoketest/scripts/cli/base_vyostest_shim.py    | 23 +++++++++++++++++++++++
 smoketest/scripts/cli/test_firewall.py         | 24 ------------------------
 smoketest/scripts/cli/test_nat.py              | 13 -------------
 smoketest/scripts/cli/test_nat66.py            | 13 -------------
 smoketest/scripts/cli/test_policy_route.py     | 11 -----------
 smoketest/scripts/cli/test_system_conntrack.py | 12 ------------
 6 files changed, 23 insertions(+), 73 deletions(-)

(limited to 'smoketest')

diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py
index 140642806..c49d3e76c 100644
--- a/smoketest/scripts/cli/base_vyostest_shim.py
+++ b/smoketest/scripts/cli/base_vyostest_shim.py
@@ -102,6 +102,29 @@ class VyOSUnitTestSHIM:
             ssh_client.close()
             return output, error
 
+        # Verify nftables output
+        def verify_nftables(self, nftables_search, table, inverse=False, args=''):
+            nftables_output = cmd(f'sudo nft {args} list table {table}')
+
+            for search in nftables_search:
+                matched = False
+                for line in nftables_output.split("\n"):
+                    if all(item in line for item in search):
+                        matched = True
+                        break
+                self.assertTrue(not matched if inverse else matched, msg=search)
+
+        def verify_nftables_chain(self, nftables_search, table, chain, inverse=False, args=''):
+            nftables_output = cmd(f'sudo nft {args} list chain {table} {chain}')
+
+            for search in nftables_search:
+                matched = False
+                for line in nftables_output.split("\n"):
+                    if all(item in line for item in search):
+                        matched = True
+                        break
+                self.assertTrue(not matched if inverse else matched, msg=search)
+
 # standard construction; typing suggestion: https://stackoverflow.com/a/70292317
 def ignore_warning(warning: Type[Warning]):
     import warnings
diff --git a/smoketest/scripts/cli/test_firewall.py b/smoketest/scripts/cli/test_firewall.py
index bc2848492..be5960bbd 100755
--- a/smoketest/scripts/cli/test_firewall.py
+++ b/smoketest/scripts/cli/test_firewall.py
@@ -22,7 +22,6 @@ from time import sleep
 from base_vyostest_shim import VyOSUnitTestSHIM
 
 from vyos.configsession import ConfigSessionError
-from vyos.utils.process import cmd
 from vyos.utils.process import run
 
 sysfs_config = {
@@ -67,28 +66,6 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
 
         self.verify_nftables(nftables_search, 'ip vyos_filter', inverse=True)
 
-    def verify_nftables(self, nftables_search, table, inverse=False, args=''):
-        nftables_output = cmd(f'sudo nft {args} list table {table}')
-
-        for search in nftables_search:
-            matched = False
-            for line in nftables_output.split("\n"):
-                if all(item in line for item in search):
-                    matched = True
-                    break
-            self.assertTrue(not matched if inverse else matched, msg=search)
-
-    def verify_nftables_chain(self, nftables_search, table, chain, inverse=False, args=''):
-        nftables_output = cmd(f'sudo nft {args} list chain {table} {chain}')
-
-        for search in nftables_search:
-            matched = False
-            for line in nftables_output.split("\n"):
-                if all(item in line for item in search):
-                    matched = True
-                    break
-            self.assertTrue(not matched if inverse else matched, msg=search)
-
     def wait_for_domain_resolver(self, table, set_name, element, max_wait=10):
         # Resolver no longer blocks commit, need to wait for daemon to populate set
         count = 0
@@ -808,7 +785,6 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase):
             ['ct state related', 'accept']
         ]
 
-        nftables_output = cmd('sudo nft list table ip vyos_filter')
         self.verify_nftables(nftables_search, 'ip vyos_filter')
         self.verify_nftables(nftables_search_v6, 'ip6 vyos_filter')
 
diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py
index 1e6435df8..4f1c3cb4f 100755
--- a/smoketest/scripts/cli/test_nat.py
+++ b/smoketest/scripts/cli/test_nat.py
@@ -21,8 +21,6 @@ import unittest
 
 from base_vyostest_shim import VyOSUnitTestSHIM
 from vyos.configsession import ConfigSessionError
-from vyos.utils.process import cmd
-from vyos.utils.dict import dict_search
 
 base_path = ['nat']
 src_path = base_path + ['source']
@@ -47,17 +45,6 @@ class TestNAT(VyOSUnitTestSHIM.TestCase):
         self.assertFalse(os.path.exists(nftables_nat_config))
         self.assertFalse(os.path.exists(nftables_static_nat_conf))
 
-    def verify_nftables(self, nftables_search, table, inverse=False, args=''):
-        nftables_output = cmd(f'sudo nft {args} list table {table}')
-
-        for search in nftables_search:
-            matched = False
-            for line in nftables_output.split("\n"):
-                if all(item in line for item in search):
-                    matched = True
-                    break
-            self.assertTrue(not matched if inverse else matched, msg=search)
-
     def wait_for_domain_resolver(self, table, set_name, element, max_wait=10):
         # Resolver no longer blocks commit, need to wait for daemon to populate set
         count = 0
diff --git a/smoketest/scripts/cli/test_nat66.py b/smoketest/scripts/cli/test_nat66.py
index 0607f6616..400a895ff 100755
--- a/smoketest/scripts/cli/test_nat66.py
+++ b/smoketest/scripts/cli/test_nat66.py
@@ -22,8 +22,6 @@ import unittest
 from base_vyostest_shim import VyOSUnitTestSHIM
 
 from vyos.configsession import ConfigSessionError
-from vyos.utils.process import cmd
-from vyos.utils.dict import dict_search
 
 base_path = ['nat66']
 src_path = base_path + ['source']
@@ -42,17 +40,6 @@ class TestNAT66(VyOSUnitTestSHIM.TestCase):
         self.cli_delete(base_path)
         self.cli_commit()
 
-    def verify_nftables(self, nftables_search, table, inverse=False, args=''):
-        nftables_output = cmd(f'sudo nft {args} list table {table}')
-
-        for search in nftables_search:
-            matched = False
-            for line in nftables_output.split("\n"):
-                if all(item in line for item in search):
-                    matched = True
-                    break
-            self.assertTrue(not matched if inverse else matched, msg=search)
-
     def test_source_nat66(self):
         source_prefix = 'fc00::/64'
         translation_prefix = 'fc01::/64'
diff --git a/smoketest/scripts/cli/test_policy_route.py b/smoketest/scripts/cli/test_policy_route.py
index c0b7c1fe7..462fc24d0 100755
--- a/smoketest/scripts/cli/test_policy_route.py
+++ b/smoketest/scripts/cli/test_policy_route.py
@@ -68,17 +68,6 @@ class TestPolicyRoute(VyOSUnitTestSHIM.TestCase):
 
         self.verify_rules(ip_rule_search, inverse=True)
 
-    def verify_nftables(self, nftables_search, table, inverse=False):
-        nftables_output = cmd(f'sudo nft list table {table}')
-
-        for search in nftables_search:
-            matched = False
-            for line in nftables_output.split("\n"):
-                if all(item in line for item in search):
-                    matched = True
-                    break
-            self.assertTrue(not matched if inverse else matched, msg=search)
-
     def verify_rules(self, rules_search, inverse=False):
         rule_output = cmd('ip rule show')
 
diff --git a/smoketest/scripts/cli/test_system_conntrack.py b/smoketest/scripts/cli/test_system_conntrack.py
index ce237a6e7..f00626b3d 100755
--- a/smoketest/scripts/cli/test_system_conntrack.py
+++ b/smoketest/scripts/cli/test_system_conntrack.py
@@ -21,7 +21,6 @@ import unittest
 from base_vyostest_shim import VyOSUnitTestSHIM
 
 from vyos.firewall import find_nftables_rule
-from vyos.utils.process import cmd
 from vyos.utils.file import read_file
 
 base_path = ['system', 'conntrack']
@@ -43,17 +42,6 @@ class TestSystemConntrack(VyOSUnitTestSHIM.TestCase):
         self.cli_delete(base_path)
         self.cli_commit()
 
-    def verify_nftables(self, nftables_search, table, inverse=False, args=''):
-        nftables_output = cmd(f'sudo nft {args} list table {table}')
-
-        for search in nftables_search:
-            matched = False
-            for line in nftables_output.split("\n"):
-                if all(item in line for item in search):
-                    matched = True
-                    break
-            self.assertTrue(not matched if inverse else matched, msg=search)
-
     def test_conntrack_options(self):
         conntrack_config = {
             'net.netfilter.nf_conntrack_expect_max' : {
-- 
cgit v1.2.3