From fb94d0b6091e09099c972355886918ef63ee9d97 Mon Sep 17 00:00:00 2001 From: Christian Poessinger Date: Wed, 17 Mar 2021 19:12:04 +0100 Subject: smoketest: add shim for every test to re-use common tasts Currently every smoketest does the setup and destruction of the configsession on its own durin setUp(). This creates a lot of overhead and one configsession should be re-used during execution of every smoketest script. In addiion a test that failed will leaf the system in an unconsistent state. For this reason before the test is executed we will save the running config to /tmp and the will re-load the config after the test has passed, always ensuring a clean environment for the next test. (cherry picked from commit 0f3def974fbaa4a26e6ad590ee37dd965bc2358f) --- smoketest/scripts/cli/base_accel_ppp_test.py | 30 +- smoketest/scripts/cli/base_interfaces_test.py | 185 ++++++----- smoketest/scripts/cli/base_vyostest_shim.py | 90 ++++++ smoketest/scripts/cli/test_interfaces_bonding.py | 31 +- smoketest/scripts/cli/test_interfaces_bridge.py | 76 +++-- smoketest/scripts/cli/test_interfaces_dummy.py | 4 +- smoketest/scripts/cli/test_interfaces_ethernet.py | 50 +-- smoketest/scripts/cli/test_interfaces_geneve.py | 4 +- smoketest/scripts/cli/test_interfaces_l2tpv3.py | 10 +- smoketest/scripts/cli/test_interfaces_loopback.py | 13 +- smoketest/scripts/cli/test_interfaces_macsec.py | 84 ++--- smoketest/scripts/cli/test_interfaces_openvpn.py | 349 +++++++++++---------- smoketest/scripts/cli/test_interfaces_pppoe.py | 83 +++-- .../scripts/cli/test_interfaces_pseudo_ethernet.py | 4 +- smoketest/scripts/cli/test_interfaces_tunnel.py | 142 +++++---- smoketest/scripts/cli/test_interfaces_vxlan.py | 9 +- smoketest/scripts/cli/test_interfaces_wireguard.py | 52 +-- smoketest/scripts/cli/test_interfaces_wireless.py | 58 ++-- smoketest/scripts/cli/test_nat.py | 74 ++--- smoketest/scripts/cli/test_policy.py | 143 +++++---- smoketest/scripts/cli/test_policy_local-route.py | 23 +- smoketest/scripts/cli/test_protocols_bfd.py | 36 +-- smoketest/scripts/cli/test_protocols_bgp.py | 102 +++--- smoketest/scripts/cli/test_protocols_igmp-proxy.py | 41 ++- smoketest/scripts/cli/test_protocols_ospfv3.py | 46 ++- smoketest/scripts/cli/test_protocols_rip.py | 85 +++-- smoketest/scripts/cli/test_protocols_ripng.py | 89 +++--- smoketest/scripts/cli/test_protocols_rpki.py | 51 ++- smoketest/scripts/cli/test_protocols_static.py | 32 +- smoketest/scripts/cli/test_service_bcast-relay.py | 37 ++- smoketest/scripts/cli/test_service_dhcp-relay.py | 35 +-- smoketest/scripts/cli/test_service_dhcp-server.py | 169 +++++----- smoketest/scripts/cli/test_service_dhcpv6-relay.py | 31 +- .../scripts/cli/test_service_dhcpv6-server.py | 71 ++--- smoketest/scripts/cli/test_service_dns_dynamic.py | 88 +++--- .../scripts/cli/test_service_dns_forwarding.py | 62 ++-- smoketest/scripts/cli/test_service_https.py | 23 +- .../scripts/cli/test_service_mdns-repeater.py | 27 +- smoketest/scripts/cli/test_service_pppoe-server.py | 16 +- .../scripts/cli/test_service_router-advert.py | 27 +- smoketest/scripts/cli/test_service_snmp.py | 76 +++-- smoketest/scripts/cli/test_service_ssh.py | 48 +-- smoketest/scripts/cli/test_service_tftp-server.py | 33 +- smoketest/scripts/cli/test_service_webproxy.py | 97 +++--- .../scripts/cli/test_system_acceleration_qat.py | 17 +- smoketest/scripts/cli/test_system_ip.py | 26 +- smoketest/scripts/cli/test_system_ipv6.py | 32 +- smoketest/scripts/cli/test_system_lcd.py | 19 +- smoketest/scripts/cli/test_system_login.py | 61 ++-- smoketest/scripts/cli/test_system_nameserver.py | 30 +- smoketest/scripts/cli/test_system_ntp.py | 29 +- smoketest/scripts/cli/test_vpn_openconnect.py | 32 +- smoketest/scripts/cli/test_vpn_sstp.py | 6 +- smoketest/scripts/cli/test_vrf.py | 72 ++--- 54 files changed, 1627 insertions(+), 1533 deletions(-) create mode 100644 smoketest/scripts/cli/base_vyostest_shim.py diff --git a/smoketest/scripts/cli/base_accel_ppp_test.py b/smoketest/scripts/cli/base_accel_ppp_test.py index 705c932b4..b2acb03cc 100644 --- a/smoketest/scripts/cli/base_accel_ppp_test.py +++ b/smoketest/scripts/cli/base_accel_ppp_test.py @@ -12,10 +12,10 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import re import unittest +from base_vyostest_shim import VyOSUnitTestSHIM from configparser import ConfigParser from vyos.configsession import ConfigSession @@ -26,26 +26,22 @@ from vyos.util import get_half_cpus from vyos.util import process_named_running class BasicAccelPPPTest: - class BaseTest(unittest.TestCase): - + class TestCase(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) self._gateway = '192.0.2.1' - # ensure we can also run this test on a live system - so lets clean # out the current configuration :) - self.session.delete(self._base_path) + self.cli_delete(self._base_path) def tearDown(self): - self.session.delete(self._base_path) - self.session.commit() - del self.session + self.cli_delete(self._base_path) + self.cli_commit() def set(self, path): - self.session.set(self._base_path + path) + self.cli_set(self._base_path + path) def delete(self, path): - self.session.delete(self._base_path + path) + self.cli_delete(self._base_path + path) def basic_config(self): # PPPoE local auth mode requires local users to be configured! @@ -65,7 +61,7 @@ class BasicAccelPPPTest: self.set(['name-server', ns]) # commit changes - self.session.commit() + self.cli_commit() # Validate configuration values conf = ConfigParser(allow_no_value=True, delimiters='=') @@ -95,11 +91,11 @@ class BasicAccelPPPTest: # upload rate-limit requires also download rate-limit with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() self.set(['authentication', 'local-users', 'username', user, 'rate-limit', 'download', download]) # commit changes - self.session.commit() + self.cli_commit() # Validate configuration values conf = ConfigParser(allow_no_value=True, delimiters='=') @@ -123,7 +119,7 @@ class BasicAccelPPPTest: # Check local-users default value(s) self.delete(['authentication', 'local-users', 'username', user, 'static-ip']) # commit changes - self.session.commit() + self.cli_commit() # check local users tmp = cmd(f'sudo cat {self._chap_secrets}') @@ -162,7 +158,7 @@ class BasicAccelPPPTest: self.set(['authentication', 'radius', 'source-address', source_address]) # commit changes - self.session.commit() + self.cli_commit() # Validate configuration values conf = ConfigParser(allow_no_value=True, delimiters='=') @@ -200,7 +196,7 @@ class BasicAccelPPPTest: self.set(['authentication', 'radius', 'server', radius_server, 'disable-accounting']) # commit changes - self.session.commit() + self.cli_commit() conf.read(self._config_file) diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py index 110309780..6f8eda26a 100644 --- a/smoketest/scripts/cli/base_interfaces_test.py +++ b/smoketest/scripts/cli/base_interfaces_test.py @@ -22,6 +22,8 @@ from netifaces import AF_INET6 from netifaces import ifaddresses from netifaces import interfaces +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.ifconfig import Interface @@ -53,7 +55,7 @@ def is_mirrored_to(interface, mirror_if, qdisc): return ret_val class BasicInterfaceTest: - class BaseTest(unittest.TestCase): + class TestCase(VyOSUnitTestSHIM.TestCase): _test_ip = False _test_mtu = False _test_vlan = False @@ -69,29 +71,26 @@ class BasicInterfaceTest: _qinq_range = ['10', '20', '30'] _vlan_range = ['100', '200', '300', '2000'] _test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32', - '2001:db8:1::ffff/64', '2001:db8:101::1/112'] + '2001:db8:1::ffff/64', '2001:db8:101::1/128'] _mirror_interfaces = [] # choose IPv6 minimum MTU value for tests - this must always work _mtu = '1280' def setUp(self): - self.session = ConfigSession(os.getpid()) - # Setup mirror interfaces for SPAN (Switch Port Analyzer) for span in self._mirror_interfaces: section = Section.section(span) - self.session.set(['interfaces', section, span]) + self.cli_set(['interfaces', section, span]) def tearDown(self): # Tear down mirror interfaces for SPAN (Switch Port Analyzer) for span in self._mirror_interfaces: section = Section.section(span) - self.session.delete(['interfaces', section, span]) + self.cli_delete(['interfaces', section, span]) - self.session.delete(self._base_path) - self.session.commit() - del self.session + self.cli_delete(self._base_path) + self.cli_commit() # Verify that no previously interface remained on the system for intf in self._interfaces: @@ -104,10 +103,10 @@ class BasicInterfaceTest: # Check the two-way mirror rules of ingress and egress for mirror in self._mirror_interfaces: for interface in self._interfaces: - self.session.set(self._base_path + [interface, 'mirror', 'ingress', mirror]) - self.session.set(self._base_path + [interface, 'mirror', 'egress', mirror]) + self.cli_set(self._base_path + [interface, 'mirror', 'ingress', mirror]) + self.cli_set(self._base_path + [interface, 'mirror', 'egress', mirror]) - self.session.commit() + self.cli_commit() # Verify config for mirror in self._mirror_interfaces: @@ -119,11 +118,11 @@ class BasicInterfaceTest: # Check if description can be added to interface and # can be read back for intf in self._interfaces: - self.session.set(self._base_path + [intf, 'disable']) + self.cli_set(self._base_path + [intf, 'disable']) for option in self._options.get(intf, []): - self.session.set(self._base_path + [intf] + option.split()) + self.cli_set(self._base_path + [intf] + option.split()) - self.session.commit() + self.cli_commit() # Validate interface description for intf in self._interfaces: @@ -134,11 +133,11 @@ class BasicInterfaceTest: # can be read back for intf in self._interfaces: test_string=f'Description-Test-{intf}' - self.session.set(self._base_path + [intf, 'description', test_string]) + self.cli_set(self._base_path + [intf, 'description', test_string]) for option in self._options.get(intf, []): - self.session.set(self._base_path + [intf] + option.split()) + self.cli_set(self._base_path + [intf] + option.split()) - self.session.commit() + self.cli_commit() # Validate interface description for intf in self._interfaces: @@ -146,9 +145,9 @@ class BasicInterfaceTest: tmp = read_file(f'/sys/class/net/{intf}/ifalias') self.assertEqual(tmp, test_string) self.assertEqual(Interface(intf).get_alias(), test_string) - self.session.delete(self._base_path + [intf, 'description']) + self.cli_delete(self._base_path + [intf, 'description']) - self.session.commit() + self.cli_commit() # Validate remove interface description "empty" for intf in self._interfaces: @@ -159,11 +158,11 @@ class BasicInterfaceTest: def test_add_single_ip_address(self): addr = '192.0.2.0/31' for intf in self._interfaces: - self.session.set(self._base_path + [intf, 'address', addr]) + self.cli_set(self._base_path + [intf, 'address', addr]) for option in self._options.get(intf, []): - self.session.set(self._base_path + [intf] + option.split()) + self.cli_set(self._base_path + [intf] + option.split()) - self.session.commit() + self.cli_commit() for intf in self._interfaces: self.assertTrue(is_intf_addr_assigned(intf, addr)) @@ -173,11 +172,11 @@ class BasicInterfaceTest: # Add address for intf in self._interfaces: for addr in self._test_addr: - self.session.set(self._base_path + [intf, 'address', addr]) + self.cli_set(self._base_path + [intf, 'address', addr]) for option in self._options.get(intf, []): - self.session.set(self._base_path + [intf] + option.split()) + self.cli_set(self._base_path + [intf] + option.split()) - self.session.commit() + self.cli_commit() # Validate address for intf in self._interfaces: @@ -197,10 +196,10 @@ class BasicInterfaceTest: for interface in self._interfaces: base = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(base + option.split()) + self.cli_set(base + option.split()) # after commit we must have an IPv6 link-local address - self.session.commit() + self.cli_commit() for interface in self._interfaces: for addr in ifaddresses(interface)[AF_INET6]: @@ -209,10 +208,10 @@ class BasicInterfaceTest: # disable IPv6 link-local address assignment for interface in self._interfaces: base = self._base_path + [interface] - self.session.set(base + ['ipv6', 'address', 'no-default-link-local']) + self.cli_set(base + ['ipv6', 'address', 'no-default-link-local']) # after commit we must have no IPv6 link-local address - self.session.commit() + self.cli_commit() for interface in self._interfaces: self.assertTrue(AF_INET6 not in ifaddresses(interface)) @@ -223,12 +222,12 @@ class BasicInterfaceTest: for intf in self._interfaces: base = self._base_path + [intf] - self.session.set(base + ['mtu', self._mtu]) + self.cli_set(base + ['mtu', self._mtu]) for option in self._options.get(intf, []): - self.session.set(base + option.split()) + self.cli_set(base + option.split()) # commit interface changes - self.session.commit() + self.cli_commit() # verify changed MTU for intf in self._interfaces: @@ -246,14 +245,14 @@ class BasicInterfaceTest: for intf in self._interfaces: base = self._base_path + [intf] - self.session.set(base + ['mtu', self._mtu]) - self.session.set(base + ['ipv6', 'address', 'no-default-link-local']) + self.cli_set(base + ['mtu', self._mtu]) + self.cli_set(base + ['ipv6', 'address', 'no-default-link-local']) for option in self._options.get(intf, []): - self.session.set(base + option.split()) + self.cli_set(base + option.split()) # commit interface changes - self.session.commit() + self.cli_commit() # verify changed MTU for intf in self._interfaces: @@ -273,15 +272,15 @@ class BasicInterfaceTest: for interface in self._interfaces: base = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(base + option.split()) + self.cli_set(base + option.split()) for vlan in self._vlan_range: base = self._base_path + [interface, 'vif', vlan] - self.session.set(base + ['mtu', self._mtu]) + self.cli_set(base + ['mtu', self._mtu]) for address in self._test_addr: - self.session.set(base + ['address', address]) + self.cli_set(base + ['address', address]) - self.session.commit() + self.cli_commit() for intf in self._interfaces: for vlan in self._vlan_range: @@ -306,29 +305,29 @@ class BasicInterfaceTest: for interface in self._interfaces: base = self._base_path + [interface] - self.session.set(base + ['mtu', mtu_1500]) + self.cli_set(base + ['mtu', mtu_1500]) for option in self._options.get(interface, []): - self.session.set(base + option.split()) + self.cli_set(base + option.split()) if 'source-interface' in option: iface = option.split()[-1] iface_type = Section.section(iface) - self.session.set(['interfaces', iface_type, iface, 'mtu', mtu_9000]) + self.cli_set(['interfaces', iface_type, iface, 'mtu', mtu_9000]) for vlan in self._vlan_range: base = self._base_path + [interface, 'vif', vlan] - self.session.set(base + ['mtu', mtu_9000]) + self.cli_set(base + ['mtu', mtu_9000]) # check validate() - VIF MTU must not be larger the parent interface # MTU size. with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() # Change MTU on base interface to be the same as on the VIF interface for interface in self._interfaces: base = self._base_path + [interface] - self.session.set(base + ['mtu', mtu_9000]) + self.cli_set(base + ['mtu', mtu_9000]) - self.session.commit() + self.cli_commit() # Verify MTU on base and VIF interfaces for interface in self._interfaces: @@ -348,24 +347,24 @@ class BasicInterfaceTest: for interface in self._interfaces: base = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(base + option.split()) + self.cli_set(base + option.split()) # disable the lower interface - self.session.set(base + ['disable']) + self.cli_set(base + ['disable']) for vlan in self._vlan_range: vlan_base = self._base_path + [interface, 'vif', vlan] # disable the vlan interface - self.session.set(vlan_base + ['disable']) + self.cli_set(vlan_base + ['disable']) - self.session.commit() + self.cli_commit() # re-enable all lower interfaces for interface in self._interfaces: base = self._base_path + [interface] - self.session.delete(base + ['disable']) + self.cli_delete(base + ['disable']) - self.session.commit() + self.cli_commit() # verify that the lower interfaces are admin up and the vlan # interfaces are all admin down @@ -388,16 +387,16 @@ class BasicInterfaceTest: for interface in self._interfaces: base = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(base + option.split()) + self.cli_set(base + option.split()) for vif_s in self._qinq_range: for vif_c in self._vlan_range: base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c] - self.session.set(base + ['mtu', self._mtu]) + self.cli_set(base + ['mtu', self._mtu]) for address in self._test_addr: - self.session.set(base + ['address', address]) + self.cli_set(base + ['address', address]) - self.session.commit() + self.cli_commit() for interface in self._interfaces: for vif_s in self._qinq_range: @@ -420,20 +419,20 @@ class BasicInterfaceTest: arp_tmo = '300' path = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(path + option.split()) + self.cli_set(path + option.split()) # Options - self.session.set(path + ['ip', 'arp-cache-timeout', arp_tmo]) - self.session.set(path + ['ip', 'disable-arp-filter']) - self.session.set(path + ['ip', 'disable-forwarding']) - self.session.set(path + ['ip', 'enable-arp-accept']) - self.session.set(path + ['ip', 'enable-arp-announce']) - self.session.set(path + ['ip', 'enable-arp-ignore']) - self.session.set(path + ['ip', 'enable-proxy-arp']) - self.session.set(path + ['ip', 'proxy-arp-pvlan']) - self.session.set(path + ['ip', 'source-validation', 'loose']) - - self.session.commit() + self.cli_set(path + ['ip', 'arp-cache-timeout', arp_tmo]) + self.cli_set(path + ['ip', 'disable-arp-filter']) + self.cli_set(path + ['ip', 'disable-forwarding']) + self.cli_set(path + ['ip', 'enable-arp-accept']) + self.cli_set(path + ['ip', 'enable-arp-announce']) + self.cli_set(path + ['ip', 'enable-arp-ignore']) + self.cli_set(path + ['ip', 'enable-proxy-arp']) + self.cli_set(path + ['ip', 'proxy-arp-pvlan']) + self.cli_set(path + ['ip', 'source-validation', 'loose']) + + self.cli_commit() for interface in self._interfaces: tmp = read_file(f'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms') @@ -471,13 +470,13 @@ class BasicInterfaceTest: dad_transmits = '10' path = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(path + option.split()) + self.cli_set(path + option.split()) # Options - self.session.set(path + ['ipv6', 'disable-forwarding']) - self.session.set(path + ['ipv6', 'dup-addr-detect-transmits', dad_transmits]) + self.cli_set(path + ['ipv6', 'disable-forwarding']) + self.cli_set(path + ['ipv6', 'dup-addr-detect-transmits', dad_transmits]) - self.session.commit() + self.cli_commit() for interface in self._interfaces: tmp = read_file(f'/proc/sys/net/ipv6/conf/{interface}/forwarding') @@ -495,16 +494,16 @@ class BasicInterfaceTest: duid = '00:01:00:01:27:71:db:f0:00:50:00:00:00:{}'.format(duid_base) path = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(path + option.split()) + self.cli_set(path + option.split()) # Enable DHCPv6 client - self.session.set(path + ['address', 'dhcpv6']) - self.session.set(path + ['dhcpv6-options', 'rapid-commit']) - self.session.set(path + ['dhcpv6-options', 'parameters-only']) - self.session.set(path + ['dhcpv6-options', 'duid', duid]) + self.cli_set(path + ['address', 'dhcpv6']) + self.cli_set(path + ['dhcpv6-options', 'rapid-commit']) + self.cli_set(path + ['dhcpv6-options', 'parameters-only']) + self.cli_set(path + ['dhcpv6-options', 'duid', duid]) duid_base += 1 - self.session.commit() + self.cli_commit() duid_base = 10 for interface in self._interfaces: @@ -535,21 +534,21 @@ class BasicInterfaceTest: for interface in self._interfaces: path = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(path + option.split()) + self.cli_set(path + option.split()) address = '1' # prefix delegation stuff pd_base = path + ['dhcpv6-options', 'pd', '0'] - self.session.set(pd_base + ['length', prefix_len]) + self.cli_set(pd_base + ['length', prefix_len]) for delegatee in delegatees: section = Section.section(delegatee) - self.session.set(['interfaces', section, delegatee]) - self.session.set(pd_base + ['interface', delegatee, 'address', address]) + self.cli_set(['interfaces', section, delegatee]) + self.cli_set(pd_base + ['interface', delegatee, 'address', address]) # increment interface address address = str(int(address) + 1) - self.session.commit() + self.cli_commit() for interface in self._interfaces: dhcpc6_config = read_file(f'/run/dhcp6c/dhcp6c.{interface}.conf') @@ -577,7 +576,7 @@ class BasicInterfaceTest: # we can already cleanup the test delegatee interface here # as until commit() is called, nothing happens section = Section.section(delegatee) - self.session.delete(['interfaces', section, delegatee]) + self.cli_delete(['interfaces', section, delegatee]) def test_dhcpv6pd_manual_sla_id(self): if not self._test_ipv6_pd: @@ -591,25 +590,25 @@ class BasicInterfaceTest: for interface in self._interfaces: path = self._base_path + [interface] for option in self._options.get(interface, []): - self.session.set(path + option.split()) + self.cli_set(path + option.split()) # prefix delegation stuff address = '1' sla_id = '1' pd_base = path + ['dhcpv6-options', 'pd', '0'] - self.session.set(pd_base + ['length', prefix_len]) + self.cli_set(pd_base + ['length', prefix_len]) for delegatee in delegatees: section = Section.section(delegatee) - self.session.set(['interfaces', section, delegatee]) - self.session.set(pd_base + ['interface', delegatee, 'address', address]) - self.session.set(pd_base + ['interface', delegatee, 'sla-id', sla_id]) + self.cli_set(['interfaces', section, delegatee]) + self.cli_set(pd_base + ['interface', delegatee, 'address', address]) + self.cli_set(pd_base + ['interface', delegatee, 'sla-id', sla_id]) # increment interface address address = str(int(address) + 1) sla_id = str(int(sla_id) + 1) - self.session.commit() + self.cli_commit() # Verify dhcpc6 client configuration for interface in self._interfaces: @@ -638,4 +637,4 @@ class BasicInterfaceTest: # we can already cleanup the test delegatee interface here # as until commit() is called, nothing happens section = Section.section(delegatee) - self.session.delete(['interfaces', section, delegatee]) + self.cli_delete(['interfaces', section, delegatee]) diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py new file mode 100644 index 000000000..18e4e567e --- /dev/null +++ b/smoketest/scripts/cli/base_vyostest_shim.py @@ -0,0 +1,90 @@ +# Copyright (C) 2021 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import os +import unittest + +from time import sleep + +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError +from vyos import ConfigError +from vyos.util import cmd + +save_config = '/tmp/vyos-smoketest-save' + +# This class acts as shim between individual Smoketests developed for VyOS and +# the Python UnitTest framework. Before every test is loaded, we dump the current +# system configuration and reload it after the test - despite the test results. +# +# Using this approach we can not render a live system useless while running any +# kind of smoketest. In addition it adds debug capabilities like printing the +# command used to execute the test. +class VyOSUnitTestSHIM: + class TestCase(unittest.TestCase): + # if enabled in derived class, print out each and every set/del command + # on the CLI. This is usefull to grap all the commands required to + # trigger the certain failure condition. + # Use "self.debug = True" in derived classes setUp() method + debug = False + + @classmethod + def setUpClass(cls): + cls._session = ConfigSession(os.getpid()) + cls._session.save_config(save_config) + pass + + @classmethod + def tearDownClass(cls): + # discard any pending changes which might caused a messed up config + cls._session.discard() + # ... and restore the initial state + cls._session.migrate_and_load_config(save_config) + + try: + cls._session.commit() + except (ConfigError, ConfigSessionError): + cls._session.discard() + cls.fail(cls) + + def cli_set(self, config): + if self.debug: + print('set ' + ' '.join(config)) + self._session.set(config) + + def cli_delete(self, config): + if self.debug: + print('del ' + ' '.join(config)) + self._session.delete(config) + + def cli_commit(self): + self._session.commit() + + def getFRRconfig(self, string, end='$'): + """ Retrieve current "running configuration" from FRR """ + command = f'vtysh -c "show run" | sed -n "/^{string}{end}/,/^!/p"' + + count = 0 + tmp = '' + while count < 10 and tmp == '': + # Let FRR settle after a config change first before harassing it again + sleep(1) + tmp = cmd(command) + count += 1 + + if self.debug or tmp == '': + import pprint + print(f'\n\ncommand "{command}" returned:\n') + pprint.pprint(tmp) + return tmp diff --git a/smoketest/scripts/cli/test_interfaces_bonding.py b/smoketest/scripts/cli/test_interfaces_bonding.py index 880a822a7..86000553e 100755 --- a/smoketest/scripts/cli/test_interfaces_bonding.py +++ b/smoketest/scripts/cli/test_interfaces_bonding.py @@ -25,7 +25,7 @@ from vyos.configsession import ConfigSessionError from vyos.util import get_interface_config from vyos.util import read_file -class BondingInterfaceTest(BasicInterfaceTest.BaseTest): +class BondingInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._test_ip = True @@ -53,6 +53,9 @@ class BondingInterfaceTest(BasicInterfaceTest.BaseTest): for member in cls._members: cls._options['bond0'].append(f'member interface {member}') + # call base-classes classmethod + super(cls, cls).setUpClass() + def test_add_single_ip_address(self): super().test_add_single_ip_address() @@ -75,16 +78,16 @@ class BondingInterfaceTest(BasicInterfaceTest.BaseTest): # configure member interfaces for interface in self._interfaces: for option in self._options.get(interface, []): - self.session.set(self._base_path + [interface] + option.split()) + self.cli_set(self._base_path + [interface] + option.split()) - self.session.commit() + self.cli_commit() # remove single bond member port for interface in self._interfaces: remove_member = self._members[0] - self.session.delete(self._base_path + [interface, 'member', 'interface', remove_member]) + self.cli_delete(self._base_path + [interface, 'member', 'interface', remove_member]) - self.session.commit() + self.cli_commit() # removed member port must be admin-up for interface in self._interfaces: @@ -97,11 +100,11 @@ class BondingInterfaceTest(BasicInterfaceTest.BaseTest): min_links = len(self._interfaces) for interface in self._interfaces: for option in self._options.get(interface, []): - self.session.set(self._base_path + [interface] + option.split()) + self.cli_set(self._base_path + [interface] + option.split()) - self.session.set(self._base_path + [interface, 'min-links', str(min_links)]) + self.cli_set(self._base_path + [interface, 'min-links', str(min_links)]) - self.session.commit() + self.cli_commit() # verify config for interface in self._interfaces: @@ -116,11 +119,11 @@ class BondingInterfaceTest(BasicInterfaceTest.BaseTest): lacp_rate = 'fast' for interface in self._interfaces: for option in self._options.get(interface, []): - self.session.set(self._base_path + [interface] + option.split()) + self.cli_set(self._base_path + [interface] + option.split()) - self.session.set(self._base_path + [interface, 'lacp-rate', lacp_rate]) + self.cli_set(self._base_path + [interface, 'lacp-rate', lacp_rate]) - self.session.commit() + self.cli_commit() # verify config for interface in self._interfaces: @@ -136,11 +139,11 @@ class BondingInterfaceTest(BasicInterfaceTest.BaseTest): for hash_policy in hash_policies: for interface in self._interfaces: for option in self._options.get(interface, []): - self.session.set(self._base_path + [interface] + option.split()) + self.cli_set(self._base_path + [interface] + option.split()) - self.session.set(self._base_path + [interface, 'hash-policy', hash_policy]) + self.cli_set(self._base_path + [interface, 'hash-policy', hash_policy]) - self.session.commit() + self.cli_commit() # verify config for interface in self._interfaces: diff --git a/smoketest/scripts/cli/test_interfaces_bridge.py b/smoketest/scripts/cli/test_interfaces_bridge.py index 5e7b2bbd2..4014c1a4c 100755 --- a/smoketest/scripts/cli/test_interfaces_bridge.py +++ b/smoketest/scripts/cli/test_interfaces_bridge.py @@ -25,8 +25,10 @@ from netifaces import interfaces from vyos.ifconfig import Section from vyos.util import cmd from vyos.util import read_file +from vyos.util import get_interface_config +from vyos.validate import is_intf_addr_assigned -class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): +class BridgeInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._test_ip = True @@ -52,46 +54,55 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): cls._options['br0'].append(f'member interface {member}') cls._interfaces = list(cls._options) + # call base-classes classmethod + super(cls, cls).setUpClass() + + def tearDown(self): + for intf in self._interfaces: + self.cli_delete(self._base_path + [intf]) + + super().tearDown() + def test_add_remove_bridge_member(self): # Add member interfaces to bridge and set STP cost/priority for interface in self._interfaces: base = self._base_path + [interface] - self.session.set(base + ['stp']) - self.session.set(base + ['address', '192.0.2.1/24']) + self.cli_set(base + ['stp']) + self.cli_set(base + ['address', '192.0.2.1/24']) cost = 1000 priority = 10 # assign members to bridge interface for member in self._members: base_member = base + ['member', 'interface', member] - self.session.set(base_member + ['cost', str(cost)]) - self.session.set(base_member + ['priority', str(priority)]) + self.cli_set(base_member + ['cost', str(cost)]) + self.cli_set(base_member + ['priority', str(priority)]) cost += 1 priority += 1 # commit config - self.session.commit() - - # check member interfaces are added on the bridge - bridge_members = [] - for tmp in glob(f'/sys/class/net/{interface}/lower_*'): - bridge_members.append(os.path.basename(tmp).replace('lower_', '')) + self.cli_commit() - for member in self._members: - self.assertIn(member, bridge_members) - - # delete all members + # Add member interfaces to bridge and set STP cost/priority for interface in self._interfaces: - self.session.delete(self._base_path + [interface, 'member']) + cost = 1000 + priority = 10 + for member in self._members: + tmp = get_interface_config(member) + self.assertEqual(interface, tmp['master']) + self.assertFalse( tmp['linkinfo']['info_slave_data']['isolated']) + self.assertEqual(cost, tmp['linkinfo']['info_slave_data']['cost']) + self.assertEqual(priority, tmp['linkinfo']['info_slave_data']['priority']) - self.session.commit() + cost += 1 + priority += 1 def test_bridge_vlan_filter(self): # Add member interface to bridge and set VLAN filter for interface in self._interfaces: base = self._base_path + [interface] - self.session.set(base + ['vif', '1', 'address', '192.0.2.1/24']) - self.session.set(base + ['vif', '2', 'address', '192.0.3.1/24']) + self.cli_set(base + ['vif', '1', 'address', '192.0.2.1/24']) + self.cli_set(base + ['vif', '2', 'address', '192.0.3.1/24']) vlan_id = 101 allowed_vlan = 2 @@ -99,13 +110,13 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): # assign members to bridge interface for member in self._members: base_member = base + ['member', 'interface', member] - self.session.set(base_member + ['allowed-vlan', str(allowed_vlan)]) - self.session.set(base_member + ['allowed-vlan', allowed_vlan_range]) - self.session.set(base_member + ['native-vlan', str(vlan_id)]) + self.cli_set(base_member + ['allowed-vlan', str(allowed_vlan)]) + self.cli_set(base_member + ['allowed-vlan', allowed_vlan_range]) + self.cli_set(base_member + ['native-vlan', str(vlan_id)]) vlan_id += 1 # commit config - self.session.commit() + self.cli_commit() # Detect the vlan filter function for interface in self._interfaces: @@ -161,8 +172,7 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): # delete all members for interface in self._interfaces: - self.session.delete(self._base_path + [interface, 'member']) - + self.cli_delete(self._base_path + [interface, 'member']) def test_bridge_vlan_members(self): # T2945: ensure that VIFs are not dropped from bridge @@ -170,10 +180,10 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): for interface in self._interfaces: for member in self._members: for vif in vifs: - self.session.set(['interfaces', 'ethernet', member, 'vif', vif]) - self.session.set(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif}']) + self.cli_set(['interfaces', 'ethernet', member, 'vif', vif]) + self.cli_set(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif}']) - self.session.commit() + self.cli_commit() # Verify config for interface in self._interfaces: @@ -182,10 +192,12 @@ class BridgeInterfaceTest(BasicInterfaceTest.BaseTest): # member interface must be assigned to the bridge self.assertTrue(os.path.exists(f'/sys/class/net/{interface}/lower_{member}.{vif}')) - # remove VLAN interfaces - for vif in vifs: - self.session.delete(['interfaces', 'ethernet', member, 'vif', vif]) + # delete all members + for interface in self._interfaces: + for member in self._members: + for vif in vifs: + self.cli_delete(['interfaces', 'ethernet', member, 'vif', vif]) + self.cli_delete(['interfaces', 'bridge', interface, 'member', 'interface', f'{member}.{vif}']) if __name__ == '__main__': unittest.main(verbosity=2) - diff --git a/smoketest/scripts/cli/test_interfaces_dummy.py b/smoketest/scripts/cli/test_interfaces_dummy.py index 6e462bccf..dedc6fe05 100755 --- a/smoketest/scripts/cli/test_interfaces_dummy.py +++ b/smoketest/scripts/cli/test_interfaces_dummy.py @@ -18,11 +18,13 @@ import unittest from base_interfaces_test import BasicInterfaceTest -class DummyInterfaceTest(BasicInterfaceTest.BaseTest): +class DummyInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._base_path = ['interfaces', 'dummy'] cls._interfaces = ['dum435', 'dum8677', 'dum0931', 'dum089'] + # call base-classes classmethod + super(cls, cls).setUpClass() if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_ethernet.py b/smoketest/scripts/cli/test_interfaces_ethernet.py index bbc5478dc..a31d75423 100755 --- a/smoketest/scripts/cli/test_interfaces_ethernet.py +++ b/smoketest/scripts/cli/test_interfaces_ethernet.py @@ -34,7 +34,7 @@ def get_wpa_supplicant_value(interface, key): tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp) return tmp[0] -class EthernetInterfaceTest(BasicInterfaceTest.BaseTest): +class EthernetInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._test_ip = True @@ -61,37 +61,39 @@ class EthernetInterfaceTest(BasicInterfaceTest.BaseTest): for interface in cls._interfaces: cls._macs[interface] = read_file(f'/sys/class/net/{interface}/address') + # call base-classes classmethod + super(cls, cls).setUpClass() + def tearDown(self): for interface in self._interfaces: # when using a dedicated interface to test via TEST_ETH environment # variable only this one will be cleared in the end - usable to test # ethernet interfaces via SSH - self.session.delete(self._base_path + [interface]) - self.session.set(self._base_path + [interface, 'duplex', 'auto']) - self.session.set(self._base_path + [interface, 'speed', 'auto']) - self.session.set(self._base_path + [interface, 'hw-id', self._macs[interface]]) + self.cli_delete(self._base_path + [interface]) + self.cli_set(self._base_path + [interface, 'duplex', 'auto']) + self.cli_set(self._base_path + [interface, 'speed', 'auto']) + self.cli_set(self._base_path + [interface, 'hw-id', self._macs[interface]]) # Tear down mirror interfaces for SPAN (Switch Port Analyzer) for span in self._mirror_interfaces: section = Section.section(span) - self.session.delete(['interfaces', section, span]) + self.cli_delete(['interfaces', section, span]) - self.session.commit() - del self.session + self.cli_commit() def test_dhcp_disable_interface(self): # When interface is configured as admin down, it must be admin down # even when dhcpc starts on the given interface for interface in self._interfaces: - self.session.set(self._base_path + [interface, 'disable']) + self.cli_set(self._base_path + [interface, 'disable']) # Also enable DHCP (ISC DHCP always places interface in admin up # state so we check that we do not start DHCP client. # https://phabricator.vyos.net/T2767 - self.session.set(self._base_path + [interface, 'address', 'dhcp']) + self.cli_set(self._base_path + [interface, 'address', 'dhcp']) - self.session.commit() + self.cli_commit() # Validate interface state for interface in self._interfaces: @@ -111,9 +113,9 @@ class EthernetInterfaceTest(BasicInterfaceTest.BaseTest): rps_cpus &= ~1 for interface in self._interfaces: - self.session.set(self._base_path + [interface, 'offload', 'rps']) + self.cli_set(self._base_path + [interface, 'offload', 'rps']) - self.session.commit() + self.cli_commit() for interface in self._interfaces: cpus = read_file(f'/sys/class/net/{interface}/queues/rx-0/rps_cpus') @@ -125,35 +127,35 @@ class EthernetInterfaceTest(BasicInterfaceTest.BaseTest): def test_non_existing_interface(self): unknonw_interface = self._base_path + ['eth667'] - self.session.set(unknonw_interface) + self.cli_set(unknonw_interface) # check validate() - interface does not exist with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() # we need to remove this wrong interface from the configuration # manually, else tearDown() will have problem in commit() - self.session.delete(unknonw_interface) + self.cli_delete(unknonw_interface) def test_speed_duplex_verify(self): for interface in self._interfaces: - self.session.set(self._base_path + [interface, 'speed', '1000']) + self.cli_set(self._base_path + [interface, 'speed', '1000']) # check validate() - if either speed or duplex is not auto, the # other one must be manually configured, too with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'speed', 'auto']) - self.session.commit() + self.cli_commit() + self.cli_set(self._base_path + [interface, 'speed', 'auto']) + self.cli_commit() def test_eapol_support(self): for interface in self._interfaces: # Enable EAPoL - self.session.set(self._base_path + [interface, 'eapol', 'ca-cert-file', ca_cert]) - self.session.set(self._base_path + [interface, 'eapol', 'cert-file', ssl_cert]) - self.session.set(self._base_path + [interface, 'eapol', 'key-file', ssl_key]) + self.cli_set(self._base_path + [interface, 'eapol', 'ca-cert-file', ca_cert]) + self.cli_set(self._base_path + [interface, 'eapol', 'cert-file', ssl_cert]) + self.cli_set(self._base_path + [interface, 'eapol', 'key-file', ssl_key]) - self.session.commit() + self.cli_commit() # Check for running process self.assertTrue(process_named_running('wpa_supplicant')) diff --git a/smoketest/scripts/cli/test_interfaces_geneve.py b/smoketest/scripts/cli/test_interfaces_geneve.py index b708b5437..8a18d8344 100755 --- a/smoketest/scripts/cli/test_interfaces_geneve.py +++ b/smoketest/scripts/cli/test_interfaces_geneve.py @@ -19,7 +19,7 @@ import unittest from vyos.configsession import ConfigSession from base_interfaces_test import BasicInterfaceTest -class GeneveInterfaceTest(BasicInterfaceTest.BaseTest): +class GeneveInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._test_ip = True @@ -30,6 +30,8 @@ class GeneveInterfaceTest(BasicInterfaceTest.BaseTest): 'gnv1': ['vni 20', 'remote 127.0.1.2'], } cls._interfaces = list(cls._options) + # call base-classes classmethod + super(cls, cls).setUpClass() if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_l2tpv3.py b/smoketest/scripts/cli/test_interfaces_l2tpv3.py index b9e65f51d..06ced5c40 100755 --- a/smoketest/scripts/cli/test_interfaces_l2tpv3.py +++ b/smoketest/scripts/cli/test_interfaces_l2tpv3.py @@ -21,23 +21,25 @@ import unittest from base_interfaces_test import BasicInterfaceTest from vyos.util import cmd -class L2TPv3InterfaceTest(BasicInterfaceTest.BaseTest): +class L2TPv3InterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._test_ip = True cls._test_ipv6 = True cls._base_path = ['interfaces', 'l2tpv3'] cls._options = { - 'l2tpeth10': ['local-ip 127.0.0.1', 'remote-ip 127.10.10.10', + 'l2tpeth10': ['source-address 127.0.0.1', 'remote 127.10.10.10', 'tunnel-id 100', 'peer-tunnel-id 10', 'session-id 100', 'peer-session-id 10', 'source-port 1010', 'destination-port 10101'], - 'l2tpeth20': ['local-ip 127.0.0.1', 'peer-session-id 20', - 'peer-tunnel-id 200', 'remote-ip 127.20.20.20', + 'l2tpeth20': ['source-address 127.0.0.1', 'peer-session-id 20', + 'peer-tunnel-id 200', 'remote 127.20.20.20', 'session-id 20', 'tunnel-id 200', 'source-port 2020', 'destination-port 20202'], } cls._interfaces = list(cls._options) + # call base-classes classmethod + super(cls, cls).setUpClass() def test_add_single_ip_address(self): super().test_add_single_ip_address() diff --git a/smoketest/scripts/cli/test_interfaces_loopback.py b/smoketest/scripts/cli/test_interfaces_loopback.py index 77dd4c1b5..85b5ca6d6 100755 --- a/smoketest/scripts/cli/test_interfaces_loopback.py +++ b/smoketest/scripts/cli/test_interfaces_loopback.py @@ -23,16 +23,17 @@ from vyos.validate import is_intf_addr_assigned loopbacks = ['127.0.0.1', '::1'] -class LoopbackInterfaceTest(BasicInterfaceTest.BaseTest): +class LoopbackInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): - cls._base_path = ['interfaces', 'loopback'] - cls._interfaces = ['lo'] + cls._base_path = ['interfaces', 'loopback'] + cls._interfaces = ['lo'] + # call base-classes classmethod + super(cls, cls).setUpClass() def tearDown(self): - self.session.delete(self._base_path) - self.session.commit() - del self.session + self.cli_delete(self._base_path) + self.cli_commit() # loopback interface must persist! for intf in self._interfaces: diff --git a/smoketest/scripts/cli/test_interfaces_macsec.py b/smoketest/scripts/cli/test_interfaces_macsec.py index 31071a0f7..e4280a5b7 100755 --- a/smoketest/scripts/cli/test_interfaces_macsec.py +++ b/smoketest/scripts/cli/test_interfaces_macsec.py @@ -37,20 +37,22 @@ def get_cipher(interface): tmp = get_interface_config(interface) return tmp['linkinfo']['info_data']['cipher_suite'].lower() -class MACsecInterfaceTest(BasicInterfaceTest.BaseTest): +class MACsecInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): - cls._test_ip = True - cls._test_ipv6 = True - cls._base_path = ['interfaces', 'macsec'] - cls._options = { 'macsec0': ['source-interface eth0', 'security cipher gcm-aes-128'] } + cls._test_ip = True + cls._test_ipv6 = True + cls._base_path = ['interfaces', 'macsec'] + cls._options = { 'macsec0': ['source-interface eth0', 'security cipher gcm-aes-128'] } - # if we have a physical eth1 interface, add a second macsec instance - if 'eth1' in Section.interfaces('ethernet'): - macsec = { 'macsec1': [f'source-interface eth1', 'security cipher gcm-aes-128'] } - cls._options.update(macsec) + # if we have a physical eth1 interface, add a second macsec instance + if 'eth1' in Section.interfaces('ethernet'): + macsec = { 'macsec1': [f'source-interface eth1', 'security cipher gcm-aes-128'] } + cls._options.update(macsec) - cls._interfaces = list(cls._options) + cls._interfaces = list(cls._options) + # call base-classes classmethod + super(cls, cls).setUpClass() def test_macsec_encryption(self): # MACsec can be operating in authentication and encryption mode - both @@ -66,31 +68,31 @@ class MACsecInterfaceTest(BasicInterfaceTest.BaseTest): if option.split()[0] == 'source-interface': src_interface = option.split()[1] - self.session.set(self._base_path + [interface] + option.split()) + self.cli_set(self._base_path + [interface] + option.split()) # Encrypt link - self.session.set(self._base_path + [interface, 'security', 'encrypt']) + self.cli_set(self._base_path + [interface, 'security', 'encrypt']) # check validate() - Physical source interface MTU must be higher then our MTU - self.session.set(self._base_path + [interface, 'mtu', '1500']) + self.cli_set(self._base_path + [interface, 'mtu', '1500']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(self._base_path + [interface, 'mtu']) + self.cli_commit() + self.cli_delete(self._base_path + [interface, 'mtu']) # check validate() - MACsec security keys mandartory when encryption is enabled with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'security', 'mka', 'cak', mak_cak]) + self.cli_commit() + self.cli_set(self._base_path + [interface, 'security', 'mka', 'cak', mak_cak]) # check validate() - MACsec security keys mandartory when encryption is enabled with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'security', 'mka', 'ckn', mak_ckn]) + self.cli_commit() + self.cli_set(self._base_path + [interface, 'security', 'mka', 'ckn', mak_ckn]) - self.session.set(self._base_path + [interface, 'security', 'replay-window', replay_window]) + self.cli_set(self._base_path + [interface, 'security', 'replay-window', replay_window]) # final commit of settings - self.session.commit() + self.cli_commit() tmp = get_config_value(src_interface, 'macsec_integ_only') self.assertTrue("0" in tmp) @@ -117,20 +119,20 @@ class MACsecInterfaceTest(BasicInterfaceTest.BaseTest): def test_macsec_gcm_aes_128(self): interface = 'macsec1' cipher = 'gcm-aes-128' - self.session.set(self._base_path + [interface]) + self.cli_set(self._base_path + [interface]) # check validate() - source interface is mandatory with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'source-interface', 'eth0']) + self.cli_commit() + self.cli_set(self._base_path + [interface, 'source-interface', 'eth0']) # check validate() - cipher is mandatory with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'security', 'cipher', cipher]) + self.cli_commit() + self.cli_set(self._base_path + [interface, 'security', 'cipher', cipher]) # final commit and verify - self.session.commit() + self.cli_commit() self.assertIn(interface, interfaces()) self.assertIn(interface, interfaces()) self.assertEqual(cipher, get_cipher(interface)) @@ -138,20 +140,20 @@ class MACsecInterfaceTest(BasicInterfaceTest.BaseTest): def test_macsec_gcm_aes_256(self): interface = 'macsec4' cipher = 'gcm-aes-256' - self.session.set(self._base_path + [interface]) + self.cli_set(self._base_path + [interface]) # check validate() - source interface is mandatory with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'source-interface', 'eth0']) + self.cli_commit() + self.cli_set(self._base_path + [interface, 'source-interface', 'eth0']) # check validate() - cipher is mandatory with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'security', 'cipher', cipher]) + self.cli_commit() + self.cli_set(self._base_path + [interface, 'security', 'cipher', cipher]) # final commit and verify - self.session.commit() + self.cli_commit() self.assertIn(interface, interfaces()) self.assertEqual(cipher, get_cipher(interface)) @@ -163,24 +165,24 @@ class MACsecInterfaceTest(BasicInterfaceTest.BaseTest): for interface, option_value in self._options.items(): for option in option_value: - self.session.set(self._base_path + [interface] + option.split()) + self.cli_set(self._base_path + [interface] + option.split()) if option.split()[0] == 'source-interface': src_interface = option.split()[1] - self.session.set(base_bridge + ['member', 'interface', src_interface]) + self.cli_set(base_bridge + ['member', 'interface', src_interface]) # check validate() - Source interface must not already be a member of a bridge with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(base_bridge) + self.cli_commit() + self.cli_delete(base_bridge) - self.session.set(base_bond + ['member', 'interface', src_interface]) + self.cli_set(base_bond + ['member', 'interface', src_interface]) # check validate() - Source interface must not already be a member of a bridge with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(base_bond) + self.cli_commit() + self.cli_delete(base_bond) # final commit and verify - self.session.commit() + self.cli_commit() self.assertIn(interface, interfaces()) if __name__ == '__main__': diff --git a/smoketest/scripts/cli/test_interfaces_openvpn.py b/smoketest/scripts/cli/test_interfaces_openvpn.py index 42a008007..1a52a0a5b 100755 --- a/smoketest/scripts/cli/test_interfaces_openvpn.py +++ b/smoketest/scripts/cli/test_interfaces_openvpn.py @@ -21,6 +21,8 @@ from glob import glob from ipaddress import IPv4Network from netifaces import interfaces +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.util import cmd @@ -58,77 +60,76 @@ def get_vrf(interface): tmp = tmp.replace('upper_', '') return tmp -class TestInterfacesOpenVPN(unittest.TestCase): +class TestInterfacesOpenVPN(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) - self.session.set(['interfaces', 'dummy', dummy_if, 'address', '192.0.2.1/32']) - self.session.set(['vrf', 'name', vrf_name, 'table', '12345']) + self.cli_set(['interfaces', 'dummy', dummy_if, 'address', '192.0.2.1/32']) + self.cli_set(['vrf', 'name', vrf_name, 'table', '12345']) def tearDown(self): - self.session.delete(base_path) - self.session.delete(['interfaces', 'dummy', dummy_if]) - self.session.delete(['vrf']) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_delete(['interfaces', 'dummy', dummy_if]) + self.cli_delete(['vrf']) + self.cli_commit() def test_openvpn_client_verify(self): # Create OpenVPN client interface and test verify() steps. interface = 'vtun2000' path = base_path + [interface] - self.session.set(path + ['mode', 'client']) - self.session.set(path + ['encryption', 'ncp-ciphers', 'aes192gcm']) + self.cli_set(path + ['mode', 'client']) + + self.cli_set(path + ['encryption', 'ncp-ciphers', 'aes192gcm']) # check validate() - cannot specify local-port in client mode - self.session.set(path + ['local-port', '5000']) + self.cli_set(path + ['local-port', '5000']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['local-port']) + self.cli_commit() + self.cli_delete(path + ['local-port']) # check validate() - cannot specify local-host in client mode - self.session.set(path + ['local-host', '127.0.0.1']) + self.cli_set(path + ['local-host', '127.0.0.1']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['local-host']) + self.cli_commit() + self.cli_delete(path + ['local-host']) # check validate() - cannot specify protocol tcp-passive in client mode - self.session.set(path + ['protocol', 'tcp-passive']) + self.cli_set(path + ['protocol', 'tcp-passive']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['protocol']) + self.cli_commit() + self.cli_delete(path + ['protocol']) # check validate() - remote-host must be set in client mode with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(path + ['remote-host', '192.0.9.9']) + self.cli_commit() + self.cli_set(path + ['remote-host', '192.0.9.9']) # check validate() - cannot specify "tls dh-file" in client mode - self.session.set(path + ['tls', 'dh-file', dh_pem]) + self.cli_set(path + ['tls', 'dh-file', dh_pem]) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['tls']) + self.cli_commit() + self.cli_delete(path + ['tls']) # check validate() - must specify one of "shared-secret-key-file" and "tls" with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(path + ['shared-secret-key-file', s2s_key]) + self.cli_commit() + self.cli_set(path + ['shared-secret-key-file', s2s_key]) # check validate() - must specify one of "shared-secret-key-file" and "tls" with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['shared-secret-key-file', s2s_key]) + self.cli_commit() + self.cli_delete(path + ['shared-secret-key-file', s2s_key]) - self.session.set(path + ['tls', 'ca-cert-file', ca_cert]) - self.session.set(path + ['tls', 'cert-file', ssl_cert]) - self.session.set(path + ['tls', 'key-file', ssl_key]) + self.cli_set(path + ['tls', 'ca-cert-file', ca_cert]) + self.cli_set(path + ['tls', 'cert-file', ssl_cert]) + self.cli_set(path + ['tls', 'key-file', ssl_key]) # check validate() - can not have auth username without a password - self.session.set(path + ['authentication', 'username', 'vyos']) + self.cli_set(path + ['authentication', 'username', 'vyos']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(path + ['authentication', 'password', 'vyos']) + self.cli_commit() + self.cli_set(path + ['authentication', 'password', 'vyos']) # client commit must pass - self.session.commit() + self.cli_commit() self.assertTrue(process_named_running(PROCESS_NAME)) self.assertIn(interface, interfaces()) @@ -144,22 +145,22 @@ class TestInterfacesOpenVPN(unittest.TestCase): path = base_path + [interface] auth_hash = 'sha1' - self.session.set(path + ['device-type', 'tun']) - self.session.set(path + ['encryption', 'cipher', 'aes256']) - self.session.set(path + ['hash', auth_hash]) - self.session.set(path + ['mode', 'client']) - self.session.set(path + ['persistent-tunnel']) - self.session.set(path + ['protocol', protocol]) - self.session.set(path + ['remote-host', remote_host]) - self.session.set(path + ['remote-port', remote_port]) - self.session.set(path + ['tls', 'ca-cert-file', ca_cert]) - self.session.set(path + ['tls', 'cert-file', ssl_cert]) - self.session.set(path + ['tls', 'key-file', ssl_key]) - self.session.set(path + ['vrf', vrf_name]) - self.session.set(path + ['authentication', 'username', interface+'user']) - self.session.set(path + ['authentication', 'password', interface+'secretpw']) - - self.session.commit() + self.cli_set(path + ['device-type', 'tun']) + self.cli_set(path + ['encryption', 'cipher', 'aes256']) + self.cli_set(path + ['hash', auth_hash]) + self.cli_set(path + ['mode', 'client']) + self.cli_set(path + ['persistent-tunnel']) + self.cli_set(path + ['protocol', protocol]) + self.cli_set(path + ['remote-host', remote_host]) + self.cli_set(path + ['remote-port', remote_port]) + self.cli_set(path + ['tls', 'ca-cert-file', ca_cert]) + self.cli_set(path + ['tls', 'cert-file', ssl_cert]) + self.cli_set(path + ['tls', 'key-file', ssl_key]) + self.cli_set(path + ['vrf', vrf_name]) + self.cli_set(path + ['authentication', 'username', interface+'user']) + self.cli_set(path + ['authentication', 'password', interface+'secretpw']) + + self.cli_commit() for ii in num_range: interface = f'vtun{ii}' @@ -192,8 +193,8 @@ class TestInterfacesOpenVPN(unittest.TestCase): self.assertIn(f'{interface}secretpw', pw) # check that no interface remained after deleting them - self.session.delete(base_path) - self.session.commit() + self.cli_delete(base_path) + self.cli_commit() for ii in num_range: interface = f'vtun{ii}' @@ -205,104 +206,104 @@ class TestInterfacesOpenVPN(unittest.TestCase): path = base_path + [interface] # check validate() - must speciy operating mode - self.session.set(path) + self.cli_set(path) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(path + ['mode', 'server']) + self.cli_commit() + self.cli_set(path + ['mode', 'server']) # check validate() - cannot specify protocol tcp-active in server mode - self.session.set(path + ['protocol', 'tcp-active']) + self.cli_set(path + ['protocol', 'tcp-active']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['protocol']) + self.cli_commit() + self.cli_delete(path + ['protocol']) # check validate() - cannot specify local-port in client mode - self.session.set(path + ['remote-port', '5000']) + self.cli_set(path + ['remote-port', '5000']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['remote-port']) + self.cli_commit() + self.cli_delete(path + ['remote-port']) # check validate() - cannot specify local-host in client mode - self.session.set(path + ['remote-host', '127.0.0.1']) + self.cli_set(path + ['remote-host', '127.0.0.1']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['remote-host']) + self.cli_commit() + self.cli_delete(path + ['remote-host']) # check validate() - must specify "tls dh-file" when not using EC keys # in server mode with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(path + ['tls', 'dh-file', dh_pem]) + self.cli_commit() + self.cli_set(path + ['tls', 'dh-file', dh_pem]) # check validate() - must specify "server subnet" or add interface to # bridge in server mode with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() # check validate() - server client-ip-pool is too large # [100.64.0.4 -> 100.127.255.251 = 4194295], maximum is 65536 addresses. - self.session.set(path + ['server', 'subnet', '100.64.0.0/10']) + self.cli_set(path + ['server', 'subnet', '100.64.0.0/10']) with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() # check validate() - cannot specify more than 1 IPv4 and 1 IPv6 server subnet - self.session.set(path + ['server', 'subnet', '100.64.0.0/20']) + self.cli_set(path + ['server', 'subnet', '100.64.0.0/20']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['server', 'subnet', '100.64.0.0/10']) + self.cli_commit() + self.cli_delete(path + ['server', 'subnet', '100.64.0.0/10']) # check validate() - must specify "tls ca-cert-file" with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(path + ['tls', 'ca-cert-file', ca_cert]) + self.cli_commit() + self.cli_set(path + ['tls', 'ca-cert-file', ca_cert]) # check validate() - must specify "tls cert-file" with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(path + ['tls', 'cert-file', ssl_cert]) + self.cli_commit() + self.cli_set(path + ['tls', 'cert-file', ssl_cert]) # check validate() - must specify "tls key-file" with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(path + ['tls', 'key-file', ssl_key]) + self.cli_commit() + self.cli_set(path + ['tls', 'key-file', ssl_key]) # check validate() - cannot specify "tls role" in client-server mode' - self.session.set(path + ['tls', 'role', 'active']) + self.cli_set(path + ['tls', 'role', 'active']) with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() # check validate() - cannot specify "tls role" in client-server mode' - self.session.set(path + ['tls', 'auth-file', auth_key]) + self.cli_set(path + ['tls', 'auth-file', auth_key]) with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() # check validate() - cannot specify "tcp-passive" when "tls role" is "active" - self.session.set(path + ['protocol', 'tcp-passive']) + self.cli_set(path + ['protocol', 'tcp-passive']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['protocol']) + self.cli_commit() + self.cli_delete(path + ['protocol']) # check validate() - cannot specify "tls dh-file" when "tls role" is "active" - self.session.set(path + ['tls', 'dh-file', dh_pem]) + self.cli_set(path + ['tls', 'dh-file', dh_pem]) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['tls', 'dh-file']) + self.cli_commit() + self.cli_delete(path + ['tls', 'dh-file']) # Now test the other path with tls role passive - self.session.set(path + ['tls', 'role', 'passive']) + self.cli_set(path + ['tls', 'role', 'passive']) # check validate() - cannot specify "tcp-active" when "tls role" is "passive" - self.session.set(path + ['protocol', 'tcp-active']) + self.cli_set(path + ['protocol', 'tcp-active']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['protocol']) + self.cli_commit() + self.cli_delete(path + ['protocol']) # check validate() - must specify "tls dh-file" when "tls role" is "passive" with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(path + ['tls', 'dh-file', dh_pem]) + self.cli_commit() + self.cli_set(path + ['tls', 'dh-file', dh_pem]) - self.session.commit() + self.cli_commit() self.assertTrue(process_named_running(PROCESS_NAME)) self.assertIn(interface, interfaces()) @@ -322,29 +323,29 @@ class TestInterfacesOpenVPN(unittest.TestCase): path = base_path + [interface] port = str(2000 + ii) - self.session.set(path + ['device-type', 'tun']) - self.session.set(path + ['encryption', 'cipher', 'aes192']) - self.session.set(path + ['hash', auth_hash]) - self.session.set(path + ['mode', 'server']) - self.session.set(path + ['local-port', port]) - self.session.set(path + ['server', 'subnet', subnet]) - self.session.set(path + ['server', 'topology', 'subnet']) - self.session.set(path + ['keep-alive', 'failure-count', '5']) - self.session.set(path + ['keep-alive', 'interval', '5']) + self.cli_set(path + ['device-type', 'tun']) + self.cli_set(path + ['encryption', 'cipher', 'aes192']) + self.cli_set(path + ['hash', auth_hash]) + self.cli_set(path + ['mode', 'server']) + self.cli_set(path + ['local-port', port]) + self.cli_set(path + ['server', 'subnet', subnet]) + self.cli_set(path + ['server', 'topology', 'subnet']) + self.cli_set(path + ['keep-alive', 'failure-count', '5']) + self.cli_set(path + ['keep-alive', 'interval', '5']) # clients - self.session.set(path + ['server', 'client', 'client1', 'ip', client_ip]) + self.cli_set(path + ['server', 'client', 'client1', 'ip', client_ip]) for route in client1_routes: - self.session.set(path + ['server', 'client', 'client1', 'subnet', route]) + self.cli_set(path + ['server', 'client', 'client1', 'subnet', route]) - self.session.set(path + ['replace-default-route']) - self.session.set(path + ['tls', 'ca-cert-file', ca_cert]) - self.session.set(path + ['tls', 'cert-file', ssl_cert]) - self.session.set(path + ['tls', 'key-file', ssl_key]) - self.session.set(path + ['tls', 'dh-file', dh_pem]) - self.session.set(path + ['vrf', vrf_name]) + self.cli_set(path + ['replace-default-route']) + self.cli_set(path + ['tls', 'ca-cert-file', ca_cert]) + self.cli_set(path + ['tls', 'cert-file', ssl_cert]) + self.cli_set(path + ['tls', 'key-file', ssl_key]) + self.cli_set(path + ['tls', 'dh-file', dh_pem]) + self.cli_set(path + ['vrf', vrf_name]) - self.session.commit() + self.cli_commit() for ii in num_range: interface = f'vtun{ii}' @@ -396,8 +397,8 @@ class TestInterfacesOpenVPN(unittest.TestCase): self.assertIn(interface, interfaces()) # check that no interface remained after deleting them - self.session.delete(base_path) - self.session.commit() + self.cli_delete(base_path) + self.cli_commit() for ii in num_range: interface = f'vtun{ii}' @@ -415,23 +416,23 @@ class TestInterfacesOpenVPN(unittest.TestCase): path = base_path + [interface] port = str(2000 + ii) - self.session.set(path + ['device-type', 'tun']) - self.session.set(path + ['encryption', 'cipher', 'aes192']) - self.session.set(path + ['hash', auth_hash]) - self.session.set(path + ['mode', 'server']) - self.session.set(path + ['local-port', port]) - self.session.set(path + ['server', 'subnet', subnet]) - self.session.set(path + ['server', 'topology', 'net30']) - self.session.set(path + ['replace-default-route']) - self.session.set(path + ['keep-alive', 'failure-count', '10']) - self.session.set(path + ['keep-alive', 'interval', '5']) - self.session.set(path + ['tls', 'ca-cert-file', ca_cert]) - self.session.set(path + ['tls', 'cert-file', ssl_cert]) - self.session.set(path + ['tls', 'key-file', ssl_key]) - self.session.set(path + ['tls', 'dh-file', dh_pem]) - self.session.set(path + ['vrf', vrf_name]) - - self.session.commit() + self.cli_set(path + ['device-type', 'tun']) + self.cli_set(path + ['encryption', 'cipher', 'aes192']) + self.cli_set(path + ['hash', auth_hash]) + self.cli_set(path + ['mode', 'server']) + self.cli_set(path + ['local-port', port]) + self.cli_set(path + ['server', 'subnet', subnet]) + self.cli_set(path + ['server', 'topology', 'net30']) + self.cli_set(path + ['replace-default-route']) + self.cli_set(path + ['keep-alive', 'failure-count', '10']) + self.cli_set(path + ['keep-alive', 'interval', '5']) + self.cli_set(path + ['tls', 'ca-cert-file', ca_cert]) + self.cli_set(path + ['tls', 'cert-file', ssl_cert]) + self.cli_set(path + ['tls', 'key-file', ssl_key]) + self.cli_set(path + ['tls', 'dh-file', dh_pem]) + self.cli_set(path + ['vrf', vrf_name]) + + self.cli_commit() for ii in num_range: interface = f'vtun{ii}' @@ -471,8 +472,8 @@ class TestInterfacesOpenVPN(unittest.TestCase): self.assertIn(interface, interfaces()) # check that no interface remained after deleting them - self.session.delete(base_path) - self.session.commit() + self.cli_delete(base_path) + self.cli_commit() for ii in num_range: interface = f'vtun{ii}' @@ -485,57 +486,57 @@ class TestInterfacesOpenVPN(unittest.TestCase): interface = 'vtun5000' path = base_path + [interface] - self.session.set(path + ['mode', 'site-to-site']) + self.cli_set(path + ['mode', 'site-to-site']) # check validate() - encryption ncp-ciphers cannot be specified in site-to-site mode - self.session.set(path + ['encryption', 'ncp-ciphers', 'aes192gcm']) + self.cli_set(path + ['encryption', 'ncp-ciphers', 'aes192gcm']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['encryption']) + self.cli_commit() + self.cli_delete(path + ['encryption']) # check validate() - must specify "local-address" or add interface to bridge with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(path + ['local-address', '10.0.0.1']) - self.session.set(path + ['local-address', '2001:db8:1::1']) + self.cli_commit() + self.cli_set(path + ['local-address', '10.0.0.1']) + self.cli_set(path + ['local-address', '2001:db8:1::1']) # check validate() - cannot specify more than 1 IPv4 local-address - self.session.set(path + ['local-address', '10.0.0.2']) + self.cli_set(path + ['local-address', '10.0.0.2']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['local-address', '10.0.0.2']) + self.cli_commit() + self.cli_delete(path + ['local-address', '10.0.0.2']) # check validate() - cannot specify more than 1 IPv6 local-address - self.session.set(path + ['local-address', '2001:db8:1::2']) + self.cli_set(path + ['local-address', '2001:db8:1::2']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['local-address', '2001:db8:1::2']) + self.cli_commit() + self.cli_delete(path + ['local-address', '2001:db8:1::2']) # check validate() - IPv4 "local-address" requires IPv4 "remote-address" # or IPv4 "local-address subnet" with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(path + ['remote-address', '192.168.0.1']) - self.session.set(path + ['remote-address', '2001:db8:ffff::1']) + self.cli_commit() + self.cli_set(path + ['remote-address', '192.168.0.1']) + self.cli_set(path + ['remote-address', '2001:db8:ffff::1']) # check validate() - Cannot specify more than 1 IPv4 "remote-address" - self.session.set(path + ['remote-address', '192.168.0.2']) + self.cli_set(path + ['remote-address', '192.168.0.2']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['remote-address', '192.168.0.2']) + self.cli_commit() + self.cli_delete(path + ['remote-address', '192.168.0.2']) # check validate() - Cannot specify more than 1 IPv6 "remote-address" - self.session.set(path + ['remote-address', '2001:db8:ffff::2']) + self.cli_set(path + ['remote-address', '2001:db8:ffff::2']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(path + ['remote-address', '2001:db8:ffff::2']) + self.cli_commit() + self.cli_delete(path + ['remote-address', '2001:db8:ffff::2']) # check validate() - Must specify one of "shared-secret-key-file" and "tls" with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(path + ['shared-secret-key-file', s2s_key]) + self.cli_commit() + self.cli_set(path + ['shared-secret-key-file', s2s_key]) - self.session.commit() + self.cli_commit() def test_openvpn_site2site_interfaces_tun(self): # Create two OpenVPN site-to-site interfaces @@ -553,23 +554,23 @@ class TestInterfacesOpenVPN(unittest.TestCase): path = base_path + [interface] port = str(3000 + ii) - self.session.set(path + ['local-address', local_address]) + self.cli_set(path + ['local-address', local_address]) # even numbers use tun type, odd numbers use tap type if ii % 2 == 0: - self.session.set(path + ['device-type', 'tun']) + self.cli_set(path + ['device-type', 'tun']) else: - self.session.set(path + ['device-type', 'tap']) - self.session.set(path + ['local-address', local_address, 'subnet-mask', local_address_subnet]) + self.cli_set(path + ['device-type', 'tap']) + self.cli_set(path + ['local-address', local_address, 'subnet-mask', local_address_subnet]) - self.session.set(path + ['mode', 'site-to-site']) - self.session.set(path + ['local-port', port]) - self.session.set(path + ['remote-port', port]) - self.session.set(path + ['shared-secret-key-file', s2s_key]) - self.session.set(path + ['remote-address', remote_address]) - self.session.set(path + ['vrf', vrf_name]) + self.cli_set(path + ['mode', 'site-to-site']) + self.cli_set(path + ['local-port', port]) + self.cli_set(path + ['remote-port', port]) + self.cli_set(path + ['shared-secret-key-file', s2s_key]) + self.cli_set(path + ['remote-address', remote_address]) + self.cli_set(path + ['vrf', vrf_name]) - self.session.commit() + self.cli_commit() for ii in num_range: interface = f'vtun{ii}' @@ -600,8 +601,8 @@ class TestInterfacesOpenVPN(unittest.TestCase): # check that no interface remained after deleting them - self.session.delete(base_path) - self.session.commit() + self.cli_delete(base_path) + self.cli_commit() for ii in num_range: interface = f'vtun{ii}' diff --git a/smoketest/scripts/cli/test_interfaces_pppoe.py b/smoketest/scripts/cli/test_interfaces_pppoe.py index f7fa73ea2..3412ebae0 100755 --- a/smoketest/scripts/cli/test_interfaces_pppoe.py +++ b/smoketest/scripts/cli/test_interfaces_pppoe.py @@ -15,11 +15,13 @@ # along with this program. If not, see . import re -import os import unittest from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError +from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError from vyos.util import read_file config_file = '/etc/ppp/peers/{}' @@ -42,16 +44,14 @@ def get_dhcp6c_config_value(interface, key): out.append(item.replace(';','')) return out -class PPPoEInterfaceTest(unittest.TestCase): +class PPPoEInterfaceTest(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) self._interfaces = ['pppoe10', 'pppoe20', 'pppoe30'] self._source_interface = 'eth0' def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() def test_pppoe_client(self): # Check if PPPoE dialer can be configured and runs @@ -60,19 +60,19 @@ class PPPoEInterfaceTest(unittest.TestCase): passwd = 'VyOS-passwd-' + interface mtu = '1400' - self.session.set(base_path + [interface, 'authentication', 'user', user]) - self.session.set(base_path + [interface, 'authentication', 'password', passwd]) - self.session.set(base_path + [interface, 'default-route', 'auto']) - self.session.set(base_path + [interface, 'mtu', mtu]) - self.session.set(base_path + [interface, 'no-peer-dns']) + self.cli_set(base_path + [interface, 'authentication', 'user', user]) + self.cli_set(base_path + [interface, 'authentication', 'password', passwd]) + self.cli_set(base_path + [interface, 'default-route', 'auto']) + self.cli_set(base_path + [interface, 'mtu', mtu]) + self.cli_set(base_path + [interface, 'no-peer-dns']) # check validate() - a source-interface is required with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(base_path + [interface, 'source-interface', self._source_interface]) + self.cli_commit() + self.cli_set(base_path + [interface, 'source-interface', self._source_interface]) # commit changes - self.session.commit() + self.cli_commit() # verify configuration file(s) for interface in self._interfaces: @@ -97,27 +97,48 @@ class PPPoEInterfaceTest(unittest.TestCase): self.assertTrue(running) + + def test_pppoe_clent_disabled_interface(self): + # Check if PPPoE Client can be disabled + for interface in self._interfaces: + self.cli_set(base_path + [interface, 'authentication', 'user', 'vyos']) + self.cli_set(base_path + [interface, 'authentication', 'password', 'vyos']) + self.cli_set(base_path + [interface, 'source-interface', self._source_interface]) + self.cli_set(base_path + [interface, 'disable']) + + self.cli_commit() + + # Validate PPPoE client process + running = False + for interface in self._interfaces: + for proc in process_iter(): + if interface in proc.cmdline(): + running = True + + self.assertFalse(running) + + def test_pppoe_dhcpv6pd(self): # Check if PPPoE dialer can be configured with DHCPv6-PD address = '1' sla_id = '0' sla_len = '8' for interface in self._interfaces: - self.session.set(base_path + [interface, 'authentication', 'user', 'vyos']) - self.session.set(base_path + [interface, 'authentication', 'password', 'vyos']) - self.session.set(base_path + [interface, 'default-route', 'none']) - self.session.set(base_path + [interface, 'no-peer-dns']) - self.session.set(base_path + [interface, 'source-interface', self._source_interface]) - self.session.set(base_path + [interface, 'ipv6', 'address', 'autoconf']) + self.cli_set(base_path + [interface, 'authentication', 'user', 'vyos']) + self.cli_set(base_path + [interface, 'authentication', 'password', 'vyos']) + self.cli_set(base_path + [interface, 'default-route', 'none']) + self.cli_set(base_path + [interface, 'no-peer-dns']) + self.cli_set(base_path + [interface, 'source-interface', self._source_interface]) + self.cli_set(base_path + [interface, 'ipv6', 'address', 'autoconf']) # prefix delegation stuff dhcpv6_pd_base = base_path + [interface, 'dhcpv6-options', 'pd', '0'] - self.session.set(dhcpv6_pd_base + ['length', '56']) - self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'address', address]) - self.session.set(dhcpv6_pd_base + ['interface', self._source_interface, 'sla-id', sla_id]) + self.cli_set(dhcpv6_pd_base + ['length', '56']) + self.cli_set(dhcpv6_pd_base + ['interface', self._source_interface, 'address', address]) + self.cli_set(dhcpv6_pd_base + ['interface', self._source_interface, 'sla-id', sla_id]) # commit changes - self.session.commit() + self.cli_commit() # verify "normal" PPPoE value - 1492 is default MTU tmp = get_config_value(interface, 'mtu')[1] @@ -161,16 +182,16 @@ class PPPoEInterfaceTest(unittest.TestCase): def test_pppoe_authentication(self): # When username or password is set - so must be the other interface = 'pppoe0' - self.session.set(base_path + [interface, 'authentication', 'user', 'vyos']) - self.session.set(base_path + [interface, 'source-interface', self._source_interface]) - self.session.set(base_path + [interface, 'ipv6', 'address', 'autoconf']) + self.cli_set(base_path + [interface, 'authentication', 'user', 'vyos']) + self.cli_set(base_path + [interface, 'source-interface', self._source_interface]) + self.cli_set(base_path + [interface, 'ipv6', 'address', 'autoconf']) # check validate() - if user is set, so must be the password with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() - self.session.set(base_path + [interface, 'authentication', 'password', 'vyos']) - self.session.commit() + self.cli_set(base_path + [interface, 'authentication', 'password', 'vyos']) + self.cli_commit() if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py b/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py index 8dbbb7f95..dacbca99c 100755 --- a/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py +++ b/smoketest/scripts/cli/test_interfaces_pseudo_ethernet.py @@ -19,7 +19,7 @@ import unittest from vyos.ifconfig import Section from base_interfaces_test import BasicInterfaceTest -class PEthInterfaceTest(BasicInterfaceTest.BaseTest): +class PEthInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._test_ip = True @@ -35,6 +35,8 @@ class PEthInterfaceTest(BasicInterfaceTest.BaseTest): 'peth1': ['source-interface eth1'], } cls._interfaces = list(cls._options) + # call base-classes classmethod + super(cls, cls).setUpClass() if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_tunnel.py b/smoketest/scripts/cli/test_interfaces_tunnel.py index e65d83840..3aed498b4 100755 --- a/smoketest/scripts/cli/test_interfaces_tunnel.py +++ b/smoketest/scripts/cli/test_interfaces_tunnel.py @@ -16,20 +16,18 @@ import unittest -from vyos.configsession import ConfigSession +from base_interfaces_test import BasicInterfaceTest + from vyos.configsession import ConfigSessionError from vyos.util import get_interface_config from vyos.template import inc_ip -from vyos.util import cmd - -from base_interfaces_test import BasicInterfaceTest remote_ip4 = '192.0.2.100' remote_ip6 = '2001:db8::ffff' source_if = 'dum2222' mtu = 1476 -class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): +class TunnelInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._test_ip = True @@ -43,17 +41,18 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): 'tun20': ['encapsulation gre', 'remote 192.0.2.20', 'source-address ' + cls.local_v4], } cls._interfaces = list(cls._options) + # call base-classes classmethod + super(cls, cls).setUpClass() def setUp(self): super().setUp() - self.session.set(['interfaces', 'dummy', source_if, 'address', self.local_v4 + '/32']) - self.session.set(['interfaces', 'dummy', source_if, 'address', self.local_v6 + '/128']) + self.cli_set(['interfaces', 'dummy', source_if, 'address', self.local_v4 + '/32']) + self.cli_set(['interfaces', 'dummy', source_if, 'address', self.local_v6 + '/128']) def tearDown(self): - self.session.delete(['interfaces', 'dummy', source_if]) + self.cli_delete(['interfaces', 'dummy', source_if]) super().tearDown() - def test_ipv4_encapsulations(self): # When running tests ensure that for certain encapsulation types the # local and remote IP address is actually an IPv4 address @@ -61,31 +60,30 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): interface = f'tun1000' local_if_addr = f'10.10.200.1/24' for encapsulation in ['ipip', 'sit', 'gre', 'gretap']: - self.session.set(self._base_path + [interface, 'address', local_if_addr]) - self.session.set(self._base_path + [interface, 'encapsulation', encapsulation]) - self.session.set(self._base_path + [interface, 'source-address', self.local_v6]) - self.session.set(self._base_path + [interface, 'remote', remote_ip6]) + self.cli_set(self._base_path + [interface, 'address', local_if_addr]) + self.cli_set(self._base_path + [interface, 'encapsulation', encapsulation]) + self.cli_set(self._base_path + [interface, 'source-address', self.local_v6]) + self.cli_set(self._base_path + [interface, 'remote', remote_ip6]) # Encapsulation mode requires IPv4 source-address with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'source-address', self.local_v4]) + self.cli_commit() + self.cli_set(self._base_path + [interface, 'source-address', self.local_v4]) - # Encapsulation mode requires IPv4 remote address + # Encapsulation mode requires IPv4 remote with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'remote', remote_ip4]) - - self.session.set(self._base_path + [interface, 'source-interface', source_if]) + self.cli_commit() + self.cli_set(self._base_path + [interface, 'remote', remote_ip4]) + self.cli_set(self._base_path + [interface, 'source-interface', source_if]) # Source interface can not be used with sit and gretap if encapsulation in ['sit', 'gretap']: with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(self._base_path + [interface, 'source-interface']) + self.cli_commit() + self.cli_delete(self._base_path + [interface, 'source-interface']) # Check if commit is ok - self.session.commit() + self.cli_commit() conf = get_interface_config(interface) if encapsulation not in ['sit', 'gretap']: @@ -93,12 +91,14 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): self.assertEqual(interface, conf['ifname']) self.assertEqual(mtu, conf['mtu']) + self.assertEqual(encapsulation, conf['linkinfo']['info_kind']) self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local']) self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote']) + self.assertTrue(conf['linkinfo']['info_data']['pmtudisc']) # cleanup this instance - self.session.delete(self._base_path + [interface]) - self.session.commit() + self.cli_delete(self._base_path + [interface]) + self.cli_commit() def test_ipv6_encapsulations(self): # When running tests ensure that for certain encapsulation types the @@ -107,31 +107,38 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): interface = f'tun1010' local_if_addr = f'10.10.200.1/24' for encapsulation in ['ipip6', 'ip6ip6', 'ip6gre']: - self.session.set(self._base_path + [interface, 'address', local_if_addr]) - self.session.set(self._base_path + [interface, 'encapsulation', encapsulation]) - self.session.set(self._base_path + [interface, 'source-address', self.local_v4]) - self.session.set(self._base_path + [interface, 'remote', remote_ip4]) + self.cli_set(self._base_path + [interface, 'address', local_if_addr]) + self.cli_set(self._base_path + [interface, 'encapsulation', encapsulation]) + self.cli_set(self._base_path + [interface, 'source-address', self.local_v4]) + self.cli_set(self._base_path + [interface, 'remote', remote_ip4]) # Encapsulation mode requires IPv6 source-address with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'source-address', self.local_v6]) + self.cli_commit() + self.cli_set(self._base_path + [interface, 'source-address', self.local_v6]) - # Encapsulation mode requires IPv6 remote address + # Encapsulation mode requires IPv6 remote with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'remote', remote_ip6]) + self.cli_commit() + self.cli_set(self._base_path + [interface, 'remote', remote_ip6]) # Configure Tunnel Source interface - self.session.set(self._base_path + [interface, 'source-interface', source_if]) + self.cli_set(self._base_path + [interface, 'source-interface', source_if]) + # Source interface can not be used with ip6gretap + if encapsulation in ['ip6gretap']: + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(self._base_path + [interface, 'source-interface']) # Check if commit is ok - self.session.commit() + self.cli_commit() conf = get_interface_config(interface) + if encapsulation not in ['ip6gretap']: + self.assertEqual(source_if, conf['link']) + self.assertEqual(interface, conf['ifname']) self.assertEqual(mtu, conf['mtu']) - self.assertEqual(source_if, conf['link']) # Not applicable for ip6gre if 'proto' in conf['linkinfo']['info_data']: @@ -146,46 +153,46 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): self.assertEqual(remote_ip6, conf['linkinfo']['info_data']['remote']) # cleanup this instance - self.session.delete(self._base_path + [interface]) - self.session.commit() + self.cli_delete(self._base_path + [interface]) + self.cli_commit() def test_tunnel_verify_local_dhcp(self): - # We can not use local-ip and dhcp-interface at the same time + # We can not use source-address and dhcp-interface at the same time interface = f'tun1020' local_if_addr = f'10.0.0.1/24' - self.session.set(self._base_path + [interface, 'address', local_if_addr]) - self.session.set(self._base_path + [interface, 'encapsulation', 'gre']) - self.session.set(self._base_path + [interface, 'source-address', self.local_v4]) - self.session.set(self._base_path + [interface, 'remote', remote_ip4]) - self.session.set(self._base_path + [interface, 'dhcp-interface', 'eth0']) + self.cli_set(self._base_path + [interface, 'address', local_if_addr]) + self.cli_set(self._base_path + [interface, 'encapsulation', 'gre']) + self.cli_set(self._base_path + [interface, 'source-address', self.local_v4]) + self.cli_set(self._base_path + [interface, 'remote', remote_ip4]) + self.cli_set(self._base_path + [interface, 'dhcp-interface', 'eth0']) # source-address and dhcp-interface can not be used at the same time with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(self._base_path + [interface, 'dhcp-interface']) + self.cli_commit() + self.cli_delete(self._base_path + [interface, 'dhcp-interface']) # Check if commit is ok - self.session.commit() + self.cli_commit() def test_tunnel_parameters_gre(self): interface = f'tun1030' gre_key = '10' encapsulation = 'gre' tos = '20' - ttl = 0 - self.session.set(self._base_path + [interface, 'encapsulation', encapsulation]) - self.session.set(self._base_path + [interface, 'source-address', self.local_v4]) - self.session.set(self._base_path + [interface, 'remote', remote_ip4]) + self.cli_set(self._base_path + [interface, 'encapsulation', encapsulation]) + self.cli_set(self._base_path + [interface, 'source-address', self.local_v4]) + self.cli_set(self._base_path + [interface, 'remote', remote_ip4]) - self.session.set(self._base_path + [interface, 'parameters', 'ip', 'key', gre_key]) - self.session.set(self._base_path + [interface, 'parameters', 'ip', 'tos', tos]) - self.session.set(self._base_path + [interface, 'parameters', 'ip', 'ttl', str(ttl)]) + self.cli_set(self._base_path + [interface, 'parameters', 'ip', 'no-pmtu-discovery']) + self.cli_set(self._base_path + [interface, 'parameters', 'ip', 'key', gre_key]) + self.cli_set(self._base_path + [interface, 'parameters', 'ip', 'tos', tos]) + self.cli_set(self._base_path + [interface, 'parameters', 'ip', 'ttl', '0']) # Check if commit is ok - self.session.commit() + self.cli_commit() conf = get_interface_config(interface) self.assertEqual(mtu, conf['mtu']) @@ -193,7 +200,8 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): self.assertEqual(encapsulation, conf['linkinfo']['info_kind']) self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local']) self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote']) - self.assertEqual(ttl, conf['linkinfo']['info_data']['ttl']) + self.assertEqual(0, conf['linkinfo']['info_data']['ttl']) + self.assertFalse( conf['linkinfo']['info_data']['pmtudisc']) def test_gretap_parameters_change(self): interface = f'tun1040' @@ -201,31 +209,29 @@ class TunnelInterfaceTest(BasicInterfaceTest.BaseTest): encapsulation = 'gretap' tos = '20' - self.session.set(self._base_path + [interface, 'encapsulation', encapsulation]) - self.session.set(self._base_path + [interface, 'source-address', self.local_v4]) - self.session.set(self._base_path + [interface, 'remote', remote_ip4]) + self.cli_set(self._base_path + [interface, 'encapsulation', encapsulation]) + self.cli_set(self._base_path + [interface, 'source-address', self.local_v4]) + self.cli_set(self._base_path + [interface, 'remote', remote_ip4]) # Check if commit is ok - self.session.commit() + self.cli_commit() conf = get_interface_config(interface) self.assertEqual(mtu, conf['mtu']) self.assertEqual(interface, conf['ifname']) - self.assertEqual('gretap', conf['linkinfo']['info_kind']) + self.assertEqual(encapsulation, conf['linkinfo']['info_kind']) self.assertEqual(self.local_v4, conf['linkinfo']['info_data']['local']) self.assertEqual(remote_ip4, conf['linkinfo']['info_data']['remote']) - # TTL uses a default value - self.assertEqual(64, conf['linkinfo']['info_data']['ttl']) + self.assertEqual(64, conf['linkinfo']['info_data']['ttl']) # Change remote ip address (inc host by 2 new_remote = inc_ip(remote_ip4, 2) - self.session.set(self._base_path + [interface, 'remote', new_remote]) - + self.cli_set(self._base_path + [interface, 'remote', new_remote]) # Check if commit is ok - self.session.commit() + self.cli_commit() conf = get_interface_config(interface) self.assertEqual(new_remote, conf['linkinfo']['info_data']['remote']) if __name__ == '__main__': - unittest.main(verbosity=2, failfast=True) + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_vxlan.py b/smoketest/scripts/cli/test_interfaces_vxlan.py index adbe7138b..0fba0f460 100755 --- a/smoketest/scripts/cli/test_interfaces_vxlan.py +++ b/smoketest/scripts/cli/test_interfaces_vxlan.py @@ -16,10 +16,13 @@ import unittest -from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.configsession import ConfigSession +from vyos.ifconfig import Interface +from vyos.util import get_interface_config + from base_interfaces_test import BasicInterfaceTest -class VXLANInterfaceTest(BasicInterfaceTest.BaseTest): +class VXLANInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._test_ip = True @@ -32,6 +35,8 @@ class VXLANInterfaceTest(BasicInterfaceTest.BaseTest): 'vxlan30': ['vni 30', 'remote 2001:db8:2000::1', 'source-address 2001:db8:1000::1'], } cls._interfaces = list(cls._options) + # call base-classes classmethod + super(cls, cls).setUpClass() if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_wireguard.py b/smoketest/scripts/cli/test_interfaces_wireguard.py index e70324f96..d31ec0332 100755 --- a/smoketest/scripts/cli/test_interfaces_wireguard.py +++ b/smoketest/scripts/cli/test_interfaces_wireguard.py @@ -17,8 +17,10 @@ import os import unittest -from vyos.configsession import ConfigSession, ConfigSessionError -from base_interfaces_test import BasicInterfaceTest +from base_vyostest_shim import VyOSUnitTestSHIM +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError + # Generate WireGuard default keypair if not os.path.isdir('/config/auth/wireguard/default'): @@ -26,17 +28,15 @@ if not os.path.isdir('/config/auth/wireguard/default'): base_path = ['interfaces', 'wireguard'] -class WireGuardInterfaceTest(unittest.TestCase): +class WireGuardInterfaceTest(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) self._test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32', '2001:db8:1::ffff/64', '2001:db8:101::1/112'] self._interfaces = ['wg0', 'wg1'] def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() def test_wireguard_peer(self): # Create WireGuard interfaces with associated peers @@ -46,19 +46,19 @@ class WireGuardInterfaceTest(unittest.TestCase): pubkey = 'n6ZZL7ph/QJUJSUUTyu19c77my1dRCDHkMzFQUO9Z3A=' for addr in self._test_addr: - self.session.set(base_path + [intf, 'address', addr]) + self.cli_set(base_path + [intf, 'address', addr]) - self.session.set(base_path + [intf, 'peer', peer, 'address', '127.0.0.1']) - self.session.set(base_path + [intf, 'peer', peer, 'port', '1337']) + self.cli_set(base_path + [intf, 'peer', peer, 'address', '127.0.0.1']) + self.cli_set(base_path + [intf, 'peer', peer, 'port', '1337']) # Allow different prefixes to traverse the tunnel allowed_ips = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16'] for ip in allowed_ips: - self.session.set(base_path + [intf, 'peer', peer, 'allowed-ips', ip]) + self.cli_set(base_path + [intf, 'peer', peer, 'allowed-ips', ip]) - self.session.set(base_path + [intf, 'peer', peer, 'preshared-key', psk]) - self.session.set(base_path + [intf, 'peer', peer, 'pubkey', pubkey]) - self.session.commit() + self.cli_set(base_path + [intf, 'peer', peer, 'preshared-key', psk]) + self.cli_set(base_path + [intf, 'peer', peer, 'pubkey', pubkey]) + self.cli_commit() self.assertTrue(os.path.isdir(f'/sys/class/net/{intf}')) @@ -71,26 +71,26 @@ class WireGuardInterfaceTest(unittest.TestCase): pubkey_1 = 'n1CUsmR0M2LUUsyicBd6blZICwUqqWWHbu4ifZ2/9gk=' pubkey_2 = 'ebFx/1G0ti8tvuZd94sEIosAZZIznX+dBAKG/8DFm0I=' - self.session.set(base_path + [interface, 'address', '172.16.0.1/24']) + self.cli_set(base_path + [interface, 'address', '172.16.0.1/24']) - self.session.set(base_path + [interface, 'peer', 'PEER01', 'pubkey', pubkey_1]) - self.session.set(base_path + [interface, 'peer', 'PEER01', 'port', port]) - self.session.set(base_path + [interface, 'peer', 'PEER01', 'allowed-ips', '10.205.212.10/32']) - self.session.set(base_path + [interface, 'peer', 'PEER01', 'address', '192.0.2.1']) + self.cli_set(base_path + [interface, 'peer', 'PEER01', 'pubkey', pubkey_1]) + self.cli_set(base_path + [interface, 'peer', 'PEER01', 'port', port]) + self.cli_set(base_path + [interface, 'peer', 'PEER01', 'allowed-ips', '10.205.212.10/32']) + self.cli_set(base_path + [interface, 'peer', 'PEER01', 'address', '192.0.2.1']) - self.session.set(base_path + [interface, 'peer', 'PEER02', 'pubkey', pubkey_2]) - self.session.set(base_path + [interface, 'peer', 'PEER02', 'port', port]) - self.session.set(base_path + [interface, 'peer', 'PEER02', 'allowed-ips', '10.205.212.11/32']) - self.session.set(base_path + [interface, 'peer', 'PEER02', 'address', '192.0.2.2']) + self.cli_set(base_path + [interface, 'peer', 'PEER02', 'pubkey', pubkey_2]) + self.cli_set(base_path + [interface, 'peer', 'PEER02', 'port', port]) + self.cli_set(base_path + [interface, 'peer', 'PEER02', 'allowed-ips', '10.205.212.11/32']) + self.cli_set(base_path + [interface, 'peer', 'PEER02', 'address', '192.0.2.2']) # Commit peers - self.session.commit() + self.cli_commit() self.assertTrue(os.path.isdir(f'/sys/class/net/{interface}')) # Delete second peer - self.session.delete(base_path + [interface, 'peer', 'PEER01']) - self.session.commit() + self.cli_delete(base_path + [interface, 'peer', 'PEER01']) + self.cli_commit() if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_wireless.py b/smoketest/scripts/cli/test_interfaces_wireless.py index 39e8cd5b8..4f539a23c 100755 --- a/smoketest/scripts/cli/test_interfaces_wireless.py +++ b/smoketest/scripts/cli/test_interfaces_wireless.py @@ -31,7 +31,7 @@ def get_config_value(interface, key): tmp = re.findall(f'{key}=+(.*)', tmp) return tmp[0] -class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): +class WirelessInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._test_ip = True @@ -47,6 +47,8 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): 'type access-point', 'address 192.0.2.13/30', 'channel 0'], } cls._interfaces = list(cls._options) + # call base-classes classmethod + super(cls, cls).setUpClass() def test_wireless_add_single_ip_address(self): # derived method to check if member interfaces are enslaved properly @@ -67,12 +69,12 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): interface = 'wlan0' ssid = 'ssid' - self.session.set(self._base_path + [interface, 'ssid', ssid]) - self.session.set(self._base_path + [interface, 'country-code', 'se']) - self.session.set(self._base_path + [interface, 'type', 'access-point']) + self.cli_set(self._base_path + [interface, 'ssid', ssid]) + self.cli_set(self._base_path + [interface, 'country-code', 'se']) + self.cli_set(self._base_path + [interface, 'type', 'access-point']) # auto-powersave is special - self.session.set(self._base_path + [interface, 'capabilities', 'ht', 'auto-powersave']) + self.cli_set(self._base_path + [interface, 'capabilities', 'ht', 'auto-powersave']) ht_opt = { # VyOS CLI option hostapd - ht_capab setting @@ -88,7 +90,7 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): 'smps static' : '[SMPS-STATIC]', } for key in ht_opt: - self.session.set(self._base_path + [interface, 'capabilities', 'ht'] + key.split()) + self.cli_set(self._base_path + [interface, 'capabilities', 'ht'] + key.split()) vht_opt = { # VyOS CLI option hostapd - ht_capab setting @@ -105,9 +107,9 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): 'short-gi 160' : '[SHORT-GI-160]', } for key in vht_opt: - self.session.set(self._base_path + [interface, 'capabilities', 'vht'] + key.split()) + self.cli_set(self._base_path + [interface, 'capabilities', 'vht'] + key.split()) - self.session.commit() + self.cli_commit() # # Validate Config @@ -148,29 +150,29 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): mode = 'n' country = 'de' - self.session.set(self._base_path + [interface, 'physical-device', phy]) - self.session.set(self._base_path + [interface, 'type', 'access-point']) - self.session.set(self._base_path + [interface, 'mode', mode]) + self.cli_set(self._base_path + [interface, 'physical-device', phy]) + self.cli_set(self._base_path + [interface, 'type', 'access-point']) + self.cli_set(self._base_path + [interface, 'mode', mode]) # SSID must be set with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'ssid', ssid]) + self.cli_commit() + self.cli_set(self._base_path + [interface, 'ssid', ssid]) # Channel must be set with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'channel', channel]) + self.cli_commit() + self.cli_set(self._base_path + [interface, 'channel', channel]) # Country-Code must be set with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [interface, 'country-code', country]) + self.cli_commit() + self.cli_set(self._base_path + [interface, 'country-code', country]) - self.session.set(self._base_path + [interface, 'security', 'wpa', 'mode', 'wpa2']) - self.session.set(self._base_path + [interface, 'security', 'wpa', 'passphrase', wpa_key]) + self.cli_set(self._base_path + [interface, 'security', 'wpa', 'mode', 'wpa2']) + self.cli_set(self._base_path + [interface, 'security', 'wpa', 'passphrase', wpa_key]) - self.session.commit() + self.cli_commit() # # Validate Config @@ -211,13 +213,13 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): # We need a bridge where we can hook our access-point interface to bridge_path = ['interfaces', 'bridge', bridge] - self.session.set(bridge_path + ['member', 'interface', interface]) + self.cli_set(bridge_path + ['member', 'interface', interface]) - self.session.set(self._base_path + [interface, 'ssid', ssid]) - self.session.set(self._base_path + [interface, 'country-code', 'se']) - self.session.set(self._base_path + [interface, 'type', 'access-point']) + self.cli_set(self._base_path + [interface, 'ssid', ssid]) + self.cli_set(self._base_path + [interface, 'country-code', 'se']) + self.cli_set(self._base_path + [interface, 'type', 'access-point']) - self.session.commit() + self.cli_commit() # Check for running process self.assertTrue(process_named_running('hostapd')) @@ -228,9 +230,9 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): self.assertIn(interface, bridge_members) - self.session.delete(bridge_path) - self.session.delete(self._base_path) - self.session.commit() + self.cli_delete(bridge_path) + self.cli_delete(self._base_path) + self.cli_commit() if __name__ == '__main__': check_kmod('mac80211_hwsim') diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py index b5702d691..0706f234e 100755 --- a/smoketest/scripts/cli/test_nat.py +++ b/smoketest/scripts/cli/test_nat.py @@ -19,6 +19,7 @@ import jmespath import json import unittest +from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.util import cmd @@ -28,16 +29,15 @@ base_path = ['nat'] src_path = base_path + ['source'] dst_path = base_path + ['destination'] -class TestNAT(unittest.TestCase): +class TestNAT(VyOSUnitTestSHIM.TestCase): def setUp(self): # ensure we can also run this test on a live system - so lets clean # out the current configuration :) - self.session = ConfigSession(os.getpid()) - self.session.delete(base_path) + self.cli_delete(base_path) def tearDown(self): - self.session.delete(base_path) - self.session.commit() + self.cli_delete(base_path) + self.cli_commit() def test_snat(self): rules = ['100', '110', '120', '130', '200', '210', '220', '230'] @@ -48,15 +48,15 @@ class TestNAT(unittest.TestCase): # depending of rule order we check either for source address for NAT # or configured destination address for NAT if int(rule) < 200: - self.session.set(src_path + ['rule', rule, 'source', 'address', network]) - self.session.set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100]) - self.session.set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) + self.cli_set(src_path + ['rule', rule, 'source', 'address', network]) + self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100]) + self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) else: - self.session.set(src_path + ['rule', rule, 'destination', 'address', network]) - self.session.set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200]) - self.session.set(src_path + ['rule', rule, 'exclude']) + self.cli_set(src_path + ['rule', rule, 'destination', 'address', network]) + self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200]) + self.cli_set(src_path + ['rule', rule, 'exclude']) - self.session.commit() + self.cli_commit() tmp = cmd('sudo nft -j list table nat') data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) @@ -98,17 +98,17 @@ class TestNAT(unittest.TestCase): for rule in rules: port = f'10{rule}' - self.session.set(dst_path + ['rule', rule, 'source', 'port', port]) - self.session.set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) - self.session.set(dst_path + ['rule', rule, 'translation', 'port', port]) + self.cli_set(dst_path + ['rule', rule, 'source', 'port', port]) + self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) + self.cli_set(dst_path + ['rule', rule, 'translation', 'port', port]) if int(rule) < 200: - self.session.set(dst_path + ['rule', rule, 'protocol', inbound_proto_100]) - self.session.set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100]) + self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_100]) + self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100]) else: - self.session.set(dst_path + ['rule', rule, 'protocol', inbound_proto_200]) - self.session.set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200]) + self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_200]) + self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200]) - self.session.commit() + self.cli_commit() tmp = cmd('sudo nft -j list table nat') data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) @@ -141,31 +141,31 @@ class TestNAT(unittest.TestCase): def test_snat_required_translation_address(self): # T2813: Ensure translation address is specified rule = '5' - self.session.set(src_path + ['rule', rule, 'source', 'address', '192.0.2.0/24']) + self.cli_set(src_path + ['rule', rule, 'source', 'address', '192.0.2.0/24']) # check validate() - outbound-interface must be defined with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(src_path + ['rule', rule, 'outbound-interface', 'eth0']) + self.cli_commit() + self.cli_set(src_path + ['rule', rule, 'outbound-interface', 'eth0']) # check validate() - translation address not specified with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() - self.session.set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) - self.session.commit() + self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) + self.cli_commit() def test_dnat_negated_addresses(self): # T3186: negated addresses are not accepted by nftables rule = '1000' - self.session.set(dst_path + ['rule', rule, 'destination', 'address', '!192.0.2.1']) - self.session.set(dst_path + ['rule', rule, 'destination', 'port', '53']) - self.session.set(dst_path + ['rule', rule, 'inbound-interface', 'eth0']) - self.session.set(dst_path + ['rule', rule, 'protocol', 'tcp_udp']) - self.session.set(dst_path + ['rule', rule, 'source', 'address', '!192.0.2.1']) - self.session.set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) - self.session.set(dst_path + ['rule', rule, 'translation', 'port', '53']) - self.session.commit() + self.cli_set(dst_path + ['rule', rule, 'destination', 'address', '!192.0.2.1']) + self.cli_set(dst_path + ['rule', rule, 'destination', 'port', '53']) + self.cli_set(dst_path + ['rule', rule, 'inbound-interface', 'eth0']) + self.cli_set(dst_path + ['rule', rule, 'protocol', 'tcp_udp']) + self.cli_set(dst_path + ['rule', rule, 'source', 'address', '!192.0.2.1']) + self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) + self.cli_set(dst_path + ['rule', rule, 'translation', 'port', '53']) + self.cli_commit() def test_nat_no_rules(self): # T3206: deleting all rules but keep the direction 'destination' or @@ -173,9 +173,9 @@ class TestNAT(unittest.TestCase): # # Test that both 'nat destination' and 'nat source' nodes can exist # without any rule - self.session.set(src_path) - self.session.set(dst_path) - self.session.commit() + self.cli_set(src_path) + self.cli_set(dst_path) + self.cli_commit() if __name__ == '__main__': diff --git a/smoketest/scripts/cli/test_policy.py b/smoketest/scripts/cli/test_policy.py index 1a18c8191..cdd2ad820 100755 --- a/smoketest/scripts/cli/test_policy.py +++ b/smoketest/scripts/cli/test_policy.py @@ -14,26 +14,20 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import unittest -from vyos.util import cmd +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError +from vyos.util import cmd base_path = ['policy'] -def getFRRconfig(section): - return cmd(f'vtysh -c "show run" | sed -n "/^{section}/,/^!/p"') - -class TestPolicy(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - +class TestPolicy(VyOSUnitTestSHIM.TestCase): def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() def test_access_list(self): acls = { @@ -101,25 +95,25 @@ class TestPolicy(unittest.TestCase): for acl, acl_config in acls.items(): path = base_path + ['access-list', acl] - self.session.set(path + ['description', f'VyOS-ACL-{acl}']) + self.cli_set(path + ['description', f'VyOS-ACL-{acl}']) if 'rule' not in acl_config: continue for rule, rule_config in acl_config['rule'].items(): - self.session.set(path + ['rule', rule, 'action', rule_config['action']]) + self.cli_set(path + ['rule', rule, 'action', rule_config['action']]) for direction in ['source', 'destination']: if direction in rule_config: if 'any' in rule_config[direction]: - self.session.set(path + ['rule', rule, direction, 'any']) + self.cli_set(path + ['rule', rule, direction, 'any']) if 'host' in rule_config[direction]: - self.session.set(path + ['rule', rule, direction, 'host', rule_config[direction]['host']]) + self.cli_set(path + ['rule', rule, direction, 'host', rule_config[direction]['host']]) if 'network' in rule_config[direction]: - self.session.set(path + ['rule', rule, direction, 'network', rule_config[direction]['network']]) - self.session.set(path + ['rule', rule, direction, 'inverse-mask', rule_config[direction]['inverse-mask']]) + self.cli_set(path + ['rule', rule, direction, 'network', rule_config[direction]['network']]) + self.cli_set(path + ['rule', rule, direction, 'inverse-mask', rule_config[direction]['inverse-mask']]) - self.session.commit() + self.cli_commit() - config = getFRRconfig('access-list') + config = self.getFRRconfig('access-list', end='') for acl, acl_config in acls.items(): seq = '5' for rule, rule_config in acl_config['rule'].items(): @@ -190,24 +184,24 @@ class TestPolicy(unittest.TestCase): for acl, acl_config in acls.items(): path = base_path + ['access-list6', acl] - self.session.set(path + ['description', f'VyOS-ACL-{acl}']) + self.cli_set(path + ['description', f'VyOS-ACL-{acl}']) if 'rule' not in acl_config: continue for rule, rule_config in acl_config['rule'].items(): - self.session.set(path + ['rule', rule, 'action', rule_config['action']]) + self.cli_set(path + ['rule', rule, 'action', rule_config['action']]) for direction in ['source', 'destination']: if direction in rule_config: if 'any' in rule_config[direction]: - self.session.set(path + ['rule', rule, direction, 'any']) + self.cli_set(path + ['rule', rule, direction, 'any']) if 'network' in rule_config[direction]: - self.session.set(path + ['rule', rule, direction, 'network', rule_config[direction]['network']]) + self.cli_set(path + ['rule', rule, direction, 'network', rule_config[direction]['network']]) if 'exact-match' in rule_config[direction]: - self.session.set(path + ['rule', rule, direction, 'exact-match']) + self.cli_set(path + ['rule', rule, direction, 'exact-match']) - self.session.commit() + self.cli_commit() - config = getFRRconfig('ipv6 access-list') + config = self.getFRRconfig('ipv6 access-list', end='') for acl, acl_config in acls.items(): seq = '5' for rule, rule_config in acl_config['rule'].items(): @@ -295,19 +289,19 @@ class TestPolicy(unittest.TestCase): for as_path, as_path_config in test_data.items(): path = base_path + ['as-path-list', as_path] - self.session.set(path + ['description', f'VyOS-ASPATH-{as_path}']) + self.cli_set(path + ['description', f'VyOS-ASPATH-{as_path}']) if 'rule' not in as_path_config: continue for rule, rule_config in as_path_config['rule'].items(): if 'action' in rule_config: - self.session.set(path + ['rule', rule, 'action', rule_config['action']]) + self.cli_set(path + ['rule', rule, 'action', rule_config['action']]) if 'regex' in rule_config: - self.session.set(path + ['rule', rule, 'regex', rule_config['regex']]) + self.cli_set(path + ['rule', rule, 'regex', rule_config['regex']]) - self.session.commit() + self.cli_commit() - config = getFRRconfig('bgp as-path access-list') + config = self.getFRRconfig('bgp as-path access-list', end='') for as_path, as_path_config in test_data.items(): if 'rule' not in as_path_config: continue @@ -353,19 +347,19 @@ class TestPolicy(unittest.TestCase): for comm_list, comm_list_config in test_data.items(): path = base_path + ['community-list', comm_list] - self.session.set(path + ['description', f'VyOS-COMM-{comm_list}']) + self.cli_set(path + ['description', f'VyOS-COMM-{comm_list}']) if 'rule' not in comm_list_config: continue for rule, rule_config in comm_list_config['rule'].items(): if 'action' in rule_config: - self.session.set(path + ['rule', rule, 'action', rule_config['action']]) + self.cli_set(path + ['rule', rule, 'action', rule_config['action']]) if 'regex' in rule_config: - self.session.set(path + ['rule', rule, 'regex', rule_config['regex']]) + self.cli_set(path + ['rule', rule, 'regex', rule_config['regex']]) - self.session.commit() + self.cli_commit() - config = getFRRconfig('bgp community-list') + config = self.getFRRconfig('bgp community-list', end='') for comm_list, comm_list_config in test_data.items(): if 'rule' not in comm_list_config: continue @@ -411,19 +405,19 @@ class TestPolicy(unittest.TestCase): for comm_list, comm_list_config in test_data.items(): path = base_path + ['extcommunity-list', comm_list] - self.session.set(path + ['description', f'VyOS-EXTCOMM-{comm_list}']) + self.cli_set(path + ['description', f'VyOS-EXTCOMM-{comm_list}']) if 'rule' not in comm_list_config: continue for rule, rule_config in comm_list_config['rule'].items(): if 'action' in rule_config: - self.session.set(path + ['rule', rule, 'action', rule_config['action']]) + self.cli_set(path + ['rule', rule, 'action', rule_config['action']]) if 'regex' in rule_config: - self.session.set(path + ['rule', rule, 'regex', rule_config['regex']]) + self.cli_set(path + ['rule', rule, 'regex', rule_config['regex']]) - self.session.commit() + self.cli_commit() - config = getFRRconfig('bgp extcommunity-list') + config = self.getFRRconfig('bgp extcommunity-list', end='') for comm_list, comm_list_config in test_data.items(): if 'rule' not in comm_list_config: continue @@ -476,19 +470,19 @@ class TestPolicy(unittest.TestCase): for comm_list, comm_list_config in test_data.items(): path = base_path + ['large-community-list', comm_list] - self.session.set(path + ['description', f'VyOS-LARGECOMM-{comm_list}']) + self.cli_set(path + ['description', f'VyOS-LARGECOMM-{comm_list}']) if 'rule' not in comm_list_config: continue for rule, rule_config in comm_list_config['rule'].items(): if 'action' in rule_config: - self.session.set(path + ['rule', rule, 'action', rule_config['action']]) + self.cli_set(path + ['rule', rule, 'action', rule_config['action']]) if 'regex' in rule_config: - self.session.set(path + ['rule', rule, 'regex', rule_config['regex']]) + self.cli_set(path + ['rule', rule, 'regex', rule_config['regex']]) - self.session.commit() + self.cli_commit() - config = getFRRconfig('bgp large-community-list') + config = self.getFRRconfig('bgp large-community-list', end='') for comm_list, comm_list_config in test_data.items(): if 'rule' not in comm_list_config: continue @@ -550,23 +544,23 @@ class TestPolicy(unittest.TestCase): for prefix_list, prefix_list_config in test_data.items(): path = base_path + ['prefix-list', prefix_list] - self.session.set(path + ['description', f'VyOS-PFX-LIST-{prefix_list}']) + self.cli_set(path + ['description', f'VyOS-PFX-LIST-{prefix_list}']) if 'rule' not in prefix_list_config: continue for rule, rule_config in prefix_list_config['rule'].items(): if 'action' in rule_config: - self.session.set(path + ['rule', rule, 'action', rule_config['action']]) + self.cli_set(path + ['rule', rule, 'action', rule_config['action']]) if 'prefix' in rule_config: - self.session.set(path + ['rule', rule, 'prefix', rule_config['prefix']]) + self.cli_set(path + ['rule', rule, 'prefix', rule_config['prefix']]) if 'ge' in rule_config: - self.session.set(path + ['rule', rule, 'ge', rule_config['ge']]) + self.cli_set(path + ['rule', rule, 'ge', rule_config['ge']]) if 'le' in rule_config: - self.session.set(path + ['rule', rule, 'le', rule_config['le']]) + self.cli_set(path + ['rule', rule, 'le', rule_config['le']]) - self.session.commit() + self.cli_commit() - config = getFRRconfig('ip prefix-list') + config = self.getFRRconfig('ip prefix-list', end='') for prefix_list, prefix_list_config in test_data.items(): if 'rule' not in prefix_list_config: continue @@ -633,23 +627,23 @@ class TestPolicy(unittest.TestCase): for prefix_list, prefix_list_config in test_data.items(): path = base_path + ['prefix-list6', prefix_list] - self.session.set(path + ['description', f'VyOS-PFX-LIST-{prefix_list}']) + self.cli_set(path + ['description', f'VyOS-PFX-LIST-{prefix_list}']) if 'rule' not in prefix_list_config: continue for rule, rule_config in prefix_list_config['rule'].items(): if 'action' in rule_config: - self.session.set(path + ['rule', rule, 'action', rule_config['action']]) + self.cli_set(path + ['rule', rule, 'action', rule_config['action']]) if 'prefix' in rule_config: - self.session.set(path + ['rule', rule, 'prefix', rule_config['prefix']]) + self.cli_set(path + ['rule', rule, 'prefix', rule_config['prefix']]) if 'ge' in rule_config: - self.session.set(path + ['rule', rule, 'ge', rule_config['ge']]) + self.cli_set(path + ['rule', rule, 'ge', rule_config['ge']]) if 'le' in rule_config: - self.session.set(path + ['rule', rule, 'le', rule_config['le']]) + self.cli_set(path + ['rule', rule, 'le', rule_config['le']]) - self.session.commit() + self.cli_commit() - config = getFRRconfig('ipv6 prefix-list') + config = self.getFRRconfig('ipv6 prefix-list', end='') for prefix_list, prefix_list_config in test_data.items(): if 'rule' not in prefix_list_config: continue @@ -671,5 +665,32 @@ class TestPolicy(unittest.TestCase): self.assertIn(tmp, config) + + # Test set table for some sources + def test_table_id(self): + path = base_path + ['local-route'] + + sources = ['203.0.113.1', '203.0.113.2'] + rule = '50' + table = '23' + for src in sources: + self.cli_set(path + ['rule', rule, 'set', 'table', table]) + self.cli_set(path + ['rule', rule, 'source', src]) + + self.cli_commit() + + # Check generated configuration + + # Expected values + original = """ + 50: from 203.0.113.1 lookup 23 + 50: from 203.0.113.2 lookup 23 + """ + tmp = cmd('ip rule show prio 50') + original = original.split() + tmp = tmp.split() + + self.assertEqual(tmp, original) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_policy_local-route.py b/smoketest/scripts/cli/test_policy_local-route.py index de1882a65..c742a930b 100755 --- a/smoketest/scripts/cli/test_policy_local-route.py +++ b/smoketest/scripts/cli/test_policy_local-route.py @@ -17,21 +17,24 @@ import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.util import cmd from vyos.util import process_named_running -class PolicyLocalRouteTest(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - self._sources = ['203.0.113.1', '203.0.113.2'] +class PolicyLocalRouteTest(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + cls._sources = ['203.0.113.1', '203.0.113.2'] + # call base-classes classmethod + super(cls, cls).setUpClass() def tearDown(self): # Delete all policies - self.session.delete(['policy', 'local-route']) - self.session.commit() - del self.session + self.cli_delete(['policy', 'local-route']) + self.cli_commit() # Test set table for some sources def test_table_id(self): @@ -39,10 +42,10 @@ class PolicyLocalRouteTest(unittest.TestCase): rule = '50' table = '23' for src in self._sources: - self.session.set(base + ['rule', rule, 'set', 'table', table]) - self.session.set(base + ['rule', rule, 'source', src]) + self.cli_set(base + ['rule', rule, 'set', 'table', table]) + self.cli_set(base + ['rule', rule, 'source', src]) - self.session.commit() + self.cli_commit() # Check generated configuration diff --git a/smoketest/scripts/cli/test_protocols_bfd.py b/smoketest/scripts/cli/test_protocols_bfd.py index 7f17c12ff..0c4ed86d7 100755 --- a/smoketest/scripts/cli/test_protocols_bfd.py +++ b/smoketest/scripts/cli/test_protocols_bfd.py @@ -17,6 +17,8 @@ import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.util import cmd @@ -58,17 +60,15 @@ def getFRRconfig(): def getBFDPeerconfig(peer): return cmd(f'vtysh -c "show run" | sed -n "/^ {peer}/,/^!/p"') -class TestProtocolsBFD(unittest.TestCase): +class TestProtocolsBFD(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) - self.session.set(['interfaces', 'dummy', dum_if, 'address', '192.0.2.1/24']) - self.session.set(['interfaces', 'dummy', dum_if, 'address', '2001:db8::1/64']) + self.cli_set(['interfaces', 'dummy', dum_if, 'address', '192.0.2.1/24']) + self.cli_set(['interfaces', 'dummy', dum_if, 'address', '2001:db8::1/64']) def tearDown(self): - self.session.delete(['interfaces', 'dummy', dum_if]) - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(['interfaces', 'dummy', dum_if]) + self.cli_delete(base_path) + self.cli_commit() # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) @@ -76,26 +76,26 @@ class TestProtocolsBFD(unittest.TestCase): def test_bfd_simple(self): for peer, peer_config in neighbor_config.items(): if 'echo_mode' in peer_config: - self.session.set(base_path + ['peer', peer, 'echo-mode']) + self.cli_set(base_path + ['peer', peer, 'echo-mode']) if 'intv_echo' in peer_config: - self.session.set(base_path + ['peer', peer, 'interval', 'echo-interval', peer_config["intv_echo"]]) + self.cli_set(base_path + ['peer', peer, 'interval', 'echo-interval', peer_config["intv_echo"]]) if 'intv_mult' in peer_config: - self.session.set(base_path + ['peer', peer, 'interval', 'multiplier', peer_config["intv_mult"]]) + self.cli_set(base_path + ['peer', peer, 'interval', 'multiplier', peer_config["intv_mult"]]) if 'intv_rx' in peer_config: - self.session.set(base_path + ['peer', peer, 'interval', 'receive', peer_config["intv_rx"]]) + self.cli_set(base_path + ['peer', peer, 'interval', 'receive', peer_config["intv_rx"]]) if 'intv_tx' in peer_config: - self.session.set(base_path + ['peer', peer, 'interval', 'transmit', peer_config["intv_tx"]]) + self.cli_set(base_path + ['peer', peer, 'interval', 'transmit', peer_config["intv_tx"]]) if 'multihop' in peer_config: - self.session.set(base_path + ['peer', peer, 'multihop']) + self.cli_set(base_path + ['peer', peer, 'multihop']) if 'shutdown' in peer_config: - self.session.set(base_path + ['peer', peer, 'shutdown']) + self.cli_set(base_path + ['peer', peer, 'shutdown']) if 'source_addr' in peer_config: - self.session.set(base_path + ['peer', peer, 'source', 'address', peer_config["source_addr"]]) + self.cli_set(base_path + ['peer', peer, 'source', 'address', peer_config["source_addr"]]) if 'source_intf' in peer_config: - self.session.set(base_path + ['peer', peer, 'source', 'interface', peer_config["source_intf"]]) + self.cli_set(base_path + ['peer', peer, 'source', 'interface', peer_config["source_intf"]]) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR bgpd configuration frrconfig = getFRRconfig() diff --git a/smoketest/scripts/cli/test_protocols_bgp.py b/smoketest/scripts/cli/test_protocols_bgp.py index 62ef40282..b261e4164 100755 --- a/smoketest/scripts/cli/test_protocols_bgp.py +++ b/smoketest/scripts/cli/test_protocols_bgp.py @@ -17,6 +17,8 @@ import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.util import cmd @@ -87,14 +89,10 @@ peer_group_config = { def getFRRBGPconfig(): return cmd(f'vtysh -c "show run" | sed -n "/router bgp {ASN}/,/^!/p"') -class TestProtocolsBGP(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - +class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() def verify_frr_config(self, peer, peer_config, frrconfig): # recurring patterns to verify for both a simple neighbor and a peer-group @@ -129,15 +127,15 @@ class TestProtocolsBGP(unittest.TestCase): router_id = '127.0.0.1' local_pref = '500' - self.session.set(base_path + ['parameters', 'router-id', router_id]) - self.session.set(base_path + ['parameters', 'log-neighbor-changes']) + self.cli_set(base_path + ['parameters', 'router-id', router_id]) + self.cli_set(base_path + ['parameters', 'log-neighbor-changes']) # Default local preference (higher=more preferred) - self.session.set(base_path + ['parameters', 'default', 'local-pref', local_pref]) + self.cli_set(base_path + ['parameters', 'default', 'local-pref', local_pref]) # Deactivate IPv4 unicast for a peer by default - self.session.set(base_path + ['parameters', 'default', 'no-ipv4-unicast']) + self.cli_set(base_path + ['parameters', 'default', 'no-ipv4-unicast']) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR bgpd configuration frrconfig = getFRRBGPconfig() @@ -155,40 +153,40 @@ class TestProtocolsBGP(unittest.TestCase): # also available to a peer-group! for neighbor, config in neighbor_config.items(): if 'adv_interv' in config: - self.session.set(base_path + ['neighbor', neighbor, 'advertisement-interval', config["adv_interv"]]) + self.cli_set(base_path + ['neighbor', neighbor, 'advertisement-interval', config["adv_interv"]]) if 'cap_dynamic' in config: - self.session.set(base_path + ['neighbor', neighbor, 'capability', 'dynamic']) + self.cli_set(base_path + ['neighbor', neighbor, 'capability', 'dynamic']) if 'cap_ext_next' in config: - self.session.set(base_path + ['neighbor', neighbor, 'capability', 'extended-nexthop']) + self.cli_set(base_path + ['neighbor', neighbor, 'capability', 'extended-nexthop']) if 'description' in config: - self.session.set(base_path + ['neighbor', neighbor, 'description', config["description"]]) + self.cli_set(base_path + ['neighbor', neighbor, 'description', config["description"]]) if 'no_cap_nego' in config: - self.session.set(base_path + ['neighbor', neighbor, 'disable-capability-negotiation']) + self.cli_set(base_path + ['neighbor', neighbor, 'disable-capability-negotiation']) if 'multi_hop' in config: - self.session.set(base_path + ['neighbor', neighbor, 'ebgp-multihop', config["multi_hop"]]) + self.cli_set(base_path + ['neighbor', neighbor, 'ebgp-multihop', config["multi_hop"]]) if 'local_as' in config: - self.session.set(base_path + ['neighbor', neighbor, 'local-as', config["local_as"]]) + self.cli_set(base_path + ['neighbor', neighbor, 'local-as', config["local_as"]]) if 'cap_over' in config: - self.session.set(base_path + ['neighbor', neighbor, 'override-capability']) + self.cli_set(base_path + ['neighbor', neighbor, 'override-capability']) if 'passive' in config: - self.session.set(base_path + ['neighbor', neighbor, 'passive']) + self.cli_set(base_path + ['neighbor', neighbor, 'passive']) if 'password' in config: - self.session.set(base_path + ['neighbor', neighbor, 'password', config["password"]]) + self.cli_set(base_path + ['neighbor', neighbor, 'password', config["password"]]) if 'port' in config: - self.session.set(base_path + ['neighbor', neighbor, 'port', config["port"]]) + self.cli_set(base_path + ['neighbor', neighbor, 'port', config["port"]]) if 'remote_as' in config: - self.session.set(base_path + ['neighbor', neighbor, 'remote-as', config["remote_as"]]) + self.cli_set(base_path + ['neighbor', neighbor, 'remote-as', config["remote_as"]]) if 'cap_strict' in config: - self.session.set(base_path + ['neighbor', neighbor, 'strict-capability-match']) + self.cli_set(base_path + ['neighbor', neighbor, 'strict-capability-match']) if 'shutdown' in config: - self.session.set(base_path + ['neighbor', neighbor, 'shutdown']) + self.cli_set(base_path + ['neighbor', neighbor, 'shutdown']) if 'ttl_security' in config: - self.session.set(base_path + ['neighbor', neighbor, 'ttl-security', 'hops', config["ttl_security"]]) + self.cli_set(base_path + ['neighbor', neighbor, 'ttl-security', 'hops', config["ttl_security"]]) if 'update_src' in config: - self.session.set(base_path + ['neighbor', neighbor, 'update-source', config["update_src"]]) + self.cli_set(base_path + ['neighbor', neighbor, 'update-source', config["update_src"]]) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR bgpd configuration frrconfig = getFRRBGPconfig() @@ -208,34 +206,34 @@ class TestProtocolsBGP(unittest.TestCase): # Test out individual peer-group configuration items for peer_group, config in peer_group_config.items(): if 'cap_dynamic' in config: - self.session.set(base_path + ['peer-group', peer_group, 'capability', 'dynamic']) + self.cli_set(base_path + ['peer-group', peer_group, 'capability', 'dynamic']) if 'cap_ext_next' in config: - self.session.set(base_path + ['peer-group', peer_group, 'capability', 'extended-nexthop']) + self.cli_set(base_path + ['peer-group', peer_group, 'capability', 'extended-nexthop']) if 'description' in config: - self.session.set(base_path + ['peer-group', peer_group, 'description', config["description"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'description', config["description"]]) if 'no_cap_nego' in config: - self.session.set(base_path + ['peer-group', peer_group, 'disable-capability-negotiation']) + self.cli_set(base_path + ['peer-group', peer_group, 'disable-capability-negotiation']) if 'multi_hop' in config: - self.session.set(base_path + ['peer-group', peer_group, 'ebgp-multihop', config["multi_hop"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'ebgp-multihop', config["multi_hop"]]) if 'local_as' in config: - self.session.set(base_path + ['peer-group', peer_group, 'local-as', config["local_as"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'local-as', config["local_as"]]) if 'cap_over' in config: - self.session.set(base_path + ['peer-group', peer_group, 'override-capability']) + self.cli_set(base_path + ['peer-group', peer_group, 'override-capability']) if 'passive' in config: - self.session.set(base_path + ['peer-group', peer_group, 'passive']) + self.cli_set(base_path + ['peer-group', peer_group, 'passive']) if 'password' in config: - self.session.set(base_path + ['peer-group', peer_group, 'password', config["password"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'password', config["password"]]) if 'remote_as' in config: - self.session.set(base_path + ['peer-group', peer_group, 'remote-as', config["remote_as"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'remote-as', config["remote_as"]]) if 'shutdown' in config: - self.session.set(base_path + ['peer-group', peer_group, 'shutdown']) + self.cli_set(base_path + ['peer-group', peer_group, 'shutdown']) if 'ttl_security' in config: - self.session.set(base_path + ['peer-group', peer_group, 'ttl-security', 'hops', config["ttl_security"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'ttl-security', 'hops', config["ttl_security"]]) if 'update_src' in config: - self.session.set(base_path + ['peer-group', peer_group, 'update-source', config["update_src"]]) + self.cli_set(base_path + ['peer-group', peer_group, 'update-source', config["update_src"]]) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR bgpd configuration frrconfig = getFRRBGPconfig() @@ -262,21 +260,21 @@ class TestProtocolsBGP(unittest.TestCase): # We want to redistribute ... redistributes = ['connected', 'isis', 'kernel', 'ospf', 'rip', 'static'] for redistribute in redistributes: - self.session.set(base_path + ['address-family', 'ipv4-unicast', + self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'redistribute', redistribute]) for network, network_config in networks.items(): - self.session.set(base_path + ['address-family', 'ipv4-unicast', + self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'network', network]) if 'as_set' in network_config: - self.session.set(base_path + ['address-family', 'ipv4-unicast', + self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'aggregate-address', network, 'as-set']) if 'summary_only' in network_config: - self.session.set(base_path + ['address-family', 'ipv4-unicast', + self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'aggregate-address', network, 'summary-only']) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR bgpd configuration frrconfig = getFRRBGPconfig() @@ -308,18 +306,18 @@ class TestProtocolsBGP(unittest.TestCase): # We want to redistribute ... redistributes = ['connected', 'kernel', 'ospfv3', 'ripng', 'static'] for redistribute in redistributes: - self.session.set(base_path + ['address-family', 'ipv6-unicast', + self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'redistribute', redistribute]) for network, network_config in networks.items(): - self.session.set(base_path + ['address-family', 'ipv6-unicast', + self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'network', network]) if 'summary_only' in network_config: - self.session.set(base_path + ['address-family', 'ipv6-unicast', + self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'aggregate-address', network, 'summary-only']) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR bgpd configuration frrconfig = getFRRBGPconfig() diff --git a/smoketest/scripts/cli/test_protocols_igmp-proxy.py b/smoketest/scripts/cli/test_protocols_igmp-proxy.py index 6aaad739d..1eaf21722 100755 --- a/smoketest/scripts/cli/test_protocols_igmp-proxy.py +++ b/smoketest/scripts/cli/test_protocols_igmp-proxy.py @@ -14,9 +14,10 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.util import read_file @@ -28,46 +29,44 @@ base_path = ['protocols', 'igmp-proxy'] upstream_if = 'eth1' downstream_if = 'eth2' -class TestProtocolsIGMPProxy(unittest.TestCase): +class TestProtocolsIGMPProxy(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) - self.session.set(['interfaces', 'ethernet', upstream_if, 'address', '172.16.1.1/24']) + self.cli_set(['interfaces', 'ethernet', upstream_if, 'address', '172.16.1.1/24']) def tearDown(self): - self.session.delete(['interfaces', 'ethernet', upstream_if, 'address']) - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(['interfaces', 'ethernet', upstream_if, 'address']) + self.cli_delete(base_path) + self.cli_commit() def test_igmpproxy(self): threshold = '20' altnet = '192.0.2.0/24' whitelist = '10.0.0.0/8' - self.session.set(base_path + ['disable-quickleave']) - self.session.set(base_path + ['interface', upstream_if, 'threshold', threshold]) - self.session.set(base_path + ['interface', upstream_if, 'alt-subnet', altnet]) - self.session.set(base_path + ['interface', upstream_if, 'whitelist', whitelist]) + self.cli_set(base_path + ['disable-quickleave']) + self.cli_set(base_path + ['interface', upstream_if, 'threshold', threshold]) + self.cli_set(base_path + ['interface', upstream_if, 'alt-subnet', altnet]) + self.cli_set(base_path + ['interface', upstream_if, 'whitelist', whitelist]) # Must define an upstream and at least 1 downstream interface! with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(base_path + ['interface', upstream_if, 'role', 'upstream']) + self.cli_commit() + self.cli_set(base_path + ['interface', upstream_if, 'role', 'upstream']) # Interface does not exist - self.session.set(base_path + ['interface', 'eth20', 'role', 'upstream']) + self.cli_set(base_path + ['interface', 'eth20', 'role', 'upstream']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(base_path + ['interface', 'eth20']) + self.cli_commit() + self.cli_delete(base_path + ['interface', 'eth20']) # Only 1 upstream interface allowed - self.session.set(base_path + ['interface', downstream_if, 'role', 'upstream']) + self.cli_set(base_path + ['interface', downstream_if, 'role', 'upstream']) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(base_path + ['interface', downstream_if, 'role', 'downstream']) + self.cli_commit() + self.cli_set(base_path + ['interface', downstream_if, 'role', 'downstream']) # commit changes - self.session.commit() + self.cli_commit() # Check generated configuration config = read_file(IGMP_PROXY_CONF) diff --git a/smoketest/scripts/cli/test_protocols_ospfv3.py b/smoketest/scripts/cli/test_protocols_ospfv3.py index 297d5d996..c4eb3fdd8 100755 --- a/smoketest/scripts/cli/test_protocols_ospfv3.py +++ b/smoketest/scripts/cli/test_protocols_ospfv3.py @@ -17,6 +17,8 @@ import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.ifconfig import Section from vyos.util import cmd @@ -29,17 +31,13 @@ base_path = ['protocols', 'ospfv3'] def getFRROSPFconfig(): return cmd('vtysh -c "show run" | sed -n "/router ospf6/,/^!/p"') -class TestProtocolsOSPFv3(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - +class TestProtocolsOSPFv3(VyOSUnitTestSHIM.TestCase): def tearDown(self): # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() def test_ospfv3_01_basic(self): @@ -49,20 +47,20 @@ class TestProtocolsOSPFv3(unittest.TestCase): acl_name = 'foo-acl-100' router_id = '192.0.2.1' - self.session.set(['policy', 'access-list6', acl_name, 'rule', seq, 'action', 'permit']) - self.session.set(['policy', 'access-list6', acl_name, 'rule', seq, 'source', 'any']) + self.cli_set(['policy', 'access-list6', acl_name, 'rule', seq, 'action', 'permit']) + self.cli_set(['policy', 'access-list6', acl_name, 'rule', seq, 'source', 'any']) - self.session.set(base_path + ['parameters', 'router-id', router_id]) - self.session.set(base_path + ['area', area, 'range', prefix, 'advertise']) - self.session.set(base_path + ['area', area, 'export-list', acl_name]) - self.session.set(base_path + ['area', area, 'import-list', acl_name]) + self.cli_set(base_path + ['parameters', 'router-id', router_id]) + self.cli_set(base_path + ['area', area, 'range', prefix, 'advertise']) + self.cli_set(base_path + ['area', area, 'export-list', acl_name]) + self.cli_set(base_path + ['area', area, 'import-list', acl_name]) interfaces = Section.interfaces('ethernet') for interface in interfaces: - self.session.set(base_path + ['area', area, 'interface', interface]) + self.cli_set(base_path + ['area', area, 'interface', interface]) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR ospfd configuration frrconfig = getFRROSPFconfig() @@ -75,7 +73,7 @@ class TestProtocolsOSPFv3(unittest.TestCase): for interface in interfaces: self.assertIn(f' interface {interface} area {area}', frrconfig) - self.session.delete(['policy', 'access-list6', acl_name]) + self.cli_delete(['policy', 'access-list6', acl_name]) def test_ospfv3_02_distance(self): @@ -84,13 +82,13 @@ class TestProtocolsOSPFv3(unittest.TestCase): dist_inter_area = '120' dist_intra_area = '130' - self.session.set(base_path + ['distance', 'global', dist_global]) - self.session.set(base_path + ['distance', 'ospfv3', 'external', dist_external]) - self.session.set(base_path + ['distance', 'ospfv3', 'inter-area', dist_inter_area]) - self.session.set(base_path + ['distance', 'ospfv3', 'intra-area', dist_intra_area]) + self.cli_set(base_path + ['distance', 'global', dist_global]) + self.cli_set(base_path + ['distance', 'ospfv3', 'external', dist_external]) + self.cli_set(base_path + ['distance', 'ospfv3', 'inter-area', dist_inter_area]) + self.cli_set(base_path + ['distance', 'ospfv3', 'intra-area', dist_intra_area]) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR ospfd configuration frrconfig = getFRROSPFconfig() @@ -104,13 +102,13 @@ class TestProtocolsOSPFv3(unittest.TestCase): route_map_seq = '10' redistribute = ['bgp', 'connected', 'kernel', 'ripng', 'static'] - self.session.set(['policy', 'route-map', route_map, 'rule', route_map_seq, 'action', 'permit']) + self.cli_set(['policy', 'route-map', route_map, 'rule', route_map_seq, 'action', 'permit']) for protocol in redistribute: - self.session.set(base_path + ['redistribute', protocol, 'route-map', route_map]) + self.cli_set(base_path + ['redistribute', protocol, 'route-map', route_map]) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR ospfd configuration frrconfig = getFRROSPFconfig() diff --git a/smoketest/scripts/cli/test_protocols_rip.py b/smoketest/scripts/cli/test_protocols_rip.py index 4b3674960..6f2028f2b 100755 --- a/smoketest/scripts/cli/test_protocols_rip.py +++ b/smoketest/scripts/cli/test_protocols_rip.py @@ -17,6 +17,8 @@ import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.ifconfig import Section from vyos.util import cmd @@ -34,32 +36,29 @@ base_path = ['protocols', 'rip'] def getFRRconfig(): return cmd('vtysh -c "show run" | sed -n "/router rip/,/^!/p"') -class TestProtocolsRIP(unittest.TestCase): +class TestProtocolsRIP(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) - - self.session.set(['policy', 'access-list', acl_in, 'rule', '10', 'action', 'permit']) - self.session.set(['policy', 'access-list', acl_in, 'rule', '10', 'source', 'any']) - self.session.set(['policy', 'access-list', acl_in, 'rule', '10', 'destination', 'any']) - self.session.set(['policy', 'access-list', acl_out, 'rule', '20', 'action', 'deny']) - self.session.set(['policy', 'access-list', acl_out, 'rule', '20', 'source', 'any']) - self.session.set(['policy', 'access-list', acl_out, 'rule', '20', 'destination', 'any']) - self.session.set(['policy', 'prefix-list', prefix_list_in, 'rule', '100', 'action', 'permit']) - self.session.set(['policy', 'prefix-list', prefix_list_in, 'rule', '100', 'prefix', '192.0.2.0/24']) - self.session.set(['policy', 'prefix-list', prefix_list_out, 'rule', '200', 'action', 'deny']) - self.session.set(['policy', 'prefix-list', prefix_list_out, 'rule', '200', 'prefix', '192.0.2.0/24']) - self.session.set(['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) + self.cli_set(['policy', 'access-list', acl_in, 'rule', '10', 'action', 'permit']) + self.cli_set(['policy', 'access-list', acl_in, 'rule', '10', 'source', 'any']) + self.cli_set(['policy', 'access-list', acl_in, 'rule', '10', 'destination', 'any']) + self.cli_set(['policy', 'access-list', acl_out, 'rule', '20', 'action', 'deny']) + self.cli_set(['policy', 'access-list', acl_out, 'rule', '20', 'source', 'any']) + self.cli_set(['policy', 'access-list', acl_out, 'rule', '20', 'destination', 'any']) + self.cli_set(['policy', 'prefix-list', prefix_list_in, 'rule', '100', 'action', 'permit']) + self.cli_set(['policy', 'prefix-list', prefix_list_in, 'rule', '100', 'prefix', '192.0.2.0/24']) + self.cli_set(['policy', 'prefix-list', prefix_list_out, 'rule', '200', 'action', 'deny']) + self.cli_set(['policy', 'prefix-list', prefix_list_out, 'rule', '200', 'prefix', '192.0.2.0/24']) + self.cli_set(['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) def tearDown(self): - self.session.delete(base_path) - self.session.delete(['policy', 'access-list', acl_in]) - self.session.delete(['policy', 'access-list', acl_out]) - self.session.delete(['policy', 'prefix-list', prefix_list_in]) - self.session.delete(['policy', 'prefix-list', prefix_list_out]) - self.session.delete(['policy', 'route-map', route_map]) + self.cli_delete(base_path) + self.cli_delete(['policy', 'access-list', acl_in]) + self.cli_delete(['policy', 'access-list', acl_out]) + self.cli_delete(['policy', 'prefix-list', prefix_list_in]) + self.cli_delete(['policy', 'prefix-list', prefix_list_out]) + self.cli_delete(['policy', 'route-map', route_map]) - self.session.commit() - del self.session + self.cli_commit() # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) @@ -76,34 +75,34 @@ class TestProtocolsRIP(unittest.TestCase): timer_timeout = '1000' timer_update = '90' - self.session.set(base_path + ['default-distance', distance]) - self.session.set(base_path + ['default-information', 'originate']) - self.session.set(base_path + ['default-metric', metric]) - self.session.set(base_path + ['distribute-list', 'access-list', 'in', acl_in]) - self.session.set(base_path + ['distribute-list', 'access-list', 'out', acl_out]) - self.session.set(base_path + ['distribute-list', 'prefix-list', 'in', prefix_list_in]) - self.session.set(base_path + ['distribute-list', 'prefix-list', 'out', prefix_list_out]) - self.session.set(base_path + ['passive-interface', 'default']) - self.session.set(base_path + ['timers', 'garbage-collection', timer_garbage]) - self.session.set(base_path + ['timers', 'timeout', timer_timeout]) - self.session.set(base_path + ['timers', 'update', timer_update]) + self.cli_set(base_path + ['default-distance', distance]) + self.cli_set(base_path + ['default-information', 'originate']) + self.cli_set(base_path + ['default-metric', metric]) + self.cli_set(base_path + ['distribute-list', 'access-list', 'in', acl_in]) + self.cli_set(base_path + ['distribute-list', 'access-list', 'out', acl_out]) + self.cli_set(base_path + ['distribute-list', 'prefix-list', 'in', prefix_list_in]) + self.cli_set(base_path + ['distribute-list', 'prefix-list', 'out', prefix_list_out]) + self.cli_set(base_path + ['passive-interface', 'default']) + self.cli_set(base_path + ['timers', 'garbage-collection', timer_garbage]) + self.cli_set(base_path + ['timers', 'timeout', timer_timeout]) + self.cli_set(base_path + ['timers', 'update', timer_update]) for interface in interfaces: - self.session.set(base_path + ['interface', interface]) - self.session.set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'in', acl_in]) - self.session.set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'out', acl_out]) + self.cli_set(base_path + ['interface', interface]) + self.cli_set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'in', acl_in]) + self.cli_set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'out', acl_out]) for neighbor in neighbors: - self.session.set(base_path + ['neighbor', neighbor]) + self.cli_set(base_path + ['neighbor', neighbor]) for network in networks: - self.session.set(base_path + ['network', network]) - self.session.set(base_path + ['network-distance', network, 'distance', network_distance]) - self.session.set(base_path + ['route', network]) + self.cli_set(base_path + ['network', network]) + self.cli_set(base_path + ['network-distance', network, 'distance', network_distance]) + self.cli_set(base_path + ['route', network]) for proto in redistribute: - self.session.set(base_path + ['redistribute', proto, 'metric', metric]) - self.session.set(base_path + ['redistribute', proto, 'route-map', route_map]) + self.cli_set(base_path + ['redistribute', proto, 'metric', metric]) + self.cli_set(base_path + ['redistribute', proto, 'route-map', route_map]) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR ospfd configuration frrconfig = getFRRconfig() diff --git a/smoketest/scripts/cli/test_protocols_ripng.py b/smoketest/scripts/cli/test_protocols_ripng.py index 90cbaccd8..3380dc78b 100755 --- a/smoketest/scripts/cli/test_protocols_ripng.py +++ b/smoketest/scripts/cli/test_protocols_ripng.py @@ -14,12 +14,12 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.ifconfig import Section -from vyos.util import cmd from vyos.util import process_named_running PROCESS_NAME = 'ripngd' @@ -31,33 +31,26 @@ route_map = 'FooBar123' base_path = ['protocols', 'ripng'] -def getFRRconfig(): - return cmd('vtysh -c "show run" | sed -n "/router ripng/,/^!/p"') - -class TestProtocolsRIPng(unittest.TestCase): +class TestProtocolsRIPng(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) - - self.session.set(['policy', 'access-list6', acl_in, 'rule', '10', 'action', 'permit']) - self.session.set(['policy', 'access-list6', acl_in, 'rule', '10', 'source', 'any']) - self.session.set(['policy', 'access-list6', acl_out, 'rule', '20', 'action', 'deny']) - self.session.set(['policy', 'access-list6', acl_out, 'rule', '20', 'source', 'any']) - self.session.set(['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'action', 'permit']) - self.session.set(['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'prefix', '2001:db8::/32']) - self.session.set(['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'action', 'deny']) - self.session.set(['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'prefix', '2001:db8::/32']) - self.session.set(['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) + self.cli_set(['policy', 'access-list6', acl_in, 'rule', '10', 'action', 'permit']) + self.cli_set(['policy', 'access-list6', acl_in, 'rule', '10', 'source', 'any']) + self.cli_set(['policy', 'access-list6', acl_out, 'rule', '20', 'action', 'deny']) + self.cli_set(['policy', 'access-list6', acl_out, 'rule', '20', 'source', 'any']) + self.cli_set(['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'action', 'permit']) + self.cli_set(['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'prefix', '2001:db8::/32']) + self.cli_set(['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'action', 'deny']) + self.cli_set(['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'prefix', '2001:db8::/32']) + self.cli_set(['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) def tearDown(self): - self.session.delete(base_path) - self.session.delete(['policy', 'access-list6', acl_in]) - self.session.delete(['policy', 'access-list6', acl_out]) - self.session.delete(['policy', 'prefix-list6', prefix_list_in]) - self.session.delete(['policy', 'prefix-list6', prefix_list_out]) - self.session.delete(['policy', 'route-map', route_map]) - - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_delete(['policy', 'access-list6', acl_in]) + self.cli_delete(['policy', 'access-list6', acl_out]) + self.cli_delete(['policy', 'prefix-list6', prefix_list_in]) + self.cli_delete(['policy', 'prefix-list6', prefix_list_out]) + self.cli_delete(['policy', 'route-map', route_map]) + self.cli_commit() # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) @@ -72,38 +65,38 @@ class TestProtocolsRIPng(unittest.TestCase): timer_timeout = '1000' timer_update = '90' - self.session.set(base_path + ['default-information', 'originate']) - self.session.set(base_path + ['default-metric', metric]) - self.session.set(base_path + ['distribute-list', 'access-list', 'in', acl_in]) - self.session.set(base_path + ['distribute-list', 'access-list', 'out', acl_out]) - self.session.set(base_path + ['distribute-list', 'prefix-list', 'in', prefix_list_in]) - self.session.set(base_path + ['distribute-list', 'prefix-list', 'out', prefix_list_out]) - self.session.set(base_path + ['passive-interface', 'default']) - self.session.set(base_path + ['timers', 'garbage-collection', timer_garbage]) - self.session.set(base_path + ['timers', 'timeout', timer_timeout]) - self.session.set(base_path + ['timers', 'update', timer_update]) + self.cli_set(base_path + ['default-information', 'originate']) + self.cli_set(base_path + ['default-metric', metric]) + self.cli_set(base_path + ['distribute-list', 'access-list', 'in', acl_in]) + self.cli_set(base_path + ['distribute-list', 'access-list', 'out', acl_out]) + self.cli_set(base_path + ['distribute-list', 'prefix-list', 'in', prefix_list_in]) + self.cli_set(base_path + ['distribute-list', 'prefix-list', 'out', prefix_list_out]) + self.cli_set(base_path + ['passive-interface', 'default']) + self.cli_set(base_path + ['timers', 'garbage-collection', timer_garbage]) + self.cli_set(base_path + ['timers', 'timeout', timer_timeout]) + self.cli_set(base_path + ['timers', 'update', timer_update]) for aggregate in aggregates: - self.session.set(base_path + ['aggregate-address', aggregate]) + self.cli_set(base_path + ['aggregate-address', aggregate]) for interface in interfaces: - self.session.set(base_path + ['interface', interface]) - self.session.set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'in', acl_in]) - self.session.set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'out', acl_out]) - self.session.set(base_path + ['distribute-list', 'interface', interface, 'prefix-list', 'in', prefix_list_in]) - self.session.set(base_path + ['distribute-list', 'interface', interface, 'prefix-list', 'out', prefix_list_out]) + self.cli_set(base_path + ['interface', interface]) + self.cli_set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'in', acl_in]) + self.cli_set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'out', acl_out]) + self.cli_set(base_path + ['distribute-list', 'interface', interface, 'prefix-list', 'in', prefix_list_in]) + self.cli_set(base_path + ['distribute-list', 'interface', interface, 'prefix-list', 'out', prefix_list_out]) for network in networks: - self.session.set(base_path + ['network', network]) - self.session.set(base_path + ['route', network]) + self.cli_set(base_path + ['network', network]) + self.cli_set(base_path + ['route', network]) for proto in redistribute: - self.session.set(base_path + ['redistribute', proto, 'metric', metric]) - self.session.set(base_path + ['redistribute', proto, 'route-map', route_map]) + self.cli_set(base_path + ['redistribute', proto, 'metric', metric]) + self.cli_set(base_path + ['redistribute', proto, 'route-map', route_map]) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR ospfd configuration - frrconfig = getFRRconfig() + frrconfig = self.getFRRconfig('router ripng') self.assertIn(f'router ripng', frrconfig) self.assertIn(f' default-information originate', frrconfig) self.assertIn(f' default-metric {metric}', frrconfig) diff --git a/smoketest/scripts/cli/test_protocols_rpki.py b/smoketest/scripts/cli/test_protocols_rpki.py index bec4ef76f..8212e9469 100755 --- a/smoketest/scripts/cli/test_protocols_rpki.py +++ b/smoketest/scripts/cli/test_protocols_rpki.py @@ -17,6 +17,8 @@ import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.util import cmd @@ -29,22 +31,15 @@ rpki_known_hosts = '/config/auth/known_hosts' rpki_ssh_key = '/config/auth/id_rsa_rpki' rpki_ssh_pub = f'{rpki_ssh_key}.pub' -def getFRRRPKIconfig(): - return cmd(f'vtysh -c "show run" | sed -n "/rpki/,/^!/p"') - -class TestProtocolsRPKI(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - +class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() # Nothing RPKI specific should be left over in the config # # Disabled until T3266 is resolved - # frrconfig = getFRRRPKIconfig() + # frrconfig = self.getFRRconfig('rpki') # self.assertNotIn('rpki', frrconfig) # Check for running process @@ -71,16 +66,16 @@ class TestProtocolsRPKI(unittest.TestCase): }, } - self.session.set(base_path + ['polling-period', polling]) + self.cli_set(base_path + ['polling-period', polling]) for peer, peer_config in cache.items(): - self.session.set(base_path + ['cache', peer, 'port', peer_config['port']]) - self.session.set(base_path + ['cache', peer, 'preference', peer_config['preference']]) + self.cli_set(base_path + ['cache', peer, 'port', peer_config['port']]) + self.cli_set(base_path + ['cache', peer, 'preference', peer_config['preference']]) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR configuration - frrconfig = getFRRRPKIconfig() + frrconfig = self.getFRRconfig('rpki') self.assertIn(f'rpki polling_period {polling}', frrconfig) for peer, peer_config in cache.items(): @@ -103,21 +98,21 @@ class TestProtocolsRPKI(unittest.TestCase): }, } - self.session.set(base_path + ['polling-period', polling]) + self.cli_set(base_path + ['polling-period', polling]) for peer, peer_config in cache.items(): - self.session.set(base_path + ['cache', peer, 'port', peer_config['port']]) - self.session.set(base_path + ['cache', peer, 'preference', peer_config['preference']]) - self.session.set(base_path + ['cache', peer, 'ssh', 'username', peer_config['username']]) - self.session.set(base_path + ['cache', peer, 'ssh', 'public-key-file', rpki_ssh_pub]) - self.session.set(base_path + ['cache', peer, 'ssh', 'private-key-file', rpki_ssh_key]) - self.session.set(base_path + ['cache', peer, 'ssh', 'known-hosts-file', rpki_known_hosts]) + self.cli_set(base_path + ['cache', peer, 'port', peer_config['port']]) + self.cli_set(base_path + ['cache', peer, 'preference', peer_config['preference']]) + self.cli_set(base_path + ['cache', peer, 'ssh', 'username', peer_config['username']]) + self.cli_set(base_path + ['cache', peer, 'ssh', 'public-key-file', rpki_ssh_pub]) + self.cli_set(base_path + ['cache', peer, 'ssh', 'private-key-file', rpki_ssh_key]) + self.cli_set(base_path + ['cache', peer, 'ssh', 'known-hosts-file', rpki_known_hosts]) # commit changes - self.session.commit() + self.cli_commit() # Verify FRR configuration - frrconfig = getFRRRPKIconfig() + frrconfig = self.getFRRconfig('rpki') self.assertIn(f'rpki polling_period {polling}', frrconfig) for peer, peer_config in cache.items(): @@ -140,12 +135,12 @@ class TestProtocolsRPKI(unittest.TestCase): } for peer, peer_config in cache.items(): - self.session.set(base_path + ['cache', peer, 'port', peer_config['port']]) - self.session.set(base_path + ['cache', peer, 'preference', peer_config['preference']]) + self.cli_set(base_path + ['cache', peer, 'port', peer_config['port']]) + self.cli_set(base_path + ['cache', peer, 'preference', peer_config['preference']]) # check validate() - preferences must be unique with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() if __name__ == '__main__': diff --git a/smoketest/scripts/cli/test_protocols_static.py b/smoketest/scripts/cli/test_protocols_static.py index 100fd3387..de9b48de4 100755 --- a/smoketest/scripts/cli/test_protocols_static.py +++ b/smoketest/scripts/cli/test_protocols_static.py @@ -19,6 +19,8 @@ import os import json import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from netifaces import interfaces from vyos.configsession import ConfigSession @@ -71,29 +73,27 @@ interface_routes = { }, } - -class StaticRouteTest(unittest.TestCase): +class StaticRouteTest(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) # we need an alive next-hop interface - self.session.set(['interfaces', 'dummy', dummy_if, 'address', '192.0.2.1/24']) - self.session.set(['interfaces', 'dummy', dummy_if, 'address', '2001:db8::1/64']) + self.cli_set(['interfaces', 'dummy', dummy_if, 'address', '192.0.2.1/24']) + self.cli_set(['interfaces', 'dummy', dummy_if, 'address', '2001:db8::1/64']) def tearDown(self): - self.session.delete(['interfaces', 'dummy', dummy_if]) - self.session.commit() + self.cli_delete(['interfaces', 'dummy', dummy_if]) + self.cli_commit() def test_static_routes(self): for route, route_config in routes.items(): route_type = 'route' if is_ipv6(route): route_type = 'route6' - self.session.set(base_path + [route_type, route, 'next-hop', route_config['next_hop']]) + self.cli_set(base_path + [route_type, route, 'next-hop', route_config['next_hop']]) if 'distance' in route_config: - self.session.set(base_path + [route_type, route, 'next-hop', route_config['next_hop'], 'distance', route_config['distance']]) + self.cli_set(base_path + [route_type, route, 'next-hop', route_config['next_hop'], 'distance', route_config['distance']]) # commit changes - self.session.commit() + self.cli_commit() # Verify routes for route, route_config in routes.items(): @@ -114,19 +114,19 @@ class StaticRouteTest(unittest.TestCase): route_type = 'route' if is_ipv6(route): route_type = 'route6' - self.session.delete(base_path + [route_type, route]) + self.cli_delete(base_path + [route_type, route]) def test_interface_routes(self): for route, route_config in interface_routes.items(): route_type = 'interface-route' if is_ipv6(route): route_type = 'interface-route6' - self.session.set(base_path + [route_type, route, 'next-hop-interface', route_config['next_hop']]) + self.cli_set(base_path + [route_type, route, 'next-hop-interface', route_config['next_hop']]) if 'distance' in route_config: - self.session.set(base_path + [route_type, route, 'next-hop-interface', route_config['next_hop'], 'distance', route_config['distance']]) + self.cli_set(base_path + [route_type, route, 'next-hop-interface', route_config['next_hop'], 'distance', route_config['distance']]) # commit changes - self.session.commit() + self.cli_commit() # Verify routes for route, route_config in interface_routes.items(): @@ -148,7 +148,7 @@ class StaticRouteTest(unittest.TestCase): route_type = 'interface-route' if is_ipv6(route): route_type = 'interface-route6' - self.session.delete(base_path + [route_type, route]) + self.cli_delete(base_path + [route_type, route]) if __name__ == '__main__': - unittest.main(verbosity=2, failfast=True) + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_bcast-relay.py b/smoketest/scripts/cli/test_service_bcast-relay.py index c28509714..58b730ab4 100755 --- a/smoketest/scripts/cli/test_service_bcast-relay.py +++ b/smoketest/scripts/cli/test_service_bcast-relay.py @@ -14,47 +14,46 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError base_path = ['service', 'broadcast-relay'] -class TestServiceBroadcastRelay(unittest.TestCase): +class TestServiceBroadcastRelay(VyOSUnitTestSHIM.TestCase): _address1 = '192.0.2.1/24' _address2 = '192.0.2.1/24' def setUp(self): - self.session = ConfigSession(os.getpid()) - self.session.set(['interfaces', 'dummy', 'dum1001', 'address', self._address1]) - self.session.set(['interfaces', 'dummy', 'dum1002', 'address', self._address2]) - self.session.commit() + self.cli_set(['interfaces', 'dummy', 'dum1001', 'address', self._address1]) + self.cli_set(['interfaces', 'dummy', 'dum1002', 'address', self._address2]) def tearDown(self): - self.session.delete(['interfaces', 'dummy', 'dum1001']) - self.session.delete(['interfaces', 'dummy', 'dum1002']) - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(['interfaces', 'dummy', 'dum1001']) + self.cli_delete(['interfaces', 'dummy', 'dum1002']) + self.cli_delete(base_path) + self.cli_commit() def test_broadcast_relay_service(self): ids = range(1, 5) for id in ids: base = base_path + ['id', str(id)] - self.session.set(base + ['description', 'vyos']) - self.session.set(base + ['port', str(10000 + id)]) + self.cli_set(base + ['description', 'vyos']) + self.cli_set(base + ['port', str(10000 + id)]) # check validate() - two interfaces must be present with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() - self.session.set(base + ['interface', 'dum1001']) - self.session.set(base + ['interface', 'dum1002']) - self.session.set(base + ['address', self._address1.split('/')[0]]) + self.cli_set(base + ['interface', 'dum1001']) + self.cli_set(base + ['interface', 'dum1002']) + self.cli_set(base + ['address', self._address1.split('/')[0]]) - self.session.commit() + self.cli_commit() for id in ids: # check if process is running diff --git a/smoketest/scripts/cli/test_service_dhcp-relay.py b/smoketest/scripts/cli/test_service_dhcp-relay.py index 676c4a481..db2edba54 100755 --- a/smoketest/scripts/cli/test_service_dhcp-relay.py +++ b/smoketest/scripts/cli/test_service_dhcp-relay.py @@ -14,14 +14,13 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import re -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section -from vyos.util import cmd from vyos.util import process_named_running from vyos.util import read_file @@ -29,14 +28,10 @@ PROCESS_NAME = 'dhcrelay' RELAY_CONF = '/run/dhcp-relay/dhcrelay.conf' base_path = ['service', 'dhcp-relay'] -class TestServiceDHCPRelay(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - +class TestServiceDHCPRelay(VyOSUnitTestSHIM.TestCase): def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() def test_relay_default(self): max_size = '800' @@ -44,28 +39,28 @@ class TestServiceDHCPRelay(unittest.TestCase): agents_packets = 'append' servers = ['192.0.2.1', '192.0.2.2'] - self.session.set(base_path + ['interface', 'lo']) + self.cli_set(base_path + ['interface', 'lo']) # check validate() - DHCP relay does not support the loopback interface with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(base_path + ['interface', 'lo']) + self.cli_commit() + self.cli_delete(base_path + ['interface', 'lo']) # activate DHCP relay on all ethernet interfaces for tmp in Section.interfaces("ethernet"): - self.session.set(base_path + ['interface', tmp]) + self.cli_set(base_path + ['interface', tmp]) # check validate() - No DHCP relay server(s) configured with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() for server in servers: - self.session.set(base_path + ['server', server]) + self.cli_set(base_path + ['server', server]) - self.session.set(base_path + ['relay-options', 'max-size', max_size]) - self.session.set(base_path + ['relay-options', 'hop-count', hop_count]) - self.session.set(base_path + ['relay-options', 'relay-agents-packets', agents_packets]) + self.cli_set(base_path + ['relay-options', 'max-size', max_size]) + self.cli_set(base_path + ['relay-options', 'hop-count', hop_count]) + self.cli_set(base_path + ['relay-options', 'relay-agents-packets', agents_packets]) # commit changes - self.session.commit() + self.cli_commit() # Check configured port config = read_file(RELAY_CONF) diff --git a/smoketest/scripts/cli/test_service_dhcp-server.py b/smoketest/scripts/cli/test_service_dhcp-server.py index 2a5789e70..815bd333a 100755 --- a/smoketest/scripts/cli/test_service_dhcp-server.py +++ b/smoketest/scripts/cli/test_service_dhcp-server.py @@ -14,13 +14,12 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import re -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError -from vyos.util import cmd from vyos.util import process_named_running from vyos.util import read_file from vyos.template import address_from_cidr @@ -37,17 +36,15 @@ dns_1 = inc_ip(subnet, 2) dns_2 = inc_ip(subnet, 3) domain_name = 'vyos.net' -class TestServiceDHCPServer(unittest.TestCase): +class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) cidr_mask = subnet.split('/')[-1] - self.session.set(['interfaces', 'dummy', 'dum8765', 'address', f'{router}/{cidr_mask}']) + self.cli_set(['interfaces', 'dummy', 'dum8765', 'address', f'{router}/{cidr_mask}']) def tearDown(self): - self.session.delete(['interfaces', 'dummy', 'dum8765']) - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(['interfaces', 'dummy', 'dum8765']) + self.cli_delete(base_path) + self.cli_commit() def test_dhcp_single_pool_range(self): shared_net_name = 'SMOKE-1' @@ -57,25 +54,25 @@ class TestServiceDHCPServer(unittest.TestCase): range_1_start = inc_ip(subnet, 40) range_1_stop = inc_ip(subnet, 50) - self.session.set(base_path + ['dynamic-dns-update']) + self.cli_set(base_path + ['dynamic-dns-update']) pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet] # we use the first subnet IP address as default gateway - self.session.set(pool + ['default-router', router]) - self.session.set(pool + ['dns-server', dns_1]) - self.session.set(pool + ['dns-server', dns_2]) - self.session.set(pool + ['domain-name', domain_name]) + self.cli_set(pool + ['default-router', router]) + self.cli_set(pool + ['dns-server', dns_1]) + self.cli_set(pool + ['dns-server', dns_2]) + self.cli_set(pool + ['domain-name', domain_name]) # check validate() - No DHCP address range or active static-mapping set with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(pool + ['range', '0', 'start', range_0_start]) - self.session.set(pool + ['range', '0', 'stop', range_0_stop]) - self.session.set(pool + ['range', '1', 'start', range_1_start]) - self.session.set(pool + ['range', '1', 'stop', range_1_stop]) + self.cli_commit() + self.cli_set(pool + ['range', '0', 'start', range_0_start]) + self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) + self.cli_set(pool + ['range', '1', 'start', range_1_start]) + self.cli_set(pool + ['range', '1', 'stop', range_1_stop]) # commit changes - self.session.commit() + self.cli_commit() config = read_file(DHCPD_CONF) network = address_from_cidr(subnet) @@ -110,42 +107,42 @@ class TestServiceDHCPServer(unittest.TestCase): pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet] # we use the first subnet IP address as default gateway - self.session.set(pool + ['default-router', router]) - self.session.set(pool + ['dns-server', dns_1]) - self.session.set(pool + ['dns-server', dns_2]) - self.session.set(pool + ['domain-name', domain_name]) - self.session.set(pool + ['ip-forwarding']) - self.session.set(pool + ['smtp-server', smtp_server]) - self.session.set(pool + ['pop-server', smtp_server]) - self.session.set(pool + ['time-server', time_server]) - self.session.set(pool + ['tftp-server-name', tftp_server]) + self.cli_set(pool + ['default-router', router]) + self.cli_set(pool + ['dns-server', dns_1]) + self.cli_set(pool + ['dns-server', dns_2]) + self.cli_set(pool + ['domain-name', domain_name]) + self.cli_set(pool + ['ip-forwarding']) + self.cli_set(pool + ['smtp-server', smtp_server]) + self.cli_set(pool + ['pop-server', smtp_server]) + self.cli_set(pool + ['time-server', time_server]) + self.cli_set(pool + ['tftp-server-name', tftp_server]) for search in search_domains: - self.session.set(pool + ['domain-search', search]) - self.session.set(pool + ['bootfile-name', bootfile_name]) - self.session.set(pool + ['bootfile-server', bootfile_server]) - self.session.set(pool + ['wpad-url', wpad]) - self.session.set(pool + ['server-identifier', server_identifier]) + self.cli_set(pool + ['domain-search', search]) + self.cli_set(pool + ['bootfile-name', bootfile_name]) + self.cli_set(pool + ['bootfile-server', bootfile_server]) + self.cli_set(pool + ['wpad-url', wpad]) + self.cli_set(pool + ['server-identifier', server_identifier]) - self.session.set(pool + ['static-route', 'destination-subnet', '10.0.0.0/24']) - self.session.set(pool + ['static-route', 'router', '192.0.2.1']) + self.cli_set(pool + ['static-route', 'destination-subnet', '10.0.0.0/24']) + self.cli_set(pool + ['static-route', 'router', '192.0.2.1']) # check validate() - No DHCP address range or active static-mapping set with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(pool + ['range', '0', 'start', range_0_start]) - self.session.set(pool + ['range', '0', 'stop', range_0_stop]) + self.cli_commit() + self.cli_set(pool + ['range', '0', 'start', range_0_start]) + self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) # failover failover_local = router failover_remote = inc_ip(router, 1) - self.session.set(pool + ['failover', 'local-address', failover_local]) - self.session.set(pool + ['failover', 'name', shared_net_name]) - self.session.set(pool + ['failover', 'peer-address', failover_remote]) - self.session.set(pool + ['failover', 'status', 'primary']) + self.cli_set(pool + ['failover', 'local-address', failover_local]) + self.cli_set(pool + ['failover', 'name', shared_net_name]) + self.cli_set(pool + ['failover', 'peer-address', failover_remote]) + self.cli_set(pool + ['failover', 'status', 'primary']) # commit changes - self.session.commit() + self.cli_commit() config = read_file(DHCPD_CONF) @@ -204,24 +201,24 @@ class TestServiceDHCPServer(unittest.TestCase): pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet] # we use the first subnet IP address as default gateway - self.session.set(pool + ['default-router', router]) - self.session.set(pool + ['dns-server', dns_1]) - self.session.set(pool + ['dns-server', dns_2]) - self.session.set(pool + ['domain-name', domain_name]) + self.cli_set(pool + ['default-router', router]) + self.cli_set(pool + ['dns-server', dns_1]) + self.cli_set(pool + ['dns-server', dns_2]) + self.cli_set(pool + ['domain-name', domain_name]) # check validate() - No DHCP address range or active static-mapping set with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() client_base = 10 for client in ['client1', 'client2', 'client3']: mac = '00:50:00:00:00:{}'.format(client_base) - self.session.set(pool + ['static-mapping', client, 'mac-address', mac]) - self.session.set(pool + ['static-mapping', client, 'ip-address', inc_ip(subnet, client_base)]) + self.cli_set(pool + ['static-mapping', client, 'mac-address', mac]) + self.cli_set(pool + ['static-mapping', client, 'ip-address', inc_ip(subnet, client_base)]) client_base += 1 # commit changes - self.session.commit() + self.cli_commit() config = read_file(DHCPD_CONF) network = address_from_cidr(subnet) @@ -264,25 +261,25 @@ class TestServiceDHCPServer(unittest.TestCase): pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet] # we use the first subnet IP address as default gateway - self.session.set(pool + ['default-router', router]) - self.session.set(pool + ['dns-server', dns_1]) - self.session.set(pool + ['domain-name', domain_name]) - self.session.set(pool + ['lease', lease_time]) + self.cli_set(pool + ['default-router', router]) + self.cli_set(pool + ['dns-server', dns_1]) + self.cli_set(pool + ['domain-name', domain_name]) + self.cli_set(pool + ['lease', lease_time]) - self.session.set(pool + ['range', '0', 'start', range_0_start]) - self.session.set(pool + ['range', '0', 'stop', range_0_stop]) - self.session.set(pool + ['range', '1', 'start', range_1_start]) - self.session.set(pool + ['range', '1', 'stop', range_1_stop]) + self.cli_set(pool + ['range', '0', 'start', range_0_start]) + self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) + self.cli_set(pool + ['range', '1', 'start', range_1_start]) + self.cli_set(pool + ['range', '1', 'stop', range_1_stop]) client_base = 60 for client in ['client1', 'client2', 'client3', 'client4']: mac = '02:50:00:00:00:{}'.format(client_base) - self.session.set(pool + ['static-mapping', client, 'mac-address', mac]) - self.session.set(pool + ['static-mapping', client, 'ip-address', inc_ip(subnet, client_base)]) + self.cli_set(pool + ['static-mapping', client, 'mac-address', mac]) + self.cli_set(pool + ['static-mapping', client, 'ip-address', inc_ip(subnet, client_base)]) client_base += 1 # commit changes - self.session.commit() + self.cli_commit() config = read_file(DHCPD_CONF) for network in ['0', '1', '2', '3']: @@ -329,13 +326,13 @@ class TestServiceDHCPServer(unittest.TestCase): range_0_stop = inc_ip(subnet, 20) pool = base_path + ['shared-network-name', 'EXCLUDE-TEST', 'subnet', subnet] - self.session.set(pool + ['default-router', router]) - self.session.set(pool + ['exclude', router]) - self.session.set(pool + ['range', '0', 'start', range_0_start]) - self.session.set(pool + ['range', '0', 'stop', range_0_stop]) + self.cli_set(pool + ['default-router', router]) + self.cli_set(pool + ['exclude', router]) + self.cli_set(pool + ['range', '0', 'start', range_0_start]) + self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) # commit changes - self.session.commit() + self.cli_commit() # VErify config = read_file(DHCPD_CONF) @@ -362,13 +359,13 @@ class TestServiceDHCPServer(unittest.TestCase): range_0_start_excl = inc_ip(exclude_addr, 1) pool = base_path + ['shared-network-name', 'EXCLUDE-TEST-2', 'subnet', subnet] - self.session.set(pool + ['default-router', router]) - self.session.set(pool + ['exclude', exclude_addr]) - self.session.set(pool + ['range', '0', 'start', range_0_start]) - self.session.set(pool + ['range', '0', 'stop', range_0_stop]) + self.cli_set(pool + ['default-router', router]) + self.cli_set(pool + ['exclude', exclude_addr]) + self.cli_set(pool + ['range', '0', 'start', range_0_start]) + self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) # commit changes - self.session.commit() + self.cli_commit() # Verify config = read_file(DHCPD_CONF) @@ -386,7 +383,7 @@ class TestServiceDHCPServer(unittest.TestCase): def test_dhcp_relay_server(self): # Listen on specific address and return DHCP leases from a non # directly connected pool - self.session.set(base_path + ['listen-address', router]) + self.cli_set(base_path + ['listen-address', router]) relay_subnet = '10.0.0.0/16' relay_router = inc_ip(relay_subnet, 1) @@ -395,12 +392,12 @@ class TestServiceDHCPServer(unittest.TestCase): range_0_stop = '10.0.250.255' pool = base_path + ['shared-network-name', 'RELAY', 'subnet', relay_subnet] - self.session.set(pool + ['default-router', relay_router]) - self.session.set(pool + ['range', '0', 'start', range_0_start]) - self.session.set(pool + ['range', '0', 'stop', range_0_stop]) + self.cli_set(pool + ['default-router', relay_router]) + self.cli_set(pool + ['range', '0', 'start', range_0_start]) + self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) # commit changes - self.session.commit() + self.cli_commit() config = read_file(DHCPD_CONF) network = address_from_cidr(subnet) @@ -425,18 +422,18 @@ class TestServiceDHCPServer(unittest.TestCase): pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet] # we use the first subnet IP address as default gateway - self.session.set(pool + ['default-router', router]) - self.session.set(pool + ['range', '0', 'start', range_0_start]) - self.session.set(pool + ['range', '0', 'stop', range_0_stop]) + self.cli_set(pool + ['default-router', router]) + self.cli_set(pool + ['range', '0', 'start', range_0_start]) + self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) - self.session.set(base_path + ['global-parameters', 'this-is-crap']) + self.cli_set(base_path + ['global-parameters', 'this-is-crap']) # check generate() - dhcpd should not acceot this garbage config with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(base_path + ['global-parameters']) + self.cli_commit() + self.cli_delete(base_path + ['global-parameters']) # commit changes - self.session.commit() + self.cli_commit() # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) diff --git a/smoketest/scripts/cli/test_service_dhcpv6-relay.py b/smoketest/scripts/cli/test_service_dhcpv6-relay.py index e36c237bc..5a9dd1aa6 100755 --- a/smoketest/scripts/cli/test_service_dhcpv6-relay.py +++ b/smoketest/scripts/cli/test_service_dhcpv6-relay.py @@ -14,15 +14,14 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import re -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section from vyos.template import address_from_cidr -from vyos.util import cmd from vyos.util import process_named_running from vyos.util import read_file @@ -35,52 +34,50 @@ upstream_if_addr = '2001:db8::1/64' listen_addr = '2001:db8:ffff::1/64' interfaces = [] -class TestServiceDHCPv6Relay(unittest.TestCase): +class TestServiceDHCPv6Relay(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) for tmp in interfaces: listen = listen_addr if tmp == upstream_if: listen = upstream_if_addr - self.session.set(['interfaces', 'ethernet', tmp, 'address', listen]) + self.cli_set(['interfaces', 'ethernet', tmp, 'address', listen]) def tearDown(self): - self.session.delete(base_path) + self.cli_delete(base_path) for tmp in interfaces: listen = listen_addr if tmp == upstream_if: listen = upstream_if_addr - self.session.delete(['interfaces', 'ethernet', tmp, 'address', listen]) + self.cli_delete(['interfaces', 'ethernet', tmp, 'address', listen]) - self.session.commit() - del self.session + self.cli_commit() def test_relay_default(self): dhcpv6_server = '2001:db8::ffff' hop_count = '20' - self.session.set(base_path + ['use-interface-id-option']) - self.session.set(base_path + ['max-hop-count', hop_count]) + self.cli_set(base_path + ['use-interface-id-option']) + self.cli_set(base_path + ['max-hop-count', hop_count]) # check validate() - Must set at least one listen and upstream # interface addresses. with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(base_path + ['upstream-interface', upstream_if, 'address', dhcpv6_server]) + self.cli_commit() + self.cli_set(base_path + ['upstream-interface', upstream_if, 'address', dhcpv6_server]) # check validate() - Must set at least one listen and upstream # interface addresses. with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() # add listener on all ethernet interfaces except the upstream interface for tmp in interfaces: if tmp == upstream_if: continue - self.session.set(base_path + ['listen-interface', tmp, 'address', listen_addr.split('/')[0]]) + self.cli_set(base_path + ['listen-interface', tmp, 'address', listen_addr.split('/')[0]]) # commit changes - self.session.commit() + self.cli_commit() # Check configured port config = read_file(RELAY_CONF) diff --git a/smoketest/scripts/cli/test_service_dhcpv6-server.py b/smoketest/scripts/cli/test_service_dhcpv6-server.py index f97c7963f..3f9564e59 100755 --- a/smoketest/scripts/cli/test_service_dhcpv6-server.py +++ b/smoketest/scripts/cli/test_service_dhcpv6-server.py @@ -14,14 +14,13 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import re -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.template import inc_ip -from vyos.util import cmd from vyos.util import process_named_running from vyos.util import read_file @@ -37,16 +36,14 @@ nis_servers = ['2001:db8:ffff::1', '2001:db8:ffff::2'] interface = 'eth1' interface_addr = inc_ip(subnet, 1) + '/64' -class TestServiceDHCPServer(unittest.TestCase): +class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) - self.session.set(['interfaces', 'ethernet', interface, 'address', interface_addr]) + self.cli_set(['interfaces', 'ethernet', interface, 'address', interface_addr]) def tearDown(self): - self.session.delete(base_path) - self.session.delete(['interfaces', 'ethernet', interface, 'address', interface_addr]) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_delete(['interfaces', 'ethernet', interface, 'address', interface_addr]) + self.cli_commit() def test_single_pool(self): shared_net_name = 'SMOKE-1' @@ -62,38 +59,38 @@ class TestServiceDHCPServer(unittest.TestCase): pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet] - self.session.set(base_path + ['preference', preference]) + self.cli_set(base_path + ['preference', preference]) # we use the first subnet IP address as default gateway - self.session.set(pool + ['name-server', dns_1]) - self.session.set(pool + ['name-server', dns_2]) - self.session.set(pool + ['name-server', dns_2]) - self.session.set(pool + ['lease-time', 'default', lease_time]) - self.session.set(pool + ['lease-time', 'maximum', max_lease_time]) - self.session.set(pool + ['lease-time', 'minimum', min_lease_time]) - self.session.set(pool + ['nis-domain', domain]) - self.session.set(pool + ['nisplus-domain', domain]) - self.session.set(pool + ['sip-server', sip_server]) - self.session.set(pool + ['sntp-server', sntp_server]) - self.session.set(pool + ['address-range', 'start', range_start, 'stop', range_stop]) + self.cli_set(pool + ['name-server', dns_1]) + self.cli_set(pool + ['name-server', dns_2]) + self.cli_set(pool + ['name-server', dns_2]) + self.cli_set(pool + ['lease-time', 'default', lease_time]) + self.cli_set(pool + ['lease-time', 'maximum', max_lease_time]) + self.cli_set(pool + ['lease-time', 'minimum', min_lease_time]) + self.cli_set(pool + ['nis-domain', domain]) + self.cli_set(pool + ['nisplus-domain', domain]) + self.cli_set(pool + ['sip-server', sip_server]) + self.cli_set(pool + ['sntp-server', sntp_server]) + self.cli_set(pool + ['address-range', 'start', range_start, 'stop', range_stop]) for server in nis_servers: - self.session.set(pool + ['nis-server', server]) - self.session.set(pool + ['nisplus-server', server]) + self.cli_set(pool + ['nis-server', server]) + self.cli_set(pool + ['nisplus-server', server]) for search in search_domains: - self.session.set(pool + ['domain-search', search]) + self.cli_set(pool + ['domain-search', search]) client_base = 1 for client in ['client1', 'client2', 'client3']: cid = '00:01:00:01:12:34:56:78:aa:bb:cc:dd:ee:{}'.format(client_base) - self.session.set(pool + ['static-mapping', client, 'identifier', cid]) - self.session.set(pool + ['static-mapping', client, 'ipv6-address', inc_ip(subnet, client_base)]) - self.session.set(pool + ['static-mapping', client, 'ipv6-prefix', inc_ip(subnet, client_base << 64) + '/64']) + self.cli_set(pool + ['static-mapping', client, 'identifier', cid]) + self.cli_set(pool + ['static-mapping', client, 'ipv6-address', inc_ip(subnet, client_base)]) + self.cli_set(pool + ['static-mapping', client, 'ipv6-prefix', inc_ip(subnet, client_base << 64) + '/64']) client_base += 1 # commit changes - self.session.commit() + self.cli_commit() config = read_file(DHCPD_CONF) self.assertIn(f'option dhcp6.preference {preference};', config) @@ -139,12 +136,12 @@ class TestServiceDHCPServer(unittest.TestCase): pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet] - self.session.set(pool + ['address-range', 'start', range_start, 'stop', range_stop]) - self.session.set(pool + ['prefix-delegation', 'start', delegate_start, 'stop', delegate_stop]) - self.session.set(pool + ['prefix-delegation', 'start', delegate_start, 'prefix-length', delegate_len]) + self.cli_set(pool + ['address-range', 'start', range_start, 'stop', range_stop]) + self.cli_set(pool + ['prefix-delegation', 'start', delegate_start, 'stop', delegate_stop]) + self.cli_set(pool + ['prefix-delegation', 'start', delegate_start, 'prefix-length', delegate_len]) # commit changes - self.session.commit() + self.cli_commit() config = read_file(DHCPD_CONF) self.assertIn(f'subnet6 {subnet}' + r' {', config) @@ -159,12 +156,12 @@ class TestServiceDHCPServer(unittest.TestCase): ns_global_1 = '2001:db8::1111' ns_global_2 = '2001:db8::2222' - self.session.set(base_path + ['global-parameters', 'name-server', ns_global_1]) - self.session.set(base_path + ['global-parameters', 'name-server', ns_global_2]) - self.session.set(base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]) + self.cli_set(base_path + ['global-parameters', 'name-server', ns_global_1]) + self.cli_set(base_path + ['global-parameters', 'name-server', ns_global_2]) + self.cli_set(base_path + ['shared-network-name', shared_net_name, 'subnet', subnet]) # commit changes - self.session.commit() + self.cli_commit() config = read_file(DHCPD_CONF) self.assertIn(f'option dhcp6.name-servers {ns_global_1}, {ns_global_2};', config) diff --git a/smoketest/scripts/cli/test_service_dns_dynamic.py b/smoketest/scripts/cli/test_service_dns_dynamic.py index 83eede64a..d8a87ffd4 100755 --- a/smoketest/scripts/cli/test_service_dns_dynamic.py +++ b/smoketest/scripts/cli/test_service_dns_dynamic.py @@ -18,10 +18,11 @@ import re import os import unittest -from getpass import getuser +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError -from vyos.util import read_file +from vyos.util import cmd from vyos.util import process_named_running PROCESS_NAME = 'ddclient' @@ -29,21 +30,17 @@ DDCLIENT_CONF = '/run/ddclient/ddclient.conf' base_path = ['service', 'dns', 'dynamic'] def get_config_value(key): - tmp = read_file(DDCLIENT_CONF) + tmp = cmd(f'sudo cat {DDCLIENT_CONF}') tmp = re.findall(r'\n?{}=+(.*)'.format(key), tmp) tmp = tmp[0].rstrip(',') return tmp -class TestServiceDDNS(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) +class TestServiceDDNS(VyOSUnitTestSHIM.TestCase): def tearDown(self): # Delete DDNS configuration - self.session.delete(base_path) - self.session.commit() - - del self.session + self.cli_delete(base_path) + self.cli_commit() def test_dyndns_service(self): ddns = ['interface', 'eth0', 'service'] @@ -53,45 +50,44 @@ class TestServiceDDNS(unittest.TestCase): user = 'vyos_user' password = 'vyos_pass' zone = 'vyos.io' - self.session.delete(base_path) - self.session.set(base_path + ddns + [service, 'host-name', 'test.ddns.vyos.io']) - self.session.set(base_path + ddns + [service, 'login', user]) - self.session.set(base_path + ddns + [service, 'password', password]) - self.session.set(base_path + ddns + [service, 'zone', zone]) + self.cli_delete(base_path) + self.cli_set(base_path + ddns + [service, 'host-name', 'test.ddns.vyos.io']) + self.cli_set(base_path + ddns + [service, 'login', user]) + self.cli_set(base_path + ddns + [service, 'password', password]) + self.cli_set(base_path + ddns + [service, 'zone', zone]) # commit changes if service == 'cloudflare': - self.session.commit() + self.cli_commit() else: # zone option only works on cloudflare, an exception is raised # for all others with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(base_path + ddns + [service, 'zone', 'vyos.io']) + self.cli_commit() + self.cli_delete(base_path + ddns + [service, 'zone', 'vyos.io']) # commit changes again - now it should work - self.session.commit() + self.cli_commit() # we can only read the configuration file when we operate as 'root' - if getuser() == 'root': - protocol = get_config_value('protocol') - login = get_config_value('login') - pwd = get_config_value('password') - - # some services need special treatment - protoname = service - if service == 'cloudflare': - tmp = get_config_value('zone') - self.assertTrue(tmp == zone) - elif service == 'afraid': - protoname = 'freedns' - elif service == 'dyndns': - protoname = 'dyndns2' - elif service == 'zoneedit': - protoname = 'zoneedit1' - - self.assertTrue(protocol == protoname) - self.assertTrue(login == user) - self.assertTrue(pwd == "'" + password + "'") + protocol = get_config_value('protocol') + login = get_config_value('login') + pwd = get_config_value('password') + + # some services need special treatment + protoname = service + if service == 'cloudflare': + tmp = get_config_value('zone') + self.assertTrue(tmp == zone) + elif service == 'afraid': + protoname = 'freedns' + elif service == 'dyndns': + protoname = 'dyndns2' + elif service == 'zoneedit': + protoname = 'zoneedit1' + + self.assertTrue(protocol == protoname) + self.assertTrue(login == user) + self.assertTrue(pwd == "'" + password + "'") # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) @@ -101,11 +97,11 @@ class TestServiceDDNS(unittest.TestCase): ddns = ['interface', 'eth0', 'rfc2136', 'vyos'] ddns_key_file = '/config/auth/my.key' - self.session.set(base_path + ddns + ['key', ddns_key_file]) - self.session.set(base_path + ddns + ['record', 'test.ddns.vyos.io']) - self.session.set(base_path + ddns + ['server', 'ns1.vyos.io']) - self.session.set(base_path + ddns + ['ttl', '300']) - self.session.set(base_path + ddns + ['zone', 'vyos.io']) + self.cli_set(base_path + ddns + ['key', ddns_key_file]) + self.cli_set(base_path + ddns + ['record', 'test.ddns.vyos.io']) + self.cli_set(base_path + ddns + ['server', 'ns1.vyos.io']) + self.cli_set(base_path + ddns + ['ttl', '300']) + self.cli_set(base_path + ddns + ['zone', 'vyos.io']) # ensure an exception will be raised as no key is present if os.path.exists(ddns_key_file): @@ -113,13 +109,13 @@ class TestServiceDDNS(unittest.TestCase): # check validate() - the key file does not exist yet with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() with open(ddns_key_file, 'w') as f: f.write('S3cretKey') # commit changes - self.session.commit() + self.cli_commit() # TODO: inspect generated configuration file diff --git a/smoketest/scripts/cli/test_service_dns_forwarding.py b/smoketest/scripts/cli/test_service_dns_forwarding.py index ada53e8dd..8005eb319 100755 --- a/smoketest/scripts/cli/test_service_dns_forwarding.py +++ b/smoketest/scripts/cli/test_service_dns_forwarding.py @@ -15,10 +15,12 @@ # along with this program. If not, see . import re -import os import unittest -from vyos.configsession import ConfigSession, ConfigSessionError +from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError from vyos.util import read_file from vyos.util import process_named_running @@ -37,44 +39,40 @@ def get_config_value(key, file=CONFIG_FILE): tmp = re.findall(r'\n{}=+(.*)'.format(key), tmp) return tmp[0] -class TestServicePowerDNS(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - +class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase): def tearDown(self): # Delete DNS forwarding configuration - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() def test_basic_forwarding(self): # Check basic DNS forwarding settings cache_size = '20' negative_ttl = '120' - self.session.set(base_path + ['cache-size', cache_size]) - self.session.set(base_path + ['negative-ttl', negative_ttl]) + self.cli_set(base_path + ['cache-size', cache_size]) + self.cli_set(base_path + ['negative-ttl', negative_ttl]) # check validate() - allow from must be defined with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() for network in allow_from: - self.session.set(base_path + ['allow-from', network]) + self.cli_set(base_path + ['allow-from', network]) # check validate() - listen-address must be defined with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() for address in listen_adress: - self.session.set(base_path + ['listen-address', address]) + self.cli_set(base_path + ['listen-address', address]) # configure DNSSEC - self.session.set(base_path + ['dnssec', 'validate']) + self.cli_set(base_path + ['dnssec', 'validate']) # Do not use local /etc/hosts file in name resolution - self.session.set(base_path + ['ignore-hosts-file']) + self.cli_set(base_path + ['ignore-hosts-file']) # commit changes - self.session.commit() + self.cli_commit() # Check configured cache-size tmp = get_config_value('max-cache-entries') @@ -103,16 +101,16 @@ class TestServicePowerDNS(unittest.TestCase): # DNSSEC option testing for network in allow_from: - self.session.set(base_path + ['allow-from', network]) + self.cli_set(base_path + ['allow-from', network]) for address in listen_adress: - self.session.set(base_path + ['listen-address', address]) + self.cli_set(base_path + ['listen-address', address]) options = ['off', 'process-no-validate', 'process', 'log-fail', 'validate'] for option in options: - self.session.set(base_path + ['dnssec', option]) + self.cli_set(base_path + ['dnssec', option]) # commit changes - self.session.commit() + self.cli_commit() tmp = get_config_value('dnssec') self.assertEqual(tmp, option) @@ -124,16 +122,16 @@ class TestServicePowerDNS(unittest.TestCase): # Externe Domain Name Servers (DNS) addresses for network in allow_from: - self.session.set(base_path + ['allow-from', network]) + self.cli_set(base_path + ['allow-from', network]) for address in listen_adress: - self.session.set(base_path + ['listen-address', address]) + self.cli_set(base_path + ['listen-address', address]) nameservers = ['192.0.2.1', '192.0.2.2'] for nameserver in nameservers: - self.session.set(base_path + ['name-server', nameserver]) + self.cli_set(base_path + ['name-server', nameserver]) # commit changes - self.session.commit() + self.cli_commit() tmp = get_config_value(r'\+.', file=FORWARD_FILE) self.assertEqual(tmp, ', '.join(nameservers)) @@ -148,26 +146,26 @@ class TestServicePowerDNS(unittest.TestCase): def test_domain_forwarding(self): for network in allow_from: - self.session.set(base_path + ['allow-from', network]) + self.cli_set(base_path + ['allow-from', network]) for address in listen_adress: - self.session.set(base_path + ['listen-address', address]) + self.cli_set(base_path + ['listen-address', address]) domains = ['vyos.io', 'vyos.net', 'vyos.com'] nameservers = ['192.0.2.1', '192.0.2.2'] for domain in domains: for nameserver in nameservers: - self.session.set(base_path + ['domain', domain, 'server', nameserver]) + self.cli_set(base_path + ['domain', domain, 'server', nameserver]) # Test 'recursion-desired' flag for only one domain if domain == domains[0]: - self.session.set(base_path + ['domain', domain, 'recursion-desired']) + self.cli_set(base_path + ['domain', domain, 'recursion-desired']) # Test 'negative trust anchor' flag for the second domain only if domain == domains[1]: - self.session.set(base_path + ['domain', domain, 'addnta']) + self.cli_set(base_path + ['domain', domain, 'addnta']) # commit changes - self.session.commit() + self.cli_commit() # Test configured name-servers hosts_conf = read_file(HOSTSD_FILE) diff --git a/smoketest/scripts/cli/test_service_https.py b/smoketest/scripts/cli/test_service_https.py index fd0f6bfbd..3ed7655e9 100755 --- a/smoketest/scripts/cli/test_service_https.py +++ b/smoketest/scripts/cli/test_service_https.py @@ -14,28 +14,27 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSession from vyos.util import run base_path = ['service', 'https'] -class TestHTTPSService(unittest.TestCase): +class TestHTTPSService(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) - self.session.delete(base_path) + self.cli_delete(base_path) def tearDown(self): - self.session.delete(base_path) - self.session.commit() + self.cli_delete(base_path) + self.cli_commit() def test_default(self): - self.session.set(base_path) - self.session.commit() + self.cli_set(base_path) + self.cli_commit() ret = run('sudo /usr/sbin/nginx -t') self.assertEqual(ret, 0) @@ -48,11 +47,11 @@ class TestHTTPSService(unittest.TestCase): test_path = base_path + ['virtual-host', vhost_id] - self.session.set(test_path + ['listen-address', address]) - self.session.set(test_path + ['listen-port', port]) - self.session.set(test_path + ['server-name', name]) + self.cli_set(test_path + ['listen-address', address]) + self.cli_set(test_path + ['listen-port', port]) + self.cli_set(test_path + ['server-name', name]) - self.session.commit() + self.cli_commit() ret = run('sudo /usr/sbin/nginx -t') self.assertEqual(ret, 0) diff --git a/smoketest/scripts/cli/test_service_mdns-repeater.py b/smoketest/scripts/cli/test_service_mdns-repeater.py index e6986b92a..b1092c3e5 100755 --- a/smoketest/scripts/cli/test_service_mdns-repeater.py +++ b/smoketest/scripts/cli/test_service_mdns-repeater.py @@ -14,35 +14,32 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.util import process_named_running base_path = ['service', 'mdns', 'repeater'] intf_base = ['interfaces', 'dummy'] -class TestServiceMDNSrepeater(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - +class TestServiceMDNSrepeater(VyOSUnitTestSHIM.TestCase): def tearDown(self): - self.session.delete(base_path) - self.session.delete(intf_base + ['dum10']) - self.session.delete(intf_base + ['dum20']) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_delete(intf_base + ['dum10']) + self.cli_delete(intf_base + ['dum20']) + self.cli_commit() def test_service(self): # Service required a configured IP address on the interface - self.session.set(intf_base + ['dum10', 'address', '192.0.2.1/30']) - self.session.set(intf_base + ['dum20', 'address', '192.0.2.5/30']) + self.cli_set(intf_base + ['dum10', 'address', '192.0.2.1/30']) + self.cli_set(intf_base + ['dum20', 'address', '192.0.2.5/30']) - self.session.set(base_path + ['interface', 'dum10']) - self.session.set(base_path + ['interface', 'dum20']) - self.session.commit() + self.cli_set(base_path + ['interface', 'dum10']) + self.cli_set(base_path + ['interface', 'dum20']) + self.cli_commit() # Check for running process self.assertTrue(process_named_running('mdns-repeater')) diff --git a/smoketest/scripts/cli/test_service_pppoe-server.py b/smoketest/scripts/cli/test_service_pppoe-server.py index bfbe88daa..21d1028ce 100755 --- a/smoketest/scripts/cli/test_service_pppoe-server.py +++ b/smoketest/scripts/cli/test_service_pppoe-server.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 020 VyOS maintainers and contributors +# Copyright (C) 2020 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -27,7 +27,7 @@ local_if = ['interfaces', 'dummy', 'dum667'] ac_name = 'ACN' interface = 'eth0' -class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest): +class TestServicePPPoEServer(BasicAccelPPPTest.TestCase): def setUp(self): self._base_path = ['service', 'pppoe-server'] self._process_name = 'accel-pppd' @@ -37,7 +37,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest): super().setUp() def tearDown(self): - self.session.delete(local_if) + self.cli_delete(local_if) super().tearDown() def verify(self, conf): @@ -66,7 +66,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest): super().verify(conf) def basic_config(self): - self.session.set(local_if + ['address', '192.0.2.1/32']) + self.cli_set(local_if + ['address', '192.0.2.1/32']) self.set(['access-concentrator', ac_name]) self.set(['interface', interface]) @@ -96,7 +96,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest): self.set(['ppp-options', 'interface-cache', interface_cache]) # commit changes - self.session.commit() + self.cli_commit() # Validate configuration values conf = ConfigParser(allow_no_value=True, delimiters='=') @@ -131,7 +131,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest): self.set( ['authentication', 'protocols', 'mschap-v2']) # commit changes - self.session.commit() + self.cli_commit() # Validate configuration values conf = ConfigParser(allow_no_value=True) @@ -157,7 +157,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest): self.set(['client-ip-pool', 'stop', stop]) # commit changes - self.session.commit() + self.cli_commit() # Validate configuration values conf = ConfigParser(allow_no_value=True) @@ -194,7 +194,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest): self.set(['client-ipv6-pool', 'delegate', delegate_prefix, 'delegation-prefix', delegate_mask]) # commit changes - self.session.commit() + self.cli_commit() # Validate configuration values conf = ConfigParser(allow_no_value=True, delimiters='=') diff --git a/smoketest/scripts/cli/test_service_router-advert.py b/smoketest/scripts/cli/test_service_router-advert.py index b80eb3c43..b19c49c6e 100755 --- a/smoketest/scripts/cli/test_service_router-advert.py +++ b/smoketest/scripts/cli/test_service_router-advert.py @@ -15,9 +15,10 @@ # along with this program. If not, see . import re -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.util import read_file from vyos.util import process_named_running @@ -33,26 +34,24 @@ def get_config_value(key): tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) return tmp[0].split()[0].replace(';','') -class TestServiceRADVD(unittest.TestCase): +class TestServiceRADVD(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) - self.session.set(address_base + ['2001:db8::1/64']) + self.cli_set(address_base + ['2001:db8::1/64']) def tearDown(self): - self.session.delete(address_base) - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(address_base) + self.cli_delete(base_path) + self.cli_commit() def test_single(self): - self.session.set(base_path + ['prefix', '::/64', 'no-on-link-flag']) - self.session.set(base_path + ['prefix', '::/64', 'no-autonomous-flag']) - self.session.set(base_path + ['prefix', '::/64', 'valid-lifetime', 'infinity']) - self.session.set(base_path + ['dnssl', '2001:db8::1234']) - self.session.set(base_path + ['other-config-flag']) + self.cli_set(base_path + ['prefix', '::/64', 'no-on-link-flag']) + self.cli_set(base_path + ['prefix', '::/64', 'no-autonomous-flag']) + self.cli_set(base_path + ['prefix', '::/64', 'valid-lifetime', 'infinity']) + self.cli_set(base_path + ['dnssl', '2001:db8::1234']) + self.cli_set(base_path + ['other-config-flag']) # commit changes - self.session.commit() + self.cli_commit() # verify values tmp = get_config_value('interface') diff --git a/smoketest/scripts/cli/test_service_snmp.py b/smoketest/scripts/cli/test_service_snmp.py index 81045d0b4..008271102 100755 --- a/smoketest/scripts/cli/test_service_snmp.py +++ b/smoketest/scripts/cli/test_service_snmp.py @@ -14,10 +14,10 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import re import unittest +from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError @@ -35,15 +35,11 @@ def get_config_value(key): tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) return tmp[0] -class TestSNMPService(unittest.TestCase): +class TestSNMPService(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) - self.session.delete(base_path) - - def tearDown(self): - del self.session + self.cli_delete(base_path) def test_snmp_basic(self): # Check if SNMP can be configured and service runs @@ -53,19 +49,19 @@ class TestSNMPService(unittest.TestCase): for auth in ['ro', 'rw']: community = 'VyOS' + auth - self.session.set(base_path + ['community', community, 'authorization', auth]) + self.cli_set(base_path + ['community', community, 'authorization', auth]) for client in clients: - self.session.set(base_path + ['community', community, 'client', client]) + self.cli_set(base_path + ['community', community, 'client', client]) for network in networks: - self.session.set(base_path + ['community', community, 'network', network]) + self.cli_set(base_path + ['community', community, 'network', network]) for addr in listen: - self.session.set(base_path + ['listen-address', addr]) + self.cli_set(base_path + ['listen-address', addr]) - self.session.set(base_path + ['contact', 'maintainers@vyos.io']) - self.session.set(base_path + ['location', 'qemu']) + self.cli_set(base_path + ['contact', 'maintainers@vyos.io']) + self.cli_set(base_path + ['location', 'qemu']) - self.session.commit() + self.cli_commit() # verify listen address, it will be returned as # ['unix:/run/snmpd.socket,udp:127.0.0.1:161,udp6:[::1]:161'] @@ -88,30 +84,30 @@ class TestSNMPService(unittest.TestCase): # Check if SNMPv3 can be configured with SHA authentication # and service runs - self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002']) - self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) + self.cli_set(base_path + ['v3', 'engineid', '000000000000000000000002']) + self.cli_set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) # check validate() - a view must be created before this can be comitted with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() - self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1']) - self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default']) + self.cli_set(base_path + ['v3', 'view', 'default', 'oid', '1']) + self.cli_set(base_path + ['v3', 'group', 'default', 'view', 'default']) # create user - self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'sha']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'aes']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default']) + self.cli_set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678']) + self.cli_set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'sha']) + self.cli_set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678']) + self.cli_set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'aes']) + self.cli_set(base_path + ['v3', 'user', 'vyos', 'group', 'default']) - self.session.commit() + self.cli_commit() # commit will alter the CLI values - check if they have been updated: hashed_password = '4e52fe55fd011c9c51ae2c65f4b78ca93dcafdfe' - tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] + tmp = self._session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] self.assertEqual(tmp, hashed_password) - tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] + tmp = self._session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] self.assertEqual(tmp, hashed_password) # TODO: read in config file and check values @@ -123,30 +119,30 @@ class TestSNMPService(unittest.TestCase): # Check if SNMPv3 can be configured with MD5 authentication # and service runs - self.session.set(base_path + ['v3', 'engineid', '000000000000000000000002']) - self.session.set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) + self.cli_set(base_path + ['v3', 'engineid', '000000000000000000000002']) + self.cli_set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) # check validate() - a view must be created before this can be comitted with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() - self.session.set(base_path + ['v3', 'view', 'default', 'oid', '1']) - self.session.set(base_path + ['v3', 'group', 'default', 'view', 'default']) + self.cli_set(base_path + ['v3', 'view', 'default', 'oid', '1']) + self.cli_set(base_path + ['v3', 'group', 'default', 'view', 'default']) # create user - self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'md5']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'des']) - self.session.set(base_path + ['v3', 'user', 'vyos', 'group', 'default']) + self.cli_set(base_path + ['v3', 'user', 'vyos', 'auth', 'plaintext-password', 'vyos12345678']) + self.cli_set(base_path + ['v3', 'user', 'vyos', 'auth', 'type', 'md5']) + self.cli_set(base_path + ['v3', 'user', 'vyos', 'privacy', 'plaintext-password', 'vyos12345678']) + self.cli_set(base_path + ['v3', 'user', 'vyos', 'privacy', 'type', 'des']) + self.cli_set(base_path + ['v3', 'user', 'vyos', 'group', 'default']) - self.session.commit() + self.cli_commit() # commit will alter the CLI values - check if they have been updated: hashed_password = '4c67690d45d3dfcd33d0d7e308e370ad' - tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] + tmp = self._session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] self.assertEqual(tmp, hashed_password) - tmp = self.session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] + tmp = self._session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] self.assertEqual(tmp, hashed_password) # TODO: read in config file and check values diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index 1e099b0a5..01b875867 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -18,6 +18,8 @@ import re import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.util import cmd @@ -34,26 +36,24 @@ def get_config_value(key): tmp = re.findall(f'\n?{key}\s+(.*)', tmp) return tmp -class TestServiceSSH(unittest.TestCase): +class TestServiceSSH(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) - self.session.delete(base_path) + self.cli_delete(base_path) def tearDown(self): # delete testing SSH config - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() def test_ssh_default(self): # Check if SSH service runs with default settings - used for checking # behavior of in XML definition - self.session.set(base_path) + self.cli_set(base_path) # commit changes - self.session.commit() + self.cli_commit() # Check configured port port = get_config_value('Port')[0] @@ -64,15 +64,15 @@ class TestServiceSSH(unittest.TestCase): def test_ssh_single_listen_address(self): # Check if SSH service can be configured and runs - self.session.set(base_path + ['port', '1234']) - self.session.set(base_path + ['disable-host-validation']) - self.session.set(base_path + ['disable-password-authentication']) - self.session.set(base_path + ['loglevel', 'verbose']) - self.session.set(base_path + ['client-keepalive-interval', '100']) - self.session.set(base_path + ['listen-address', '127.0.0.1']) + self.cli_set(base_path + ['port', '1234']) + self.cli_set(base_path + ['disable-host-validation']) + self.cli_set(base_path + ['disable-password-authentication']) + self.cli_set(base_path + ['loglevel', 'verbose']) + self.cli_set(base_path + ['client-keepalive-interval', '100']) + self.cli_set(base_path + ['listen-address', '127.0.0.1']) # commit changes - self.session.commit() + self.cli_commit() # Check configured port port = get_config_value('Port')[0] @@ -106,14 +106,14 @@ class TestServiceSSH(unittest.TestCase): # listen ports and listen-addresses ports = ['22', '2222', '2223', '2224'] for port in ports: - self.session.set(base_path + ['port', port]) + self.cli_set(base_path + ['port', port]) addresses = ['127.0.0.1', '::1'] for address in addresses: - self.session.set(base_path + ['listen-address', address]) + self.cli_set(base_path + ['listen-address', address]) # commit changes - self.session.commit() + self.cli_commit() # Check configured port tmp = get_config_value('Port') @@ -131,17 +131,17 @@ class TestServiceSSH(unittest.TestCase): def test_ssh_vrf(self): # Check if SSH service can be bound to given VRF port = '22' - self.session.set(base_path + ['port', port]) - self.session.set(base_path + ['vrf', vrf]) + self.cli_set(base_path + ['port', port]) + self.cli_set(base_path + ['vrf', vrf]) # VRF does yet not exist - an error must be thrown with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() - self.session.set(['vrf', 'name', vrf, 'table', '1338']) + self.cli_set(['vrf', 'name', vrf, 'table', '1338']) # commit changes - self.session.commit() + self.cli_commit() # Check configured port tmp = get_config_value('Port') @@ -155,7 +155,7 @@ class TestServiceSSH(unittest.TestCase): self.assertIn(PROCESS_NAME, tmp) # delete VRF - self.session.delete(['vrf', 'name', vrf]) + self.cli_delete(['vrf', 'name', vrf]) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_tftp-server.py b/smoketest/scripts/cli/test_service_tftp-server.py index 82e5811ff..aed4c6beb 100755 --- a/smoketest/scripts/cli/test_service_tftp-server.py +++ b/smoketest/scripts/cli/test_service_tftp-server.py @@ -14,11 +14,10 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import re -import os import unittest from psutil import process_iter +from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError @@ -32,28 +31,26 @@ dummy_if_path = ['interfaces', 'dummy', 'dum69'] address_ipv4 = '192.0.2.1' address_ipv6 = '2001:db8::1' -class TestServiceTFTPD(unittest.TestCase): +class TestServiceTFTPD(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) - self.session.set(dummy_if_path + ['address', address_ipv4 + '/32']) - self.session.set(dummy_if_path + ['address', address_ipv6 + '/128']) + self.cli_set(dummy_if_path + ['address', address_ipv4 + '/32']) + self.cli_set(dummy_if_path + ['address', address_ipv6 + '/128']) def tearDown(self): - self.session.delete(base_path) - self.session.delete(dummy_if_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_delete(dummy_if_path) + self.cli_commit() def test_01_tftpd_single(self): directory = '/tmp' port = '69' # default port - self.session.set(base_path + ['allow-upload']) - self.session.set(base_path + ['directory', directory]) - self.session.set(base_path + ['listen-address', address_ipv4]) + self.cli_set(base_path + ['allow-upload']) + self.cli_set(base_path + ['directory', directory]) + self.cli_set(base_path + ['listen-address', address_ipv4]) # commit changes - self.session.commit() + self.cli_commit() config = read_file('/etc/default/tftpd0') # verify listen IP address @@ -71,13 +68,13 @@ class TestServiceTFTPD(unittest.TestCase): address = [address_ipv4, address_ipv6] port = '70' - self.session.set(base_path + ['directory', directory]) + self.cli_set(base_path + ['directory', directory]) for addr in address: - self.session.set(base_path + ['listen-address', addr]) - self.session.set(base_path + ['port', port]) + self.cli_set(base_path + ['listen-address', addr]) + self.cli_set(base_path + ['port', port]) # commit changes - self.session.commit() + self.cli_commit() for idx in range(0, len(address)): config = read_file(f'/etc/default/tftpd{idx}') diff --git a/smoketest/scripts/cli/test_service_webproxy.py b/smoketest/scripts/cli/test_service_webproxy.py index 3db2daa8f..d47bd452d 100755 --- a/smoketest/scripts/cli/test_service_webproxy.py +++ b/smoketest/scripts/cli/test_service_webproxy.py @@ -14,9 +14,10 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.util import cmd @@ -29,23 +30,21 @@ base_path = ['service', 'webproxy'] listen_if = 'dum3632' listen_ip = '192.0.2.1' -class TestServiceWebProxy(unittest.TestCase): +class TestServiceWebProxy(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) - self.session.set(['interfaces', 'dummy', listen_if, 'address', listen_ip + '/32']) + self.cli_set(['interfaces', 'dummy', listen_if, 'address', listen_ip + '/32']) def tearDown(self): - self.session.delete(['interfaces', 'dummy', listen_if]) - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(['interfaces', 'dummy', listen_if]) + self.cli_delete(base_path) + self.cli_commit() def test_01_basic_proxy(self): default_cache = '100' - self.session.set(base_path + ['listen-address', listen_ip]) + self.cli_set(base_path + ['listen-address', listen_ip]) # commit changes - self.session.commit() + self.cli_commit() config = read_file(PROXY_CONF) self.assertIn(f'http_port {listen_ip}:3128 intercept', config) @@ -84,24 +83,24 @@ class TestServiceWebProxy(unittest.TestCase): block_mine = ['application/pdf', 'application/x-sh'] body_max_size = '4096' - self.session.set(base_path + ['listen-address', listen_ip]) - self.session.set(base_path + ['append-domain', domain]) - self.session.set(base_path + ['default-port', port]) - self.session.set(base_path + ['cache-size', cache_size]) - self.session.set(base_path + ['disable-access-log']) + self.cli_set(base_path + ['listen-address', listen_ip]) + self.cli_set(base_path + ['append-domain', domain]) + self.cli_set(base_path + ['default-port', port]) + self.cli_set(base_path + ['cache-size', cache_size]) + self.cli_set(base_path + ['disable-access-log']) - self.session.set(base_path + ['minimum-object-size', min_obj_size]) - self.session.set(base_path + ['maximum-object-size', max_obj_size]) + self.cli_set(base_path + ['minimum-object-size', min_obj_size]) + self.cli_set(base_path + ['maximum-object-size', max_obj_size]) - self.session.set(base_path + ['outgoing-address', listen_ip]) + self.cli_set(base_path + ['outgoing-address', listen_ip]) for mime in block_mine: - self.session.set(base_path + ['reply-block-mime', mime]) + self.cli_set(base_path + ['reply-block-mime', mime]) - self.session.set(base_path + ['reply-body-max-size', body_max_size]) + self.cli_set(base_path + ['reply-body-max-size', body_max_size]) # commit changes - self.session.commit() + self.cli_commit() config = read_file(PROXY_CONF) self.assertIn(f'http_port {listen_ip}:{port} intercept', config) @@ -132,34 +131,34 @@ class TestServiceWebProxy(unittest.TestCase): ldap_attr = 'cn' ldap_filter = '(cn=%s)' - self.session.set(base_path + ['listen-address', listen_ip, 'disable-transparent']) - self.session.set(base_path + ['authentication', 'children', auth_children]) - self.session.set(base_path + ['authentication', 'credentials-ttl', cred_ttl]) + self.cli_set(base_path + ['listen-address', listen_ip, 'disable-transparent']) + self.cli_set(base_path + ['authentication', 'children', auth_children]) + self.cli_set(base_path + ['authentication', 'credentials-ttl', cred_ttl]) - self.session.set(base_path + ['authentication', 'realm', realm]) - self.session.set(base_path + ['authentication', 'method', 'ldap']) + self.cli_set(base_path + ['authentication', 'realm', realm]) + self.cli_set(base_path + ['authentication', 'method', 'ldap']) # check validate() - LDAP authentication is enabled, but server not set with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(base_path + ['authentication', 'ldap', 'server', ldap_server]) + self.cli_commit() + self.cli_set(base_path + ['authentication', 'ldap', 'server', ldap_server]) # check validate() - LDAP password can not be set when bind-dn is not define - self.session.set(base_path + ['authentication', 'ldap', 'password', ldap_password]) + self.cli_set(base_path + ['authentication', 'ldap', 'password', ldap_password]) with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(base_path + ['authentication', 'ldap', 'bind-dn', ldap_bind_dn]) + self.cli_commit() + self.cli_set(base_path + ['authentication', 'ldap', 'bind-dn', ldap_bind_dn]) # check validate() - LDAP base-dn must be set with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(base_path + ['authentication', 'ldap', 'base-dn', ldap_base_dn]) + self.cli_commit() + self.cli_set(base_path + ['authentication', 'ldap', 'base-dn', ldap_base_dn]) - self.session.set(base_path + ['authentication', 'ldap', 'username-attribute', ldap_attr]) - self.session.set(base_path + ['authentication', 'ldap', 'filter-expression', ldap_filter]) - self.session.set(base_path + ['authentication', 'ldap', 'use-ssl']) + self.cli_set(base_path + ['authentication', 'ldap', 'username-attribute', ldap_attr]) + self.cli_set(base_path + ['authentication', 'ldap', 'filter-expression', ldap_filter]) + self.cli_set(base_path + ['authentication', 'ldap', 'use-ssl']) # commit changes - self.session.commit() + self.cli_commit() config = read_file(PROXY_CONF) self.assertIn(f'http_port {listen_ip}:3128', config) # disable-transparent @@ -175,7 +174,7 @@ class TestServiceWebProxy(unittest.TestCase): self.assertTrue(process_named_running(PROCESS_NAME)) def test_04_cache_peer(self): - self.session.set(base_path + ['listen-address', listen_ip]) + self.cli_set(base_path + ['listen-address', listen_ip]) cache_peers = { 'foo' : '192.0.2.1', @@ -183,12 +182,12 @@ class TestServiceWebProxy(unittest.TestCase): 'baz' : '192.0.2.3', } for peer in cache_peers: - self.session.set(base_path + ['cache-peer', peer, 'address', cache_peers[peer]]) + self.cli_set(base_path + ['cache-peer', peer, 'address', cache_peers[peer]]) if peer == 'baz': - self.session.set(base_path + ['cache-peer', peer, 'type', 'sibling']) + self.cli_set(base_path + ['cache-peer', peer, 'type', 'sibling']) # commit changes - self.session.commit() + self.cli_commit() config = read_file(PROXY_CONF) self.assertIn('never_direct allow all', config) @@ -214,22 +213,22 @@ class TestServiceWebProxy(unittest.TestCase): local_ok = ['10.0.0.0', 'vyos.net'] local_ok_url = ['vyos.net', 'vyos.io'] - self.session.set(base_path + ['listen-address', listen_ip]) - self.session.set(base_path + ['url-filtering', 'squidguard', 'log', 'all']) + self.cli_set(base_path + ['listen-address', listen_ip]) + self.cli_set(base_path + ['url-filtering', 'squidguard', 'log', 'all']) for block in local_block: - self.session.set(base_path + ['url-filtering', 'squidguard', 'local-block', block]) + self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-block', block]) for ok in local_ok: - self.session.set(base_path + ['url-filtering', 'squidguard', 'local-ok', ok]) + self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-ok', ok]) for url in local_block_url: - self.session.set(base_path + ['url-filtering', 'squidguard', 'local-block-url', url]) + self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-block-url', url]) for url in local_ok_url: - self.session.set(base_path + ['url-filtering', 'squidguard', 'local-ok-url', url]) + self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-ok-url', url]) for pattern in local_block_pattern: - self.session.set(base_path + ['url-filtering', 'squidguard', 'local-block-keyword', pattern]) + self.cli_set(base_path + ['url-filtering', 'squidguard', 'local-block-keyword', pattern]) # commit changes - self.session.commit() + self.cli_commit() # Check regular Squid config config = read_file(PROXY_CONF) diff --git a/smoketest/scripts/cli/test_system_acceleration_qat.py b/smoketest/scripts/cli/test_system_acceleration_qat.py index cadb263f5..0a86f58b8 100755 --- a/smoketest/scripts/cli/test_system_acceleration_qat.py +++ b/smoketest/scripts/cli/test_system_acceleration_qat.py @@ -14,22 +14,19 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError base_path = ['system', 'acceleration', 'qat'] -class TestSystemLCD(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - +class TestIntelQAT(VyOSUnitTestSHIM.TestCase): def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() def test_basic(self): """ Check if configuration script is in place and that the config @@ -37,11 +34,11 @@ class TestSystemLCD(unittest.TestCase): be extended with QAT autodetection once run on a QAT enabled device """ # configure some system display - self.session.set(base_path) + self.cli_set(base_path) # An error must be thrown if QAT device could not be found with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_system_ip.py b/smoketest/scripts/cli/test_system_ip.py index 4ad8d537d..e98a4e234 100755 --- a/smoketest/scripts/cli/test_system_ip.py +++ b/smoketest/scripts/cli/test_system_ip.py @@ -14,22 +14,18 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSession from vyos.util import read_file base_path = ['system', 'ip'] -class TestSystemIP(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - +class TestSystemIP(VyOSUnitTestSHIM.TestCase): def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() def test_system_ip_forwarding(self): # Test if IPv4 forwarding can be disabled globally, default is '1' @@ -37,8 +33,8 @@ class TestSystemIP(unittest.TestCase): all_forwarding = '/proc/sys/net/ipv4/conf/all/forwarding' self.assertEqual(read_file(all_forwarding), '1') - self.session.set(base_path + ['disable-forwarding']) - self.session.commit() + self.cli_set(base_path + ['disable-forwarding']) + self.cli_commit() self.assertEqual(read_file(all_forwarding), '0') @@ -50,9 +46,9 @@ class TestSystemIP(unittest.TestCase): self.assertEqual(read_file(use_neigh), '0') self.assertEqual(read_file(hash_policy), '0') - self.session.set(base_path + ['multipath', 'ignore-unreachable-nexthops']) - self.session.set(base_path + ['multipath', 'layer4-hashing']) - self.session.commit() + self.cli_set(base_path + ['multipath', 'ignore-unreachable-nexthops']) + self.cli_set(base_path + ['multipath', 'layer4-hashing']) + self.cli_commit() self.assertEqual(read_file(use_neigh), '1') self.assertEqual(read_file(hash_policy), '1') @@ -69,8 +65,8 @@ class TestSystemIP(unittest.TestCase): self.assertEqual(read_file(gc_thresh1), '1024') for size in [1024, 2048, 4096, 8192, 16384, 32768]: - self.session.set(base_path + ['arp', 'table-size', str(size)]) - self.session.commit() + self.cli_set(base_path + ['arp', 'table-size', str(size)]) + self.cli_commit() self.assertEqual(read_file(gc_thresh3), str(size)) self.assertEqual(read_file(gc_thresh2), str(size // 2)) diff --git a/smoketest/scripts/cli/test_system_ipv6.py b/smoketest/scripts/cli/test_system_ipv6.py index df69739eb..c9c9e833d 100755 --- a/smoketest/scripts/cli/test_system_ipv6.py +++ b/smoketest/scripts/cli/test_system_ipv6.py @@ -14,9 +14,9 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSession from vyos.util import read_file @@ -27,30 +27,26 @@ file_disable = '/etc/modprobe.d/vyos_disable_ipv6.conf' file_dad = '/proc/sys/net/ipv6/conf/all/accept_dad' file_multipath = '/proc/sys/net/ipv6/fib_multipath_hash_policy' -class TestSystemIPv6(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - +class TestSystemIPv6(VyOSUnitTestSHIM.TestCase): def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() def test_system_ipv6_forwarding(self): # Test if IPv6 forwarding can be disabled globally, default is '1' # which means forwearding enabled self.assertEqual(read_file(file_forwarding), '1') - self.session.set(base_path + ['disable-forwarding']) - self.session.commit() + self.cli_set(base_path + ['disable-forwarding']) + self.cli_commit() self.assertEqual(read_file(file_forwarding), '0') def test_system_ipv6_disable(self): # Do not assign any IPv6 address on interfaces, this requires a reboot # which can not be tested, but we can read the config file :) - self.session.set(base_path + ['disable']) - self.session.commit() + self.cli_set(base_path + ['disable']) + self.cli_commit() # Verify configuration file self.assertEqual(read_file(file_disable), 'options ipv6 disable_ipv6=1') @@ -61,8 +57,8 @@ class TestSystemIPv6(unittest.TestCase): # Do not assign any IPv6 address on interfaces, this requires a reboot # which can not be tested, but we can read the config file :) - self.session.set(base_path + ['strict-dad']) - self.session.commit() + self.cli_set(base_path + ['strict-dad']) + self.cli_commit() # Verify configuration file self.assertEqual(read_file(file_dad), '2') @@ -73,8 +69,8 @@ class TestSystemIPv6(unittest.TestCase): # Do not assign any IPv6 address on interfaces, this requires a reboot # which can not be tested, but we can read the config file :) - self.session.set(base_path + ['multipath', 'layer4-hashing']) - self.session.commit() + self.cli_set(base_path + ['multipath', 'layer4-hashing']) + self.cli_commit() # Verify configuration file self.assertEqual(read_file(file_multipath), '1') @@ -91,8 +87,8 @@ class TestSystemIPv6(unittest.TestCase): self.assertEqual(read_file(gc_thresh1), '1024') for size in [1024, 2048, 4096, 8192, 16384, 32768]: - self.session.set(base_path + ['neighbor', 'table-size', str(size)]) - self.session.commit() + self.cli_set(base_path + ['neighbor', 'table-size', str(size)]) + self.cli_commit() self.assertEqual(read_file(gc_thresh3), str(size)) self.assertEqual(read_file(gc_thresh2), str(size // 2)) diff --git a/smoketest/scripts/cli/test_system_lcd.py b/smoketest/scripts/cli/test_system_lcd.py index 2bf601e3b..7a39e2986 100755 --- a/smoketest/scripts/cli/test_system_lcd.py +++ b/smoketest/scripts/cli/test_system_lcd.py @@ -14,32 +14,29 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM from configparser import ConfigParser + from vyos.configsession import ConfigSession from vyos.util import process_named_running config_file = '/run/LCDd/LCDd.conf' base_path = ['system', 'lcd'] -class TestSystemLCD(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - +class TestSystemLCD(VyOSUnitTestSHIM.TestCase): def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() def test_system_display(self): # configure some system display - self.session.set(base_path + ['device', 'ttyS1']) - self.session.set(base_path + ['model', 'cfa-533']) + self.cli_set(base_path + ['device', 'ttyS1']) + self.cli_set(base_path + ['model', 'cfa-533']) # commit changes - self.session.commit() + self.cli_commit() # load up ini-styled LCDd.conf conf = ConfigParser() diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py index dfa56e971..8327235fb 100755 --- a/smoketest/scripts/cli/test_system_login.py +++ b/smoketest/scripts/cli/test_system_login.py @@ -14,11 +14,12 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import re import platform import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from distutils.version import LooseVersion from platform import release as kernel_version from subprocess import Popen, PIPE @@ -32,42 +33,38 @@ from vyos.template import inc_ip base_path = ['system', 'login'] users = ['vyos1', 'vyos2'] -class TestSystemLogin(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - +class TestSystemLogin(VyOSUnitTestSHIM.TestCase): def tearDown(self): # Delete individual users from configuration for user in users: - self.session.delete(base_path + ['user', user]) + self.cli_delete(base_path + ['user', user]) - self.session.commit() - del self.session + self.cli_commit() def test_add_linux_system_user(self): system_user = 'backup' - self.session.set(base_path + ['user', system_user, 'authentication', 'plaintext-password', system_user]) + self.cli_set(base_path + ['user', system_user, 'authentication', 'plaintext-password', system_user]) # check validate() - can not add username which exists on the Debian # base system (UID < 1000) with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() - self.session.delete(base_path + ['user', system_user]) + self.cli_delete(base_path + ['user', system_user]) def test_system_login_user(self): # Check if user can be created and we can SSH to localhost - self.session.set(['service', 'ssh', 'port', '22']) + self.cli_set(['service', 'ssh', 'port', '22']) for user in users: name = "VyOS Roxx " + user home_dir = "/tmp/" + user - self.session.set(base_path + ['user', user, 'authentication', 'plaintext-password', user]) - self.session.set(base_path + ['user', user, 'full-name', 'VyOS Roxx']) - self.session.set(base_path + ['user', user, 'home-directory', home_dir]) + self.cli_set(base_path + ['user', user, 'authentication', 'plaintext-password', user]) + self.cli_set(base_path + ['user', user, 'full-name', 'VyOS Roxx']) + self.cli_set(base_path + ['user', user, 'home-directory', home_dir]) - self.session.commit() + self.cli_commit() for user in users: cmd = ['su','-', user] @@ -104,18 +101,18 @@ class TestSystemLogin(unittest.TestCase): radius_port = '2000' radius_timeout = '1' - self.session.set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) - self.session.set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) - self.session.set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) - self.session.set(base_path + ['radius', 'source-address', radius_source]) - self.session.set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) + self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) + self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) + self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) + self.cli_set(base_path + ['radius', 'source-address', radius_source]) + self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) # check validate() - Only one IPv4 source-address supported with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) + self.cli_commit() + self.cli_delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) - self.session.commit() + self.cli_commit() # this file must be read with higher permissions pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf') @@ -158,18 +155,18 @@ class TestSystemLogin(unittest.TestCase): radius_port = '4000' radius_timeout = '4' - self.session.set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) - self.session.set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) - self.session.set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) - self.session.set(base_path + ['radius', 'source-address', radius_source]) - self.session.set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) + self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) + self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) + self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) + self.cli_set(base_path + ['radius', 'source-address', radius_source]) + self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) # check validate() - Only one IPv4 source-address supported with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) + self.cli_commit() + self.cli_delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) - self.session.commit() + self.cli_commit() # this file must be read with higher permissions pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf') diff --git a/smoketest/scripts/cli/test_system_nameserver.py b/smoketest/scripts/cli/test_system_nameserver.py index 5610c90c7..50dc466c2 100755 --- a/smoketest/scripts/cli/test_system_nameserver.py +++ b/smoketest/scripts/cli/test_system_nameserver.py @@ -14,12 +14,15 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import re import unittest -from vyos.configsession import ConfigSession, ConfigSessionError -import vyos.util as util +from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError + +from vyos.util import read_file RESOLV_CONF = '/etc/resolv.conf' @@ -27,25 +30,20 @@ test_servers = ['192.0.2.10', '2001:db8:1::100'] base_path = ['system', 'name-server'] def get_name_servers(): - resolv_conf = util.read_file(RESOLV_CONF) + resolv_conf = read_file(RESOLV_CONF) return re.findall(r'\n?nameserver\s+(.*)', resolv_conf) -class TestSystemNameServer(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - +class TestSystemNameServer(VyOSUnitTestSHIM.TestCase): def tearDown(self): # Delete existing name servers - self.session.delete(base_path) - self.session.commit() - - del self.session + self.cli_delete(base_path) + self.cli_commit() def test_nameserver_add(self): # Check if server is added to resolv.conf for s in test_servers: - self.session.set(base_path + [s]) - self.session.commit() + self.cli_set(base_path + [s]) + self.cli_commit() servers = get_name_servers() for s in servers: @@ -54,8 +52,8 @@ class TestSystemNameServer(unittest.TestCase): def test_nameserver_delete(self): # Test if a deleted server disappears from resolv.conf for s in test_servers: - self.session.delete(base_path + [s]) - self.session.commit() + self.cli_delete(base_path + [s]) + self.cli_commit() servers = get_name_servers() for s in servers: diff --git a/smoketest/scripts/cli/test_system_ntp.py b/smoketest/scripts/cli/test_system_ntp.py index edb6ad94d..2b86ebd7c 100755 --- a/smoketest/scripts/cli/test_system_ntp.py +++ b/smoketest/scripts/cli/test_system_ntp.py @@ -15,9 +15,10 @@ # along with this program. If not, see . import re -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.template import address_from_cidr @@ -35,17 +36,15 @@ def get_config_value(key): # remove possible trailing whitespaces return [item.strip() for item in tmp] -class TestSystemNTP(unittest.TestCase): +class TestSystemNTP(VyOSUnitTestSHIM.TestCase): def setUp(self): - self.session = ConfigSession(os.getpid()) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) - self.session.delete(base_path) + self.cli_delete(base_path) def tearDown(self): - self.session.delete(base_path) - self.session.commit() - del self.session + self.cli_delete(base_path) + self.cli_commit() self.assertFalse(process_named_running(PROCESS_NAME)) @@ -57,13 +56,13 @@ class TestSystemNTP(unittest.TestCase): for server in servers: for option in options: - self.session.set(base_path + ['server', server, option]) + self.cli_set(base_path + ['server', server, option]) # Test NTP pool - self.session.set(base_path + ['server', ntp_pool, 'pool']) + self.cli_set(base_path + ['server', ntp_pool, 'pool']) # commit changes - self.session.commit() + self.cli_commit() # Check generated configuration tmp = get_config_value('server') @@ -81,21 +80,21 @@ class TestSystemNTP(unittest.TestCase): # Test the allowed-networks statement listen_address = ['127.0.0.1', '::1'] for listen in listen_address: - self.session.set(base_path + ['listen-address', listen]) + self.cli_set(base_path + ['listen-address', listen]) networks = ['192.0.2.0/24', '2001:db8:1000::/64'] for network in networks: - self.session.set(base_path + ['allow-clients', 'address', network]) + self.cli_set(base_path + ['allow-clients', 'address', network]) # Verify "NTP server not configured" verify() statement with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() servers = ['192.0.2.1', '192.0.2.2'] for server in servers: - self.session.set(base_path + ['server', server]) + self.cli_set(base_path + ['server', server]) - self.session.commit() + self.cli_commit() # Check generated client address configuration for network in networks: diff --git a/smoketest/scripts/cli/test_vpn_openconnect.py b/smoketest/scripts/cli/test_vpn_openconnect.py index e27216e09..bf528c8b7 100755 --- a/smoketest/scripts/cli/test_vpn_openconnect.py +++ b/smoketest/scripts/cli/test_vpn_openconnect.py @@ -14,9 +14,10 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import os import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from vyos.configsession import ConfigSession from vyos.util import process_named_running @@ -25,29 +26,24 @@ base_path = ['vpn', 'openconnect'] cert = '/etc/ssl/certs/ssl-cert-snakeoil.pem' cert_key = '/etc/ssl/private/ssl-cert-snakeoil.key' -class TestVpnOpenconnect(unittest.TestCase): - def setUp(self): - self.session = ConfigSession(os.getpid()) - +class TestVpnOpenconnect(VyOSUnitTestSHIM.TestCase): def tearDown(self): # Delete vpn openconnect configuration - self.session.delete(base_path) - self.session.commit() - - del self.session + self.cli_delete(base_path) + self.cli_commit() def test_vpn(self): user = 'vyos_user' password = 'vyos_pass' - self.session.delete(base_path) - self.session.set(base_path + ["authentication", "local-users", "username", user, "password", password]) - self.session.set(base_path + ["authentication", "mode", "local"]) - self.session.set(base_path + ["network-settings", "client-ip-settings", "subnet", "192.0.2.0/24"]) - self.session.set(base_path + ["ssl", "ca-cert-file", cert]) - self.session.set(base_path + ["ssl", "cert-file", cert]) - self.session.set(base_path + ["ssl", "key-file", cert_key]) - - self.session.commit() + self.cli_delete(base_path) + self.cli_set(base_path + ["authentication", "local-users", "username", user, "password", password]) + self.cli_set(base_path + ["authentication", "mode", "local"]) + self.cli_set(base_path + ["network-settings", "client-ip-settings", "subnet", "192.0.2.0/24"]) + self.cli_set(base_path + ["ssl", "ca-cert-file", cert]) + self.cli_set(base_path + ["ssl", "cert-file", cert]) + self.cli_set(base_path + ["ssl", "key-file", cert_key]) + + self.cli_commit() # Check for running process self.assertTrue(process_named_running('ocserv-main')) diff --git a/smoketest/scripts/cli/test_vpn_sstp.py b/smoketest/scripts/cli/test_vpn_sstp.py index 95fe38dd9..033338685 100755 --- a/smoketest/scripts/cli/test_vpn_sstp.py +++ b/smoketest/scripts/cli/test_vpn_sstp.py @@ -23,18 +23,14 @@ ca_cert = '/tmp/ca.crt' ssl_cert = '/tmp/server.crt' ssl_key = '/tmp/server.key' -class TestVPNSSTPServer(BasicAccelPPPTest.BaseTest): +class TestVPNSSTPServer(BasicAccelPPPTest.TestCase): def setUp(self): self._base_path = ['vpn', 'sstp'] self._process_name = 'accel-pppd' self._config_file = '/run/accel-pppd/sstp.conf' self._chap_secrets = '/run/accel-pppd/sstp.chap-secrets' - super().setUp() - def tearDown(self): - super().tearDown() - def basic_config(self): # SSL is mandatory self.set(['ssl', 'ca-cert-file', ca_cert]) diff --git a/smoketest/scripts/cli/test_vrf.py b/smoketest/scripts/cli/test_vrf.py index 4ce470d46..01d2e8c39 100755 --- a/smoketest/scripts/cli/test_vrf.py +++ b/smoketest/scripts/cli/test_vrf.py @@ -19,6 +19,8 @@ import os import json import unittest +from base_vyostest_shim import VyOSUnitTestSHIM + from netifaces import interfaces from vyos.configsession import ConfigSession @@ -39,7 +41,7 @@ def get_vrf_ipv4_routes(vrf): def get_vrf_ipv6_routes(vrf): return json.loads(cmd(f'ip -6 -j route show vrf {vrf}')) -class VRFTest(unittest.TestCase): +class VRFTest(VyOSUnitTestSHIM.TestCase): _interfaces = [] @classmethod @@ -54,13 +56,13 @@ class VRFTest(unittest.TestCase): if not '.' in tmp: cls._interfaces.append(tmp) - def setUp(self): - self.session = ConfigSession(os.getpid()) + # call base-classes classmethod + super(cls, cls).setUpClass() def tearDown(self): # delete all VRFs - self.session.delete(base_path) - self.session.commit() + self.cli_delete(base_path) + self.cli_commit() for vrf in vrfs: self.assertNotIn(vrf, interfaces()) @@ -69,20 +71,20 @@ class VRFTest(unittest.TestCase): for vrf in vrfs: base = base_path + ['name', vrf] description = f'VyOS-VRF-{vrf}' - self.session.set(base + ['description', description]) + self.cli_set(base + ['description', description]) # check validate() - a table ID is mandatory with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() - self.session.set(base + ['table', table]) + self.cli_set(base + ['table', table]) if vrf == 'green': - self.session.set(base + ['disable']) + self.cli_set(base + ['disable']) table = str(int(table) + 1) # commit changes - self.session.commit() + self.cli_commit() # Verify VRF configuration table = '1000' @@ -113,11 +115,11 @@ class VRFTest(unittest.TestCase): table = '2000' for vrf in vrfs: base = base_path + ['name', vrf] - self.session.set(base + ['table', str(table)]) + self.cli_set(base + ['table', str(table)]) table = str(int(table) + 1) # commit changes - self.session.commit() + self.cli_commit() # Verify VRF configuration for vrf in vrfs: @@ -129,13 +131,13 @@ class VRFTest(unittest.TestCase): table = '2000' for vrf in vrfs: base = base_path + ['name', vrf] - self.session.set(base + ['table', str(table)]) + self.cli_set(base + ['table', str(table)]) table = str(int(table) + 1) - self.session.set(base_path + ['bind-to-all']) + self.cli_set(base_path + ['bind-to-all']) # commit changes - self.session.commit() + self.cli_commit() # Verify VRF configuration tmp = read_file('/proc/sys/net/ipv4/tcp_l3mdev_accept') @@ -149,31 +151,31 @@ class VRFTest(unittest.TestCase): table = '1000' vrf = vrfs[0] base = base_path + ['name', vrf] - self.session.set(base + ['table', table]) + self.cli_set(base + ['table', table]) # commit changes - self.session.commit() + self.cli_commit() # Check if VRF has been created self.assertTrue(vrf in interfaces()) table = str(int(table) + 1) - self.session.set(base + ['table', table]) + self.cli_set(base + ['table', table]) # check validate() - table ID can not be altered! with self.assertRaises(ConfigSessionError): - self.session.commit() + self.cli_commit() def test_vrf_assign_interface(self): vrf = vrfs[0] table = '5000' - self.session.set(['vrf', 'name', vrf, 'table', table]) + self.cli_set(['vrf', 'name', vrf, 'table', table]) for interface in self._interfaces: section = Section.section(interface) - self.session.set(['interfaces', section, interface, 'vrf', vrf]) + self.cli_set(['interfaces', section, interface, 'vrf', vrf]) # commit changes - self.session.commit() + self.cli_commit() # Verify & cleanup for interface in self._interfaces: @@ -182,7 +184,7 @@ class VRFTest(unittest.TestCase): self.assertEqual(tmp, vrf) # cleanup section = Section.section(interface) - self.session.delete(['interfaces', section, interface, 'vrf']) + self.cli_delete(['interfaces', section, interface, 'vrf']) def test_vrf_static_routes(self): routes = { @@ -206,15 +208,15 @@ class VRFTest(unittest.TestCase): table = '2000' for vrf in vrfs: base = base_path + ['name', vrf] - self.session.set(base + ['table', str(table)]) + self.cli_set(base + ['table', str(table)]) # required interface for leaking to default table - self.session.set(['interfaces', 'ethernet', 'eth0', 'address', '192.0.2.1/24']) + self.cli_set(['interfaces', 'ethernet', 'eth0', 'address', '192.0.2.1/24']) # we also need an interface in "UP" state to install routes - self.session.set(['interfaces', 'dummy', f'dum{table}', 'vrf', vrf]) - self.session.set(['interfaces', 'dummy', f'dum{table}', 'address', '192.0.2.1/24']) - self.session.set(['interfaces', 'dummy', f'dum{table}', 'address', '2001:db8::1/64']) + self.cli_set(['interfaces', 'dummy', f'dum{table}', 'vrf', vrf]) + self.cli_set(['interfaces', 'dummy', f'dum{table}', 'address', '192.0.2.1/24']) + self.cli_set(['interfaces', 'dummy', f'dum{table}', 'address', '2001:db8::1/64']) table = str(int(table) + 1) proto_base = ['protocols', 'vrf', vrf, 'static'] @@ -222,14 +224,14 @@ class VRFTest(unittest.TestCase): route_type = 'route' if is_ipv6(route): route_type = 'route6' - self.session.set(proto_base + [route_type, route, 'next-hop', route_config['next_hop']]) + self.cli_set(proto_base + [route_type, route, 'next-hop', route_config['next_hop']]) if 'distance' in route_config: - self.session.set(proto_base + [route_type, route, 'next-hop', route_config['next_hop'], 'distance', route_config['distance']]) + self.cli_set(proto_base + [route_type, route, 'next-hop', route_config['next_hop'], 'distance', route_config['distance']]) if 'next_hop_vrf' in route_config: - self.session.set(proto_base + [route_type, route, 'next-hop', route_config['next_hop'], 'next-hop-vrf', route_config['next_hop_vrf']]) + self.cli_set(proto_base + [route_type, route, 'next-hop', route_config['next_hop'], 'next-hop-vrf', route_config['next_hop_vrf']]) # commit changes - self.session.commit() + self.cli_commit() # Verify routes table = '2000' @@ -249,9 +251,9 @@ class VRFTest(unittest.TestCase): self.assertTrue(found) # Cleanup - self.session.delete(['protocols', 'vrf', vrf]) - self.session.delete(['interfaces', 'dummy', f'dum{table}']) - self.session.delete(['interfaces', 'ethernet', 'eth0', 'address', '192.0.2.1/24']) + self.cli_delete(['protocols', 'vrf', vrf]) + self.cli_delete(['interfaces', 'dummy', f'dum{table}']) + self.cli_delete(['interfaces', 'ethernet', 'eth0', 'address', '192.0.2.1/24']) table = str(int(table) + 1) -- cgit v1.2.3