summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/conf_mode/dns_forwarding.py4
-rwxr-xr-xsrc/conf_mode/intel_qat.py8
-rwxr-xr-xsrc/conf_mode/interfaces-bonding.py6
-rwxr-xr-xsrc/conf_mode/interfaces-bridge.py6
-rwxr-xr-xsrc/conf_mode/interfaces-openvpn.py1128
-rwxr-xr-xsrc/conf_mode/interfaces-wireless.py14
-rwxr-xr-xsrc/conf_mode/service_pppoe-server.py14
-rwxr-xr-xsrc/conf_mode/ssh.py7
-rwxr-xr-xsrc/conf_mode/vpn_sstp.py10
-rwxr-xr-xsrc/services/vyos-hostsd2
-rw-r--r--src/tests/test_configverify.py (renamed from src/tests/test_template.py)23
-rw-r--r--src/tests/test_dict_search.py (renamed from src/tests/test_vyos_dict_search.py)30
-rw-r--r--src/tests/test_jinja_filters.py69
-rw-r--r--src/tests/test_validate.py64
14 files changed, 476 insertions, 909 deletions
diff --git a/src/conf_mode/dns_forwarding.py b/src/conf_mode/dns_forwarding.py
index 2187b3c73..d0c2dd252 100755
--- a/src/conf_mode/dns_forwarding.py
+++ b/src/conf_mode/dns_forwarding.py
@@ -23,7 +23,7 @@ from vyos.configdict import dict_merge
from vyos.hostsd_client import Client as hostsd_client
from vyos.util import call
from vyos.util import chown
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
from vyos.template import render
from vyos.xml import defaults
from vyos.validate import is_ipv6
@@ -94,7 +94,7 @@ def verify(dns):
if 'allow_from' not in dns:
raise ConfigError('DNS forwarding requires an allow-from network')
- # we can not use vyos_dict_search() when testing for domain servers
+ # we can not use dict_search() when testing for domain servers
# as a domain will contains dot's which is out dictionary delimiter.
if 'domain' in dns:
for domain in dns['domain']:
diff --git a/src/conf_mode/intel_qat.py b/src/conf_mode/intel_qat.py
index ab98cbc03..dd04a002d 100755
--- a/src/conf_mode/intel_qat.py
+++ b/src/conf_mode/intel_qat.py
@@ -66,8 +66,14 @@ def verify(qat):
# Check if QAT device exist
output, err = popen('lspci -nn', decode='utf-8')
if not err:
+ # PCI id | Chipset
+ # 19e2 -> C3xx
+ # 37c8 -> C62x
+ # 0435 -> DH895
+ # 6f54 -> D15xx
+ # 18ee -> QAT_200XX
data = re.findall(
- '(8086:19e2)|(8086:37c8)|(8086:0435)|(8086:6f54)|(8086:1f18)', output)
+ '(8086:19e2)|(8086:37c8)|(8086:0435)|(8086:6f54)|(8086:18ee)', output)
# If QAT devices found
if not data:
raise ConfigError('No QAT acceleration device found')
diff --git a/src/conf_mode/interfaces-bonding.py b/src/conf_mode/interfaces-bonding.py
index ea9bd54d4..1a549f27d 100755
--- a/src/conf_mode/interfaces-bonding.py
+++ b/src/conf_mode/interfaces-bonding.py
@@ -33,7 +33,7 @@ from vyos.configverify import verify_vlan_config
from vyos.configverify import verify_vrf
from vyos.ifconfig import BondIf
from vyos.ifconfig import Section
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
from vyos.validate import has_address_configured
from vyos import ConfigError
from vyos import airbag
@@ -101,7 +101,7 @@ def get_config(config=None):
# also present the interfaces to be removed from the bond as dictionary
bond['member'].update({'interface_remove': tmp})
- if vyos_dict_search('member.interface', bond):
+ if dict_search('member.interface', bond):
for interface, interface_config in bond['member']['interface'].items():
# Check if member interface is already member of another bridge
tmp = is_member(conf, interface, 'bridge')
@@ -151,7 +151,7 @@ def verify(bond):
verify_vlan_config(bond)
bond_name = bond['ifname']
- if vyos_dict_search('member.interface', bond):
+ if dict_search('member.interface', bond):
for interface, interface_config in bond['member']['interface'].items():
error_msg = f'Can not add interface "{interface}" to bond, '
diff --git a/src/conf_mode/interfaces-bridge.py b/src/conf_mode/interfaces-bridge.py
index 4aeb8fc67..258f9ec79 100755
--- a/src/conf_mode/interfaces-bridge.py
+++ b/src/conf_mode/interfaces-bridge.py
@@ -32,7 +32,7 @@ from vyos.validate import has_address_configured
from vyos.xml import defaults
from vyos.util import cmd
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
from vyos import ConfigError
from vyos import airbag
@@ -58,7 +58,7 @@ def get_config(config=None):
else:
bridge.update({'member': {'interface_remove': tmp }})
- if vyos_dict_search('member.interface', bridge):
+ if dict_search('member.interface', bridge):
# XXX: T2665: we need a copy of the dict keys for iteration, else we will get:
# RuntimeError: dictionary changed size during iteration
for interface in list(bridge['member']['interface']):
@@ -100,7 +100,7 @@ def verify(bridge):
verify_dhcpv6(bridge)
verify_vrf(bridge)
- if vyos_dict_search('member.interface', bridge):
+ if dict_search('member.interface', bridge):
for interface, interface_config in bridge['member']['interface'].items():
error_msg = f'Can not add interface "{interface}" to bridge, '
diff --git a/src/conf_mode/interfaces-openvpn.py b/src/conf_mode/interfaces-openvpn.py
index 3f4965029..b1318b9ee 100755
--- a/src/conf_mode/interfaces-openvpn.py
+++ b/src/conf_mode/interfaces-openvpn.py
@@ -17,110 +17,38 @@
import os
import re
-from copy import deepcopy
-from sys import exit,stderr
-from ipaddress import ip_address,ip_network,IPv4Address,IPv4Network,IPv6Address,IPv6Network,summarize_address_range
+from sys import exit
+from ipaddress import IPv4Address
+from ipaddress import IPv4Network
+from ipaddress import IPv6Address
+from ipaddress import IPv6Network
+from ipaddress import summarize_address_range
from netifaces import interfaces
from shutil import rmtree
from vyos.config import Config
-from vyos.configdict import list_diff
-from vyos.configdict import is_member
+from vyos.configdict import get_interface_dict
+from vyos.configverify import verify_vrf
+from vyos.configverify import verify_bridge_delete
+from vyos.configverify import verify_diffie_hellman_length
from vyos.ifconfig import VTunIf
from vyos.template import render
-from vyos.util import call, chown, chmod_600, chmod_755
-from vyos.validate import is_addr_assigned, is_ipv4
-from vyos import ConfigError
-from vyos.util import cmd
+from vyos.util import call
+from vyos.util import chown
+from vyos.util import chmod_600
+from vyos.util import dict_search
+from vyos.validate import is_addr_assigned
+from vyos.validate import is_ipv4
+from vyos.validate import is_ipv6
+from vyos import ConfigError
from vyos import airbag
airbag.enable()
user = 'openvpn'
group = 'openvpn'
-default_config_data = {
- 'address': [],
- 'auth_user': '',
- 'auth_pass': '',
- 'auth_user_pass_file': '',
- 'auth': False,
- 'compress_lzo': False,
- 'deleted': False,
- 'description': '',
- 'disable': False,
- 'disable_ncp': False,
- 'encryption': '',
- 'hash': '',
- 'intf': '',
- 'ipv6_accept_ra': 1,
- 'ipv6_autoconf': 0,
- 'ipv6_eui64_prefix': [],
- 'ipv6_eui64_prefix_remove': [],
- 'ipv6_forwarding': 1,
- 'ipv6_dup_addr_detect': 1,
- 'ipv6_local_address': [],
- 'ipv6_remote_address': [],
- 'is_bridge_member': False,
- 'ping_restart': '60',
- 'ping_interval': '10',
- 'local_address': [],
- 'local_address_subnet': '',
- 'local_host': '',
- 'local_port': '',
- 'mode': '',
- 'ncp_ciphers': '',
- 'options': [],
- 'persistent_tunnel': False,
- 'protocol': 'udp',
- 'protocol_real': '',
- 'redirect_gateway': '',
- 'remote_address': [],
- 'remote_host': [],
- 'remote_port': '',
- 'client': [],
- 'server_domain': '',
- 'server_max_conn': '',
- 'server_dns_nameserver': [],
- 'server_pool': True,
- 'server_pool_start': '',
- 'server_pool_stop': '',
- 'server_pool_netmask': '',
- 'server_push_route': [],
- 'server_reject_unconfigured': False,
- 'server_subnet': [],
- 'server_topology': '',
- 'server_ipv6_dns_nameserver': [],
- 'server_ipv6_local': '',
- 'server_ipv6_prefixlen': '',
- 'server_ipv6_remote': '',
- 'server_ipv6_pool': True,
- 'server_ipv6_pool_base': '',
- 'server_ipv6_pool_prefixlen': '',
- 'server_ipv6_push_route': [],
- 'server_ipv6_subnet': [],
- 'shared_secret_file': '',
- 'tls': False,
- 'tls_auth': '',
- 'tls_ca_cert': '',
- 'tls_cert': '',
- 'tls_crl': '',
- 'tls_dh': '',
- 'tls_key': '',
- 'tls_crypt': '',
- 'tls_role': '',
- 'tls_version_min': '',
- 'type': 'tun',
- 'uid': user,
- 'gid': group,
- 'vrf': ''
-}
-
-
-def get_config_name(intf):
- cfg_file = f'/run/openvpn/{intf}.conf'
- return cfg_file
-
+cfg_file = '/run/openvpn/{ifname}.conf'
def checkCertHeader(header, filename):
"""
@@ -137,600 +65,125 @@ def checkCertHeader(header, filename):
return False
-def getDefaultServer(network, topology, devtype):
+def get_config(config=None):
"""
- Gets the default server parameters for a IPv4 "server" directive.
- Logic from openvpn's src/openvpn/helper.c.
- Returns a dict with addresses or False if the input parameters were incorrect.
+ Retrive CLI config as dictionary. Dictionary can never be empty, as at least the
+ interface name will be added or a deleted flag
"""
- if not (devtype == 'tun' or devtype == 'tap'):
- return False
-
- if not network.version == 4:
- return False
- elif (devtype == 'tun' and network.prefixlen > 29) or (devtype == 'tap' and network.prefixlen > 30):
- return False
-
- server = {
- 'local': '',
- 'remote_netmask': '',
- 'client_remote_netmask': '',
- 'pool_start': '',
- 'pool_stop': '',
- 'pool_netmask': ''
- }
-
- if devtype == 'tun':
- if topology == 'net30' or topology == 'point-to-point':
- server['local'] = network[1]
- server['remote_netmask'] = network[2]
- server['client_remote_netmask'] = server['local']
-
- # pool start is 4th host IP in subnet (.4 in a /24)
- server['pool_start'] = network[4]
-
- if network.prefixlen == 29:
- server['pool_stop'] = network.broadcast_address
- else:
- # pool end is -4 from the broadcast address (.251 in a /24)
- server['pool_stop'] = network[-5]
-
- elif topology == 'subnet':
- server['local'] = network[1]
- server['remote_netmask'] = str(network.netmask)
- server['client_remote_netmask'] = server['remote_netmask']
- server['pool_start'] = network[2]
- server['pool_stop'] = network[-3]
- server['pool_netmask'] = server['remote_netmask']
-
- elif devtype == 'tap':
- server['local'] = network[1]
- server['remote_netmask'] = str(network.netmask)
- server['client_remote_netmask'] = server['remote_netmask']
- server['pool_start'] = network[2]
- server['pool_stop'] = network[-2]
- server['pool_netmask'] = server['remote_netmask']
-
- return server
-
-def get_config(config=None):
- openvpn = deepcopy(default_config_data)
if config:
conf = config
else:
conf = Config()
+ base = ['interfaces', 'openvpn']
+ openvpn = get_interface_dict(conf, base)
- # determine tagNode instance
- if 'VYOS_TAGNODE_VALUE' not in os.environ:
- raise ConfigError('Interface (VYOS_TAGNODE_VALUE) not specified')
-
- openvpn['intf'] = os.environ['VYOS_TAGNODE_VALUE']
- openvpn['auth_user_pass_file'] = f"/run/openvpn/{openvpn['intf']}.pw"
-
- # check if interface is member of a bridge
- tmp = is_member(conf, openvpn['intf'], 'bridge')
- if tmp: openvpn['is_bridge_member'] = next(iter(tmp))
-
- # Check if interface instance has been removed
- if not conf.exists('interfaces openvpn ' + openvpn['intf']):
- openvpn['deleted'] = True
- return openvpn
-
- # bridged server should not have a pool by default (but can be specified manually)
- if openvpn['is_bridge_member']:
- openvpn['server_pool'] = False
- openvpn['server_ipv6_pool'] = False
-
- # set configuration level
- conf.set_level('interfaces openvpn ' + openvpn['intf'])
-
- # retrieve authentication options - username
- if conf.exists('authentication username'):
- openvpn['auth_user'] = conf.return_value('authentication username')
- openvpn['auth'] = True
-
- # retrieve authentication options - username
- if conf.exists('authentication password'):
- openvpn['auth_pass'] = conf.return_value('authentication password')
- openvpn['auth'] = True
-
- # retrieve interface description
- if conf.exists('description'):
- openvpn['description'] = conf.return_value('description')
-
- # interface device-type
- if conf.exists('device-type'):
- openvpn['type'] = conf.return_value('device-type')
-
- # disable interface
- if conf.exists('disable'):
- openvpn['disable'] = True
-
- # data encryption algorithm cipher
- if conf.exists('encryption cipher'):
- openvpn['encryption'] = conf.return_value('encryption cipher')
-
- # disable ncp-ciphers support
- if conf.exists('encryption disable-ncp'):
- openvpn['disable_ncp'] = True
-
- # data encryption algorithm ncp-list
- if conf.exists('encryption ncp-ciphers'):
- _ncp_ciphers = []
- for enc in conf.return_values('encryption ncp-ciphers'):
- if enc == 'none':
- _ncp_ciphers.append('none')
- _ncp_ciphers.append('NONE')
- elif enc == 'des':
- _ncp_ciphers.append('des-cbc')
- _ncp_ciphers.append('DES-CBC')
- elif enc == '3des':
- _ncp_ciphers.append('des-ede3-cbc')
- _ncp_ciphers.append('DES-EDE3-CBC')
- elif enc == 'aes128':
- _ncp_ciphers.append('aes-128-cbc')
- _ncp_ciphers.append('AES-128-CBC')
- elif enc == 'aes128gcm':
- _ncp_ciphers.append('aes-128-gcm')
- _ncp_ciphers.append('AES-128-GCM')
- elif enc == 'aes192':
- _ncp_ciphers.append('aes-192-cbc')
- _ncp_ciphers.append('AES-192-CBC')
- elif enc == 'aes192gcm':
- _ncp_ciphers.append('aes-192-gcm')
- _ncp_ciphers.append('AES-192-GCM')
- elif enc == 'aes256':
- _ncp_ciphers.append('aes-256-cbc')
- _ncp_ciphers.append('AES-256-CBC')
- elif enc == 'aes256gcm':
- _ncp_ciphers.append('aes-256-gcm')
- _ncp_ciphers.append('AES-256-GCM')
- openvpn['ncp_ciphers'] = ':'.join(_ncp_ciphers)
-
- # hash algorithm
- if conf.exists('hash'):
- openvpn['hash'] = conf.return_value('hash')
-
- # Maximum number of keepalive packet failures
- if conf.exists('keep-alive failure-count') and conf.exists('keep-alive interval'):
- fail_count = conf.return_value('keep-alive failure-count')
- interval = conf.return_value('keep-alive interval')
- openvpn['ping_interval' ] = interval
- openvpn['ping_restart' ] = int(interval) * int(fail_count)
-
- # Local IP address of tunnel - even as it is a tag node - we can only work
- # on the first address
- if conf.exists('local-address'):
- for tmp in conf.list_nodes('local-address'):
- tmp_ip = ip_address(tmp)
- if tmp_ip.version == 4:
- openvpn['local_address'].append(tmp)
- if conf.exists('local-address {} subnet-mask'.format(tmp)):
- openvpn['local_address_subnet'] = conf.return_value('local-address {} subnet-mask'.format(tmp))
- elif tmp_ip.version == 6:
- # input IPv6 address could be expanded so get the compressed version
- openvpn['ipv6_local_address'].append(str(tmp_ip))
-
- # Local IP address to accept connections
- if conf.exists('local-host'):
- openvpn['local_host'] = conf.return_value('local-host')
-
- # Local port number to accept connections
- if conf.exists('local-port'):
- openvpn['local_port'] = conf.return_value('local-port')
-
- # Enable acquisition of IPv6 address using stateless autoconfig (SLAAC)
- if conf.exists('ipv6 address autoconf'):
- openvpn['ipv6_autoconf'] = 1
-
- # Get prefixes for IPv6 addressing based on MAC address (EUI-64)
- if conf.exists('ipv6 address eui64'):
- openvpn['ipv6_eui64_prefix'] = conf.return_values('ipv6 address eui64')
-
- # Determine currently effective EUI64 addresses - to determine which
- # address is no longer valid and needs to be removed
- eff_addr = conf.return_effective_values('ipv6 address eui64')
- openvpn['ipv6_eui64_prefix_remove'] = list_diff(eff_addr, openvpn['ipv6_eui64_prefix'])
-
- # Remove the default link-local address if set.
- if conf.exists('ipv6 address no-default-link-local'):
- openvpn['ipv6_eui64_prefix_remove'].append('fe80::/64')
- else:
- # add the link-local by default to make IPv6 work
- openvpn['ipv6_eui64_prefix'].append('fe80::/64')
-
- # Disable IPv6 forwarding on this interface
- if conf.exists('ipv6 disable-forwarding'):
- openvpn['ipv6_forwarding'] = 0
-
- # IPv6 Duplicate Address Detection (DAD) tries
- if conf.exists('ipv6 dup-addr-detect-transmits'):
- openvpn['ipv6_dup_addr_detect'] = int(conf.return_value('ipv6 dup-addr-detect-transmits'))
-
- # to make IPv6 SLAAC and DHCPv6 work with forwarding=1,
- # accept_ra must be 2
- if openvpn['ipv6_autoconf'] or 'dhcpv6' in openvpn['address']:
- openvpn['ipv6_accept_ra'] = 2
-
- # OpenVPN operation mode
- if conf.exists('mode'):
- openvpn['mode'] = conf.return_value('mode')
-
- # Additional OpenVPN options
- if conf.exists('openvpn-option'):
- openvpn['options'] = conf.return_values('openvpn-option')
-
- # Do not close and reopen interface
- if conf.exists('persistent-tunnel'):
- openvpn['persistent_tunnel'] = True
-
- # Communication protocol
- if conf.exists('protocol'):
- openvpn['protocol'] = conf.return_value('protocol')
-
- # IP address of remote end of tunnel
- if conf.exists('remote-address'):
- for tmp in conf.return_values('remote-address'):
- tmp_ip = ip_address(tmp)
- if tmp_ip.version == 4:
- openvpn['remote_address'].append(tmp)
- elif tmp_ip.version == 6:
- openvpn['ipv6_remote_address'].append(str(tmp_ip))
-
- # Remote host to connect to (dynamic if not set)
- if conf.exists('remote-host'):
- openvpn['remote_host'] = conf.return_values('remote-host')
-
- # Remote port number to connect to
- if conf.exists('remote-port'):
- openvpn['remote_port'] = conf.return_value('remote-port')
-
- # OpenVPN tunnel to be used as the default route
- # see https://openvpn.net/community-resources/reference-manual-for-openvpn-2-4/
- # redirect-gateway flags
- if conf.exists('replace-default-route'):
- openvpn['redirect_gateway'] = 'def1'
-
- if conf.exists('replace-default-route local'):
- openvpn['redirect_gateway'] = 'local def1'
-
- # Topology for clients
- if conf.exists('server topology'):
- openvpn['server_topology'] = conf.return_value('server topology')
-
- # Server-mode subnet (from which client IPs are allocated)
- server_network_v4 = None
- server_network_v6 = None
- if conf.exists('server subnet'):
- for tmp in conf.return_values('server subnet'):
- tmp_ip = ip_network(tmp)
- if tmp_ip.version == 4:
- server_network_v4 = tmp_ip
- # convert the network to format: "192.0.2.0 255.255.255.0" for later use in template
- openvpn['server_subnet'].append(tmp_ip.with_netmask.replace(r'/', ' '))
- elif tmp_ip.version == 6:
- server_network_v6 = tmp_ip
- openvpn['server_ipv6_subnet'].append(str(tmp_ip))
-
- # Client-specific settings
- for client in conf.list_nodes('server client'):
- # set configuration level
- conf.set_level('interfaces openvpn ' + openvpn['intf'] + ' server client ' + client)
- data = {
- 'name': client,
- 'disable': False,
- 'ip': [],
- 'ipv6_ip': [],
- 'ipv6_remote': '',
- 'ipv6_push_route': [],
- 'ipv6_subnet': [],
- 'push_route': [],
- 'subnet': [],
- 'remote_netmask': ''
- }
-
- # Option to disable client connection
- if conf.exists('disable'):
- data['disable'] = True
-
- # IP address of the client
- for tmp in conf.return_values('ip'):
- tmp_ip = ip_address(tmp)
- if tmp_ip.version == 4:
- data['ip'].append(tmp)
- elif tmp_ip.version == 6:
- data['ipv6_ip'].append(str(tmp_ip))
-
- # Route to be pushed to the client
- for tmp in conf.return_values('push-route'):
- tmp_ip = ip_network(tmp)
- if tmp_ip.version == 4:
- data['push_route'].append(tmp_ip.with_netmask.replace(r'/', ' '))
- elif tmp_ip.version == 6:
- data['ipv6_push_route'].append(str(tmp_ip))
-
- # Subnet belonging to the client
- for tmp in conf.return_values('subnet'):
- tmp_ip = ip_network(tmp)
- if tmp_ip.version == 4:
- data['subnet'].append(tmp_ip.with_netmask.replace(r'/', ' '))
- elif tmp_ip.version == 6:
- data['ipv6_subnet'].append(str(tmp_ip))
-
- # Append to global client list
- openvpn['client'].append(data)
-
- # re-set configuration level
- conf.set_level('interfaces openvpn ' + openvpn['intf'])
-
- # Server client IP pool
- if conf.exists('server client-ip-pool'):
- conf.set_level('interfaces openvpn ' + openvpn['intf'] + ' server client-ip-pool')
-
- # enable or disable server_pool where necessary
- # default is enabled, or disabled in bridge mode
- openvpn['server_pool'] = not conf.exists('disable')
-
- if conf.exists('start'):
- openvpn['server_pool_start'] = conf.return_value('start')
-
- if conf.exists('stop'):
- openvpn['server_pool_stop'] = conf.return_value('stop')
-
- if conf.exists('netmask'):
- openvpn['server_pool_netmask'] = conf.return_value('netmask')
-
- conf.set_level('interfaces openvpn ' + openvpn['intf'])
-
- # Server client IPv6 pool
- if conf.exists('server client-ipv6-pool'):
- conf.set_level('interfaces openvpn ' + openvpn['intf'] + ' server client-ipv6-pool')
- openvpn['server_ipv6_pool'] = not conf.exists('disable')
- if conf.exists('base'):
- tmp = conf.return_value('base').split('/')
- openvpn['server_ipv6_pool_base'] = str(IPv6Address(tmp[0]))
- if 1 < len(tmp):
- openvpn['server_ipv6_pool_prefixlen'] = tmp[1]
-
- conf.set_level('interfaces openvpn ' + openvpn['intf'])
-
- # DNS suffix to be pushed to all clients
- if conf.exists('server domain-name'):
- openvpn['server_domain'] = conf.return_value('server domain-name')
-
- # Number of maximum client connections
- if conf.exists('server max-connections'):
- openvpn['server_max_conn'] = conf.return_value('server max-connections')
-
- # Domain Name Server (DNS)
- if conf.exists('server name-server'):
- for tmp in conf.return_values('server name-server'):
- tmp_ip = ip_address(tmp)
- if tmp_ip.version == 4:
- openvpn['server_dns_nameserver'].append(tmp)
- elif tmp_ip.version == 6:
- openvpn['server_ipv6_dns_nameserver'].append(str(tmp_ip))
-
- # Route to be pushed to all clients
- if conf.exists('server push-route'):
- for tmp in conf.return_values('server push-route'):
- tmp_ip = ip_network(tmp)
- if tmp_ip.version == 4:
- openvpn['server_push_route'].append(tmp_ip.with_netmask.replace(r'/', ' '))
- elif tmp_ip.version == 6:
- openvpn['server_ipv6_push_route'].append(str(tmp_ip))
-
- # Reject connections from clients that are not explicitly configured
- if conf.exists('server reject-unconfigured-clients'):
- openvpn['server_reject_unconfigured'] = True
-
- # File containing TLS auth static key
- if conf.exists('tls auth-file'):
- openvpn['tls_auth'] = conf.return_value('tls auth-file')
- openvpn['tls'] = True
-
- # File containing certificate for Certificate Authority (CA)
- if conf.exists('tls ca-cert-file'):
- openvpn['tls_ca_cert'] = conf.return_value('tls ca-cert-file')
- openvpn['tls'] = True
-
- # File containing certificate for this host
- if conf.exists('tls cert-file'):
- openvpn['tls_cert'] = conf.return_value('tls cert-file')
- openvpn['tls'] = True
-
- # File containing certificate revocation list (CRL) for this host
- if conf.exists('tls crl-file'):
- openvpn['tls_crl'] = conf.return_value('tls crl-file')
- openvpn['tls'] = True
-
- # File containing Diffie Hellman parameters (server only)
- if conf.exists('tls dh-file'):
- openvpn['tls_dh'] = conf.return_value('tls dh-file')
- openvpn['tls'] = True
-
- # File containing this host's private key
- if conf.exists('tls key-file'):
- openvpn['tls_key'] = conf.return_value('tls key-file')
- openvpn['tls'] = True
-
- # File containing key to encrypt control channel packets
- if conf.exists('tls crypt-file'):
- openvpn['tls_crypt'] = conf.return_value('tls crypt-file')
- openvpn['tls'] = True
-
- # Role in TLS negotiation
- if conf.exists('tls role'):
- openvpn['tls_role'] = conf.return_value('tls role')
- openvpn['tls'] = True
-
- # Minimum required TLS version
- if conf.exists('tls tls-version-min'):
- openvpn['tls_version_min'] = conf.return_value('tls tls-version-min')
- openvpn['tls'] = True
-
- if conf.exists('shared-secret-key-file'):
- openvpn['shared_secret_file'] = conf.return_value('shared-secret-key-file')
-
- if conf.exists('use-lzo-compression'):
- openvpn['compress_lzo'] = True
-
- # Special case when using EC certificates:
- # if key-file is EC and dh-file is unset, set tls_dh to 'none'
- if not openvpn['tls_dh'] and openvpn['tls_key'] and checkCertHeader('-----BEGIN EC PRIVATE KEY-----', openvpn['tls_key']):
- openvpn['tls_dh'] = 'none'
-
- # set default server topology to net30
- if openvpn['mode'] == 'server' and not openvpn['server_topology']:
- openvpn['server_topology'] = 'net30'
-
- # Convert protocol to real protocol used by openvpn.
- # To make openvpn listen on both IPv4 and IPv6 we must use *6 protocols
- # (https://community.openvpn.net/openvpn/ticket/360), unless the local-host
- # or each of the remote-host in client mode is IPv4
- # in which case it must use the standard protocols.
- if openvpn['protocol'] == 'tcp-active':
- openvpn['protocol_real'] = 'tcp6-client'
- elif openvpn['protocol'] == 'tcp-passive':
- openvpn['protocol_real'] = 'tcp6-server'
- else:
- openvpn['protocol_real'] = 'udp6'
-
- if ( is_ipv4(openvpn['local_host']) or
- # in client mode test all the remotes instead
- (openvpn['mode'] == 'client' and all([is_ipv4(h) for h in openvpn['remote_host']])) ):
- # takes out the '6'
- openvpn['protocol_real'] = openvpn['protocol_real'][:3] + openvpn['protocol_real'][4:]
-
- # Set defaults where necessary.
- # If any of the input parameters are wrong,
- # this will return False and no defaults will be set.
- if server_network_v4 and openvpn['server_topology'] and openvpn['type']:
- default_server = None
- default_server = getDefaultServer(server_network_v4, openvpn['server_topology'], openvpn['type'])
- if default_server:
- # server-bridge doesn't require a pool so don't set defaults for it
- if openvpn['server_pool'] and not openvpn['is_bridge_member']:
- if not openvpn['server_pool_start']:
- openvpn['server_pool_start'] = default_server['pool_start']
-
- if not openvpn['server_pool_stop']:
- openvpn['server_pool_stop'] = default_server['pool_stop']
-
- if not openvpn['server_pool_netmask']:
- openvpn['server_pool_netmask'] = default_server['pool_netmask']
-
- for client in openvpn['client']:
- client['remote_netmask'] = default_server['client_remote_netmask']
-
- if server_network_v6:
- if not openvpn['server_ipv6_local']:
- openvpn['server_ipv6_local'] = server_network_v6[1]
- if not openvpn['server_ipv6_prefixlen']:
- openvpn['server_ipv6_prefixlen'] = server_network_v6.prefixlen
- if not openvpn['server_ipv6_remote']:
- openvpn['server_ipv6_remote'] = server_network_v6[2]
-
- if openvpn['server_ipv6_pool'] and server_network_v6.prefixlen < 112:
- if not openvpn['server_ipv6_pool_base']:
- openvpn['server_ipv6_pool_base'] = server_network_v6[0x1000]
- if not openvpn['server_ipv6_pool_prefixlen']:
- openvpn['server_ipv6_pool_prefixlen'] = openvpn['server_ipv6_prefixlen']
-
- for client in openvpn['client']:
- client['ipv6_remote'] = openvpn['server_ipv6_local']
-
- if openvpn['redirect_gateway']:
- openvpn['redirect_gateway'] += ' ipv6'
-
- # retrieve VRF instance
- if conf.exists('vrf'):
- openvpn['vrf'] = conf.return_value('vrf')
+ openvpn['auth_user_pass_file'] = '/run/openvpn/{ifname}.pw'.format(**openvpn)
+ openvpn['daemon_user'] = user
+ openvpn['daemon_group'] = group
return openvpn
def verify(openvpn):
- if openvpn['deleted']:
- if openvpn['is_bridge_member']:
- raise ConfigError((
- f'Cannot delete interface "{openvpn["intf"]}" as it is a '
- f'member of bridge "{openvpn["is_bridge_menber"]}"!'))
+ if 'deleted' in openvpn:
+ verify_bridge_delete(openvpn)
return None
-
- if not openvpn['mode']:
- raise ConfigError('Must specify OpenVPN operation mode')
+ if 'mode' not in openvpn:
+ raise ConfigError('Must specify OpenVPN operation mode!')
# Check if we have disabled ncp and at the same time specified ncp-ciphers
- if openvpn['disable_ncp'] and openvpn['ncp_ciphers']:
- raise ConfigError('Cannot specify both "encryption disable-ncp" and "encryption ncp-ciphers"')
+ if 'encryption' in openvpn:
+ if {'disable_ncp', 'ncp_ciphers'} <= set(openvpn.get('encryption')):
+ raise ConfigError('Can not specify both "encryption disable-ncp" '\
+ 'and "encryption ncp-ciphers"')
+
#
# OpenVPN client mode - VERIFY
#
if openvpn['mode'] == 'client':
- if openvpn['local_port']:
+ if 'local_port' in openvpn:
raise ConfigError('Cannot specify "local-port" in client mode')
- if openvpn['local_host']:
+ if 'local_host' in openvpn:
raise ConfigError('Cannot specify "local-host" in client mode')
+ if 'remote_host' not in openvpn:
+ raise ConfigError('Must specify "remote-host" in client mode')
+
if openvpn['protocol'] == 'tcp-passive':
raise ConfigError('Protocol "tcp-passive" is not valid in client mode')
- if not openvpn['remote_host']:
- raise ConfigError('Must specify "remote-host" in client mode')
-
- if openvpn['tls_dh'] and openvpn['tls_dh'] != 'none':
+ if dict_search('tls.dh_file', openvpn):
raise ConfigError('Cannot specify "tls dh-file" in client mode')
#
# OpenVPN site-to-site - VERIFY
#
- if openvpn['mode'] == 'site-to-site':
- if openvpn['ncp_ciphers']:
- raise ConfigError('encryption ncp-ciphers cannot be specified in site-to-site mode, only server or client')
-
- if openvpn['mode'] == 'site-to-site' and not openvpn['is_bridge_member']:
- if not (openvpn['local_address'] or openvpn['ipv6_local_address']):
+ elif openvpn['mode'] == 'site-to-site':
+ if not 'local_address' in openvpn:
raise ConfigError('Must specify "local-address" or add interface to bridge')
- if len(openvpn['local_address']) > 1 or len(openvpn['ipv6_local_address']) > 1:
- raise ConfigError('Cannot specify more than 1 IPv4 and 1 IPv6 "local-address"')
+ if len([addr for addr in openvpn['local_address'] if is_ipv4(addr)]) > 1:
+ raise ConfigError('Only one IPv4 local-address can be specified')
- if len(openvpn['remote_address']) > 1 or len(openvpn['ipv6_remote_address']) > 1:
- raise ConfigError('Cannot specify more than 1 IPv4 and 1 IPv6 "remote-address"')
+ if len([addr for addr in openvpn['local_address'] if is_ipv6(addr)]) > 1:
+ raise ConfigError('Only one IPv6 local-address can be specified')
- for host in openvpn['remote_host']:
- if host in openvpn['remote_address'] or host in openvpn['ipv6_remote_address']:
- raise ConfigError('"remote-address" cannot be the same as "remote-host"')
+ if openvpn['device_type'] == 'tun':
+ if 'remote_address' not in openvpn:
+ raise ConfigError('Must specify "remote-address"')
- if openvpn['local_address'] and not (openvpn['remote_address'] or openvpn['local_address_subnet']):
- raise ConfigError('IPv4 "local-address" requires IPv4 "remote-address" or IPv4 "local-address subnet"')
+ if 'remote_address' in openvpn:
+ if len([addr for addr in openvpn['remote_address'] if is_ipv4(addr)]) > 1:
+ raise ConfigError('Only one IPv4 remote-address can be specified')
- if openvpn['remote_address'] and not openvpn['local_address']:
- raise ConfigError('IPv4 "remote-address" requires IPv4 "local-address"')
+ if len([addr for addr in openvpn['remote_address'] if is_ipv6(addr)]) > 1:
+ raise ConfigError('Only one IPv6 remote-address can be specified')
- if openvpn['ipv6_local_address'] and not openvpn['ipv6_remote_address']:
- raise ConfigError('IPv6 "local-address" requires IPv6 "remote-address"')
+ if not 'local_address' in openvpn:
+ raise ConfigError('"remote-address" requires "local-address"')
- if openvpn['ipv6_remote_address'] and not openvpn['ipv6_local_address']:
- raise ConfigError('IPv6 "remote-address" requires IPv6 "local-address"')
+ v4loAddr = [addr for addr in openvpn['local_address'] if is_ipv4(addr)]
+ v4remAddr = [addr for addr in openvpn['remote_address'] if is_ipv4(addr)]
+ if v4loAddr and not v4remAddr:
+ raise ConfigError('IPv4 "local-address" requires IPv4 "remote-address"')
+ elif v4remAddr and not v4loAddr:
+ raise ConfigError('IPv4 "remote-address" requires IPv4 "local-address"')
- if openvpn['type'] == 'tun':
- if not (openvpn['remote_address'] or openvpn['ipv6_remote_address']):
- raise ConfigError('Must specify "remote-address"')
+ v6remAddr = [addr for addr in openvpn['remote_address'] if is_ipv6(addr)]
+ v6loAddr = [addr for addr in openvpn['local_address'] if is_ipv6(addr)]
+ if v6loAddr and not v6remAddr:
+ raise ConfigError('IPv6 "local-address" requires IPv6 "remote-address"')
+ elif v6remAddr and not v6loAddr:
+ raise ConfigError('IPv6 "remote-address" requires IPv6 "local-address"')
- if ( (openvpn['local_address'] and openvpn['local_address'] == openvpn['remote_address']) or
- (openvpn['ipv6_local_address'] and openvpn['ipv6_local_address'] == openvpn['ipv6_remote_address']) ):
+ if (v4loAddr == v4remAddr) or (v6remAddr == v4remAddr):
raise ConfigError('"local-address" and "remote-address" cannot be the same')
- if openvpn['local_host'] in openvpn['local_address'] or openvpn['local_host'] in openvpn['ipv6_local_address']:
+ if dict_search('local_host', openvpn) in dict_search('local_address', openvpn):
raise ConfigError('"local-address" cannot be the same as "local-host"')
+ if dict_search('remote_host', openvpn) in dict_search('remote_address', openvpn):
+ raise ConfigError('"remote-address" and "remote-host" can not be the same')
+
+
+ if 'local_address' in openvpn:
+ # we can only have one local_address, this is ensured above
+ v4addr = None
+ for laddr in openvpn['local_address']:
+ if is_ipv4(laddr): v4addr = laddr
+
+ if 'remote_address' not in openvpn and (v4addr not in openvpn['local_address'] or 'subnet_mask' not in openvpn['local_address'][v4addr]):
+ raise ConfigError('IPv4 "local-address" requires IPv4 "remote-address" or IPv4 "local-address subnet"')
+
+ if dict_search('encryption.ncp_ciphers', openvpn):
+ raise ConfigError('NCP ciphers can only be used in client or server mode')
+
else:
# checks for client-server or site-to-site bridged
- if openvpn['local_address'] or openvpn['ipv6_local_address'] or openvpn['remote_address'] or openvpn['ipv6_remote_address']:
- raise ConfigError('Cannot specify "local-address" or "remote-address" in client-server or bridge mode')
+ if 'local_address' in openvpn or 'remote_address' in openvpn:
+ raise ConfigError('Cannot specify "local-address" or "remote-address" ' \
+ 'in client/server or bridge mode')
#
# OpenVPN server mode - VERIFY
@@ -739,47 +192,57 @@ def verify(openvpn):
if openvpn['protocol'] == 'tcp-active':
raise ConfigError('Protocol "tcp-active" is not valid in server mode')
- if openvpn['remote_port']:
+ if 'remote_port' in openvpn:
raise ConfigError('Cannot specify "remote-port" in server mode')
- if openvpn['remote_host']:
+ if 'remote_host' in openvpn:
raise ConfigError('Cannot specify "remote-host" in server mode')
- if openvpn['protocol'] == 'tcp-passive' and len(openvpn['remote_host']) > 1:
- raise ConfigError('Cannot specify more than 1 "remote-host" with "tcp-passive"')
-
- if not openvpn['tls_dh'] and not checkCertHeader('-----BEGIN EC PRIVATE KEY-----', openvpn['tls_key']):
- raise ConfigError('Must specify "tls dh-file" when not using EC keys in server mode')
-
- if len(openvpn['server_subnet']) > 1 or len(openvpn['server_ipv6_subnet']) > 1:
- raise ConfigError('Cannot specify more than 1 IPv4 and 1 IPv6 server subnet')
-
- for client in openvpn['client']:
- if len(client['ip']) > 1 or len(client['ipv6_ip']) > 1:
- raise ConfigError(f'Server client "{client["name"]}": cannot specify more than 1 IPv4 and 1 IPv6 IP')
+ if 'tls' in openvpn:
+ if 'dh_file' not in openvpn['tls']:
+ if 'key_file' in openvpn['tls'] and not checkCertHeader('-----BEGIN EC PRIVATE KEY-----', openvpn['tls']['key_file']):
+ raise ConfigError('Must specify "tls dh-file" when not using EC keys in server mode')
+
+ tmp = dict_search('server.subnet', openvpn)
+ if tmp:
+ v4_subnets = len([subnet for subnet in tmp if is_ipv4(subnet)])
+ v6_subnets = len([subnet for subnet in tmp if is_ipv6(subnet)])
+ if v4_subnets > 1:
+ raise ConfigError('Cannot specify more than 1 IPv4 server subnet')
+ if v6_subnets > 1:
+ raise ConfigError('Cannot specify more than 1 IPv6 server subnet')
+
+ if v6_subnets > 0 and v4_subnets == 0:
+ raise ConfigError('IPv6 server requires an IPv4 server subnet')
- if openvpn['server_subnet']:
- subnet = IPv4Network(openvpn['server_subnet'][0].replace(' ', '/'))
+ for subnet in tmp:
+ if is_ipv4(subnet):
+ subnet = IPv4Network(subnet)
- if openvpn['type'] == 'tun' and subnet.prefixlen > 29:
- raise ConfigError('Server subnets smaller than /29 with device type "tun" are not supported')
- elif openvpn['type'] == 'tap' and subnet.prefixlen > 30:
- raise ConfigError('Server subnets smaller than /30 with device type "tap" are not supported')
+ if openvpn['device_type'] == 'tun' and subnet.prefixlen > 29:
+ raise ConfigError('Server subnets smaller than /29 with device type "tun" are not supported')
+ elif openvpn['device_type'] == 'tap' and subnet.prefixlen > 30:
+ raise ConfigError('Server subnets smaller than /30 with device type "tap" are not supported')
- for client in openvpn['client']:
- if client['ip'] and not IPv4Address(client['ip'][0]) in subnet:
- raise ConfigError(f'Client "{client["name"]}" IP {client["ip"][0]} not in server subnet {subnet}')
+ for client in (dict_search('client', openvpn) or []):
+ if client['ip'] and not IPv4Address(client['ip'][0]) in subnet:
+ raise ConfigError(f'Client "{client["name"]}" IP {client["ip"][0]} not in server subnet {subnet}')
else:
- if not openvpn['is_bridge_member']:
+ if 'is_bridge_member' not in openvpn:
raise ConfigError('Must specify "server subnet" or add interface to bridge in server mode')
- if openvpn['server_pool']:
- if not (openvpn['server_pool_start'] and openvpn['server_pool_stop']):
- raise ConfigError('Server client-ip-pool requires both start and stop addresses in bridged mode')
+
+ for client in (dict_search('client', openvpn) or []):
+ if len(client['ip']) > 1 or len(client['ipv6_ip']) > 1:
+ raise ConfigError(f'Server client "{client["name"]}": cannot specify more than 1 IPv4 and 1 IPv6 IP')
+
+ if dict_search('server.client_ip_pool', openvpn):
+ if not (dict_search('server.client_ip_pool.start', openvpn) and dict_search('server.client_ip_pool.stop', openvpn)):
+ raise ConfigError('Server client-ip-pool requires both start and stop addresses')
else:
- v4PoolStart = IPv4Address(openvpn['server_pool_start'])
- v4PoolStop = IPv4Address(openvpn['server_pool_stop'])
+ v4PoolStart = IPv4Address(dict_search('server.client_ip_pool.start', openvpn))
+ v4PoolStop = IPv4Address(dict_search('server.client_ip_pool.stop', openvpn))
if v4PoolStart > v4PoolStop:
raise ConfigError(f'Server client-ip-pool start address {v4PoolStart} is larger than stop address {v4PoolStop}')
@@ -788,59 +251,57 @@ def verify(openvpn):
raise ConfigError(f'Server client-ip-pool is too large [{v4PoolStart} -> {v4PoolStop} = {v4PoolSize}], maximum is 65536 addresses.')
v4PoolNets = list(summarize_address_range(v4PoolStart, v4PoolStop))
- for client in openvpn['client']:
+ for client in (dict_search('client', openvpn) or []):
if client['ip']:
for v4PoolNet in v4PoolNets:
if IPv4Address(client['ip'][0]) in v4PoolNet:
- print(f'Warning: Client "{client["name"]}" IP {client["ip"][0]} is in server IP pool, it is not reserved for this client.',
- file=stderr)
-
- if openvpn['server_ipv6_subnet']:
- if not openvpn['server_subnet']:
- raise ConfigError('IPv6 server requires an IPv4 server subnet')
-
- if openvpn['server_ipv6_pool']:
- if not openvpn['server_pool']:
- raise ConfigError('IPv6 server pool requires an IPv4 server pool')
-
- if int(openvpn['server_ipv6_pool_prefixlen']) >= 112:
- raise ConfigError('IPv6 server pool must be larger than /112')
-
- v6PoolStart = IPv6Address(openvpn['server_ipv6_pool_base'])
- v6PoolStop = IPv6Network((v6PoolStart, openvpn['server_ipv6_pool_prefixlen']), strict=False)[-1] # don't remove the parentheses, it's a 2-tuple
- v6PoolSize = int(v6PoolStop) - int(v6PoolStart) if int(openvpn['server_ipv6_pool_prefixlen']) > 96 else 65536
- if v6PoolSize < v4PoolSize:
- raise ConfigError(f'IPv6 server pool must be at least as large as the IPv4 pool (current sizes: IPv6={v6PoolSize} IPv4={v4PoolSize})')
-
- v6PoolNets = list(summarize_address_range(v6PoolStart, v6PoolStop))
- for client in openvpn['client']:
- if client['ipv6_ip']:
- for v6PoolNet in v6PoolNets:
- if IPv6Address(client['ipv6_ip'][0]) in v6PoolNet:
- print(f'Warning: Client "{client["name"]}" IP {client["ipv6_ip"][0]} is in server IP pool, it is not reserved for this client.',
- file=stderr)
-
- else:
- if openvpn['server_ipv6_push_route']:
- raise ConfigError('IPv6 push-route requires an IPv6 server subnet')
+ print(f'Warning: Client "{client["name"]}" IP {client["ip"][0]} is in server IP pool, it is not reserved for this client.')
+
+ for subnet in (dict_search('server.subnet', openvpn) or []):
+ if is_ipv6(subnet):
+ tmp = dict_search('client_ipv6_pool.base', openvpn)
+ if tmp:
+ if not dict_search('server.client_ip_pool', openvpn):
+ raise ConfigError('IPv6 server pool requires an IPv4 server pool')
+
+ if int(tmp.split('/')[1]) >= 112:
+ raise ConfigError('IPv6 server pool must be larger than /112')
+
+ #
+ # todo - weird logic
+ #
+ v6PoolStart = IPv6Address(tmp)
+ v6PoolStop = IPv6Network((v6PoolStart, openvpn['server_ipv6_pool_prefixlen']), strict=False)[-1] # don't remove the parentheses, it's a 2-tuple
+ v6PoolSize = int(v6PoolStop) - int(v6PoolStart) if int(openvpn['server_ipv6_pool_prefixlen']) > 96 else 65536
+ if v6PoolSize < v4PoolSize:
+ raise ConfigError(f'IPv6 server pool must be at least as large as the IPv4 pool (current sizes: IPv6={v6PoolSize} IPv4={v4PoolSize})')
+
+ v6PoolNets = list(summarize_address_range(v6PoolStart, v6PoolStop))
+ for client in (dict_search('client', openvpn) or []):
+ if client['ipv6_ip']:
+ for v6PoolNet in v6PoolNets:
+ if IPv6Address(client['ipv6_ip'][0]) in v6PoolNet:
+ print(f'Warning: Client "{client["name"]}" IP {client["ipv6_ip"][0]} is in server IP pool, it is not reserved for this client.')
- for client in openvpn ['client']:
- if client['ipv6_ip']:
- raise ConfigError(f'Server client "{client["name"]}" IPv6 IP requires an IPv6 server subnet')
- if client['ipv6_push_route']:
- raise ConfigError(f'Server client "{client["name"]} IPv6 push-route requires an IPv6 server subnet"')
- if client['ipv6_subnet']:
- raise ConfigError(f'Server client "{client["name"]} IPv6 subnet requires an IPv6 server subnet"')
+ else:
+ for route in (dict_search('server.push_route', openvpn) or []):
+ if is_ipv6(route):
+ raise ConfigError('IPv6 push-route requires an IPv6 server subnet')
+
+ #for client in openvpn ['client']:
+ # if client['ipv6_ip']:
+ # raise ConfigError(f'Server client "{client["name"]}" IPv6 IP requires an IPv6 server subnet')
+ # if client['ipv6_push_route']:
+ # raise ConfigError(f'Server client "{client["name"]} IPv6 push-route requires an IPv6 server subnet"')
+ # if client['ipv6_subnet']:
+ # raise ConfigError(f'Server client "{client["name"]} IPv6 subnet requires an IPv6 server subnet"')
else:
# checks for both client and site-to-site go here
- if openvpn['server_reject_unconfigured']:
- raise ConfigError('reject-unconfigured-clients is only supported in OpenVPN server mode')
-
- if openvpn['server_topology']:
- raise ConfigError('The "topology" option is only valid in server mode')
+ if dict_search('server.reject_unconfigured_clients', openvpn):
+ raise ConfigError('Option reject-unconfigured-clients only supported in server mode')
- if (not openvpn['remote_host']) and openvpn['redirect_gateway']:
+ if 'replace_default_route' in openvpn and 'remote_host' not in openvpn:
raise ConfigError('Cannot set "replace-default-route" without "remote-host"')
#
@@ -849,133 +310,135 @@ def verify(openvpn):
#
# verify specified IP address is present on any interface on this system
- if openvpn['local_host']:
+ if 'local_host' in openvpn:
if not is_addr_assigned(openvpn['local_host']):
- raise ConfigError('No interface on system with specified local-host IP address: {}'.format(openvpn['local_host']))
+ raise ConfigError('local-host IP address "{local_host}" not assigned' \
+ ' to any interface'.format(**openvpn))
# TCP active
if openvpn['protocol'] == 'tcp-active':
- if openvpn['local_port']:
+ if 'local_port' in openvpn:
raise ConfigError('Cannot specify "local-port" with "tcp-active"')
- if not openvpn['remote_host']:
+ if 'remote_host' in openvpn:
raise ConfigError('Must specify "remote-host" with "tcp-active"')
# shared secret and TLS
- if not (openvpn['shared_secret_file'] or openvpn['tls']):
+ if not ('shared_secret_key_file' in openvpn or 'tls' in openvpn):
raise ConfigError('Must specify one of "shared-secret-key-file" and "tls"')
- if openvpn['shared_secret_file'] and openvpn['tls']:
+ if {'shared_secret_key_file', 'tls'} <= set(openvpn):
raise ConfigError('Can only specify one of "shared-secret-key-file" and "tls"')
if openvpn['mode'] in ['client', 'server']:
- if not openvpn['tls']:
- raise ConfigError('Must specify "tls" in client-server mode')
+ if 'tls' not in openvpn:
+ raise ConfigError('Must specify "tls" for server and client mode')
#
# TLS/encryption
#
- if openvpn['shared_secret_file']:
- if openvpn['encryption'] in ['aes128gcm', 'aes192gcm', 'aes256gcm']:
- raise ConfigError('GCM encryption with shared-secret-key-file is not supported')
+ if 'shared_secret_key_file' in openvpn:
+ if dict_search('encryption.cipher', openvpn) in ['aes128gcm', 'aes192gcm', 'aes256gcm']:
+ raise ConfigError('GCM encryption with shared-secret-key-file not supported')
- if not checkCertHeader('-----BEGIN OpenVPN Static key V1-----', openvpn['shared_secret_file']):
- raise ConfigError('Specified shared-secret-key-file "{}" is not valid'.format(openvpn['shared_secret_file']))
+ file = dict_search('shared_secret_key_file', openvpn)
+ if file and not checkCertHeader('-----BEGIN OpenVPN Static key V1-----', file):
+ raise ConfigError(f'Specified shared-secret-key-file "{file}" is not valid')
- if openvpn['tls']:
- if not openvpn['tls_ca_cert']:
+ if 'tls' in openvpn:
+ if 'ca_cert_file' not in openvpn['tls']:
raise ConfigError('Must specify "tls ca-cert-file"')
- if not (openvpn['mode'] == 'client' and openvpn['auth']):
- if not openvpn['tls_cert']:
- raise ConfigError('Must specify "tls cert-file"')
+ if not (openvpn['mode'] == 'client' and 'auth_file' in openvpn['tls']):
+ if 'cert_file' not in openvpn['tls']:
+ raise ConfigError('Missing "tls cert-file"')
- if not openvpn['tls_key']:
- raise ConfigError('Must specify "tls key-file"')
+ if 'key_file' not in openvpn['tls']:
+ raise ConfigError('Missing "tls key-file"')
- if openvpn['tls_auth'] and openvpn['tls_crypt']:
+ if {'auth_file', 'crypt_file'} <= set(openvpn['tls']):
raise ConfigError('TLS auth and crypt are mutually exclusive')
- if not checkCertHeader('-----BEGIN CERTIFICATE-----', openvpn['tls_ca_cert']):
- raise ConfigError('Specified ca-cert-file "{}" is invalid'.format(openvpn['tls_ca_cert']))
+ file = dict_search('tls.ca_cert_file', openvpn)
+ if file and not checkCertHeader('-----BEGIN CERTIFICATE-----', file):
+ raise ConfigError(f'Specified ca-cert-file "{file}" is invalid')
- if openvpn['tls_auth']:
- if not checkCertHeader('-----BEGIN OpenVPN Static key V1-----', openvpn['tls_auth']):
- raise ConfigError('Specified auth-file "{}" is invalid'.format(openvpn['tls_auth']))
+ file = dict_search('tls.auth_file', openvpn)
+ if file and not checkCertHeader('-----BEGIN OpenVPN Static key V1-----', file):
+ raise ConfigError(f'Specified auth-file "{file}" is invalid')
- if openvpn['tls_cert']:
- if not checkCertHeader('-----BEGIN CERTIFICATE-----', openvpn['tls_cert']):
- raise ConfigError('Specified cert-file "{}" is invalid'.format(openvpn['tls_cert']))
+ file = dict_search('tls.cert_file', openvpn)
+ if file and not checkCertHeader('-----BEGIN CERTIFICATE-----', file):
+ raise ConfigError(f'Specified cert-file "{file}" is invalid')
- if openvpn['tls_key']:
- if not checkCertHeader('-----BEGIN (?:RSA |EC )?PRIVATE KEY-----', openvpn['tls_key']):
- raise ConfigError('Specified key-file "{}" is not valid'.format(openvpn['tls_key']))
+ file = dict_search('tls.key_file', openvpn)
+ if file and not checkCertHeader('-----BEGIN (?:RSA |EC )?PRIVATE KEY-----', file):
+ raise ConfigError(f'Specified key-file "{file}" is not valid')
- if openvpn['tls_crypt']:
- if not checkCertHeader('-----BEGIN OpenVPN Static key V1-----', openvpn['tls_crypt']):
- raise ConfigError('Specified TLS crypt-file "{}" is invalid'.format(openvpn['tls_crypt']))
+ file = dict_search('tls.crypt_file', openvpn)
+ if file and not checkCertHeader('-----BEGIN OpenVPN Static key V1-----', file):
+ raise ConfigError(f'Specified TLS crypt-file "{file}" is invalid')
- if openvpn['tls_crl']:
- if not checkCertHeader('-----BEGIN X509 CRL-----', openvpn['tls_crl']):
- raise ConfigError('Specified crl-file "{} not valid'.format(openvpn['tls_crl']))
+ file = dict_search('tls.crl_file', openvpn)
+ if file and not checkCertHeader('-----BEGIN X509 CRL-----', file):
+ raise ConfigError(f'Specified crl-file "{file} not valid')
- if openvpn['tls_dh'] and openvpn['tls_dh'] != 'none':
- if not checkCertHeader('-----BEGIN DH PARAMETERS-----', openvpn['tls_dh']):
- raise ConfigError('Specified dh-file "{}" is not valid'.format(openvpn['tls_dh']))
+ file = dict_search('tls.dh_file', openvpn)
+ if file and not checkCertHeader('-----BEGIN DH PARAMETERS-----', file):
+ raise ConfigError(f'Specified dh-file "{file}" is not valid')
- if openvpn['tls_role']:
+ if file and not verify_diffie_hellman_length(file, 2048):
+ raise ConfigError(f'Minimum DH key-size is 2048 bits')
+
+ tmp = dict_search('tls.role', openvpn)
+ if tmp:
if openvpn['mode'] in ['client', 'server']:
- if not openvpn['tls_auth']:
+ if not dict_search('tls.auth_file', openvpn):
raise ConfigError('Cannot specify "tls role" in client-server mode')
- if openvpn['tls_role'] == 'active':
+ if tmp == 'active':
if openvpn['protocol'] == 'tcp-passive':
raise ConfigError('Cannot specify "tcp-passive" when "tls role" is "active"')
- if openvpn['tls_dh'] and openvpn['tls_dh'] != 'none':
+ if dict_search('tls.dh_file', openvpn):
raise ConfigError('Cannot specify "tls dh-file" when "tls role" is "active"')
- elif openvpn['tls_role'] == 'passive':
+ elif tmp == 'passive':
if openvpn['protocol'] == 'tcp-active':
raise ConfigError('Cannot specify "tcp-active" when "tls role" is "passive"')
- if not openvpn['tls_dh']:
+ if not dict_search('tls.dh_file', openvpn):
raise ConfigError('Must specify "tls dh-file" when "tls role" is "passive"')
- if openvpn['tls_key'] and checkCertHeader('-----BEGIN EC PRIVATE KEY-----', openvpn['tls_key']):
- if openvpn['tls_dh'] and openvpn['tls_dh'] != 'none':
- print('Warning: using dh-file and EC keys simultaneously will lead to DH ciphers being used instead of ECDH')
- else:
- print('Diffie-Hellman prime file is unspecified, assuming ECDH')
+ file = dict_search('tls.key_file', openvpn)
+ if file and checkCertHeader('-----BEGIN EC PRIVATE KEY-----', file):
+ if dict_search('tls.dh_file', openvpn):
+ print('Warning: using dh-file and EC keys simultaneously will ' \
+ 'lead to DH ciphers being used instead of ECDH')
- if openvpn['encryption'] == 'none':
- print('Warning: "encryption none" was specified. NO encryption will be performed and tunnelled data WILL be transmitted in clear text over the network!')
+ if dict_search('encryption.cipher', openvpn) == 'none':
+ print('Warning: "encryption none" was specified!')
+ print('No encryption will be performed and data is transmitted in ' \
+ 'plain text over the network!')
#
# Auth user/pass
#
- if openvpn['auth']:
- if not openvpn['auth_user']:
- raise ConfigError('Username for authentication is missing')
-
- if not openvpn['auth_pass']:
+ if (dict_search('authentication.username', openvpn) and not
+ dict_search('authentication.password', openvpn)):
raise ConfigError('Password for authentication is missing')
- if openvpn['vrf']:
- if openvpn['vrf'] not in interfaces():
- raise ConfigError(f'VRF "{openvpn["vrf"]}" does not exist')
+ if (dict_search('authentication.password', openvpn) and not
+ dict_search('authentication.username', openvpn)):
+ raise ConfigError('Username for authentication is missing')
- if openvpn['is_bridge_member']:
- raise ConfigError((
- f'Interface "{openvpn["intf"]}" cannot be member of VRF '
- f'"{openvpn["vrf"]}" and bridge "{openvpn["is_bridge_member"]}" '
- f'at the same time!'))
+ verify_vrf(openvpn)
return None
def generate(openvpn):
- interface = openvpn['intf']
- directory = os.path.dirname(get_config_name(interface))
+ interface = openvpn['ifname']
+ directory = os.path.dirname(cfg_file.format(**openvpn))
# we can't know in advance which clients have been removed,
# thus all client configs will be removed and re-added on demand
@@ -983,25 +446,20 @@ def generate(openvpn):
if os.path.isdir(ccd_dir):
rmtree(ccd_dir, ignore_errors=True)
- if openvpn['deleted'] or openvpn['disable']:
+ if 'deleted' in openvpn or 'disable' in openvpn:
return None
- # create config directory on demand
- directories = []
- directories.append(f'{directory}/status')
- directories.append(f'{directory}/ccd/{interface}')
- for onedir in directories:
- if not os.path.exists(onedir):
- os.makedirs(onedir, 0o755)
- chown(onedir, user, group)
-
# Fix file permissons for keys
fix_permissions = []
- fix_permissions.append(openvpn['shared_secret_file'])
- fix_permissions.append(openvpn['tls_key'])
+
+ tmp = dict_search('shared_secret_key_file', openvpn)
+ if tmp: fix_permissions.append(openvpn['shared_secret_key_file'])
+
+ tmp = dict_search('tls.key_file', openvpn)
+ if tmp: fix_permissions.append(tmp)
# Generate User/Password authentication file
- if openvpn['auth']:
+ if 'auth' in openvpn:
with open(openvpn['auth_user_pass_file'], 'w') as f:
f.write('{}\n{}'.format(openvpn['auth_user'], openvpn['auth_pass']))
# also change permission on auth file
@@ -1013,16 +471,20 @@ def generate(openvpn):
os.remove(openvpn['auth_user_pass_file'])
# Generate client specific configuration
- for client in openvpn['client']:
- client_file = os.path.join(ccd_dir, client['name'])
- render(client_file, 'openvpn/client.conf.tmpl', client)
- chown(client_file, user, group)
+ if dict_search('server.client', openvpn):
+ for client, client_config in dict_search('server.client', openvpn).items():
+ client_file = os.path.join(ccd_dir, client)
+
+ # Our client need's to know its subnet mask ...
+ client_config['subnet'] = dict_search('server.subnet', openvpn)
+ render(client_file, 'openvpn/client.conf.tmpl', client_config,
+ trim_blocks=True, user=user, group=group)
# we need to support quoting of raw parameters from OpenVPN CLI
# see https://phabricator.vyos.net/T1632
- render(get_config_name(interface), 'openvpn/server.conf.tmpl', openvpn,
- formater=lambda _: _.replace("&quot;", '"'))
- chown(get_config_name(interface), user, group)
+ render(cfg_file.format(**openvpn), 'openvpn/server.conf.tmpl', openvpn,
+ trim_blocks=True, formater=lambda _: _.replace("&quot;", '"'),
+ user=user, group=group)
# Fixup file permissions
for file in fix_permissions:
@@ -1031,75 +493,34 @@ def generate(openvpn):
return None
def apply(openvpn):
- interface = openvpn['intf']
+ interface = openvpn['ifname']
call(f'systemctl stop openvpn@{interface}.service')
- # On configuration change we need to wait for the 'old' interface to
- # vanish from the Kernel, if it is not gone, OpenVPN will report:
- # ERROR: Cannot ioctl TUNSETIFF vtun10: Device or resource busy (errno=16)
- if interface in interfaces():
- cmd(f'sudo ip link del {interface}')
-
# Do some cleanup when OpenVPN is disabled/deleted
- if openvpn['deleted'] or openvpn['disable']:
+ if 'deleted' in openvpn or 'disable' in openvpn:
# cleanup old configuration files
cleanup = []
- cleanup.append(get_config_name(interface))
+ cleanup.append(cfg_file.format(**openvpn))
cleanup.append(openvpn['auth_user_pass_file'])
for file in cleanup:
if os.path.isfile(file):
os.unlink(file)
+ if interface in interfaces():
+ VTunIf(interface).remove()
+
return None
# No matching OpenVPN process running - maybe it got killed or none
# existed - nevertheless, spawn new OpenVPN process
call(f'systemctl start openvpn@{interface}.service')
- if interface not in interfaces():
- try:
- dev_type = openvpn['type']
- cmd(f'sudo openvpn --mktun --dev-type {dev_type} --dev {interface}')
- except:
- pass
-
- # we need to catch the exception if the interface is not up due to
- # reason stated above
- o = VTunIf(interface)
- # update interface description used e.g. within SNMP
- o.set_alias(openvpn['description'])
- # IPv6 accept RA
- o.set_ipv6_accept_ra(openvpn['ipv6_accept_ra'])
- # IPv6 address autoconfiguration
- o.set_ipv6_autoconf(openvpn['ipv6_autoconf'])
- # IPv6 forwarding
- o.set_ipv6_forwarding(openvpn['ipv6_forwarding'])
- # IPv6 Duplicate Address Detection (DAD) tries
- o.set_ipv6_dad_messages(openvpn['ipv6_dup_addr_detect'])
-
- # IPv6 EUI-based addresses - only in TAP mode (TUN's have no MAC)
- # If MAC has changed, old EUI64 addresses won't get deleted,
- # but this isn't easy to solve, so leave them.
- # This is even more difficult as openvpn uses a random MAC for the
- # initial interface creation, unless set by 'lladdr'.
- # NOTE: right now the interface is always deleted. For future
- # compatibility when tap's are not deleted, leave the del_ in
- if openvpn['mode'] == 'tap':
- for addr in openvpn['ipv6_eui64_prefix_remove']:
- o.del_ipv6_eui64_address(addr)
- for addr in openvpn['ipv6_eui64_prefix']:
- o.add_ipv6_eui64_address(addr)
-
- # assign/remove VRF (ONLY when not a member of a bridge,
- # otherwise 'nomaster' removes it from it)
- if not openvpn['is_bridge_member']:
- o.set_vrf(openvpn['vrf'])
-
- # TAP interface needs to be brought up explicitly
- if openvpn['type'] == 'tap':
- if not openvpn['disable']:
- VTunIf(interface).set_admin_state('up')
+ conf = VTunIf.get_config()
+ conf['device_type'] = openvpn['device_type']
+
+ o = VTunIf(interface, **conf)
+ o.update(openvpn)
return None
@@ -1113,3 +534,4 @@ if __name__ == '__main__':
except ConfigError as e:
print(e)
exit(1)
+
diff --git a/src/conf_mode/interfaces-wireless.py b/src/conf_mode/interfaces-wireless.py
index c1770771e..a18a21b83 100755
--- a/src/conf_mode/interfaces-wireless.py
+++ b/src/conf_mode/interfaces-wireless.py
@@ -32,7 +32,7 @@ from vyos.configverify import verify_vrf
from vyos.ifconfig import WiFiIf
from vyos.template import render
from vyos.util import call
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
from vyos import ConfigError
from vyos import airbag
airbag.enable()
@@ -80,13 +80,13 @@ def get_config(config=None):
# Cleanup "delete" default values when required user selectable values are
# not defined at all
tmp = conf.get_config_dict([], key_mangling=('-', '_'), get_first_key=True)
- if not (vyos_dict_search('security.wpa.passphrase', tmp) or
- vyos_dict_search('security.wpa.radius', tmp)):
+ if not (dict_search('security.wpa.passphrase', tmp) or
+ dict_search('security.wpa.radius', tmp)):
del wifi['security']['wpa']
# defaults include RADIUS server specifics per TAG node which need to be
# added to individual RADIUS servers instead - so we can simply delete them
- if vyos_dict_search('security.wpa.radius.server.port', wifi):
+ if dict_search('security.wpa.radius.server.port', wifi):
del wifi['security']['wpa']['radius']['server']['port']
if not len(wifi['security']['wpa']['radius']['server']):
del wifi['security']['wpa']['radius']
@@ -119,10 +119,10 @@ def get_config(config=None):
if tmp: wifi['station_interfaces'] = tmp
# Add individual RADIUS server default values
- if vyos_dict_search('security.wpa.radius.server', wifi):
+ if dict_search('security.wpa.radius.server', wifi):
default_values = defaults(base + ['security', 'wpa', 'radius', 'server'])
- for server in vyos_dict_search('security.wpa.radius.server', wifi):
+ for server in dict_search('security.wpa.radius.server', wifi):
wifi['security']['wpa']['radius']['server'][server] = dict_merge(
default_values, wifi['security']['wpa']['radius']['server'][server])
@@ -241,7 +241,7 @@ def generate(wifi):
wifi['mac'] = str(mac)
# XXX: Jinja2 can not operate on a dictionary key when it starts of with a number
- if '40mhz_incapable' in (vyos_dict_search('capabilities.ht', wifi) or []):
+ if '40mhz_incapable' in (dict_search('capabilities.ht', wifi) or []):
wifi['capabilities']['ht']['fourtymhz_incapable'] = wifi['capabilities']['ht']['40mhz_incapable']
del wifi['capabilities']['ht']['40mhz_incapable']
diff --git a/src/conf_mode/service_pppoe-server.py b/src/conf_mode/service_pppoe-server.py
index a520120f8..2260b3fe1 100755
--- a/src/conf_mode/service_pppoe-server.py
+++ b/src/conf_mode/service_pppoe-server.py
@@ -23,7 +23,7 @@ from vyos.configdict import get_accel_dict
from vyos.configverify import verify_accel_ppp_base_service
from vyos.template import render
from vyos.util import call
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
from vyos import ConfigError
from vyos import airbag
airbag.enable()
@@ -57,13 +57,13 @@ def verify(pppoe):
raise ConfigError('At least one listen interface must be defined!')
# local ippool and gateway settings config checks
- if not (vyos_dict_search('client_ip_pool.subnet', pppoe) or
- (vyos_dict_search('client_ip_pool.start', pppoe) and
- vyos_dict_search('client_ip_pool.stop', pppoe))):
+ if not (dict_search('client_ip_pool.subnet', pppoe) or
+ (dict_search('client_ip_pool.start', pppoe) and
+ dict_search('client_ip_pool.stop', pppoe))):
print('Warning: No PPPoE client pool defined')
- if vyos_dict_search('authentication.radius.dynamic_author.server', pppoe):
- if not vyos_dict_search('authentication.radius.dynamic_author.key', pppoe):
+ if dict_search('authentication.radius.dynamic_author.server', pppoe):
+ if not dict_search('authentication.radius.dynamic_author.key', pppoe):
raise ConfigError('DA/CoE server key required!')
return None
@@ -75,7 +75,7 @@ def generate(pppoe):
render(pppoe_conf, 'accel-ppp/pppoe.config.tmpl', pppoe, trim_blocks=True)
- if vyos_dict_search('authentication.mode', pppoe) == 'local':
+ if dict_search('authentication.mode', pppoe) == 'local':
render(pppoe_chap_secrets, 'accel-ppp/chap-secrets.config_dict.tmpl',
pppoe, trim_blocks=True, permission=0o640)
else:
diff --git a/src/conf_mode/ssh.py b/src/conf_mode/ssh.py
index a19fa72d8..8d8e9e4c4 100755
--- a/src/conf_mode/ssh.py
+++ b/src/conf_mode/ssh.py
@@ -21,10 +21,11 @@ from sys import exit
from vyos.config import Config
from vyos.configdict import dict_merge
-from vyos import ConfigError
+from vyos.configverify import verify_vrf
from vyos.util import call
from vyos.template import render
from vyos.xml import defaults
+from vyos import ConfigError
from vyos import airbag
airbag.enable()
@@ -54,9 +55,7 @@ def verify(ssh):
if not ssh:
return None
- if 'vrf' in ssh.keys() and ssh['vrf'] not in interfaces():
- raise ConfigError('VRF "{vrf}" does not exist'.format(**ssh))
-
+ verify_vrf(ssh)
return None
def generate(ssh):
diff --git a/src/conf_mode/vpn_sstp.py b/src/conf_mode/vpn_sstp.py
index 2597ba42f..1b2b80ce5 100755
--- a/src/conf_mode/vpn_sstp.py
+++ b/src/conf_mode/vpn_sstp.py
@@ -23,7 +23,7 @@ from vyos.configdict import get_accel_dict
from vyos.configverify import verify_accel_ppp_base_service
from vyos.template import render
from vyos.util import call
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
from vyos import ConfigError
from vyos import airbag
airbag.enable()
@@ -56,21 +56,21 @@ def verify(sstp):
#
# SSL certificate checks
#
- tmp = vyos_dict_search('ssl.ca_cert_file', sstp)
+ tmp = dict_search('ssl.ca_cert_file', sstp)
if not tmp:
raise ConfigError(f'SSL CA certificate file required!')
else:
if not os.path.isfile(tmp):
raise ConfigError(f'SSL CA certificate "{tmp}" does not exist!')
- tmp = vyos_dict_search('ssl.cert_file', sstp)
+ tmp = dict_search('ssl.cert_file', sstp)
if not tmp:
raise ConfigError(f'SSL public key file required!')
else:
if not os.path.isfile(tmp):
raise ConfigError(f'SSL public key "{tmp}" does not exist!')
- tmp = vyos_dict_search('ssl.key_file', sstp)
+ tmp = dict_search('ssl.key_file', sstp)
if not tmp:
raise ConfigError(f'SSL private key file required!')
else:
@@ -84,7 +84,7 @@ def generate(sstp):
# accel-cmd reload doesn't work so any change results in a restart of the daemon
render(sstp_conf, 'accel-ppp/sstp.config.tmpl', sstp, trim_blocks=True)
- if vyos_dict_search('authentication.mode', sstp) == 'local':
+ if dict_search('authentication.mode', sstp) == 'local':
render(sstp_chap_secrets, 'accel-ppp/chap-secrets.config_dict.tmpl',
sstp, trim_blocks=True, permission=0o640)
else:
diff --git a/src/services/vyos-hostsd b/src/services/vyos-hostsd
index 59dbeda17..4c4bb036e 100755
--- a/src/services/vyos-hostsd
+++ b/src/services/vyos-hostsd
@@ -589,7 +589,7 @@ if __name__ == '__main__':
socket = context.socket(zmq.REP)
# Set the right permissions on the socket, then change it back
- o_mask = os.umask(0o007)
+ o_mask = os.umask(0o000)
socket.bind(SOCKET_PATH)
os.umask(o_mask)
diff --git a/src/tests/test_template.py b/src/tests/test_configverify.py
index 0b9f2c3b8..ad7e053db 100644
--- a/src/tests/test_template.py
+++ b/src/tests/test_configverify.py
@@ -15,17 +15,24 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from unittest import TestCase
+from vyos.configverify import verify_diffie_hellman_length
+from vyos.util import cmd
-from vyos.template import vyos_address_from_cidr
-from vyos.template import vyos_netmask_from_cidr
+dh_file = '/tmp/dh.pem'
-
-class TestTeamplteHelpers(TestCase):
+class TestDictSearch(TestCase):
def setUp(self):
pass
- def test_helpers_from_cidr(self):
- network = '192.0.2.0/26'
- self.assertEqual(vyos_address_from_cidr(network), '192.0.2.0')
- self.assertEqual(vyos_netmask_from_cidr(network), '255.255.255.192')
+ def test_dh_key_none(self):
+ self.assertFalse(verify_diffie_hellman_length('/tmp/non_existing_file', '1024'))
+
+ def test_dh_key_256(self):
+ key_len = '256'
+ cmd(f'openssl dhparam -out {dh_file} {key_len}')
+ self.assertTrue(verify_diffie_hellman_length(dh_file, key_len))
+ def test_dh_key_512(self):
+ key_len = '512'
+ cmd(f'openssl dhparam -out {dh_file} {key_len}')
+ self.assertTrue(verify_diffie_hellman_length(dh_file, key_len))
diff --git a/src/tests/test_vyos_dict_search.py b/src/tests/test_dict_search.py
index cba6562da..6a0fc74ad 100644
--- a/src/tests/test_vyos_dict_search.py
+++ b/src/tests/test_dict_search.py
@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from unittest import TestCase
-from vyos.util import vyos_dict_search
+from vyos.util import dict_search
data = {
'string': 'fooo',
@@ -29,29 +29,29 @@ class TestDictSearch(TestCase):
pass
def test_non_existing_keys(self):
- """ TestDictSearch: Return False when querying for non-existent key """
- self.assertFalse(vyos_dict_search('non_existing', data))
+ # TestDictSearch: Return False when querying for non-existent key
+ self.assertFalse(dict_search('non_existing', data))
def test_string(self):
- """ TestDictSearch: Return value when querying string """
- self.assertEqual(vyos_dict_search('string', data), data['string'])
+ # TestDictSearch: Return value when querying string
+ self.assertEqual(dict_search('string', data), data['string'])
def test_list(self):
- """ TestDictSearch: Return list items when querying list """
- self.assertEqual(vyos_dict_search('list', data), data['list'])
+ # TestDictSearch: Return list items when querying list
+ self.assertEqual(dict_search('list', data), data['list'])
def test_dict_key_value(self):
- """ TestDictSearch: Return dictionary keys value when value is present """
- self.assertEqual(vyos_dict_search('dict.key_2', data), data['dict']['key_2'])
+ # TestDictSearch: Return dictionary keys value when value is present
+ self.assertEqual(dict_search('dict.key_2', data), data['dict']['key_2'])
def test_nested_dict_key_value(self):
- """ TestDictSearch: Return string value of last key when querying for a nested string """
- self.assertEqual(vyos_dict_search('nested.string', data), data['nested']['string'])
+ # TestDictSearch: Return string value of last key when querying for a nested string
+ self.assertEqual(dict_search('nested.string', data), data['nested']['string'])
def test_nested_dict_key_empty(self):
- """ TestDictSearch: Return False when querying for a nested string whose last key is empty """
- self.assertFalse(vyos_dict_search('nested.empty', data))
+ # TestDictSearch: Return False when querying for a nested string whose last key is empty
+ self.assertFalse(dict_search('nested.empty', data))
def test_nested_list(self):
- """ TestDictSearch: Return list items when querying nested list """
- self.assertEqual(vyos_dict_search('nested.list', data), data['nested']['list'])
+ # TestDictSearch: Return list items when querying nested list
+ self.assertEqual(dict_search('nested.list', data), data['nested']['list'])
diff --git a/src/tests/test_jinja_filters.py b/src/tests/test_jinja_filters.py
new file mode 100644
index 000000000..acd7a5952
--- /dev/null
+++ b/src/tests/test_jinja_filters.py
@@ -0,0 +1,69 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from unittest import TestCase
+
+from ipaddress import ip_network
+from vyos.template import vyos_address_from_cidr
+from vyos.template import vyos_netmask_from_cidr
+from vyos.template import vyos_ipv4
+from vyos.template import vyos_ipv6
+from vyos.template import vyos_first_host_address
+from vyos.template import vyos_last_host_address
+from vyos.template import vyos_inc_ip
+
+class TestTeamplteHelpers(TestCase):
+ def setUp(self):
+ pass
+
+ def test_helpers_from_cidr(self):
+ network_v4 = '192.0.2.0/26'
+ self.assertEqual(vyos_address_from_cidr(network_v4), str(ip_network(network_v4).network_address))
+ self.assertEqual(vyos_netmask_from_cidr(network_v4), str(ip_network(network_v4).netmask))
+
+ def test_helpers_ipv4(self):
+ self.assertTrue(vyos_ipv4('192.0.2.1'))
+ self.assertTrue(vyos_ipv4('192.0.2.0/24'))
+ self.assertTrue(vyos_ipv4('192.0.2.1/32'))
+ self.assertTrue(vyos_ipv4('10.255.1.2'))
+ self.assertTrue(vyos_ipv4('10.255.1.0/24'))
+ self.assertTrue(vyos_ipv4('10.255.1.2/32'))
+ self.assertFalse(vyos_ipv4('2001:db8::'))
+ self.assertFalse(vyos_ipv4('2001:db8::1'))
+ self.assertFalse(vyos_ipv4('2001:db8::/64'))
+
+ def test_helpers_ipv6(self):
+ self.assertFalse(vyos_ipv6('192.0.2.1'))
+ self.assertFalse(vyos_ipv6('192.0.2.0/24'))
+ self.assertFalse(vyos_ipv6('192.0.2.1/32'))
+ self.assertFalse(vyos_ipv6('10.255.1.2'))
+ self.assertFalse(vyos_ipv6('10.255.1.0/24'))
+ self.assertFalse(vyos_ipv6('10.255.1.2/32'))
+ self.assertTrue(vyos_ipv6('2001:db8::'))
+ self.assertTrue(vyos_ipv6('2001:db8::1'))
+ self.assertTrue(vyos_ipv6('2001:db8::1/64'))
+ self.assertTrue(vyos_ipv6('2001:db8::/32'))
+ self.assertTrue(vyos_ipv6('2001:db8::/64'))
+
+ def test_helpers_first_host_address(self):
+ self.assertEqual(vyos_first_host_address('10.0.0.0/24'), '10.0.0.1')
+ self.assertEqual(vyos_first_host_address('10.0.0.128/25'), '10.0.0.129')
+ self.assertEqual(vyos_first_host_address('10.0.0.200/29'), '10.0.0.201')
+
+ self.assertEqual(vyos_first_host_address('2001:db8::/64'), '2001:db8::')
+ self.assertEqual(vyos_first_host_address('2001:db8::/112'), '2001:db8::')
+ self.assertEqual(vyos_first_host_address('2001:db8::10/112'), '2001:db8::10')
+ self.assertEqual(vyos_first_host_address('2001:db8::100/112'), '2001:db8::100')
diff --git a/src/tests/test_validate.py b/src/tests/test_validate.py
new file mode 100644
index 000000000..e9fe185ed
--- /dev/null
+++ b/src/tests/test_validate.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2020 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import vyos.validate
+from unittest import TestCase
+
+class TestVyOSValidate(TestCase):
+ def setUp(self):
+ pass
+
+ def test_is_ip(self):
+ self.assertTrue(vyos.validate.is_ip('192.0.2.1'))
+ self.assertTrue(vyos.validate.is_ip('2001:db8::1'))
+ self.assertFalse(vyos.validate.is_ip('VyOS'))
+
+ def test_is_ipv4(self):
+ self.assertTrue(vyos.validate.is_ipv4('192.0.2.1'))
+ self.assertTrue(vyos.validate.is_ipv4('192.0.2.0/24'))
+ self.assertTrue(vyos.validate.is_ipv4('192.0.2.1/32'))
+
+ self.assertFalse(vyos.validate.is_ipv4('2001:db8::1'))
+ self.assertFalse(vyos.validate.is_ipv4('2001:db8::/64'))
+ self.assertFalse(vyos.validate.is_ipv4('VyOS'))
+
+ def test_is_ipv6(self):
+ self.assertFalse(vyos.validate.is_ipv6('192.0.2.1'))
+ self.assertFalse(vyos.validate.is_ipv6('192.0.2.0/24'))
+ self.assertFalse(vyos.validate.is_ipv6('192.0.2.1/32'))
+ self.assertTrue(vyos.validate.is_ipv6('2001:db8::1'))
+ self.assertTrue(vyos.validate.is_ipv6('2001:db8::/64'))
+ self.assertTrue(vyos.validate.is_ipv6('2001:db8::1/64'))
+ self.assertFalse(vyos.validate.is_ipv6('VyOS'))
+
+ def test_is_ipv6_link_local(self):
+ self.assertFalse(vyos.validate.is_ipv6_link_local('169.254.0.1'))
+ self.assertTrue(vyos.validate.is_ipv6_link_local('fe80::'))
+ self.assertTrue(vyos.validate.is_ipv6_link_local('fe80::affe:1'))
+ self.assertFalse(vyos.validate.is_ipv6_link_local('2001:db8::'))
+ self.assertFalse(vyos.validate.is_ipv6_link_local('VyOS'))
+
+ def test_is_ipv6_link_local(self):
+ self.assertTrue(vyos.validate.is_loopback_addr('127.0.0.1'))
+ self.assertTrue(vyos.validate.is_loopback_addr('127.0.1.1'))
+ self.assertTrue(vyos.validate.is_loopback_addr('127.1.1.1'))
+ self.assertTrue(vyos.validate.is_loopback_addr('::1'))
+
+ self.assertFalse(vyos.validate.is_loopback_addr('::2'))
+ self.assertFalse(vyos.validate.is_loopback_addr('192.0.2.1'))
+
+
+