summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Poessinger <christian@poessinger.com>2021-03-17 19:12:04 +0100
committerChristian Poessinger <christian@poessinger.com>2021-07-25 21:10:25 +0200
commitfb94d0b6091e09099c972355886918ef63ee9d97 (patch)
tree3f9d9c09f12d328bac4e79fab697dd19a294f055
parentc114e1904571ff3c323a9068d8b3784d9a238d0a (diff)
downloadvyos-1x-fb94d0b6091e09099c972355886918ef63ee9d97.tar.gz
vyos-1x-fb94d0b6091e09099c972355886918ef63ee9d97.zip
smoketest: add shim for every test to re-use common tasts
Currently every smoketest does the setup and destruction of the configsession on its own durin setUp(). This creates a lot of overhead and one configsession should be re-used during execution of every smoketest script. In addiion a test that failed will leaf the system in an unconsistent state. For this reason before the test is executed we will save the running config to /tmp and the will re-load the config after the test has passed, always ensuring a clean environment for the next test. (cherry picked from commit 0f3def974fbaa4a26e6ad590ee37dd965bc2358f)
-rw-r--r--smoketest/scripts/cli/base_accel_ppp_test.py30
-rw-r--r--smoketest/scripts/cli/base_interfaces_test.py185
-rw-r--r--smoketest/scripts/cli/base_vyostest_shim.py90
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_bonding.py31
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_bridge.py76
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_dummy.py4
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_ethernet.py50
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_geneve.py4
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_l2tpv3.py10
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_loopback.py13
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_macsec.py84
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_openvpn.py349
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_pppoe.py83
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_pseudo_ethernet.py4
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_tunnel.py142
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_vxlan.py9
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_wireguard.py52
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_wireless.py58
-rwxr-xr-xsmoketest/scripts/cli/test_nat.py74
-rwxr-xr-xsmoketest/scripts/cli/test_policy.py143
-rwxr-xr-xsmoketest/scripts/cli/test_policy_local-route.py23
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_bfd.py36
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_bgp.py102
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_igmp-proxy.py41
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_ospfv3.py46
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_rip.py85
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_ripng.py89
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_rpki.py51
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_static.py32
-rwxr-xr-xsmoketest/scripts/cli/test_service_bcast-relay.py37
-rwxr-xr-xsmoketest/scripts/cli/test_service_dhcp-relay.py35
-rwxr-xr-xsmoketest/scripts/cli/test_service_dhcp-server.py169
-rwxr-xr-xsmoketest/scripts/cli/test_service_dhcpv6-relay.py31
-rwxr-xr-xsmoketest/scripts/cli/test_service_dhcpv6-server.py71
-rwxr-xr-xsmoketest/scripts/cli/test_service_dns_dynamic.py88
-rwxr-xr-xsmoketest/scripts/cli/test_service_dns_forwarding.py62
-rwxr-xr-xsmoketest/scripts/cli/test_service_https.py23
-rwxr-xr-xsmoketest/scripts/cli/test_service_mdns-repeater.py27
-rwxr-xr-xsmoketest/scripts/cli/test_service_pppoe-server.py16
-rwxr-xr-xsmoketest/scripts/cli/test_service_router-advert.py27
-rwxr-xr-xsmoketest/scripts/cli/test_service_snmp.py76
-rwxr-xr-xsmoketest/scripts/cli/test_service_ssh.py48
-rwxr-xr-xsmoketest/scripts/cli/test_service_tftp-server.py33
-rwxr-xr-xsmoketest/scripts/cli/test_service_webproxy.py97
-rwxr-xr-xsmoketest/scripts/cli/test_system_acceleration_qat.py17
-rwxr-xr-xsmoketest/scripts/cli/test_system_ip.py26
-rwxr-xr-xsmoketest/scripts/cli/test_system_ipv6.py32
-rwxr-xr-xsmoketest/scripts/cli/test_system_lcd.py19
-rwxr-xr-xsmoketest/scripts/cli/test_system_login.py61
-rwxr-xr-xsmoketest/scripts/cli/test_system_nameserver.py30
-rwxr-xr-xsmoketest/scripts/cli/test_system_ntp.py29
-rwxr-xr-xsmoketest/scripts/cli/test_vpn_openconnect.py32
-rwxr-xr-xsmoketest/scripts/cli/test_vpn_sstp.py6
-rwxr-xr-xsmoketest/scripts/cli/test_vrf.py72
54 files changed, 1627 insertions, 1533 deletions
diff --git a/smoketest/scripts/cli/base_accel_ppp_test.py b/smoketest/scripts/cli/base_accel_ppp_test.py
index 705c932b4..b2acb03cc 100644
--- a/smoketest/scripts/cli/base_accel_ppp_test.py
+++ b/smoketest/scripts/cli/base_accel_ppp_test.py
@@ -12,10 +12,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import re
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
from configparser import ConfigParser
from vyos.configsession import ConfigSession
@@ -26,26 +26,22 @@ from vyos.util import get_half_cpus
from vyos.util import process_named_running
class BasicAccelPPPTest:
- class BaseTest(unittest.TestCase):
-
+ class TestCase(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
self._gateway = '192.0.2.1'
-
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
- self.session.delete(self._base_path)
+ self.cli_delete(self._base_path)
def tearDown(self):
- self.session.delete(self._base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(self._base_path)
+ self.cli_commit()
def set(self, path):
- self.session.set(self._base_path + path)
+ self.cli_set(self._base_path + path)
def delete(self, path):
- self.session.delete(self._base_path + path)
+ self.cli_delete(self._base_path + path)
def basic_config(self):
# PPPoE local auth mode requires local users to be configured!
@@ -65,7 +61,7 @@ class BasicAccelPPPTest:
self.set(['name-server', ns])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True, delimiters='=')
@@ -95,11 +91,11 @@ class BasicAccelPPPTest:
# upload rate-limit requires also download rate-limit
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
self.set(['authentication', 'local-users', 'username', user, 'rate-limit', 'download', download])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True, delimiters='=')
@@ -123,7 +119,7 @@ class BasicAccelPPPTest:
# Check local-users default value(s)
self.delete(['authentication', 'local-users', 'username', user, 'static-ip'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# check local users
tmp = cmd(f'sudo cat {self._chap_secrets}')
@@ -162,7 +158,7 @@ class BasicAccelPPPTest:
self.set(['authentication', 'radius', 'source-address', source_address])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True, delimiters='=')
@@ -200,7 +196,7 @@ class BasicAccelPPPTest:
self.set(['authentication', 'radius', 'server', radius_server, 'disable-accounting'])
# commit changes
- self.session.commit()
+ self.cli_commit()
conf.read(self._config_file)
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py
index 110309780..6f8eda26a 100644
--- a/smoketest/scripts/cli/base_interfaces_test.py
+++ b/smoketest/scripts/cli/base_interfaces_test.py
@@ -22,6 +22,8 @@ from netifaces import AF_INET6
from netifaces import ifaddresses
from netifaces import interfaces
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.ifconfig import Interface
@@ -53,7 +55,7 @@ def is_mirrored_to(interface, mirror_if, qdisc):
return ret_val
class BasicInterfaceTest:
- class BaseTest(unittest.TestCase):
+ class TestCase(VyOSUnitTestSHIM.TestCase):
_test_ip = False
_test_mtu = False
_test_vlan = False
@@ -69,29 +71,26 @@ class BasicInterfaceTest:
_qinq_range = ['10', '20', '30']
_vlan_range = ['100', '200', '300', '2000']
_test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32',
- '2001:db8:1::ffff/64', '2001:db8:101::1/112']
+ '2001:db8:1::ffff/64', '2001:db8:101::1/128']
_mirror_interfaces = []
# choose IPv6 minimum MTU value for tests - this must always work
_mtu = '1280'
def setUp(self):
- self.session = ConfigSession(os.getpid())
-
# Setup mirror interfaces for SPAN (Switch Port Analyzer)
for span in self._mirror_interfaces:
section = Section.section(span)
- self.session.set(['interfaces', section, span])
+ self.cli_set(['interfaces', section, span])
def tearDown(self):
# Tear down mirror interfaces for SPAN (Switch Port Analyzer)
for span in self._mirror_interfaces:
section = Section.section(span)
- self.session.delete(['interfaces', section, span])
+ self.cli_delete(['interfaces', section, span])
- self.session.delete(self._base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(self._base_path)
+ self.cli_commit()
# Verify that no previously interface remained on the system
for intf in self._interfaces:
@@ -104,10 +103,10 @@ class BasicInterfaceTest:
# Check the two-way mirror rules of ingress and egress
for mirror in self._mirror_interfaces:
for interface in self._interfaces:
- self.session.set(self._base_path + [interface, 'mirror', 'ingress', mirror])
- self.session.set(self._base_path + [interface, 'mirror', 'egress', mirror])
+ self.cli_set(self._base_path + [interface, 'mirror', 'ingress', mirror])
+ self.cli_set(self._base_path + [interface, 'mirror', 'egress', mirror])
- self.session.commit()
+ self.cli_commit()
# Verify config
for mirror in self._mirror_interfaces:
@@ -119,11 +118,11 @@ class BasicInterfaceTest:
# Check if description can be added to interface and
# can be read back
for intf in self._interfaces:
- self.session.set(self._base_path + [intf, 'disable'])
+ self.cli_set(self._base_path + [intf, 'disable'])
for option in self._options.get(intf, []):
- self.session.set(self._base_path + [intf] + option.split())
+ self.cli_set(self._base_path + [intf] + option.split())
- self.session.commit()
+ self.cli_commit()
# Validate interface description
for intf in self._interfaces:
@@ -134,11 +133,11 @@ class BasicInterfaceTest:
# can be read back
for intf in self._interfaces:
test_string=f'Description-Test-{intf}'
- self.session.set(self._base_path + [intf, 'description', test_string])
+ self.cli_set(self._base_path + [intf, 'description', test_string])
for option in self._options.get(intf, []):
- self.session.set(self._base_path + [intf] + option.split())
+ self.cli_set(self._base_path + [intf] + option.split())
- self.session.commit()
+ self.cli_commit()
# Validate interface description
for intf in self._interfaces:
@@ -146,9 +145,9 @@ class BasicInterfaceTest:
tmp = read_file(f'/sys/class/net/{intf}/ifalias')
self.assertEqual(tmp, test_string)
self.assertEqual(Interface(intf).get_alias(), test_string)
- self.session.delete(self._base_path + [intf, 'description'])
+ self.cli_delete(self._base_path + [intf, 'description'])
- self.session.commit()
+ self.cli_commit()
# Validate remove interface description "empty"
for intf in self._interfaces:
@@ -159,11 +158,11 @@ class BasicInterfaceTest:
def test_add_single_ip_address(self):
addr = '192.0.2.0/31'
for intf in self._interfaces:
- self.session.set(self._base_path + [intf, 'address', addr])
+ self.cli_set(self._base_path + [intf, 'address', addr])
for option in self._options.get(intf, []):
- self.session.set(self._base_path + [intf] + option.split())
+ self.cli_set(self._base_path + [intf] + option.split())
- self.session.commit()
+ self.cli_commit()
for intf in self._interfaces:
self.assertTrue(is_intf_addr_assigned(intf, addr))
@@ -173,11 +172,11 @@ class BasicInterfaceTest:
# Add address
for intf in self._interfaces:
for addr in self._test_addr:
- self.session.set(self._base_path + [intf, 'address', addr])
+ self.cli_set(self._base_path + [intf, 'address', addr])
for option in self._options.get(intf, []):
- self.session.set(self._base_path + [intf] + option.split())
+ self.cli_set(self._base_path + [intf] + option.split())
- self.session.commit()
+ self.cli_commit()
# Validate address
for intf in self._interfaces:
@@ -197,10 +196,10 @@ class BasicInterfaceTest:
for interface in self._interfaces:
base = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(base + option.split())
+ self.cli_set(base + option.split())
# after commit we must have an IPv6 link-local address
- self.session.commit()
+ self.cli_commit()
for interface in self._interfaces:
for addr in ifaddresses(interface)[AF_INET6]:
@@ -209,10 +208,10 @@ class BasicInterfaceTest:
# disable IPv6 link-local address assignment
for interface in self._interfaces:
base = self._base_path + [interface]
- self.session.set(base + ['ipv6', 'address', 'no-default-link-local'])
+ self.cli_set(base + ['ipv6', 'address', 'no-default-link-local'])
# after commit we must have no IPv6 link-local address
- self.session.commit()
+ self.cli_commit()
for interface in self._interfaces:
self.assertTrue(AF_INET6 not in ifaddresses(interface))
@@ -223,12 +222,12 @@ class BasicInterfaceTest:
for intf in self._interfaces:
base = self._base_path + [intf]
- self.session.set(base + ['mtu', self._mtu])
+ self.cli_set(base + ['mtu', self._mtu])
for option in self._options.get(intf, []):
- self.session.set(base + option.split())
+ self.cli_set(base + option.split())
# commit interface changes
- self.session.commit()
+ self.cli_commit()
# verify changed MTU
for intf in self._interfaces:
@@ -246,14 +245,14 @@ class BasicInterfaceTest:
for intf in self._interfaces:
base = self._base_path + [intf]
- self.session.set(base + ['mtu', self._mtu])
- self.session.set(base + ['ipv6', 'address', 'no-default-link-local'])
+ self.cli_set(base + ['mtu', self._mtu])
+ self.cli_set(base + ['ipv6', 'address', 'no-default-link-local'])
for option in self._options.get(intf, []):
- self.session.set(base + option.split())
+ self.cli_set(base + option.split())
# commit interface changes
- self.session.commit()
+ self.cli_commit()
# verify changed MTU
for intf in self._interfaces:
@@ -273,15 +272,15 @@ class BasicInterfaceTest:
for interface in self._interfaces:
base = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(base + option.split())
+ self.cli_set(base + option.split())
for vlan in self._vlan_range:
base = self._base_path + [interface, 'vif', vlan]
- self.session.set(base + ['mtu', self._mtu])
+ self.cli_set(base + ['mtu', self._mtu])
for address in self._test_addr:
- self.session.set(base + ['address', address])
+ self.cli_set(base + ['address', address])
- self.session.commit()
+ self.cli_commit()
for intf in self._interfaces:
for vlan in self._vlan_range:
@@ -306,29 +305,29 @@ class BasicInterfaceTest:
for interface in self._interfaces:
base = self._base_path + [interface]
- self.session.set(base + ['mtu', mtu_1500])
+ self.cli_set(base + ['mtu', mtu_1500])
for option in self._options.get(interface, []):
- self.session.set(base + option.split())
+ self.cli_set(base + option.split())
if 'source-interface' in option:
iface = option.split()[-1]
iface_type = Section.section(iface)
- self.session.set(['interfaces', iface_type, iface, 'mtu', mtu_9000])
+ self.cli_set(['interfaces', iface_type, iface, 'mtu', mtu_9000])
for vlan in self._vlan_range:
base = self._base_path + [interface, 'vif', vlan]
- self.session.set(base + ['mtu', mtu_9000])
+ self.cli_set(base + ['mtu', mtu_9000])
# check validate() - VIF MTU must not be larger the parent interface
# MTU size.
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
# Change MTU on base interface to be the same as on the VIF interface
for interface in self._interfaces:
base = self._base_path + [interface]
- self.session.set(base + ['mtu', mtu_9000])
+ self.cli_set(base + ['mtu', mtu_9000])
- self.session.commit()
+ self.cli_commit()
# Verify MTU on base and VIF interfaces
for interface in self._interfaces:
@@ -348,24 +347,24 @@ class BasicInterfaceTest:
for interface in self._interfaces:
base = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(base + option.split())
+ self.cli_set(base + option.split())
# disable the lower interface
- self.session.set(base + ['disable'])
+ self.cli_set(base + ['disable'])
for vlan in self._vlan_range:
vlan_base = self._base_path + [interface, 'vif', vlan]
# disable the vlan interface
- self.session.set(vlan_base + ['disable'])
+ self.cli_set(vlan_base + ['disable'])
- self.session.commit()
+ self.cli_commit()
# re-enable all lower interfaces
for interface in self._interfaces:
base = self._base_path + [interface]
- self.session.delete(base + ['disable'])
+ self.cli_delete(base + ['disable'])
- self.session.commit()
+ self.cli_commit()
# verify that the lower interfaces are admin up and the vlan
# interfaces are all admin down
@@ -388,16 +387,16 @@ class BasicInterfaceTest:
for interface in self._interfaces:
base = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(base + option.split())
+ self.cli_set(base + option.split())
for vif_s in self._qinq_range:
for vif_c in self._vlan_range:
base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c]
- self.session.set(base + ['mtu', self._mtu])
+ self.cli_set(base + ['mtu', self._mtu])
for address in self._test_addr:
- self.session.set(base + ['address', address])
+ self.cli_set(base + ['address', address])
- self.session.commit()
+ self.cli_commit()
for interface in self._interfaces:
for vif_s in self._qinq_range:
@@ -420,20 +419,20 @@ class BasicInterfaceTest:
arp_tmo = '300'
path = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(path + option.split())
+ self.cli_set(path + option.split())
# Options
- self.session.set(path + ['ip', 'arp-cache-timeout', arp_tmo])
- self.session.set(path + ['ip', 'disable-arp-filter'])
- self.session.set(path + ['ip', 'disable-forwarding'])
- self.session.set(path + ['ip', 'enable-arp-accept'])
- self.session.set(path + ['ip', 'enable-arp-announce'])
- self.session.set(path + ['ip', 'enable-arp-ignore'])
- self.session.set(path + ['ip', 'enable-proxy-arp'])
- self.session.set(path + ['ip', 'proxy-arp-pvlan'])
- self.session.set(path + ['ip', 'source-validation', 'loose'])
-
- self.session.commit()
+ self.cli_set(path + ['ip', 'arp-cache-timeout', arp_tmo])
+ self.cli_set(path + ['ip', 'disable-arp-filter'])
+ self.cli_set(path + ['ip', 'disable-forwarding'])
+ self.cli_set(path + ['ip', 'enable-arp-accept'])
+ self.cli_set(path + ['ip', 'enable-arp-announce'])
+ self.cli_set(path + ['ip', 'enable-arp-ignore'])
+ self.cli_set(path + ['ip', 'enable-proxy-arp'])
+ self.cli_set(path + ['ip', 'proxy-arp-pvlan'])
+ self.cli_set(path + ['ip', 'source-validation', 'loose'])
+
+ self.cli_commit()
for interface in self._interfaces:
tmp = read_file(f'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms')
@@ -471,13 +470,13 @@ class BasicInterfaceTest:
dad_transmits = '10'
path = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(path + option.split())
+ self.cli_set(path + option.split())
# Options
- self.session.set(path + ['ipv6', 'disable-forwarding'])
- self.session.set(path + ['ipv6', 'dup-addr-detect-transmits', dad_transmits])
+ self.cli_set(path + ['ipv6', 'disable-forwarding'])
+ self.cli_set(path + ['ipv6', 'dup-addr-detect-transmits', dad_transmits])
- self.session.commit()
+ self.cli_commit()
for interface in self._interfaces:
tmp = read_file(f'/proc/sys/net/ipv6/conf/{interface}/forwarding')
@@ -495,16 +494,16 @@ class BasicInterfaceTest:
duid = '00:01:00:01:27:71:db:f0:00:50:00:00:00:{}'.format(duid_base)
path = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(path + option.split())
+ self.cli_set(path + option.split())
# Enable DHCPv6 client
- self.session.set(path + ['address', 'dhcpv6'])
- self.session.set(path + ['dhcpv6-options', 'rapid-commit'])
- self.session.set(path + ['dhcpv6-options', 'parameters-only'])
- self.session.set(path + ['dhcpv6-options', 'duid', duid])
+ self.cli_set(path + ['address', 'dhcpv6'])
+ self.cli_set(path + ['dhcpv6-options', 'rapid-commit'])
+ self.cli_set(path + ['dhcpv6-options', 'parameters-only'])
+ self.cli_set(path + ['dhcpv6-options', 'duid', duid])
duid_base += 1
- self.session.commit()
+ self.cli_commit()
duid_base = 10
for interface in self._interfaces:
@@ -535,21 +534,21 @@ class BasicInterfaceTest:
for interface in self._interfaces:
path = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(path + option.split())
+ self.cli_set(path + option.split())
address = '1'
# prefix delegation stuff
pd_base = path + ['dhcpv6-options', 'pd', '0']
- self.session.set(pd_base + ['length', prefix_len])
+ self.cli_set(pd_base + ['length', prefix_len])
for delegatee in delegatees:
section = Section.section(delegatee)
- self.session.set(['interfaces', section, delegatee])
- self.session.set(pd_base + ['interface', delegatee, 'address', address])
+ self.cli_set(['interfaces', section, delegatee])
+ self.cli_set(pd_base + ['interface', delegatee, 'address', address])
# increment interface address
address = str(int(address) + 1)
- self.session.commit()
+ self.cli_commit()
for interface in self._interfaces:
dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf')
@@ -577,7 +576,7 @@ class BasicInterfaceTest:
# we can already cleanup the test delegatee interface here
# as until commit() is called, nothing happens
section = Section.section(delegatee)
- self.session.delete(['interfaces', section, delegatee])
+ self.cli_delete(['interfaces', section, delegatee])
def test_dhcpv6pd_manual_sla_id(self):
if not self._test_ipv6_pd:
@@ -591,25 +590,25 @@ class BasicInterfaceTest:
for interface in self._interfaces:
path = self._base_path + [interface]
for option in self._options.get(interface, []):
- self.session.set(path + option.split())
+ self.cli_set(path + option.split())
# prefix delegation stuff
address = '1'
sla_id = '1'
pd_base = path + ['dhcpv6-options', 'pd', '0']
- self.session.set(pd_base + ['length', prefix_len])
+ self.cli_set(pd_base + ['length', prefix_len])
for delegatee in delegatees:
section = Section.section(delegatee)
- self.session.set(['interfaces', section, delegatee])
- self.session.set(pd_base + ['interface', delegatee, 'address', address])
- self.session.set(pd_base + ['interface', delegatee, 'sla-id', sla_id])
+ self.cli_set(['interfaces', section, delegatee])
+ self.cli_set(pd_base + ['interface', delegatee, 'address', address])
+ self.cli_set(pd_base + ['interface', delegatee, 'sla-id', sla_id])
# increment interface address
address = str(int(address) + 1)
sla_id = str(int(sla_id) + 1)
- self.session.commit()
+ self.cli_commit()
# Verify dhcpc6 client configuration
for interface in self._interfaces:
@@ -638,4 +637,4 @@ class BasicInterfaceTest:
# we can already cleanup the test delegatee interface here
# as until commit() is called, nothing happens
section = Section.section(delegatee)
- self.session.delete(['interfaces', section, delegatee])
+ self.cli_delete(['interfaces', section, delegatee])
diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py
new file mode 100644
index 000000000..18e4e567e
--- /dev/null
+++ b/smoketest/scripts/cli/base_vyostest_shim.py
@@ -0,0 +1,90 @@
+# Copyright (C) 2021 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import unittest
+
+from time import sleep
+
+from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
+from vyos import ConfigError
+from vyos.util import cmd
+
+save_config = '/tmp/vyos-smoketest-save'
+
+# This class acts as shim between individual Smoketests developed for VyOS and
+# the Python UnitTest framework. Before every test is loaded, we dump the current
+# system configuration and reload it after the test - despite the test results.
+#
+# Using this approach we can not render a live system useless while running any
+# kind of smoketest. In addition it adds debug capabilities like printing the
+# command used to execute the test.
+class VyOSUnitTestSHIM:
+ class TestCase(unittest.TestCase):
+ # if enabled in derived class, print out each and every set/del command
+ # on the CLI. This is usefull to grap all the commands required to
+ # trigger the certain failure condition.
+ # Use "self.debug = True" in derived classes setUp() method
+ debug = False
+
+ @classmethod
+ def setUpClass(cls):
+ cls._session = ConfigSession(os.getpid())
+ cls._session.save_config(save_config)
+ pass
+
+ @classmethod
+ def tearDownClass(cls):
+ # discard any pending changes which might caused a messed up config
+ cls._session.discard()
+ # ... and restore the initial state
+ cls._session.migrate_and_load_config(save_config)
+
+ try:
+ cls._session.commit()
+ except (ConfigError, ConfigSessionError):
+ cls._session.discard()
+ cls.fail(cls)
+
+ def cli_set(self, config):
+ if self.debug:
+ print('set ' + ' '.join(config))
+ self._session.set(config)
+
+ def cli_delete(self, config):
+ if self.debug:
+ print('del ' + ' '.join(config))
+ self._session.delete(config)
+
+ def cli_commit(self):
+ self._session.commit()
+
+ def getFRRconfig(self, string, end='$'):
+ """ Retrieve current "running configuration" from FRR """
+ command = f'vtysh -c "show run" | sed -n "/^{string}{end}/,/^!/p"'
+
+ count = 0
+ tmp = ''
+ while count < 10 and tmp == '':
+ # Let FRR settle after a config change first before harassing it again
+ sleep(1)
+ tmp = cmd(command)
+ count += 1
+
+ if self.debug or tmp == '':
+ import pprint
+ print(f'\n\ncommand "{command}" returned:\n')
+ pprint.pprint(tmp)
+ return tmp
diff --git a/smoketest/scripts/cli/test_interfaces_bonding.py b/smoketest/scripts/cli/test_interfaces_bonding.py
index 880a822a7..86000553e 100755
--- a/smoketest/scripts/cli/test_interfaces_bonding.py
+++ b/smoketest/scripts/cli/test_interfaces_bonding.py
@@ -25,7 +25,7 @@ from vyos.configsession import ConfigSessionError
from vyos.util import get_interface_config
from vyos.util import read_file
-class BondingInterfaceTest(BasicInterfaceTest.BaseTest):
+class BondingInterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
cls._test_ip = True
@@ -53,6 +53,9 @@ class BondingInterfaceTest(BasicInterfaceTest.BaseTest):
for member in cls._members:
cls._options['bond0'].append(f'member interface {member}')
+ # call base-classes classmethod
+ super(cls, cls).setUpClass()
+
def test_add_single_ip_address(self):
super().test_add_single_ip_address()
@@ -75,16 +78,16 @@ class BondingInterfaceTest(BasicInterfaceTest.BaseTest):
# configure member interfaces
for interface in self._interfaces:
for option in self._options.get(interface, []):
- self.session.set(self._base_path + [interface] + option.split())
+ self.cli_set(self._base_path + [interface] + option.split())
- self.session.commit()
+ self.cli_commit()
# remove single bond member port
for interface in self._interfaces:
remove_member = self._members[0]
- self.session.delete(self._base_path + [interface, 'member', 'interface', remove_member])
+ self.cli_delete(self._base_path + [interface, 'member', 'interface', remove_member])
- self.session.commit()
+ self.cli_commit()
# removed member port must be admin-up
for interface in self._interfaces:
@@ -97,11 +100,11 @@ class BondingInterfaceTest(BasicInterfaceTest.BaseTest):
min_links = len(self._interfaces)
for interface in self._interfaces:
for option in self._options.get(interface, []):
- self.session.set(self._base_path + [interface] + option.split())
+ self.cli_set(self._base_path + [interface] + option.split())
- self.session.set(self._base_path + [interface, 'min-links', str(min_links)])
+ self.cli_set(self._base_path + [interface, 'min-links', str(min_links)])
- self.session.commit()
+ self.cli_commit()
# verify config
for interface in self._interfaces:
@@ -116,11 +119,11 @@ class BondingInterfaceTest(BasicInterfaceTest.BaseTest):
lacp_rate = 'fast'
for interface in self._interfaces:
for option in self._options.get(interface, []):
- self.session.set(self._base_path + [interface] + option.split())
+ self.cli_set(self._base_path + [interface] + option.split())
- self.session.set(self._base_path + [interface, 'lacp-rate', lacp_rate])
+ self.cli_set(self._base_path + [interface, 'lacp-rate', lacp_rate])
- self.session.commit()
+ self.cli_commit()
# verify config
for interface in self._interfaces:
@@ -136,11 +139,11 @@ class BondingInterfaceTest(BasicInterfaceTest.BaseTest):
for hash_policy in hash_policies:
for interface in self._interfaces:
for option in self._options.get(interface, []):
- self.session.set(self._base_path + [interface] + option.split())
+ self.cli_set(self._base_path + [interface] + option.split())
- self.session.set(self._base_path + [interface, 'hash-policy', hash_policy])
+ self.cli_set(self._base_path + [interface, 'hash-policy', hash_policy])
- self.session.commit()
+ self.cli_commit()
# verify config
for interface in self._interfaces:
diff --git a/smoketest/scripts/cli/test_interfaces_bridge.py b/smoketest/scripts/cli/test_interfaces_bridge.py
index 5e7b2bbd2..4014c1a4c 100755
--- a/smoketest/scripts/cli/test_interfaces_bridge.py
+++ b/smoketest/scripts/cli/test_interfaces_bridge.py
@@ -25,8 +25,10 @@ from netifaces import interfaces
from vyos.ifconfig import Section
from vyos.util import cmd
from vyos.util import read_file
+from vyos.util import get_interface_config
+from vyos.validate import is_intf_addr_assigned
-class BridgeInterfaceTest(BasicInterfaceTest.BaseTest):
+class BridgeInterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
cls._test_ip = True
@@ -52,46 +54,55 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest):
cls._options['br0'].append(f'member interface {member}')
cls._interfaces = list(cls._options)
+ # call base-classes classmethod
+ super(cls, cls).setUpClass()
+
+ def tearDown(self):
+ for intf in self._interfaces:
+ self.cli_delete(self._base_path + [intf])
+
+ super().tearDown()
+
def test_add_remove_bridge_member(self):
# Add member interfaces to bridge and set STP cost/priority
for interface in self._interfaces:
base = self._base_path + [interface]
- self.session.set(base + ['stp'])
- self.session.set(base + ['address', '192.0.2.1/24'])
+ self.cli_set(base + ['stp'])
+ self.cli_set(base + ['address', '192.0.2.1/24'])
cost = 1000
priority = 10
# assign members to bridge interface
for member in self._members:
base_member = base + ['member', 'interface', member]
- self.session.set(base_member + ['cost', str(cost)])
- self.session.set(base_member + ['priority', str(priority)])
+ self.cli_set(base_member + ['cost', str(cost)])
+ self.cli_set(base_member + ['priority', str(priority)])
cost += 1
priority += 1
# commit config
- self.session.commit()
-
- # check member interfaces are added on the bridge
- bridge_members = []
- for tmp in glob(f'/sys/class/net/{interface}/lower_*'):
- bridge_members.append(os.path.basename(tmp).replace('lower_', ''))
+ self.cli_commit()
- for member in self._members:
- self.assertIn(member, bridge_members)
-
- # delete all members
+ # Add member interfaces to bridge and set STP cost/priority
for interface in self._interfaces:
- self.session.delete(self._base_path + [interface, 'member'])
+ cost = 1000
+ priority = 10
+ for member in self._members:
+ tmp = get_interface_config(member)
+ self.assertEqual(interface, tmp['master'])
+ self.assertFalse( tmp['linkinfo']['info_slave_data']['isolated'])
+ self.assertEqual(cost, tmp['linkinfo']['info_slave_data']['cost'])
+ self.assertEqual(priority, tmp['linkinfo']['info_slave_data']['priority'])
- self.session.commit()
+ cost += 1
+ priority += 1
def test_bridge_vlan_filter(self):
# Add member interface to bridge and set VLAN filter
for interface in self._interfaces:
base = self._base_path + [interface]
- self.session.set(base + ['vif', '1', 'address', '192.0.2.1/24'])
- self.session.set(base + ['vif', '2', 'address', '192.0.3.1/24'])
+ self.cli_set(base + ['vif', '1', 'address', '192.0.2.1/24'])
+ self.cli_set(base + ['vif', '2', 'address', '192.0.3.1/24'])
vlan_id = 101
allowed_vlan = 2
@@ -99,13 +110,13 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest):
# assign members to bridge interface
for member in self._members:
base_member = base + ['member', 'interface', member]
- self.session.set(base_member + ['allowed-vlan', str(allowed_vlan)])
- self.session.set(base_member + ['allowed-vlan', allowed_vlan_range])
- self.session.set(base_member + ['native-vlan', str(vlan_id)])
+ self.cli_set(base_member + ['allowed-vlan', str(allowed_vlan)])
+ self.cli_set(base_member + ['allowed-vlan', allowed_vlan_range])
+ self.cli_set(base_member + ['native-vlan', str(vlan_id)])
vlan_id += 1
# commit config
- self.session.commit()
+ self.cli_commit()
# Detect the vlan filter function
for interface in self._interfaces:
@@ -161,8 +172,7 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest):
# delete all members
for interface in self._interfaces:
- self.session.delete(self._base_path + [interface, 'member'])
-
+ self.cli_delete(self._base_path + [interface, 'member'])
def test_bridge_vlan_members(self):
# T2945: ensure that VIFs are not dropped from bridge
@@ -170,10 +180,10 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest):
for interface in self._interfaces:
for member in self._members:
for vif in vifs:
- self.session.set(['interfaces', 'ethernet', member, 'vif', vif])
- self.session.set(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif}'])
+ self.cli_set(['interfaces', 'ethernet', member, 'vif', vif])
+ self.cli_set(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif}'])
- self.session.commit()
+ self.cli_commit()
# Verify config
for interface in self._interfaces:
@@ -182,10 +192,12 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest):
# member interface must be assigned to the bridge
self.assertTrue(os.path.exists(f'/sys/class/net/{interface}/lower_{member}.{vif}'))
- # remove VLAN interfaces
- for vif in vifs:
- self.session.delete(['interfaces', 'ethernet', member, 'vif', vif])
+ # delete all members
+ for interface in self._interfaces:
+ for member in self._members:
+ for vif in vifs:
+ self.cli_delete(['interfaces', 'ethernet', member, 'vif', vif])
+ self.cli_delete(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif}'])
if __name__ == '__main__':
unittest.main(verbosity=2)
-
diff --git a/smoketest/scripts/cli/test_interfaces_dummy.py b/smoketest/scripts/cli/test_interfaces_dummy.py
index 6e462bccf..dedc6fe05 100755
--- a/smoketest/scripts/cli/test_interfaces_dummy.py
+++ b/smoketest/scripts/cli/test_interfaces_dummy.py
@@ -18,11 +18,13 @@ import unittest
from base_interfaces_test import BasicInterfaceTest
-class DummyInterfaceTest(BasicInterfaceTest.BaseTest):
+class DummyInterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
cls._base_path = ['interfaces', 'dummy']
cls._interfaces = ['dum435', 'dum8677', 'dum0931', 'dum089']
+ # call base-classes classmethod
+ super(cls, cls).setUpClass()
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_interfaces_ethernet.py b/smoketest/scripts/cli/test_interfaces_ethernet.py
index bbc5478dc..a31d75423 100755
--- a/smoketest/scripts/cli/test_interfaces_ethernet.py
+++ b/smoketest/scripts/cli/test_interfaces_ethernet.py
@@ -34,7 +34,7 @@ def get_wpa_supplicant_value(interface, key):
tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp)
return tmp[0]
-class EthernetInterfaceTest(BasicInterfaceTest.BaseTest):
+class EthernetInterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
cls._test_ip = True
@@ -61,37 +61,39 @@ class EthernetInterfaceTest(BasicInterfaceTest.BaseTest):
for interface in cls._interfaces:
cls._macs[interface] = read_file(f'/sys/class/net/{interface}/address')
+ # call base-classes classmethod
+ super(cls, cls).setUpClass()
+
def tearDown(self):
for interface in self._interfaces:
# when using a dedicated interface to test via TEST_ETH environment
# variable only this one will be cleared in the end - usable to test
# ethernet interfaces via SSH
- self.session.delete(self._base_path + [interface])
- self.session.set(self._base_path + [interface, 'duplex', 'auto'])
- self.session.set(self._base_path + [interface, 'speed', 'auto'])
- self.session.set(self._base_path + [interface, 'hw-id', self._macs[interface]])
+ self.cli_delete(self._base_path + [interface])
+ self.cli_set(self._base_path + [interface, 'duplex', 'auto'])
+ self.cli_set(self._base_path + [interface, 'speed', 'auto'])
+ self.cli_set(self._base_path + [interface, 'hw-id', self._macs[interface]])
# Tear down mirror interfaces for SPAN (Switch Port Analyzer)
for span in self._mirror_interfaces:
section = Section.section(span)
- self.session.delete(['interfaces', section, span])
+ self.cli_delete(['interfaces', section, span])
- self.session.commit()
- del self.session
+ self.cli_commit()
def test_dhcp_disable_interface(self):
# When interface is configured as admin down, it must be admin down
# even when dhcpc starts on the given interface
for interface in self._interfaces:
- self.session.set(self._base_path + [interface, 'disable'])
+ self.cli_set(self._base_path + [interface, 'disable'])
# Also enable DHCP (ISC DHCP always places interface in admin up
# state so we check that we do not start DHCP client.
# https://phabricator.vyos.net/T2767
- self.session.set(self._base_path + [interface, 'address', 'dhcp'])
+ self.cli_set(self._base_path + [interface, 'address', 'dhcp'])
- self.session.commit()
+ self.cli_commit()
# Validate interface state
for interface in self._interfaces:
@@ -111,9 +113,9 @@ class EthernetInterfaceTest(BasicInterfaceTest.BaseTest):
rps_cpus &= ~1
for interface in self._interfaces:
- self.session.set(self._base_path + [interface, 'offload', 'rps'])
+ self.cli_set(self._base_path + [interface, 'offload', 'rps'])
- self.session.commit()
+ self.cli_commit()
for interface in self._interfaces:
cpus = read_file(f'/sys/class/net/{interface}/queues/rx-0/rps_cpus')
@@ -125,35 +127,35 @@ class EthernetInterfaceTest(BasicInterfaceTest.BaseTest):
def test_non_existing_interface(self):
unknonw_interface = self._base_path + ['eth667']
- self.session.set(unknonw_interface)
+ self.cli_set(unknonw_interface)
# check validate() - interface does not exist
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
# we need to remove this wrong interface from the configuration
# manually, else tearDown() will have problem in commit()
- self.session.delete(unknonw_interface)
+ self.cli_delete(unknonw_interface)
def test_speed_duplex_verify(self):
for interface in self._interfaces:
- self.session.set(self._base_path + [interface, 'speed', '1000'])
+ self.cli_set(self._base_path + [interface, 'speed', '1000'])
# check validate() - if either speed or duplex is not auto, the
# other one must be manually configured, too
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'speed', 'auto'])
- self.session.commit()
+ self.cli_commit()
+ self.cli_set(self._base_path + [interface, 'speed', 'auto'])
+ self.cli_commit()
def test_eapol_support(self):
for interface in self._interfaces:
# Enable EAPoL
- self.session.set(self._base_path + [interface, 'eapol', 'ca-cert-file', ca_cert])
- self.session.set(self._base_path + [interface, 'eapol', 'cert-file', ssl_cert])
- self.session.set(self._base_path + [interface, 'eapol', 'key-file', ssl_key])
+ self.cli_set(self._base_path + [interface, 'eapol', 'ca-cert-file', ca_cert])
+ self.cli_set(self._base_path + [interface, 'eapol', 'cert-file', ssl_cert])
+ self.cli_set(self._base_path + [interface, 'eapol', 'key-file', ssl_key])
- self.session.commit()
+ self.cli_commit()
# Check for running process
self.assertTrue(process_named_running('wpa_supplicant'))
diff --git a/smoketest/scripts/cli/test_interfaces_geneve.py b/smoketest/scripts/cli/test_interfaces_geneve.py
index b708b5437..8a18d8344 100755
--- a/smoketest/scripts/cli/test_interfaces_geneve.py
+++ b/smoketest/scripts/cli/test_interfaces_geneve.py
@@ -19,7 +19,7 @@ import unittest
from vyos.configsession import ConfigSession
from base_interfaces_test import BasicInterfaceTest
-class GeneveInterfaceTest(BasicInterfaceTest.BaseTest):
+class GeneveInterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
cls._test_ip = True
@@ -30,6 +30,8 @@ class GeneveInterfaceTest(BasicInterfaceTest.BaseTest):
'gnv1': ['vni 20', 'remote 127.0.1.2'],
}
cls._interfaces = list(cls._options)
+ # call base-classes classmethod
+ super(cls, cls).setUpClass()
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_interfaces_l2tpv3.py b/smoketest/scripts/cli/test_interfaces_l2tpv3.py
index b9e65f51d..06ced5c40 100755
--- a/smoketest/scripts/cli/test_interfaces_l2tpv3.py
+++ b/smoketest/scripts/cli/test_interfaces_l2tpv3.py
@@ -21,23 +21,25 @@ import unittest
from base_interfaces_test import BasicInterfaceTest
from vyos.util import cmd
-class L2TPv3InterfaceTest(BasicInterfaceTest.BaseTest):
+class L2TPv3InterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
cls._test_ip = True
cls._test_ipv6 = True
cls._base_path = ['interfaces', 'l2tpv3']
cls._options = {
- 'l2tpeth10': ['local-ip 127.0.0.1', 'remote-ip 127.10.10.10',
+ 'l2tpeth10': ['source-address 127.0.0.1', 'remote 127.10.10.10',
'tunnel-id 100', 'peer-tunnel-id 10',
'session-id 100', 'peer-session-id 10',
'source-port 1010', 'destination-port 10101'],
- 'l2tpeth20': ['local-ip 127.0.0.1', 'peer-session-id 20',
- 'peer-tunnel-id 200', 'remote-ip 127.20.20.20',
+ 'l2tpeth20': ['source-address 127.0.0.1', 'peer-session-id 20',
+ 'peer-tunnel-id 200', 'remote 127.20.20.20',
'session-id 20', 'tunnel-id 200',
'source-port 2020', 'destination-port 20202'],
}
cls._interfaces = list(cls._options)
+ # call base-classes classmethod
+ super(cls, cls).setUpClass()
def test_add_single_ip_address(self):
super().test_add_single_ip_address()
diff --git a/smoketest/scripts/cli/test_interfaces_loopback.py b/smoketest/scripts/cli/test_interfaces_loopback.py
index 77dd4c1b5..85b5ca6d6 100755
--- a/smoketest/scripts/cli/test_interfaces_loopback.py
+++ b/smoketest/scripts/cli/test_interfaces_loopback.py
@@ -23,16 +23,17 @@ from vyos.validate import is_intf_addr_assigned
loopbacks = ['127.0.0.1', '::1']
-class LoopbackInterfaceTest(BasicInterfaceTest.BaseTest):
+class LoopbackInterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
- cls._base_path = ['interfaces', 'loopback']
- cls._interfaces = ['lo']
+ cls._base_path = ['interfaces', 'loopback']
+ cls._interfaces = ['lo']
+ # call base-classes classmethod
+ super(cls, cls).setUpClass()
def tearDown(self):
- self.session.delete(self._base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(self._base_path)
+ self.cli_commit()
# loopback interface must persist!
for intf in self._interfaces:
diff --git a/smoketest/scripts/cli/test_interfaces_macsec.py b/smoketest/scripts/cli/test_interfaces_macsec.py
index 31071a0f7..e4280a5b7 100755
--- a/smoketest/scripts/cli/test_interfaces_macsec.py
+++ b/smoketest/scripts/cli/test_interfaces_macsec.py
@@ -37,20 +37,22 @@ def get_cipher(interface):
tmp = get_interface_config(interface)
return tmp['linkinfo']['info_data']['cipher_suite'].lower()
-class MACsecInterfaceTest(BasicInterfaceTest.BaseTest):
+class MACsecInterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
- cls._test_ip = True
- cls._test_ipv6 = True
- cls._base_path = ['interfaces', 'macsec']
- cls._options = { 'macsec0': ['source-interface eth0', 'security cipher gcm-aes-128'] }
+ cls._test_ip = True
+ cls._test_ipv6 = True
+ cls._base_path = ['interfaces', 'macsec']
+ cls._options = { 'macsec0': ['source-interface eth0', 'security cipher gcm-aes-128'] }
- # if we have a physical eth1 interface, add a second macsec instance
- if 'eth1' in Section.interfaces('ethernet'):
- macsec = { 'macsec1': [f'source-interface eth1', 'security cipher gcm-aes-128'] }
- cls._options.update(macsec)
+ # if we have a physical eth1 interface, add a second macsec instance
+ if 'eth1' in Section.interfaces('ethernet'):
+ macsec = { 'macsec1': [f'source-interface eth1', 'security cipher gcm-aes-128'] }
+ cls._options.update(macsec)
- cls._interfaces = list(cls._options)
+ cls._interfaces = list(cls._options)
+ # call base-classes classmethod
+ super(cls, cls).setUpClass()
def test_macsec_encryption(self):
# MACsec can be operating in authentication and encryption mode - both
@@ -66,31 +68,31 @@ class MACsecInterfaceTest(BasicInterfaceTest.BaseTest):
if option.split()[0] == 'source-interface':
src_interface = option.split()[1]
- self.session.set(self._base_path + [interface] + option.split())
+ self.cli_set(self._base_path + [interface] + option.split())
# Encrypt link
- self.session.set(self._base_path + [interface, 'security', 'encrypt'])
+ self.cli_set(self._base_path + [interface, 'security', 'encrypt'])
# check validate() - Physical source interface MTU must be higher then our MTU
- self.session.set(self._base_path + [interface, 'mtu', '1500'])
+ self.cli_set(self._base_path + [interface, 'mtu', '1500'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(self._base_path + [interface, 'mtu'])
+ self.cli_commit()
+ self.cli_delete(self._base_path + [interface, 'mtu'])
# check validate() - MACsec security keys mandartory when encryption is enabled
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'security', 'mka', 'cak', mak_cak])
+ self.cli_commit()
+ self.cli_set(self._base_path + [interface, 'security', 'mka', 'cak', mak_cak])
# check validate() - MACsec security keys mandartory when encryption is enabled
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'security', 'mka', 'ckn', mak_ckn])
+ self.cli_commit()
+ self.cli_set(self._base_path + [interface, 'security', 'mka', 'ckn', mak_ckn])
- self.session.set(self._base_path + [interface, 'security', 'replay-window', replay_window])
+ self.cli_set(self._base_path + [interface, 'security', 'replay-window', replay_window])
# final commit of settings
- self.session.commit()
+ self.cli_commit()
tmp = get_config_value(src_interface, 'macsec_integ_only')
self.assertTrue("0" in tmp)
@@ -117,20 +119,20 @@ class MACsecInterfaceTest(BasicInterfaceTest.BaseTest):
def test_macsec_gcm_aes_128(self):
interface = 'macsec1'
cipher = 'gcm-aes-128'
- self.session.set(self._base_path + [interface])
+ self.cli_set(self._base_path + [interface])
# check validate() - source interface is mandatory
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'source-interface', 'eth0'])
+ self.cli_commit()
+ self.cli_set(self._base_path + [interface, 'source-interface', 'eth0'])
# check validate() - cipher is mandatory
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'security', 'cipher', cipher])
+ self.cli_commit()
+ self.cli_set(self._base_path + [interface, 'security', 'cipher', cipher])
# final commit and verify
- self.session.commit()
+ self.cli_commit()
self.assertIn(interface, interfaces())
self.assertIn(interface, interfaces())
self.assertEqual(cipher, get_cipher(interface))
@@ -138,20 +140,20 @@ class MACsecInterfaceTest(BasicInterfaceTest.BaseTest):
def test_macsec_gcm_aes_256(self):
interface = 'macsec4'
cipher = 'gcm-aes-256'
- self.session.set(self._base_path + [interface])
+ self.cli_set(self._base_path + [interface])
# check validate() - source interface is mandatory
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'source-interface', 'eth0'])
+ self.cli_commit()
+ self.cli_set(self._base_path + [interface, 'source-interface', 'eth0'])
# check validate() - cipher is mandatory
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'security', 'cipher', cipher])
+ self.cli_commit()
+ self.cli_set(self._base_path + [interface, 'security', 'cipher', cipher])
# final commit and verify
- self.session.commit()
+ self.cli_commit()
self.assertIn(interface, interfaces())
self.assertEqual(cipher, get_cipher(interface))
@@ -163,24 +165,24 @@ class MACsecInterfaceTest(BasicInterfaceTest.BaseTest):
for interface, option_value in self._options.items():
for option in option_value:
- self.session.set(self._base_path + [interface] + option.split())
+ self.cli_set(self._base_path + [interface] + option.split())
if option.split()[0] == 'source-interface':
src_interface = option.split()[1]
- self.session.set(base_bridge + ['member', 'interface', src_interface])
+ self.cli_set(base_bridge + ['member', 'interface', src_interface])
# check validate() - Source interface must not already be a member of a bridge
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(base_bridge)
+ self.cli_commit()
+ self.cli_delete(base_bridge)
- self.session.set(base_bond + ['member', 'interface', src_interface])
+ self.cli_set(base_bond + ['member', 'interface', src_interface])
# check validate() - Source interface must not already be a member of a bridge
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(base_bond)
+ self.cli_commit()
+ self.cli_delete(base_bond)
# final commit and verify
- self.session.commit()
+ self.cli_commit()
self.assertIn(interface, interfaces())
if __name__ == '__main__':
diff --git a/smoketest/scripts/cli/test_interfaces_openvpn.py b/smoketest/scripts/cli/test_interfaces_openvpn.py
index 42a008007..1a52a0a5b 100755
--- a/smoketest/scripts/cli/test_interfaces_openvpn.py
+++ b/smoketest/scripts/cli/test_interfaces_openvpn.py
@@ -21,6 +21,8 @@ from glob import glob
from ipaddress import IPv4Network
from netifaces import interfaces
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.util import cmd
@@ -58,77 +60,76 @@ def get_vrf(interface):
tmp = tmp.replace('upper_', '')
return tmp
-class TestInterfacesOpenVPN(unittest.TestCase):
+class TestInterfacesOpenVPN(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
- self.session.set(['interfaces', 'dummy', dummy_if, 'address', '192.0.2.1/32'])
- self.session.set(['vrf', 'name', vrf_name, 'table', '12345'])
+ self.cli_set(['interfaces', 'dummy', dummy_if, 'address', '192.0.2.1/32'])
+ self.cli_set(['vrf', 'name', vrf_name, 'table', '12345'])
def tearDown(self):
- self.session.delete(base_path)
- self.session.delete(['interfaces', 'dummy', dummy_if])
- self.session.delete(['vrf'])
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_delete(['interfaces', 'dummy', dummy_if])
+ self.cli_delete(['vrf'])
+ self.cli_commit()
def test_openvpn_client_verify(self):
# Create OpenVPN client interface and test verify() steps.
interface = 'vtun2000'
path = base_path + [interface]
- self.session.set(path + ['mode', 'client'])
- self.session.set(path + ['encryption', 'ncp-ciphers', 'aes192gcm'])
+ self.cli_set(path + ['mode', 'client'])
+
+ self.cli_set(path + ['encryption', 'ncp-ciphers', 'aes192gcm'])
# check validate() - cannot specify local-port in client mode
- self.session.set(path + ['local-port', '5000'])
+ self.cli_set(path + ['local-port', '5000'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['local-port'])
+ self.cli_commit()
+ self.cli_delete(path + ['local-port'])
# check validate() - cannot specify local-host in client mode
- self.session.set(path + ['local-host', '127.0.0.1'])
+ self.cli_set(path + ['local-host', '127.0.0.1'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['local-host'])
+ self.cli_commit()
+ self.cli_delete(path + ['local-host'])
# check validate() - cannot specify protocol tcp-passive in client mode
- self.session.set(path + ['protocol', 'tcp-passive'])
+ self.cli_set(path + ['protocol', 'tcp-passive'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['protocol'])
+ self.cli_commit()
+ self.cli_delete(path + ['protocol'])
# check validate() - remote-host must be set in client mode
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(path + ['remote-host', '192.0.9.9'])
+ self.cli_commit()
+ self.cli_set(path + ['remote-host', '192.0.9.9'])
# check validate() - cannot specify "tls dh-file" in client mode
- self.session.set(path + ['tls', 'dh-file', dh_pem])
+ self.cli_set(path + ['tls', 'dh-file', dh_pem])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['tls'])
+ self.cli_commit()
+ self.cli_delete(path + ['tls'])
# check validate() - must specify one of "shared-secret-key-file" and "tls"
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(path + ['shared-secret-key-file', s2s_key])
+ self.cli_commit()
+ self.cli_set(path + ['shared-secret-key-file', s2s_key])
# check validate() - must specify one of "shared-secret-key-file" and "tls"
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['shared-secret-key-file', s2s_key])
+ self.cli_commit()
+ self.cli_delete(path + ['shared-secret-key-file', s2s_key])
- self.session.set(path + ['tls', 'ca-cert-file', ca_cert])
- self.session.set(path + ['tls', 'cert-file', ssl_cert])
- self.session.set(path + ['tls', 'key-file', ssl_key])
+ self.cli_set(path + ['tls', 'ca-cert-file', ca_cert])
+ self.cli_set(path + ['tls', 'cert-file', ssl_cert])
+ self.cli_set(path + ['tls', 'key-file', ssl_key])
# check validate() - can not have auth username without a password
- self.session.set(path + ['authentication', 'username', 'vyos'])
+ self.cli_set(path + ['authentication', 'username', 'vyos'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(path + ['authentication', 'password', 'vyos'])
+ self.cli_commit()
+ self.cli_set(path + ['authentication', 'password', 'vyos'])
# client commit must pass
- self.session.commit()
+ self.cli_commit()
self.assertTrue(process_named_running(PROCESS_NAME))
self.assertIn(interface, interfaces())
@@ -144,22 +145,22 @@ class TestInterfacesOpenVPN(unittest.TestCase):
path = base_path + [interface]
auth_hash = 'sha1'
- self.session.set(path + ['device-type', 'tun'])
- self.session.set(path + ['encryption', 'cipher', 'aes256'])
- self.session.set(path + ['hash', auth_hash])
- self.session.set(path + ['mode', 'client'])
- self.session.set(path + ['persistent-tunnel'])
- self.session.set(path + ['protocol', protocol])
- self.session.set(path + ['remote-host', remote_host])
- self.session.set(path + ['remote-port', remote_port])
- self.session.set(path + ['tls', 'ca-cert-file', ca_cert])
- self.session.set(path + ['tls', 'cert-file', ssl_cert])
- self.session.set(path + ['tls', 'key-file', ssl_key])
- self.session.set(path + ['vrf', vrf_name])
- self.session.set(path + ['authentication', 'username', interface+'user'])
- self.session.set(path + ['authentication', 'password', interface+'secretpw'])
-
- self.session.commit()
+ self.cli_set(path + ['device-type', 'tun'])
+ self.cli_set(path + ['encryption', 'cipher', 'aes256'])
+ self.cli_set(path + ['hash', auth_hash])
+ self.cli_set(path + ['mode', 'client'])
+ self.cli_set(path + ['persistent-tunnel'])
+ self.cli_set(path + ['protocol', protocol])
+ self.cli_set(path + ['remote-host', remote_host])
+ self.cli_set(path + ['remote-port', remote_port])
+ self.cli_set(path + ['tls', 'ca-cert-file', ca_cert])
+ self.cli_set(path + ['tls', 'cert-file', ssl_cert])
+ self.cli_set(path + ['tls', 'key-file', ssl_key])
+ self.cli_set(path + ['vrf', vrf_name])
+ self.cli_set(path + ['authentication', 'username', interface+'user'])
+ self.cli_set(path + ['authentication', 'password', interface+'secretpw'])
+
+ self.cli_commit()
for ii in num_range:
interface = f'vtun{ii}'
@@ -192,8 +193,8 @@ class TestInterfacesOpenVPN(unittest.TestCase):
self.assertIn(f'{interface}secretpw', pw)
# check that no interface remained after deleting them
- self.session.delete(base_path)
- self.session.commit()
+ self.cli_delete(base_path)
+ self.cli_commit()
for ii in num_range:
interface = f'vtun{ii}'
@@ -205,104 +206,104 @@ class TestInterfacesOpenVPN(unittest.TestCase):
path = base_path + [interface]
# check validate() - must speciy operating mode
- self.session.set(path)
+ self.cli_set(path)
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(path + ['mode', 'server'])
+ self.cli_commit()
+ self.cli_set(path + ['mode', 'server'])
# check validate() - cannot specify protocol tcp-active in server mode
- self.session.set(path + ['protocol', 'tcp-active'])
+ self.cli_set(path + ['protocol', 'tcp-active'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['protocol'])
+ self.cli_commit()
+ self.cli_delete(path + ['protocol'])
# check validate() - cannot specify local-port in client mode
- self.session.set(path + ['remote-port', '5000'])
+ self.cli_set(path + ['remote-port', '5000'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['remote-port'])
+ self.cli_commit()
+ self.cli_delete(path + ['remote-port'])
# check validate() - cannot specify local-host in client mode
- self.session.set(path + ['remote-host', '127.0.0.1'])
+ self.cli_set(path + ['remote-host', '127.0.0.1'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['remote-host'])
+ self.cli_commit()
+ self.cli_delete(path + ['remote-host'])
# check validate() - must specify "tls dh-file" when not using EC keys
# in server mode
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(path + ['tls', 'dh-file', dh_pem])
+ self.cli_commit()
+ self.cli_set(path + ['tls', 'dh-file', dh_pem])
# check validate() - must specify "server subnet" or add interface to
# bridge in server mode
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
# check validate() - server client-ip-pool is too large
# [100.64.0.4 -> 100.127.255.251 = 4194295], maximum is 65536 addresses.
- self.session.set(path + ['server', 'subnet', '100.64.0.0/10'])
+ self.cli_set(path + ['server', 'subnet', '100.64.0.0/10'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
# check validate() - cannot specify more than 1 IPv4 and 1 IPv6 server subnet
- self.session.set(path + ['server', 'subnet', '100.64.0.0/20'])
+ self.cli_set(path + ['server', 'subnet', '100.64.0.0/20'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['server', 'subnet', '100.64.0.0/10'])
+ self.cli_commit()
+ self.cli_delete(path + ['server', 'subnet', '100.64.0.0/10'])
# check validate() - must specify "tls ca-cert-file"
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(path + ['tls', 'ca-cert-file', ca_cert])
+ self.cli_commit()
+ self.cli_set(path + ['tls', 'ca-cert-file', ca_cert])
# check validate() - must specify "tls cert-file"
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(path + ['tls', 'cert-file', ssl_cert])
+ self.cli_commit()
+ self.cli_set(path + ['tls', 'cert-file', ssl_cert])
# check validate() - must specify "tls key-file"
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(path + ['tls', 'key-file', ssl_key])
+ self.cli_commit()
+ self.cli_set(path + ['tls', 'key-file', ssl_key])
# check validate() - cannot specify "tls role" in client-server mode'
- self.session.set(path + ['tls', 'role', 'active'])
+ self.cli_set(path + ['tls', 'role', 'active'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
# check validate() - cannot specify "tls role" in client-server mode'
- self.session.set(path + ['tls', 'auth-file', auth_key])
+ self.cli_set(path + ['tls', 'auth-file', auth_key])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
# check validate() - cannot specify "tcp-passive" when "tls role" is "active"
- self.session.set(path + ['protocol', 'tcp-passive'])
+ self.cli_set(path + ['protocol', 'tcp-passive'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['protocol'])
+ self.cli_commit()
+ self.cli_delete(path + ['protocol'])
# check validate() - cannot specify "tls dh-file" when "tls role" is "active"
- self.session.set(path + ['tls', 'dh-file', dh_pem])
+ self.cli_set(path + ['tls', 'dh-file', dh_pem])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['tls', 'dh-file'])
+ self.cli_commit()
+ self.cli_delete(path + ['tls', 'dh-file'])
# Now test the other path with tls role passive
- self.session.set(path + ['tls', 'role', 'passive'])
+ self.cli_set(path + ['tls', 'role', 'passive'])
# check validate() - cannot specify "tcp-active" when "tls role" is "passive"
- self.session.set(path + ['protocol', 'tcp-active'])
+ self.cli_set(path + ['protocol', 'tcp-active'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['protocol'])
+ self.cli_commit()
+ self.cli_delete(path + ['protocol'])
# check validate() - must specify "tls dh-file" when "tls role" is "passive"
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(path + ['tls', 'dh-file', dh_pem])
+ self.cli_commit()
+ self.cli_set(path + ['tls', 'dh-file', dh_pem])
- self.session.commit()
+ self.cli_commit()
self.assertTrue(process_named_running(PROCESS_NAME))
self.assertIn(interface, interfaces())
@@ -322,29 +323,29 @@ class TestInterfacesOpenVPN(unittest.TestCase):
path = base_path + [interface]
port = str(2000 + ii)
- self.session.set(path + ['device-type', 'tun'])
- self.session.set(path + ['encryption', 'cipher', 'aes192'])
- self.session.set(path + ['hash', auth_hash])
- self.session.set(path + ['mode', 'server'])
- self.session.set(path + ['local-port', port])
- self.session.set(path + ['server', 'subnet', subnet])
- self.session.set(path + ['server', 'topology', 'subnet'])
- self.session.set(path + ['keep-alive', 'failure-count', '5'])
- self.session.set(path + ['keep-alive', 'interval', '5'])
+ self.cli_set(path + ['device-type', 'tun'])
+ self.cli_set(path + ['encryption', 'cipher', 'aes192'])
+ self.cli_set(path + ['hash', auth_hash])
+ self.cli_set(path + ['mode', 'server'])
+ self.cli_set(path + ['local-port', port])
+ self.cli_set(path + ['server', 'subnet', subnet])
+ self.cli_set(path + ['server', 'topology', 'subnet'])
+ self.cli_set(path + ['keep-alive', 'failure-count', '5'])
+ self.cli_set(path + ['keep-alive', 'interval', '5'])
# clients
- self.session.set(path + ['server', 'client', 'client1', 'ip', client_ip])
+ self.cli_set(path + ['server', 'client', 'client1', 'ip', client_ip])
for route in client1_routes:
- self.session.set(path + ['server', 'client', 'client1', 'subnet', route])
+ self.cli_set(path + ['server', 'client', 'client1', 'subnet', route])
- self.session.set(path + ['replace-default-route'])
- self.session.set(path + ['tls', 'ca-cert-file', ca_cert])
- self.session.set(path + ['tls', 'cert-file', ssl_cert])
- self.session.set(path + ['tls', 'key-file', ssl_key])
- self.session.set(path + ['tls', 'dh-file', dh_pem])
- self.session.set(path + ['vrf', vrf_name])
+ self.cli_set(path + ['replace-default-route'])
+ self.cli_set(path + ['tls', 'ca-cert-file', ca_cert])
+ self.cli_set(path + ['tls', 'cert-file', ssl_cert])
+ self.cli_set(path + ['tls', 'key-file', ssl_key])
+ self.cli_set(path + ['tls', 'dh-file', dh_pem])
+ self.cli_set(path + ['vrf', vrf_name])
- self.session.commit()
+ self.cli_commit()
for ii in num_range:
interface = f'vtun{ii}'
@@ -396,8 +397,8 @@ class TestInterfacesOpenVPN(unittest.TestCase):
self.assertIn(interface, interfaces())
# check that no interface remained after deleting them
- self.session.delete(base_path)
- self.session.commit()
+ self.cli_delete(base_path)
+ self.cli_commit()
for ii in num_range:
interface = f'vtun{ii}'
@@ -415,23 +416,23 @@ class TestInterfacesOpenVPN(unittest.TestCase):
path = base_path + [interface]
port = str(2000 + ii)
- self.session.set(path + ['device-type', 'tun'])
- self.session.set(path + ['encryption', 'cipher', 'aes192'])
- self.session.set(path + ['hash', auth_hash])
- self.session.set(path + ['mode', 'server'])
- self.session.set(path + ['local-port', port])
- self.session.set(path + ['server', 'subnet', subnet])
- self.session.set(path + ['server', 'topology', 'net30'])
- self.session.set(path + ['replace-default-route'])
- self.session.set(path + ['keep-alive', 'failure-count', '10'])
- self.session.set(path + ['keep-alive', 'interval', '5'])
- self.session.set(path + ['tls', 'ca-cert-file', ca_cert])
- self.session.set(path + ['tls', 'cert-file', ssl_cert])
- self.session.set(path + ['tls', 'key-file', ssl_key])
- self.session.set(path + ['tls', 'dh-file', dh_pem])
- self.session.set(path + ['vrf', vrf_name])
-
- self.session.commit()
+ self.cli_set(path + ['device-type', 'tun'])
+ self.cli_set(path + ['encryption', 'cipher', 'aes192'])
+ self.cli_set(path + ['hash', auth_hash])
+ self.cli_set(path + ['mode', 'server'])
+ self.cli_set(path + ['local-port', port])
+ self.cli_set(path + ['server', 'subnet', subnet])
+ self.cli_set(path + ['server', 'topology', 'net30'])
+ self.cli_set(path + ['replace-default-route'])
+ self.cli_set(path + ['keep-alive', 'failure-count', '10'])
+ self.cli_set(path + ['keep-alive', 'interval', '5'])
+ self.cli_set(path + ['tls', 'ca-cert-file', ca_cert])
+ self.cli_set(path + ['tls', 'cert-file', ssl_cert])
+ self.cli_set(path + ['tls', 'key-file', ssl_key])
+ self.cli_set(path + ['tls', 'dh-file', dh_pem])
+ self.cli_set(path + ['vrf', vrf_name])
+
+ self.cli_commit()
for ii in num_range:
interface = f'vtun{ii}'
@@ -471,8 +472,8 @@ class TestInterfacesOpenVPN(unittest.TestCase):
self.assertIn(interface, interfaces())
# check that no interface remained after deleting them
- self.session.delete(base_path)
- self.session.commit()
+ self.cli_delete(base_path)
+ self.cli_commit()
for ii in num_range:
interface = f'vtun{ii}'
@@ -485,57 +486,57 @@ class TestInterfacesOpenVPN(unittest.TestCase):
interface = 'vtun5000'
path = base_path + [interface]
- self.session.set(path + ['mode', 'site-to-site'])
+ self.cli_set(path + ['mode', 'site-to-site'])
# check validate() - encryption ncp-ciphers cannot be specified in site-to-site mode
- self.session.set(path + ['encryption', 'ncp-ciphers', 'aes192gcm'])
+ self.cli_set(path + ['encryption', 'ncp-ciphers', 'aes192gcm'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['encryption'])
+ self.cli_commit()
+ self.cli_delete(path + ['encryption'])
# check validate() - must specify "local-address" or add interface to bridge
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(path + ['local-address', '10.0.0.1'])
- self.session.set(path + ['local-address', '2001:db8:1::1'])
+ self.cli_commit()
+ self.cli_set(path + ['local-address', '10.0.0.1'])
+ self.cli_set(path + ['local-address', '2001:db8:1::1'])
# check validate() - cannot specify more than 1 IPv4 local-address
- self.session.set(path + ['local-address', '10.0.0.2'])
+ self.cli_set(path + ['local-address', '10.0.0.2'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['local-address', '10.0.0.2'])
+ self.cli_commit()
+ self.cli_delete(path + ['local-address', '10.0.0.2'])
# check validate() - cannot specify more than 1 IPv6 local-address
- self.session.set(path + ['local-address', '2001:db8:1::2'])
+ self.cli_set(path + ['local-address', '2001:db8:1::2'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['local-address', '2001:db8:1::2'])
+ self.cli_commit()
+ self.cli_delete(path + ['local-address', '2001:db8:1::2'])
# check validate() - IPv4 "local-address" requires IPv4 "remote-address"
# or IPv4 "local-address subnet"
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(path + ['remote-address', '192.168.0.1'])
- self.session.set(path + ['remote-address', '2001:db8:ffff::1'])
+ self.cli_commit()
+ self.cli_set(path + ['remote-address', '192.168.0.1'])
+ self.cli_set(path + ['remote-address', '2001:db8:ffff::1'])
# check validate() - Cannot specify more than 1 IPv4 "remote-address"
- self.session.set(path + ['remote-address', '192.168.0.2'])
+ self.cli_set(path + ['remote-address', '192.168.0.2'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['remote-address', '192.168.0.2'])
+ self.cli_commit()
+ self.cli_delete(path + ['remote-address', '192.168.0.2'])
# check validate() - Cannot specify more than 1 IPv6 "remote-address"
- self.session.set(path + ['remote-address', '2001:db8:ffff::2'])
+ self.cli_set(path + ['remote-address', '2001:db8:ffff::2'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(path + ['remote-address', '2001:db8:ffff::2'])
+ self.cli_commit()
+ self.cli_delete(path + ['remote-address', '2001:db8:ffff::2'])
# check validate() - Must specify one of "shared-secret-key-file" and "tls"
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(path + ['shared-secret-key-file', s2s_key])
+ self.cli_commit()
+ self.cli_set(path + ['shared-secret-key-file', s2s_key])
- self.session.commit()
+ self.cli_commit()
def test_openvpn_site2site_interfaces_tun(self):
# Create two OpenVPN site-to-site interfaces
@@ -553,23 +554,23 @@ class TestInterfacesOpenVPN(unittest.TestCase):
path = base_path + [interface]
port = str(3000 + ii)
- self.session.set(path + ['local-address', local_address])
+ self.cli_set(path + ['local-address', local_address])
# even numbers use tun type, odd numbers use tap type
if ii % 2 == 0:
- self.session.set(path + ['device-type', 'tun'])
+ self.cli_set(path + ['device-type', 'tun'])
else:
- self.session.set(path + ['device-type', 'tap'])
- self.session.set(path + ['local-address', local_address, 'subnet-mask', local_address_subnet])
+ self.cli_set(path + ['device-type', 'tap'])
+ self.cli_set(path + ['local-address', local_address, 'subnet-mask', local_address_subnet])
- self.session.set(path + ['mode', 'site-to-site'])
- self.session.set(path + ['local-port', port])
- self.session.set(path + ['remote-port', port])
- self.session.set(path + ['shared-secret-key-file', s2s_key])
- self.session.set(path + ['remote-address', remote_address])
- self.session.set(path + ['vrf', vrf_name])
+ self.cli_set(path + ['mode', 'site-to-site'])
+ self.cli_set(path + ['local-port', port])
+ self.cli_set(path + ['remote-port', port])
+ self.cli_set(path + ['shared-secret-key-file', s2s_key])
+ self.cli_set(path + ['remote-address', remote_address])
+ self.cli_set(path + ['vrf', vrf_name])
- self.session.commit()
+ self.cli_commit()
for ii in num_range:
interface = f'vtun{ii}'
@@ -600,8 +601,8 @@ class TestInterfacesOpenVPN(unittest.TestCase):
# check that no interface remained after deleting them
- self.session.delete(base_path)
- self.session.commit()
+ self.cli_delete(base_path)
+ self.cli_commit()
for ii in num_range:
interface = f'vtun{ii}'
diff --git a/smoketest/scripts/cli/test_interfaces_pppoe.py b/smoketest/scripts/cli/test_interfaces_pppoe.py
index f7fa73ea2..3412ebae0 100755
--- a/smoketest/scripts/cli/test_interfaces_pppoe.py
+++ b/smoketest/scripts/cli/test_interfaces_pppoe.py
@@ -15,11 +15,13 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
-import os
import unittest
from psutil import process_iter
-from vyos.configsession import ConfigSession, ConfigSessionError
+from base_vyostest_shim import VyOSUnitTestSHIM
+
+from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
from vyos.util import read_file
config_file = '/etc/ppp/peers/{}'
@@ -42,16 +44,14 @@ def get_dhcp6c_config_value(interface, key):
out.append(item.replace(';',''))
return out
-class PPPoEInterfaceTest(unittest.TestCase):
+class PPPoEInterfaceTest(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
self._interfaces = ['pppoe10', 'pppoe20', 'pppoe30']
self._source_interface = 'eth0'
def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_pppoe_client(self):
# Check if PPPoE dialer can be configured and runs
@@ -60,19 +60,19 @@ class PPPoEInterfaceTest(unittest.TestCase):
passwd = 'VyOS-passwd-' + interface
mtu = '1400'
- self.session.set(base_path + [interface, 'authentication', 'user', user])
- self.session.set(base_path + [interface, 'authentication', 'password', passwd])
- self.session.set(base_path + [interface, 'default-route', 'auto'])
- self.session.set(base_path + [interface, 'mtu', mtu])
- self.session.set(base_path + [interface, 'no-peer-dns'])
+ self.cli_set(base_path + [interface, 'authentication', 'user', user])
+ self.cli_set(base_path + [interface, 'authentication', 'password', passwd])
+ self.cli_set(base_path + [interface, 'default-route', 'auto'])
+ self.cli_set(base_path + [interface, 'mtu', mtu])
+ self.cli_set(base_path + [interface, 'no-peer-dns'])
# check validate() - a source-interface is required
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(base_path + [interface, 'source-interface', self._source_interface])
+ self.cli_commit()
+ self.cli_set(base_path + [interface, 'source-interface', self._source_interface])
# commit changes
- self.session.commit()
+ self.cli_commit()
# verify configuration file(s)
for interface in self._interfaces:
@@ -97,27 +97,48 @@ class PPPoEInterfaceTest(unittest.TestCase):
self.assertTrue(running)
+
+ def test_pppoe_clent_disabled_interface(self):
+ # Check if PPPoE Client can be disabled
+ for interface in self._interfaces:
+ self.cli_set(base_path + [interface, 'authentication', 'user', 'vyos'])
+ self.cli_set(base_path + [interface, 'authentication', 'password', 'vyos'])
+ self.cli_set(base_path + [interface, 'source-interface', self._source_interface])
+ self.cli_set(base_path + [interface, 'disable'])
+
+ self.cli_commit()
+
+ # Validate PPPoE client process
+ running = False
+ for interface in self._interfaces:
+ for proc in process_iter():
+ if interface in proc.cmdline():
+ running = True
+
+ self.assertFalse(running)
+
+
def test_pppoe_dhcpv6pd(self):
# Check if PPPoE dialer can be configured with DHCPv6-PD
address = '1'
sla_id = '0'
sla_len = '8'
for interface in self._interfaces:
- self.session.set(base_path + [interface, 'authentication', 'user', 'vyos'])
- self.session.set(base_path + [interface, 'authentication', 'password', 'vyos'])
- self.session.set(base_path + [interface, 'default-route', 'none'])
- self.session.set(base_path + [interface, 'no-peer-dns'])
- self.session.set(base_path + [interface, 'source-interface', self._source_interface])
- self.session.set(base_path + [interface, 'ipv6', 'address', 'autoconf'])
+ self.cli_set(base_path + [interface, 'authentication', 'user', 'vyos'])
+ self.cli_set(base_path + [interface, 'authentication', 'password', 'vyos'])
+ self.cli_set(base_path + [interface, 'default-route', 'none'])
+ self.cli_set(base_path + [interface, 'no-peer-dns'])
+ self.cli_set(base_path + [interface, 'source-interface', self._source_interface])
+ self.cli_set(base_path + [interface, 'ipv6', 'address', 'autoconf'])
# prefix delegation stuff
dhcpv6_pd_base = base_path + [interface, 'dhcpv6-options', 'pd', '0']
- self.session.set(dhcpv6_pd_base + ['length', '56'])
- self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'address', address])
- self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'sla-id', sla_id])
+ self.cli_set(dhcpv6_pd_base + ['length', '56'])
+ self.cli_set(dhcpv6_pd_base + ['interface', self._source_interface, 'address', address])
+ self.cli_set(dhcpv6_pd_base + ['interface', self._source_interface, 'sla-id', sla_id])
# commit changes
- self.session.commit()
+ self.cli_commit()
# verify "normal" PPPoE value - 1492 is default MTU
tmp = get_config_value(interface, 'mtu')[1]
@@ -161,16 +182,16 @@ class PPPoEInterfaceTest(unittest.TestCase):
def test_pppoe_authentication(self):
# When username or password is set - so must be the other
interface = 'pppoe0'
- self.session.set(base_path + [interface, 'authentication', 'user', 'vyos'])
- self.session.set(base_path + [interface, 'source-interface', self._source_interface])
- self.session.set(base_path + [interface, 'ipv6', 'address', 'autoconf'])
+ self.cli_set(base_path + [interface, 'authentication', 'user', 'vyos'])
+ self.cli_set(base_path + [interface, 'source-interface', self._source_interface])
+ self.cli_set(base_path + [interface, 'ipv6', 'address', 'autoconf'])
# check validate() - if user is set, so must be the password
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
- self.session.set(base_path + [interface, 'authentication', 'password', 'vyos'])
- self.session.commit()
+ self.cli_set(base_path + [interface, 'authentication', 'password', 'vyos'])
+ self.cli_commit()
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py b/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py
index 8dbbb7f95..dacbca99c 100755
--- a/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py
+++ b/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py
@@ -19,7 +19,7 @@ import unittest
from vyos.ifconfig import Section
from base_interfaces_test import BasicInterfaceTest
-class PEthInterfaceTest(BasicInterfaceTest.BaseTest):
+class PEthInterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
cls._test_ip = True
@@ -35,6 +35,8 @@ class PEthInterfaceTest(BasicInterfaceTest.BaseTest):
'peth1': ['source-interface eth1'],
}
cls._interfaces = list(cls._options)
+ # call base-classes classmethod
+ super(cls, cls).setUpClass()
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_interfaces_tunnel.py b/smoketest/scripts/cli/test_interfaces_tunnel.py
index e65d83840..3aed498b4 100755
--- a/smoketest/scripts/cli/test_interfaces_tunnel.py
+++ b/smoketest/scripts/cli/test_interfaces_tunnel.py
@@ -16,20 +16,18 @@
import unittest
-from vyos.configsession import ConfigSession
+from base_interfaces_test import BasicInterfaceTest
+
from vyos.configsession import ConfigSessionError
from vyos.util import get_interface_config
from vyos.template import inc_ip
-from vyos.util import cmd
-
-from base_interfaces_test import BasicInterfaceTest
remote_ip4 = '192.0.2.100'
remote_ip6 = '2001:db8::ffff'
source_if = 'dum2222'
mtu = 1476
-class TunnelInterfaceTest(BasicInterfaceTest.BaseTest):
+class TunnelInterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
cls._test_ip = True
@@ -43,17 +41,18 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest):
'tun20': ['encapsulation gre', 'remote 192.0.2.20', 'source-address ' + cls.local_v4],
}
cls._interfaces = list(cls._options)
+ # call base-classes classmethod
+ super(cls, cls).setUpClass()
def setUp(self):
super().setUp()
- self.session.set(['interfaces', 'dummy', source_if, 'address', self.local_v4 + '/32'])
- self.session.set(['interfaces', 'dummy', source_if, 'address', self.local_v6 + '/128'])
+ self.cli_set(['interfaces', 'dummy', source_if, 'address', self.local_v4 + '/32'])
+ self.cli_set(['interfaces', 'dummy', source_if, 'address', self.local_v6 + '/128'])
def tearDown(self):
- self.session.delete(['interfaces', 'dummy', source_if])
+ self.cli_delete(['interfaces', 'dummy', source_if])
super().tearDown()
-
def test_ipv4_encapsulations(self):
# When running tests ensure that for certain encapsulation types the
# local and remote IP address is actually an IPv4 address
@@ -61,31 +60,30 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest):
interface = f'tun1000'
local_if_addr = f'10.10.200.1/24'
for encapsulation in ['ipip', 'sit', 'gre', 'gretap']:
- self.session.set(self._base_path + [interface, 'address', local_if_addr])
- self.session.set(self._base_path + [interface, 'encapsulation', encapsulation])
- self.session.set(self._base_path + [interface, 'source-address', self.local_v6])
- self.session.set(self._base_path + [interface, 'remote', remote_ip6])
+ self.cli_set(self._base_path + [interface, 'address', local_if_addr])
+ self.cli_set(self._base_path + [interface, 'encapsulation', encapsulation])
+ self.cli_set(self._base_path + [interface, 'source-address', self.local_v6])
+ self.cli_set(self._base_path + [interface, 'remote', remote_ip6])
# Encapsulation mode requires IPv4 source-address
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'source-address', self.local_v4])
+ self.cli_commit()
+ self.cli_set(self._base_path + [interface, 'source-address', self.local_v4])
- # Encapsulation mode requires IPv4 remote address
+ # Encapsulation mode requires IPv4 remote
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'remote', remote_ip4])
-
- self.session.set(self._base_path + [interface, 'source-interface', source_if])
+ self.cli_commit()
+ self.cli_set(self._base_path + [interface, 'remote', remote_ip4])
+ self.cli_set(self._base_path + [interface, 'source-interface', source_if])
# Source interface can not be used with sit and gretap
if encapsulation in ['sit', 'gretap']:
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(self._base_path + [interface, 'source-interface'])
+ self.cli_commit()
+ self.cli_delete(self._base_path + [interface, 'source-interface'])
# Check if commit is ok
- self.session.commit()
+ self.cli_commit()
conf = get_interface_config(interface)
if encapsulation not in ['sit', 'gretap']:
@@ -93,12 +91,14 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest):
self.assertEqual(interface, conf['ifname'])
self.assertEqual(mtu, conf['mtu'])
+ self.assertEqual(encapsulation, conf['linkinfo']['info_kind'])
self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local'])
self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote'])
+ self.assertTrue(conf['linkinfo']['info_data']['pmtudisc'])
# cleanup this instance
- self.session.delete(self._base_path + [interface])
- self.session.commit()
+ self.cli_delete(self._base_path + [interface])
+ self.cli_commit()
def test_ipv6_encapsulations(self):
# When running tests ensure that for certain encapsulation types the
@@ -107,31 +107,38 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest):
interface = f'tun1010'
local_if_addr = f'10.10.200.1/24'
for encapsulation in ['ipip6', 'ip6ip6', 'ip6gre']:
- self.session.set(self._base_path + [interface, 'address', local_if_addr])
- self.session.set(self._base_path + [interface, 'encapsulation', encapsulation])
- self.session.set(self._base_path + [interface, 'source-address', self.local_v4])
- self.session.set(self._base_path + [interface, 'remote', remote_ip4])
+ self.cli_set(self._base_path + [interface, 'address', local_if_addr])
+ self.cli_set(self._base_path + [interface, 'encapsulation', encapsulation])
+ self.cli_set(self._base_path + [interface, 'source-address', self.local_v4])
+ self.cli_set(self._base_path + [interface, 'remote', remote_ip4])
# Encapsulation mode requires IPv6 source-address
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'source-address', self.local_v6])
+ self.cli_commit()
+ self.cli_set(self._base_path + [interface, 'source-address', self.local_v6])
- # Encapsulation mode requires IPv6 remote address
+ # Encapsulation mode requires IPv6 remote
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'remote', remote_ip6])
+ self.cli_commit()
+ self.cli_set(self._base_path + [interface, 'remote', remote_ip6])
# Configure Tunnel Source interface
- self.session.set(self._base_path + [interface, 'source-interface', source_if])
+ self.cli_set(self._base_path + [interface, 'source-interface', source_if])
+ # Source interface can not be used with ip6gretap
+ if encapsulation in ['ip6gretap']:
+ with self.assertRaises(ConfigSessionError):
+ self.cli_commit()
+ self.cli_delete(self._base_path + [interface, 'source-interface'])
# Check if commit is ok
- self.session.commit()
+ self.cli_commit()
conf = get_interface_config(interface)
+ if encapsulation not in ['ip6gretap']:
+ self.assertEqual(source_if, conf['link'])
+
self.assertEqual(interface, conf['ifname'])
self.assertEqual(mtu, conf['mtu'])
- self.assertEqual(source_if, conf['link'])
# Not applicable for ip6gre
if 'proto' in conf['linkinfo']['info_data']:
@@ -146,46 +153,46 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest):
self.assertEqual(remote_ip6, conf['linkinfo']['info_data']['remote'])
# cleanup this instance
- self.session.delete(self._base_path + [interface])
- self.session.commit()
+ self.cli_delete(self._base_path + [interface])
+ self.cli_commit()
def test_tunnel_verify_local_dhcp(self):
- # We can not use local-ip and dhcp-interface at the same time
+ # We can not use source-address and dhcp-interface at the same time
interface = f'tun1020'
local_if_addr = f'10.0.0.1/24'
- self.session.set(self._base_path + [interface, 'address', local_if_addr])
- self.session.set(self._base_path + [interface, 'encapsulation', 'gre'])
- self.session.set(self._base_path + [interface, 'source-address', self.local_v4])
- self.session.set(self._base_path + [interface, 'remote', remote_ip4])
- self.session.set(self._base_path + [interface, 'dhcp-interface', 'eth0'])
+ self.cli_set(self._base_path + [interface, 'address', local_if_addr])
+ self.cli_set(self._base_path + [interface, 'encapsulation', 'gre'])
+ self.cli_set(self._base_path + [interface, 'source-address', self.local_v4])
+ self.cli_set(self._base_path + [interface, 'remote', remote_ip4])
+ self.cli_set(self._base_path + [interface, 'dhcp-interface', 'eth0'])
# source-address and dhcp-interface can not be used at the same time
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(self._base_path + [interface, 'dhcp-interface'])
+ self.cli_commit()
+ self.cli_delete(self._base_path + [interface, 'dhcp-interface'])
# Check if commit is ok
- self.session.commit()
+ self.cli_commit()
def test_tunnel_parameters_gre(self):
interface = f'tun1030'
gre_key = '10'
encapsulation = 'gre'
tos = '20'
- ttl = 0
- self.session.set(self._base_path + [interface, 'encapsulation', encapsulation])
- self.session.set(self._base_path + [interface, 'source-address', self.local_v4])
- self.session.set(self._base_path + [interface, 'remote', remote_ip4])
+ self.cli_set(self._base_path + [interface, 'encapsulation', encapsulation])
+ self.cli_set(self._base_path + [interface, 'source-address', self.local_v4])
+ self.cli_set(self._base_path + [interface, 'remote', remote_ip4])
- self.session.set(self._base_path + [interface, 'parameters', 'ip', 'key', gre_key])
- self.session.set(self._base_path + [interface, 'parameters', 'ip', 'tos', tos])
- self.session.set(self._base_path + [interface, 'parameters', 'ip', 'ttl', str(ttl)])
+ self.cli_set(self._base_path + [interface, 'parameters', 'ip', 'no-pmtu-discovery'])
+ self.cli_set(self._base_path + [interface, 'parameters', 'ip', 'key', gre_key])
+ self.cli_set(self._base_path + [interface, 'parameters', 'ip', 'tos', tos])
+ self.cli_set(self._base_path + [interface, 'parameters', 'ip', 'ttl', '0'])
# Check if commit is ok
- self.session.commit()
+ self.cli_commit()
conf = get_interface_config(interface)
self.assertEqual(mtu, conf['mtu'])
@@ -193,7 +200,8 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest):
self.assertEqual(encapsulation, conf['linkinfo']['info_kind'])
self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local'])
self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote'])
- self.assertEqual(ttl, conf['linkinfo']['info_data']['ttl'])
+ self.assertEqual(0, conf['linkinfo']['info_data']['ttl'])
+ self.assertFalse( conf['linkinfo']['info_data']['pmtudisc'])
def test_gretap_parameters_change(self):
interface = f'tun1040'
@@ -201,31 +209,29 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest):
encapsulation = 'gretap'
tos = '20'
- self.session.set(self._base_path + [interface, 'encapsulation', encapsulation])
- self.session.set(self._base_path + [interface, 'source-address', self.local_v4])
- self.session.set(self._base_path + [interface, 'remote', remote_ip4])
+ self.cli_set(self._base_path + [interface, 'encapsulation', encapsulation])
+ self.cli_set(self._base_path + [interface, 'source-address', self.local_v4])
+ self.cli_set(self._base_path + [interface, 'remote', remote_ip4])
# Check if commit is ok
- self.session.commit()
+ self.cli_commit()
conf = get_interface_config(interface)
self.assertEqual(mtu, conf['mtu'])
self.assertEqual(interface, conf['ifname'])
- self.assertEqual('gretap', conf['linkinfo']['info_kind'])
+ self.assertEqual(encapsulation, conf['linkinfo']['info_kind'])
self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local'])
self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote'])
- # TTL uses a default value
- self.assertEqual(64, conf['linkinfo']['info_data']['ttl'])
+ self.assertEqual(64, conf['linkinfo']['info_data']['ttl'])
# Change remote ip address (inc host by 2
new_remote = inc_ip(remote_ip4, 2)
- self.session.set(self._base_path + [interface, 'remote', new_remote])
-
+ self.cli_set(self._base_path + [interface, 'remote', new_remote])
# Check if commit is ok
- self.session.commit()
+ self.cli_commit()
conf = get_interface_config(interface)
self.assertEqual(new_remote, conf['linkinfo']['info_data']['remote'])
if __name__ == '__main__':
- unittest.main(verbosity=2, failfast=True)
+ unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_interfaces_vxlan.py b/smoketest/scripts/cli/test_interfaces_vxlan.py
index adbe7138b..0fba0f460 100755
--- a/smoketest/scripts/cli/test_interfaces_vxlan.py
+++ b/smoketest/scripts/cli/test_interfaces_vxlan.py
@@ -16,10 +16,13 @@
import unittest
-from vyos.configsession import ConfigSession, ConfigSessionError
+from vyos.configsession import ConfigSession
+from vyos.ifconfig import Interface
+from vyos.util import get_interface_config
+
from base_interfaces_test import BasicInterfaceTest
-class VXLANInterfaceTest(BasicInterfaceTest.BaseTest):
+class VXLANInterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
cls._test_ip = True
@@ -32,6 +35,8 @@ class VXLANInterfaceTest(BasicInterfaceTest.BaseTest):
'vxlan30': ['vni 30', 'remote 2001:db8:2000::1', 'source-address 2001:db8:1000::1'],
}
cls._interfaces = list(cls._options)
+ # call base-classes classmethod
+ super(cls, cls).setUpClass()
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_interfaces_wireguard.py b/smoketest/scripts/cli/test_interfaces_wireguard.py
index e70324f96..d31ec0332 100755
--- a/smoketest/scripts/cli/test_interfaces_wireguard.py
+++ b/smoketest/scripts/cli/test_interfaces_wireguard.py
@@ -17,8 +17,10 @@
import os
import unittest
-from vyos.configsession import ConfigSession, ConfigSessionError
-from base_interfaces_test import BasicInterfaceTest
+from base_vyostest_shim import VyOSUnitTestSHIM
+from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
+
# Generate WireGuard default keypair
if not os.path.isdir('/config/auth/wireguard/default'):
@@ -26,17 +28,15 @@ if not os.path.isdir('/config/auth/wireguard/default'):
base_path = ['interfaces', 'wireguard']
-class WireGuardInterfaceTest(unittest.TestCase):
+class WireGuardInterfaceTest(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
self._test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32',
'2001:db8:1::ffff/64', '2001:db8:101::1/112']
self._interfaces = ['wg0', 'wg1']
def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_wireguard_peer(self):
# Create WireGuard interfaces with associated peers
@@ -46,19 +46,19 @@ class WireGuardInterfaceTest(unittest.TestCase):
pubkey = 'n6ZZL7ph/QJUJSUUTyu19c77my1dRCDHkMzFQUO9Z3A='
for addr in self._test_addr:
- self.session.set(base_path + [intf, 'address', addr])
+ self.cli_set(base_path + [intf, 'address', addr])
- self.session.set(base_path + [intf, 'peer', peer, 'address', '127.0.0.1'])
- self.session.set(base_path + [intf, 'peer', peer, 'port', '1337'])
+ self.cli_set(base_path + [intf, 'peer', peer, 'address', '127.0.0.1'])
+ self.cli_set(base_path + [intf, 'peer', peer, 'port', '1337'])
# Allow different prefixes to traverse the tunnel
allowed_ips = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16']
for ip in allowed_ips:
- self.session.set(base_path + [intf, 'peer', peer, 'allowed-ips', ip])
+ self.cli_set(base_path + [intf, 'peer', peer, 'allowed-ips', ip])
- self.session.set(base_path + [intf, 'peer', peer, 'preshared-key', psk])
- self.session.set(base_path + [intf, 'peer', peer, 'pubkey', pubkey])
- self.session.commit()
+ self.cli_set(base_path + [intf, 'peer', peer, 'preshared-key', psk])
+ self.cli_set(base_path + [intf, 'peer', peer, 'pubkey', pubkey])
+ self.cli_commit()
self.assertTrue(os.path.isdir(f'/sys/class/net/{intf}'))
@@ -71,26 +71,26 @@ class WireGuardInterfaceTest(unittest.TestCase):
pubkey_1 = 'n1CUsmR0M2LUUsyicBd6blZICwUqqWWHbu4ifZ2/9gk='
pubkey_2 = 'ebFx/1G0ti8tvuZd94sEIosAZZIznX+dBAKG/8DFm0I='
- self.session.set(base_path + [interface, 'address', '172.16.0.1/24'])
+ self.cli_set(base_path + [interface, 'address', '172.16.0.1/24'])
- self.session.set(base_path + [interface, 'peer', 'PEER01', 'pubkey', pubkey_1])
- self.session.set(base_path + [interface, 'peer', 'PEER01', 'port', port])
- self.session.set(base_path + [interface, 'peer', 'PEER01', 'allowed-ips', '10.205.212.10/32'])
- self.session.set(base_path + [interface, 'peer', 'PEER01', 'address', '192.0.2.1'])
+ self.cli_set(base_path + [interface, 'peer', 'PEER01', 'pubkey', pubkey_1])
+ self.cli_set(base_path + [interface, 'peer', 'PEER01', 'port', port])
+ self.cli_set(base_path + [interface, 'peer', 'PEER01', 'allowed-ips', '10.205.212.10/32'])
+ self.cli_set(base_path + [interface, 'peer', 'PEER01', 'address', '192.0.2.1'])
- self.session.set(base_path + [interface, 'peer', 'PEER02', 'pubkey', pubkey_2])
- self.session.set(base_path + [interface, 'peer', 'PEER02', 'port', port])
- self.session.set(base_path + [interface, 'peer', 'PEER02', 'allowed-ips', '10.205.212.11/32'])
- self.session.set(base_path + [interface, 'peer', 'PEER02', 'address', '192.0.2.2'])
+ self.cli_set(base_path + [interface, 'peer', 'PEER02', 'pubkey', pubkey_2])
+ self.cli_set(base_path + [interface, 'peer', 'PEER02', 'port', port])
+ self.cli_set(base_path + [interface, 'peer', 'PEER02', 'allowed-ips', '10.205.212.11/32'])
+ self.cli_set(base_path + [interface, 'peer', 'PEER02', 'address', '192.0.2.2'])
# Commit peers
- self.session.commit()
+ self.cli_commit()
self.assertTrue(os.path.isdir(f'/sys/class/net/{interface}'))
# Delete second peer
- self.session.delete(base_path + [interface, 'peer', 'PEER01'])
- self.session.commit()
+ self.cli_delete(base_path + [interface, 'peer', 'PEER01'])
+ self.cli_commit()
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_interfaces_wireless.py b/smoketest/scripts/cli/test_interfaces_wireless.py
index 39e8cd5b8..4f539a23c 100755
--- a/smoketest/scripts/cli/test_interfaces_wireless.py
+++ b/smoketest/scripts/cli/test_interfaces_wireless.py
@@ -31,7 +31,7 @@ def get_config_value(interface, key):
tmp = re.findall(f'{key}=+(.*)', tmp)
return tmp[0]
-class WirelessInterfaceTest(BasicInterfaceTest.BaseTest):
+class WirelessInterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
cls._test_ip = True
@@ -47,6 +47,8 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest):
'type access-point', 'address 192.0.2.13/30', 'channel 0'],
}
cls._interfaces = list(cls._options)
+ # call base-classes classmethod
+ super(cls, cls).setUpClass()
def test_wireless_add_single_ip_address(self):
# derived method to check if member interfaces are enslaved properly
@@ -67,12 +69,12 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest):
interface = 'wlan0'
ssid = 'ssid'
- self.session.set(self._base_path + [interface, 'ssid', ssid])
- self.session.set(self._base_path + [interface, 'country-code', 'se'])
- self.session.set(self._base_path + [interface, 'type', 'access-point'])
+ self.cli_set(self._base_path + [interface, 'ssid', ssid])
+ self.cli_set(self._base_path + [interface, 'country-code', 'se'])
+ self.cli_set(self._base_path + [interface, 'type', 'access-point'])
# auto-powersave is special
- self.session.set(self._base_path + [interface, 'capabilities', 'ht', 'auto-powersave'])
+ self.cli_set(self._base_path + [interface, 'capabilities', 'ht', 'auto-powersave'])
ht_opt = {
# VyOS CLI option hostapd - ht_capab setting
@@ -88,7 +90,7 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest):
'smps static' : '[SMPS-STATIC]',
}
for key in ht_opt:
- self.session.set(self._base_path + [interface, 'capabilities', 'ht'] + key.split())
+ self.cli_set(self._base_path + [interface, 'capabilities', 'ht'] + key.split())
vht_opt = {
# VyOS CLI option hostapd - ht_capab setting
@@ -105,9 +107,9 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest):
'short-gi 160' : '[SHORT-GI-160]',
}
for key in vht_opt:
- self.session.set(self._base_path + [interface, 'capabilities', 'vht'] + key.split())
+ self.cli_set(self._base_path + [interface, 'capabilities', 'vht'] + key.split())
- self.session.commit()
+ self.cli_commit()
#
# Validate Config
@@ -148,29 +150,29 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest):
mode = 'n'
country = 'de'
- self.session.set(self._base_path + [interface, 'physical-device', phy])
- self.session.set(self._base_path + [interface, 'type', 'access-point'])
- self.session.set(self._base_path + [interface, 'mode', mode])
+ self.cli_set(self._base_path + [interface, 'physical-device', phy])
+ self.cli_set(self._base_path + [interface, 'type', 'access-point'])
+ self.cli_set(self._base_path + [interface, 'mode', mode])
# SSID must be set
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'ssid', ssid])
+ self.cli_commit()
+ self.cli_set(self._base_path + [interface, 'ssid', ssid])
# Channel must be set
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'channel', channel])
+ self.cli_commit()
+ self.cli_set(self._base_path + [interface, 'channel', channel])
# Country-Code must be set
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [interface, 'country-code', country])
+ self.cli_commit()
+ self.cli_set(self._base_path + [interface, 'country-code', country])
- self.session.set(self._base_path + [interface, 'security', 'wpa', 'mode', 'wpa2'])
- self.session.set(self._base_path + [interface, 'security', 'wpa', 'passphrase', wpa_key])
+ self.cli_set(self._base_path + [interface, 'security', 'wpa', 'mode', 'wpa2'])
+ self.cli_set(self._base_path + [interface, 'security', 'wpa', 'passphrase', wpa_key])
- self.session.commit()
+ self.cli_commit()
#
# Validate Config
@@ -211,13 +213,13 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest):
# We need a bridge where we can hook our access-point interface to
bridge_path = ['interfaces', 'bridge', bridge]
- self.session.set(bridge_path + ['member', 'interface', interface])
+ self.cli_set(bridge_path + ['member', 'interface', interface])
- self.session.set(self._base_path + [interface, 'ssid', ssid])
- self.session.set(self._base_path + [interface, 'country-code', 'se'])
- self.session.set(self._base_path + [interface, 'type', 'access-point'])
+ self.cli_set(self._base_path + [interface, 'ssid', ssid])
+ self.cli_set(self._base_path + [interface, 'country-code', 'se'])
+ self.cli_set(self._base_path + [interface, 'type', 'access-point'])
- self.session.commit()
+ self.cli_commit()
# Check for running process
self.assertTrue(process_named_running('hostapd'))
@@ -228,9 +230,9 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest):
self.assertIn(interface, bridge_members)
- self.session.delete(bridge_path)
- self.session.delete(self._base_path)
- self.session.commit()
+ self.cli_delete(bridge_path)
+ self.cli_delete(self._base_path)
+ self.cli_commit()
if __name__ == '__main__':
check_kmod('mac80211_hwsim')
diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py
index b5702d691..0706f234e 100755
--- a/smoketest/scripts/cli/test_nat.py
+++ b/smoketest/scripts/cli/test_nat.py
@@ -19,6 +19,7 @@ import jmespath
import json
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.util import cmd
@@ -28,16 +29,15 @@ base_path = ['nat']
src_path = base_path + ['source']
dst_path = base_path + ['destination']
-class TestNAT(unittest.TestCase):
+class TestNAT(VyOSUnitTestSHIM.TestCase):
def setUp(self):
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
- self.session = ConfigSession(os.getpid())
- self.session.delete(base_path)
+ self.cli_delete(base_path)
def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_snat(self):
rules = ['100', '110', '120', '130', '200', '210', '220', '230']
@@ -48,15 +48,15 @@ class TestNAT(unittest.TestCase):
# depending of rule order we check either for source address for NAT
# or configured destination address for NAT
if int(rule) < 200:
- self.session.set(src_path + ['rule', rule, 'source', 'address', network])
- self.session.set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100])
- self.session.set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
+ self.cli_set(src_path + ['rule', rule, 'source', 'address', network])
+ self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100])
+ self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
else:
- self.session.set(src_path + ['rule', rule, 'destination', 'address', network])
- self.session.set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200])
- self.session.set(src_path + ['rule', rule, 'exclude'])
+ self.cli_set(src_path + ['rule', rule, 'destination', 'address', network])
+ self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200])
+ self.cli_set(src_path + ['rule', rule, 'exclude'])
- self.session.commit()
+ self.cli_commit()
tmp = cmd('sudo nft -j list table nat')
data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
@@ -98,17 +98,17 @@ class TestNAT(unittest.TestCase):
for rule in rules:
port = f'10{rule}'
- self.session.set(dst_path + ['rule', rule, 'source', 'port', port])
- self.session.set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1'])
- self.session.set(dst_path + ['rule', rule, 'translation', 'port', port])
+ self.cli_set(dst_path + ['rule', rule, 'source', 'port', port])
+ self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1'])
+ self.cli_set(dst_path + ['rule', rule, 'translation', 'port', port])
if int(rule) < 200:
- self.session.set(dst_path + ['rule', rule, 'protocol', inbound_proto_100])
- self.session.set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100])
+ self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_100])
+ self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100])
else:
- self.session.set(dst_path + ['rule', rule, 'protocol', inbound_proto_200])
- self.session.set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200])
+ self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_200])
+ self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200])
- self.session.commit()
+ self.cli_commit()
tmp = cmd('sudo nft -j list table nat')
data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
@@ -141,31 +141,31 @@ class TestNAT(unittest.TestCase):
def test_snat_required_translation_address(self):
# T2813: Ensure translation address is specified
rule = '5'
- self.session.set(src_path + ['rule', rule, 'source', 'address', '192.0.2.0/24'])
+ self.cli_set(src_path + ['rule', rule, 'source', 'address', '192.0.2.0/24'])
# check validate() - outbound-interface must be defined
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(src_path + ['rule', rule, 'outbound-interface', 'eth0'])
+ self.cli_commit()
+ self.cli_set(src_path + ['rule', rule, 'outbound-interface', 'eth0'])
# check validate() - translation address not specified
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
- self.session.set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
- self.session.commit()
+ self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
+ self.cli_commit()
def test_dnat_negated_addresses(self):
# T3186: negated addresses are not accepted by nftables
rule = '1000'
- self.session.set(dst_path + ['rule', rule, 'destination', 'address', '!192.0.2.1'])
- self.session.set(dst_path + ['rule', rule, 'destination', 'port', '53'])
- self.session.set(dst_path + ['rule', rule, 'inbound-interface', 'eth0'])
- self.session.set(dst_path + ['rule', rule, 'protocol', 'tcp_udp'])
- self.session.set(dst_path + ['rule', rule, 'source', 'address', '!192.0.2.1'])
- self.session.set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1'])
- self.session.set(dst_path + ['rule', rule, 'translation', 'port', '53'])
- self.session.commit()
+ self.cli_set(dst_path + ['rule', rule, 'destination', 'address', '!192.0.2.1'])
+ self.cli_set(dst_path + ['rule', rule, 'destination', 'port', '53'])
+ self.cli_set(dst_path + ['rule', rule, 'inbound-interface', 'eth0'])
+ self.cli_set(dst_path + ['rule', rule, 'protocol', 'tcp_udp'])
+ self.cli_set(dst_path + ['rule', rule, 'source', 'address', '!192.0.2.1'])
+ self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1'])
+ self.cli_set(dst_path + ['rule', rule, 'translation', 'port', '53'])
+ self.cli_commit()
def test_nat_no_rules(self):
# T3206: deleting all rules but keep the direction 'destination' or
@@ -173,9 +173,9 @@ class TestNAT(unittest.TestCase):
#
# Test that both 'nat destination' and 'nat source' nodes can exist
# without any rule
- self.session.set(src_path)
- self.session.set(dst_path)
- self.session.commit()
+ self.cli_set(src_path)
+ self.cli_set(dst_path)
+ self.cli_commit()
if __name__ == '__main__':
diff --git a/smoketest/scripts/cli/test_policy.py b/smoketest/scripts/cli/test_policy.py
index 1a18c8191..cdd2ad820 100755
--- a/smoketest/scripts/cli/test_policy.py
+++ b/smoketest/scripts/cli/test_policy.py
@@ -14,26 +14,20 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import unittest
-from vyos.util import cmd
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
+from vyos.util import cmd
base_path = ['policy']
-def getFRRconfig(section):
- return cmd(f'vtysh -c "show run" | sed -n "/^{section}/,/^!/p"')
-
-class TestPolicy(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
+class TestPolicy(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_access_list(self):
acls = {
@@ -101,25 +95,25 @@ class TestPolicy(unittest.TestCase):
for acl, acl_config in acls.items():
path = base_path + ['access-list', acl]
- self.session.set(path + ['description', f'VyOS-ACL-{acl}'])
+ self.cli_set(path + ['description', f'VyOS-ACL-{acl}'])
if 'rule' not in acl_config:
continue
for rule, rule_config in acl_config['rule'].items():
- self.session.set(path + ['rule', rule, 'action', rule_config['action']])
+ self.cli_set(path + ['rule', rule, 'action', rule_config['action']])
for direction in ['source', 'destination']:
if direction in rule_config:
if 'any' in rule_config[direction]:
- self.session.set(path + ['rule', rule, direction, 'any'])
+ self.cli_set(path + ['rule', rule, direction, 'any'])
if 'host' in rule_config[direction]:
- self.session.set(path + ['rule', rule, direction, 'host', rule_config[direction]['host']])
+ self.cli_set(path + ['rule', rule, direction, 'host', rule_config[direction]['host']])
if 'network' in rule_config[direction]:
- self.session.set(path + ['rule', rule, direction, 'network', rule_config[direction]['network']])
- self.session.set(path + ['rule', rule, direction, 'inverse-mask', rule_config[direction]['inverse-mask']])
+ self.cli_set(path + ['rule', rule, direction, 'network', rule_config[direction]['network']])
+ self.cli_set(path + ['rule', rule, direction, 'inverse-mask', rule_config[direction]['inverse-mask']])
- self.session.commit()
+ self.cli_commit()
- config = getFRRconfig('access-list')
+ config = self.getFRRconfig('access-list', end='')
for acl, acl_config in acls.items():
seq = '5'
for rule, rule_config in acl_config['rule'].items():
@@ -190,24 +184,24 @@ class TestPolicy(unittest.TestCase):
for acl, acl_config in acls.items():
path = base_path + ['access-list6', acl]
- self.session.set(path + ['description', f'VyOS-ACL-{acl}'])
+ self.cli_set(path + ['description', f'VyOS-ACL-{acl}'])
if 'rule' not in acl_config:
continue
for rule, rule_config in acl_config['rule'].items():
- self.session.set(path + ['rule', rule, 'action', rule_config['action']])
+ self.cli_set(path + ['rule', rule, 'action', rule_config['action']])
for direction in ['source', 'destination']:
if direction in rule_config:
if 'any' in rule_config[direction]:
- self.session.set(path + ['rule', rule, direction, 'any'])
+ self.cli_set(path + ['rule', rule, direction, 'any'])
if 'network' in rule_config[direction]:
- self.session.set(path + ['rule', rule, direction, 'network', rule_config[direction]['network']])
+ self.cli_set(path + ['rule', rule, direction, 'network', rule_config[direction]['network']])
if 'exact-match' in rule_config[direction]:
- self.session.set(path + ['rule', rule, direction, 'exact-match'])
+ self.cli_set(path + ['rule', rule, direction, 'exact-match'])
- self.session.commit()
+ self.cli_commit()
- config = getFRRconfig('ipv6 access-list')
+ config = self.getFRRconfig('ipv6 access-list', end='')
for acl, acl_config in acls.items():
seq = '5'
for rule, rule_config in acl_config['rule'].items():
@@ -295,19 +289,19 @@ class TestPolicy(unittest.TestCase):
for as_path, as_path_config in test_data.items():
path = base_path + ['as-path-list', as_path]
- self.session.set(path + ['description', f'VyOS-ASPATH-{as_path}'])
+ self.cli_set(path + ['description', f'VyOS-ASPATH-{as_path}'])
if 'rule' not in as_path_config:
continue
for rule, rule_config in as_path_config['rule'].items():
if 'action' in rule_config:
- self.session.set(path + ['rule', rule, 'action', rule_config['action']])
+ self.cli_set(path + ['rule', rule, 'action', rule_config['action']])
if 'regex' in rule_config:
- self.session.set(path + ['rule', rule, 'regex', rule_config['regex']])
+ self.cli_set(path + ['rule', rule, 'regex', rule_config['regex']])
- self.session.commit()
+ self.cli_commit()
- config = getFRRconfig('bgp as-path access-list')
+ config = self.getFRRconfig('bgp as-path access-list', end='')
for as_path, as_path_config in test_data.items():
if 'rule' not in as_path_config:
continue
@@ -353,19 +347,19 @@ class TestPolicy(unittest.TestCase):
for comm_list, comm_list_config in test_data.items():
path = base_path + ['community-list', comm_list]
- self.session.set(path + ['description', f'VyOS-COMM-{comm_list}'])
+ self.cli_set(path + ['description', f'VyOS-COMM-{comm_list}'])
if 'rule' not in comm_list_config:
continue
for rule, rule_config in comm_list_config['rule'].items():
if 'action' in rule_config:
- self.session.set(path + ['rule', rule, 'action', rule_config['action']])
+ self.cli_set(path + ['rule', rule, 'action', rule_config['action']])
if 'regex' in rule_config:
- self.session.set(path + ['rule', rule, 'regex', rule_config['regex']])
+ self.cli_set(path + ['rule', rule, 'regex', rule_config['regex']])
- self.session.commit()
+ self.cli_commit()
- config = getFRRconfig('bgp community-list')
+ config = self.getFRRconfig('bgp community-list', end='')
for comm_list, comm_list_config in test_data.items():
if 'rule' not in comm_list_config:
continue
@@ -411,19 +405,19 @@ class TestPolicy(unittest.TestCase):
for comm_list, comm_list_config in test_data.items():
path = base_path + ['extcommunity-list', comm_list]
- self.session.set(path + ['description', f'VyOS-EXTCOMM-{comm_list}'])
+ self.cli_set(path + ['description', f'VyOS-EXTCOMM-{comm_list}'])
if 'rule' not in comm_list_config:
continue
for rule, rule_config in comm_list_config['rule'].items():
if 'action' in rule_config:
- self.session.set(path + ['rule', rule, 'action', rule_config['action']])
+ self.cli_set(path + ['rule', rule, 'action', rule_config['action']])
if 'regex' in rule_config:
- self.session.set(path + ['rule', rule, 'regex', rule_config['regex']])
+ self.cli_set(path + ['rule', rule, 'regex', rule_config['regex']])
- self.session.commit()
+ self.cli_commit()
- config = getFRRconfig('bgp extcommunity-list')
+ config = self.getFRRconfig('bgp extcommunity-list', end='')
for comm_list, comm_list_config in test_data.items():
if 'rule' not in comm_list_config:
continue
@@ -476,19 +470,19 @@ class TestPolicy(unittest.TestCase):
for comm_list, comm_list_config in test_data.items():
path = base_path + ['large-community-list', comm_list]
- self.session.set(path + ['description', f'VyOS-LARGECOMM-{comm_list}'])
+ self.cli_set(path + ['description', f'VyOS-LARGECOMM-{comm_list}'])
if 'rule' not in comm_list_config:
continue
for rule, rule_config in comm_list_config['rule'].items():
if 'action' in rule_config:
- self.session.set(path + ['rule', rule, 'action', rule_config['action']])
+ self.cli_set(path + ['rule', rule, 'action', rule_config['action']])
if 'regex' in rule_config:
- self.session.set(path + ['rule', rule, 'regex', rule_config['regex']])
+ self.cli_set(path + ['rule', rule, 'regex', rule_config['regex']])
- self.session.commit()
+ self.cli_commit()
- config = getFRRconfig('bgp large-community-list')
+ config = self.getFRRconfig('bgp large-community-list', end='')
for comm_list, comm_list_config in test_data.items():
if 'rule' not in comm_list_config:
continue
@@ -550,23 +544,23 @@ class TestPolicy(unittest.TestCase):
for prefix_list, prefix_list_config in test_data.items():
path = base_path + ['prefix-list', prefix_list]
- self.session.set(path + ['description', f'VyOS-PFX-LIST-{prefix_list}'])
+ self.cli_set(path + ['description', f'VyOS-PFX-LIST-{prefix_list}'])
if 'rule' not in prefix_list_config:
continue
for rule, rule_config in prefix_list_config['rule'].items():
if 'action' in rule_config:
- self.session.set(path + ['rule', rule, 'action', rule_config['action']])
+ self.cli_set(path + ['rule', rule, 'action', rule_config['action']])
if 'prefix' in rule_config:
- self.session.set(path + ['rule', rule, 'prefix', rule_config['prefix']])
+ self.cli_set(path + ['rule', rule, 'prefix', rule_config['prefix']])
if 'ge' in rule_config:
- self.session.set(path + ['rule', rule, 'ge', rule_config['ge']])
+ self.cli_set(path + ['rule', rule, 'ge', rule_config['ge']])
if 'le' in rule_config:
- self.session.set(path + ['rule', rule, 'le', rule_config['le']])
+ self.cli_set(path + ['rule', rule, 'le', rule_config['le']])
- self.session.commit()
+ self.cli_commit()
- config = getFRRconfig('ip prefix-list')
+ config = self.getFRRconfig('ip prefix-list', end='')
for prefix_list, prefix_list_config in test_data.items():
if 'rule' not in prefix_list_config:
continue
@@ -633,23 +627,23 @@ class TestPolicy(unittest.TestCase):
for prefix_list, prefix_list_config in test_data.items():
path = base_path + ['prefix-list6', prefix_list]
- self.session.set(path + ['description', f'VyOS-PFX-LIST-{prefix_list}'])
+ self.cli_set(path + ['description', f'VyOS-PFX-LIST-{prefix_list}'])
if 'rule' not in prefix_list_config:
continue
for rule, rule_config in prefix_list_config['rule'].items():
if 'action' in rule_config:
- self.session.set(path + ['rule', rule, 'action', rule_config['action']])
+ self.cli_set(path + ['rule', rule, 'action', rule_config['action']])
if 'prefix' in rule_config:
- self.session.set(path + ['rule', rule, 'prefix', rule_config['prefix']])
+ self.cli_set(path + ['rule', rule, 'prefix', rule_config['prefix']])
if 'ge' in rule_config:
- self.session.set(path + ['rule', rule, 'ge', rule_config['ge']])
+ self.cli_set(path + ['rule', rule, 'ge', rule_config['ge']])
if 'le' in rule_config:
- self.session.set(path + ['rule', rule, 'le', rule_config['le']])
+ self.cli_set(path + ['rule', rule, 'le', rule_config['le']])
- self.session.commit()
+ self.cli_commit()
- config = getFRRconfig('ipv6 prefix-list')
+ config = self.getFRRconfig('ipv6 prefix-list', end='')
for prefix_list, prefix_list_config in test_data.items():
if 'rule' not in prefix_list_config:
continue
@@ -671,5 +665,32 @@ class TestPolicy(unittest.TestCase):
self.assertIn(tmp, config)
+
+ # Test set table for some sources
+ def test_table_id(self):
+ path = base_path + ['local-route']
+
+ sources = ['203.0.113.1', '203.0.113.2']
+ rule = '50'
+ table = '23'
+ for src in sources:
+ self.cli_set(path + ['rule', rule, 'set', 'table', table])
+ self.cli_set(path + ['rule', rule, 'source', src])
+
+ self.cli_commit()
+
+ # Check generated configuration
+
+ # Expected values
+ original = """
+ 50: from 203.0.113.1 lookup 23
+ 50: from 203.0.113.2 lookup 23
+ """
+ tmp = cmd('ip rule show prio 50')
+ original = original.split()
+ tmp = tmp.split()
+
+ self.assertEqual(tmp, original)
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_policy_local-route.py b/smoketest/scripts/cli/test_policy_local-route.py
index de1882a65..c742a930b 100755
--- a/smoketest/scripts/cli/test_policy_local-route.py
+++ b/smoketest/scripts/cli/test_policy_local-route.py
@@ -17,21 +17,24 @@
import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.util import cmd
from vyos.util import process_named_running
-class PolicyLocalRouteTest(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
- self._sources = ['203.0.113.1', '203.0.113.2']
+class PolicyLocalRouteTest(VyOSUnitTestSHIM.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls._sources = ['203.0.113.1', '203.0.113.2']
+ # call base-classes classmethod
+ super(cls, cls).setUpClass()
def tearDown(self):
# Delete all policies
- self.session.delete(['policy', 'local-route'])
- self.session.commit()
- del self.session
+ self.cli_delete(['policy', 'local-route'])
+ self.cli_commit()
# Test set table for some sources
def test_table_id(self):
@@ -39,10 +42,10 @@ class PolicyLocalRouteTest(unittest.TestCase):
rule = '50'
table = '23'
for src in self._sources:
- self.session.set(base + ['rule', rule, 'set', 'table', table])
- self.session.set(base + ['rule', rule, 'source', src])
+ self.cli_set(base + ['rule', rule, 'set', 'table', table])
+ self.cli_set(base + ['rule', rule, 'source', src])
- self.session.commit()
+ self.cli_commit()
# Check generated configuration
diff --git a/smoketest/scripts/cli/test_protocols_bfd.py b/smoketest/scripts/cli/test_protocols_bfd.py
index 7f17c12ff..0c4ed86d7 100755
--- a/smoketest/scripts/cli/test_protocols_bfd.py
+++ b/smoketest/scripts/cli/test_protocols_bfd.py
@@ -17,6 +17,8 @@
import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.util import cmd
@@ -58,17 +60,15 @@ def getFRRconfig():
def getBFDPeerconfig(peer):
return cmd(f'vtysh -c "show run" | sed -n "/^ {peer}/,/^!/p"')
-class TestProtocolsBFD(unittest.TestCase):
+class TestProtocolsBFD(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
- self.session.set(['interfaces', 'dummy', dum_if, 'address', '192.0.2.1/24'])
- self.session.set(['interfaces', 'dummy', dum_if, 'address', '2001:db8::1/64'])
+ self.cli_set(['interfaces', 'dummy', dum_if, 'address', '192.0.2.1/24'])
+ self.cli_set(['interfaces', 'dummy', dum_if, 'address', '2001:db8::1/64'])
def tearDown(self):
- self.session.delete(['interfaces', 'dummy', dum_if])
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(['interfaces', 'dummy', dum_if])
+ self.cli_delete(base_path)
+ self.cli_commit()
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
@@ -76,26 +76,26 @@ class TestProtocolsBFD(unittest.TestCase):
def test_bfd_simple(self):
for peer, peer_config in neighbor_config.items():
if 'echo_mode' in peer_config:
- self.session.set(base_path + ['peer', peer, 'echo-mode'])
+ self.cli_set(base_path + ['peer', peer, 'echo-mode'])
if 'intv_echo' in peer_config:
- self.session.set(base_path + ['peer', peer, 'interval', 'echo-interval', peer_config["intv_echo"]])
+ self.cli_set(base_path + ['peer', peer, 'interval', 'echo-interval', peer_config["intv_echo"]])
if 'intv_mult' in peer_config:
- self.session.set(base_path + ['peer', peer, 'interval', 'multiplier', peer_config["intv_mult"]])
+ self.cli_set(base_path + ['peer', peer, 'interval', 'multiplier', peer_config["intv_mult"]])
if 'intv_rx' in peer_config:
- self.session.set(base_path + ['peer', peer, 'interval', 'receive', peer_config["intv_rx"]])
+ self.cli_set(base_path + ['peer', peer, 'interval', 'receive', peer_config["intv_rx"]])
if 'intv_tx' in peer_config:
- self.session.set(base_path + ['peer', peer, 'interval', 'transmit', peer_config["intv_tx"]])
+ self.cli_set(base_path + ['peer', peer, 'interval', 'transmit', peer_config["intv_tx"]])
if 'multihop' in peer_config:
- self.session.set(base_path + ['peer', peer, 'multihop'])
+ self.cli_set(base_path + ['peer', peer, 'multihop'])
if 'shutdown' in peer_config:
- self.session.set(base_path + ['peer', peer, 'shutdown'])
+ self.cli_set(base_path + ['peer', peer, 'shutdown'])
if 'source_addr' in peer_config:
- self.session.set(base_path + ['peer', peer, 'source', 'address', peer_config["source_addr"]])
+ self.cli_set(base_path + ['peer', peer, 'source', 'address', peer_config["source_addr"]])
if 'source_intf' in peer_config:
- self.session.set(base_path + ['peer', peer, 'source', 'interface', peer_config["source_intf"]])
+ self.cli_set(base_path + ['peer', peer, 'source', 'interface', peer_config["source_intf"]])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify FRR bgpd configuration
frrconfig = getFRRconfig()
diff --git a/smoketest/scripts/cli/test_protocols_bgp.py b/smoketest/scripts/cli/test_protocols_bgp.py
index 62ef40282..b261e4164 100755
--- a/smoketest/scripts/cli/test_protocols_bgp.py
+++ b/smoketest/scripts/cli/test_protocols_bgp.py
@@ -17,6 +17,8 @@
import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.util import cmd
@@ -87,14 +89,10 @@ peer_group_config = {
def getFRRBGPconfig():
return cmd(f'vtysh -c "show run" | sed -n "/router bgp {ASN}/,/^!/p"')
-class TestProtocolsBGP(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
+class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def verify_frr_config(self, peer, peer_config, frrconfig):
# recurring patterns to verify for both a simple neighbor and a peer-group
@@ -129,15 +127,15 @@ class TestProtocolsBGP(unittest.TestCase):
router_id = '127.0.0.1'
local_pref = '500'
- self.session.set(base_path + ['parameters', 'router-id', router_id])
- self.session.set(base_path + ['parameters', 'log-neighbor-changes'])
+ self.cli_set(base_path + ['parameters', 'router-id', router_id])
+ self.cli_set(base_path + ['parameters', 'log-neighbor-changes'])
# Default local preference (higher=more preferred)
- self.session.set(base_path + ['parameters', 'default', 'local-pref', local_pref])
+ self.cli_set(base_path + ['parameters', 'default', 'local-pref', local_pref])
# Deactivate IPv4 unicast for a peer by default
- self.session.set(base_path + ['parameters', 'default', 'no-ipv4-unicast'])
+ self.cli_set(base_path + ['parameters', 'default', 'no-ipv4-unicast'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify FRR bgpd configuration
frrconfig = getFRRBGPconfig()
@@ -155,40 +153,40 @@ class TestProtocolsBGP(unittest.TestCase):
# also available to a peer-group!
for neighbor, config in neighbor_config.items():
if 'adv_interv' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'advertisement-interval', config["adv_interv"]])
+ self.cli_set(base_path + ['neighbor', neighbor, 'advertisement-interval', config["adv_interv"]])
if 'cap_dynamic' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'capability', 'dynamic'])
+ self.cli_set(base_path + ['neighbor', neighbor, 'capability', 'dynamic'])
if 'cap_ext_next' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'capability', 'extended-nexthop'])
+ self.cli_set(base_path + ['neighbor', neighbor, 'capability', 'extended-nexthop'])
if 'description' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'description', config["description"]])
+ self.cli_set(base_path + ['neighbor', neighbor, 'description', config["description"]])
if 'no_cap_nego' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'disable-capability-negotiation'])
+ self.cli_set(base_path + ['neighbor', neighbor, 'disable-capability-negotiation'])
if 'multi_hop' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'ebgp-multihop', config["multi_hop"]])
+ self.cli_set(base_path + ['neighbor', neighbor, 'ebgp-multihop', config["multi_hop"]])
if 'local_as' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'local-as', config["local_as"]])
+ self.cli_set(base_path + ['neighbor', neighbor, 'local-as', config["local_as"]])
if 'cap_over' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'override-capability'])
+ self.cli_set(base_path + ['neighbor', neighbor, 'override-capability'])
if 'passive' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'passive'])
+ self.cli_set(base_path + ['neighbor', neighbor, 'passive'])
if 'password' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'password', config["password"]])
+ self.cli_set(base_path + ['neighbor', neighbor, 'password', config["password"]])
if 'port' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'port', config["port"]])
+ self.cli_set(base_path + ['neighbor', neighbor, 'port', config["port"]])
if 'remote_as' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'remote-as', config["remote_as"]])
+ self.cli_set(base_path + ['neighbor', neighbor, 'remote-as', config["remote_as"]])
if 'cap_strict' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'strict-capability-match'])
+ self.cli_set(base_path + ['neighbor', neighbor, 'strict-capability-match'])
if 'shutdown' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'shutdown'])
+ self.cli_set(base_path + ['neighbor', neighbor, 'shutdown'])
if 'ttl_security' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'ttl-security', 'hops', config["ttl_security"]])
+ self.cli_set(base_path + ['neighbor', neighbor, 'ttl-security', 'hops', config["ttl_security"]])
if 'update_src' in config:
- self.session.set(base_path + ['neighbor', neighbor, 'update-source', config["update_src"]])
+ self.cli_set(base_path + ['neighbor', neighbor, 'update-source', config["update_src"]])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify FRR bgpd configuration
frrconfig = getFRRBGPconfig()
@@ -208,34 +206,34 @@ class TestProtocolsBGP(unittest.TestCase):
# Test out individual peer-group configuration items
for peer_group, config in peer_group_config.items():
if 'cap_dynamic' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'capability', 'dynamic'])
+ self.cli_set(base_path + ['peer-group', peer_group, 'capability', 'dynamic'])
if 'cap_ext_next' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'capability', 'extended-nexthop'])
+ self.cli_set(base_path + ['peer-group', peer_group, 'capability', 'extended-nexthop'])
if 'description' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'description', config["description"]])
+ self.cli_set(base_path + ['peer-group', peer_group, 'description', config["description"]])
if 'no_cap_nego' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'disable-capability-negotiation'])
+ self.cli_set(base_path + ['peer-group', peer_group, 'disable-capability-negotiation'])
if 'multi_hop' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'ebgp-multihop', config["multi_hop"]])
+ self.cli_set(base_path + ['peer-group', peer_group, 'ebgp-multihop', config["multi_hop"]])
if 'local_as' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'local-as', config["local_as"]])
+ self.cli_set(base_path + ['peer-group', peer_group, 'local-as', config["local_as"]])
if 'cap_over' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'override-capability'])
+ self.cli_set(base_path + ['peer-group', peer_group, 'override-capability'])
if 'passive' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'passive'])
+ self.cli_set(base_path + ['peer-group', peer_group, 'passive'])
if 'password' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'password', config["password"]])
+ self.cli_set(base_path + ['peer-group', peer_group, 'password', config["password"]])
if 'remote_as' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'remote-as', config["remote_as"]])
+ self.cli_set(base_path + ['peer-group', peer_group, 'remote-as', config["remote_as"]])
if 'shutdown' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'shutdown'])
+ self.cli_set(base_path + ['peer-group', peer_group, 'shutdown'])
if 'ttl_security' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'ttl-security', 'hops', config["ttl_security"]])
+ self.cli_set(base_path + ['peer-group', peer_group, 'ttl-security', 'hops', config["ttl_security"]])
if 'update_src' in config:
- self.session.set(base_path + ['peer-group', peer_group, 'update-source', config["update_src"]])
+ self.cli_set(base_path + ['peer-group', peer_group, 'update-source', config["update_src"]])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify FRR bgpd configuration
frrconfig = getFRRBGPconfig()
@@ -262,21 +260,21 @@ class TestProtocolsBGP(unittest.TestCase):
# We want to redistribute ...
redistributes = ['connected', 'isis', 'kernel', 'ospf', 'rip', 'static']
for redistribute in redistributes:
- self.session.set(base_path + ['address-family', 'ipv4-unicast',
+ self.cli_set(base_path + ['address-family', 'ipv4-unicast',
'redistribute', redistribute])
for network, network_config in networks.items():
- self.session.set(base_path + ['address-family', 'ipv4-unicast',
+ self.cli_set(base_path + ['address-family', 'ipv4-unicast',
'network', network])
if 'as_set' in network_config:
- self.session.set(base_path + ['address-family', 'ipv4-unicast',
+ self.cli_set(base_path + ['address-family', 'ipv4-unicast',
'aggregate-address', network, 'as-set'])
if 'summary_only' in network_config:
- self.session.set(base_path + ['address-family', 'ipv4-unicast',
+ self.cli_set(base_path + ['address-family', 'ipv4-unicast',
'aggregate-address', network, 'summary-only'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify FRR bgpd configuration
frrconfig = getFRRBGPconfig()
@@ -308,18 +306,18 @@ class TestProtocolsBGP(unittest.TestCase):
# We want to redistribute ...
redistributes = ['connected', 'kernel', 'ospfv3', 'ripng', 'static']
for redistribute in redistributes:
- self.session.set(base_path + ['address-family', 'ipv6-unicast',
+ self.cli_set(base_path + ['address-family', 'ipv6-unicast',
'redistribute', redistribute])
for network, network_config in networks.items():
- self.session.set(base_path + ['address-family', 'ipv6-unicast',
+ self.cli_set(base_path + ['address-family', 'ipv6-unicast',
'network', network])
if 'summary_only' in network_config:
- self.session.set(base_path + ['address-family', 'ipv6-unicast',
+ self.cli_set(base_path + ['address-family', 'ipv6-unicast',
'aggregate-address', network, 'summary-only'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify FRR bgpd configuration
frrconfig = getFRRBGPconfig()
diff --git a/smoketest/scripts/cli/test_protocols_igmp-proxy.py b/smoketest/scripts/cli/test_protocols_igmp-proxy.py
index 6aaad739d..1eaf21722 100755
--- a/smoketest/scripts/cli/test_protocols_igmp-proxy.py
+++ b/smoketest/scripts/cli/test_protocols_igmp-proxy.py
@@ -14,9 +14,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.util import read_file
@@ -28,46 +29,44 @@ base_path = ['protocols', 'igmp-proxy']
upstream_if = 'eth1'
downstream_if = 'eth2'
-class TestProtocolsIGMPProxy(unittest.TestCase):
+class TestProtocolsIGMPProxy(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
- self.session.set(['interfaces', 'ethernet', upstream_if, 'address', '172.16.1.1/24'])
+ self.cli_set(['interfaces', 'ethernet', upstream_if, 'address', '172.16.1.1/24'])
def tearDown(self):
- self.session.delete(['interfaces', 'ethernet', upstream_if, 'address'])
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(['interfaces', 'ethernet', upstream_if, 'address'])
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_igmpproxy(self):
threshold = '20'
altnet = '192.0.2.0/24'
whitelist = '10.0.0.0/8'
- self.session.set(base_path + ['disable-quickleave'])
- self.session.set(base_path + ['interface', upstream_if, 'threshold', threshold])
- self.session.set(base_path + ['interface', upstream_if, 'alt-subnet', altnet])
- self.session.set(base_path + ['interface', upstream_if, 'whitelist', whitelist])
+ self.cli_set(base_path + ['disable-quickleave'])
+ self.cli_set(base_path + ['interface', upstream_if, 'threshold', threshold])
+ self.cli_set(base_path + ['interface', upstream_if, 'alt-subnet', altnet])
+ self.cli_set(base_path + ['interface', upstream_if, 'whitelist', whitelist])
# Must define an upstream and at least 1 downstream interface!
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(base_path + ['interface', upstream_if, 'role', 'upstream'])
+ self.cli_commit()
+ self.cli_set(base_path + ['interface', upstream_if, 'role', 'upstream'])
# Interface does not exist
- self.session.set(base_path + ['interface', 'eth20', 'role', 'upstream'])
+ self.cli_set(base_path + ['interface', 'eth20', 'role', 'upstream'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(base_path + ['interface', 'eth20'])
+ self.cli_commit()
+ self.cli_delete(base_path + ['interface', 'eth20'])
# Only 1 upstream interface allowed
- self.session.set(base_path + ['interface', downstream_if, 'role', 'upstream'])
+ self.cli_set(base_path + ['interface', downstream_if, 'role', 'upstream'])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(base_path + ['interface', downstream_if, 'role', 'downstream'])
+ self.cli_commit()
+ self.cli_set(base_path + ['interface', downstream_if, 'role', 'downstream'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Check generated configuration
config = read_file(IGMP_PROXY_CONF)
diff --git a/smoketest/scripts/cli/test_protocols_ospfv3.py b/smoketest/scripts/cli/test_protocols_ospfv3.py
index 297d5d996..c4eb3fdd8 100755
--- a/smoketest/scripts/cli/test_protocols_ospfv3.py
+++ b/smoketest/scripts/cli/test_protocols_ospfv3.py
@@ -17,6 +17,8 @@
import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.ifconfig import Section
from vyos.util import cmd
@@ -29,17 +31,13 @@ base_path = ['protocols', 'ospfv3']
def getFRROSPFconfig():
return cmd('vtysh -c "show run" | sed -n "/router ospf6/,/^!/p"')
-class TestProtocolsOSPFv3(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
+class TestProtocolsOSPFv3(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_ospfv3_01_basic(self):
@@ -49,20 +47,20 @@ class TestProtocolsOSPFv3(unittest.TestCase):
acl_name = 'foo-acl-100'
router_id = '192.0.2.1'
- self.session.set(['policy', 'access-list6', acl_name, 'rule', seq, 'action', 'permit'])
- self.session.set(['policy', 'access-list6', acl_name, 'rule', seq, 'source', 'any'])
+ self.cli_set(['policy', 'access-list6', acl_name, 'rule', seq, 'action', 'permit'])
+ self.cli_set(['policy', 'access-list6', acl_name, 'rule', seq, 'source', 'any'])
- self.session.set(base_path + ['parameters', 'router-id', router_id])
- self.session.set(base_path + ['area', area, 'range', prefix, 'advertise'])
- self.session.set(base_path + ['area', area, 'export-list', acl_name])
- self.session.set(base_path + ['area', area, 'import-list', acl_name])
+ self.cli_set(base_path + ['parameters', 'router-id', router_id])
+ self.cli_set(base_path + ['area', area, 'range', prefix, 'advertise'])
+ self.cli_set(base_path + ['area', area, 'export-list', acl_name])
+ self.cli_set(base_path + ['area', area, 'import-list', acl_name])
interfaces = Section.interfaces('ethernet')
for interface in interfaces:
- self.session.set(base_path + ['area', area, 'interface', interface])
+ self.cli_set(base_path + ['area', area, 'interface', interface])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify FRR ospfd configuration
frrconfig = getFRROSPFconfig()
@@ -75,7 +73,7 @@ class TestProtocolsOSPFv3(unittest.TestCase):
for interface in interfaces:
self.assertIn(f' interface {interface} area {area}', frrconfig)
- self.session.delete(['policy', 'access-list6', acl_name])
+ self.cli_delete(['policy', 'access-list6', acl_name])
def test_ospfv3_02_distance(self):
@@ -84,13 +82,13 @@ class TestProtocolsOSPFv3(unittest.TestCase):
dist_inter_area = '120'
dist_intra_area = '130'
- self.session.set(base_path + ['distance', 'global', dist_global])
- self.session.set(base_path + ['distance', 'ospfv3', 'external', dist_external])
- self.session.set(base_path + ['distance', 'ospfv3', 'inter-area', dist_inter_area])
- self.session.set(base_path + ['distance', 'ospfv3', 'intra-area', dist_intra_area])
+ self.cli_set(base_path + ['distance', 'global', dist_global])
+ self.cli_set(base_path + ['distance', 'ospfv3', 'external', dist_external])
+ self.cli_set(base_path + ['distance', 'ospfv3', 'inter-area', dist_inter_area])
+ self.cli_set(base_path + ['distance', 'ospfv3', 'intra-area', dist_intra_area])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify FRR ospfd configuration
frrconfig = getFRROSPFconfig()
@@ -104,13 +102,13 @@ class TestProtocolsOSPFv3(unittest.TestCase):
route_map_seq = '10'
redistribute = ['bgp', 'connected', 'kernel', 'ripng', 'static']
- self.session.set(['policy', 'route-map', route_map, 'rule', route_map_seq, 'action', 'permit'])
+ self.cli_set(['policy', 'route-map', route_map, 'rule', route_map_seq, 'action', 'permit'])
for protocol in redistribute:
- self.session.set(base_path + ['redistribute', protocol, 'route-map', route_map])
+ self.cli_set(base_path + ['redistribute', protocol, 'route-map', route_map])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify FRR ospfd configuration
frrconfig = getFRROSPFconfig()
diff --git a/smoketest/scripts/cli/test_protocols_rip.py b/smoketest/scripts/cli/test_protocols_rip.py
index 4b3674960..6f2028f2b 100755
--- a/smoketest/scripts/cli/test_protocols_rip.py
+++ b/smoketest/scripts/cli/test_protocols_rip.py
@@ -17,6 +17,8 @@
import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.ifconfig import Section
from vyos.util import cmd
@@ -34,32 +36,29 @@ base_path = ['protocols', 'rip']
def getFRRconfig():
return cmd('vtysh -c "show run" | sed -n "/router rip/,/^!/p"')
-class TestProtocolsRIP(unittest.TestCase):
+class TestProtocolsRIP(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
-
- self.session.set(['policy', 'access-list', acl_in, 'rule', '10', 'action', 'permit'])
- self.session.set(['policy', 'access-list', acl_in, 'rule', '10', 'source', 'any'])
- self.session.set(['policy', 'access-list', acl_in, 'rule', '10', 'destination', 'any'])
- self.session.set(['policy', 'access-list', acl_out, 'rule', '20', 'action', 'deny'])
- self.session.set(['policy', 'access-list', acl_out, 'rule', '20', 'source', 'any'])
- self.session.set(['policy', 'access-list', acl_out, 'rule', '20', 'destination', 'any'])
- self.session.set(['policy', 'prefix-list', prefix_list_in, 'rule', '100', 'action', 'permit'])
- self.session.set(['policy', 'prefix-list', prefix_list_in, 'rule', '100', 'prefix', '192.0.2.0/24'])
- self.session.set(['policy', 'prefix-list', prefix_list_out, 'rule', '200', 'action', 'deny'])
- self.session.set(['policy', 'prefix-list', prefix_list_out, 'rule', '200', 'prefix', '192.0.2.0/24'])
- self.session.set(['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit'])
+ self.cli_set(['policy', 'access-list', acl_in, 'rule', '10', 'action', 'permit'])
+ self.cli_set(['policy', 'access-list', acl_in, 'rule', '10', 'source', 'any'])
+ self.cli_set(['policy', 'access-list', acl_in, 'rule', '10', 'destination', 'any'])
+ self.cli_set(['policy', 'access-list', acl_out, 'rule', '20', 'action', 'deny'])
+ self.cli_set(['policy', 'access-list', acl_out, 'rule', '20', 'source', 'any'])
+ self.cli_set(['policy', 'access-list', acl_out, 'rule', '20', 'destination', 'any'])
+ self.cli_set(['policy', 'prefix-list', prefix_list_in, 'rule', '100', 'action', 'permit'])
+ self.cli_set(['policy', 'prefix-list', prefix_list_in, 'rule', '100', 'prefix', '192.0.2.0/24'])
+ self.cli_set(['policy', 'prefix-list', prefix_list_out, 'rule', '200', 'action', 'deny'])
+ self.cli_set(['policy', 'prefix-list', prefix_list_out, 'rule', '200', 'prefix', '192.0.2.0/24'])
+ self.cli_set(['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit'])
def tearDown(self):
- self.session.delete(base_path)
- self.session.delete(['policy', 'access-list', acl_in])
- self.session.delete(['policy', 'access-list', acl_out])
- self.session.delete(['policy', 'prefix-list', prefix_list_in])
- self.session.delete(['policy', 'prefix-list', prefix_list_out])
- self.session.delete(['policy', 'route-map', route_map])
+ self.cli_delete(base_path)
+ self.cli_delete(['policy', 'access-list', acl_in])
+ self.cli_delete(['policy', 'access-list', acl_out])
+ self.cli_delete(['policy', 'prefix-list', prefix_list_in])
+ self.cli_delete(['policy', 'prefix-list', prefix_list_out])
+ self.cli_delete(['policy', 'route-map', route_map])
- self.session.commit()
- del self.session
+ self.cli_commit()
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
@@ -76,34 +75,34 @@ class TestProtocolsRIP(unittest.TestCase):
timer_timeout = '1000'
timer_update = '90'
- self.session.set(base_path + ['default-distance', distance])
- self.session.set(base_path + ['default-information', 'originate'])
- self.session.set(base_path + ['default-metric', metric])
- self.session.set(base_path + ['distribute-list', 'access-list', 'in', acl_in])
- self.session.set(base_path + ['distribute-list', 'access-list', 'out', acl_out])
- self.session.set(base_path + ['distribute-list', 'prefix-list', 'in', prefix_list_in])
- self.session.set(base_path + ['distribute-list', 'prefix-list', 'out', prefix_list_out])
- self.session.set(base_path + ['passive-interface', 'default'])
- self.session.set(base_path + ['timers', 'garbage-collection', timer_garbage])
- self.session.set(base_path + ['timers', 'timeout', timer_timeout])
- self.session.set(base_path + ['timers', 'update', timer_update])
+ self.cli_set(base_path + ['default-distance', distance])
+ self.cli_set(base_path + ['default-information', 'originate'])
+ self.cli_set(base_path + ['default-metric', metric])
+ self.cli_set(base_path + ['distribute-list', 'access-list', 'in', acl_in])
+ self.cli_set(base_path + ['distribute-list', 'access-list', 'out', acl_out])
+ self.cli_set(base_path + ['distribute-list', 'prefix-list', 'in', prefix_list_in])
+ self.cli_set(base_path + ['distribute-list', 'prefix-list', 'out', prefix_list_out])
+ self.cli_set(base_path + ['passive-interface', 'default'])
+ self.cli_set(base_path + ['timers', 'garbage-collection', timer_garbage])
+ self.cli_set(base_path + ['timers', 'timeout', timer_timeout])
+ self.cli_set(base_path + ['timers', 'update', timer_update])
for interface in interfaces:
- self.session.set(base_path + ['interface', interface])
- self.session.set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'in', acl_in])
- self.session.set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'out', acl_out])
+ self.cli_set(base_path + ['interface', interface])
+ self.cli_set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'in', acl_in])
+ self.cli_set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'out', acl_out])
for neighbor in neighbors:
- self.session.set(base_path + ['neighbor', neighbor])
+ self.cli_set(base_path + ['neighbor', neighbor])
for network in networks:
- self.session.set(base_path + ['network', network])
- self.session.set(base_path + ['network-distance', network, 'distance', network_distance])
- self.session.set(base_path + ['route', network])
+ self.cli_set(base_path + ['network', network])
+ self.cli_set(base_path + ['network-distance', network, 'distance', network_distance])
+ self.cli_set(base_path + ['route', network])
for proto in redistribute:
- self.session.set(base_path + ['redistribute', proto, 'metric', metric])
- self.session.set(base_path + ['redistribute', proto, 'route-map', route_map])
+ self.cli_set(base_path + ['redistribute', proto, 'metric', metric])
+ self.cli_set(base_path + ['redistribute', proto, 'route-map', route_map])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify FRR ospfd configuration
frrconfig = getFRRconfig()
diff --git a/smoketest/scripts/cli/test_protocols_ripng.py b/smoketest/scripts/cli/test_protocols_ripng.py
index 90cbaccd8..3380dc78b 100755
--- a/smoketest/scripts/cli/test_protocols_ripng.py
+++ b/smoketest/scripts/cli/test_protocols_ripng.py
@@ -14,12 +14,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.ifconfig import Section
-from vyos.util import cmd
from vyos.util import process_named_running
PROCESS_NAME = 'ripngd'
@@ -31,33 +31,26 @@ route_map = 'FooBar123'
base_path = ['protocols', 'ripng']
-def getFRRconfig():
- return cmd('vtysh -c "show run" | sed -n "/router ripng/,/^!/p"')
-
-class TestProtocolsRIPng(unittest.TestCase):
+class TestProtocolsRIPng(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
-
- self.session.set(['policy', 'access-list6', acl_in, 'rule', '10', 'action', 'permit'])
- self.session.set(['policy', 'access-list6', acl_in, 'rule', '10', 'source', 'any'])
- self.session.set(['policy', 'access-list6', acl_out, 'rule', '20', 'action', 'deny'])
- self.session.set(['policy', 'access-list6', acl_out, 'rule', '20', 'source', 'any'])
- self.session.set(['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'action', 'permit'])
- self.session.set(['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'prefix', '2001:db8::/32'])
- self.session.set(['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'action', 'deny'])
- self.session.set(['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'prefix', '2001:db8::/32'])
- self.session.set(['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit'])
+ self.cli_set(['policy', 'access-list6', acl_in, 'rule', '10', 'action', 'permit'])
+ self.cli_set(['policy', 'access-list6', acl_in, 'rule', '10', 'source', 'any'])
+ self.cli_set(['policy', 'access-list6', acl_out, 'rule', '20', 'action', 'deny'])
+ self.cli_set(['policy', 'access-list6', acl_out, 'rule', '20', 'source', 'any'])
+ self.cli_set(['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'action', 'permit'])
+ self.cli_set(['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'prefix', '2001:db8::/32'])
+ self.cli_set(['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'action', 'deny'])
+ self.cli_set(['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'prefix', '2001:db8::/32'])
+ self.cli_set(['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit'])
def tearDown(self):
- self.session.delete(base_path)
- self.session.delete(['policy', 'access-list6', acl_in])
- self.session.delete(['policy', 'access-list6', acl_out])
- self.session.delete(['policy', 'prefix-list6', prefix_list_in])
- self.session.delete(['policy', 'prefix-list6', prefix_list_out])
- self.session.delete(['policy', 'route-map', route_map])
-
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_delete(['policy', 'access-list6', acl_in])
+ self.cli_delete(['policy', 'access-list6', acl_out])
+ self.cli_delete(['policy', 'prefix-list6', prefix_list_in])
+ self.cli_delete(['policy', 'prefix-list6', prefix_list_out])
+ self.cli_delete(['policy', 'route-map', route_map])
+ self.cli_commit()
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
@@ -72,38 +65,38 @@ class TestProtocolsRIPng(unittest.TestCase):
timer_timeout = '1000'
timer_update = '90'
- self.session.set(base_path + ['default-information', 'originate'])
- self.session.set(base_path + ['default-metric', metric])
- self.session.set(base_path + ['distribute-list', 'access-list', 'in', acl_in])
- self.session.set(base_path + ['distribute-list', 'access-list', 'out', acl_out])
- self.session.set(base_path + ['distribute-list', 'prefix-list', 'in', prefix_list_in])
- self.session.set(base_path + ['distribute-list', 'prefix-list', 'out', prefix_list_out])
- self.session.set(base_path + ['passive-interface', 'default'])
- self.session.set(base_path + ['timers', 'garbage-collection', timer_garbage])
- self.session.set(base_path + ['timers', 'timeout', timer_timeout])
- self.session.set(base_path + ['timers', 'update', timer_update])
+ self.cli_set(base_path + ['default-information', 'originate'])
+ self.cli_set(base_path + ['default-metric', metric])
+ self.cli_set(base_path + ['distribute-list', 'access-list', 'in', acl_in])
+ self.cli_set(base_path + ['distribute-list', 'access-list', 'out', acl_out])
+ self.cli_set(base_path + ['distribute-list', 'prefix-list', 'in', prefix_list_in])
+ self.cli_set(base_path + ['distribute-list', 'prefix-list', 'out', prefix_list_out])
+ self.cli_set(base_path + ['passive-interface', 'default'])
+ self.cli_set(base_path + ['timers', 'garbage-collection', timer_garbage])
+ self.cli_set(base_path + ['timers', 'timeout', timer_timeout])
+ self.cli_set(base_path + ['timers', 'update', timer_update])
for aggregate in aggregates:
- self.session.set(base_path + ['aggregate-address', aggregate])
+ self.cli_set(base_path + ['aggregate-address', aggregate])
for interface in interfaces:
- self.session.set(base_path + ['interface', interface])
- self.session.set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'in', acl_in])
- self.session.set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'out', acl_out])
- self.session.set(base_path + ['distribute-list', 'interface', interface, 'prefix-list', 'in', prefix_list_in])
- self.session.set(base_path + ['distribute-list', 'interface', interface, 'prefix-list', 'out', prefix_list_out])
+ self.cli_set(base_path + ['interface', interface])
+ self.cli_set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'in', acl_in])
+ self.cli_set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'out', acl_out])
+ self.cli_set(base_path + ['distribute-list', 'interface', interface, 'prefix-list', 'in', prefix_list_in])
+ self.cli_set(base_path + ['distribute-list', 'interface', interface, 'prefix-list', 'out', prefix_list_out])
for network in networks:
- self.session.set(base_path + ['network', network])
- self.session.set(base_path + ['route', network])
+ self.cli_set(base_path + ['network', network])
+ self.cli_set(base_path + ['route', network])
for proto in redistribute:
- self.session.set(base_path + ['redistribute', proto, 'metric', metric])
- self.session.set(base_path + ['redistribute', proto, 'route-map', route_map])
+ self.cli_set(base_path + ['redistribute', proto, 'metric', metric])
+ self.cli_set(base_path + ['redistribute', proto, 'route-map', route_map])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify FRR ospfd configuration
- frrconfig = getFRRconfig()
+ frrconfig = self.getFRRconfig('router ripng')
self.assertIn(f'router ripng', frrconfig)
self.assertIn(f' default-information originate', frrconfig)
self.assertIn(f' default-metric {metric}', frrconfig)
diff --git a/smoketest/scripts/cli/test_protocols_rpki.py b/smoketest/scripts/cli/test_protocols_rpki.py
index bec4ef76f..8212e9469 100755
--- a/smoketest/scripts/cli/test_protocols_rpki.py
+++ b/smoketest/scripts/cli/test_protocols_rpki.py
@@ -17,6 +17,8 @@
import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.util import cmd
@@ -29,22 +31,15 @@ rpki_known_hosts = '/config/auth/known_hosts'
rpki_ssh_key = '/config/auth/id_rsa_rpki'
rpki_ssh_pub = f'{rpki_ssh_key}.pub'
-def getFRRRPKIconfig():
- return cmd(f'vtysh -c "show run" | sed -n "/rpki/,/^!/p"')
-
-class TestProtocolsRPKI(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
+class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
# Nothing RPKI specific should be left over in the config
#
# Disabled until T3266 is resolved
- # frrconfig = getFRRRPKIconfig()
+ # frrconfig = self.getFRRconfig('rpki')
# self.assertNotIn('rpki', frrconfig)
# Check for running process
@@ -71,16 +66,16 @@ class TestProtocolsRPKI(unittest.TestCase):
},
}
- self.session.set(base_path + ['polling-period', polling])
+ self.cli_set(base_path + ['polling-period', polling])
for peer, peer_config in cache.items():
- self.session.set(base_path + ['cache', peer, 'port', peer_config['port']])
- self.session.set(base_path + ['cache', peer, 'preference', peer_config['preference']])
+ self.cli_set(base_path + ['cache', peer, 'port', peer_config['port']])
+ self.cli_set(base_path + ['cache', peer, 'preference', peer_config['preference']])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify FRR configuration
- frrconfig = getFRRRPKIconfig()
+ frrconfig = self.getFRRconfig('rpki')
self.assertIn(f'rpki polling_period {polling}', frrconfig)
for peer, peer_config in cache.items():
@@ -103,21 +98,21 @@ class TestProtocolsRPKI(unittest.TestCase):
},
}
- self.session.set(base_path + ['polling-period', polling])
+ self.cli_set(base_path + ['polling-period', polling])
for peer, peer_config in cache.items():
- self.session.set(base_path + ['cache', peer, 'port', peer_config['port']])
- self.session.set(base_path + ['cache', peer, 'preference', peer_config['preference']])
- self.session.set(base_path + ['cache', peer, 'ssh', 'username', peer_config['username']])
- self.session.set(base_path + ['cache', peer, 'ssh', 'public-key-file', rpki_ssh_pub])
- self.session.set(base_path + ['cache', peer, 'ssh', 'private-key-file', rpki_ssh_key])
- self.session.set(base_path + ['cache', peer, 'ssh', 'known-hosts-file', rpki_known_hosts])
+ self.cli_set(base_path + ['cache', peer, 'port', peer_config['port']])
+ self.cli_set(base_path + ['cache', peer, 'preference', peer_config['preference']])
+ self.cli_set(base_path + ['cache', peer, 'ssh', 'username', peer_config['username']])
+ self.cli_set(base_path + ['cache', peer, 'ssh', 'public-key-file', rpki_ssh_pub])
+ self.cli_set(base_path + ['cache', peer, 'ssh', 'private-key-file', rpki_ssh_key])
+ self.cli_set(base_path + ['cache', peer, 'ssh', 'known-hosts-file', rpki_known_hosts])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify FRR configuration
- frrconfig = getFRRRPKIconfig()
+ frrconfig = self.getFRRconfig('rpki')
self.assertIn(f'rpki polling_period {polling}', frrconfig)
for peer, peer_config in cache.items():
@@ -140,12 +135,12 @@ class TestProtocolsRPKI(unittest.TestCase):
}
for peer, peer_config in cache.items():
- self.session.set(base_path + ['cache', peer, 'port', peer_config['port']])
- self.session.set(base_path + ['cache', peer, 'preference', peer_config['preference']])
+ self.cli_set(base_path + ['cache', peer, 'port', peer_config['port']])
+ self.cli_set(base_path + ['cache', peer, 'preference', peer_config['preference']])
# check validate() - preferences must be unique
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
if __name__ == '__main__':
diff --git a/smoketest/scripts/cli/test_protocols_static.py b/smoketest/scripts/cli/test_protocols_static.py
index 100fd3387..de9b48de4 100755
--- a/smoketest/scripts/cli/test_protocols_static.py
+++ b/smoketest/scripts/cli/test_protocols_static.py
@@ -19,6 +19,8 @@ import os
import json
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from netifaces import interfaces
from vyos.configsession import ConfigSession
@@ -71,29 +73,27 @@ interface_routes = {
},
}
-
-class StaticRouteTest(unittest.TestCase):
+class StaticRouteTest(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
# we need an alive next-hop interface
- self.session.set(['interfaces', 'dummy', dummy_if, 'address', '192.0.2.1/24'])
- self.session.set(['interfaces', 'dummy', dummy_if, 'address', '2001:db8::1/64'])
+ self.cli_set(['interfaces', 'dummy', dummy_if, 'address', '192.0.2.1/24'])
+ self.cli_set(['interfaces', 'dummy', dummy_if, 'address', '2001:db8::1/64'])
def tearDown(self):
- self.session.delete(['interfaces', 'dummy', dummy_if])
- self.session.commit()
+ self.cli_delete(['interfaces', 'dummy', dummy_if])
+ self.cli_commit()
def test_static_routes(self):
for route, route_config in routes.items():
route_type = 'route'
if is_ipv6(route):
route_type = 'route6'
- self.session.set(base_path + [route_type, route, 'next-hop', route_config['next_hop']])
+ self.cli_set(base_path + [route_type, route, 'next-hop', route_config['next_hop']])
if 'distance' in route_config:
- self.session.set(base_path + [route_type, route, 'next-hop', route_config['next_hop'], 'distance', route_config['distance']])
+ self.cli_set(base_path + [route_type, route, 'next-hop', route_config['next_hop'], 'distance', route_config['distance']])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify routes
for route, route_config in routes.items():
@@ -114,19 +114,19 @@ class StaticRouteTest(unittest.TestCase):
route_type = 'route'
if is_ipv6(route):
route_type = 'route6'
- self.session.delete(base_path + [route_type, route])
+ self.cli_delete(base_path + [route_type, route])
def test_interface_routes(self):
for route, route_config in interface_routes.items():
route_type = 'interface-route'
if is_ipv6(route):
route_type = 'interface-route6'
- self.session.set(base_path + [route_type, route, 'next-hop-interface', route_config['next_hop']])
+ self.cli_set(base_path + [route_type, route, 'next-hop-interface', route_config['next_hop']])
if 'distance' in route_config:
- self.session.set(base_path + [route_type, route, 'next-hop-interface', route_config['next_hop'], 'distance', route_config['distance']])
+ self.cli_set(base_path + [route_type, route, 'next-hop-interface', route_config['next_hop'], 'distance', route_config['distance']])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify routes
for route, route_config in interface_routes.items():
@@ -148,7 +148,7 @@ class StaticRouteTest(unittest.TestCase):
route_type = 'interface-route'
if is_ipv6(route):
route_type = 'interface-route6'
- self.session.delete(base_path + [route_type, route])
+ self.cli_delete(base_path + [route_type, route])
if __name__ == '__main__':
- unittest.main(verbosity=2, failfast=True)
+ unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_service_bcast-relay.py b/smoketest/scripts/cli/test_service_bcast-relay.py
index c28509714..58b730ab4 100755
--- a/smoketest/scripts/cli/test_service_bcast-relay.py
+++ b/smoketest/scripts/cli/test_service_bcast-relay.py
@@ -14,47 +14,46 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from psutil import process_iter
-from vyos.configsession import ConfigSession, ConfigSessionError
+from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
base_path = ['service', 'broadcast-relay']
-class TestServiceBroadcastRelay(unittest.TestCase):
+class TestServiceBroadcastRelay(VyOSUnitTestSHIM.TestCase):
_address1 = '192.0.2.1/24'
_address2 = '192.0.2.1/24'
def setUp(self):
- self.session = ConfigSession(os.getpid())
- self.session.set(['interfaces', 'dummy', 'dum1001', 'address', self._address1])
- self.session.set(['interfaces', 'dummy', 'dum1002', 'address', self._address2])
- self.session.commit()
+ self.cli_set(['interfaces', 'dummy', 'dum1001', 'address', self._address1])
+ self.cli_set(['interfaces', 'dummy', 'dum1002', 'address', self._address2])
def tearDown(self):
- self.session.delete(['interfaces', 'dummy', 'dum1001'])
- self.session.delete(['interfaces', 'dummy', 'dum1002'])
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(['interfaces', 'dummy', 'dum1001'])
+ self.cli_delete(['interfaces', 'dummy', 'dum1002'])
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_broadcast_relay_service(self):
ids = range(1, 5)
for id in ids:
base = base_path + ['id', str(id)]
- self.session.set(base + ['description', 'vyos'])
- self.session.set(base + ['port', str(10000 + id)])
+ self.cli_set(base + ['description', 'vyos'])
+ self.cli_set(base + ['port', str(10000 + id)])
# check validate() - two interfaces must be present
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
- self.session.set(base + ['interface', 'dum1001'])
- self.session.set(base + ['interface', 'dum1002'])
- self.session.set(base + ['address', self._address1.split('/')[0]])
+ self.cli_set(base + ['interface', 'dum1001'])
+ self.cli_set(base + ['interface', 'dum1002'])
+ self.cli_set(base + ['address', self._address1.split('/')[0]])
- self.session.commit()
+ self.cli_commit()
for id in ids:
# check if process is running
diff --git a/smoketest/scripts/cli/test_service_dhcp-relay.py b/smoketest/scripts/cli/test_service_dhcp-relay.py
index 676c4a481..db2edba54 100755
--- a/smoketest/scripts/cli/test_service_dhcp-relay.py
+++ b/smoketest/scripts/cli/test_service_dhcp-relay.py
@@ -14,14 +14,13 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.ifconfig import Section
-from vyos.util import cmd
from vyos.util import process_named_running
from vyos.util import read_file
@@ -29,14 +28,10 @@ PROCESS_NAME = 'dhcrelay'
RELAY_CONF = '/run/dhcp-relay/dhcrelay.conf'
base_path = ['service', 'dhcp-relay']
-class TestServiceDHCPRelay(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
+class TestServiceDHCPRelay(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_relay_default(self):
max_size = '800'
@@ -44,28 +39,28 @@ class TestServiceDHCPRelay(unittest.TestCase):
agents_packets = 'append'
servers = ['192.0.2.1', '192.0.2.2']
- self.session.set(base_path + ['interface', 'lo'])
+ self.cli_set(base_path + ['interface', 'lo'])
# check validate() - DHCP relay does not support the loopback interface
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(base_path + ['interface', 'lo'])
+ self.cli_commit()
+ self.cli_delete(base_path + ['interface', 'lo'])
# activate DHCP relay on all ethernet interfaces
for tmp in Section.interfaces("ethernet"):
- self.session.set(base_path + ['interface', tmp])
+ self.cli_set(base_path + ['interface', tmp])
# check validate() - No DHCP relay server(s) configured
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
for server in servers:
- self.session.set(base_path + ['server', server])
+ self.cli_set(base_path + ['server', server])
- self.session.set(base_path + ['relay-options', 'max-size', max_size])
- self.session.set(base_path + ['relay-options', 'hop-count', hop_count])
- self.session.set(base_path + ['relay-options', 'relay-agents-packets', agents_packets])
+ self.cli_set(base_path + ['relay-options', 'max-size', max_size])
+ self.cli_set(base_path + ['relay-options', 'hop-count', hop_count])
+ self.cli_set(base_path + ['relay-options', 'relay-agents-packets', agents_packets])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Check configured port
config = read_file(RELAY_CONF)
diff --git a/smoketest/scripts/cli/test_service_dhcp-server.py b/smoketest/scripts/cli/test_service_dhcp-server.py
index 2a5789e70..815bd333a 100755
--- a/smoketest/scripts/cli/test_service_dhcp-server.py
+++ b/smoketest/scripts/cli/test_service_dhcp-server.py
@@ -14,13 +14,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
-from vyos.util import cmd
from vyos.util import process_named_running
from vyos.util import read_file
from vyos.template import address_from_cidr
@@ -37,17 +36,15 @@ dns_1 = inc_ip(subnet, 2)
dns_2 = inc_ip(subnet, 3)
domain_name = 'vyos.net'
-class TestServiceDHCPServer(unittest.TestCase):
+class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
cidr_mask = subnet.split('/')[-1]
- self.session.set(['interfaces', 'dummy', 'dum8765', 'address', f'{router}/{cidr_mask}'])
+ self.cli_set(['interfaces', 'dummy', 'dum8765', 'address', f'{router}/{cidr_mask}'])
def tearDown(self):
- self.session.delete(['interfaces', 'dummy', 'dum8765'])
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(['interfaces', 'dummy', 'dum8765'])
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_dhcp_single_pool_range(self):
shared_net_name = 'SMOKE-1'
@@ -57,25 +54,25 @@ class TestServiceDHCPServer(unittest.TestCase):
range_1_start = inc_ip(subnet, 40)
range_1_stop = inc_ip(subnet, 50)
- self.session.set(base_path + ['dynamic-dns-update'])
+ self.cli_set(base_path + ['dynamic-dns-update'])
pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]
# we use the first subnet IP address as default gateway
- self.session.set(pool + ['default-router', router])
- self.session.set(pool + ['dns-server', dns_1])
- self.session.set(pool + ['dns-server', dns_2])
- self.session.set(pool + ['domain-name', domain_name])
+ self.cli_set(pool + ['default-router', router])
+ self.cli_set(pool + ['dns-server', dns_1])
+ self.cli_set(pool + ['dns-server', dns_2])
+ self.cli_set(pool + ['domain-name', domain_name])
# check validate() - No DHCP address range or active static-mapping set
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(pool + ['range', '0', 'start', range_0_start])
- self.session.set(pool + ['range', '0', 'stop', range_0_stop])
- self.session.set(pool + ['range', '1', 'start', range_1_start])
- self.session.set(pool + ['range', '1', 'stop', range_1_stop])
+ self.cli_commit()
+ self.cli_set(pool + ['range', '0', 'start', range_0_start])
+ self.cli_set(pool + ['range', '0', 'stop', range_0_stop])
+ self.cli_set(pool + ['range', '1', 'start', range_1_start])
+ self.cli_set(pool + ['range', '1', 'stop', range_1_stop])
# commit changes
- self.session.commit()
+ self.cli_commit()
config = read_file(DHCPD_CONF)
network = address_from_cidr(subnet)
@@ -110,42 +107,42 @@ class TestServiceDHCPServer(unittest.TestCase):
pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]
# we use the first subnet IP address as default gateway
- self.session.set(pool + ['default-router', router])
- self.session.set(pool + ['dns-server', dns_1])
- self.session.set(pool + ['dns-server', dns_2])
- self.session.set(pool + ['domain-name', domain_name])
- self.session.set(pool + ['ip-forwarding'])
- self.session.set(pool + ['smtp-server', smtp_server])
- self.session.set(pool + ['pop-server', smtp_server])
- self.session.set(pool + ['time-server', time_server])
- self.session.set(pool + ['tftp-server-name', tftp_server])
+ self.cli_set(pool + ['default-router', router])
+ self.cli_set(pool + ['dns-server', dns_1])
+ self.cli_set(pool + ['dns-server', dns_2])
+ self.cli_set(pool + ['domain-name', domain_name])
+ self.cli_set(pool + ['ip-forwarding'])
+ self.cli_set(pool + ['smtp-server', smtp_server])
+ self.cli_set(pool + ['pop-server', smtp_server])
+ self.cli_set(pool + ['time-server', time_server])
+ self.cli_set(pool + ['tftp-server-name', tftp_server])
for search in search_domains:
- self.session.set(pool + ['domain-search', search])
- self.session.set(pool + ['bootfile-name', bootfile_name])
- self.session.set(pool + ['bootfile-server', bootfile_server])
- self.session.set(pool + ['wpad-url', wpad])
- self.session.set(pool + ['server-identifier', server_identifier])
+ self.cli_set(pool + ['domain-search', search])
+ self.cli_set(pool + ['bootfile-name', bootfile_name])
+ self.cli_set(pool + ['bootfile-server', bootfile_server])
+ self.cli_set(pool + ['wpad-url', wpad])
+ self.cli_set(pool + ['server-identifier', server_identifier])
- self.session.set(pool + ['static-route', 'destination-subnet', '10.0.0.0/24'])
- self.session.set(pool + ['static-route', 'router', '192.0.2.1'])
+ self.cli_set(pool + ['static-route', 'destination-subnet', '10.0.0.0/24'])
+ self.cli_set(pool + ['static-route', 'router', '192.0.2.1'])
# check validate() - No DHCP address range or active static-mapping set
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(pool + ['range', '0', 'start', range_0_start])
- self.session.set(pool + ['range', '0', 'stop', range_0_stop])
+ self.cli_commit()
+ self.cli_set(pool + ['range', '0', 'start', range_0_start])
+ self.cli_set(pool + ['range', '0', 'stop', range_0_stop])
# failover
failover_local = router
failover_remote = inc_ip(router, 1)
- self.session.set(pool + ['failover', 'local-address', failover_local])
- self.session.set(pool + ['failover', 'name', shared_net_name])
- self.session.set(pool + ['failover', 'peer-address', failover_remote])
- self.session.set(pool + ['failover', 'status', 'primary'])
+ self.cli_set(pool + ['failover', 'local-address', failover_local])
+ self.cli_set(pool + ['failover', 'name', shared_net_name])
+ self.cli_set(pool + ['failover', 'peer-address', failover_remote])
+ self.cli_set(pool + ['failover', 'status', 'primary'])
# commit changes
- self.session.commit()
+ self.cli_commit()
config = read_file(DHCPD_CONF)
@@ -204,24 +201,24 @@ class TestServiceDHCPServer(unittest.TestCase):
pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]
# we use the first subnet IP address as default gateway
- self.session.set(pool + ['default-router', router])
- self.session.set(pool + ['dns-server', dns_1])
- self.session.set(pool + ['dns-server', dns_2])
- self.session.set(pool + ['domain-name', domain_name])
+ self.cli_set(pool + ['default-router', router])
+ self.cli_set(pool + ['dns-server', dns_1])
+ self.cli_set(pool + ['dns-server', dns_2])
+ self.cli_set(pool + ['domain-name', domain_name])
# check validate() - No DHCP address range or active static-mapping set
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
client_base = 10
for client in ['client1', 'client2', 'client3']:
mac = '00:50:00:00:00:{}'.format(client_base)
- self.session.set(pool + ['static-mapping', client, 'mac-address', mac])
- self.session.set(pool + ['static-mapping', client, 'ip-address', inc_ip(subnet, client_base)])
+ self.cli_set(pool + ['static-mapping', client, 'mac-address', mac])
+ self.cli_set(pool + ['static-mapping', client, 'ip-address', inc_ip(subnet, client_base)])
client_base += 1
# commit changes
- self.session.commit()
+ self.cli_commit()
config = read_file(DHCPD_CONF)
network = address_from_cidr(subnet)
@@ -264,25 +261,25 @@ class TestServiceDHCPServer(unittest.TestCase):
pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]
# we use the first subnet IP address as default gateway
- self.session.set(pool + ['default-router', router])
- self.session.set(pool + ['dns-server', dns_1])
- self.session.set(pool + ['domain-name', domain_name])
- self.session.set(pool + ['lease', lease_time])
+ self.cli_set(pool + ['default-router', router])
+ self.cli_set(pool + ['dns-server', dns_1])
+ self.cli_set(pool + ['domain-name', domain_name])
+ self.cli_set(pool + ['lease', lease_time])
- self.session.set(pool + ['range', '0', 'start', range_0_start])
- self.session.set(pool + ['range', '0', 'stop', range_0_stop])
- self.session.set(pool + ['range', '1', 'start', range_1_start])
- self.session.set(pool + ['range', '1', 'stop', range_1_stop])
+ self.cli_set(pool + ['range', '0', 'start', range_0_start])
+ self.cli_set(pool + ['range', '0', 'stop', range_0_stop])
+ self.cli_set(pool + ['range', '1', 'start', range_1_start])
+ self.cli_set(pool + ['range', '1', 'stop', range_1_stop])
client_base = 60
for client in ['client1', 'client2', 'client3', 'client4']:
mac = '02:50:00:00:00:{}'.format(client_base)
- self.session.set(pool + ['static-mapping', client, 'mac-address', mac])
- self.session.set(pool + ['static-mapping', client, 'ip-address', inc_ip(subnet, client_base)])
+ self.cli_set(pool + ['static-mapping', client, 'mac-address', mac])
+ self.cli_set(pool + ['static-mapping', client, 'ip-address', inc_ip(subnet, client_base)])
client_base += 1
# commit changes
- self.session.commit()
+ self.cli_commit()
config = read_file(DHCPD_CONF)
for network in ['0', '1', '2', '3']:
@@ -329,13 +326,13 @@ class TestServiceDHCPServer(unittest.TestCase):
range_0_stop = inc_ip(subnet, 20)
pool = base_path + ['shared-network-name', 'EXCLUDE-TEST', 'subnet', subnet]
- self.session.set(pool + ['default-router', router])
- self.session.set(pool + ['exclude', router])
- self.session.set(pool + ['range', '0', 'start', range_0_start])
- self.session.set(pool + ['range', '0', 'stop', range_0_stop])
+ self.cli_set(pool + ['default-router', router])
+ self.cli_set(pool + ['exclude', router])
+ self.cli_set(pool + ['range', '0', 'start', range_0_start])
+ self.cli_set(pool + ['range', '0', 'stop', range_0_stop])
# commit changes
- self.session.commit()
+ self.cli_commit()
# VErify
config = read_file(DHCPD_CONF)
@@ -362,13 +359,13 @@ class TestServiceDHCPServer(unittest.TestCase):
range_0_start_excl = inc_ip(exclude_addr, 1)
pool = base_path + ['shared-network-name', 'EXCLUDE-TEST-2', 'subnet', subnet]
- self.session.set(pool + ['default-router', router])
- self.session.set(pool + ['exclude', exclude_addr])
- self.session.set(pool + ['range', '0', 'start', range_0_start])
- self.session.set(pool + ['range', '0', 'stop', range_0_stop])
+ self.cli_set(pool + ['default-router', router])
+ self.cli_set(pool + ['exclude', exclude_addr])
+ self.cli_set(pool + ['range', '0', 'start', range_0_start])
+ self.cli_set(pool + ['range', '0', 'stop', range_0_stop])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify
config = read_file(DHCPD_CONF)
@@ -386,7 +383,7 @@ class TestServiceDHCPServer(unittest.TestCase):
def test_dhcp_relay_server(self):
# Listen on specific address and return DHCP leases from a non
# directly connected pool
- self.session.set(base_path + ['listen-address', router])
+ self.cli_set(base_path + ['listen-address', router])
relay_subnet = '10.0.0.0/16'
relay_router = inc_ip(relay_subnet, 1)
@@ -395,12 +392,12 @@ class TestServiceDHCPServer(unittest.TestCase):
range_0_stop = '10.0.250.255'
pool = base_path + ['shared-network-name', 'RELAY', 'subnet', relay_subnet]
- self.session.set(pool + ['default-router', relay_router])
- self.session.set(pool + ['range', '0', 'start', range_0_start])
- self.session.set(pool + ['range', '0', 'stop', range_0_stop])
+ self.cli_set(pool + ['default-router', relay_router])
+ self.cli_set(pool + ['range', '0', 'start', range_0_start])
+ self.cli_set(pool + ['range', '0', 'stop', range_0_stop])
# commit changes
- self.session.commit()
+ self.cli_commit()
config = read_file(DHCPD_CONF)
network = address_from_cidr(subnet)
@@ -425,18 +422,18 @@ class TestServiceDHCPServer(unittest.TestCase):
pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]
# we use the first subnet IP address as default gateway
- self.session.set(pool + ['default-router', router])
- self.session.set(pool + ['range', '0', 'start', range_0_start])
- self.session.set(pool + ['range', '0', 'stop', range_0_stop])
+ self.cli_set(pool + ['default-router', router])
+ self.cli_set(pool + ['range', '0', 'start', range_0_start])
+ self.cli_set(pool + ['range', '0', 'stop', range_0_stop])
- self.session.set(base_path + ['global-parameters', 'this-is-crap'])
+ self.cli_set(base_path + ['global-parameters', 'this-is-crap'])
# check generate() - dhcpd should not acceot this garbage config
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(base_path + ['global-parameters'])
+ self.cli_commit()
+ self.cli_delete(base_path + ['global-parameters'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
diff --git a/smoketest/scripts/cli/test_service_dhcpv6-relay.py b/smoketest/scripts/cli/test_service_dhcpv6-relay.py
index e36c237bc..5a9dd1aa6 100755
--- a/smoketest/scripts/cli/test_service_dhcpv6-relay.py
+++ b/smoketest/scripts/cli/test_service_dhcpv6-relay.py
@@ -14,15 +14,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.ifconfig import Section
from vyos.template import address_from_cidr
-from vyos.util import cmd
from vyos.util import process_named_running
from vyos.util import read_file
@@ -35,52 +34,50 @@ upstream_if_addr = '2001:db8::1/64'
listen_addr = '2001:db8:ffff::1/64'
interfaces = []
-class TestServiceDHCPv6Relay(unittest.TestCase):
+class TestServiceDHCPv6Relay(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
for tmp in interfaces:
listen = listen_addr
if tmp == upstream_if:
listen = upstream_if_addr
- self.session.set(['interfaces', 'ethernet', tmp, 'address', listen])
+ self.cli_set(['interfaces', 'ethernet', tmp, 'address', listen])
def tearDown(self):
- self.session.delete(base_path)
+ self.cli_delete(base_path)
for tmp in interfaces:
listen = listen_addr
if tmp == upstream_if:
listen = upstream_if_addr
- self.session.delete(['interfaces', 'ethernet', tmp, 'address', listen])
+ self.cli_delete(['interfaces', 'ethernet', tmp, 'address', listen])
- self.session.commit()
- del self.session
+ self.cli_commit()
def test_relay_default(self):
dhcpv6_server = '2001:db8::ffff'
hop_count = '20'
- self.session.set(base_path + ['use-interface-id-option'])
- self.session.set(base_path + ['max-hop-count', hop_count])
+ self.cli_set(base_path + ['use-interface-id-option'])
+ self.cli_set(base_path + ['max-hop-count', hop_count])
# check validate() - Must set at least one listen and upstream
# interface addresses.
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(base_path + ['upstream-interface', upstream_if, 'address', dhcpv6_server])
+ self.cli_commit()
+ self.cli_set(base_path + ['upstream-interface', upstream_if, 'address', dhcpv6_server])
# check validate() - Must set at least one listen and upstream
# interface addresses.
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
# add listener on all ethernet interfaces except the upstream interface
for tmp in interfaces:
if tmp == upstream_if:
continue
- self.session.set(base_path + ['listen-interface', tmp, 'address', listen_addr.split('/')[0]])
+ self.cli_set(base_path + ['listen-interface', tmp, 'address', listen_addr.split('/')[0]])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Check configured port
config = read_file(RELAY_CONF)
diff --git a/smoketest/scripts/cli/test_service_dhcpv6-server.py b/smoketest/scripts/cli/test_service_dhcpv6-server.py
index f97c7963f..3f9564e59 100755
--- a/smoketest/scripts/cli/test_service_dhcpv6-server.py
+++ b/smoketest/scripts/cli/test_service_dhcpv6-server.py
@@ -14,14 +14,13 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.template import inc_ip
-from vyos.util import cmd
from vyos.util import process_named_running
from vyos.util import read_file
@@ -37,16 +36,14 @@ nis_servers = ['2001:db8:ffff::1', '2001:db8:ffff::2']
interface = 'eth1'
interface_addr = inc_ip(subnet, 1) + '/64'
-class TestServiceDHCPServer(unittest.TestCase):
+class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
- self.session.set(['interfaces', 'ethernet', interface, 'address', interface_addr])
+ self.cli_set(['interfaces', 'ethernet', interface, 'address', interface_addr])
def tearDown(self):
- self.session.delete(base_path)
- self.session.delete(['interfaces', 'ethernet', interface, 'address', interface_addr])
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_delete(['interfaces', 'ethernet', interface, 'address', interface_addr])
+ self.cli_commit()
def test_single_pool(self):
shared_net_name = 'SMOKE-1'
@@ -62,38 +59,38 @@ class TestServiceDHCPServer(unittest.TestCase):
pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]
- self.session.set(base_path + ['preference', preference])
+ self.cli_set(base_path + ['preference', preference])
# we use the first subnet IP address as default gateway
- self.session.set(pool + ['name-server', dns_1])
- self.session.set(pool + ['name-server', dns_2])
- self.session.set(pool + ['name-server', dns_2])
- self.session.set(pool + ['lease-time', 'default', lease_time])
- self.session.set(pool + ['lease-time', 'maximum', max_lease_time])
- self.session.set(pool + ['lease-time', 'minimum', min_lease_time])
- self.session.set(pool + ['nis-domain', domain])
- self.session.set(pool + ['nisplus-domain', domain])
- self.session.set(pool + ['sip-server', sip_server])
- self.session.set(pool + ['sntp-server', sntp_server])
- self.session.set(pool + ['address-range', 'start', range_start, 'stop', range_stop])
+ self.cli_set(pool + ['name-server', dns_1])
+ self.cli_set(pool + ['name-server', dns_2])
+ self.cli_set(pool + ['name-server', dns_2])
+ self.cli_set(pool + ['lease-time', 'default', lease_time])
+ self.cli_set(pool + ['lease-time', 'maximum', max_lease_time])
+ self.cli_set(pool + ['lease-time', 'minimum', min_lease_time])
+ self.cli_set(pool + ['nis-domain', domain])
+ self.cli_set(pool + ['nisplus-domain', domain])
+ self.cli_set(pool + ['sip-server', sip_server])
+ self.cli_set(pool + ['sntp-server', sntp_server])
+ self.cli_set(pool + ['address-range', 'start', range_start, 'stop', range_stop])
for server in nis_servers:
- self.session.set(pool + ['nis-server', server])
- self.session.set(pool + ['nisplus-server', server])
+ self.cli_set(pool + ['nis-server', server])
+ self.cli_set(pool + ['nisplus-server', server])
for search in search_domains:
- self.session.set(pool + ['domain-search', search])
+ self.cli_set(pool + ['domain-search', search])
client_base = 1
for client in ['client1', 'client2', 'client3']:
cid = '00:01:00:01:12:34:56:78:aa:bb:cc:dd:ee:{}'.format(client_base)
- self.session.set(pool + ['static-mapping', client, 'identifier', cid])
- self.session.set(pool + ['static-mapping', client, 'ipv6-address', inc_ip(subnet, client_base)])
- self.session.set(pool + ['static-mapping', client, 'ipv6-prefix', inc_ip(subnet, client_base << 64) + '/64'])
+ self.cli_set(pool + ['static-mapping', client, 'identifier', cid])
+ self.cli_set(pool + ['static-mapping', client, 'ipv6-address', inc_ip(subnet, client_base)])
+ self.cli_set(pool + ['static-mapping', client, 'ipv6-prefix', inc_ip(subnet, client_base << 64) + '/64'])
client_base += 1
# commit changes
- self.session.commit()
+ self.cli_commit()
config = read_file(DHCPD_CONF)
self.assertIn(f'option dhcp6.preference {preference};', config)
@@ -139,12 +136,12 @@ class TestServiceDHCPServer(unittest.TestCase):
pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]
- self.session.set(pool + ['address-range', 'start', range_start, 'stop', range_stop])
- self.session.set(pool + ['prefix-delegation', 'start', delegate_start, 'stop', delegate_stop])
- self.session.set(pool + ['prefix-delegation', 'start', delegate_start, 'prefix-length', delegate_len])
+ self.cli_set(pool + ['address-range', 'start', range_start, 'stop', range_stop])
+ self.cli_set(pool + ['prefix-delegation', 'start', delegate_start, 'stop', delegate_stop])
+ self.cli_set(pool + ['prefix-delegation', 'start', delegate_start, 'prefix-length', delegate_len])
# commit changes
- self.session.commit()
+ self.cli_commit()
config = read_file(DHCPD_CONF)
self.assertIn(f'subnet6 {subnet}' + r' {', config)
@@ -159,12 +156,12 @@ class TestServiceDHCPServer(unittest.TestCase):
ns_global_1 = '2001:db8::1111'
ns_global_2 = '2001:db8::2222'
- self.session.set(base_path + ['global-parameters', 'name-server', ns_global_1])
- self.session.set(base_path + ['global-parameters', 'name-server', ns_global_2])
- self.session.set(base_path + ['shared-network-name', shared_net_name, 'subnet', subnet])
+ self.cli_set(base_path + ['global-parameters', 'name-server', ns_global_1])
+ self.cli_set(base_path + ['global-parameters', 'name-server', ns_global_2])
+ self.cli_set(base_path + ['shared-network-name', shared_net_name, 'subnet', subnet])
# commit changes
- self.session.commit()
+ self.cli_commit()
config = read_file(DHCPD_CONF)
self.assertIn(f'option dhcp6.name-servers {ns_global_1}, {ns_global_2};', config)
diff --git a/smoketest/scripts/cli/test_service_dns_dynamic.py b/smoketest/scripts/cli/test_service_dns_dynamic.py
index 83eede64a..d8a87ffd4 100755
--- a/smoketest/scripts/cli/test_service_dns_dynamic.py
+++ b/smoketest/scripts/cli/test_service_dns_dynamic.py
@@ -18,10 +18,11 @@ import re
import os
import unittest
-from getpass import getuser
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
-from vyos.util import read_file
+from vyos.util import cmd
from vyos.util import process_named_running
PROCESS_NAME = 'ddclient'
@@ -29,21 +30,17 @@ DDCLIENT_CONF = '/run/ddclient/ddclient.conf'
base_path = ['service', 'dns', 'dynamic']
def get_config_value(key):
- tmp = read_file(DDCLIENT_CONF)
+ tmp = cmd(f'sudo cat {DDCLIENT_CONF}')
tmp = re.findall(r'\n?{}=+(.*)'.format(key), tmp)
tmp = tmp[0].rstrip(',')
return tmp
-class TestServiceDDNS(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
+class TestServiceDDNS(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
# Delete DDNS configuration
- self.session.delete(base_path)
- self.session.commit()
-
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_dyndns_service(self):
ddns = ['interface', 'eth0', 'service']
@@ -53,45 +50,44 @@ class TestServiceDDNS(unittest.TestCase):
user = 'vyos_user'
password = 'vyos_pass'
zone = 'vyos.io'
- self.session.delete(base_path)
- self.session.set(base_path + ddns + [service, 'host-name', 'test.ddns.vyos.io'])
- self.session.set(base_path + ddns + [service, 'login', user])
- self.session.set(base_path + ddns + [service, 'password', password])
- self.session.set(base_path + ddns + [service, 'zone', zone])
+ self.cli_delete(base_path)
+ self.cli_set(base_path + ddns + [service, 'host-name', 'test.ddns.vyos.io'])
+ self.cli_set(base_path + ddns + [service, 'login', user])
+ self.cli_set(base_path + ddns + [service, 'password', password])
+ self.cli_set(base_path + ddns + [service, 'zone', zone])
# commit changes
if service == 'cloudflare':
- self.session.commit()
+ self.cli_commit()
else:
# zone option only works on cloudflare, an exception is raised
# for all others
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(base_path + ddns + [service, 'zone', 'vyos.io'])
+ self.cli_commit()
+ self.cli_delete(base_path + ddns + [service, 'zone', 'vyos.io'])
# commit changes again - now it should work
- self.session.commit()
+ self.cli_commit()
# we can only read the configuration file when we operate as 'root'
- if getuser() == 'root':
- protocol = get_config_value('protocol')
- login = get_config_value('login')
- pwd = get_config_value('password')
-
- # some services need special treatment
- protoname = service
- if service == 'cloudflare':
- tmp = get_config_value('zone')
- self.assertTrue(tmp == zone)
- elif service == 'afraid':
- protoname = 'freedns'
- elif service == 'dyndns':
- protoname = 'dyndns2'
- elif service == 'zoneedit':
- protoname = 'zoneedit1'
-
- self.assertTrue(protocol == protoname)
- self.assertTrue(login == user)
- self.assertTrue(pwd == "'" + password + "'")
+ protocol = get_config_value('protocol')
+ login = get_config_value('login')
+ pwd = get_config_value('password')
+
+ # some services need special treatment
+ protoname = service
+ if service == 'cloudflare':
+ tmp = get_config_value('zone')
+ self.assertTrue(tmp == zone)
+ elif service == 'afraid':
+ protoname = 'freedns'
+ elif service == 'dyndns':
+ protoname = 'dyndns2'
+ elif service == 'zoneedit':
+ protoname = 'zoneedit1'
+
+ self.assertTrue(protocol == protoname)
+ self.assertTrue(login == user)
+ self.assertTrue(pwd == "'" + password + "'")
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
@@ -101,11 +97,11 @@ class TestServiceDDNS(unittest.TestCase):
ddns = ['interface', 'eth0', 'rfc2136', 'vyos']
ddns_key_file = '/config/auth/my.key'
- self.session.set(base_path + ddns + ['key', ddns_key_file])
- self.session.set(base_path + ddns + ['record', 'test.ddns.vyos.io'])
- self.session.set(base_path + ddns + ['server', 'ns1.vyos.io'])
- self.session.set(base_path + ddns + ['ttl', '300'])
- self.session.set(base_path + ddns + ['zone', 'vyos.io'])
+ self.cli_set(base_path + ddns + ['key', ddns_key_file])
+ self.cli_set(base_path + ddns + ['record', 'test.ddns.vyos.io'])
+ self.cli_set(base_path + ddns + ['server', 'ns1.vyos.io'])
+ self.cli_set(base_path + ddns + ['ttl', '300'])
+ self.cli_set(base_path + ddns + ['zone', 'vyos.io'])
# ensure an exception will be raised as no key is present
if os.path.exists(ddns_key_file):
@@ -113,13 +109,13 @@ class TestServiceDDNS(unittest.TestCase):
# check validate() - the key file does not exist yet
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
with open(ddns_key_file, 'w') as f:
f.write('S3cretKey')
# commit changes
- self.session.commit()
+ self.cli_commit()
# TODO: inspect generated configuration file
diff --git a/smoketest/scripts/cli/test_service_dns_forwarding.py b/smoketest/scripts/cli/test_service_dns_forwarding.py
index ada53e8dd..8005eb319 100755
--- a/smoketest/scripts/cli/test_service_dns_forwarding.py
+++ b/smoketest/scripts/cli/test_service_dns_forwarding.py
@@ -15,10 +15,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
-import os
import unittest
-from vyos.configsession import ConfigSession, ConfigSessionError
+from base_vyostest_shim import VyOSUnitTestSHIM
+
+from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
from vyos.util import read_file
from vyos.util import process_named_running
@@ -37,44 +39,40 @@ def get_config_value(key, file=CONFIG_FILE):
tmp = re.findall(r'\n{}=+(.*)'.format(key), tmp)
return tmp[0]
-class TestServicePowerDNS(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
+class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
# Delete DNS forwarding configuration
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_basic_forwarding(self):
# Check basic DNS forwarding settings
cache_size = '20'
negative_ttl = '120'
- self.session.set(base_path + ['cache-size', cache_size])
- self.session.set(base_path + ['negative-ttl', negative_ttl])
+ self.cli_set(base_path + ['cache-size', cache_size])
+ self.cli_set(base_path + ['negative-ttl', negative_ttl])
# check validate() - allow from must be defined
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
for network in allow_from:
- self.session.set(base_path + ['allow-from', network])
+ self.cli_set(base_path + ['allow-from', network])
# check validate() - listen-address must be defined
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
for address in listen_adress:
- self.session.set(base_path + ['listen-address', address])
+ self.cli_set(base_path + ['listen-address', address])
# configure DNSSEC
- self.session.set(base_path + ['dnssec', 'validate'])
+ self.cli_set(base_path + ['dnssec', 'validate'])
# Do not use local /etc/hosts file in name resolution
- self.session.set(base_path + ['ignore-hosts-file'])
+ self.cli_set(base_path + ['ignore-hosts-file'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Check configured cache-size
tmp = get_config_value('max-cache-entries')
@@ -103,16 +101,16 @@ class TestServicePowerDNS(unittest.TestCase):
# DNSSEC option testing
for network in allow_from:
- self.session.set(base_path + ['allow-from', network])
+ self.cli_set(base_path + ['allow-from', network])
for address in listen_adress:
- self.session.set(base_path + ['listen-address', address])
+ self.cli_set(base_path + ['listen-address', address])
options = ['off', 'process-no-validate', 'process', 'log-fail', 'validate']
for option in options:
- self.session.set(base_path + ['dnssec', option])
+ self.cli_set(base_path + ['dnssec', option])
# commit changes
- self.session.commit()
+ self.cli_commit()
tmp = get_config_value('dnssec')
self.assertEqual(tmp, option)
@@ -124,16 +122,16 @@ class TestServicePowerDNS(unittest.TestCase):
# Externe Domain Name Servers (DNS) addresses
for network in allow_from:
- self.session.set(base_path + ['allow-from', network])
+ self.cli_set(base_path + ['allow-from', network])
for address in listen_adress:
- self.session.set(base_path + ['listen-address', address])
+ self.cli_set(base_path + ['listen-address', address])
nameservers = ['192.0.2.1', '192.0.2.2']
for nameserver in nameservers:
- self.session.set(base_path + ['name-server', nameserver])
+ self.cli_set(base_path + ['name-server', nameserver])
# commit changes
- self.session.commit()
+ self.cli_commit()
tmp = get_config_value(r'\+.', file=FORWARD_FILE)
self.assertEqual(tmp, ', '.join(nameservers))
@@ -148,26 +146,26 @@ class TestServicePowerDNS(unittest.TestCase):
def test_domain_forwarding(self):
for network in allow_from:
- self.session.set(base_path + ['allow-from', network])
+ self.cli_set(base_path + ['allow-from', network])
for address in listen_adress:
- self.session.set(base_path + ['listen-address', address])
+ self.cli_set(base_path + ['listen-address', address])
domains = ['vyos.io', 'vyos.net', 'vyos.com']
nameservers = ['192.0.2.1', '192.0.2.2']
for domain in domains:
for nameserver in nameservers:
- self.session.set(base_path + ['domain', domain, 'server', nameserver])
+ self.cli_set(base_path + ['domain', domain, 'server', nameserver])
# Test 'recursion-desired' flag for only one domain
if domain == domains[0]:
- self.session.set(base_path + ['domain', domain, 'recursion-desired'])
+ self.cli_set(base_path + ['domain', domain, 'recursion-desired'])
# Test 'negative trust anchor' flag for the second domain only
if domain == domains[1]:
- self.session.set(base_path + ['domain', domain, 'addnta'])
+ self.cli_set(base_path + ['domain', domain, 'addnta'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Test configured name-servers
hosts_conf = read_file(HOSTSD_FILE)
diff --git a/smoketest/scripts/cli/test_service_https.py b/smoketest/scripts/cli/test_service_https.py
index fd0f6bfbd..3ed7655e9 100755
--- a/smoketest/scripts/cli/test_service_https.py
+++ b/smoketest/scripts/cli/test_service_https.py
@@ -14,28 +14,27 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSession
from vyos.util import run
base_path = ['service', 'https']
-class TestHTTPSService(unittest.TestCase):
+class TestHTTPSService(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
- self.session.delete(base_path)
+ self.cli_delete(base_path)
def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_default(self):
- self.session.set(base_path)
- self.session.commit()
+ self.cli_set(base_path)
+ self.cli_commit()
ret = run('sudo /usr/sbin/nginx -t')
self.assertEqual(ret, 0)
@@ -48,11 +47,11 @@ class TestHTTPSService(unittest.TestCase):
test_path = base_path + ['virtual-host', vhost_id]
- self.session.set(test_path + ['listen-address', address])
- self.session.set(test_path + ['listen-port', port])
- self.session.set(test_path + ['server-name', name])
+ self.cli_set(test_path + ['listen-address', address])
+ self.cli_set(test_path + ['listen-port', port])
+ self.cli_set(test_path + ['server-name', name])
- self.session.commit()
+ self.cli_commit()
ret = run('sudo /usr/sbin/nginx -t')
self.assertEqual(ret, 0)
diff --git a/smoketest/scripts/cli/test_service_mdns-repeater.py b/smoketest/scripts/cli/test_service_mdns-repeater.py
index e6986b92a..b1092c3e5 100755
--- a/smoketest/scripts/cli/test_service_mdns-repeater.py
+++ b/smoketest/scripts/cli/test_service_mdns-repeater.py
@@ -14,35 +14,32 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.util import process_named_running
base_path = ['service', 'mdns', 'repeater']
intf_base = ['interfaces', 'dummy']
-class TestServiceMDNSrepeater(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
+class TestServiceMDNSrepeater(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
- self.session.delete(base_path)
- self.session.delete(intf_base + ['dum10'])
- self.session.delete(intf_base + ['dum20'])
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_delete(intf_base + ['dum10'])
+ self.cli_delete(intf_base + ['dum20'])
+ self.cli_commit()
def test_service(self):
# Service required a configured IP address on the interface
- self.session.set(intf_base + ['dum10', 'address', '192.0.2.1/30'])
- self.session.set(intf_base + ['dum20', 'address', '192.0.2.5/30'])
+ self.cli_set(intf_base + ['dum10', 'address', '192.0.2.1/30'])
+ self.cli_set(intf_base + ['dum20', 'address', '192.0.2.5/30'])
- self.session.set(base_path + ['interface', 'dum10'])
- self.session.set(base_path + ['interface', 'dum20'])
- self.session.commit()
+ self.cli_set(base_path + ['interface', 'dum10'])
+ self.cli_set(base_path + ['interface', 'dum20'])
+ self.cli_commit()
# Check for running process
self.assertTrue(process_named_running('mdns-repeater'))
diff --git a/smoketest/scripts/cli/test_service_pppoe-server.py b/smoketest/scripts/cli/test_service_pppoe-server.py
index bfbe88daa..21d1028ce 100755
--- a/smoketest/scripts/cli/test_service_pppoe-server.py
+++ b/smoketest/scripts/cli/test_service_pppoe-server.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 020 VyOS maintainers and contributors
+# Copyright (C) 2020 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -27,7 +27,7 @@ local_if = ['interfaces', 'dummy', 'dum667']
ac_name = 'ACN'
interface = 'eth0'
-class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest):
+class TestServicePPPoEServer(BasicAccelPPPTest.TestCase):
def setUp(self):
self._base_path = ['service', 'pppoe-server']
self._process_name = 'accel-pppd'
@@ -37,7 +37,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest):
super().setUp()
def tearDown(self):
- self.session.delete(local_if)
+ self.cli_delete(local_if)
super().tearDown()
def verify(self, conf):
@@ -66,7 +66,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest):
super().verify(conf)
def basic_config(self):
- self.session.set(local_if + ['address', '192.0.2.1/32'])
+ self.cli_set(local_if + ['address', '192.0.2.1/32'])
self.set(['access-concentrator', ac_name])
self.set(['interface', interface])
@@ -96,7 +96,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest):
self.set(['ppp-options', 'interface-cache', interface_cache])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True, delimiters='=')
@@ -131,7 +131,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest):
self.set( ['authentication', 'protocols', 'mschap-v2'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True)
@@ -157,7 +157,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest):
self.set(['client-ip-pool', 'stop', stop])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True)
@@ -194,7 +194,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest):
self.set(['client-ipv6-pool', 'delegate', delegate_prefix, 'delegation-prefix', delegate_mask])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True, delimiters='=')
diff --git a/smoketest/scripts/cli/test_service_router-advert.py b/smoketest/scripts/cli/test_service_router-advert.py
index b80eb3c43..b19c49c6e 100755
--- a/smoketest/scripts/cli/test_service_router-advert.py
+++ b/smoketest/scripts/cli/test_service_router-advert.py
@@ -15,9 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.util import read_file
from vyos.util import process_named_running
@@ -33,26 +34,24 @@ def get_config_value(key):
tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
return tmp[0].split()[0].replace(';','')
-class TestServiceRADVD(unittest.TestCase):
+class TestServiceRADVD(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
- self.session.set(address_base + ['2001:db8::1/64'])
+ self.cli_set(address_base + ['2001:db8::1/64'])
def tearDown(self):
- self.session.delete(address_base)
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(address_base)
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_single(self):
- self.session.set(base_path + ['prefix', '::/64', 'no-on-link-flag'])
- self.session.set(base_path + ['prefix', '::/64', 'no-autonomous-flag'])
- self.session.set(base_path + ['prefix', '::/64', 'valid-lifetime', 'infinity'])
- self.session.set(base_path + ['dnssl', '2001:db8::1234'])
- self.session.set(base_path + ['other-config-flag'])
+ self.cli_set(base_path + ['prefix', '::/64', 'no-on-link-flag'])
+ self.cli_set(base_path + ['prefix', '::/64', 'no-autonomous-flag'])
+ self.cli_set(base_path + ['prefix', '::/64', 'valid-lifetime', 'infinity'])
+ self.cli_set(base_path + ['dnssl', '2001:db8::1234'])
+ self.cli_set(base_path + ['other-config-flag'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# verify values
tmp = get_config_value('interface')
diff --git a/smoketest/scripts/cli/test_service_snmp.py b/smoketest/scripts/cli/test_service_snmp.py
index 81045d0b4..008271102 100755
--- a/smoketest/scripts/cli/test_service_snmp.py
+++ b/smoketest/scripts/cli/test_service_snmp.py
@@ -14,10 +14,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import re
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
@@ -35,15 +35,11 @@ def get_config_value(key):
tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
return tmp[0]
-class TestSNMPService(unittest.TestCase):
+class TestSNMPService(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
- self.session.delete(base_path)
-
- def tearDown(self):
- del self.session
+ self.cli_delete(base_path)
def test_snmp_basic(self):
# Check if SNMP can be configured and service runs
@@ -53,19 +49,19 @@ class TestSNMPService(unittest.TestCase):
for auth in ['ro', 'rw']:
community = 'VyOS' + auth
- self.session.set(base_path + ['community', community, 'authorization', auth])
+ self.cli_set(base_path + ['community', community, 'authorization', auth])
for client in clients:
- self.session.set(base_path + ['community', community, 'client', client])
+ self.cli_set(base_path + ['community', community, 'client', client])
for network in networks:
- self.session.set(base_path + ['community', community, 'network', network])
+ self.cli_set(base_path + ['community', community, 'network', network])
for addr in listen:
- self.session.set(base_path + ['listen-address', addr])
+ self.cli_set(base_path + ['listen-address', addr])
- self.session.set(base_path + ['contact', 'maintainers@vyos.io'])
- self.session.set(base_path + ['location', 'qemu'])
+ self.cli_set(base_path + ['contact', 'maintainers@vyos.io'])
+ self.cli_set(base_path + ['location', 'qemu'])
- self.session.commit()
+ self.cli_commit()
# verify listen address, it will be returned as
# ['unix:/run/snmpd.socket,udp:127.0.0.1:161,udp6:[::1]:161']
@@ -88,30 +84,30 @@ class TestSNMPService(unittest.TestCase):
# Check if SNMPv3 can be configured with SHA authentication
# and service runs
- self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002'])
- self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro'])
+ self.cli_set(base_path + ['v3', 'engineid', '000000000000000000000002'])
+ self.cli_set(base_path + ['v3', 'group', 'default', 'mode', 'ro'])
# check validate() - a view must be created before this can be comitted
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
- self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1'])
- self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default'])
+ self.cli_set(base_path + ['v3', 'view', 'default', 'oid', '1'])
+ self.cli_set(base_path + ['v3', 'group', 'default', 'view', 'default'])
# create user
- self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'sha'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'aes'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default'])
+ self.cli_set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678'])
+ self.cli_set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'sha'])
+ self.cli_set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678'])
+ self.cli_set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'aes'])
+ self.cli_set(base_path + ['v3', 'user', 'vyos', 'group', 'default'])
- self.session.commit()
+ self.cli_commit()
# commit will alter the CLI values - check if they have been updated:
hashed_password = '4e52fe55fd011c9c51ae2c65f4b78ca93dcafdfe'
- tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1]
+ tmp = self._session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1]
self.assertEqual(tmp, hashed_password)
- tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1]
+ tmp = self._session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1]
self.assertEqual(tmp, hashed_password)
# TODO: read in config file and check values
@@ -123,30 +119,30 @@ class TestSNMPService(unittest.TestCase):
# Check if SNMPv3 can be configured with MD5 authentication
# and service runs
- self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002'])
- self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro'])
+ self.cli_set(base_path + ['v3', 'engineid', '000000000000000000000002'])
+ self.cli_set(base_path + ['v3', 'group', 'default', 'mode', 'ro'])
# check validate() - a view must be created before this can be comitted
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
- self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1'])
- self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default'])
+ self.cli_set(base_path + ['v3', 'view', 'default', 'oid', '1'])
+ self.cli_set(base_path + ['v3', 'group', 'default', 'view', 'default'])
# create user
- self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'md5'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'des'])
- self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default'])
+ self.cli_set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678'])
+ self.cli_set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'md5'])
+ self.cli_set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678'])
+ self.cli_set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'des'])
+ self.cli_set(base_path + ['v3', 'user', 'vyos', 'group', 'default'])
- self.session.commit()
+ self.cli_commit()
# commit will alter the CLI values - check if they have been updated:
hashed_password = '4c67690d45d3dfcd33d0d7e308e370ad'
- tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1]
+ tmp = self._session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1]
self.assertEqual(tmp, hashed_password)
- tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1]
+ tmp = self._session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1]
self.assertEqual(tmp, hashed_password)
# TODO: read in config file and check values
diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py
index 1e099b0a5..01b875867 100755
--- a/smoketest/scripts/cli/test_service_ssh.py
+++ b/smoketest/scripts/cli/test_service_ssh.py
@@ -18,6 +18,8 @@ import re
import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.util import cmd
@@ -34,26 +36,24 @@ def get_config_value(key):
tmp = re.findall(f'\n?{key}\s+(.*)', tmp)
return tmp
-class TestServiceSSH(unittest.TestCase):
+class TestServiceSSH(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
- self.session.delete(base_path)
+ self.cli_delete(base_path)
def tearDown(self):
# delete testing SSH config
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_ssh_default(self):
# Check if SSH service runs with default settings - used for checking
# behavior of <defaultValue> in XML definition
- self.session.set(base_path)
+ self.cli_set(base_path)
# commit changes
- self.session.commit()
+ self.cli_commit()
# Check configured port
port = get_config_value('Port')[0]
@@ -64,15 +64,15 @@ class TestServiceSSH(unittest.TestCase):
def test_ssh_single_listen_address(self):
# Check if SSH service can be configured and runs
- self.session.set(base_path + ['port', '1234'])
- self.session.set(base_path + ['disable-host-validation'])
- self.session.set(base_path + ['disable-password-authentication'])
- self.session.set(base_path + ['loglevel', 'verbose'])
- self.session.set(base_path + ['client-keepalive-interval', '100'])
- self.session.set(base_path + ['listen-address', '127.0.0.1'])
+ self.cli_set(base_path + ['port', '1234'])
+ self.cli_set(base_path + ['disable-host-validation'])
+ self.cli_set(base_path + ['disable-password-authentication'])
+ self.cli_set(base_path + ['loglevel', 'verbose'])
+ self.cli_set(base_path + ['client-keepalive-interval', '100'])
+ self.cli_set(base_path + ['listen-address', '127.0.0.1'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Check configured port
port = get_config_value('Port')[0]
@@ -106,14 +106,14 @@ class TestServiceSSH(unittest.TestCase):
# listen ports and listen-addresses
ports = ['22', '2222', '2223', '2224']
for port in ports:
- self.session.set(base_path + ['port', port])
+ self.cli_set(base_path + ['port', port])
addresses = ['127.0.0.1', '::1']
for address in addresses:
- self.session.set(base_path + ['listen-address', address])
+ self.cli_set(base_path + ['listen-address', address])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Check configured port
tmp = get_config_value('Port')
@@ -131,17 +131,17 @@ class TestServiceSSH(unittest.TestCase):
def test_ssh_vrf(self):
# Check if SSH service can be bound to given VRF
port = '22'
- self.session.set(base_path + ['port', port])
- self.session.set(base_path + ['vrf', vrf])
+ self.cli_set(base_path + ['port', port])
+ self.cli_set(base_path + ['vrf', vrf])
# VRF does yet not exist - an error must be thrown
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
- self.session.set(['vrf', 'name', vrf, 'table', '1338'])
+ self.cli_set(['vrf', 'name', vrf, 'table', '1338'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Check configured port
tmp = get_config_value('Port')
@@ -155,7 +155,7 @@ class TestServiceSSH(unittest.TestCase):
self.assertIn(PROCESS_NAME, tmp)
# delete VRF
- self.session.delete(['vrf', 'name', vrf])
+ self.cli_delete(['vrf', 'name', vrf])
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_service_tftp-server.py b/smoketest/scripts/cli/test_service_tftp-server.py
index 82e5811ff..aed4c6beb 100755
--- a/smoketest/scripts/cli/test_service_tftp-server.py
+++ b/smoketest/scripts/cli/test_service_tftp-server.py
@@ -14,11 +14,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
-import os
import unittest
from psutil import process_iter
+from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
@@ -32,28 +31,26 @@ dummy_if_path = ['interfaces', 'dummy', 'dum69']
address_ipv4 = '192.0.2.1'
address_ipv6 = '2001:db8::1'
-class TestServiceTFTPD(unittest.TestCase):
+class TestServiceTFTPD(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
- self.session.set(dummy_if_path + ['address', address_ipv4 + '/32'])
- self.session.set(dummy_if_path + ['address', address_ipv6 + '/128'])
+ self.cli_set(dummy_if_path + ['address', address_ipv4 + '/32'])
+ self.cli_set(dummy_if_path + ['address', address_ipv6 + '/128'])
def tearDown(self):
- self.session.delete(base_path)
- self.session.delete(dummy_if_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_delete(dummy_if_path)
+ self.cli_commit()
def test_01_tftpd_single(self):
directory = '/tmp'
port = '69' # default port
- self.session.set(base_path + ['allow-upload'])
- self.session.set(base_path + ['directory', directory])
- self.session.set(base_path + ['listen-address', address_ipv4])
+ self.cli_set(base_path + ['allow-upload'])
+ self.cli_set(base_path + ['directory', directory])
+ self.cli_set(base_path + ['listen-address', address_ipv4])
# commit changes
- self.session.commit()
+ self.cli_commit()
config = read_file('/etc/default/tftpd0')
# verify listen IP address
@@ -71,13 +68,13 @@ class TestServiceTFTPD(unittest.TestCase):
address = [address_ipv4, address_ipv6]
port = '70'
- self.session.set(base_path + ['directory', directory])
+ self.cli_set(base_path + ['directory', directory])
for addr in address:
- self.session.set(base_path + ['listen-address', addr])
- self.session.set(base_path + ['port', port])
+ self.cli_set(base_path + ['listen-address', addr])
+ self.cli_set(base_path + ['port', port])
# commit changes
- self.session.commit()
+ self.cli_commit()
for idx in range(0, len(address)):
config = read_file(f'/etc/default/tftpd{idx}')
diff --git a/smoketest/scripts/cli/test_service_webproxy.py b/smoketest/scripts/cli/test_service_webproxy.py
index 3db2daa8f..d47bd452d 100755
--- a/smoketest/scripts/cli/test_service_webproxy.py
+++ b/smoketest/scripts/cli/test_service_webproxy.py
@@ -14,9 +14,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.util import cmd
@@ -29,23 +30,21 @@ base_path = ['service', 'webproxy']
listen_if = 'dum3632'
listen_ip = '192.0.2.1'
-class TestServiceWebProxy(unittest.TestCase):
+class TestServiceWebProxy(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
- self.session.set(['interfaces', 'dummy', listen_if, 'address', listen_ip + '/32'])
+ self.cli_set(['interfaces', 'dummy', listen_if, 'address', listen_ip + '/32'])
def tearDown(self):
- self.session.delete(['interfaces', 'dummy', listen_if])
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(['interfaces', 'dummy', listen_if])
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_01_basic_proxy(self):
default_cache = '100'
- self.session.set(base_path + ['listen-address', listen_ip])
+ self.cli_set(base_path + ['listen-address', listen_ip])
# commit changes
- self.session.commit()
+ self.cli_commit()
config = read_file(PROXY_CONF)
self.assertIn(f'http_port {listen_ip}:3128 intercept', config)
@@ -84,24 +83,24 @@ class TestServiceWebProxy(unittest.TestCase):
block_mine = ['application/pdf', 'application/x-sh']
body_max_size = '4096'
- self.session.set(base_path + ['listen-address', listen_ip])
- self.session.set(base_path + ['append-domain', domain])
- self.session.set(base_path + ['default-port', port])
- self.session.set(base_path + ['cache-size', cache_size])
- self.session.set(base_path + ['disable-access-log'])
+ self.cli_set(base_path + ['listen-address', listen_ip])
+ self.cli_set(base_path + ['append-domain', domain])
+ self.cli_set(base_path + ['default-port', port])
+ self.cli_set(base_path + ['cache-size', cache_size])
+ self.cli_set(base_path + ['disable-access-log'])
- self.session.set(base_path + ['minimum-object-size', min_obj_size])
- self.session.set(base_path + ['maximum-object-size', max_obj_size])
+ self.cli_set(base_path + ['minimum-object-size', min_obj_size])
+ self.cli_set(base_path + ['maximum-object-size', max_obj_size])
- self.session.set(base_path + ['outgoing-address', listen_ip])
+ self.cli_set(base_path + ['outgoing-address', listen_ip])
for mime in block_mine:
- self.session.set(base_path + ['reply-block-mime', mime])
+ self.cli_set(base_path + ['reply-block-mime', mime])
- self.session.set(base_path + ['reply-body-max-size', body_max_size])
+ self.cli_set(base_path + ['reply-body-max-size', body_max_size])
# commit changes
- self.session.commit()
+ self.cli_commit()
config = read_file(PROXY_CONF)
self.assertIn(f'http_port {listen_ip}:{port} intercept', config)
@@ -132,34 +131,34 @@ class TestServiceWebProxy(unittest.TestCase):
ldap_attr = 'cn'
ldap_filter = '(cn=%s)'
- self.session.set(base_path + ['listen-address', listen_ip, 'disable-transparent'])
- self.session.set(base_path + ['authentication', 'children', auth_children])
- self.session.set(base_path + ['authentication', 'credentials-ttl', cred_ttl])
+ self.cli_set(base_path + ['listen-address', listen_ip, 'disable-transparent'])
+ self.cli_set(base_path + ['authentication', 'children', auth_children])
+ self.cli_set(base_path + ['authentication', 'credentials-ttl', cred_ttl])
- self.session.set(base_path + ['authentication', 'realm', realm])
- self.session.set(base_path + ['authentication', 'method', 'ldap'])
+ self.cli_set(base_path + ['authentication', 'realm', realm])
+ self.cli_set(base_path + ['authentication', 'method', 'ldap'])
# check validate() - LDAP authentication is enabled, but server not set
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(base_path + ['authentication', 'ldap', 'server', ldap_server])
+ self.cli_commit()
+ self.cli_set(base_path + ['authentication', 'ldap', 'server', ldap_server])
# check validate() - LDAP password can not be set when bind-dn is not define
- self.session.set(base_path + ['authentication', 'ldap', 'password', ldap_password])
+ self.cli_set(base_path + ['authentication', 'ldap', 'password', ldap_password])
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(base_path + ['authentication', 'ldap', 'bind-dn', ldap_bind_dn])
+ self.cli_commit()
+ self.cli_set(base_path + ['authentication', 'ldap', 'bind-dn', ldap_bind_dn])
# check validate() - LDAP base-dn must be set
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(base_path + ['authentication', 'ldap', 'base-dn', ldap_base_dn])
+ self.cli_commit()
+ self.cli_set(base_path + ['authentication', 'ldap', 'base-dn', ldap_base_dn])
- self.session.set(base_path + ['authentication', 'ldap', 'username-attribute', ldap_attr])
- self.session.set(base_path + ['authentication', 'ldap', 'filter-expression', ldap_filter])
- self.session.set(base_path + ['authentication', 'ldap', 'use-ssl'])
+ self.cli_set(base_path + ['authentication', 'ldap', 'username-attribute', ldap_attr])
+ self.cli_set(base_path + ['authentication', 'ldap', 'filter-expression', ldap_filter])
+ self.cli_set(base_path + ['authentication', 'ldap', 'use-ssl'])
# commit changes
- self.session.commit()
+ self.cli_commit()
config = read_file(PROXY_CONF)
self.assertIn(f'http_port {listen_ip}:3128', config) # disable-transparent
@@ -175,7 +174,7 @@ class TestServiceWebProxy(unittest.TestCase):
self.assertTrue(process_named_running(PROCESS_NAME))
def test_04_cache_peer(self):
- self.session.set(base_path + ['listen-address', listen_ip])
+ self.cli_set(base_path + ['listen-address', listen_ip])
cache_peers = {
'foo' : '192.0.2.1',
@@ -183,12 +182,12 @@ class TestServiceWebProxy(unittest.TestCase):
'baz' : '192.0.2.3',
}
for peer in cache_peers:
- self.session.set(base_path + ['cache-peer', peer, 'address', cache_peers[peer]])
+ self.cli_set(base_path + ['cache-peer', peer, 'address', cache_peers[peer]])
if peer == 'baz':
- self.session.set(base_path + ['cache-peer', peer, 'type', 'sibling'])
+ self.cli_set(base_path + ['cache-peer', peer, 'type', 'sibling'])
# commit changes
- self.session.commit()
+ self.cli_commit()
config = read_file(PROXY_CONF)
self.assertIn('never_direct allow all', config)
@@ -214,22 +213,22 @@ class TestServiceWebProxy(unittest.TestCase):
local_ok = ['10.0.0.0', 'vyos.net']
local_ok_url = ['vyos.net', 'vyos.io']
- self.session.set(base_path + ['listen-address', listen_ip])
- self.session.set(base_path + ['url-filtering', 'squidguard', 'log', 'all'])
+ self.cli_set(base_path + ['listen-address', listen_ip])
+ self.cli_set(base_path + ['url-filtering', 'squidguard', 'log', 'all'])
for block in local_block:
- self.session.set(base_path + ['url-filtering', 'squidguard', 'local-block', block])
+ self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-block', block])
for ok in local_ok:
- self.session.set(base_path + ['url-filtering', 'squidguard', 'local-ok', ok])
+ self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-ok', ok])
for url in local_block_url:
- self.session.set(base_path + ['url-filtering', 'squidguard', 'local-block-url', url])
+ self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-block-url', url])
for url in local_ok_url:
- self.session.set(base_path + ['url-filtering', 'squidguard', 'local-ok-url', url])
+ self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-ok-url', url])
for pattern in local_block_pattern:
- self.session.set(base_path + ['url-filtering', 'squidguard', 'local-block-keyword', pattern])
+ self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-block-keyword', pattern])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Check regular Squid config
config = read_file(PROXY_CONF)
diff --git a/smoketest/scripts/cli/test_system_acceleration_qat.py b/smoketest/scripts/cli/test_system_acceleration_qat.py
index cadb263f5..0a86f58b8 100755
--- a/smoketest/scripts/cli/test_system_acceleration_qat.py
+++ b/smoketest/scripts/cli/test_system_acceleration_qat.py
@@ -14,22 +14,19 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
base_path = ['system', 'acceleration', 'qat']
-class TestSystemLCD(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
+class TestIntelQAT(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_basic(self):
""" Check if configuration script is in place and that the config
@@ -37,11 +34,11 @@ class TestSystemLCD(unittest.TestCase):
be extended with QAT autodetection once run on a QAT enabled device """
# configure some system display
- self.session.set(base_path)
+ self.cli_set(base_path)
# An error must be thrown if QAT device could not be found
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_system_ip.py b/smoketest/scripts/cli/test_system_ip.py
index 4ad8d537d..e98a4e234 100755
--- a/smoketest/scripts/cli/test_system_ip.py
+++ b/smoketest/scripts/cli/test_system_ip.py
@@ -14,22 +14,18 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSession
from vyos.util import read_file
base_path = ['system', 'ip']
-class TestSystemIP(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
+class TestSystemIP(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_system_ip_forwarding(self):
# Test if IPv4 forwarding can be disabled globally, default is '1'
@@ -37,8 +33,8 @@ class TestSystemIP(unittest.TestCase):
all_forwarding = '/proc/sys/net/ipv4/conf/all/forwarding'
self.assertEqual(read_file(all_forwarding), '1')
- self.session.set(base_path + ['disable-forwarding'])
- self.session.commit()
+ self.cli_set(base_path + ['disable-forwarding'])
+ self.cli_commit()
self.assertEqual(read_file(all_forwarding), '0')
@@ -50,9 +46,9 @@ class TestSystemIP(unittest.TestCase):
self.assertEqual(read_file(use_neigh), '0')
self.assertEqual(read_file(hash_policy), '0')
- self.session.set(base_path + ['multipath', 'ignore-unreachable-nexthops'])
- self.session.set(base_path + ['multipath', 'layer4-hashing'])
- self.session.commit()
+ self.cli_set(base_path + ['multipath', 'ignore-unreachable-nexthops'])
+ self.cli_set(base_path + ['multipath', 'layer4-hashing'])
+ self.cli_commit()
self.assertEqual(read_file(use_neigh), '1')
self.assertEqual(read_file(hash_policy), '1')
@@ -69,8 +65,8 @@ class TestSystemIP(unittest.TestCase):
self.assertEqual(read_file(gc_thresh1), '1024')
for size in [1024, 2048, 4096, 8192, 16384, 32768]:
- self.session.set(base_path + ['arp', 'table-size', str(size)])
- self.session.commit()
+ self.cli_set(base_path + ['arp', 'table-size', str(size)])
+ self.cli_commit()
self.assertEqual(read_file(gc_thresh3), str(size))
self.assertEqual(read_file(gc_thresh2), str(size // 2))
diff --git a/smoketest/scripts/cli/test_system_ipv6.py b/smoketest/scripts/cli/test_system_ipv6.py
index df69739eb..c9c9e833d 100755
--- a/smoketest/scripts/cli/test_system_ipv6.py
+++ b/smoketest/scripts/cli/test_system_ipv6.py
@@ -14,9 +14,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSession
from vyos.util import read_file
@@ -27,30 +27,26 @@ file_disable = '/etc/modprobe.d/vyos_disable_ipv6.conf'
file_dad = '/proc/sys/net/ipv6/conf/all/accept_dad'
file_multipath = '/proc/sys/net/ipv6/fib_multipath_hash_policy'
-class TestSystemIPv6(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
+class TestSystemIPv6(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_system_ipv6_forwarding(self):
# Test if IPv6 forwarding can be disabled globally, default is '1'
# which means forwearding enabled
self.assertEqual(read_file(file_forwarding), '1')
- self.session.set(base_path + ['disable-forwarding'])
- self.session.commit()
+ self.cli_set(base_path + ['disable-forwarding'])
+ self.cli_commit()
self.assertEqual(read_file(file_forwarding), '0')
def test_system_ipv6_disable(self):
# Do not assign any IPv6 address on interfaces, this requires a reboot
# which can not be tested, but we can read the config file :)
- self.session.set(base_path + ['disable'])
- self.session.commit()
+ self.cli_set(base_path + ['disable'])
+ self.cli_commit()
# Verify configuration file
self.assertEqual(read_file(file_disable), 'options ipv6 disable_ipv6=1')
@@ -61,8 +57,8 @@ class TestSystemIPv6(unittest.TestCase):
# Do not assign any IPv6 address on interfaces, this requires a reboot
# which can not be tested, but we can read the config file :)
- self.session.set(base_path + ['strict-dad'])
- self.session.commit()
+ self.cli_set(base_path + ['strict-dad'])
+ self.cli_commit()
# Verify configuration file
self.assertEqual(read_file(file_dad), '2')
@@ -73,8 +69,8 @@ class TestSystemIPv6(unittest.TestCase):
# Do not assign any IPv6 address on interfaces, this requires a reboot
# which can not be tested, but we can read the config file :)
- self.session.set(base_path + ['multipath', 'layer4-hashing'])
- self.session.commit()
+ self.cli_set(base_path + ['multipath', 'layer4-hashing'])
+ self.cli_commit()
# Verify configuration file
self.assertEqual(read_file(file_multipath), '1')
@@ -91,8 +87,8 @@ class TestSystemIPv6(unittest.TestCase):
self.assertEqual(read_file(gc_thresh1), '1024')
for size in [1024, 2048, 4096, 8192, 16384, 32768]:
- self.session.set(base_path + ['neighbor', 'table-size', str(size)])
- self.session.commit()
+ self.cli_set(base_path + ['neighbor', 'table-size', str(size)])
+ self.cli_commit()
self.assertEqual(read_file(gc_thresh3), str(size))
self.assertEqual(read_file(gc_thresh2), str(size // 2))
diff --git a/smoketest/scripts/cli/test_system_lcd.py b/smoketest/scripts/cli/test_system_lcd.py
index 2bf601e3b..7a39e2986 100755
--- a/smoketest/scripts/cli/test_system_lcd.py
+++ b/smoketest/scripts/cli/test_system_lcd.py
@@ -14,32 +14,29 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
from configparser import ConfigParser
+
from vyos.configsession import ConfigSession
from vyos.util import process_named_running
config_file = '/run/LCDd/LCDd.conf'
base_path = ['system', 'lcd']
-class TestSystemLCD(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
+class TestSystemLCD(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_system_display(self):
# configure some system display
- self.session.set(base_path + ['device', 'ttyS1'])
- self.session.set(base_path + ['model', 'cfa-533'])
+ self.cli_set(base_path + ['device', 'ttyS1'])
+ self.cli_set(base_path + ['model', 'cfa-533'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# load up ini-styled LCDd.conf
conf = ConfigParser()
diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py
index dfa56e971..8327235fb 100755
--- a/smoketest/scripts/cli/test_system_login.py
+++ b/smoketest/scripts/cli/test_system_login.py
@@ -14,11 +14,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import re
import platform
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from distutils.version import LooseVersion
from platform import release as kernel_version
from subprocess import Popen, PIPE
@@ -32,42 +33,38 @@ from vyos.template import inc_ip
base_path = ['system', 'login']
users = ['vyos1', 'vyos2']
-class TestSystemLogin(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
+class TestSystemLogin(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
# Delete individual users from configuration
for user in users:
- self.session.delete(base_path + ['user', user])
+ self.cli_delete(base_path + ['user', user])
- self.session.commit()
- del self.session
+ self.cli_commit()
def test_add_linux_system_user(self):
system_user = 'backup'
- self.session.set(base_path + ['user', system_user, 'authentication', 'plaintext-password', system_user])
+ self.cli_set(base_path + ['user', system_user, 'authentication', 'plaintext-password', system_user])
# check validate() - can not add username which exists on the Debian
# base system (UID < 1000)
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
- self.session.delete(base_path + ['user', system_user])
+ self.cli_delete(base_path + ['user', system_user])
def test_system_login_user(self):
# Check if user can be created and we can SSH to localhost
- self.session.set(['service', 'ssh', 'port', '22'])
+ self.cli_set(['service', 'ssh', 'port', '22'])
for user in users:
name = "VyOS Roxx " + user
home_dir = "/tmp/" + user
- self.session.set(base_path + ['user', user, 'authentication', 'plaintext-password', user])
- self.session.set(base_path + ['user', user, 'full-name', 'VyOS Roxx'])
- self.session.set(base_path + ['user', user, 'home-directory', home_dir])
+ self.cli_set(base_path + ['user', user, 'authentication', 'plaintext-password', user])
+ self.cli_set(base_path + ['user', user, 'full-name', 'VyOS Roxx'])
+ self.cli_set(base_path + ['user', user, 'home-directory', home_dir])
- self.session.commit()
+ self.cli_commit()
for user in users:
cmd = ['su','-', user]
@@ -104,18 +101,18 @@ class TestSystemLogin(unittest.TestCase):
radius_port = '2000'
radius_timeout = '1'
- self.session.set(base_path + ['radius', 'server', radius_server, 'key', radius_key])
- self.session.set(base_path + ['radius', 'server', radius_server, 'port', radius_port])
- self.session.set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout])
- self.session.set(base_path + ['radius', 'source-address', radius_source])
- self.session.set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
+ self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key])
+ self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port])
+ self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout])
+ self.cli_set(base_path + ['radius', 'source-address', radius_source])
+ self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
# check validate() - Only one IPv4 source-address supported
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
+ self.cli_commit()
+ self.cli_delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
- self.session.commit()
+ self.cli_commit()
# this file must be read with higher permissions
pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf')
@@ -158,18 +155,18 @@ class TestSystemLogin(unittest.TestCase):
radius_port = '4000'
radius_timeout = '4'
- self.session.set(base_path + ['radius', 'server', radius_server, 'key', radius_key])
- self.session.set(base_path + ['radius', 'server', radius_server, 'port', radius_port])
- self.session.set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout])
- self.session.set(base_path + ['radius', 'source-address', radius_source])
- self.session.set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
+ self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key])
+ self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port])
+ self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout])
+ self.cli_set(base_path + ['radius', 'source-address', radius_source])
+ self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
# check validate() - Only one IPv4 source-address supported
with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
+ self.cli_commit()
+ self.cli_delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)])
- self.session.commit()
+ self.cli_commit()
# this file must be read with higher permissions
pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf')
diff --git a/smoketest/scripts/cli/test_system_nameserver.py b/smoketest/scripts/cli/test_system_nameserver.py
index 5610c90c7..50dc466c2 100755
--- a/smoketest/scripts/cli/test_system_nameserver.py
+++ b/smoketest/scripts/cli/test_system_nameserver.py
@@ -14,12 +14,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import re
import unittest
-from vyos.configsession import ConfigSession, ConfigSessionError
-import vyos.util as util
+from base_vyostest_shim import VyOSUnitTestSHIM
+
+from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
+
+from vyos.util import read_file
RESOLV_CONF = '/etc/resolv.conf'
@@ -27,25 +30,20 @@ test_servers = ['192.0.2.10', '2001:db8:1::100']
base_path = ['system', 'name-server']
def get_name_servers():
- resolv_conf = util.read_file(RESOLV_CONF)
+ resolv_conf = read_file(RESOLV_CONF)
return re.findall(r'\n?nameserver\s+(.*)', resolv_conf)
-class TestSystemNameServer(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
+class TestSystemNameServer(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
# Delete existing name servers
- self.session.delete(base_path)
- self.session.commit()
-
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_nameserver_add(self):
# Check if server is added to resolv.conf
for s in test_servers:
- self.session.set(base_path + [s])
- self.session.commit()
+ self.cli_set(base_path + [s])
+ self.cli_commit()
servers = get_name_servers()
for s in servers:
@@ -54,8 +52,8 @@ class TestSystemNameServer(unittest.TestCase):
def test_nameserver_delete(self):
# Test if a deleted server disappears from resolv.conf
for s in test_servers:
- self.session.delete(base_path + [s])
- self.session.commit()
+ self.cli_delete(base_path + [s])
+ self.cli_commit()
servers = get_name_servers()
for s in servers:
diff --git a/smoketest/scripts/cli/test_system_ntp.py b/smoketest/scripts/cli/test_system_ntp.py
index edb6ad94d..2b86ebd7c 100755
--- a/smoketest/scripts/cli/test_system_ntp.py
+++ b/smoketest/scripts/cli/test_system_ntp.py
@@ -15,9 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.template import address_from_cidr
@@ -35,17 +36,15 @@ def get_config_value(key):
# remove possible trailing whitespaces
return [item.strip() for item in tmp]
-class TestSystemNTP(unittest.TestCase):
+class TestSystemNTP(VyOSUnitTestSHIM.TestCase):
def setUp(self):
- self.session = ConfigSession(os.getpid())
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
- self.session.delete(base_path)
+ self.cli_delete(base_path)
def tearDown(self):
- self.session.delete(base_path)
- self.session.commit()
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
self.assertFalse(process_named_running(PROCESS_NAME))
@@ -57,13 +56,13 @@ class TestSystemNTP(unittest.TestCase):
for server in servers:
for option in options:
- self.session.set(base_path + ['server', server, option])
+ self.cli_set(base_path + ['server', server, option])
# Test NTP pool
- self.session.set(base_path + ['server', ntp_pool, 'pool'])
+ self.cli_set(base_path + ['server', ntp_pool, 'pool'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Check generated configuration
tmp = get_config_value('server')
@@ -81,21 +80,21 @@ class TestSystemNTP(unittest.TestCase):
# Test the allowed-networks statement
listen_address = ['127.0.0.1', '::1']
for listen in listen_address:
- self.session.set(base_path + ['listen-address', listen])
+ self.cli_set(base_path + ['listen-address', listen])
networks = ['192.0.2.0/24', '2001:db8:1000::/64']
for network in networks:
- self.session.set(base_path + ['allow-clients', 'address', network])
+ self.cli_set(base_path + ['allow-clients', 'address', network])
# Verify "NTP server not configured" verify() statement
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
servers = ['192.0.2.1', '192.0.2.2']
for server in servers:
- self.session.set(base_path + ['server', server])
+ self.cli_set(base_path + ['server', server])
- self.session.commit()
+ self.cli_commit()
# Check generated client address configuration
for network in networks:
diff --git a/smoketest/scripts/cli/test_vpn_openconnect.py b/smoketest/scripts/cli/test_vpn_openconnect.py
index e27216e09..bf528c8b7 100755
--- a/smoketest/scripts/cli/test_vpn_openconnect.py
+++ b/smoketest/scripts/cli/test_vpn_openconnect.py
@@ -14,9 +14,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from vyos.configsession import ConfigSession
from vyos.util import process_named_running
@@ -25,29 +26,24 @@ base_path = ['vpn', 'openconnect']
cert = '/etc/ssl/certs/ssl-cert-snakeoil.pem'
cert_key = '/etc/ssl/private/ssl-cert-snakeoil.key'
-class TestVpnOpenconnect(unittest.TestCase):
- def setUp(self):
- self.session = ConfigSession(os.getpid())
-
+class TestVpnOpenconnect(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
# Delete vpn openconnect configuration
- self.session.delete(base_path)
- self.session.commit()
-
- del self.session
+ self.cli_delete(base_path)
+ self.cli_commit()
def test_vpn(self):
user = 'vyos_user'
password = 'vyos_pass'
- self.session.delete(base_path)
- self.session.set(base_path + ["authentication", "local-users", "username", user, "password", password])
- self.session.set(base_path + ["authentication", "mode", "local"])
- self.session.set(base_path + ["network-settings", "client-ip-settings", "subnet", "192.0.2.0/24"])
- self.session.set(base_path + ["ssl", "ca-cert-file", cert])
- self.session.set(base_path + ["ssl", "cert-file", cert])
- self.session.set(base_path + ["ssl", "key-file", cert_key])
-
- self.session.commit()
+ self.cli_delete(base_path)
+ self.cli_set(base_path + ["authentication", "local-users", "username", user, "password", password])
+ self.cli_set(base_path + ["authentication", "mode", "local"])
+ self.cli_set(base_path + ["network-settings", "client-ip-settings", "subnet", "192.0.2.0/24"])
+ self.cli_set(base_path + ["ssl", "ca-cert-file", cert])
+ self.cli_set(base_path + ["ssl", "cert-file", cert])
+ self.cli_set(base_path + ["ssl", "key-file", cert_key])
+
+ self.cli_commit()
# Check for running process
self.assertTrue(process_named_running('ocserv-main'))
diff --git a/smoketest/scripts/cli/test_vpn_sstp.py b/smoketest/scripts/cli/test_vpn_sstp.py
index 95fe38dd9..033338685 100755
--- a/smoketest/scripts/cli/test_vpn_sstp.py
+++ b/smoketest/scripts/cli/test_vpn_sstp.py
@@ -23,18 +23,14 @@ ca_cert = '/tmp/ca.crt'
ssl_cert = '/tmp/server.crt'
ssl_key = '/tmp/server.key'
-class TestVPNSSTPServer(BasicAccelPPPTest.BaseTest):
+class TestVPNSSTPServer(BasicAccelPPPTest.TestCase):
def setUp(self):
self._base_path = ['vpn', 'sstp']
self._process_name = 'accel-pppd'
self._config_file = '/run/accel-pppd/sstp.conf'
self._chap_secrets = '/run/accel-pppd/sstp.chap-secrets'
-
super().setUp()
- def tearDown(self):
- super().tearDown()
-
def basic_config(self):
# SSL is mandatory
self.set(['ssl', 'ca-cert-file', ca_cert])
diff --git a/smoketest/scripts/cli/test_vrf.py b/smoketest/scripts/cli/test_vrf.py
index 4ce470d46..01d2e8c39 100755
--- a/smoketest/scripts/cli/test_vrf.py
+++ b/smoketest/scripts/cli/test_vrf.py
@@ -19,6 +19,8 @@ import os
import json
import unittest
+from base_vyostest_shim import VyOSUnitTestSHIM
+
from netifaces import interfaces
from vyos.configsession import ConfigSession
@@ -39,7 +41,7 @@ def get_vrf_ipv4_routes(vrf):
def get_vrf_ipv6_routes(vrf):
return json.loads(cmd(f'ip -6 -j route show vrf {vrf}'))
-class VRFTest(unittest.TestCase):
+class VRFTest(VyOSUnitTestSHIM.TestCase):
_interfaces = []
@classmethod
@@ -54,13 +56,13 @@ class VRFTest(unittest.TestCase):
if not '.' in tmp:
cls._interfaces.append(tmp)
- def setUp(self):
- self.session = ConfigSession(os.getpid())
+ # call base-classes classmethod
+ super(cls, cls).setUpClass()
def tearDown(self):
# delete all VRFs
- self.session.delete(base_path)
- self.session.commit()
+ self.cli_delete(base_path)
+ self.cli_commit()
for vrf in vrfs:
self.assertNotIn(vrf, interfaces())
@@ -69,20 +71,20 @@ class VRFTest(unittest.TestCase):
for vrf in vrfs:
base = base_path + ['name', vrf]
description = f'VyOS-VRF-{vrf}'
- self.session.set(base + ['description', description])
+ self.cli_set(base + ['description', description])
# check validate() - a table ID is mandatory
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
- self.session.set(base + ['table', table])
+ self.cli_set(base + ['table', table])
if vrf == 'green':
- self.session.set(base + ['disable'])
+ self.cli_set(base + ['disable'])
table = str(int(table) + 1)
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify VRF configuration
table = '1000'
@@ -113,11 +115,11 @@ class VRFTest(unittest.TestCase):
table = '2000'
for vrf in vrfs:
base = base_path + ['name', vrf]
- self.session.set(base + ['table', str(table)])
+ self.cli_set(base + ['table', str(table)])
table = str(int(table) + 1)
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify VRF configuration
for vrf in vrfs:
@@ -129,13 +131,13 @@ class VRFTest(unittest.TestCase):
table = '2000'
for vrf in vrfs:
base = base_path + ['name', vrf]
- self.session.set(base + ['table', str(table)])
+ self.cli_set(base + ['table', str(table)])
table = str(int(table) + 1)
- self.session.set(base_path + ['bind-to-all'])
+ self.cli_set(base_path + ['bind-to-all'])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify VRF configuration
tmp = read_file('/proc/sys/net/ipv4/tcp_l3mdev_accept')
@@ -149,31 +151,31 @@ class VRFTest(unittest.TestCase):
table = '1000'
vrf = vrfs[0]
base = base_path + ['name', vrf]
- self.session.set(base + ['table', table])
+ self.cli_set(base + ['table', table])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Check if VRF has been created
self.assertTrue(vrf in interfaces())
table = str(int(table) + 1)
- self.session.set(base + ['table', table])
+ self.cli_set(base + ['table', table])
# check validate() - table ID can not be altered!
with self.assertRaises(ConfigSessionError):
- self.session.commit()
+ self.cli_commit()
def test_vrf_assign_interface(self):
vrf = vrfs[0]
table = '5000'
- self.session.set(['vrf', 'name', vrf, 'table', table])
+ self.cli_set(['vrf', 'name', vrf, 'table', table])
for interface in self._interfaces:
section = Section.section(interface)
- self.session.set(['interfaces', section, interface, 'vrf', vrf])
+ self.cli_set(['interfaces', section, interface, 'vrf', vrf])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify & cleanup
for interface in self._interfaces:
@@ -182,7 +184,7 @@ class VRFTest(unittest.TestCase):
self.assertEqual(tmp, vrf)
# cleanup
section = Section.section(interface)
- self.session.delete(['interfaces', section, interface, 'vrf'])
+ self.cli_delete(['interfaces', section, interface, 'vrf'])
def test_vrf_static_routes(self):
routes = {
@@ -206,15 +208,15 @@ class VRFTest(unittest.TestCase):
table = '2000'
for vrf in vrfs:
base = base_path + ['name', vrf]
- self.session.set(base + ['table', str(table)])
+ self.cli_set(base + ['table', str(table)])
# required interface for leaking to default table
- self.session.set(['interfaces', 'ethernet', 'eth0', 'address', '192.0.2.1/24'])
+ self.cli_set(['interfaces', 'ethernet', 'eth0', 'address', '192.0.2.1/24'])
# we also need an interface in "UP" state to install routes
- self.session.set(['interfaces', 'dummy', f'dum{table}', 'vrf', vrf])
- self.session.set(['interfaces', 'dummy', f'dum{table}', 'address', '192.0.2.1/24'])
- self.session.set(['interfaces', 'dummy', f'dum{table}', 'address', '2001:db8::1/64'])
+ self.cli_set(['interfaces', 'dummy', f'dum{table}', 'vrf', vrf])
+ self.cli_set(['interfaces', 'dummy', f'dum{table}', 'address', '192.0.2.1/24'])
+ self.cli_set(['interfaces', 'dummy', f'dum{table}', 'address', '2001:db8::1/64'])
table = str(int(table) + 1)
proto_base = ['protocols', 'vrf', vrf, 'static']
@@ -222,14 +224,14 @@ class VRFTest(unittest.TestCase):
route_type = 'route'
if is_ipv6(route):
route_type = 'route6'
- self.session.set(proto_base + [route_type, route, 'next-hop', route_config['next_hop']])
+ self.cli_set(proto_base + [route_type, route, 'next-hop', route_config['next_hop']])
if 'distance' in route_config:
- self.session.set(proto_base + [route_type, route, 'next-hop', route_config['next_hop'], 'distance', route_config['distance']])
+ self.cli_set(proto_base + [route_type, route, 'next-hop', route_config['next_hop'], 'distance', route_config['distance']])
if 'next_hop_vrf' in route_config:
- self.session.set(proto_base + [route_type, route, 'next-hop', route_config['next_hop'], 'next-hop-vrf', route_config['next_hop_vrf']])
+ self.cli_set(proto_base + [route_type, route, 'next-hop', route_config['next_hop'], 'next-hop-vrf', route_config['next_hop_vrf']])
# commit changes
- self.session.commit()
+ self.cli_commit()
# Verify routes
table = '2000'
@@ -249,9 +251,9 @@ class VRFTest(unittest.TestCase):
self.assertTrue(found)
# Cleanup
- self.session.delete(['protocols', 'vrf', vrf])
- self.session.delete(['interfaces', 'dummy', f'dum{table}'])
- self.session.delete(['interfaces', 'ethernet', 'eth0', 'address', '192.0.2.1/24'])
+ self.cli_delete(['protocols', 'vrf', vrf])
+ self.cli_delete(['interfaces', 'dummy', f'dum{table}'])
+ self.cli_delete(['interfaces', 'ethernet', 'eth0', 'address', '192.0.2.1/24'])
table = str(int(table) + 1)