summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli/test_nat.py
diff options
context:
space:
mode:
authorChristian Poessinger <christian@poessinger.com>2021-03-17 19:12:04 +0100
committerChristian Poessinger <christian@poessinger.com>2021-03-17 19:18:17 +0100
commit0f3def974fbaa4a26e6ad590ee37dd965bc2358f (patch)
tree36cc09af1fefbc3f6a4f6ad7b946d00baaecb703 /smoketest/scripts/cli/test_nat.py
parent42d3cfbd3ee893ec567582a04467a899191f44fd (diff)
downloadvyos-1x-0f3def974fbaa4a26e6ad590ee37dd965bc2358f.tar.gz
vyos-1x-0f3def974fbaa4a26e6ad590ee37dd965bc2358f.zip
smoketest: add shim for every test to re-use common tasts
Currently every smoketest does the setup and destruction of the configsession on its own durin setUp(). This creates a lot of overhead and one configsession should be re-used during execution of every smoketest script. In addiion a test that failed will leaf the system in an unconsistent state. For this reason before the test is executed we will save the running config to /tmp and the will re-load the config after the test has passed, always ensuring a clean environment for the next test.
Diffstat (limited to 'smoketest/scripts/cli/test_nat.py')
-rwxr-xr-xsmoketest/scripts/cli/test_nat.py74
1 files changed, 37 insertions, 37 deletions
diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py
index b5702d691..0706f234e 100755
--- a/smoketest/scripts/cli/test_nat.py
+++ b/smoketest/scripts/cli/test_nat.py
@@ -19,6 +19,7 @@ import jmespath
import json
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.util import cmd
@@ -28,16 +29,15 @@ base_path = ['nat']
src_path = base_path + ['source']
dst_path = base_path + ['destination']
-class TestNAT(unittest.TestCase):
+class TestNAT(VyOSUnitTestSHIM.TestCase):
def setUp(self):
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
- self.session = ConfigSession(os.getpid())
- self.session.delete(base_path)
+ self.cli_delete(base_path)
def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_snat(self):
rules = ['100', '110', '120', '130', '200', '210', '220', '230']
@@ -48,15 +48,15 @@ class TestNAT(unittest.TestCase):
# depending of rule order we check either for source address for NAT
# or configured destination address for NAT
if int(rule) < 200:
- self.session.set(src_path + ['rule', rule, 'source', 'address', network])
- self.session.set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100])
- self.session.set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
+ self.cli_set(src_path + ['rule', rule, 'source', 'address', network])
+ self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100])
+ self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
else:
- self.session.set(src_path + ['rule', rule, 'destination', 'address', network])
- self.session.set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200])
- self.session.set(src_path + ['rule', rule, 'exclude'])
+ self.cli_set(src_path + ['rule', rule, 'destination', 'address', network])
+ self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200])
+ self.cli_set(src_path + ['rule', rule, 'exclude'])
- self.session.commit()
+ self.cli_commit()
tmp = cmd('sudo nft -j list table nat')
data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
@@ -98,17 +98,17 @@ class TestNAT(unittest.TestCase):
for rule in rules:
port = f'10{rule}'
- self.session.set(dst_path + ['rule', rule, 'source', 'port', port])
- self.session.set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1'])
- self.session.set(dst_path + ['rule', rule, 'translation', 'port', port])
+ self.cli_set(dst_path + ['rule', rule, 'source', 'port', port])
+ self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1'])
+ self.cli_set(dst_path + ['rule', rule, 'translation', 'port', port])
if int(rule) < 200:
- self.session.set(dst_path + ['rule', rule, 'protocol', inbound_proto_100])
- self.session.set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100])
+ self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_100])
+ self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100])
else:
- self.session.set(dst_path + ['rule', rule, 'protocol', inbound_proto_200])
- self.session.set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200])
+ self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_200])
+ self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200])
- self.session.commit()
+ self.cli_commit()
tmp = cmd('sudo nft -j list table nat')
data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
@@ -141,31 +141,31 @@ class TestNAT(unittest.TestCase):
def test_snat_required_translation_address(self):
# T2813: Ensure translation address is specified
rule = '5'
- self.session.set(src_path + ['rule', rule, 'source', 'address', '192.0.2.0/24'])
+ self.cli_set(src_path + ['rule', rule, 'source', 'address', '192.0.2.0/24'])
# check validate() - outbound-interface must be defined
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(src_path + ['rule', rule, 'outbound-interface', 'eth0'])
+ self.cli_commit()
+ self.cli_set(src_path + ['rule', rule, 'outbound-interface', 'eth0'])
# check validate() - translation address not specified
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
- self.session.set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
- self.session.commit()
+ self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
+ self.cli_commit()
def test_dnat_negated_addresses(self):
# T3186: negated addresses are not accepted by nftables
rule = '1000'
- self.session.set(dst_path + ['rule', rule, 'destination', 'address', '!192.0.2.1'])
- self.session.set(dst_path + ['rule', rule, 'destination', 'port', '53'])
- self.session.set(dst_path + ['rule', rule, 'inbound-interface', 'eth0'])
- self.session.set(dst_path + ['rule', rule, 'protocol', 'tcp_udp'])
- self.session.set(dst_path + ['rule', rule, 'source', 'address', '!192.0.2.1'])
- self.session.set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1'])
- self.session.set(dst_path + ['rule', rule, 'translation', 'port', '53'])
- self.session.commit()
+ self.cli_set(dst_path + ['rule', rule, 'destination', 'address', '!192.0.2.1'])
+ self.cli_set(dst_path + ['rule', rule, 'destination', 'port', '53'])
+ self.cli_set(dst_path + ['rule', rule, 'inbound-interface', 'eth0'])
+ self.cli_set(dst_path + ['rule', rule, 'protocol', 'tcp_udp'])
+ self.cli_set(dst_path + ['rule', rule, 'source', 'address', '!192.0.2.1'])
+ self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1'])
+ self.cli_set(dst_path + ['rule', rule, 'translation', 'port', '53'])
+ self.cli_commit()
def test_nat_no_rules(self):
# T3206: deleting all rules but keep the direction 'destination' or
@@ -173,9 +173,9 @@ class TestNAT(unittest.TestCase):
#
# Test that both 'nat destination' and 'nat source' nodes can exist
# without any rule
- self.session.set(src_path)
- self.session.set(dst_path)
- self.session.commit()
+ self.cli_set(src_path)
+ self.cli_set(dst_path)
+ self.cli_commit()
if __name__ == '__main__':