path: root/smoketest/scripts
diff options
Diffstat (limited to 'smoketest/scripts')
1 files changed, 103 insertions, 48 deletions
diff --git a/smoketest/scripts/cli/ b/smoketest/scripts/cli/
index 961b7a6f4..de2e9b260 100755
--- a/smoketest/scripts/cli/
+++ b/smoketest/scripts/cli/
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
-# Copyright (C) 2019-2024 VyOS maintainers and contributors
+# Copyright (C) 2019-2025 VyOS maintainers and contributors
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -20,18 +20,24 @@ import unittest
from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.utils.file import read_file
+from vyos.utils.process import cmd
from vyos.utils.process import process_named_running
from vyos.xml_ref import default_value
PROCESS_NAME = 'rsyslogd'
-RSYSLOG_CONF = '/etc/rsyslog.d/00-vyos.conf'
+RSYSLOG_CONF = '/run/rsyslog/rsyslog.conf'
base_path = ['system', 'syslog']
-def get_config_value(key):
- tmp = read_file(RSYSLOG_CONF)
- tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp)
- return tmp[0]
+def get_config(string=''):
+ """
+ Retrieve current "running configuration" from FRR
+ string: search for a specific start string in the configuration
+ """
+ command = 'cat /run/rsyslog/rsyslog.conf'
+ if string:
+ command += f' | sed -n "/^{string}$/,/}}/p"' # }} required to escape } in f-string
+ return cmd(command)
class TestRSYSLOGService(VyOSUnitTestSHIM.TestCase):
@@ -53,37 +59,72 @@ class TestRSYSLOGService(VyOSUnitTestSHIM.TestCase):
# Check for running process
- def test_syslog_console(self):
- self.cli_set(base_path + ['console', 'facility', 'all', 'level', 'warning'])
+ def test_console(self):
+ level = 'warning'
+ self.cli_set(base_path + ['console', 'facility', 'all', 'level', level])
- self.assertIn('/dev/console', get_config_value('\*.warning'))
- def test_syslog_global(self):
+ rsyslog_conf = get_config()
+ config = [
+ f'if prifilt("*.{level}") then {{', # {{ required to escape { in f-string
+ 'action(type="omfile" file="/dev/console")',
+ ]
+ for tmp in config:
+ self.assertIn(tmp, rsyslog_conf)
+ def test_global(self):
hostname = 'vyos123'
- domainname = 'example.local'
+ domain_name = 'example.local'
+ default_marker_interval = default_value(base_path + ['global',
+ 'marker', 'interval'])
+ facility = {
+ 'auth': {'level': 'info'},
+ 'kern': {'level': 'debug'},
+ 'all': {'level': 'notice'},
+ }
self.cli_set(['system', 'host-name', hostname])
- self.cli_set(['system', 'domain-name', domainname])
- self.cli_set(base_path + ['global', 'marker', 'interval', '600'])
+ self.cli_set(['system', 'domain-name', domain_name])
self.cli_set(base_path + ['global', 'preserve-fqdn'])
- self.cli_set(base_path + ['global', 'facility', 'kern', 'level', 'err'])
+ for tmp, tmp_options in facility.items():
+ level = tmp_options['level']
+ self.cli_set(base_path + ['global', 'facility', tmp, 'level', level])
- config = read_file(RSYSLOG_CONF)
+ config = get_config('')
expected = [
- '$MarkMessagePeriod 600',
- '$PreserveFQDN on',
- 'kern.err',
- f'$LocalHostName {hostname}.{domainname}',
+ f'module(load="immark" interval="{default_marker_interval}")',
+ 'global(preserveFQDN="on")',
+ f'global(localHostname="{hostname}.{domain_name}")',
for e in expected:
self.assertIn(e, config)
- def test_syslog_remote(self):
+ config = get_config('#### GLOBAL LOGGING ####')
+ prifilt = []
+ for tmp, tmp_options in facility.items():
+ if tmp == 'all':
+ tmp = '*'
+ level = tmp_options['level']
+ prifilt.append(f'{tmp}.{level}')
+ prifilt.sort()
+ prifilt = ','.join(prifilt)
+ self.assertIn(f'if prifilt("{prifilt}") then {{', config)
+ self.assertIn( ' action(', config)
+ self.assertIn( ' type="omfile"', config)
+ self.assertIn( ' file="/var/log/messages"', config)
+ self.assertIn( ' queue.size="262144"', config)
+ self.assertIn( ' rotation.sizeLimitCommand="/usr/sbin/logrotate /etc/logrotate.d/vyos-rsyslog"', config)
+ def test_remote(self):
rhosts = {
'': {
- 'facility': {'name' : 'auth', 'level': 'info'},
+ 'facility': {'auth' : {'level': 'info'}},
'protocol': 'udp',
'': {
@@ -91,11 +132,17 @@ class TestRSYSLOGService(VyOSUnitTestSHIM.TestCase):
'protocol': 'udp',
'': {
+ 'facility': {'auth' : {'level': 'info'},
+ 'kern' : {'level': 'debug'},
+ 'all' : {'level': 'notice'},
+ },
'format': ['include-timezone', 'octet-counted'],
'protocol': 'tcp',
+ 'port': '10514',
default_port = default_value(base_path + ['remote', next(iter(rhosts)), 'port'])
+ default_protocol = default_value(base_path + ['remote', next(iter(rhosts)), 'protocol'])
for remote, remote_options in rhosts.items():
remote_base = base_path + ['remote', remote]
@@ -103,13 +150,10 @@ class TestRSYSLOGService(VyOSUnitTestSHIM.TestCase):
if 'port' in remote_options:
self.cli_set(remote_base + ['port', remote_options['port']])
- if ('facility' in remote_options and
- 'name' in remote_options['facility'] and
- 'level' in remote_options['facility']
- ):
- facility = remote_options['facility']['name']
- level = remote_options['facility']['level']
- self.cli_set(remote_base + ['facility', facility, 'level', level])
+ if 'facility' in remote_options:
+ for facility, facility_options in remote_options['facility'].items():
+ level = facility_options['level']
+ self.cli_set(remote_base + ['facility', facility, 'level', level])
if 'format' in remote_options:
for format in remote_options['format']:
@@ -123,32 +167,43 @@ class TestRSYSLOGService(VyOSUnitTestSHIM.TestCase):
config = read_file(RSYSLOG_CONF)
for remote, remote_options in rhosts.items():
- tmp = ' '
- if ('facility' in remote_options and
- 'name' in remote_options['facility'] and
- 'level' in remote_options['facility']
- ):
- facility = remote_options['facility']['name']
- level = remote_options['facility']['level']
- tmp = f'{facility}.{level} '
- tmp += '@'
- if 'protocol' in remote_options and remote_options['protocol'] == 'tcp':
- tmp += '@'
- if 'format' in remote_options and 'octet-counted' in remote_options['format']:
- tmp += '(o)'
+ config = get_config(f'# Remote syslog to {remote}')
+ prifilt = []
+ if 'facility' in remote_options:
+ for facility, facility_options in remote_options['facility'].items():
+ level = facility_options['level']
+ if facility == 'all':
+ facility = '*'
+ prifilt.append(f'{facility}.{level}')
+ prifilt.sort()
+ prifilt = ','.join(prifilt)
+ if not prifilt:
+ # Skip test - as we do not render anything if no facility is set
+ continue
+ self.assertIn(f'if prifilt("{prifilt}") then {{', config)
+ self.assertIn( ' type="omfwd"', config)
+ self.assertIn(f' target="{remote}"', config)
port = default_port
if 'port' in remote_options:
port = remote_options['port']
+ self.assertIn(f'port="{port}"', config)
- tmp += f'{remote}:{port}'
+ protocol = default_protocol
+ if 'protocol' in remote_options:
+ protocol = remote_options['protocol']
+ self.assertIn(f'protocol="{protocol}"', config)
- if 'format' in remote_options and 'include-timezone' in remote_options['format']:
- tmp += ';RSYSLOG_SyslogProtocol23Format'
+ if 'format' in remote_options:
+ if 'include-timezone' in remote_options['format']:
+ self.assertIn( ' template="SyslogProtocol23Format"', config)
- self.assertIn(tmp, config)
+ if 'octet-counted' in remote_options['format']:
+ self.assertIn( ' TCP_Framing="octed-counted"', config)
+ else:
+ self.assertIn( ' TCP_Framing="traditional"', config)
if __name__ == '__main__':