summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest/scripts/cli')
-rw-r--r--smoketest/scripts/cli/base_interfaces_test.py7
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_macsec.py136
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_wireless.py10
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_wirelessmodem.py2
-rwxr-xr-xsmoketest/scripts/cli/test_service_dns_dynamic.py22
-rwxr-xr-xsmoketest/scripts/cli/test_service_dns_forwarding.py192
-rwxr-xr-xsmoketest/scripts/cli/test_service_mdns-repeater.py4
-rwxr-xr-xsmoketest/scripts/cli/test_service_pppoe-server.py8
-rwxr-xr-xsmoketest/scripts/cli/test_service_router-advert.py6
-rwxr-xr-xsmoketest/scripts/cli/test_service_snmp.py14
-rwxr-xr-xsmoketest/scripts/cli/test_service_ssh.py15
-rwxr-xr-xsmoketest/scripts/cli/test_system_lcd.py9
-rwxr-xr-xsmoketest/scripts/cli/test_system_ntp.py13
-rwxr-xr-xsmoketest/scripts/cli/test_vpn_openconnect.py8
14 files changed, 344 insertions, 102 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py
index 14ec7e137..047c19dd0 100644
--- a/smoketest/scripts/cli/base_interfaces_test.py
+++ b/smoketest/scripts/cli/base_interfaces_test.py
@@ -14,12 +14,15 @@
import os
import unittest
+import json
from netifaces import ifaddresses, AF_INET, AF_INET6
from vyos.configsession import ConfigSession
from vyos.ifconfig import Interface
from vyos.util import read_file
+from vyos.util import cmd
+from vyos.util import vyos_dict_search
from vyos.validate import is_intf_addr_assigned, is_ipv6_link_local
class BasicInterfaceTest:
@@ -212,8 +215,12 @@ class BasicInterfaceTest:
self.session.set(base + ['address', address])
self.session.commit()
+
for interface in self._interfaces:
for vif_s in self._qinq_range:
+ tmp = json.loads(cmd(f'ip -d -j link show dev {interface}.{vif_s}'))[0]
+ self.assertEqual(vyos_dict_search('linkinfo.info_data.protocol', tmp), '802.1ad')
+
for vif_c in self._vlan_range:
vif = f'{interface}.{vif_s}.{vif_c}'
for address in self._test_addr:
diff --git a/smoketest/scripts/cli/test_interfaces_macsec.py b/smoketest/scripts/cli/test_interfaces_macsec.py
index 0f1b6486d..6d1be86ba 100755
--- a/smoketest/scripts/cli/test_interfaces_macsec.py
+++ b/smoketest/scripts/cli/test_interfaces_macsec.py
@@ -16,15 +16,17 @@
import re
import unittest
-from psutil import process_iter
-from vyos.ifconfig import Section
from base_interfaces_test import BasicInterfaceTest
+from netifaces import interfaces
+
from vyos.configsession import ConfigSessionError
+from vyos.ifconfig import Section
from vyos.util import read_file
+from vyos.util import process_named_running
-def get_config_value(intf, key):
- tmp = read_file(f'/run/wpa_supplicant/{intf}.conf')
+def get_config_value(interface, key):
+ tmp = read_file(f'/run/wpa_supplicant/{interface}.conf')
tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp)
return tmp[0]
@@ -32,71 +34,123 @@ class MACsecInterfaceTest(BasicInterfaceTest.BaseTest):
def setUp(self):
super().setUp()
self._base_path = ['interfaces', 'macsec']
- self._options = {
- 'macsec0': ['source-interface eth0',
- 'security cipher gcm-aes-128']
- }
+ self._options = { 'macsec0': ['source-interface eth0', 'security cipher gcm-aes-128'] }
# if we have a physical eth1 interface, add a second macsec instance
if 'eth1' in Section.interfaces("ethernet"):
- macsec = { 'macsec1': ['source-interface eth1', 'security cipher gcm-aes-128'] }
+ macsec = { 'macsec1': [f'source-interface eth1', 'security cipher gcm-aes-128'] }
self._options.update(macsec)
self._interfaces = list(self._options)
def test_encryption(self):
- """ MACsec can be operating in authentication and encryption
- mode - both using different mandatory settings, lets test
- encryption as the basic authentication test has been performed
- using the base class tests """
- intf = 'macsec0'
- src_intf = 'eth0'
+ """ MACsec can be operating in authentication and encryption mode - both
+ using different mandatory settings, lets test encryption as the basic
+ authentication test has been performed using the base class tests """
+
mak_cak = '232e44b7fda6f8e2d88a07bf78a7aff4'
mak_ckn = '40916f4b23e3d548ad27eedd2d10c6f98c2d21684699647d63d41b500dfe8836'
replay_window = '64'
- self.session.set(self._base_path + [intf, 'security', 'encrypt'])
- # check validate() - Cipher suite must be set for MACsec
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
- self.session.set(self._base_path + [intf, 'security', 'cipher', 'gcm-aes-128'])
+ for interface, option_value in self._options.items():
+ for option in option_value:
+ if option.split()[0] == 'source-interface':
+ src_interface = option.split()[1]
- # check validate() - Physical source interface must be set for MACsec
- with self.assertRaises(ConfigSessionError):
+ self.session.set(self._base_path + [interface] + option.split())
+
+ # Encrypt link
+ self.session.set(self._base_path + [interface, 'security', 'encrypt'])
+
+ # check validate() - Physical source interface MTU must be higher then our MTU
+ self.session.set(self._base_path + [interface, 'mtu', '1500'])
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(self._base_path + [interface, 'mtu'])
+
+ # check validate() - MACsec security keys mandartory when encryption is enabled
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(self._base_path + [interface, 'security', 'mka', 'cak', mak_cak])
+
+ # check validate() - MACsec security keys mandartory when encryption is enabled
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(self._base_path + [interface, 'security', 'mka', 'ckn', mak_ckn])
+
+ self.session.set(self._base_path + [interface, 'security', 'replay-window', replay_window])
+
+ # final commit of settings
self.session.commit()
- self.session.set(self._base_path + [intf, 'source-interface', src_intf])
- # check validate() - MACsec security keys mandartory when encryption is enabled
+ tmp = get_config_value(src_interface, 'macsec_integ_only')
+ self.assertTrue("0" in tmp)
+
+ tmp = get_config_value(src_interface, 'mka_cak')
+ self.assertTrue(mak_cak in tmp)
+
+ tmp = get_config_value(src_interface, 'mka_ckn')
+ self.assertTrue(mak_ckn in tmp)
+
+ # check that the default priority of 255 is programmed
+ tmp = get_config_value(src_interface, 'mka_priority')
+ self.assertTrue("255" in tmp)
+
+ tmp = get_config_value(src_interface, 'macsec_replay_window')
+ self.assertTrue(replay_window in tmp)
+
+ tmp = read_file(f'/sys/class/net/{interface}/mtu')
+ self.assertEqual(tmp, '1460')
+
+ # Check for running process
+ self.assertTrue(process_named_running('wpa_supplicant'))
+
+ def test_mandatory_toptions(self):
+ interface = 'macsec1'
+ self.session.set(self._base_path + [interface])
+
+ # check validate() - source interface is mandatory
with self.assertRaises(ConfigSessionError):
self.session.commit()
- self.session.set(self._base_path + [intf, 'security', 'mka', 'cak', mak_cak])
+ self.session.set(self._base_path + [interface, 'source-interface', 'eth0'])
- # check validate() - MACsec security keys mandartory when encryption is enabled
+ # check validate() - cipher is mandatory
with self.assertRaises(ConfigSessionError):
self.session.commit()
- self.session.set(self._base_path + [intf, 'security', 'mka', 'ckn', mak_ckn])
+ self.session.set(self._base_path + [interface, 'security', 'cipher', 'gcm-aes-128'])
- self.session.set(self._base_path + [intf, 'security', 'replay-window', replay_window])
+ # final commit and verify
self.session.commit()
+ self.assertIn(interface, interfaces())
- tmp = get_config_value(src_intf, 'macsec_integ_only')
- self.assertTrue("0" in tmp)
+ def test_source_interface(self):
+ """ Ensure source-interface can bot be part of any other bond or bridge """
- tmp = get_config_value(src_intf, 'mka_cak')
- self.assertTrue(mak_cak in tmp)
+ base_bridge = ['interfaces', 'bridge', 'br200']
+ base_bond = ['interfaces', 'bonding', 'bond200']
- tmp = get_config_value(src_intf, 'mka_ckn')
- self.assertTrue(mak_ckn in tmp)
+ for interface, option_value in self._options.items():
+ for option in option_value:
+ self.session.set(self._base_path + [interface] + option.split())
+ if option.split()[0] == 'source-interface':
+ src_interface = option.split()[1]
- # check that the default priority of 255 is programmed
- tmp = get_config_value(src_intf, 'mka_priority')
- self.assertTrue("255" in tmp)
+ self.session.set(base_bridge + ['member', 'interface', src_interface])
+ # check validate() - Source interface must not already be a member of a bridge
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(base_bridge)
- tmp = get_config_value(src_intf, 'macsec_replay_window')
- self.assertTrue(replay_window in tmp)
+ self.session.set(base_bond + ['member', 'interface', src_interface])
+ # check validate() - Source interface must not already be a member of a bridge
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.delete(base_bond)
- # Check for running process
- self.assertTrue("wpa_supplicant" in (p.name() for p in process_iter()))
+ # final commit and verify
+ self.session.commit()
+ self.assertIn(interface, interfaces())
if __name__ == '__main__':
unittest.main()
+
diff --git a/smoketest/scripts/cli/test_interfaces_wireless.py b/smoketest/scripts/cli/test_interfaces_wireless.py
index 691f633b7..0e93b6432 100755
--- a/smoketest/scripts/cli/test_interfaces_wireless.py
+++ b/smoketest/scripts/cli/test_interfaces_wireless.py
@@ -19,8 +19,7 @@ import re
import unittest
from base_interfaces_test import BasicInterfaceTest
-from psutil import process_iter
-
+from vyos.util import process_named_running
from vyos.util import check_kmod
from vyos.util import read_file
@@ -54,10 +53,10 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest):
for option, option_value in self._options.items():
if 'type access-point' in option_value:
# Check for running process
- self.assertIn('hostapd', (p.name() for p in process_iter()))
+ self.assertTrue(process_named_running('hostapd'))
elif 'type station' in option_value:
# Check for running process
- self.assertIn('wpa_supplicant', (p.name() for p in process_iter()))
+ self.assertTrue(process_named_running('wpa_supplicant'))
else:
self.assertTrue(False)
@@ -137,8 +136,7 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest):
self.assertIn(value, tmp)
# Check for running process
- self.assertIn('hostapd', (p.name() for p in process_iter()))
-
+ self.assertTrue(process_named_running('hostapd'))
if __name__ == '__main__':
check_kmod('mac80211_hwsim')
diff --git a/smoketest/scripts/cli/test_interfaces_wirelessmodem.py b/smoketest/scripts/cli/test_interfaces_wirelessmodem.py
index 40cd03b93..efc9c0e98 100755
--- a/smoketest/scripts/cli/test_interfaces_wirelessmodem.py
+++ b/smoketest/scripts/cli/test_interfaces_wirelessmodem.py
@@ -43,7 +43,7 @@ class WWANInterfaceTest(unittest.TestCase):
def test_wlm_1(self):
for interface in self._interfaces:
self.session.set(base_path + [interface, 'no-peer-dns'])
- self.session.set(base_path + [interface, 'ondemand'])
+ self.session.set(base_path + [interface, 'connect-on-demand'])
# check validate() - APN must be configure
with self.assertRaises(ConfigSessionError):
diff --git a/smoketest/scripts/cli/test_service_dns_dynamic.py b/smoketest/scripts/cli/test_service_dns_dynamic.py
index be52360ed..c7ac87135 100755
--- a/smoketest/scripts/cli/test_service_dns_dynamic.py
+++ b/smoketest/scripts/cli/test_service_dns_dynamic.py
@@ -19,10 +19,12 @@ import os
import unittest
from getpass import getuser
-from psutil import process_iter
-from vyos.configsession import ConfigSession, ConfigSessionError
+from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
from vyos.util import read_file
+from vyos.util import process_named_running
+PROCESS_NAME = 'ddclient'
DDCLIENT_CONF = '/run/ddclient/ddclient.conf'
base_path = ['service', 'dns', 'dynamic']
@@ -32,17 +34,6 @@ def get_config_value(key):
tmp = tmp[0].rstrip(',')
return tmp
-def check_process():
- """
- Check for running process, process name changes dynamically e.g.
- "ddclient - sleeping for 270 seconds", thus we need a different approach
- """
- running = False
- for p in process_iter():
- if "ddclient" in p.name():
- running = True
- return running
-
class TestServiceDDNS(unittest.TestCase):
def setUp(self):
self.session = ConfigSession(os.getpid())
@@ -104,8 +95,7 @@ class TestServiceDDNS(unittest.TestCase):
self.assertTrue(pwd == "'" + password + "'")
# Check for running process
- self.assertTrue(check_process())
-
+ self.assertTrue(process_named_running(PROCESS_NAME))
def test_rfc2136(self):
""" Check if DDNS service can be configured and runs """
@@ -135,7 +125,7 @@ class TestServiceDDNS(unittest.TestCase):
# TODO: inspect generated configuration file
# Check for running process
- self.assertTrue(check_process())
+ self.assertTrue(process_named_running(PROCESS_NAME))
if __name__ == '__main__':
unittest.main()
diff --git a/smoketest/scripts/cli/test_service_dns_forwarding.py b/smoketest/scripts/cli/test_service_dns_forwarding.py
new file mode 100755
index 000000000..5e2f3dfbd
--- /dev/null
+++ b/smoketest/scripts/cli/test_service_dns_forwarding.py
@@ -0,0 +1,192 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2019-2020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import re
+import os
+import unittest
+
+from vyos.configsession import ConfigSession, ConfigSessionError
+from vyos.util import read_file
+from vyos.util import process_named_running
+
+CONFIG_FILE = '/run/powerdns/recursor.conf'
+FORWARD_FILE = '/run/powerdns/recursor.forward-zones.conf'
+HOSTSD_FILE = '/run/powerdns/recursor.vyos-hostsd.conf.lua'
+PROCESS_NAME= 'pdns-r/worker'
+
+base_path = ['service', 'dns', 'forwarding']
+
+allow_from = ['192.0.2.0/24', '2001:db8::/32']
+listen_adress = ['127.0.0.1', '::1']
+
+def get_config_value(key, file=CONFIG_FILE):
+ tmp = read_file(file)
+ tmp = re.findall(r'\n{}=+(.*)'.format(key), tmp)
+ return tmp[0]
+
+class TestServicePowerDNS(unittest.TestCase):
+ def setUp(self):
+ self.session = ConfigSession(os.getpid())
+
+ def tearDown(self):
+ # Delete DNS forwarding configuration
+ self.session.delete(base_path)
+ self.session.commit()
+ del self.session
+
+ def test_basic_forwarding(self):
+ """ Check basic DNS forwarding settings """
+ cache_size = '20'
+ negative_ttl = '120'
+
+ self.session.set(base_path + ['cache-size', cache_size])
+ self.session.set(base_path + ['negative-ttl', negative_ttl])
+
+ # check validate() - allow from must be defined
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ for network in allow_from:
+ self.session.set(base_path + ['allow-from', network])
+
+ # check validate() - listen-address must be defined
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ for address in listen_adress:
+ self.session.set(base_path + ['listen-address', address])
+
+ # configure DNSSEC
+ self.session.set(base_path + ['dnssec', 'validate'])
+
+ # Do not use local /etc/hosts file in name resolution
+ self.session.set(base_path + ['ignore-hosts-file'])
+
+ # commit changes
+ self.session.commit()
+
+ # Check configured cache-size
+ tmp = get_config_value('max-cache-entries')
+ self.assertEqual(tmp, cache_size)
+
+ # Networks allowed to query this server
+ tmp = get_config_value('allow-from')
+ self.assertEqual(tmp, ','.join(allow_from))
+
+ # Addresses to listen for DNS queries
+ tmp = get_config_value('local-address')
+ self.assertEqual(tmp, ','.join(listen_adress))
+
+ # Maximum amount of time negative entries are cached
+ tmp = get_config_value('max-negative-ttl')
+ self.assertEqual(tmp, negative_ttl)
+
+ # Do not use local /etc/hosts file in name resolution
+ tmp = get_config_value('export-etc-hosts')
+ self.assertEqual(tmp, 'no')
+
+ # Check for running process
+ self.assertTrue(process_named_running(PROCESS_NAME))
+
+ def test_dnssec(self):
+ """ DNSSEC option testing """
+
+ for network in allow_from:
+ self.session.set(base_path + ['allow-from', network])
+ for address in listen_adress:
+ self.session.set(base_path + ['listen-address', address])
+
+ options = ['off', 'process-no-validate', 'process', 'log-fail', 'validate']
+ for option in options:
+ self.session.set(base_path + ['dnssec', option])
+
+ # commit changes
+ self.session.commit()
+
+ tmp = get_config_value('dnssec')
+ self.assertEqual(tmp, option)
+
+ # Check for running process
+ self.assertTrue(process_named_running(PROCESS_NAME))
+
+ def test_external_nameserver(self):
+ """ Externe Domain Name Servers (DNS) addresses """
+
+ for network in allow_from:
+ self.session.set(base_path + ['allow-from', network])
+ for address in listen_adress:
+ self.session.set(base_path + ['listen-address', address])
+
+ nameservers = ['192.0.2.1', '192.0.2.2']
+ for nameserver in nameservers:
+ self.session.set(base_path + ['name-server', nameserver])
+
+ # commit changes
+ self.session.commit()
+
+ tmp = get_config_value(r'\+.', file=FORWARD_FILE)
+ self.assertEqual(tmp, ', '.join(nameservers))
+
+ # Do not use local /etc/hosts file in name resolution
+ # default: yes
+ tmp = get_config_value('export-etc-hosts')
+ self.assertEqual(tmp, 'yes')
+
+ # Check for running process
+ self.assertTrue(process_named_running(PROCESS_NAME))
+
+ def test_domain_forwarding(self):
+ """ Externe Domain Name Servers (DNS) addresses """
+
+ for network in allow_from:
+ self.session.set(base_path + ['allow-from', network])
+ for address in listen_adress:
+ self.session.set(base_path + ['listen-address', address])
+
+ domains = ['vyos.io', 'vyos.net', 'vyos.com']
+ nameservers = ['192.0.2.1', '192.0.2.2']
+ for domain in domains:
+ for nameserver in nameservers:
+ self.session.set(base_path + ['domain', domain, 'server', nameserver])
+
+ # Test 'recursion-desired' flag for only one domain
+ if domain == domains[0]:
+ self.session.set(base_path + ['domain', domain, 'recursion-desired'])
+
+ # Test 'negative trust anchor' flag for the second domain only
+ if domain == domains[1]:
+ self.session.set(base_path + ['domain', domain, 'addnta'])
+
+ # commit changes
+ self.session.commit()
+
+ # Test configured name-servers
+ hosts_conf = read_file(HOSTSD_FILE)
+ for domain in domains:
+ # Test 'recursion-desired' flag for the first domain only
+ if domain == domains[0]: key =f'\+{domain}'
+ else: key =f'{domain}'
+ tmp = get_config_value(key, file=FORWARD_FILE)
+ self.assertEqual(tmp, ', '.join(nameservers))
+
+ # Test 'negative trust anchor' flag for the second domain only
+ if domain == domains[1]:
+ self.assertIn(f'addNTA("{domain}", "static")', hosts_conf)
+
+ # Check for running process
+ self.assertTrue(process_named_running(PROCESS_NAME))
+
+if __name__ == '__main__':
+ unittest.main()
+
diff --git a/smoketest/scripts/cli/test_service_mdns-repeater.py b/smoketest/scripts/cli/test_service_mdns-repeater.py
index 18900b6d2..de73b9914 100755
--- a/smoketest/scripts/cli/test_service_mdns-repeater.py
+++ b/smoketest/scripts/cli/test_service_mdns-repeater.py
@@ -17,8 +17,8 @@
import os
import unittest
-from psutil import process_iter
from vyos.configsession import ConfigSession
+from vyos.util import process_named_running
base_path = ['service', 'mdns', 'repeater']
intf_base = ['interfaces', 'dummy']
@@ -45,7 +45,7 @@ class TestServiceMDNSrepeater(unittest.TestCase):
self.session.commit()
# Check for running process
- self.assertTrue("mdns-repeater" in (p.name() for p in process_iter()))
+ self.assertTrue(process_named_running('mdns-repeater'))
if __name__ == '__main__':
unittest.main()
diff --git a/smoketest/scripts/cli/test_service_pppoe-server.py b/smoketest/scripts/cli/test_service_pppoe-server.py
index 901ca792d..3a6b12ef4 100755
--- a/smoketest/scripts/cli/test_service_pppoe-server.py
+++ b/smoketest/scripts/cli/test_service_pppoe-server.py
@@ -14,15 +14,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
import os
import unittest
from configparser import ConfigParser
-from psutil import process_iter
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
+from vyos.util import process_named_running
+process_name = 'accel-pppd'
base_path = ['service', 'pppoe-server']
local_if = ['interfaces', 'dummy', 'dum667']
pppoe_conf = '/run/accel-pppd/pppoe.conf'
@@ -116,7 +116,7 @@ class TestServicePPPoEServer(unittest.TestCase):
self.assertEqual(conf['connlimit']['limit'], '20/min')
# Check for running process
- self.assertTrue('accel-pppd' in (p.name() for p in process_iter()))
+ self.assertTrue(process_named_running(process_name))
def test_radius_auth(self):
""" Test configuration of RADIUS authentication for PPPoE server """
@@ -161,7 +161,7 @@ class TestServicePPPoEServer(unittest.TestCase):
self.assertFalse(conf['ppp'].getboolean('ccp'))
# Check for running process
- self.assertTrue('accel-pppd' in (p.name() for p in process_iter()))
+ self.assertTrue(process_named_running(process_name))
if __name__ == '__main__':
unittest.main()
diff --git a/smoketest/scripts/cli/test_service_router-advert.py b/smoketest/scripts/cli/test_service_router-advert.py
index ec2110c8a..238f59e6d 100755
--- a/smoketest/scripts/cli/test_service_router-advert.py
+++ b/smoketest/scripts/cli/test_service_router-advert.py
@@ -18,9 +18,9 @@ import re
import os
import unittest
-from psutil import process_iter
from vyos.configsession import ConfigSession
from vyos.util import read_file
+from vyos.util import process_named_running
RADVD_CONF = '/run/radvd/radvd.conf'
@@ -90,10 +90,8 @@ class TestServiceRADVD(unittest.TestCase):
tmp = get_config_value('AdvOnLink')
self.assertEqual(tmp, 'off')
-
-
# Check for running process
- self.assertTrue('radvd' in (p.name() for p in process_iter()))
+ self.assertTrue(process_named_running('radvd'))
if __name__ == '__main__':
unittest.main()
diff --git a/smoketest/scripts/cli/test_service_snmp.py b/smoketest/scripts/cli/test_service_snmp.py
index fb5f5393f..067a3c76b 100755
--- a/smoketest/scripts/cli/test_service_snmp.py
+++ b/smoketest/scripts/cli/test_service_snmp.py
@@ -19,12 +19,15 @@ import re
import unittest
from vyos.validate import is_ipv4
-from psutil import process_iter
-from vyos.configsession import ConfigSession, ConfigSessionError
+from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
from vyos.util import read_file
+from vyos.util import process_named_running
+PROCESS_NAME = 'snmpd'
SNMPD_CONF = '/etc/snmp/snmpd.conf'
+
base_path = ['service', 'snmp']
def get_config_value(key):
@@ -78,7 +81,7 @@ class TestSNMPService(unittest.TestCase):
self.assertTrue(expected in config)
# Check for running process
- self.assertTrue("snmpd" in (p.name() for p in process_iter()))
+ self.assertTrue(process_named_running(PROCESS_NAME))
def test_snmpv3_sha(self):
@@ -113,7 +116,7 @@ class TestSNMPService(unittest.TestCase):
# TODO: read in config file and check values
# Check for running process
- self.assertTrue("snmpd" in (p.name() for p in process_iter()))
+ self.assertTrue(process_named_running(PROCESS_NAME))
def test_snmpv3_md5(self):
""" Check if SNMPv3 can be configured with MD5 authentication and service runs"""
@@ -147,8 +150,7 @@ class TestSNMPService(unittest.TestCase):
# TODO: read in config file and check values
# Check for running process
- self.assertTrue("snmpd" in (p.name() for p in process_iter()))
-
+ self.assertTrue(process_named_running(PROCESS_NAME))
if __name__ == '__main__':
unittest.main()
diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py
index 79850fe44..0cd00ccce 100755
--- a/smoketest/scripts/cli/test_service_ssh.py
+++ b/smoketest/scripts/cli/test_service_ssh.py
@@ -18,10 +18,12 @@ import re
import os
import unittest
-from psutil import process_iter
-from vyos.configsession import ConfigSession, ConfigSessionError
+from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
from vyos.util import read_file
+from vyos.util import process_named_running
+PROCESS_NAME = 'sshd'
SSHD_CONF = '/run/ssh/sshd_config'
base_path = ['service', 'ssh']
@@ -30,9 +32,6 @@ def get_config_value(key):
tmp = re.findall(f'\n?{key}\s+(.*)', tmp)
return tmp
-def is_service_running():
- return 'sshd' in (p.name() for p in process_iter())
-
class TestServiceSSH(unittest.TestCase):
def setUp(self):
self.session = ConfigSession(os.getpid())
@@ -62,7 +61,7 @@ class TestServiceSSH(unittest.TestCase):
self.assertEqual('22', port)
# Check for running process
- self.assertTrue(is_service_running())
+ self.assertTrue(process_named_running(PROCESS_NAME))
def test_ssh_single(self):
""" Check if SSH service can be configured and runs """
@@ -101,7 +100,7 @@ class TestServiceSSH(unittest.TestCase):
self.assertTrue("100" in keepalive)
# Check for running process
- self.assertTrue(is_service_running())
+ self.assertTrue(process_named_running(PROCESS_NAME))
def test_ssh_multi(self):
""" Check if SSH service can be configured and runs with multiple
@@ -128,7 +127,7 @@ class TestServiceSSH(unittest.TestCase):
self.assertIn(address, tmp)
# Check for running process
- self.assertTrue(is_service_running())
+ self.assertTrue(process_named_running(PROCESS_NAME))
if __name__ == '__main__':
unittest.main()
diff --git a/smoketest/scripts/cli/test_system_lcd.py b/smoketest/scripts/cli/test_system_lcd.py
index 931a91c53..9385799b0 100755
--- a/smoketest/scripts/cli/test_system_lcd.py
+++ b/smoketest/scripts/cli/test_system_lcd.py
@@ -18,9 +18,10 @@ import os
import unittest
from configparser import ConfigParser
-from psutil import process_iter
from vyos.configsession import ConfigSession
+from vyos.util import process_named_running
+config_file = '/run/LCDd/LCDd.conf'
base_path = ['system', 'lcd']
class TestSystemLCD(unittest.TestCase):
@@ -42,13 +43,13 @@ class TestSystemLCD(unittest.TestCase):
# load up ini-styled LCDd.conf
conf = ConfigParser()
- conf.read('/run/LCDd/LCDd.conf')
+ conf.read(config_file)
self.assertEqual(conf['CFontzPacket']['Model'], '533')
self.assertEqual(conf['CFontzPacket']['Device'], '/dev/ttyS1')
- # both processes running
- self.assertTrue('LCDd' in (p.name() for p in process_iter()))
+ # Check for running process
+ self.assertTrue(process_named_running('LCDd'))
if __name__ == '__main__':
unittest.main()
diff --git a/smoketest/scripts/cli/test_system_ntp.py b/smoketest/scripts/cli/test_system_ntp.py
index 856a28916..2a7c64870 100755
--- a/smoketest/scripts/cli/test_system_ntp.py
+++ b/smoketest/scripts/cli/test_system_ntp.py
@@ -18,11 +18,14 @@ import re
import os
import unittest
-from psutil import process_iter
-from vyos.configsession import ConfigSession, ConfigSessionError
-from vyos.template import vyos_address_from_cidr, vyos_netmask_from_cidr
+from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
+from vyos.template import vyos_address_from_cidr
+from vyos.template import vyos_netmask_from_cidr
from vyos.util import read_file
+from vyos.util import process_named_running
+PROCESS_NAME = 'ntpd'
NTP_CONF = '/etc/ntp.conf'
base_path = ['system', 'ntp']
@@ -63,7 +66,7 @@ class TestSystemNTP(unittest.TestCase):
self.assertTrue(test in tmp)
# Check for running process
- self.assertTrue("ntpd" in (p.name() for p in process_iter()))
+ self.assertTrue(process_named_running(PROCESS_NAME))
def test_ntp_clients(self):
""" Test the allowed-networks statement """
@@ -102,7 +105,7 @@ class TestSystemNTP(unittest.TestCase):
self.assertEqual(tmp, test)
# Check for running process
- self.assertTrue("ntpd" in (p.name() for p in process_iter()))
+ self.assertTrue(process_named_running(PROCESS_NAME))
if __name__ == '__main__':
unittest.main()
diff --git a/smoketest/scripts/cli/test_vpn_openconnect.py b/smoketest/scripts/cli/test_vpn_openconnect.py
index d2b82d686..2ba6aabf9 100755
--- a/smoketest/scripts/cli/test_vpn_openconnect.py
+++ b/smoketest/scripts/cli/test_vpn_openconnect.py
@@ -14,13 +14,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import re
import os
import unittest
-from psutil import process_iter
-from vyos.configsession import ConfigSession, ConfigSessionError
-from vyos.util import read_file
+from vyos.configsession import ConfigSession
+from vyos.util import process_named_running
OCSERV_CONF = '/run/ocserv/ocserv.conf'
base_path = ['vpn', 'openconnect']
@@ -52,7 +50,7 @@ class TestVpnOpenconnect(unittest.TestCase):
self.session.commit()
# Check for running process
- self.assertTrue("ocserv-main" in (p.name() for p in process_iter()))
+ self.assertTrue(process_named_running('ocserv-main'))
if __name__ == '__main__':
unittest.main()