summaryrefslogtreecommitdiff
path: root/smoketest/scripts/cli
diff options
context:
space:
mode:
Diffstat (limited to 'smoketest/scripts/cli')
-rw-r--r--smoketest/scripts/cli/base_accel_ppp_test.py17
-rw-r--r--smoketest/scripts/cli/base_interfaces_test.py30
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_openvpn.py97
-rwxr-xr-xsmoketest/scripts/cli/test_nat.py136
-rwxr-xr-xsmoketest/scripts/cli/test_service_pppoe-server.py2
-rwxr-xr-xsmoketest/scripts/cli/test_service_tftp-server.py105
-rwxr-xr-xsmoketest/scripts/cli/test_system_ntp.py8
-rwxr-xr-xsmoketest/scripts/cli/test_vpn_sstp.py1
8 files changed, 363 insertions, 33 deletions
diff --git a/smoketest/scripts/cli/base_accel_ppp_test.py b/smoketest/scripts/cli/base_accel_ppp_test.py
index cf401b0d8..56cbf1dd4 100644
--- a/smoketest/scripts/cli/base_accel_ppp_test.py
+++ b/smoketest/scripts/cli/base_accel_ppp_test.py
@@ -44,6 +44,9 @@ class BasicAccelPPPTest:
def set(self, path):
self.session.set(self._base_path + path)
+ def delete(self, path):
+ self.session.delete(self._base_path + path)
+
def basic_config(self):
# PPPoE local auth mode requires local users to be configured!
self.set(['authentication', 'local-users', 'username', 'vyos', 'password', 'vyos'])
@@ -117,6 +120,20 @@ class BasicAccelPPPTest:
# Check for running process
self.assertTrue(process_named_running(self._process_name))
+ # Check local-users default value(s)
+ self.delete(['authentication', 'local-users', 'username', user, 'static-ip'])
+ # commit changes
+ self.session.commit()
+
+ # check local users
+ tmp = cmd(f'sudo cat {self._chap_secrets}')
+ regex = f'{user}\s+\*\s+{password}\s+\*\s+{download}/{upload}'
+ tmp = re.findall(regex, tmp)
+ self.assertTrue(tmp)
+
+ # Check for running process
+ self.assertTrue(process_named_running(self._process_name))
+
def test_authentication_radius(self):
""" Test configuration of RADIUS authentication for PPPoE server """
self.basic_config()
diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py
index 047c19dd0..c6bb5bd1a 100644
--- a/smoketest/scripts/cli/base_interfaces_test.py
+++ b/smoketest/scripts/cli/base_interfaces_test.py
@@ -228,7 +228,7 @@ class BasicInterfaceTest:
self._mtu_test(vif)
def test_ip_options(self):
- """ test IP options like arp """
+ """ Test interface base IPv4 options """
if not self._test_ip:
return None
@@ -241,6 +241,7 @@ class BasicInterfaceTest:
# Options
self.session.set(path + ['ip', 'arp-cache-timeout', arp_tmo])
self.session.set(path + ['ip', 'disable-arp-filter'])
+ self.session.set(path + ['ip', 'disable-forwarding'])
self.session.set(path + ['ip', 'enable-arp-accept'])
self.session.set(path + ['ip', 'enable-arp-announce'])
self.session.set(path + ['ip', 'enable-arp-ignore'])
@@ -266,6 +267,9 @@ class BasicInterfaceTest:
tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/arp_ignore')
self.assertEqual('1', tmp)
+ tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/forwarding')
+ self.assertEqual('0', tmp)
+
tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/proxy_arp')
self.assertEqual('1', tmp)
@@ -274,3 +278,27 @@ class BasicInterfaceTest:
tmp = read_file(f'/proc/sys/net/ipv4/conf/{interface}/rp_filter')
self.assertEqual('2', tmp)
+
+ def test_ipv6_options(self):
+ """ Test interface base IPv6 options """
+ if not self._test_ipv6:
+ return None
+
+ for interface in self._interfaces:
+ dad_transmits = '10'
+ path = self._base_path + [interface]
+ for option in self._options.get(interface, []):
+ self.session.set(path + option.split())
+
+ # Options
+ self.session.set(path + ['ipv6', 'disable-forwarding'])
+ self.session.set(path + ['ipv6', 'dup-addr-detect-transmits', dad_transmits])
+
+ self.session.commit()
+
+ for interface in self._interfaces:
+ tmp = read_file(f'/proc/sys/net/ipv6/conf/{interface}/forwarding')
+ self.assertEqual('0', tmp)
+
+ tmp = read_file(f'/proc/sys/net/ipv6/conf/{interface}/dad_transmits')
+ self.assertEqual(dad_transmits, tmp)
diff --git a/smoketest/scripts/cli/test_interfaces_openvpn.py b/smoketest/scripts/cli/test_interfaces_openvpn.py
new file mode 100755
index 000000000..0ac91c170
--- /dev/null
+++ b/smoketest/scripts/cli/test_interfaces_openvpn.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import unittest
+
+from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
+from vyos.util import cmd
+from vyos.util import process_named_running
+from vyos.util import read_file
+
+PROCESS_NAME = 'openvpn'
+
+base_path = ['interfaces', 'openvpn']
+ca_cert = '/config/auth/ovpn_test_ca.crt'
+ssl_cert = '/config/auth/ovpn_test_server.crt'
+ssl_key = '/config/auth/ovpn_test_server.key'
+
+class TestInterfacesOpenVPN(unittest.TestCase):
+ def setUp(self):
+ self.session = ConfigSession(os.getpid())
+
+ def tearDown(self):
+ self.session.delete(base_path)
+ self.session.commit()
+ del self.session
+
+ def test_client(self):
+ """ Basic OpenVPN client test """
+ interface = 'vtun10'
+ remote_host = '192.0.2.1'
+ remote_port = '1194'
+ protocol = 'udp'
+ path = base_path + [interface]
+
+ self.session.set(path + ['device-type', 'tun'])
+ self.session.set(path + ['encryption', 'cipher', 'aes256'])
+ self.session.set(path + ['hash', 'sha1'])
+ self.session.set(path + ['mode', 'client'])
+ self.session.set(path + ['persistent-tunnel'])
+ self.session.set(path + ['protocol', protocol])
+ self.session.set(path + ['remote-host', remote_host])
+ self.session.set(path + ['remote-port', remote_port])
+ self.session.set(path + ['tls', 'ca-cert-file', ca_cert])
+ self.session.set(path + ['tls', 'cert-file', ssl_cert])
+ self.session.set(path + ['tls', 'key-file', ssl_key])
+
+ self.session.commit()
+
+ config_file = f'/run/openvpn/{interface}.conf'
+ config = read_file(config_file)
+
+ self.assertIn(f'dev {interface}', config)
+ self.assertIn('dev-type tun', config)
+ self.assertIn('persist-key', config)
+ self.assertIn(f'proto {protocol}', config)
+ self.assertIn(f'rport {remote_port}', config)
+ self.assertIn(f'remote {remote_host}', config)
+ self.assertIn('persist-tun', config)
+
+
+ self.assertTrue(process_named_running(PROCESS_NAME))
+
+if __name__ == '__main__':
+ # Our SSL certificates need a subject ...
+ subject = '/C=DE/ST=BY/O=VyOS/localityName=Cloud/commonName=vyos/' \
+ 'organizationalUnitName=VyOS/emailAddress=maintainers@vyos.io/'
+
+ if not os.path.isfile(ssl_key) and not os.path.isfile(ssl_cert) and not os.path.isfile(ca_cert):
+ # Generate mandatory SSL certificate
+ tmp = f'openssl req -newkey rsa:4096 -new -nodes -x509 -days 3650 '\
+ f'-keyout {ssl_key} -out {ssl_cert} -subj {subject}'
+ cmd(tmp)
+
+ # Generate "CA"
+ tmp = f'openssl req -new -x509 -key {ssl_key} -out {ca_cert} '\
+ f'-subj {subject}'
+ cmd(tmp)
+
+ for file in [ca_cert, ssl_cert, ssl_key]:
+ cmd(f'sudo chown openvpn:openvpn {file}')
+
+ unittest.main()
diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py
index b06fa239d..b5bde743b 100755
--- a/smoketest/scripts/cli/test_nat.py
+++ b/smoketest/scripts/cli/test_nat.py
@@ -19,13 +19,14 @@ import jmespath
import json
import unittest
-from vyos.configsession import ConfigSession, ConfigSessionError
+from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
from vyos.util import cmd
+from vyos.util import vyos_dict_search
base_path = ['nat']
-source_path = base_path + ['source']
-
-snat_pattern = 'nftables[?rule].rule[?chain].{chain: chain, comment: comment, address: { network: expr[].match.right.prefix.addr | [0], prefix: expr[].match.right.prefix.len | [0]}}'
+src_path = base_path + ['source']
+dst_path = base_path + ['destination']
class TestNAT(unittest.TestCase):
def setUp(self):
@@ -38,37 +39,126 @@ class TestNAT(unittest.TestCase):
self.session.delete(base_path)
self.session.commit()
- def test_source_nat(self):
- """ Configure and validate source NAT rule(s) """
-
- network = '192.168.0.0/16'
- self.session.set(source_path + ['rule', '1', 'destination', 'address', network])
- self.session.set(source_path + ['rule', '1', 'exclude'])
+ def test_snat(self):
+ """ Test source NAT (SNAT) rules """
+
+ rules = ['100', '110', '120', '130', '200', '210', '220', '230']
+ outbound_iface_100 = 'eth0'
+ outbound_iface_200 = 'eth1'
+ for rule in rules:
+ network = f'192.168.{rule}.0/24'
+ # depending of rule order we check either for source address for NAT
+ # or configured destination address for NAT
+ if int(rule) < 200:
+ self.session.set(src_path + ['rule', rule, 'source', 'address', network])
+ self.session.set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100])
+ self.session.set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
+ else:
+ self.session.set(src_path + ['rule', rule, 'destination', 'address', network])
+ self.session.set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200])
+ self.session.set(src_path + ['rule', rule, 'exclude'])
- # check validate() - outbound-interface must be defined
- with self.assertRaises(ConfigSessionError):
- self.session.commit()
-
- self.session.set(source_path + ['rule', '1', 'outbound-interface', 'any'])
self.session.commit()
tmp = cmd('sudo nft -j list table nat')
- nftable_json = json.loads(tmp)
- condensed_json = jmespath.search(snat_pattern, nftable_json)[0]
-
- self.assertEqual(condensed_json['comment'], 'DST-NAT-1')
- self.assertEqual(condensed_json['address']['network'], network.split('/')[0])
- self.assertEqual(str(condensed_json['address']['prefix']), network.split('/')[1])
+ data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
+
+ for idx in range(0, len(data_json)):
+ rule = str(rules[idx])
+ data = data_json[idx]
+ network = f'192.168.{rule}.0/24'
+
+ self.assertEqual(data['chain'], 'POSTROUTING')
+ self.assertEqual(data['comment'], f'SRC-NAT-{rule}')
+ self.assertEqual(data['family'], 'ip')
+ self.assertEqual(data['table'], 'nat')
+
+ iface = vyos_dict_search('match.right', data['expr'][0])
+ direction = vyos_dict_search('match.left.payload.field', data['expr'][1])
+ address = vyos_dict_search('match.right.prefix.addr', data['expr'][1])
+ mask = vyos_dict_search('match.right.prefix.len', data['expr'][1])
+
+ if int(rule) < 200:
+ self.assertEqual(direction, 'saddr')
+ self.assertEqual(iface, outbound_iface_100)
+ # check for masquerade keyword
+ self.assertIn('masquerade', data['expr'][3])
+ else:
+ self.assertEqual(direction, 'daddr')
+ self.assertEqual(iface, outbound_iface_200)
+ # check for return keyword due to 'exclude'
+ self.assertIn('return', data['expr'][3])
+
+ self.assertEqual(f'{address}/{mask}', network)
+
+ def test_dnat(self):
+ """ Test destination NAT (DNAT) rules """
+
+ rules = ['100', '110', '120', '130', '200', '210', '220', '230']
+ inbound_iface_100 = 'eth0'
+ inbound_iface_200 = 'eth1'
+ inbound_proto_100 = 'udp'
+ inbound_proto_200 = 'tcp'
+
+ for rule in rules:
+ port = f'10{rule}'
+ self.session.set(dst_path + ['rule', rule, 'source', 'port', port])
+ self.session.set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1'])
+ self.session.set(dst_path + ['rule', rule, 'translation', 'port', port])
+ if int(rule) < 200:
+ self.session.set(dst_path + ['rule', rule, 'protocol', inbound_proto_100])
+ self.session.set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100])
+ else:
+ self.session.set(dst_path + ['rule', rule, 'protocol', inbound_proto_200])
+ self.session.set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200])
+ self.session.commit()
- def test_validation(self):
+ tmp = cmd('sudo nft -j list table nat')
+ data_json = jmespath.search('nftables[?rule].rule[?chain]', json.loads(tmp))
+
+ for idx in range(0, len(data_json)):
+ rule = str(rules[idx])
+ data = data_json[idx]
+ port = int(f'10{rule}')
+
+ self.assertEqual(data['chain'], 'PREROUTING')
+ self.assertEqual(data['comment'].split()[0], f'DST-NAT-{rule}')
+ self.assertEqual(data['family'], 'ip')
+ self.assertEqual(data['table'], 'nat')
+
+ iface = vyos_dict_search('match.right', data['expr'][0])
+ direction = vyos_dict_search('match.left.payload.field', data['expr'][1])
+ protocol = vyos_dict_search('match.left.payload.protocol', data['expr'][1])
+ dnat_addr = vyos_dict_search('dnat.addr', data['expr'][3])
+ dnat_port = vyos_dict_search('dnat.port', data['expr'][3])
+
+ self.assertEqual(direction, 'sport')
+ self.assertEqual(dnat_addr, '192.0.2.1')
+ self.assertEqual(dnat_port, port)
+ if int(rule) < 200:
+ self.assertEqual(iface, inbound_iface_100)
+ self.assertEqual(protocol, inbound_proto_100)
+ else:
+ self.assertEqual(iface, inbound_iface_200)
+
+
+ def test_validation_logic(self):
""" T2813: Ensure translation address is specified """
- self.session.set(source_path + ['rule', '100', 'outbound-interface', 'eth0'])
+ rule = '5'
+ self.session.set(src_path + ['rule', rule, 'source', 'address', '192.0.2.0/24'])
+
+ # check validate() - outbound-interface must be defined
+ with self.assertRaises(ConfigSessionError):
+ self.session.commit()
+ self.session.set(src_path + ['rule', rule, 'outbound-interface', 'eth0'])
# check validate() - translation address not specified
with self.assertRaises(ConfigSessionError):
self.session.commit()
+ self.session.set(src_path + ['rule', rule, 'translation', 'address', 'masquerade'])
+ self.session.commit()
if __name__ == '__main__':
unittest.main()
diff --git a/smoketest/scripts/cli/test_service_pppoe-server.py b/smoketest/scripts/cli/test_service_pppoe-server.py
index 8db002b57..f0c71e2de 100755
--- a/smoketest/scripts/cli/test_service_pppoe-server.py
+++ b/smoketest/scripts/cli/test_service_pppoe-server.py
@@ -47,7 +47,7 @@ class TestServicePPPoEServer(BasicAccelPPPTest.BaseTest):
mtu = '1492'
# validate some common values in the configuration
- for tmp in ['log_syslog', 'pppoe', 'chap-secrets', 'ippool',
+ for tmp in ['log_syslog', 'pppoe', 'ippool',
'auth_mschap_v2', 'auth_mschap_v1', 'auth_chap_md5',
'auth_pap', 'shaper']:
# Settings without values provide None
diff --git a/smoketest/scripts/cli/test_service_tftp-server.py b/smoketest/scripts/cli/test_service_tftp-server.py
new file mode 100755
index 000000000..92333392a
--- /dev/null
+++ b/smoketest/scripts/cli/test_service_tftp-server.py
@@ -0,0 +1,105 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2019-2020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import re
+import os
+import unittest
+
+from psutil import process_iter
+
+from vyos.configsession import ConfigSession
+from vyos.configsession import ConfigSessionError
+from vyos.util import read_file
+from vyos.util import process_named_running
+from vyos.validate import is_ipv6
+
+PROCESS_NAME = 'in.tftpd'
+base_path = ['service', 'tftp-server']
+dummy_if_path = ['interfaces', 'dummy', 'dum69']
+address_ipv4 = '192.0.2.1'
+address_ipv6 = '2001:db8::1'
+
+class TestServiceTFTPD(unittest.TestCase):
+ def setUp(self):
+ self.session = ConfigSession(os.getpid())
+ self.session.set(dummy_if_path + ['address', address_ipv4 + '/32'])
+ self.session.set(dummy_if_path + ['address', address_ipv6 + '/128'])
+
+ def tearDown(self):
+ self.session.delete(base_path)
+ self.session.delete(dummy_if_path)
+ self.session.commit()
+ del self.session
+
+ def test_01_tftpd_single(self):
+ directory = '/tmp'
+ port = '69' # default port
+
+ self.session.set(base_path + ['allow-upload'])
+ self.session.set(base_path + ['directory', directory])
+ self.session.set(base_path + ['listen-address', address_ipv4])
+
+ # commit changes
+ self.session.commit()
+
+ config = read_file('/etc/default/tftpd0')
+ # verify listen IP address
+ self.assertIn(f'{address_ipv4}:{port} -4', config)
+ # verify directory
+ self.assertIn(directory, config)
+ # verify upload
+ self.assertIn('--create --umask 000', config)
+
+ # Check for running process
+ self.assertTrue(process_named_running(PROCESS_NAME))
+
+ def test_02_tftpd_multi(self):
+ directory = '/tmp'
+ address = [address_ipv4, address_ipv6]
+ port = '70'
+
+ self.session.set(base_path + ['directory', directory])
+ for addr in address:
+ self.session.set(base_path + ['listen-address', addr])
+ self.session.set(base_path + ['port', port])
+
+ # commit changes
+ self.session.commit()
+
+ for idx in range(0, len(address)):
+ config = read_file(f'/etc/default/tftpd{idx}')
+ addr = address[idx]
+
+ # verify listen IP address
+ if is_ipv6(addr):
+ addr = f'[{addr}]'
+ self.assertIn(f'{addr}:{port} -6', config)
+ else:
+ self.assertIn(f'{addr}:{port} -4', config)
+
+ # verify directory
+ self.assertIn(directory, config)
+
+ # Check for running processes - one process is spawned per listen
+ # IP address, wheter it's IPv4 or IPv6
+ count = 0
+ for p in process_iter():
+ if PROCESS_NAME in p.name():
+ count += 1
+ self.assertEqual(count, len(address))
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/smoketest/scripts/cli/test_system_ntp.py b/smoketest/scripts/cli/test_system_ntp.py
index 2a7c64870..4f62b62d5 100755
--- a/smoketest/scripts/cli/test_system_ntp.py
+++ b/smoketest/scripts/cli/test_system_ntp.py
@@ -70,10 +70,6 @@ class TestSystemNTP(unittest.TestCase):
def test_ntp_clients(self):
""" Test the allowed-networks statement """
- listen_address = ['127.0.0.1', '::1']
- for listen in listen_address:
- self.session.set(base_path + ['listen-address', listen])
-
networks = ['192.0.2.0/24', '2001:db8:1000::/64']
for network in networks:
self.session.set(base_path + ['allow-clients', 'address', network])
@@ -99,9 +95,7 @@ class TestSystemNTP(unittest.TestCase):
# Check listen address
tmp = get_config_value('interface')
- test = ['ignore wildcard']
- for listen in listen_address:
- test.append(f'listen {listen}')
+ test = ['ignore wildcard', 'listen 127.0.0.1', 'listen ::1']
self.assertEqual(tmp, test)
# Check for running process
diff --git a/smoketest/scripts/cli/test_vpn_sstp.py b/smoketest/scripts/cli/test_vpn_sstp.py
index 83be4c248..9babb83dc 100755
--- a/smoketest/scripts/cli/test_vpn_sstp.py
+++ b/smoketest/scripts/cli/test_vpn_sstp.py
@@ -19,7 +19,6 @@ import unittest
from base_accel_ppp_test import BasicAccelPPPTest
from vyos.util import cmd
-process_name = 'accel-pppd'
ca_cert = '/tmp/ca.crt'
ssl_cert = '/tmp/server.crt'
ssl_key = '/tmp/server.key'