From bb76575715682594d4d6d73d8b9e87692bdc6841 Mon Sep 17 00:00:00 2001
From: Christian Poessinger <christian@poessinger.com>
Date: Sun, 1 Nov 2020 10:46:46 +0100
Subject: openvpn: T2994: remove workarounds for individual ipv4 and ipv6 keys

Remove workaround which split (local|remote)_address and also subnet keys into
individual keys for the assigned IP address family (4/6).

During template rendering check IP version by introducing new ipv4 and ipv6
Jinja2 filters {% if foo | ipv4 %} or {% if bar | ipv6 %} options.
---
 src/conf_mode/interfaces-openvpn.py | 237 ++++++++++++++++--------------------
 src/tests/test_jinja_filters.py     |  56 +++++++++
 src/tests/test_template.py          |  31 -----
 3 files changed, 161 insertions(+), 163 deletions(-)
 create mode 100644 src/tests/test_jinja_filters.py
 delete mode 100644 src/tests/test_template.py

(limited to 'src')

diff --git a/src/conf_mode/interfaces-openvpn.py b/src/conf_mode/interfaces-openvpn.py
index b75b6dc1b..3ad04610a 100755
--- a/src/conf_mode/interfaces-openvpn.py
+++ b/src/conf_mode/interfaces-openvpn.py
@@ -28,6 +28,9 @@ from shutil import rmtree
 
 from vyos.config import Config
 from vyos.configdict import get_interface_dict
+from vyos.configverify import verify_vrf
+from vyos.configverify import verify_bridge_delete
+from vyos.configverify import verify_diffie_hellman_length
 from vyos.ifconfig import VTunIf
 from vyos.template import render
 from vyos.util import call
@@ -35,10 +38,8 @@ from vyos.util import chown
 from vyos.util import chmod_600
 from vyos.util import dict_search
 from vyos.validate import is_addr_assigned
+from vyos.validate import is_ipv4
 from vyos.validate import is_ipv6
-from vyos.configverify import verify_vrf
-from vyos.configverify import verify_bridge_delete
-from vyos.configverify import verify_diffie_hellman_length
 
 from vyos import ConfigError
 from vyos import airbag
@@ -80,47 +81,6 @@ def get_config(config=None):
     openvpn['daemon_user'] = user
     openvpn['daemon_group'] = group
 
-    for addr_type in ['local_address', 'remote_address']:
-        # Split the {local,remote}_address property into separate IPv4 and IPv6
-        # lists for proper validation/configuration
-        if addr_type in openvpn:
-            # actually only one ipv4 and one ipv6 address is allowed - validated
-            # later on, thus keep this simple
-            address_v4 = []
-            netmask_v4 = []
-            address_v6 = []
-
-            for addr in openvpn[addr_type]:
-                if is_ipv6(addr):
-                    address_v6.append(addr)
-                else:
-                    if addr_type == 'local_address' and addr in openvpn['local_address'] and 'subnet_mask' in openvpn['local_address'][addr]:
-                        netmask_v4.append(openvpn['local_address'][addr]['subnet_mask'])
-                    address_v4.append(addr)
-
-            if address_v4: openvpn.update({addr_type + '_v4': address_v4})
-            if address_v6: openvpn.update({addr_type + '_v6': address_v6})
-            if netmask_v4: openvpn.update({'local_address_v4_netmask': netmask_v4})
-
-            del openvpn[addr_type]
-
-    if dict_search('server.subnet', openvpn):
-        # Split the subnet property into separate IPv4 and IPv6 lists for
-        # proper validation/configuration
-        subnet_v4 = []
-        subnet_v6 = []
-
-        for subnet in dict_search('server.subnet', openvpn):
-            if is_ipv6(subnet):
-                subnet_v6.append(subnet)
-            else:
-                subnet_v4.append(subnet)
-
-        if subnet_v4: openvpn['server'].update({'subnet_v4': subnet_v4})
-        if subnet_v6: openvpn['server'].update({'subnet_v6': subnet_v6})
-
-        del openvpn['server']['subnet']
-
     return openvpn
 
 def verify(openvpn):
@@ -160,56 +120,64 @@ def verify(openvpn):
     # OpenVPN site-to-site - VERIFY
     #
     elif openvpn['mode'] == 'site-to-site':
-        if dict_search('encryption.ncp_ciphers', openvpn):
-            raise ConfigError('NCP ciphers can only be used in client or server mode')
-
-    elif openvpn['mode'] == 'site-to-site' and 'is_bridge_member' not in openvpn:
-
-        if not ('local_address_v4' in openvpn or 'local_address_v6' in openvpn):
+        if not 'local_address' in openvpn:
             raise ConfigError('Must specify "local-address" or add interface to bridge')
 
-        if 'local_address_v4' in openvpn and len(openvpn['local_address_v4']) > 1:
+        if len([addr for addr in openvpn['local_address'] if is_ipv4(addr)]) > 1:
             raise ConfigError('Only one IPv4 local-address can be specified')
 
-        if 'local_address_v6' in openvpn and len(openvpn['local_address_v6']) > 1:
+        if len([addr for addr in openvpn['local_address'] if is_ipv6(addr)]) > 1:
             raise ConfigError('Only one IPv6 local-address can be specified')
 
-        if 'remote_address_v4' in openvpn and len(openvpn['remote_address_v4']) > 1:
-            raise ConfigError('Only one IPv4 remote-address can be specified')
+        if openvpn['device_type'] == 'tun':
+            if 'remote_address' not in openvpn:
+                raise ConfigError('Must specify "remote-address"')
 
-        if 'remote_address_v6' in openvpn and len(openvpn['remote_address_v6']) > 1:
-            raise ConfigError('Only one IPv6 remote-address can be specified')
+        if 'remote_address' in openvpn:
+            if len([addr for addr in openvpn['remote_address'] if is_ipv4(addr)]) > 1:
+                raise ConfigError('Only one IPv4 remote-address can be specified')
 
-        for host in (dict_search('remote_host', openvpn) or []):
-            if host in (dict_search('remote_address_v4', openvpn) or vyos_dict_search('remote_address_v6', openvpn) or []):
-                raise ConfigError('"remote-address" and "remote-host" can not be the same')
+            if len([addr for addr in openvpn['remote_address'] if is_ipv6(addr)]) > 1:
+                raise ConfigError('Only one IPv6 remote-address can be specified')
 
-        if 'local_address_v4' in openvpn:
-            if not ({'remote_address', 'local_address_v4_netmask'} <= set(openvpn)):
-                raise ConfigError('IPv4 "local-address" requires IPv4 "remote-address" or IPv4 "local-address subnet"')
+            if not 'local_address' in openvpn:
+                raise ConfigError('"remote-address" requires "local-address"')
+
+            v4loAddr = [addr for addr in openvpn['local_address'] if is_ipv4(addr)]
+            v4remAddr = [addr for addr in openvpn['remote_address'] if is_ipv4(addr)]
+            if v4loAddr and not v4remAddr:
+                raise ConfigError('IPv4 "local-address" requires IPv4 "remote-address"')
+            elif v4remAddr and not v4loAddr:
+                raise ConfigError('IPv4 "remote-address" requires IPv4 "local-address"')
 
-        if 'remote_address_v4' in openvpn and not 'local_address_v4' in openvpn:
-            raise ConfigError('IPv4 "remote-address" requires IPv4 "local-address"')
+            v6remAddr = [addr for addr in openvpn['remote_address'] if is_ipv6(addr)]
+            v6loAddr = [addr for addr in openvpn['local_address'] if is_ipv6(addr)]
+            if v6loAddr and not v6remAddr:
+                raise ConfigError('IPv6 "local-address" requires IPv6 "remote-address"')
+            elif v6remAddr and not v6loAddr:
+                raise ConfigError('IPv6 "remote-address" requires IPv6 "local-address"')
 
-        if 'local_address_v4' in openvpn and not 'remote_address_v4' in openvpn:
-            raise ConfigError('IPv4 "local-address" requires IPv4 "remote-address"')
+            if (v4loAddr == v4remAddr) or (v6remAddr == v4remAddr):
+                raise ConfigError('"local-address" and "remote-address" cannot be the same')
 
-        if 'remote_address_v6' in openvpn and not 'local_address_v6' in openvpn:
-            raise ConfigError('IPv6 "remote-address" requires IPv6 "local-address"')
+            if dict_search('local_host', openvpn) in dict_search('local_address', openvpn):
+                raise ConfigError('"local-address" cannot be the same as "local-host"')
 
-        if 'local_address_v6' in openvpn and not 'remote_address_v6' in openvpn:
-            raise ConfigError('IPv6 "local-address" requires IPv6 "remote-address"')
+            if dict_search('remote_host', openvpn) in dict_search('remote_address', openvpn):
+                raise ConfigError('"remote-address" and "remote-host" can not be the same')
 
-        if openvpn['device_type'] == 'tun':
-            if not ('remote_address_v4' in openvpn or 'remote_address_v6' in openvpn):
-                raise ConfigError('Must specify "remote-address"')
 
-            if (dict_search('local_address_v4', openvpn) == vyos_dict_search('remote_address_v4', openvpn) or
-                dict_search('local_address_v6', openvpn) == vyos_dict_search('remote_address_v6', openvpn)):
-                raise ConfigError('"local-address" and "remote-address" cannot be the same')
+        if 'local_address' in openvpn:
+            # we can only have one local_address, this is ensured above
+            v4addr = None
+            for laddr in openvpn['local_address']:
+                if is_ipv4(laddr): v4addr = laddr
 
-            if dict_search('local_host', openvpn) in vyos_dict_search('local_address_v4', openvpn) or vyos_dict_search('local_address_v6', openvpn):
-                raise ConfigError('"local-address" cannot be the same as "local-host"')
+            if 'remote_address' not in openvpn and (v4addr not in openvpn['local_address'] or 'subnet_mask' not in openvpn['local_address'][v4addr]):
+                raise ConfigError('IPv4 "local-address" requires IPv4 "remote-address" or IPv4 "local-address subnet"')
+
+        if dict_search('encryption.ncp_ciphers', openvpn):
+            raise ConfigError('NCP ciphers can only be used in client or server mode')
 
     else:
         # checks for client-server or site-to-site bridged
@@ -235,35 +203,42 @@ def verify(openvpn):
                 if 'key_file' in openvpn['tls'] and not checkCertHeader('-----BEGIN EC PRIVATE KEY-----', openvpn['tls']['key_file']):
                     raise ConfigError('Must specify "tls dh-file" when not using EC keys in server mode')
 
-        if len(dict_search('server.subnet_v4', openvpn) or []) > 1:
-            raise ConfigError('Cannot specify more than 1 IPv4 server subnet')
-
-        if len(dict_search('server.subnet_v6', openvpn) or []) > 1:
-            raise ConfigError('Cannot specify more than 1 IPv6 server subnet')
-
-        for client in (dict_search('client', openvpn) or []):
-            if len(client['ip']) > 1 or len(client['ipv6_ip']) > 1:
-                raise ConfigError(f'Server client "{client["name"]}": cannot specify more than 1 IPv4 and 1 IPv6 IP')
-
-        tmp = dict_search('server.subnet_v4', openvpn)
+        tmp = dict_search('server.subnet', openvpn)
         if tmp:
-            subnet = IPv4Network(tmp[0].replace(' ', '/'))
+            v4_subnets = len([subnet for subnet in tmp if is_ipv4(subnet)])
+            v6_subnets = len([subnet for subnet in tmp if is_ipv6(subnet)])
+            if v4_subnets > 1:
+                raise ConfigError('Cannot specify more than 1 IPv4 server subnet')
+            if v6_subnets > 1:
+                raise ConfigError('Cannot specify more than 1 IPv6 server subnet')
+
+            if v6_subnets > 0 and v4_subnets == 0:
+                raise ConfigError('IPv6 server requires an IPv4 server subnet')
+
+            for subnet in tmp:
+                if is_ipv4(subnet):
+                    subnet = IPv4Network(subnet)
 
-            if openvpn['device_type'] == 'tun' and subnet.prefixlen > 29:
-                raise ConfigError('Server subnets smaller than /29 with device type "tun" are not supported')
-            elif openvpn['device_type'] == 'tap' and subnet.prefixlen > 30:
-                raise ConfigError('Server subnets smaller than /30 with device type "tap" are not supported')
+                    if openvpn['device_type'] == 'tun' and subnet.prefixlen > 29:
+                        raise ConfigError('Server subnets smaller than /29 with device type "tun" are not supported')
+                    elif openvpn['device_type'] == 'tap' and subnet.prefixlen > 30:
+                        raise ConfigError('Server subnets smaller than /30 with device type "tap" are not supported')
 
-            for client in (dict_search('client', openvpn) or []):
-                if client['ip'] and not IPv4Address(client['ip'][0]) in subnet:
-                    raise ConfigError(f'Client "{client["name"]}" IP {client["ip"][0]} not in server subnet {subnet}')
+                    for client in (dict_search('client', openvpn) or []):
+                        if client['ip'] and not IPv4Address(client['ip'][0]) in subnet:
+                            raise ConfigError(f'Client "{client["name"]}" IP {client["ip"][0]} not in server subnet {subnet}')
 
         else:
             if 'is_bridge_member' not in openvpn:
                 raise ConfigError('Must specify "server subnet" or add interface to bridge in server mode')
 
+
+        for client in (dict_search('client', openvpn) or []):
+            if len(client['ip']) > 1 or len(client['ipv6_ip']) > 1:
+                raise ConfigError(f'Server client "{client["name"]}": cannot specify more than 1 IPv4 and 1 IPv6 IP')
+
         if dict_search('server.client_ip_pool', openvpn):
-            if not (dict_search('server.client_ip_pool.start', openvpn) and vyos_dict_search('server.client_ip_pool.stop', openvpn)):
+            if not (dict_search('server.client_ip_pool.start', openvpn) and dict_search('server.client_ip_pool.stop', openvpn)):
                 raise ConfigError('Server client-ip-pool requires both start and stop addresses')
             else:
                 v4PoolStart = IPv4Address(dict_search('server.client_ip_pool.start', openvpn))
@@ -282,39 +257,36 @@ def verify(openvpn):
                             if IPv4Address(client['ip'][0]) in v4PoolNet:
                                 print(f'Warning: Client "{client["name"]}" IP {client["ip"][0]} is in server IP pool, it is not reserved for this client.')
 
-        if dict_search('server.subnet_v6', openvpn):
-            if not dict_search('server.subnet_v4', openvpn):
-                raise ConfigError('IPv6 server requires an IPv4 server subnet')
-
-            tmp = dict_search('client_ipv6_pool.base', openvpn)
-            if tmp:
-                if not dict_search('server.client_ip_pool', openvpn):
-                    raise ConfigError('IPv6 server pool requires an IPv4 server pool')
-
-                if int(tmp.split('/')[1]) >= 112:
-                    raise ConfigError('IPv6 server pool must be larger than /112')
-
-                #
-                # todo - weird logic
-                #
-
-                v6PoolStart = IPv6Address(tmp)
-                v6PoolStop = IPv6Network((v6PoolStart, openvpn['server_ipv6_pool_prefixlen']), strict=False)[-1] # don't remove the parentheses, it's a 2-tuple
-                v6PoolSize = int(v6PoolStop) - int(v6PoolStart) if int(openvpn['server_ipv6_pool_prefixlen']) > 96 else 65536
-                if v6PoolSize < v4PoolSize:
-                    raise ConfigError(f'IPv6 server pool must be at least as large as the IPv4 pool (current sizes: IPv6={v6PoolSize} IPv4={v4PoolSize})')
-
-                v6PoolNets = list(summarize_address_range(v6PoolStart, v6PoolStop))
-                for client in (dict_search('client', openvpn) or []):
-                    if client['ipv6_ip']:
-                        for v6PoolNet in v6PoolNets:
-                            if IPv6Address(client['ipv6_ip'][0]) in v6PoolNet:
-                                print(f'Warning: Client "{client["name"]}" IP {client["ipv6_ip"][0]} is in server IP pool, it is not reserved for this client.')
+        for subnet in (dict_search('server.subnet', openvpn) or []):
+            if is_ipv6(subnet):
+                tmp = dict_search('client_ipv6_pool.base', openvpn)
+                if tmp:
+                    if not dict_search('server.client_ip_pool', openvpn):
+                        raise ConfigError('IPv6 server pool requires an IPv4 server pool')
+
+                    if int(tmp.split('/')[1]) >= 112:
+                        raise ConfigError('IPv6 server pool must be larger than /112')
+
+                    #
+                    # todo - weird logic
+                    #
+                    v6PoolStart = IPv6Address(tmp)
+                    v6PoolStop = IPv6Network((v6PoolStart, openvpn['server_ipv6_pool_prefixlen']), strict=False)[-1] # don't remove the parentheses, it's a 2-tuple
+                    v6PoolSize = int(v6PoolStop) - int(v6PoolStart) if int(openvpn['server_ipv6_pool_prefixlen']) > 96 else 65536
+                    if v6PoolSize < v4PoolSize:
+                        raise ConfigError(f'IPv6 server pool must be at least as large as the IPv4 pool (current sizes: IPv6={v6PoolSize} IPv4={v4PoolSize})')
+
+                    v6PoolNets = list(summarize_address_range(v6PoolStart, v6PoolStop))
+                    for client in (dict_search('client', openvpn) or []):
+                        if client['ipv6_ip']:
+                            for v6PoolNet in v6PoolNets:
+                                if IPv6Address(client['ipv6_ip'][0]) in v6PoolNet:
+                                    print(f'Warning: Client "{client["name"]}" IP {client["ipv6_ip"][0]} is in server IP pool, it is not reserved for this client.')
 
-        else:
-            for route in (dict_search('server.push_route', openvpn) or []):
-                if is_ipv6(route):
-                    raise ConfigError('IPv6 push-route requires an IPv6 server subnet')
+            else:
+                for route in (dict_search('server.push_route', openvpn) or []):
+                    if is_ipv6(route):
+                        raise ConfigError('IPv6 push-route requires an IPv6 server subnet')
 
             #for client in openvpn ['client']:
             #    if client['ipv6_ip']:
@@ -480,7 +452,7 @@ def generate(openvpn):
     # create config directory on demand
     directories = []
     directories.append(f'{directory}/status')
-    directories.append(f'{directory}/ccd/{interface}')
+    directories.append(ccd_dir)
     for onedir in directories:
         if not os.path.exists(onedir):
             os.makedirs(onedir, 0o755)
@@ -513,7 +485,7 @@ def generate(openvpn):
             client_file = os.path.join(ccd_dir, client)
 
             # Our client need's to know its subnet mask ...
-            client_config['subnet'] = dict_search('server.subnet_v4', openvpn)
+            client_config['subnet'] = dict_search('server.subnet', openvpn)
             render(client_file, 'openvpn/client.conf.tmpl', client_config,
                    trim_blocks=True, user=user, group=group)
 
@@ -571,3 +543,4 @@ if __name__ == '__main__':
     except ConfigError as e:
         print(e)
         exit(1)
+
diff --git a/src/tests/test_jinja_filters.py b/src/tests/test_jinja_filters.py
new file mode 100644
index 000000000..57ff11070
--- /dev/null
+++ b/src/tests/test_jinja_filters.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from unittest import TestCase
+
+from vyos.template import vyos_address_from_cidr
+from vyos.template import vyos_netmask_from_cidr
+from vyos.template import vyos_ipv4
+from vyos.template import vyos_ipv6
+
+class TestTeamplteHelpers(TestCase):
+    def setUp(self):
+        pass
+
+    def test_helpers_from_cidr(self):
+        network = '192.0.2.0/26'
+        self.assertEqual(vyos_address_from_cidr(network), '192.0.2.0')
+        self.assertEqual(vyos_netmask_from_cidr(network), '255.255.255.192')
+
+    def test_helpers_ipv4(self):
+        self.assertTrue(vyos_ipv4('192.0.2.1'))
+        self.assertTrue(vyos_ipv4('192.0.2.0/24'))
+        self.assertTrue(vyos_ipv4('192.0.2.1/32'))
+        self.assertTrue(vyos_ipv4('10.255.1.2'))
+        self.assertTrue(vyos_ipv4('10.255.1.0/24'))
+        self.assertTrue(vyos_ipv4('10.255.1.2/32'))
+        self.assertFalse(vyos_ipv4('2001:db8::'))
+        self.assertFalse(vyos_ipv4('2001:db8::1'))
+        self.assertFalse(vyos_ipv4('2001:db8::/64'))
+
+    def test_helpers_ipv6(self):
+        self.assertFalse(vyos_ipv6('192.0.2.1'))
+        self.assertFalse(vyos_ipv6('192.0.2.0/24'))
+        self.assertFalse(vyos_ipv6('192.0.2.1/32'))
+        self.assertFalse(vyos_ipv6('10.255.1.2'))
+        self.assertFalse(vyos_ipv6('10.255.1.0/24'))
+        self.assertFalse(vyos_ipv6('10.255.1.2/32'))
+        self.assertTrue(vyos_ipv6('2001:db8::'))
+        self.assertTrue(vyos_ipv6('2001:db8::1'))
+        self.assertTrue(vyos_ipv6('2001:db8::1/64'))
+        self.assertTrue(vyos_ipv6('2001:db8::/32'))
+        self.assertTrue(vyos_ipv6('2001:db8::/64'))
+
diff --git a/src/tests/test_template.py b/src/tests/test_template.py
deleted file mode 100644
index 0b9f2c3b8..000000000
--- a/src/tests/test_template.py
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/usr/bin/env python3
-#
-# Copyright (C) 2020 VyOS maintainers and contributors
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
-from unittest import TestCase
-
-from vyos.template import vyos_address_from_cidr
-from vyos.template import vyos_netmask_from_cidr
-
-
-class TestTeamplteHelpers(TestCase):
-    def setUp(self):
-        pass
-
-    def test_helpers_from_cidr(self):
-        network = '192.0.2.0/26'
-        self.assertEqual(vyos_address_from_cidr(network), '192.0.2.0')
-        self.assertEqual(vyos_netmask_from_cidr(network), '255.255.255.192')
-
-- 
cgit v1.2.3