summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest/scripts/cli')
-rw-r--r--smoketest/scripts/cli/base_interfaces_test.py47
-rw-r--r--smoketest/scripts/cli/base_vyostest_shim.py7
-rwxr-xr-xsmoketest/scripts/cli/test_load-balancing_haproxy.py23
-rwxr-xr-xsmoketest/scripts/cli/test_protocols_nhrp.py3
-rwxr-xr-xsmoketest/scripts/cli/test_system_syslog.py260
5 files changed, 275 insertions, 65 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py
index c19bfcfe2..78c807d59 100644
--- a/smoketest/scripts/cli/base_interfaces_test.py
+++ b/smoketest/scripts/cli/base_interfaces_test.py
@@ -38,6 +38,7 @@ from vyos.utils.network import is_intf_addr_assigned
from vyos.utils.network import is_ipv6_link_local
from vyos.utils.network import get_nft_vrf_zone_mapping
from vyos.xml_ref import cli_defined
+from vyos.xml_ref import default_value
dhclient_base_dir = directories['isc_dhclient_dir']
dhclient_process_name = 'dhclient'
@@ -282,6 +283,9 @@ class BasicInterfaceTest:
if not self._test_dhcp or not self._test_vrf:
self.skipTest('not supported')
+ cli_default_metric = default_value(self._base_path + [self._interfaces[0],
+ 'dhcp-options', 'default-route-distance'])
+
vrf_name = 'purple4'
self.cli_set(['vrf', 'name', vrf_name, 'table', '65000'])
@@ -307,7 +311,28 @@ class BasicInterfaceTest:
self.assertIn(str(dhclient_pid), vrf_pids)
# and the commandline has the appropriate options
cmdline = read_file(f'/proc/{dhclient_pid}/cmdline')
- self.assertIn('-e\x00IF_METRIC=210', cmdline) # 210 is the default value
+ self.assertIn(f'-e\x00IF_METRIC={cli_default_metric}', cmdline)
+
+ # T5103: remove interface from VRF instance and move DHCP client
+ # back to default VRF. This must restart the DHCP client process
+ for interface in self._interfaces:
+ self.cli_delete(self._base_path + [interface, 'vrf'])
+
+ self.cli_commit()
+
+ # Validate interface state
+ for interface in self._interfaces:
+ tmp = get_interface_vrf(interface)
+ self.assertEqual(tmp, 'default')
+ # Check if dhclient process runs
+ dhclient_pid = process_named_running(dhclient_process_name, cmdline=interface, timeout=10)
+ self.assertTrue(dhclient_pid)
+ # .. inside the appropriate VRF instance
+ vrf_pids = cmd(f'ip vrf pids {vrf_name}')
+ self.assertNotIn(str(dhclient_pid), vrf_pids)
+ # and the commandline has the appropriate options
+ cmdline = read_file(f'/proc/{dhclient_pid}/cmdline')
+ self.assertIn(f'-e\x00IF_METRIC={cli_default_metric}', cmdline)
self.cli_delete(['vrf', 'name', vrf_name])
@@ -341,6 +366,26 @@ class BasicInterfaceTest:
vrf_pids = cmd(f'ip vrf pids {vrf_name}')
self.assertIn(str(tmp), vrf_pids)
+ # T7135: remove interface from VRF instance and move DHCP client
+ # back to default VRF. This must restart the DHCP client process
+ for interface in self._interfaces:
+ self.cli_delete(self._base_path + [interface, 'vrf'])
+
+ self.cli_commit()
+
+ # Validate interface state
+ for interface in self._interfaces:
+ tmp = get_interface_vrf(interface)
+ self.assertEqual(tmp, 'default')
+
+ # Check if dhclient process runs
+ tmp = process_named_running(dhcp6c_process_name, cmdline=interface, timeout=10)
+ self.assertTrue(tmp)
+ # .. inside the appropriate VRF instance
+ vrf_pids = cmd(f'ip vrf pids {vrf_name}')
+ self.assertNotIn(str(tmp), vrf_pids)
+
+
self.cli_delete(['vrf', 'name', vrf_name])
def test_move_interface_between_vrf_instances(self):
diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py
index a54622700..a89b8dce5 100644
--- a/smoketest/scripts/cli/base_vyostest_shim.py
+++ b/smoketest/scripts/cli/base_vyostest_shim.py
@@ -75,10 +75,11 @@ class VyOSUnitTestSHIM:
cls._session.discard()
cls.fail(cls)
- def cli_set(self, config):
+ def cli_set(self, path, value=None):
if self.debug:
- print('set ' + ' '.join(config))
- self._session.set(config)
+ str = f'set {" ".join(path)} {value}' if value else f'set {" ".join(path)}'
+ print(str)
+ self._session.set(path, value)
def cli_delete(self, config):
if self.debug:
diff --git a/smoketest/scripts/cli/test_load-balancing_haproxy.py b/smoketest/scripts/cli/test_load-balancing_haproxy.py
index 967eb3869..9f412aa95 100755
--- a/smoketest/scripts/cli/test_load-balancing_haproxy.py
+++ b/smoketest/scripts/cli/test_load-balancing_haproxy.py
@@ -498,5 +498,28 @@ class TestLoadBalancingReverseProxy(VyOSUnitTestSHIM.TestCase):
self.assertIn('log /dev/log local5 notice', config)
self.assertIn('log /dev/log local6 crit', config)
+ def test_10_lb_reverse_proxy_http_compression(self):
+ # Setup base
+ self.configure_pki()
+ self.base_config()
+
+ # Configure compression in frontend
+ self.cli_set(base_path + ['service', 'https_front', 'http-compression', 'algorithm', 'gzip'])
+ self.cli_set(base_path + ['service', 'https_front', 'http-compression', 'mime-type', 'text/html'])
+ self.cli_set(base_path + ['service', 'https_front', 'http-compression', 'mime-type', 'text/javascript'])
+ self.cli_set(base_path + ['service', 'https_front', 'http-compression', 'mime-type', 'text/plain'])
+ self.cli_commit()
+
+ # Test compression is present in generated configuration file
+ config = read_file(HAPROXY_CONF)
+ self.assertIn('filter compression', config)
+ self.assertIn('compression algo gzip', config)
+ self.assertIn('compression type text/html text/javascript text/plain', config)
+
+ # Test setting compression without specifying any mime-types fails verification
+ self.cli_delete(base_path + ['service', 'https_front', 'http-compression', 'mime-type'])
+ with self.assertRaises(ConfigSessionError) as e:
+ self.cli_commit()
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/smoketest/scripts/cli/test_protocols_nhrp.py b/smoketest/scripts/cli/test_protocols_nhrp.py
index f6d1f1da5..73a760945 100755
--- a/smoketest/scripts/cli/test_protocols_nhrp.py
+++ b/smoketest/scripts/cli/test_protocols_nhrp.py
@@ -17,10 +17,7 @@
import unittest
from base_vyostest_shim import VyOSUnitTestSHIM
-
-from vyos.firewall import find_nftables_rule
from vyos.utils.process import process_named_running
-from vyos.utils.file import read_file
tunnel_path = ['interfaces', 'tunnel']
nhrp_path = ['protocols', 'nhrp']
diff --git a/smoketest/scripts/cli/test_system_syslog.py b/smoketest/scripts/cli/test_system_syslog.py
index a86711119..c3b14e1c0 100755
--- a/smoketest/scripts/cli/test_system_syslog.py
+++ b/smoketest/scripts/cli/test_system_syslog.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2019-2024 VyOS maintainers and contributors
+# Copyright (C) 2019-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -14,24 +14,29 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
import unittest
from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.utils.file import read_file
+from vyos.utils.process import cmd
from vyos.utils.process import process_named_running
from vyos.xml_ref import default_value
PROCESS_NAME = 'rsyslogd'
-RSYSLOG_CONF = '/etc/rsyslog.d/00-vyos.conf'
+RSYSLOG_CONF = '/run/rsyslog/rsyslog.conf'
base_path = ['system', 'syslog']
-def get_config_value(key):
- tmp = read_file(RSYSLOG_CONF)
- tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
- return tmp[0]
+def get_config(string=''):
+ """
+ Retrieve current "running configuration" from FRR
+ string: search for a specific start string in the configuration
+ """
+ command = 'cat /run/rsyslog/rsyslog.conf'
+ if string:
+ command += f' | sed -n "/^{string}$/,/}}/p"' # }} required to escape } in f-string
+ return cmd(command)
class TestRSYSLOGService(VyOSUnitTestSHIM.TestCase):
@classmethod
@@ -41,6 +46,7 @@ class TestRSYSLOGService(VyOSUnitTestSHIM.TestCase):
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
cls.cli_delete(cls, base_path)
+ cls.cli_delete(cls, ['vrf'])
def tearDown(self):
# Check for running process
@@ -53,79 +59,217 @@ class TestRSYSLOGService(VyOSUnitTestSHIM.TestCase):
# Check for running process
self.assertFalse(process_named_running(PROCESS_NAME))
- def test_syslog_basic(self):
- host1 = '127.0.0.10'
- host2 = '127.0.0.20'
-
- self.cli_set(base_path + ['host', host1, 'port', '999'])
- self.cli_set(base_path + ['host', host1, 'facility', 'all', 'level', 'all'])
- self.cli_set(base_path + ['host', host2, 'facility', 'kern', 'level', 'err'])
- self.cli_set(base_path + ['console', 'facility', 'all', 'level', 'warning'])
-
+ def test_console(self):
+ level = 'warning'
+ self.cli_set(base_path + ['console', 'facility', 'all', 'level'], value=level)
self.cli_commit()
- # verify log level and facilities in config file
- # *.warning /dev/console
- # *.* @198.51.100.1:999
- # kern.err @192.0.2.1:514
+
+ rsyslog_conf = get_config()
config = [
- get_config_value('\*.\*'),
- get_config_value('kern.err'),
- get_config_value('\*.warning'),
+ f'if prifilt("*.{level}") then {{', # {{ required to escape { in f-string
+ 'action(type="omfile" file="/dev/console")',
]
- expected = [f'@{host1}:999', f'@{host2}:514', '/dev/console']
+ for tmp in config:
+ self.assertIn(tmp, rsyslog_conf)
- for i in range(0, 3):
- self.assertIn(expected[i], config[i])
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
-
- def test_syslog_global(self):
+ def test_basic(self):
hostname = 'vyos123'
- domainname = 'example.local'
- self.cli_set(['system', 'host-name', hostname])
- self.cli_set(['system', 'domain-name', domainname])
- self.cli_set(base_path + ['global', 'marker', 'interval', '600'])
- self.cli_set(base_path + ['global', 'preserve-fqdn'])
- self.cli_set(base_path + ['global', 'facility', 'kern', 'level', 'err'])
+ domain_name = 'example.local'
+ default_marker_interval = default_value(base_path + ['marker', 'interval'])
+
+ facility = {
+ 'auth': {'level': 'info'},
+ 'kern': {'level': 'debug'},
+ 'all': {'level': 'notice'},
+ }
+
+ self.cli_set(['system', 'host-name'], value=hostname)
+ self.cli_set(['system', 'domain-name'], value=domain_name)
+ self.cli_set(base_path + ['preserve-fqdn'])
+
+ for tmp, tmp_options in facility.items():
+ level = tmp_options['level']
+ self.cli_set(base_path + ['local', 'facility', tmp, 'level'], value=level)
self.cli_commit()
- config = read_file(RSYSLOG_CONF)
+ config = get_config('')
expected = [
- '$MarkMessagePeriod 600',
- '$PreserveFQDN on',
- 'kern.err',
- f'$LocalHostName {hostname}.{domainname}',
+ f'module(load="immark" interval="{default_marker_interval}")',
+ 'global(preserveFQDN="on")',
+ f'global(localHostname="{hostname}.{domain_name}")',
]
-
for e in expected:
self.assertIn(e, config)
- # Check for running process
- self.assertTrue(process_named_running(PROCESS_NAME))
- def test_syslog_remote(self):
- rhost = '169.254.0.1'
- default_port = default_value(base_path + ['host', rhost, 'port'])
+ config = get_config('#### GLOBAL LOGGING ####')
+ prifilt = []
+ for tmp, tmp_options in facility.items():
+ if tmp == 'all':
+ tmp = '*'
+ level = tmp_options['level']
+ prifilt.append(f'{tmp}.{level}')
+
+ prifilt.sort()
+ prifilt = ','.join(prifilt)
+
+ self.assertIn(f'if prifilt("{prifilt}") then {{', config)
+ self.assertIn( ' action(', config)
+ self.assertIn( ' type="omfile"', config)
+ self.assertIn( ' file="/var/log/messages"', config)
+ self.assertIn( ' rotation.sizeLimit="524288"', config)
+ self.assertIn( ' rotation.sizeLimitCommand="/usr/sbin/logrotate /etc/logrotate.d/vyos-rsyslog"', config)
+
+ def test_remote(self):
+ rhosts = {
+ '169.254.0.1': {
+ 'facility': {'auth' : {'level': 'info'}},
+ 'protocol': 'udp',
+ },
+ '169.254.0.2': {
+ 'port': '1514',
+ 'protocol': 'udp',
+ },
+ '169.254.0.3': {
+ 'facility': {'auth' : {'level': 'info'},
+ 'kern' : {'level': 'debug'},
+ 'all' : {'level': 'notice'},
+ },
+ 'format': ['include-timezone', 'octet-counted'],
+ 'protocol': 'tcp',
+ 'port': '10514',
+ },
+ }
+ default_port = default_value(base_path + ['remote', next(iter(rhosts)), 'port'])
+ default_protocol = default_value(base_path + ['remote', next(iter(rhosts)), 'protocol'])
+
+ for remote, remote_options in rhosts.items():
+ remote_base = base_path + ['remote', remote]
+
+ if 'port' in remote_options:
+ self.cli_set(remote_base + ['port'], value=remote_options['port'])
- self.cli_set(base_path + ['global', 'facility', 'all', 'level', 'info'])
- self.cli_set(base_path + ['global', 'facility', 'local7', 'level', 'debug'])
- self.cli_set(base_path + ['host', rhost, 'facility', 'all', 'level', 'all'])
- self.cli_set(base_path + ['host', rhost, 'protocol', 'tcp'])
+ if 'facility' in remote_options:
+ for facility, facility_options in remote_options['facility'].items():
+ level = facility_options['level']
+ self.cli_set(remote_base + ['facility', facility, 'level'],
+ value=level)
+
+ if 'format' in remote_options:
+ for format in remote_options['format']:
+ self.cli_set(remote_base + ['format'], value=format)
+
+ if 'protocol' in remote_options:
+ protocol = remote_options['protocol']
+ self.cli_set(remote_base + ['protocol'], value=protocol)
self.cli_commit()
config = read_file(RSYSLOG_CONF)
- self.assertIn(f'*.* @@{rhost}:{default_port}', config)
+ for remote, remote_options in rhosts.items():
+ config = get_config(f'# Remote syslog to {remote}')
+ prifilt = []
+ if 'facility' in remote_options:
+ for facility, facility_options in remote_options['facility'].items():
+ level = facility_options['level']
+ if facility == 'all':
+ facility = '*'
+ prifilt.append(f'{facility}.{level}')
- # Change default port and enable "octet-counting" mode
- port = '10514'
- self.cli_set(base_path + ['host', rhost, 'port', port])
- self.cli_set(base_path + ['host', rhost, 'format', 'octet-counted'])
- self.cli_commit()
+ prifilt.sort()
+ prifilt = ','.join(prifilt)
+ if not prifilt:
+ # Skip test - as we do not render anything if no facility is set
+ continue
+
+ self.assertIn(f'if prifilt("{prifilt}") then {{', config)
+ self.assertIn( ' type="omfwd"', config)
+ self.assertIn(f' target="{remote}"', config)
+
+ port = default_port
+ if 'port' in remote_options:
+ port = remote_options['port']
+ self.assertIn(f'port="{port}"', config)
+
+ protocol = default_protocol
+ if 'protocol' in remote_options:
+ protocol = remote_options['protocol']
+ self.assertIn(f'protocol="{protocol}"', config)
+
+ if 'format' in remote_options:
+ if 'include-timezone' in remote_options['format']:
+ self.assertIn( ' template="SyslogProtocol23Format"', config)
+
+ if 'octet-counted' in remote_options['format']:
+ self.assertIn( ' TCP_Framing="octed-counted"', config)
+ else:
+ self.assertIn( ' TCP_Framing="traditional"', config)
+
+ def test_vrf_source_address(self):
+ rhosts = {
+ '169.254.0.10': { },
+ '169.254.0.11': {
+ 'vrf': {'name' : 'red', 'table' : '12321'},
+ 'source_address' : '169.254.0.11',
+ },
+ '169.254.0.12': {
+ 'vrf': {'name' : 'green', 'table' : '12322'},
+ 'source_address' : '169.254.0.12',
+ },
+ '169.254.0.13': {
+ 'vrf': {'name' : 'blue', 'table' : '12323'},
+ 'source_address' : '169.254.0.13',
+ },
+ }
+ for remote, remote_options in rhosts.items():
+ remote_base = base_path + ['remote', remote]
+ self.cli_set(remote_base + ['facility', 'all'])
+
+ vrf = None
+ if 'vrf' in remote_options:
+ vrf = remote_options['vrf']['name']
+ self.cli_set(['vrf', 'name', vrf, 'table'],
+ value=remote_options['vrf']['table'])
+ self.cli_set(remote_base + ['vrf'], value=vrf)
+
+ if 'source_address' in remote_options:
+ source_address = remote_options['source_address']
+ self.cli_set(remote_base + ['source-address'],
+ value=source_address)
+
+ idx = source_address.split('.')[-1]
+ self.cli_set(['interfaces', 'dummy', f'dum{idx}', 'address'],
+ value=f'{source_address}/32')
+ if vrf:
+ self.cli_set(['interfaces', 'dummy', f'dum{idx}', 'vrf'],
+ value=vrf)
+
+ self.cli_commit()
config = read_file(RSYSLOG_CONF)
- self.assertIn(f'*.* @@(o){rhost}:{port}', config)
+ for remote, remote_options in rhosts.items():
+ config = get_config(f'# Remote syslog to {remote}')
+
+ self.assertIn(f'target="{remote}"', config)
+ if 'vrf' in remote_options:
+ vrf = remote_options['vrf']['name']
+ self.assertIn(f'Device="{vrf}"', config)
+
+ if 'source_address' in remote_options:
+ source_address = remote_options['source_address']
+ self.assertIn(f'Address="{source_address}"', config)
+
+ # Cleanup VRF/Dummy interfaces
+ for remote, remote_options in rhosts.items():
+ if 'vrf' in remote_options:
+ vrf = remote_options['vrf']['name']
+ self.cli_delete(['vrf', 'name', vrf])
+
+ if 'source_address' in remote_options:
+ source_address = remote_options['source_address']
+ idx = source_address.split('.')[-1]
+ self.cli_delete(['interfaces', 'dummy', f'dum{idx}'])
if __name__ == '__main__':
unittest.main(verbosity=2)