summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/conf_mode/service_upnp.py44
1 files changed, 23 insertions, 21 deletions
diff --git a/src/conf_mode/service_upnp.py b/src/conf_mode/service_upnp.py
index 638296f45..d21b31990 100755
--- a/src/conf_mode/service_upnp.py
+++ b/src/conf_mode/service_upnp.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021 VyOS maintainers and contributors
+# Copyright (C) 2021-2022 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -24,7 +24,6 @@ from ipaddress import IPv6Network
from vyos.config import Config
from vyos.configdict import dict_merge
-from vyos.configdict import dict_search
from vyos.configdict import get_interface_dict
from vyos.configverify import verify_vrf
from vyos.util import call
@@ -43,17 +42,18 @@ def get_config(config=None):
conf = config
else:
conf = Config()
+
base = ['service', 'upnp']
upnpd = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True)
-
+
if not upnpd:
return None
-
- if dict_search('rule', upnpd):
+
+ if 'rule' in upnpd:
default_member_values = defaults(base + ['rule'])
for rule,rule_config in upnpd['rule'].items():
upnpd['rule'][rule] = dict_merge(default_member_values, upnpd['rule'][rule])
-
+
uuidgen = uuid.uuid1()
upnpd.update({'uuid': uuidgen})
@@ -62,7 +62,7 @@ def get_config(config=None):
def get_all_interface_addr(prefix, filter_dev, filter_family):
list_addr = []
interfaces = netifaces.interfaces()
-
+
for interface in interfaces:
if filter_dev and interface in filter_dev:
continue
@@ -87,27 +87,28 @@ def get_all_interface_addr(prefix, filter_dev, filter_family):
list_addr.append(addr['addr'] + prefix)
else:
list_addr.append(addr['addr'])
-
+
return list_addr
def verify(upnpd):
if not upnpd:
return None
-
+
if 'wan_interface' not in upnpd:
raise ConfigError('To enable UPNP, you must have the "wan-interface" option!')
-
- if dict_search('rules', upnpd):
- for rule,rule_config in upnpd['rule'].items():
+
+ if 'rule' in upnpd:
+ for rule, rule_config in upnpd['rule'].items():
for option in ['external_port_range', 'internal_port_range', 'ip', 'action']:
if option not in rule_config:
- raise ConfigError(f'A UPNP rule must have an "{option}" option!')
-
- if dict_search('stun', upnpd):
+ tmp = option.replace('_', '-')
+ raise ConfigError(f'Every UPNP rule requires "{tmp}" to be set!')
+
+ if 'stun' in upnpd:
for option in ['host', 'port']:
if option not in upnpd['stun']:
raise ConfigError(f'A UPNP stun support must have an "{option}" option!')
-
+
# Check the validity of the IP address
listen_dev = []
system_addrs_cidr = get_all_interface_addr(True, [], [netifaces.AF_INET, netifaces.AF_INET6])
@@ -120,7 +121,7 @@ def verify(upnpd):
raise ConfigError(f'The address "{listen_if_or_addr}" is an address that is not allowed to listen on. It is not an interface address nor a multicast address!')
if is_ipv6(listen_if_or_addr) and IPv6Network(listen_if_or_addr).is_multicast:
raise ConfigError(f'The address "{listen_if_or_addr}" is an address that is not allowed to listen on. It is not an interface address nor a multicast address!')
-
+
system_listening_dev_addrs_cidr = get_all_interface_addr(True, listen_dev, [netifaces.AF_INET6])
system_listening_dev_addrs = get_all_interface_addr(False, listen_dev, [netifaces.AF_INET6])
for listen_if_or_addr in upnpd['listen']:
@@ -130,19 +131,20 @@ def verify(upnpd):
def generate(upnpd):
if not upnpd:
return None
-
+
if os.path.isfile(config_file):
os.unlink(config_file)
-
+
render(config_file, 'firewall/upnpd.conf.tmpl', upnpd)
def apply(upnpd):
+ systemd_service_name = 'miniupnpd.service'
if not upnpd:
# Stop the UPNP service
- call('systemctl stop miniupnpd.service')
+ call(f'systemctl stop {systemd_service_name}')
else:
# Start the UPNP service
- call('systemctl restart miniupnpd.service')
+ call(f'systemctl restart {systemd_service_name}')
if __name__ == '__main__':
try: