diff options
Diffstat (limited to 'smoketest/scripts/cli')
-rw-r--r-- | smoketest/scripts/cli/base_interfaces_test.py | 7 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_macsec.py | 136 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_wireless.py | 10 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_interfaces_wirelessmodem.py | 2 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_dns_dynamic.py | 22 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_dns_forwarding.py | 192 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_mdns-repeater.py | 4 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_pppoe-server.py | 8 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_router-advert.py | 6 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_snmp.py | 14 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_service_ssh.py | 15 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_lcd.py | 9 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_system_ntp.py | 13 | ||||
-rwxr-xr-x | smoketest/scripts/cli/test_vpn_openconnect.py | 8 |
14 files changed, 344 insertions, 102 deletions
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py index 14ec7e137..047c19dd0 100644 --- a/smoketest/scripts/cli/base_interfaces_test.py +++ b/smoketest/scripts/cli/base_interfaces_test.py @@ -14,12 +14,15 @@ import os import unittest +import json from netifaces import ifaddresses, AF_INET, AF_INET6 from vyos.configsession import ConfigSession from vyos.ifconfig import Interface from vyos.util import read_file +from vyos.util import cmd +from vyos.util import vyos_dict_search from vyos.validate import is_intf_addr_assigned, is_ipv6_link_local class BasicInterfaceTest: @@ -212,8 +215,12 @@ class BasicInterfaceTest: self.session.set(base + ['address', address]) self.session.commit() + for interface in self._interfaces: for vif_s in self._qinq_range: + tmp = json.loads(cmd(f'ip -d -j link show dev {interface}.{vif_s}'))[0] + self.assertEqual(vyos_dict_search('linkinfo.info_data.protocol', tmp), '802.1ad') + for vif_c in self._vlan_range: vif = f'{interface}.{vif_s}.{vif_c}' for address in self._test_addr: diff --git a/smoketest/scripts/cli/test_interfaces_macsec.py b/smoketest/scripts/cli/test_interfaces_macsec.py index 0f1b6486d..6d1be86ba 100755 --- a/smoketest/scripts/cli/test_interfaces_macsec.py +++ b/smoketest/scripts/cli/test_interfaces_macsec.py @@ -16,15 +16,17 @@ import re import unittest -from psutil import process_iter -from vyos.ifconfig import Section from base_interfaces_test import BasicInterfaceTest +from netifaces import interfaces + from vyos.configsession import ConfigSessionError +from vyos.ifconfig import Section from vyos.util import read_file +from vyos.util import process_named_running -def get_config_value(intf, key): - tmp = read_file(f'/run/wpa_supplicant/{intf}.conf') +def get_config_value(interface, key): + tmp = read_file(f'/run/wpa_supplicant/{interface}.conf') tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp) return tmp[0] @@ -32,71 +34,123 @@ class MACsecInterfaceTest(BasicInterfaceTest.BaseTest): def setUp(self): super().setUp() self._base_path = ['interfaces', 'macsec'] - self._options = { - 'macsec0': ['source-interface eth0', - 'security cipher gcm-aes-128'] - } + self._options = { 'macsec0': ['source-interface eth0', 'security cipher gcm-aes-128'] } # if we have a physical eth1 interface, add a second macsec instance if 'eth1' in Section.interfaces("ethernet"): - macsec = { 'macsec1': ['source-interface eth1', 'security cipher gcm-aes-128'] } + macsec = { 'macsec1': [f'source-interface eth1', 'security cipher gcm-aes-128'] } self._options.update(macsec) self._interfaces = list(self._options) def test_encryption(self): - """ MACsec can be operating in authentication and encryption - mode - both using different mandatory settings, lets test - encryption as the basic authentication test has been performed - using the base class tests """ - intf = 'macsec0' - src_intf = 'eth0' + """ MACsec can be operating in authentication and encryption mode - both + using different mandatory settings, lets test encryption as the basic + authentication test has been performed using the base class tests """ + mak_cak = '232e44b7fda6f8e2d88a07bf78a7aff4' mak_ckn = '40916f4b23e3d548ad27eedd2d10c6f98c2d21684699647d63d41b500dfe8836' replay_window = '64' - self.session.set(self._base_path + [intf, 'security', 'encrypt']) - # check validate() - Cipher suite must be set for MACsec - with self.assertRaises(ConfigSessionError): - self.session.commit() - self.session.set(self._base_path + [intf, 'security', 'cipher', 'gcm-aes-128']) + for interface, option_value in self._options.items(): + for option in option_value: + if option.split()[0] == 'source-interface': + src_interface = option.split()[1] - # check validate() - Physical source interface must be set for MACsec - with self.assertRaises(ConfigSessionError): + self.session.set(self._base_path + [interface] + option.split()) + + # Encrypt link + self.session.set(self._base_path + [interface, 'security', 'encrypt']) + + # check validate() - Physical source interface MTU must be higher then our MTU + self.session.set(self._base_path + [interface, 'mtu', '1500']) + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.delete(self._base_path + [interface, 'mtu']) + + # check validate() - MACsec security keys mandartory when encryption is enabled + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.set(self._base_path + [interface, 'security', 'mka', 'cak', mak_cak]) + + # check validate() - MACsec security keys mandartory when encryption is enabled + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.set(self._base_path + [interface, 'security', 'mka', 'ckn', mak_ckn]) + + self.session.set(self._base_path + [interface, 'security', 'replay-window', replay_window]) + + # final commit of settings self.session.commit() - self.session.set(self._base_path + [intf, 'source-interface', src_intf]) - # check validate() - MACsec security keys mandartory when encryption is enabled + tmp = get_config_value(src_interface, 'macsec_integ_only') + self.assertTrue("0" in tmp) + + tmp = get_config_value(src_interface, 'mka_cak') + self.assertTrue(mak_cak in tmp) + + tmp = get_config_value(src_interface, 'mka_ckn') + self.assertTrue(mak_ckn in tmp) + + # check that the default priority of 255 is programmed + tmp = get_config_value(src_interface, 'mka_priority') + self.assertTrue("255" in tmp) + + tmp = get_config_value(src_interface, 'macsec_replay_window') + self.assertTrue(replay_window in tmp) + + tmp = read_file(f'/sys/class/net/{interface}/mtu') + self.assertEqual(tmp, '1460') + + # Check for running process + self.assertTrue(process_named_running('wpa_supplicant')) + + def test_mandatory_toptions(self): + interface = 'macsec1' + self.session.set(self._base_path + [interface]) + + # check validate() - source interface is mandatory with self.assertRaises(ConfigSessionError): self.session.commit() - self.session.set(self._base_path + [intf, 'security', 'mka', 'cak', mak_cak]) + self.session.set(self._base_path + [interface, 'source-interface', 'eth0']) - # check validate() - MACsec security keys mandartory when encryption is enabled + # check validate() - cipher is mandatory with self.assertRaises(ConfigSessionError): self.session.commit() - self.session.set(self._base_path + [intf, 'security', 'mka', 'ckn', mak_ckn]) + self.session.set(self._base_path + [interface, 'security', 'cipher', 'gcm-aes-128']) - self.session.set(self._base_path + [intf, 'security', 'replay-window', replay_window]) + # final commit and verify self.session.commit() + self.assertIn(interface, interfaces()) - tmp = get_config_value(src_intf, 'macsec_integ_only') - self.assertTrue("0" in tmp) + def test_source_interface(self): + """ Ensure source-interface can bot be part of any other bond or bridge """ - tmp = get_config_value(src_intf, 'mka_cak') - self.assertTrue(mak_cak in tmp) + base_bridge = ['interfaces', 'bridge', 'br200'] + base_bond = ['interfaces', 'bonding', 'bond200'] - tmp = get_config_value(src_intf, 'mka_ckn') - self.assertTrue(mak_ckn in tmp) + for interface, option_value in self._options.items(): + for option in option_value: + self.session.set(self._base_path + [interface] + option.split()) + if option.split()[0] == 'source-interface': + src_interface = option.split()[1] - # check that the default priority of 255 is programmed - tmp = get_config_value(src_intf, 'mka_priority') - self.assertTrue("255" in tmp) + self.session.set(base_bridge + ['member', 'interface', src_interface]) + # check validate() - Source interface must not already be a member of a bridge + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.delete(base_bridge) - tmp = get_config_value(src_intf, 'macsec_replay_window') - self.assertTrue(replay_window in tmp) + self.session.set(base_bond + ['member', 'interface', src_interface]) + # check validate() - Source interface must not already be a member of a bridge + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.delete(base_bond) - # Check for running process - self.assertTrue("wpa_supplicant" in (p.name() for p in process_iter())) + # final commit and verify + self.session.commit() + self.assertIn(interface, interfaces()) if __name__ == '__main__': unittest.main() + diff --git a/smoketest/scripts/cli/test_interfaces_wireless.py b/smoketest/scripts/cli/test_interfaces_wireless.py index 691f633b7..0e93b6432 100755 --- a/smoketest/scripts/cli/test_interfaces_wireless.py +++ b/smoketest/scripts/cli/test_interfaces_wireless.py @@ -19,8 +19,7 @@ import re import unittest from base_interfaces_test import BasicInterfaceTest -from psutil import process_iter - +from vyos.util import process_named_running from vyos.util import check_kmod from vyos.util import read_file @@ -54,10 +53,10 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): for option, option_value in self._options.items(): if 'type access-point' in option_value: # Check for running process - self.assertIn('hostapd', (p.name() for p in process_iter())) + self.assertTrue(process_named_running('hostapd')) elif 'type station' in option_value: # Check for running process - self.assertIn('wpa_supplicant', (p.name() for p in process_iter())) + self.assertTrue(process_named_running('wpa_supplicant')) else: self.assertTrue(False) @@ -137,8 +136,7 @@ class WirelessInterfaceTest(BasicInterfaceTest.BaseTest): self.assertIn(value, tmp) # Check for running process - self.assertIn('hostapd', (p.name() for p in process_iter())) - + self.assertTrue(process_named_running('hostapd')) if __name__ == '__main__': check_kmod('mac80211_hwsim') diff --git a/smoketest/scripts/cli/test_interfaces_wirelessmodem.py b/smoketest/scripts/cli/test_interfaces_wirelessmodem.py index 40cd03b93..efc9c0e98 100755 --- a/smoketest/scripts/cli/test_interfaces_wirelessmodem.py +++ b/smoketest/scripts/cli/test_interfaces_wirelessmodem.py @@ -43,7 +43,7 @@ class WWANInterfaceTest(unittest.TestCase): def test_wlm_1(self): for interface in self._interfaces: self.session.set(base_path + [interface, 'no-peer-dns']) - self.session.set(base_path + [interface, 'ondemand']) + self.session.set(base_path + [interface, 'connect-on-demand']) # check validate() - APN must be configure with self.assertRaises(ConfigSessionError): diff --git a/smoketest/scripts/cli/test_service_dns_dynamic.py b/smoketest/scripts/cli/test_service_dns_dynamic.py index be52360ed..c7ac87135 100755 --- a/smoketest/scripts/cli/test_service_dns_dynamic.py +++ b/smoketest/scripts/cli/test_service_dns_dynamic.py @@ -19,10 +19,12 @@ import os import unittest from getpass import getuser -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError from vyos.util import read_file +from vyos.util import process_named_running +PROCESS_NAME = 'ddclient' DDCLIENT_CONF = '/run/ddclient/ddclient.conf' base_path = ['service', 'dns', 'dynamic'] @@ -32,17 +34,6 @@ def get_config_value(key): tmp = tmp[0].rstrip(',') return tmp -def check_process(): - """ - Check for running process, process name changes dynamically e.g. - "ddclient - sleeping for 270 seconds", thus we need a different approach - """ - running = False - for p in process_iter(): - if "ddclient" in p.name(): - running = True - return running - class TestServiceDDNS(unittest.TestCase): def setUp(self): self.session = ConfigSession(os.getpid()) @@ -104,8 +95,7 @@ class TestServiceDDNS(unittest.TestCase): self.assertTrue(pwd == "'" + password + "'") # Check for running process - self.assertTrue(check_process()) - + self.assertTrue(process_named_running(PROCESS_NAME)) def test_rfc2136(self): """ Check if DDNS service can be configured and runs """ @@ -135,7 +125,7 @@ class TestServiceDDNS(unittest.TestCase): # TODO: inspect generated configuration file # Check for running process - self.assertTrue(check_process()) + self.assertTrue(process_named_running(PROCESS_NAME)) if __name__ == '__main__': unittest.main() diff --git a/smoketest/scripts/cli/test_service_dns_forwarding.py b/smoketest/scripts/cli/test_service_dns_forwarding.py new file mode 100755 index 000000000..5e2f3dfbd --- /dev/null +++ b/smoketest/scripts/cli/test_service_dns_forwarding.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import os +import unittest + +from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.util import read_file +from vyos.util import process_named_running + +CONFIG_FILE = '/run/powerdns/recursor.conf' +FORWARD_FILE = '/run/powerdns/recursor.forward-zones.conf' +HOSTSD_FILE = '/run/powerdns/recursor.vyos-hostsd.conf.lua' +PROCESS_NAME= 'pdns-r/worker' + +base_path = ['service', 'dns', 'forwarding'] + +allow_from = ['192.0.2.0/24', '2001:db8::/32'] +listen_adress = ['127.0.0.1', '::1'] + +def get_config_value(key, file=CONFIG_FILE): + tmp = read_file(file) + tmp = re.findall(r'\n{}=+(.*)'.format(key), tmp) + return tmp[0] + +class TestServicePowerDNS(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + + def tearDown(self): + # Delete DNS forwarding configuration + self.session.delete(base_path) + self.session.commit() + del self.session + + def test_basic_forwarding(self): + """ Check basic DNS forwarding settings """ + cache_size = '20' + negative_ttl = '120' + + self.session.set(base_path + ['cache-size', cache_size]) + self.session.set(base_path + ['negative-ttl', negative_ttl]) + + # check validate() - allow from must be defined + with self.assertRaises(ConfigSessionError): + self.session.commit() + for network in allow_from: + self.session.set(base_path + ['allow-from', network]) + + # check validate() - listen-address must be defined + with self.assertRaises(ConfigSessionError): + self.session.commit() + for address in listen_adress: + self.session.set(base_path + ['listen-address', address]) + + # configure DNSSEC + self.session.set(base_path + ['dnssec', 'validate']) + + # Do not use local /etc/hosts file in name resolution + self.session.set(base_path + ['ignore-hosts-file']) + + # commit changes + self.session.commit() + + # Check configured cache-size + tmp = get_config_value('max-cache-entries') + self.assertEqual(tmp, cache_size) + + # Networks allowed to query this server + tmp = get_config_value('allow-from') + self.assertEqual(tmp, ','.join(allow_from)) + + # Addresses to listen for DNS queries + tmp = get_config_value('local-address') + self.assertEqual(tmp, ','.join(listen_adress)) + + # Maximum amount of time negative entries are cached + tmp = get_config_value('max-negative-ttl') + self.assertEqual(tmp, negative_ttl) + + # Do not use local /etc/hosts file in name resolution + tmp = get_config_value('export-etc-hosts') + self.assertEqual(tmp, 'no') + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + + def test_dnssec(self): + """ DNSSEC option testing """ + + for network in allow_from: + self.session.set(base_path + ['allow-from', network]) + for address in listen_adress: + self.session.set(base_path + ['listen-address', address]) + + options = ['off', 'process-no-validate', 'process', 'log-fail', 'validate'] + for option in options: + self.session.set(base_path + ['dnssec', option]) + + # commit changes + self.session.commit() + + tmp = get_config_value('dnssec') + self.assertEqual(tmp, option) + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + + def test_external_nameserver(self): + """ Externe Domain Name Servers (DNS) addresses """ + + for network in allow_from: + self.session.set(base_path + ['allow-from', network]) + for address in listen_adress: + self.session.set(base_path + ['listen-address', address]) + + nameservers = ['192.0.2.1', '192.0.2.2'] + for nameserver in nameservers: + self.session.set(base_path + ['name-server', nameserver]) + + # commit changes + self.session.commit() + + tmp = get_config_value(r'\+.', file=FORWARD_FILE) + self.assertEqual(tmp, ', '.join(nameservers)) + + # Do not use local /etc/hosts file in name resolution + # default: yes + tmp = get_config_value('export-etc-hosts') + self.assertEqual(tmp, 'yes') + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + + def test_domain_forwarding(self): + """ Externe Domain Name Servers (DNS) addresses """ + + for network in allow_from: + self.session.set(base_path + ['allow-from', network]) + for address in listen_adress: + self.session.set(base_path + ['listen-address', address]) + + domains = ['vyos.io', 'vyos.net', 'vyos.com'] + nameservers = ['192.0.2.1', '192.0.2.2'] + for domain in domains: + for nameserver in nameservers: + self.session.set(base_path + ['domain', domain, 'server', nameserver]) + + # Test 'recursion-desired' flag for only one domain + if domain == domains[0]: + self.session.set(base_path + ['domain', domain, 'recursion-desired']) + + # Test 'negative trust anchor' flag for the second domain only + if domain == domains[1]: + self.session.set(base_path + ['domain', domain, 'addnta']) + + # commit changes + self.session.commit() + + # Test configured name-servers + hosts_conf = read_file(HOSTSD_FILE) + for domain in domains: + # Test 'recursion-desired' flag for the first domain only + if domain == domains[0]: key =f'\+{domain}' + else: key =f'{domain}' + tmp = get_config_value(key, file=FORWARD_FILE) + self.assertEqual(tmp, ', '.join(nameservers)) + + # Test 'negative trust anchor' flag for the second domain only + if domain == domains[1]: + self.assertIn(f'addNTA("{domain}", "static")', hosts_conf) + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + +if __name__ == '__main__': + unittest.main() + diff --git a/smoketest/scripts/cli/test_service_mdns-repeater.py b/smoketest/scripts/cli/test_service_mdns-repeater.py index 18900b6d2..de73b9914 100755 --- a/smoketest/scripts/cli/test_service_mdns-repeater.py +++ b/smoketest/scripts/cli/test_service_mdns-repeater.py @@ -17,8 +17,8 @@ import os import unittest -from psutil import process_iter from vyos.configsession import ConfigSession +from vyos.util import process_named_running base_path = ['service', 'mdns', 'repeater'] intf_base = ['interfaces', 'dummy'] @@ -45,7 +45,7 @@ class TestServiceMDNSrepeater(unittest.TestCase): self.session.commit() # Check for running process - self.assertTrue("mdns-repeater" in (p.name() for p in process_iter())) + self.assertTrue(process_named_running('mdns-repeater')) if __name__ == '__main__': unittest.main() diff --git a/smoketest/scripts/cli/test_service_pppoe-server.py b/smoketest/scripts/cli/test_service_pppoe-server.py index 901ca792d..3a6b12ef4 100755 --- a/smoketest/scripts/cli/test_service_pppoe-server.py +++ b/smoketest/scripts/cli/test_service_pppoe-server.py @@ -14,15 +14,15 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import re import os import unittest from configparser import ConfigParser -from psutil import process_iter from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError +from vyos.util import process_named_running +process_name = 'accel-pppd' base_path = ['service', 'pppoe-server'] local_if = ['interfaces', 'dummy', 'dum667'] pppoe_conf = '/run/accel-pppd/pppoe.conf' @@ -116,7 +116,7 @@ class TestServicePPPoEServer(unittest.TestCase): self.assertEqual(conf['connlimit']['limit'], '20/min') # Check for running process - self.assertTrue('accel-pppd' in (p.name() for p in process_iter())) + self.assertTrue(process_named_running(process_name)) def test_radius_auth(self): """ Test configuration of RADIUS authentication for PPPoE server """ @@ -161,7 +161,7 @@ class TestServicePPPoEServer(unittest.TestCase): self.assertFalse(conf['ppp'].getboolean('ccp')) # Check for running process - self.assertTrue('accel-pppd' in (p.name() for p in process_iter())) + self.assertTrue(process_named_running(process_name)) if __name__ == '__main__': unittest.main() diff --git a/smoketest/scripts/cli/test_service_router-advert.py b/smoketest/scripts/cli/test_service_router-advert.py index ec2110c8a..238f59e6d 100755 --- a/smoketest/scripts/cli/test_service_router-advert.py +++ b/smoketest/scripts/cli/test_service_router-advert.py @@ -18,9 +18,9 @@ import re import os import unittest -from psutil import process_iter from vyos.configsession import ConfigSession from vyos.util import read_file +from vyos.util import process_named_running RADVD_CONF = '/run/radvd/radvd.conf' @@ -90,10 +90,8 @@ class TestServiceRADVD(unittest.TestCase): tmp = get_config_value('AdvOnLink') self.assertEqual(tmp, 'off') - - # Check for running process - self.assertTrue('radvd' in (p.name() for p in process_iter())) + self.assertTrue(process_named_running('radvd')) if __name__ == '__main__': unittest.main() diff --git a/smoketest/scripts/cli/test_service_snmp.py b/smoketest/scripts/cli/test_service_snmp.py index fb5f5393f..067a3c76b 100755 --- a/smoketest/scripts/cli/test_service_snmp.py +++ b/smoketest/scripts/cli/test_service_snmp.py @@ -19,12 +19,15 @@ import re import unittest from vyos.validate import is_ipv4 -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError from vyos.util import read_file +from vyos.util import process_named_running +PROCESS_NAME = 'snmpd' SNMPD_CONF = '/etc/snmp/snmpd.conf' + base_path = ['service', 'snmp'] def get_config_value(key): @@ -78,7 +81,7 @@ class TestSNMPService(unittest.TestCase): self.assertTrue(expected in config) # Check for running process - self.assertTrue("snmpd" in (p.name() for p in process_iter())) + self.assertTrue(process_named_running(PROCESS_NAME)) def test_snmpv3_sha(self): @@ -113,7 +116,7 @@ class TestSNMPService(unittest.TestCase): # TODO: read in config file and check values # Check for running process - self.assertTrue("snmpd" in (p.name() for p in process_iter())) + self.assertTrue(process_named_running(PROCESS_NAME)) def test_snmpv3_md5(self): """ Check if SNMPv3 can be configured with MD5 authentication and service runs""" @@ -147,8 +150,7 @@ class TestSNMPService(unittest.TestCase): # TODO: read in config file and check values # Check for running process - self.assertTrue("snmpd" in (p.name() for p in process_iter())) - + self.assertTrue(process_named_running(PROCESS_NAME)) if __name__ == '__main__': unittest.main() diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index 79850fe44..0cd00ccce 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -18,10 +18,12 @@ import re import os import unittest -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError from vyos.util import read_file +from vyos.util import process_named_running +PROCESS_NAME = 'sshd' SSHD_CONF = '/run/ssh/sshd_config' base_path = ['service', 'ssh'] @@ -30,9 +32,6 @@ def get_config_value(key): tmp = re.findall(f'\n?{key}\s+(.*)', tmp) return tmp -def is_service_running(): - return 'sshd' in (p.name() for p in process_iter()) - class TestServiceSSH(unittest.TestCase): def setUp(self): self.session = ConfigSession(os.getpid()) @@ -62,7 +61,7 @@ class TestServiceSSH(unittest.TestCase): self.assertEqual('22', port) # Check for running process - self.assertTrue(is_service_running()) + self.assertTrue(process_named_running(PROCESS_NAME)) def test_ssh_single(self): """ Check if SSH service can be configured and runs """ @@ -101,7 +100,7 @@ class TestServiceSSH(unittest.TestCase): self.assertTrue("100" in keepalive) # Check for running process - self.assertTrue(is_service_running()) + self.assertTrue(process_named_running(PROCESS_NAME)) def test_ssh_multi(self): """ Check if SSH service can be configured and runs with multiple @@ -128,7 +127,7 @@ class TestServiceSSH(unittest.TestCase): self.assertIn(address, tmp) # Check for running process - self.assertTrue(is_service_running()) + self.assertTrue(process_named_running(PROCESS_NAME)) if __name__ == '__main__': unittest.main() diff --git a/smoketest/scripts/cli/test_system_lcd.py b/smoketest/scripts/cli/test_system_lcd.py index 931a91c53..9385799b0 100755 --- a/smoketest/scripts/cli/test_system_lcd.py +++ b/smoketest/scripts/cli/test_system_lcd.py @@ -18,9 +18,10 @@ import os import unittest from configparser import ConfigParser -from psutil import process_iter from vyos.configsession import ConfigSession +from vyos.util import process_named_running +config_file = '/run/LCDd/LCDd.conf' base_path = ['system', 'lcd'] class TestSystemLCD(unittest.TestCase): @@ -42,13 +43,13 @@ class TestSystemLCD(unittest.TestCase): # load up ini-styled LCDd.conf conf = ConfigParser() - conf.read('/run/LCDd/LCDd.conf') + conf.read(config_file) self.assertEqual(conf['CFontzPacket']['Model'], '533') self.assertEqual(conf['CFontzPacket']['Device'], '/dev/ttyS1') - # both processes running - self.assertTrue('LCDd' in (p.name() for p in process_iter())) + # Check for running process + self.assertTrue(process_named_running('LCDd')) if __name__ == '__main__': unittest.main() diff --git a/smoketest/scripts/cli/test_system_ntp.py b/smoketest/scripts/cli/test_system_ntp.py index 856a28916..2a7c64870 100755 --- a/smoketest/scripts/cli/test_system_ntp.py +++ b/smoketest/scripts/cli/test_system_ntp.py @@ -18,11 +18,14 @@ import re import os import unittest -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.template import vyos_address_from_cidr, vyos_netmask_from_cidr +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError +from vyos.template import vyos_address_from_cidr +from vyos.template import vyos_netmask_from_cidr from vyos.util import read_file +from vyos.util import process_named_running +PROCESS_NAME = 'ntpd' NTP_CONF = '/etc/ntp.conf' base_path = ['system', 'ntp'] @@ -63,7 +66,7 @@ class TestSystemNTP(unittest.TestCase): self.assertTrue(test in tmp) # Check for running process - self.assertTrue("ntpd" in (p.name() for p in process_iter())) + self.assertTrue(process_named_running(PROCESS_NAME)) def test_ntp_clients(self): """ Test the allowed-networks statement """ @@ -102,7 +105,7 @@ class TestSystemNTP(unittest.TestCase): self.assertEqual(tmp, test) # Check for running process - self.assertTrue("ntpd" in (p.name() for p in process_iter())) + self.assertTrue(process_named_running(PROCESS_NAME)) if __name__ == '__main__': unittest.main() diff --git a/smoketest/scripts/cli/test_vpn_openconnect.py b/smoketest/scripts/cli/test_vpn_openconnect.py index d2b82d686..2ba6aabf9 100755 --- a/smoketest/scripts/cli/test_vpn_openconnect.py +++ b/smoketest/scripts/cli/test_vpn_openconnect.py @@ -14,13 +14,11 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import re import os import unittest -from psutil import process_iter -from vyos.configsession import ConfigSession, ConfigSessionError -from vyos.util import read_file +from vyos.configsession import ConfigSession +from vyos.util import process_named_running OCSERV_CONF = '/run/ocserv/ocserv.conf' base_path = ['vpn', 'openconnect'] @@ -52,7 +50,7 @@ class TestVpnOpenconnect(unittest.TestCase): self.session.commit() # Check for running process - self.assertTrue("ocserv-main" in (p.name() for p in process_iter())) + self.assertTrue(process_named_running('ocserv-main')) if __name__ == '__main__': unittest.main() |