diff options
| -rwxr-xr-x | src/conf_mode/service_upnp.py | 44 | 
1 files changed, 23 insertions, 21 deletions
diff --git a/src/conf_mode/service_upnp.py b/src/conf_mode/service_upnp.py index 638296f45..d21b31990 100755 --- a/src/conf_mode/service_upnp.py +++ b/src/conf_mode/service_upnp.py @@ -1,6 +1,6 @@  #!/usr/bin/env python3  # -# Copyright (C) 2021 VyOS maintainers and contributors +# Copyright (C) 2021-2022 VyOS maintainers and contributors  #  # This program is free software; you can redistribute it and/or modify  # it under the terms of the GNU General Public License version 2 or later as @@ -24,7 +24,6 @@ from ipaddress import IPv6Network  from vyos.config import Config  from vyos.configdict import dict_merge -from vyos.configdict import dict_search  from vyos.configdict import get_interface_dict  from vyos.configverify import verify_vrf  from vyos.util import call @@ -43,17 +42,18 @@ def get_config(config=None):          conf = config      else:          conf = Config() +      base = ['service', 'upnp']      upnpd = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True) -     +      if not upnpd:          return None -     -    if dict_search('rule', upnpd): + +    if 'rule' in upnpd:          default_member_values = defaults(base + ['rule'])          for rule,rule_config in upnpd['rule'].items():              upnpd['rule'][rule] = dict_merge(default_member_values, upnpd['rule'][rule]) -     +      uuidgen = uuid.uuid1()      upnpd.update({'uuid': uuidgen}) @@ -62,7 +62,7 @@ def get_config(config=None):  def get_all_interface_addr(prefix, filter_dev, filter_family):      list_addr = []      interfaces = netifaces.interfaces() -     +      for interface in interfaces:          if filter_dev and interface in filter_dev:              continue @@ -87,27 +87,28 @@ def get_all_interface_addr(prefix, filter_dev, filter_family):                          list_addr.append(addr['addr'] + prefix)                      else:                          list_addr.append(addr['addr']) -     +      return list_addr  def verify(upnpd):      if not upnpd:          return None -     +      if 'wan_interface' not in upnpd:          raise ConfigError('To enable UPNP, you must have the "wan-interface" option!') -     -    if dict_search('rules', upnpd): -        for rule,rule_config in upnpd['rule'].items(): + +    if 'rule' in upnpd: +        for rule, rule_config in upnpd['rule'].items():              for option in ['external_port_range', 'internal_port_range', 'ip', 'action']:                  if option not in rule_config: -                    raise ConfigError(f'A UPNP rule must have an "{option}" option!') -     -    if dict_search('stun', upnpd): +                    tmp = option.replace('_', '-') +                    raise ConfigError(f'Every UPNP rule requires "{tmp}" to be set!') + +    if 'stun' in upnpd:          for option in ['host', 'port']:              if option not in upnpd['stun']:                  raise ConfigError(f'A UPNP stun support must have an "{option}" option!') -     +      # Check the validity of the IP address      listen_dev = []      system_addrs_cidr = get_all_interface_addr(True, [], [netifaces.AF_INET, netifaces.AF_INET6]) @@ -120,7 +121,7 @@ def verify(upnpd):                  raise ConfigError(f'The address "{listen_if_or_addr}" is an address that is not allowed to listen on. It is not an interface address nor a multicast address!')              if is_ipv6(listen_if_or_addr) and IPv6Network(listen_if_or_addr).is_multicast:                  raise ConfigError(f'The address "{listen_if_or_addr}" is an address that is not allowed to listen on. It is not an interface address nor a multicast address!') -     +      system_listening_dev_addrs_cidr = get_all_interface_addr(True, listen_dev, [netifaces.AF_INET6])      system_listening_dev_addrs = get_all_interface_addr(False, listen_dev, [netifaces.AF_INET6])      for listen_if_or_addr in upnpd['listen']: @@ -130,19 +131,20 @@ def verify(upnpd):  def generate(upnpd):      if not upnpd:          return None -     +      if os.path.isfile(config_file):          os.unlink(config_file) -     +      render(config_file, 'firewall/upnpd.conf.tmpl', upnpd)  def apply(upnpd): +    systemd_service_name = 'miniupnpd.service'      if not upnpd:          # Stop the UPNP service -        call('systemctl stop miniupnpd.service') +        call(f'systemctl stop {systemd_service_name}')      else:          # Start the UPNP service -        call('systemctl restart miniupnpd.service') +        call(f'systemctl restart {systemd_service_name}')  if __name__ == '__main__':      try:  | 
