diff options
Diffstat (limited to 'smoketest/scripts/cli')
| -rw-r--r-- | smoketest/scripts/cli/base_accel_ppp_test.py | 17 | ||||
| -rw-r--r-- | smoketest/scripts/cli/base_interfaces_test.py | 30 | ||||
| -rwxr-xr-x | smoketest/scripts/cli/test_interfaces_openvpn.py | 97 | ||||
| -rwxr-xr-x | smoketest/scripts/cli/test_nat.py | 136 | ||||
| -rwxr-xr-x | smoketest/scripts/cli/test_service_pppoe-server.py | 2 | ||||
| -rwxr-xr-x | smoketest/scripts/cli/test_service_tftp-server.py | 105 | ||||
| -rwxr-xr-x | smoketest/scripts/cli/test_system_ntp.py | 8 | ||||
| -rwxr-xr-x | smoketest/scripts/cli/test_vpn_sstp.py | 1 |
8 files changed, 363 insertions, 33 deletions
diff --git a/smoketest/scripts/cli/base_accel_ppp_test.py b/smoketest/scripts/cli/base_accel_ppp_test.py index cf401b0d8..56cbf1dd4 100644 --- a/smoketest/scripts/cli/base_accel_ppp_test.py +++ b/smoketest/scripts/cli/base_accel_ppp_test.py @@ -44,6 +44,9 @@ class BasicAccelPPPTest: def set(self, path): self.session.set(self._base_path + path) + def delete(self, path): + self.session.delete(self._base_path + path) + def basic_config(self): # PPPoE local auth mode requires local users to be configured! self.set(['authentication', 'local-users', 'username', 'vyos', 'password', 'vyos']) @@ -117,6 +120,20 @@ class BasicAccelPPPTest: # Check for running process self.assertTrue(process_named_running(self._process_name)) + # Check local-users default value(s) + self.delete(['authentication', 'local-users', 'username', user, 'static-ip']) + # commit changes + self.session.commit() + + # check local users + tmp = cmd(f'sudo cat {self._chap_secrets}') + regex = f'{user}\s+\*\s+{password}\s+\*\s+{download}/{upload}' + tmp = re.findall(regex, tmp) + self.assertTrue(tmp) + + # Check for running process + self.assertTrue(process_named_running(self._process_name)) + def test_authentication_radius(self): """ Test configuration of RADIUS authentication for PPPoE server """ self.basic_config() diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py index 047c19dd0..c6bb5bd1a 100644 --- a/smoketest/scripts/cli/base_interfaces_test.py +++ b/smoketest/scripts/cli/base_interfaces_test.py @@ -228,7 +228,7 @@ class BasicInterfaceTest: self._mtu_test(vif) def test_ip_options(self): - """ test IP options like arp """ + """ Test interface base IPv4 options """ if not self._test_ip: return None @@ -241,6 +241,7 @@ class BasicInterfaceTest: # Options self.session.set(path + ['ip', 'arp-cache-timeout', arp_tmo]) self.session.set(path + ['ip', 'disable-arp-filter']) + self.session.set(path + ['ip', 'disable-forwarding']) self.session.set(path + ['ip', 'enable-arp-accept']) self.session.set(path + ['ip', 'enable-arp-announce']) self.session.set(path + ['ip', 'enable-arp-ignore']) @@ -266,6 +267,9 @@ class BasicInterfaceTest: tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_ignore') self.assertEqual('1', tmp) + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/forwarding') + self.assertEqual('0', tmp) + tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp') self.assertEqual('1', tmp) @@ -274,3 +278,27 @@ class BasicInterfaceTest: tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/rp_filter') self.assertEqual('2', tmp) + + def test_ipv6_options(self): + """ Test interface base IPv6 options """ + if not self._test_ipv6: + return None + + for interface in self._interfaces: + dad_transmits = '10' + path = self._base_path + [interface] + for option in self._options.get(interface, []): + self.session.set(path + option.split()) + + # Options + self.session.set(path + ['ipv6', 'disable-forwarding']) + self.session.set(path + ['ipv6', 'dup-addr-detect-transmits', dad_transmits]) + + self.session.commit() + + for interface in self._interfaces: + tmp = read_file(f'/proc/sys/net/ipv6/conf/{interface}/forwarding') + self.assertEqual('0', tmp) + + tmp = read_file(f'/proc/sys/net/ipv6/conf/{interface}/dad_transmits') + self.assertEqual(dad_transmits, tmp) diff --git a/smoketest/scripts/cli/test_interfaces_openvpn.py b/smoketest/scripts/cli/test_interfaces_openvpn.py new file mode 100755 index 000000000..0ac91c170 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_openvpn.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError +from vyos.util import cmd +from vyos.util import process_named_running +from vyos.util import read_file + +PROCESS_NAME = 'openvpn' + +base_path = ['interfaces', 'openvpn'] +ca_cert = '/config/auth/ovpn_test_ca.crt' +ssl_cert = '/config/auth/ovpn_test_server.crt' +ssl_key = '/config/auth/ovpn_test_server.key' + +class TestInterfacesOpenVPN(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + + def tearDown(self): + self.session.delete(base_path) + self.session.commit() + del self.session + + def test_client(self): + """ Basic OpenVPN client test """ + interface = 'vtun10' + remote_host = '192.0.2.1' + remote_port = '1194' + protocol = 'udp' + path = base_path + [interface] + + self.session.set(path + ['device-type', 'tun']) + self.session.set(path + ['encryption', 'cipher', 'aes256']) + self.session.set(path + ['hash', 'sha1']) + self.session.set(path + ['mode', 'client']) + self.session.set(path + ['persistent-tunnel']) + self.session.set(path + ['protocol', protocol]) + self.session.set(path + ['remote-host', remote_host]) + self.session.set(path + ['remote-port', remote_port]) + self.session.set(path + ['tls', 'ca-cert-file', ca_cert]) + self.session.set(path + ['tls', 'cert-file', ssl_cert]) + self.session.set(path + ['tls', 'key-file', ssl_key]) + + self.session.commit() + + config_file = f'/run/openvpn/{interface}.conf' + config = read_file(config_file) + + self.assertIn(f'dev {interface}', config) + self.assertIn('dev-type tun', config) + self.assertIn('persist-key', config) + self.assertIn(f'proto {protocol}', config) + self.assertIn(f'rport {remote_port}', config) + self.assertIn(f'remote {remote_host}', config) + self.assertIn('persist-tun', config) + + + self.assertTrue(process_named_running(PROCESS_NAME)) + +if __name__ == '__main__': + # Our SSL certificates need a subject ... + subject = '/C=DE/ST=BY/O=VyOS/localityName=Cloud/commonName=vyos/' \ + 'organizationalUnitName=VyOS/emailAddress=maintainers@vyos.io/' + + if not os.path.isfile(ssl_key) and not os.path.isfile(ssl_cert) and not os.path.isfile(ca_cert): + # Generate mandatory SSL certificate + tmp = f'openssl req -newkey rsa:4096 -new -nodes -x509 -days 3650 '\ + f'-keyout {ssl_key} -out {ssl_cert} -subj {subject}' + cmd(tmp) + + # Generate "CA" + tmp = f'openssl req -new -x509 -key {ssl_key} -out {ca_cert} '\ + f'-subj {subject}' + cmd(tmp) + + for file in [ca_cert, ssl_cert, ssl_key]: + cmd(f'sudo chown openvpn:openvpn {file}') + + unittest.main() diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py index b06fa239d..b5bde743b 100755 --- a/smoketest/scripts/cli/test_nat.py +++ b/smoketest/scripts/cli/test_nat.py @@ -19,13 +19,14 @@ import jmespath import json import unittest -from vyos.configsession import ConfigSession, ConfigSessionError +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError from vyos.util import cmd +from vyos.util import vyos_dict_search base_path = ['nat'] -source_path = base_path + ['source'] - -snat_pattern = 'nftables[?rule].rule[?chain].{chain: chain, comment: comment, address: { network: expr[].match.right.prefix.addr | [0], prefix: expr[].match.right.prefix.len | [0]}}' +src_path = base_path + ['source'] +dst_path = base_path + ['destination'] class TestNAT(unittest.TestCase): def setUp(self): @@ -38,37 +39,126 @@ class TestNAT(unittest.TestCase): self.session.delete(base_path) self.session.commit() - def test_source_nat(self): - """ Configure and validate source NAT rule(s) """ - - network = '192.168.0.0/16' - self.session.set(source_path + ['rule', '1', 'destination', 'address', network]) - self.session.set(source_path + ['rule', '1', 'exclude']) + def test_snat(self): + """ Test source NAT (SNAT) rules """ + + rules = ['100', '110', '120', '130', '200', '210', '220', '230'] + outbound_iface_100 = 'eth0' + outbound_iface_200 = 'eth1' + for rule in rules: + network = f'192.168.{rule}.0/24' + # depending of rule order we check either for source address for NAT + # or configured destination address for NAT + if int(rule) < 200: + self.session.set(src_path + ['rule', rule, 'source', 'address', network]) + self.session.set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100]) + self.session.set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) + else: + self.session.set(src_path + ['rule', rule, 'destination', 'address', network]) + self.session.set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200]) + self.session.set(src_path + ['rule', rule, 'exclude']) - # check validate() - outbound-interface must be defined - with self.assertRaises(ConfigSessionError): - self.session.commit() - - self.session.set(source_path + ['rule', '1', 'outbound-interface', 'any']) self.session.commit() tmp = cmd('sudo nft -j list table nat') - nftable_json = json.loads(tmp) - condensed_json = jmespath.search(snat_pattern, nftable_json)[0] - - self.assertEqual(condensed_json['comment'], 'DST-NAT-1') - self.assertEqual(condensed_json['address']['network'], network.split('/')[0]) - self.assertEqual(str(condensed_json['address']['prefix']), network.split('/')[1]) + data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) + + for idx in range(0, len(data_json)): + rule = str(rules[idx]) + data = data_json[idx] + network = f'192.168.{rule}.0/24' + + self.assertEqual(data['chain'], 'POSTROUTING') + self.assertEqual(data['comment'], f'SRC-NAT-{rule}') + self.assertEqual(data['family'], 'ip') + self.assertEqual(data['table'], 'nat') + + iface = vyos_dict_search('match.right', data['expr'][0]) + direction = vyos_dict_search('match.left.payload.field', data['expr'][1]) + address = vyos_dict_search('match.right.prefix.addr', data['expr'][1]) + mask = vyos_dict_search('match.right.prefix.len', data['expr'][1]) + + if int(rule) < 200: + self.assertEqual(direction, 'saddr') + self.assertEqual(iface, outbound_iface_100) + # check for masquerade keyword + self.assertIn('masquerade', data['expr'][3]) + else: + self.assertEqual(direction, 'daddr') + self.assertEqual(iface, outbound_iface_200) + # check for return keyword due to 'exclude' + self.assertIn('return', data['expr'][3]) + + self.assertEqual(f'{address}/{mask}', network) + + def test_dnat(self): + """ Test destination NAT (DNAT) rules """ + + rules = ['100', '110', '120', '130', '200', '210', '220', '230'] + inbound_iface_100 = 'eth0' + inbound_iface_200 = 'eth1' + inbound_proto_100 = 'udp' + inbound_proto_200 = 'tcp' + + for rule in rules: + port = f'10{rule}' + self.session.set(dst_path + ['rule', rule, 'source', 'port', port]) + self.session.set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) + self.session.set(dst_path + ['rule', rule, 'translation', 'port', port]) + if int(rule) < 200: + self.session.set(dst_path + ['rule', rule, 'protocol', inbound_proto_100]) + self.session.set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100]) + else: + self.session.set(dst_path + ['rule', rule, 'protocol', inbound_proto_200]) + self.session.set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200]) + self.session.commit() - def test_validation(self): + tmp = cmd('sudo nft -j list table nat') + data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp)) + + for idx in range(0, len(data_json)): + rule = str(rules[idx]) + data = data_json[idx] + port = int(f'10{rule}') + + self.assertEqual(data['chain'], 'PREROUTING') + self.assertEqual(data['comment'].split()[0], f'DST-NAT-{rule}') + self.assertEqual(data['family'], 'ip') + self.assertEqual(data['table'], 'nat') + + iface = vyos_dict_search('match.right', data['expr'][0]) + direction = vyos_dict_search('match.left.payload.field', data['expr'][1]) + protocol = vyos_dict_search('match.left.payload.protocol', data['expr'][1]) + dnat_addr = vyos_dict_search('dnat.addr', data['expr'][3]) + dnat_port = vyos_dict_search('dnat.port', data['expr'][3]) + + self.assertEqual(direction, 'sport') + self.assertEqual(dnat_addr, '192.0.2.1') + self.assertEqual(dnat_port, port) + if int(rule) < 200: + self.assertEqual(iface, inbound_iface_100) + self.assertEqual(protocol, inbound_proto_100) + else: + self.assertEqual(iface, inbound_iface_200) + + + def test_validation_logic(self): """ T2813: Ensure translation address is specified """ - self.session.set(source_path + ['rule', '100', 'outbound-interface', 'eth0']) + rule = '5' + self.session.set(src_path + ['rule', rule, 'source', 'address', '192.0.2.0/24']) + + # check validate() - outbound-interface must be defined + with self.assertRaises(ConfigSessionError): + self.session.commit() + self.session.set(src_path + ['rule', rule, 'outbound-interface', 'eth0']) # check validate() - translation address not specified with self.assertRaises(ConfigSessionError): self.session.commit() + self.session.set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) + self.session.commit() if __name__ == '__main__': unittest.main() diff --git a/smoketest/scripts/cli/test_service_pppoe-server.py b/smoketest/scripts/cli/test_service_pppoe-server.py index 8db002b57..f0c71e2de 100755 --- a/smoketest/scripts/cli/test_service_pppoe-server.py +++ b/smoketest/scripts/cli/test_service_pppoe-server.py @@ -47,7 +47,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest): mtu = '1492' # validate some common values in the configuration - for tmp in ['log_syslog', 'pppoe', 'chap-secrets', 'ippool', + for tmp in ['log_syslog', 'pppoe', 'ippool', 'auth_mschap_v2', 'auth_mschap_v1', 'auth_chap_md5', 'auth_pap', 'shaper']: # Settings without values provide None diff --git a/smoketest/scripts/cli/test_service_tftp-server.py b/smoketest/scripts/cli/test_service_tftp-server.py new file mode 100755 index 000000000..92333392a --- /dev/null +++ b/smoketest/scripts/cli/test_service_tftp-server.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2019-2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import os +import unittest + +from psutil import process_iter + +from vyos.configsession import ConfigSession +from vyos.configsession import ConfigSessionError +from vyos.util import read_file +from vyos.util import process_named_running +from vyos.validate import is_ipv6 + +PROCESS_NAME = 'in.tftpd' +base_path = ['service', 'tftp-server'] +dummy_if_path = ['interfaces', 'dummy', 'dum69'] +address_ipv4 = '192.0.2.1' +address_ipv6 = '2001:db8::1' + +class TestServiceTFTPD(unittest.TestCase): + def setUp(self): + self.session = ConfigSession(os.getpid()) + self.session.set(dummy_if_path + ['address', address_ipv4 + '/32']) + self.session.set(dummy_if_path + ['address', address_ipv6 + '/128']) + + def tearDown(self): + self.session.delete(base_path) + self.session.delete(dummy_if_path) + self.session.commit() + del self.session + + def test_01_tftpd_single(self): + directory = '/tmp' + port = '69' # default port + + self.session.set(base_path + ['allow-upload']) + self.session.set(base_path + ['directory', directory]) + self.session.set(base_path + ['listen-address', address_ipv4]) + + # commit changes + self.session.commit() + + config = read_file('/etc/default/tftpd0') + # verify listen IP address + self.assertIn(f'{address_ipv4}:{port} -4', config) + # verify directory + self.assertIn(directory, config) + # verify upload + self.assertIn('--create --umask 000', config) + + # Check for running process + self.assertTrue(process_named_running(PROCESS_NAME)) + + def test_02_tftpd_multi(self): + directory = '/tmp' + address = [address_ipv4, address_ipv6] + port = '70' + + self.session.set(base_path + ['directory', directory]) + for addr in address: + self.session.set(base_path + ['listen-address', addr]) + self.session.set(base_path + ['port', port]) + + # commit changes + self.session.commit() + + for idx in range(0, len(address)): + config = read_file(f'/etc/default/tftpd{idx}') + addr = address[idx] + + # verify listen IP address + if is_ipv6(addr): + addr = f'[{addr}]' + self.assertIn(f'{addr}:{port} -6', config) + else: + self.assertIn(f'{addr}:{port} -4', config) + + # verify directory + self.assertIn(directory, config) + + # Check for running processes - one process is spawned per listen + # IP address, wheter it's IPv4 or IPv6 + count = 0 + for p in process_iter(): + if PROCESS_NAME in p.name(): + count += 1 + self.assertEqual(count, len(address)) + +if __name__ == '__main__': + unittest.main() diff --git a/smoketest/scripts/cli/test_system_ntp.py b/smoketest/scripts/cli/test_system_ntp.py index 2a7c64870..4f62b62d5 100755 --- a/smoketest/scripts/cli/test_system_ntp.py +++ b/smoketest/scripts/cli/test_system_ntp.py @@ -70,10 +70,6 @@ class TestSystemNTP(unittest.TestCase): def test_ntp_clients(self): """ Test the allowed-networks statement """ - listen_address = ['127.0.0.1', '::1'] - for listen in listen_address: - self.session.set(base_path + ['listen-address', listen]) - networks = ['192.0.2.0/24', '2001:db8:1000::/64'] for network in networks: self.session.set(base_path + ['allow-clients', 'address', network]) @@ -99,9 +95,7 @@ class TestSystemNTP(unittest.TestCase): # Check listen address tmp = get_config_value('interface') - test = ['ignore wildcard'] - for listen in listen_address: - test.append(f'listen {listen}') + test = ['ignore wildcard', 'listen 127.0.0.1', 'listen ::1'] self.assertEqual(tmp, test) # Check for running process diff --git a/smoketest/scripts/cli/test_vpn_sstp.py b/smoketest/scripts/cli/test_vpn_sstp.py index 83be4c248..9babb83dc 100755 --- a/smoketest/scripts/cli/test_vpn_sstp.py +++ b/smoketest/scripts/cli/test_vpn_sstp.py @@ -19,7 +19,6 @@ import unittest from base_accel_ppp_test import BasicAccelPPPTest from vyos.util import cmd -process_name = 'accel-pppd' ca_cert = '/tmp/ca.crt' ssl_cert = '/tmp/server.crt' ssl_key = '/tmp/server.key' |
