summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest/scripts/cli')
-rw-r--r--smoketest/scripts/cli/base_accel_ppp_test.py20
-rw-r--r--smoketest/scripts/cli/base_interfaces_test.py33
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_openvpn.py374
-rwxr-xr-xsmoketest/scripts/cli/test_nat.py20
-rwxr-xr-xsmoketest/scripts/cli/test_service_ssh.py32
5 files changed, 441 insertions, 38 deletions
diff --git a/smoketest/scripts/cli/base_accel_ppp_test.py b/smoketest/scripts/cli/base_accel_ppp_test.py
index 56cbf1dd4..e3e5071c1 100644
--- a/smoketest/scripts/cli/base_accel_ppp_test.py
+++ b/smoketest/scripts/cli/base_accel_ppp_test.py
@@ -192,3 +192,23 @@ class BasicAccelPPPTest:
# Check for running process
self.assertTrue(process_named_running(self._process_name))
+
+ #
+ # Disable Radius Accounting
+ #
+ self.delete(['authentication', 'radius', 'server', radius_server, 'acct-port'])
+ self.set(['authentication', 'radius', 'server', radius_server, 'disable-accounting'])
+
+ # commit changes
+ self.session.commit()
+
+ conf.read(self._config_file)
+
+ server = conf['radius']['server'].split(',')
+ self.assertEqual(radius_server, server[0])
+ self.assertEqual(radius_key, server[1])
+ self.assertEqual(f'auth-port={radius_port}', server[2])
+ self.assertEqual(f'acct-port=0', server[3])
+ self.assertEqual(f'req-limit=0', server[4])
+ self.assertEqual(f'fail-time=0', server[5])
+
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py
index c6bb5bd1a..653cc91f9 100644
--- a/smoketest/scripts/cli/base_interfaces_test.py
+++ b/smoketest/scripts/cli/base_interfaces_test.py
@@ -22,7 +22,7 @@ from vyos.configsession import ConfigSession
from vyos.ifconfig import Interface
from vyos.util import read_file
from vyos.util import cmd
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
from vyos.validate import is_intf_addr_assigned, is_ipv6_link_local
class BasicInterfaceTest:
@@ -162,16 +162,45 @@ class BasicInterfaceTest:
""" Testcase if MTU can be changed on interface """
if not self._test_mtu:
return None
+
+ for intf in self._interfaces:
+ base = self._base_path + [intf]
+ self.session.set(base + ['mtu', self._mtu])
+ for option in self._options.get(intf, []):
+ self.session.set(base + option.split())
+
+ # commit interface changes
+ self.session.commit()
+
+ # verify changed MTU
+ for intf in self._interfaces:
+ self._mtu_test(intf)
+
+ def test_change_mtu_1200(self):
+ """ Testcase if MTU can be changed to 1200 on non IPv6 enabled interfaces """
+ if not self._test_mtu:
+ return None
+
+ old_mtu = self._mtu
+ self._mtu = '1200'
+
for intf in self._interfaces:
base = self._base_path + [intf]
self.session.set(base + ['mtu', self._mtu])
+ self.session.set(base + ['ipv6', 'address', 'no-default-link-local'])
+
for option in self._options.get(intf, []):
self.session.set(base + option.split())
+ # commit interface changes
self.session.commit()
+
+ # verify changed MTU
for intf in self._interfaces:
self._mtu_test(intf)
+ self._mtu = old_mtu
+
def test_8021q_vlan(self):
""" Testcase for 802.1q VLAN interfaces """
if not self._test_vlan:
@@ -219,7 +248,7 @@ class BasicInterfaceTest:
for interface in self._interfaces:
for vif_s in self._qinq_range:
tmp = json.loads(cmd(f'ip -d -j link show dev {interface}.{vif_s}'))[0]
- self.assertEqual(vyos_dict_search('linkinfo.info_data.protocol', tmp), '802.1ad')
+ self.assertEqual(dict_search('linkinfo.info_data.protocol', tmp), '802.1ad')
for vif_c in self._vlan_range:
vif = f'{interface}.{vif_s}.{vif_c}'
diff --git a/smoketest/scripts/cli/test_interfaces_openvpn.py b/smoketest/scripts/cli/test_interfaces_openvpn.py
index 5cc62e3e2..9ffb945b9 100755
--- a/smoketest/scripts/cli/test_interfaces_openvpn.py
+++ b/smoketest/scripts/cli/test_interfaces_openvpn.py
@@ -26,6 +26,9 @@ from vyos.configsession import ConfigSessionError
from vyos.util import cmd
from vyos.util import process_named_running
from vyos.util import read_file
+from vyos.template import vyos_inc_ip
+from vyos.template import vyos_netmask_from_cidr
+from vyos.template import vyos_last_host_address
PROCESS_NAME = 'openvpn'
@@ -35,13 +38,15 @@ ssl_cert = '/config/auth/ovpn_test_server.pem'
ssl_key = '/config/auth/ovpn_test_server.key'
dh_pem = '/config/auth/ovpn_test_dh.pem'
s2s_key = '/config/auth/ovpn_test_site2site.key'
+auth_key = '/config/auth/ovpn_test_tls_auth.key'
remote_port = '1194'
protocol = 'udp'
path = []
interface = ''
remote_host = ''
-vrf_name = 'mgmt'
+vrf_name = 'orange'
+dummy_if = 'dum1301'
def get_vrf(interface):
for upper in glob(f'/sys/class/net/{interface}/upper*'):
@@ -54,20 +59,88 @@ def get_vrf(interface):
class TestInterfacesOpenVPN(unittest.TestCase):
def setUp(self):
self.session = ConfigSession(os.getpid())
- self.session.set(['interfaces', 'dummy', 'dum1328', 'address', '192.0.2.1/24'])
+ self.session.set(['interfaces', 'dummy', dummy_if, 'address', '192.0.2.1/32'])
self.session.set(['vrf', 'name', vrf_name, 'table', '12345'])
def tearDown(self):
self.session.delete(base_path)
- self.session.delete(['interfaces', 'dummy', 'dum1328'])
- self.session.delete(['vrf', 'name', vrf_name])
+ self.session.delete(['interfaces', 'dummy', dummy_if])
+ self.session.delete(['vrf'])
self.session.commit()
del self.session
- def test_client_interfaces(self):
- """ Create OpenVPN client interfaces connecting to different
- server IP addresses. Validate configuration afterwards. """
+ def test_client_verify(self):
+ """
+ Create OpenVPN client interface and test verify() steps.
+ """
+ interface = 'vtun2000'
+ path = base_path + [interface]
+ self.session.set(path + ['mode', 'client'])
+
+ # check validate() - cannot specify both "encryption disable-ncp" and
+ # "encryption ncp-ciphers" at the same time
+ self.session.set(path + ['encryption', 'disable-ncp'])
+ self.session.set(path + ['encryption', 'ncp-ciphers', 'aes192gcm'])
+
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['encryption', 'ncp-ciphers'])
+
+ # check validate() - cannot specify local-port in client mode
+ self.session.set(path + ['local-port', '5000'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['local-port'])
+
+ # check validate() - cannot specify local-host in client mode
+ self.session.set(path + ['local-host', '127.0.0.1'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['local-host'])
+
+ # check validate() - cannot specify protocol tcp-passive in client mode
+ self.session.set(path + ['protocol', 'tcp-passive'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['protocol'])
+
+ # check validate() - remote-host must be set in client mode
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(path + ['remote-host', '192.0.9.9'])
+
+ # check validate() - cannot specify "tls dh-file" in client mode
+ self.session.set(path + ['tls', 'dh-file', dh_pem])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['tls'])
+
+ # check validate() - must specify one of "shared-secret-key-file" and "tls"
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(path + ['shared-secret-key-file', s2s_key])
+
+ # check validate() - must specify one of "shared-secret-key-file" and "tls"
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['shared-secret-key-file', s2s_key])
+
+ self.session.set(path + ['tls', 'ca-cert-file', ca_cert])
+ self.session.set(path + ['tls', 'cert-file', ssl_cert])
+ self.session.set(path + ['tls', 'key-file', ssl_key])
+
+ # client commit must pass
+ self.session.commit()
+
+ self.assertTrue(process_named_running(PROCESS_NAME))
+ self.assertIn(interface, interfaces())
+
+ def test_client_interfaces(self):
+ """
+ Create OpenVPN client interfaces connecting to different
+ server IP addresses. Validate configuration afterwards.
+ """
num_range = range(10, 15)
for ii in num_range:
interface = f'vtun{ii}'
@@ -122,11 +195,121 @@ class TestInterfacesOpenVPN(unittest.TestCase):
interface = f'vtun{ii}'
self.assertNotIn(interface, interfaces())
+ def test_server_verify(self):
+ """
+ Create one OpenVPN server interface and check required verify() stages
+ """
+ interface = 'vtun5000'
+ path = base_path + [interface]
+
+ # check validate() - must speciy operating mode
+ self.session.set(path)
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(path + ['mode', 'server'])
+
+ # check validate() - cannot specify protocol tcp-active in server mode
+ self.session.set(path + ['protocol', 'tcp-active'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['protocol'])
+
+ # check validate() - cannot specify local-port in client mode
+ self.session.set(path + ['remote-port', '5000'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['remote-port'])
+
+ # check validate() - cannot specify local-host in client mode
+ self.session.set(path + ['remote-host', '127.0.0.1'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['remote-host'])
+
+ # check validate() - must specify "tls dh-file" when not using EC keys
+ # in server mode
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(path + ['tls', 'dh-file', dh_pem])
+
+ # check validate() - must specify "server subnet" or add interface to
+ # bridge in server mode
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+
+ # check validate() - server client-ip-pool is too large
+ # [100.64.0.4 -> 100.127.255.251 = 4194295], maximum is 65536 addresses.
+ self.session.set(path + ['server', 'subnet', '100.64.0.0/10'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+
+ # check validate() - cannot specify more than 1 IPv4 and 1 IPv6 server subnet
+ self.session.set(path + ['server', 'subnet', '100.64.0.0/20'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['server', 'subnet', '100.64.0.0/10'])
+
+ # check validate() - must specify "tls ca-cert-file"
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(path + ['tls', 'ca-cert-file', ca_cert])
+
+ # check validate() - must specify "tls cert-file"
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(path + ['tls', 'cert-file', ssl_cert])
+
+ # check validate() - must specify "tls key-file"
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(path + ['tls', 'key-file', ssl_key])
+
+ # check validate() - cannot specify "tls role" in client-server mode'
+ self.session.set(path + ['tls', 'role', 'active'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+
+ # check validate() - cannot specify "tls role" in client-server mode'
+ self.session.set(path + ['tls', 'auth-file', auth_key])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+
+ # check validate() - cannot specify "tcp-passive" when "tls role" is "active"
+ self.session.set(path + ['protocol', 'tcp-passive'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['protocol'])
+
+ # check validate() - cannot specify "tls dh-file" when "tls role" is "active"
+ self.session.set(path + ['tls', 'dh-file', dh_pem])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['tls', 'dh-file'])
+
+ # Now test the other path with tls role passive
+ self.session.set(path + ['tls', 'role', 'passive'])
+ # check validate() - cannot specify "tcp-active" when "tls role" is "passive"
+ self.session.set(path + ['protocol', 'tcp-active'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['protocol'])
+
+
+ # check validate() - must specify "tls dh-file" when "tls role" is "passive"
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(path + ['tls', 'dh-file', dh_pem])
- def test_server_interfaces(self):
- """ Create OpenVPN server interfaces using different client subnets.
- Validate configuration afterwards. """
+ self.session.commit()
+
+ self.assertTrue(process_named_running(PROCESS_NAME))
+ self.assertIn(interface, interfaces())
+ def test_server_subnet_topology(self):
+ """
+ Create OpenVPN server interfaces using different client subnets.
+ Validate configuration afterwards.
+ """
auth_hash = 'sha256'
num_range = range(20, 25)
port = ''
@@ -142,6 +325,8 @@ class TestInterfacesOpenVPN(unittest.TestCase):
self.session.set(path + ['mode', 'server'])
self.session.set(path + ['local-port', port])
self.session.set(path + ['server', 'subnet', subnet])
+ self.session.set(path + ['server', 'topology', 'subnet'])
+ self.session.set(path + ['replace-default-route'])
self.session.set(path + ['tls', 'ca-cert-file', ca_cert])
self.session.set(path + ['tls', 'cert-file', ssl_cert])
self.session.set(path + ['tls', 'key-file', ssl_key])
@@ -153,6 +338,82 @@ class TestInterfacesOpenVPN(unittest.TestCase):
for ii in num_range:
interface = f'vtun{ii}'
subnet = f'192.0.{ii}.0/24'
+ start_addr = vyos_inc_ip(subnet, '2')
+ stop_addr = vyos_last_host_address(subnet)
+ port = str(2000 + ii)
+
+ config_file = f'/run/openvpn/{interface}.conf'
+ config = read_file(config_file)
+
+ self.assertIn(f'dev {interface}', config)
+ self.assertIn(f'dev-type tun', config)
+ self.assertIn(f'persist-key', config)
+ self.assertIn(f'proto udp', config) # default protocol
+ self.assertIn(f'auth {auth_hash}', config)
+ self.assertIn(f'cipher aes-192-cbc', config)
+ self.assertIn(f'topology subnet', config)
+ self.assertIn(f'lport {port}', config)
+ self.assertIn(f'push "redirect-gateway def1"', config)
+
+ # TLS options
+ self.assertIn(f'ca {ca_cert}', config)
+ self.assertIn(f'cert {ssl_cert}', config)
+ self.assertIn(f'key {ssl_key}', config)
+ self.assertIn(f'dh {dh_pem}', config)
+
+ # IP pool configuration
+ netmask = IPv4Network(subnet).netmask
+ network = IPv4Network(subnet).network_address
+ self.assertIn(f'server {network} {netmask} nopool', config)
+ self.assertIn(f'ifconfig-pool {start_addr} {stop_addr}', config)
+
+ self.assertTrue(process_named_running(PROCESS_NAME))
+ self.assertEqual(get_vrf(interface), vrf_name)
+ self.assertIn(interface, interfaces())
+
+ # check that no interface remained after deleting them
+ self.session.delete(base_path)
+ self.session.commit()
+
+ for ii in num_range:
+ interface = f'vtun{ii}'
+ self.assertNotIn(interface, interfaces())
+
+ def test_server_net30_topology(self):
+ """
+ Create OpenVPN server interfaces (net30) using different client
+ subnets. Validate configuration afterwards.
+ """
+ auth_hash = 'sha256'
+ num_range = range(20, 25)
+ port = ''
+ for ii in num_range:
+ interface = f'vtun{ii}'
+ subnet = f'192.0.{ii}.0/24'
+ path = base_path + [interface]
+ port = str(2000 + ii)
+
+ self.session.set(path + ['device-type', 'tun'])
+ self.session.set(path + ['encryption', 'cipher', 'aes192'])
+ self.session.set(path + ['hash', auth_hash])
+ self.session.set(path + ['mode', 'server'])
+ self.session.set(path + ['local-port', port])
+ self.session.set(path + ['server', 'subnet', subnet])
+ self.session.set(path + ['server', 'topology', 'net30'])
+ self.session.set(path + ['replace-default-route'])
+ self.session.set(path + ['tls', 'ca-cert-file', ca_cert])
+ self.session.set(path + ['tls', 'cert-file', ssl_cert])
+ self.session.set(path + ['tls', 'key-file', ssl_key])
+ self.session.set(path + ['tls', 'dh-file', dh_pem])
+ self.session.set(path + ['vrf', vrf_name])
+
+ self.session.commit()
+
+ for ii in num_range:
+ interface = f'vtun{ii}'
+ subnet = f'192.0.{ii}.0/24'
+ start_addr = vyos_inc_ip(subnet, '4')
+ stop_addr = vyos_last_host_address(subnet)
port = str(2000 + ii)
config_file = f'/run/openvpn/{interface}.conf'
@@ -166,6 +427,7 @@ class TestInterfacesOpenVPN(unittest.TestCase):
self.assertIn(f'cipher aes-192-cbc', config)
self.assertIn(f'topology net30', config)
self.assertIn(f'lport {port}', config)
+ self.assertIn(f'push "redirect-gateway def1"', config)
# TLS options
self.assertIn(f'ca {ca_cert}', config)
@@ -177,6 +439,7 @@ class TestInterfacesOpenVPN(unittest.TestCase):
netmask = IPv4Network(subnet).netmask
network = IPv4Network(subnet).network_address
self.assertIn(f'server {network} {netmask} nopool', config)
+ self.assertIn(f'ifconfig-pool {start_addr} {stop_addr}', config)
self.assertTrue(process_named_running(PROCESS_NAME))
self.assertEqual(get_vrf(interface), vrf_name)
@@ -190,9 +453,69 @@ class TestInterfacesOpenVPN(unittest.TestCase):
interface = f'vtun{ii}'
self.assertNotIn(interface, interfaces())
+ def test_site2site_verify(self):
+ """
+ Create one OpenVPN site2site interface and check required verify() stages
+ """
+ interface = 'vtun5000'
+ path = base_path + [interface]
+
+ self.session.set(path + ['mode', 'site-to-site'])
+
+ # check validate() - encryption ncp-ciphers cannot be specified in site-to-site mode
+ self.session.set(path + ['encryption', 'ncp-ciphers', 'aes192gcm'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['encryption'])
+
+ # check validate() - must specify "local-address" or add interface to bridge
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(path + ['local-address', '10.0.0.1'])
+ self.session.set(path + ['local-address', '2001:db8:1::1'])
+
+ # check validate() - cannot specify more than 1 IPv4 local-address
+ self.session.set(path + ['local-address', '10.0.0.2'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['local-address', '10.0.0.2'])
+
+ # check validate() - cannot specify more than 1 IPv6 local-address
+ self.session.set(path + ['local-address', '2001:db8:1::2'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['local-address', '2001:db8:1::2'])
+
+ # check validate() - IPv4 "local-address" requires IPv4 "remote-address"
+ # or IPv4 "local-address subnet"
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(path + ['remote-address', '192.168.0.1'])
+ self.session.set(path + ['remote-address', '2001:db8:ffff::1'])
+
+ # check validate() - Cannot specify more than 1 IPv4 "remote-address"
+ self.session.set(path + ['remote-address', '192.168.0.2'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['remote-address', '192.168.0.2'])
+
+ # check validate() - Cannot specify more than 1 IPv6 "remote-address"
+ self.session.set(path + ['remote-address', '2001:db8:ffff::2'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(path + ['remote-address', '2001:db8:ffff::2'])
+
+ # check validate() - Must specify one of "shared-secret-key-file" and "tls"
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(path + ['shared-secret-key-file', s2s_key])
+
+ self.session.commit()
def test_site2site_interfaces(self):
- """ Create two OpenVPN site-to-site interfaces """
+ """
+ Create two OpenVPN site-to-site interfaces
+ """
num_range = range(30, 35)
port = ''
local_address = ''
@@ -250,32 +573,33 @@ if __name__ == '__main__':
subject = '/C=DE/ST=BY/O=VyOS/localityName=Cloud/commonName=vyos/' \
'organizationalUnitName=VyOS/emailAddress=maintainers@vyos.io/'
- if (not os.path.isfile(ssl_key) and not os.path.isfile(ssl_cert) and
- not os.path.isfile(ca_cert) and not os.path.isfile(dh_pem) and
- not os.path.isfile(s2s_key)):
-
+ if not (os.path.isfile(ssl_key) and os.path.isfile(ssl_cert)):
# Generate mandatory SSL certificate
tmp = f'openssl req -newkey rsa:4096 -new -nodes -x509 -days 3650 '\
f'-keyout {ssl_key} -out {ssl_cert} -subj {subject}'
- out = cmd(tmp)
- print(out)
+ print(cmd(tmp))
+ if not os.path.isfile(ca_cert):
# Generate "CA"
tmp = f'openssl req -new -x509 -key {ssl_key} -out {ca_cert} -subj {subject}'
- out = cmd(tmp)
- print(out)
+ print(cmd(tmp))
+ if not os.path.isfile(dh_pem):
# Generate "DH" key
tmp = f'openssl dhparam -out {dh_pem} 2048'
- out = cmd(tmp)
- print(out)
+ print(cmd(tmp))
+ if not os.path.isfile(s2s_key):
# Generate site-2-site key
tmp = f'openvpn --genkey --secret {s2s_key}'
- out = cmd(tmp)
- print(out)
+ print(cmd(tmp))
+
+ if not os.path.isfile(auth_key):
+ # Generate TLS auth key
+ tmp = f'openvpn --genkey --secret {auth_key}'
+ print(cmd(tmp))
- for file in [ca_cert, ssl_cert, ssl_key, dh_pem, s2s_key]:
- cmd(f'sudo chown openvpn:openvpn {file}')
+ for file in [ca_cert, ssl_cert, ssl_key, dh_pem, s2s_key, auth_key]:
+ cmd(f'sudo chown openvpn:openvpn {file}')
unittest.main()
diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py
index b5bde743b..43392bde3 100755
--- a/smoketest/scripts/cli/test_nat.py
+++ b/smoketest/scripts/cli/test_nat.py
@@ -22,7 +22,7 @@ import unittest
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.util import cmd
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
base_path = ['nat']
src_path = base_path + ['source']
@@ -73,10 +73,10 @@ class TestNAT(unittest.TestCase):
self.assertEqual(data['family'], 'ip')
self.assertEqual(data['table'], 'nat')
- iface = vyos_dict_search('match.right', data['expr'][0])
- direction = vyos_dict_search('match.left.payload.field', data['expr'][1])
- address = vyos_dict_search('match.right.prefix.addr', data['expr'][1])
- mask = vyos_dict_search('match.right.prefix.len', data['expr'][1])
+ iface = dict_search('match.right', data['expr'][0])
+ direction = dict_search('match.left.payload.field', data['expr'][1])
+ address = dict_search('match.right.prefix.addr', data['expr'][1])
+ mask = dict_search('match.right.prefix.len', data['expr'][1])
if int(rule) < 200:
self.assertEqual(direction, 'saddr')
@@ -127,11 +127,11 @@ class TestNAT(unittest.TestCase):
self.assertEqual(data['family'], 'ip')
self.assertEqual(data['table'], 'nat')
- iface = vyos_dict_search('match.right', data['expr'][0])
- direction = vyos_dict_search('match.left.payload.field', data['expr'][1])
- protocol = vyos_dict_search('match.left.payload.protocol', data['expr'][1])
- dnat_addr = vyos_dict_search('dnat.addr', data['expr'][3])
- dnat_port = vyos_dict_search('dnat.port', data['expr'][3])
+ iface = dict_search('match.right', data['expr'][0])
+ direction = dict_search('match.left.payload.field', data['expr'][1])
+ protocol = dict_search('match.left.payload.protocol', data['expr'][1])
+ dnat_addr = dict_search('dnat.addr', data['expr'][3])
+ dnat_port = dict_search('dnat.port', data['expr'][3])
self.assertEqual(direction, 'sport')
self.assertEqual(dnat_addr, '192.0.2.1')
diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py
index 0cd00ccce..ea70d8e03 100755
--- a/smoketest/scripts/cli/test_service_ssh.py
+++ b/smoketest/scripts/cli/test_service_ssh.py
@@ -20,12 +20,14 @@ import unittest
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
-from vyos.util import read_file
+from vyos.util import cmd
from vyos.util import process_named_running
+from vyos.util import read_file
PROCESS_NAME = 'sshd'
SSHD_CONF = '/run/ssh/sshd_config'
base_path = ['service', 'ssh']
+vrf = 'ssh-test'
def get_config_value(key):
tmp = read_file(SSHD_CONF)
@@ -44,6 +46,8 @@ class TestServiceSSH(unittest.TestCase):
self.session.delete(base_path)
# restore "plain" SSH access
self.session.set(base_path)
+ # delete VRF
+ self.session.delete(['vrf', 'name', vrf])
self.session.commit()
del self.session
@@ -129,5 +133,31 @@ class TestServiceSSH(unittest.TestCase):
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
+ def test_ssh_vrf(self):
+ """ Check if SSH service can be bound to given VRF """
+ port = '22'
+ self.session.set(base_path + ['port', port])
+ self.session.set(base_path + ['vrf', vrf])
+
+ # VRF does yet not exist - an error must be thrown
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+
+ self.session.set(['vrf', 'name', vrf, 'table', '1001'])
+
+ # commit changes
+ self.session.commit()
+
+ # Check configured port
+ tmp = get_config_value('Port')
+ self.assertIn(port, tmp)
+
+ # Check for running process
+ self.assertTrue(process_named_running(PROCESS_NAME))
+
+ # Check for process in VRF
+ tmp = cmd(f'ip vrf pids {vrf}')
+ self.assertIn(PROCESS_NAME, tmp)
+
if __name__ == '__main__':
unittest.main()