diff options
| author | kumvijaya <kuvmijaya@gmail.com> | 2024-09-26 11:31:07 +0530 |
|---|---|---|
| committer | kumvijaya <kuvmijaya@gmail.com> | 2024-09-26 11:31:07 +0530 |
| commit | a950059053f7394acfb453cc0d8194aa3dc721fa (patch) | |
| tree | eb0acf278f649b5d1417e18e34d728efcd16e745 /smoketest/scripts/cli/test_interfaces_openvpn.py | |
| parent | f0815f3e9b212f424f5adb0c572a71119ad4a8a0 (diff) | |
| download | vyos-workflow-test-temp-a950059053f7394acfb453cc0d8194aa3dc721fa.tar.gz vyos-workflow-test-temp-a950059053f7394acfb453cc0d8194aa3dc721fa.zip | |
T6732: added same as vyos 1x
Diffstat (limited to 'smoketest/scripts/cli/test_interfaces_openvpn.py')
| -rw-r--r-- | smoketest/scripts/cli/test_interfaces_openvpn.py | 868 |
1 files changed, 868 insertions, 0 deletions
diff --git a/smoketest/scripts/cli/test_interfaces_openvpn.py b/smoketest/scripts/cli/test_interfaces_openvpn.py new file mode 100644 index 0000000..e087b87 --- /dev/null +++ b/smoketest/scripts/cli/test_interfaces_openvpn.py @@ -0,0 +1,868 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020-2024 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import unittest + +from glob import glob +from ipaddress import IPv4Network +from netifaces import interfaces + +from base_vyostest_shim import VyOSUnitTestSHIM + +from vyos.configsession import ConfigSessionError +from vyos.utils.process import cmd +from vyos.utils.process import process_named_running +from vyos.utils.file import read_file +from vyos.template import address_from_cidr +from vyos.template import inc_ip +from vyos.template import last_host_address +from vyos.template import netmask_from_cidr + +PROCESS_NAME = 'openvpn' + +base_path = ['interfaces', 'openvpn'] + +cert_data = """ +MIICFDCCAbugAwIBAgIUfMbIsB/ozMXijYgUYG80T1ry+mcwCgYIKoZIzj0EAwIw +WTELMAkGA1UEBhMCR0IxEzARBgNVBAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNv +bWUtQ2l0eTENMAsGA1UECgwEVnlPUzESMBAGA1UEAwwJVnlPUyBUZXN0MB4XDTIx +MDcyMDEyNDUxMloXDTI2MDcxOTEyNDUxMlowWTELMAkGA1UEBhMCR0IxEzARBgNV +BAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNvbWUtQ2l0eTENMAsGA1UECgwEVnlP +UzESMBAGA1UEAwwJVnlPUyBUZXN0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE +01HrLcNttqq4/PtoMua8rMWEkOdBu7vP94xzDO7A8C92ls1v86eePy4QllKCzIw3 +QxBIoCuH2peGRfWgPRdFsKNhMF8wDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8E +BAMCAYYwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMB0GA1UdDgQWBBSu ++JnU5ZC4mkuEpqg2+Mk4K79oeDAKBggqhkjOPQQDAgNHADBEAiBEFdzQ/Bc3Lftz +ngrY605UhA6UprHhAogKgROv7iR4QgIgEFUxTtW3xXJcnUPWhhUFhyZoqfn8dE93 ++dm/LDnp7C0= +""" + +key_data = """ +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgPLpD0Ohhoq0g4nhx +2KMIuze7ucKUt/lBEB2wc03IxXyhRANCAATTUestw222qrj8+2gy5rysxYSQ50G7 +u8/3jHMM7sDwL3aWzW/zp54/LhCWUoLMjDdDEEigK4fal4ZF9aA9F0Ww +""" + +dh_data = """ +MIIBCAKCAQEApzGAPcQlLJiOyfGZgl1qxNgufXkdpjG7lMaOrO4TGr1giFe3jIFO +FxJNC/G9Dn+KSukaWssVVR+Jwr/JesZFPawihS03wC7cZsccykNRIjiteqJDwYJZ +UHieOxyCuCeY4pqOUCl1uswRGjLvIFtwynpnXKKuz2YtjNifma90PEgv/vVWKix+ +Q0TAbdbzJzO5xp8UVn9DuYfSr10k3LbDqDM7w5ezHZxFk24S5pN/yoOpdbxB8TS6 +7q3IYXxR3F+RseKu4J3AvkxXSP1j7COXddPpLnvbJT/SW8NrjuC/n0eKGvmeyqNv +108Y89jnT79MxMMRQk66iwlsd1m4pa/OYwIBAg== +""" + +ovpn_key_data = """ +443f2a710ac411c36894b2531e62c4550b079b8f3f08997f4be57c64abfdaaa4 +31d2396b01ecec3a2c0618959e8186d99f489742d25673ffb3268841ebb2e704 +2a2daabe584e79d51d2b1d7409bf8840f7e42efa3e660a521719b04ee88b9043 +e6315ae12da7c9abd55f67eeed71a9ee8c6e163b5d2661fc332cf90cb45658b4 +adf892f79537d37d3a3d90da283ce885adf325ffd2b5be92067cdf0345c7712c +9d36b642c170351b6d9ce9f6230c7a2617b0c181121bce7d5373404fb68e6521 +0b36e6d40ef2769cf8990503859f6f2db3c85ba74420430a6250d6a74ca51ece +4b85124bfdfec0c8a530cefa7350378d81a4539f74bed832a902ae4798142e4a +""" + +remote_port = '1194' +protocol = 'udp' +path = [] +interface = '' +remote_host = '' +vrf_name = 'orange' +dummy_if = 'dum1301' + +def get_vrf(interface): + for upper in glob(f'/sys/class/net/{interface}/upper*'): + # an upper interface could be named: upper_bond0.1000.1100, thus + # we need top drop the upper_ prefix + tmp = os.path.basename(upper) + tmp = tmp.replace('upper_', '') + return tmp + +class TestInterfacesOpenVPN(VyOSUnitTestSHIM.TestCase): + @classmethod + def setUpClass(cls): + super(TestInterfacesOpenVPN, cls).setUpClass() + + cls.cli_set(cls, ['interfaces', 'dummy', dummy_if, 'address', '192.0.2.1/32']) + cls.cli_set(cls, ['vrf', 'name', vrf_name, 'table', '12345']) + + cls.cli_set(cls, ['pki', 'ca', 'ovpn_test', 'certificate', cert_data.replace('\n','')]) + cls.cli_set(cls, ['pki', 'certificate', 'ovpn_test', 'certificate', cert_data.replace('\n','')]) + cls.cli_set(cls, ['pki', 'certificate', 'ovpn_test', 'private', 'key', key_data.replace('\n','')]) + cls.cli_set(cls, ['pki', 'dh', 'ovpn_test', 'parameters', dh_data.replace('\n','')]) + cls.cli_set(cls, ['pki', 'openvpn', 'shared-secret', 'ovpn_test', 'key', ovpn_key_data.replace('\n','')]) + + @classmethod + def tearDownClass(cls): + cls.cli_delete(cls, ['interfaces', 'dummy', dummy_if]) + cls.cli_delete(cls, ['vrf']) + + super(TestInterfacesOpenVPN, cls).tearDownClass() + + def tearDown(self): + self.cli_delete(base_path) + self.cli_commit() + + def test_openvpn_client_verify(self): + # Create OpenVPN client interface and test verify() steps. + interface = 'vtun2000' + path = base_path + [interface] + self.cli_set(path + ['mode', 'client']) + self.cli_set(path + ['encryption', 'data-ciphers', 'aes192gcm']) + + # check validate() - cannot specify local-port in client mode + self.cli_set(path + ['local-port', '5000']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['local-port']) + + # check validate() - cannot specify local-host in client mode + self.cli_set(path + ['local-host', '127.0.0.1']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['local-host']) + + # check validate() - cannot specify protocol tcp-passive in client mode + self.cli_set(path + ['protocol', 'tcp-passive']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['protocol']) + + # check validate() - remote-host must be set in client mode + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(path + ['remote-host', '192.0.9.9']) + + # check validate() - cannot specify "tls dh-params" in client mode + self.cli_set(path + ['tls', 'dh-params', 'ovpn_test']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['tls']) + + # check validate() - must specify one of "shared-secret-key" and "tls" + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(path + ['shared-secret-key', 'ovpn_test']) + + # check validate() - must specify one of "shared-secret-key" and "tls" + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['shared-secret-key', 'ovpn_test']) + + # check validate() - cannot specify "encryption cipher" in client mode + self.cli_set(path + ['encryption', 'cipher', 'aes192gcm']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['encryption', 'cipher']) + + self.cli_set(path + ['tls', 'ca-certificate', 'ovpn_test']) + self.cli_set(path + ['tls', 'certificate', 'ovpn_test']) + + # check validate() - can not have auth username without a password + self.cli_set(path + ['authentication', 'username', 'vyos']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(path + ['authentication', 'password', 'vyos']) + + # client commit must pass + self.cli_commit() + + self.assertTrue(process_named_running(PROCESS_NAME)) + self.assertIn(interface, interfaces()) + + + def test_openvpn_client_interfaces(self): + # Create OpenVPN client interfaces connecting to different + # server IP addresses. Validate configuration afterwards. + num_range = range(10, 15) + for ii in num_range: + interface = f'vtun{ii}' + remote_host = f'192.0.2.{ii}' + path = base_path + [interface] + auth_hash = 'sha1' + + self.cli_set(path + ['device-type', 'tun']) + self.cli_set(path + ['encryption', 'data-ciphers', 'aes256']) + self.cli_set(path + ['hash', auth_hash]) + self.cli_set(path + ['mode', 'client']) + self.cli_set(path + ['persistent-tunnel']) + self.cli_set(path + ['protocol', protocol]) + self.cli_set(path + ['remote-host', remote_host]) + self.cli_set(path + ['remote-port', remote_port]) + self.cli_set(path + ['tls', 'ca-certificate', 'ovpn_test']) + self.cli_set(path + ['tls', 'certificate', 'ovpn_test']) + self.cli_set(path + ['vrf', vrf_name]) + self.cli_set(path + ['authentication', 'username', interface+'user']) + self.cli_set(path + ['authentication', 'password', interface+'secretpw']) + + self.cli_commit() + + for ii in num_range: + interface = f'vtun{ii}' + remote_host = f'192.0.2.{ii}' + config_file = f'/run/openvpn/{interface}.conf' + pw_file = f'/run/openvpn/{interface}.pw' + config = read_file(config_file) + + self.assertIn(f'dev {interface}', config) + self.assertIn(f'dev-type tun', config) + self.assertIn(f'persist-key', config) + self.assertIn(f'proto {protocol}', config) + self.assertIn(f'rport {remote_port}', config) + self.assertIn(f'remote {remote_host}', config) + self.assertIn(f'persist-tun', config) + self.assertIn(f'auth {auth_hash}', config) + self.assertIn(f'data-ciphers AES-256-CBC', config) + + # TLS options + self.assertIn(f'ca /run/openvpn/{interface}_ca.pem', config) + self.assertIn(f'cert /run/openvpn/{interface}_cert.pem', config) + self.assertIn(f'key /run/openvpn/{interface}_cert.key', config) + + self.assertTrue(process_named_running(PROCESS_NAME)) + self.assertEqual(get_vrf(interface), vrf_name) + self.assertIn(interface, interfaces()) + + pw = cmd(f'sudo cat {pw_file}') + self.assertIn(f'{interface}user', pw) + self.assertIn(f'{interface}secretpw', pw) + + # check that no interface remained after deleting them + self.cli_delete(base_path) + self.cli_commit() + + for ii in num_range: + interface = f'vtun{ii}' + self.assertNotIn(interface, interfaces()) + + def test_openvpn_client_ip_version(self): + # Test the client mode behavior combined with different IP protocol versions + + interface = 'vtun10' + remote_host = '192.0.2.10' + remote_host_v6 = 'fd00::2:10' + path = base_path + [interface] + auth_hash = 'sha1' + + # Default behavior: client uses uspecified protocol version (udp) + self.cli_set(path + ['device-type', 'tun']) + self.cli_set(path + ['encryption', 'data-ciphers', 'aes256']) + self.cli_set(path + ['hash', auth_hash]) + self.cli_set(path + ['mode', 'client']) + self.cli_set(path + ['persistent-tunnel']) + self.cli_set(path + ['protocol', 'udp']) + self.cli_set(path + ['remote-host', remote_host]) + self.cli_set(path + ['remote-port', remote_port]) + self.cli_set(path + ['tls', 'ca-certificate', 'ovpn_test']) + self.cli_set(path + ['tls', 'certificate', 'ovpn_test']) + self.cli_set(path + ['vrf', vrf_name]) + self.cli_set(path + ['authentication', 'username', interface+'user']) + self.cli_set(path + ['authentication', 'password', interface+'secretpw']) + + self.cli_commit() + + config_file = f'/run/openvpn/{interface}.conf' + config = read_file(config_file) + + self.assertIn(f'dev vtun10', config) + self.assertIn(f'dev-type tun', config) + self.assertIn(f'persist-key', config) + self.assertIn(f'proto udp', config) + self.assertIn(f'rport {remote_port}', config) + self.assertIn(f'remote {remote_host}', config) + self.assertIn(f'persist-tun', config) + + # IPv4 only: client usees udp4 protocol + self.cli_set(path + ['ip-version', 'ipv4']) + self.cli_commit() + + config = read_file(config_file) + self.assertIn(f'proto udp4', config) + + # IPv6 only: client uses udp6 protocol + self.cli_set(path + ['ip-version', 'ipv6']) + self.cli_delete(path + ['remote-host', remote_host]) + self.cli_set(path + ['remote-host', remote_host_v6]) + self.cli_commit() + + config = read_file(config_file) + self.assertIn(f'proto udp6', config) + + # IPv6 dual-stack: not allowed in client mode + self.cli_set(path + ['ip-version', 'dual-stack']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_delete(base_path) + self.cli_commit() + + def test_openvpn_server_verify(self): + # Create one OpenVPN server interface and check required verify() stages + interface = 'vtun5000' + path = base_path + [interface] + + # check validate() - must speciy operating mode + self.cli_set(path) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(path + ['mode', 'server']) + + # check validate() - cannot specify protocol tcp-active in server mode + self.cli_set(path + ['protocol', 'tcp-active']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['protocol']) + + # check validate() - cannot specify local-port in client mode + self.cli_set(path + ['remote-port', '5000']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['remote-port']) + + # check validate() - cannot specify local-host in client mode + self.cli_set(path + ['remote-host', '127.0.0.1']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['remote-host']) + + # check validate() - must specify "tls dh-params" when not using EC keys + # in server mode + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(path + ['tls', 'dh-params', 'ovpn_test']) + + # check validate() - must specify "server subnet" or add interface to + # bridge in server mode + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + # check validate() - server client-ip-pool is too large + # [100.64.0.4 -> 100.127.255.251 = 4194295], maximum is 65536 addresses. + self.cli_set(path + ['server', 'subnet', '100.64.0.0/10']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + # check validate() - cannot specify more than 1 IPv4 and 1 IPv6 server subnet + self.cli_set(path + ['server', 'subnet', '100.64.0.0/20']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['server', 'subnet', '100.64.0.0/10']) + + # check validate() - must specify "tls ca-certificate" + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(path + ['tls', 'ca-certificate', 'ovpn_test']) + + # check validate() - must specify "tls certificate" + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(path + ['tls', 'certificate', 'ovpn_test']) + + # check validate() - cannot specify "tls role" in client-server mode' + self.cli_set(path + ['tls', 'role', 'active']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + # check validate() - cannot specify "tls role" in client-server mode' + self.cli_set(path + ['tls', 'auth-key', 'ovpn_test']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + # check validate() - cannot specify "tcp-passive" when "tls role" is "active" + self.cli_set(path + ['protocol', 'tcp-passive']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['protocol']) + + # check validate() - cannot specify "tls dh-params" when "tls role" is "active" + self.cli_set(path + ['tls', 'dh-params', 'ovpn_test']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['tls', 'dh-params']) + + # check validate() - cannot specify "encryption cipher" in server mode + self.cli_set(path + ['encryption', 'cipher', 'aes256']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['encryption', 'cipher']) + + # Now test the other path with tls role passive + self.cli_set(path + ['tls', 'role', 'passive']) + # check validate() - cannot specify "tcp-active" when "tls role" is "passive" + self.cli_set(path + ['protocol', 'tcp-active']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['protocol']) + + self.cli_set(path + ['tls', 'dh-params', 'ovpn_test']) + + self.cli_commit() + + self.assertTrue(process_named_running(PROCESS_NAME)) + self.assertIn(interface, interfaces()) + + def test_openvpn_server_subnet_topology(self): + # Create OpenVPN server interfaces using different client subnets. + # Validate configuration afterwards. + + auth_hash = 'sha256' + num_range = range(20, 25) + port = '' + client1_routes = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16'] + for ii in num_range: + interface = f'vtun{ii}' + subnet = f'192.0.{ii}.0/24' + client_ip = inc_ip(subnet, '5') + path = base_path + [interface] + port = str(2000 + ii) + + self.cli_set(path + ['device-type', 'tun']) + self.cli_set(path + ['encryption', 'data-ciphers', 'aes192']) + self.cli_set(path + ['hash', auth_hash]) + self.cli_set(path + ['mode', 'server']) + self.cli_set(path + ['local-port', port]) + self.cli_set(path + ['server', 'mfa', 'totp']) + self.cli_set(path + ['server', 'subnet', subnet]) + self.cli_set(path + ['server', 'topology', 'subnet']) + self.cli_set(path + ['keep-alive', 'failure-count', '5']) + self.cli_set(path + ['keep-alive', 'interval', '5']) + + # clients + self.cli_set(path + ['server', 'client', 'client1', 'ip', client_ip]) + for route in client1_routes: + self.cli_set(path + ['server', 'client', 'client1', 'subnet', route]) + + self.cli_set(path + ['replace-default-route']) + self.cli_set(path + ['tls', 'ca-certificate', 'ovpn_test']) + self.cli_set(path + ['tls', 'certificate', 'ovpn_test']) + self.cli_set(path + ['tls', 'dh-params', 'ovpn_test']) + self.cli_set(path + ['vrf', vrf_name]) + + self.cli_commit() + + for ii in num_range: + interface = f'vtun{ii}' + plugin = f'plugin "/usr/lib/openvpn/openvpn-otp.so" "otp_secrets=/config/auth/openvpn/{interface}-otp-secrets otp_slop=180 totp_t0=0 totp_step=30 totp_digits=6 password_is_cr=1"' + subnet = f'192.0.{ii}.0/24' + + start_addr = inc_ip(subnet, '2') + stop_addr = last_host_address(subnet) + + client_ip = inc_ip(subnet, '5') + client_netmask = netmask_from_cidr(subnet) + + port = str(2000 + ii) + + config_file = f'/run/openvpn/{interface}.conf' + client_config_file = f'/run/openvpn/ccd/{interface}/client1' + config = read_file(config_file) + + self.assertIn(f'dev {interface}', config) + self.assertIn(f'dev-type tun', config) + self.assertIn(f'persist-key', config) + self.assertIn(f'proto udp', config) # default protocol + self.assertIn(f'auth {auth_hash}', config) + self.assertIn(f'data-ciphers AES-192-CBC', config) + self.assertIn(f'topology subnet', config) + self.assertIn(f'lport {port}', config) + self.assertIn(f'push "redirect-gateway def1"', config) + self.assertIn(f'{plugin}', config) + self.assertIn(f'keepalive 5 25', config) + + # TLS options + self.assertIn(f'ca /run/openvpn/{interface}_ca.pem', config) + self.assertIn(f'cert /run/openvpn/{interface}_cert.pem', config) + self.assertIn(f'key /run/openvpn/{interface}_cert.key', config) + self.assertIn(f'dh /run/openvpn/{interface}_dh.pem', config) + + # IP pool configuration + netmask = IPv4Network(subnet).netmask + network = IPv4Network(subnet).network_address + self.assertIn(f'server {network} {netmask}', config) + + # Verify client + client_config = read_file(client_config_file) + + self.assertIn(f'ifconfig-push {client_ip} {client_netmask}', client_config) + for route in client1_routes: + self.assertIn('iroute {} {}'.format(address_from_cidr(route), netmask_from_cidr(route)), client_config) + + self.assertTrue(process_named_running(PROCESS_NAME)) + self.assertEqual(get_vrf(interface), vrf_name) + self.assertIn(interface, interfaces()) + + # check that no interface remained after deleting them + self.cli_delete(base_path) + self.cli_commit() + + for ii in num_range: + interface = f'vtun{ii}' + self.assertNotIn(interface, interfaces()) + + def test_openvpn_server_ip_version(self): + # Test the server mode behavior combined with each IP protocol version + + auth_hash = 'sha256' + port = '2000' + + interface = 'vtun20' + subnet = '192.0.20.0/24' + path = base_path + [interface] + + # Default behavior: client uses uspecified protocol version (udp) + self.cli_set(path + ['device-type', 'tun']) + self.cli_set(path + ['encryption', 'data-ciphers', 'aes192']) + self.cli_set(path + ['hash', auth_hash]) + self.cli_set(path + ['mode', 'server']) + self.cli_set(path + ['local-port', port]) + self.cli_set(path + ['server', 'subnet', subnet]) + self.cli_set(path + ['server', 'topology', 'subnet']) + + self.cli_set(path + ['replace-default-route']) + self.cli_set(path + ['tls', 'ca-certificate', 'ovpn_test']) + self.cli_set(path + ['tls', 'certificate', 'ovpn_test']) + self.cli_set(path + ['tls', 'dh-params', 'ovpn_test']) + + self.cli_commit() + + start_addr = inc_ip(subnet, '2') + stop_addr = last_host_address(subnet) + + config_file = f'/run/openvpn/{interface}.conf' + config = read_file(config_file) + + self.assertIn(f'dev {interface}', config) + self.assertIn(f'dev-type tun', config) + self.assertIn(f'persist-key', config) + self.assertIn(f'proto udp', config) # default protocol + self.assertIn(f'auth {auth_hash}', config) + self.assertIn(f'data-ciphers AES-192-CBC', config) + self.assertIn(f'topology subnet', config) + self.assertIn(f'lport {port}', config) + self.assertIn(f'push "redirect-gateway def1"', config) + + # IPv4 only: server usees udp4 protocol + self.cli_set(path + ['ip-version', 'ipv4']) + self.cli_commit() + + config = read_file(config_file) + self.assertIn(f'proto udp4', config) + + # IPv6 only: server uses udp6 protocol + bind ipv6only + self.cli_set(path + ['ip-version', 'ipv6']) + self.cli_commit() + + config = read_file(config_file) + self.assertIn(f'proto udp6', config) + self.assertIn(f'bind ipv6only', config) + + # IPv6 dual-stack: server uses udp6 protocol without bind ipv6only + self.cli_set(path + ['ip-version', 'dual-stack']) + self.cli_commit() + + config = read_file(config_file) + self.assertIn(f'proto udp6', config) + self.assertNotIn(f'bind ipv6only', config) + + self.cli_delete(base_path) + self.cli_commit() + + def test_openvpn_site2site_verify(self): + # Create one OpenVPN site2site interface and check required + # verify() stages + + interface = 'vtun5000' + path = base_path + [interface] + + self.cli_set(path + ['mode', 'site-to-site']) + + # check validate() - cipher negotiation cannot be enabled in site-to-site mode + self.cli_set(path + ['encryption', 'data-ciphers', 'aes192gcm']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['encryption']) + + # check validate() - must specify "local-address" or add interface to bridge + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(path + ['local-address', '10.0.0.1']) + self.cli_set(path + ['local-address', '2001:db8:1::1']) + + # check validate() - cannot specify more than 1 IPv4 local-address + self.cli_set(path + ['local-address', '10.0.0.2']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['local-address', '10.0.0.2']) + + # check validate() - cannot specify more than 1 IPv6 local-address + self.cli_set(path + ['local-address', '2001:db8:1::2']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['local-address', '2001:db8:1::2']) + + # check validate() - IPv4 "local-address" requires IPv4 "remote-address" + # or IPv4 "local-address subnet" + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(path + ['remote-address', '192.168.0.1']) + self.cli_set(path + ['remote-address', '2001:db8:ffff::1']) + + # check validate() - Cannot specify more than 1 IPv4 "remote-address" + self.cli_set(path + ['remote-address', '192.168.0.2']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['remote-address', '192.168.0.2']) + + # check validate() - Cannot specify more than 1 IPv6 "remote-address" + self.cli_set(path + ['remote-address', '2001:db8:ffff::2']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_delete(path + ['remote-address', '2001:db8:ffff::2']) + + # check validate() - Must specify one of "shared-secret-key" and "tls" + with self.assertRaises(ConfigSessionError): + self.cli_commit() + self.cli_set(path + ['shared-secret-key', 'ovpn_test']) + + self.cli_commit() + + def test_openvpn_options(self): + # Ensure OpenVPN process restart on openvpn-option CLI node change + + interface = 'vtun5001' + path = base_path + [interface] + encryption_cipher = 'aes256' + + self.cli_set(path + ['mode', 'site-to-site']) + self.cli_set(path + ['local-address', '10.0.0.2']) + self.cli_set(path + ['remote-address', '192.168.0.3']) + self.cli_set(path + ['shared-secret-key', 'ovpn_test']) + self.cli_set(path + ['encryption', 'cipher', encryption_cipher]) + + self.cli_commit() + + # Now verify the OpenVPN "raw" option passing. Once an openvpn-option is + # added, modified or deleted from the CLI, OpenVPN daemon must be restarted + cur_pid = process_named_running('openvpn') + self.cli_set(path + ['openvpn-option', '--persist-tun']) + self.cli_commit() + + # PID must be different as OpenVPN Must be restarted + new_pid = process_named_running('openvpn') + self.assertNotEqual(cur_pid, new_pid) + cur_pid = new_pid + + self.cli_set(path + ['openvpn-option', '--persist-key']) + self.cli_commit() + + # PID must be different as OpenVPN Must be restarted + new_pid = process_named_running('openvpn') + self.assertNotEqual(cur_pid, new_pid) + cur_pid = new_pid + + self.cli_delete(path + ['openvpn-option']) + self.cli_commit() + + # PID must be different as OpenVPN Must be restarted + new_pid = process_named_running('openvpn') + self.assertNotEqual(cur_pid, new_pid) + cur_pid = new_pid + + def test_openvpn_site2site_interfaces_tun(self): + # Create two OpenVPN site-to-site interfaces + + num_range = range(30, 35) + port = '' + local_address = '' + remote_address = '' + encryption_cipher = 'aes256' + + for ii in num_range: + interface = f'vtun{ii}' + local_address = f'192.0.{ii}.1' + local_address_subnet = '255.255.255.252' + remote_address = f'172.16.{ii}.1' + path = base_path + [interface] + port = str(3000 + ii) + + self.cli_set(path + ['local-address', local_address]) + + # even numbers use tun type, odd numbers use tap type + if ii % 2 == 0: + self.cli_set(path + ['device-type', 'tun']) + else: + self.cli_set(path + ['device-type', 'tap']) + self.cli_set(path + ['local-address', local_address, 'subnet-mask', local_address_subnet]) + + self.cli_set(path + ['mode', 'site-to-site']) + self.cli_set(path + ['local-port', port]) + self.cli_set(path + ['remote-port', port]) + self.cli_set(path + ['shared-secret-key', 'ovpn_test']) + self.cli_set(path + ['remote-address', remote_address]) + self.cli_set(path + ['encryption', 'cipher', encryption_cipher]) + self.cli_set(path + ['vrf', vrf_name]) + + self.cli_commit() + + for ii in num_range: + interface = f'vtun{ii}' + local_address = f'192.0.{ii}.1' + remote_address = f'172.16.{ii}.1' + port = str(3000 + ii) + + config_file = f'/run/openvpn/{interface}.conf' + config = read_file(config_file) + + # even numbers use tun type, odd numbers use tap type + if ii % 2 == 0: + self.assertIn(f'dev-type tun', config) + self.assertIn(f'ifconfig {local_address} {remote_address}', config) + else: + self.assertIn(f'dev-type tap', config) + self.assertIn(f'ifconfig {local_address} {local_address_subnet}', config) + + self.assertIn(f'dev {interface}', config) + self.assertIn(f'secret /run/openvpn/{interface}_shared.key', config) + self.assertIn(f'lport {port}', config) + self.assertIn(f'rport {port}', config) + + + self.assertTrue(process_named_running(PROCESS_NAME)) + self.assertEqual(get_vrf(interface), vrf_name) + self.assertIn(interface, interfaces()) + + + # check that no interface remained after deleting them + self.cli_delete(base_path) + self.cli_commit() + + for ii in num_range: + interface = f'vtun{ii}' + self.assertNotIn(interface, interfaces()) + + + def test_openvpn_site2site_ip_version(self): + # Test the site-to-site mode behavior combined with each IP protocol version + + encryption_cipher = 'aes256' + + interface = 'vtun30' + local_address = '192.0.30.1' + local_address_subnet = '255.255.255.252' + remote_address = '172.16.30.1' + path = base_path + [interface] + port = '3030' + + self.cli_set(path + ['local-address', local_address]) + self.cli_set(path + ['device-type', 'tun']) + self.cli_set(path + ['mode', 'site-to-site']) + self.cli_set(path + ['local-port', port]) + self.cli_set(path + ['remote-port', port]) + self.cli_set(path + ['shared-secret-key', 'ovpn_test']) + self.cli_set(path + ['remote-address', remote_address]) + self.cli_set(path + ['encryption', 'cipher', encryption_cipher]) + + self.cli_commit() + + config_file = f'/run/openvpn/{interface}.conf' + config = read_file(config_file) + + self.assertIn(f'dev-type tun', config) + self.assertIn(f'ifconfig {local_address} {remote_address}', config) + self.assertIn(f'proto udp', config) + self.assertIn(f'dev {interface}', config) + self.assertIn(f'secret /run/openvpn/{interface}_shared.key', config) + self.assertIn(f'lport {port}', config) + self.assertIn(f'rport {port}', config) + + # IPv4 only: server usees udp4 protocol + self.cli_set(path + ['ip-version', 'ipv4']) + self.cli_commit() + + config = read_file(config_file) + self.assertIn(f'proto udp4', config) + + # IPv6 only: server uses udp6 protocol + bind ipv6only + self.cli_set(path + ['ip-version', 'ipv6']) + self.cli_commit() + + config = read_file(config_file) + self.assertIn(f'proto udp6', config) + self.assertIn(f'bind ipv6only', config) + + # IPv6 dual-stack: not allowed in site-to-site mode + self.cli_set(path + ['ip-version', 'dual-stack']) + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + self.cli_delete(base_path) + self.cli_commit() + + def test_openvpn_server_server_bridge(self): + # Create OpenVPN server interface using bridge. + # Validate configuration afterwards. + br_if = 'br0' + vtun_if = 'vtun5010' + auth_hash = 'sha256' + path = base_path + [vtun_if] + start_subnet = "192.168.0.100" + stop_subnet = "192.168.0.200" + mask_subnet = "255.255.255.0" + gw_subnet = "192.168.0.1" + + self.cli_set(['interfaces', 'bridge', br_if, 'member', 'interface', vtun_if]) + self.cli_set(path + ['device-type', 'tap']) + self.cli_set(path + ['encryption', 'data-ciphers', 'aes192']) + self.cli_set(path + ['hash', auth_hash]) + self.cli_set(path + ['mode', 'server']) + self.cli_set(path + ['server', 'bridge', 'gateway', gw_subnet]) + self.cli_set(path + ['server', 'bridge', 'start', start_subnet]) + self.cli_set(path + ['server', 'bridge', 'stop', stop_subnet]) + self.cli_set(path + ['server', 'bridge', 'subnet-mask', mask_subnet]) + self.cli_set(path + ['keep-alive', 'failure-count', '5']) + self.cli_set(path + ['keep-alive', 'interval', '5']) + self.cli_set(path + ['tls', 'ca-certificate', 'ovpn_test']) + self.cli_set(path + ['tls', 'certificate', 'ovpn_test']) + self.cli_set(path + ['tls', 'dh-params', 'ovpn_test']) + + self.cli_commit() + + config_file = f'/run/openvpn/{vtun_if}.conf' + config = read_file(config_file) + self.assertIn(f'dev {vtun_if}', config) + self.assertIn(f'dev-type tap', config) + self.assertIn(f'proto udp', config) # default protocol + self.assertIn(f'auth {auth_hash}', config) + self.assertIn(f'data-ciphers AES-192-CBC', config) + self.assertIn(f'mode server', config) + self.assertIn(f'server-bridge {gw_subnet} {mask_subnet} {start_subnet} {stop_subnet}', config) + self.assertIn(f'keepalive 5 25', config) + + # TLS options + self.assertIn(f'ca /run/openvpn/{vtun_if}_ca.pem', config) + self.assertIn(f'cert /run/openvpn/{vtun_if}_cert.pem', config) + self.assertIn(f'key /run/openvpn/{vtun_if}_cert.key', config) + self.assertIn(f'dh /run/openvpn/{vtun_if}_dh.pem', config) + + # check that no interface remained after deleting them + self.cli_delete(['interfaces', 'bridge', br_if, 'member', 'interface', vtun_if]) + self.cli_delete(base_path) + self.cli_commit() + +if __name__ == '__main__': + unittest.main(verbosity=2) |
