diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/vyos/component_version.py | 2 | ||||
| -rw-r--r-- | python/vyos/config.py | 102 | ||||
| -rw-r--r-- | python/vyos/configdict.py | 65 | ||||
| -rw-r--r-- | python/vyos/configdiff.py | 18 | ||||
| -rw-r--r-- | python/vyos/configtree.py | 18 | ||||
| -rw-r--r-- | python/vyos/defaults.py | 4 | ||||
| -rw-r--r-- | python/vyos/firewall.py | 58 | ||||
| -rw-r--r-- | python/vyos/ifconfig/bond.py | 4 | ||||
| -rw-r--r-- | python/vyos/ifconfig/bridge.py | 4 | ||||
| -rw-r--r-- | python/vyos/ifconfig/ethernet.py | 2 | ||||
| -rw-r--r-- | python/vyos/ifconfig/interface.py | 114 | ||||
| -rw-r--r-- | python/vyos/ifconfig/pppoe.py | 2 | ||||
| -rw-r--r-- | python/vyos/ifconfig/tunnel.py | 2 | ||||
| -rw-r--r-- | python/vyos/nat.py | 33 | ||||
| -rw-r--r-- | python/vyos/pki.py | 14 | ||||
| -rw-r--r-- | python/vyos/qos/base.py | 24 | ||||
| -rw-r--r-- | python/vyos/template.py | 9 | ||||
| -rw-r--r-- | python/vyos/utils/__init__.py | 1 | ||||
| -rw-r--r-- | python/vyos/utils/assertion.py | 81 | ||||
| -rw-r--r-- | python/vyos/utils/convert.py | 46 | ||||
| -rw-r--r-- | python/vyos/utils/network.py | 186 | ||||
| -rw-r--r-- | python/vyos/validate.py | 321 | ||||
| -rw-r--r-- | python/vyos/xml_ref/__init__.py | 28 | ||||
| -rw-r--r-- | python/vyos/xml_ref/definition.py | 101 | 
24 files changed, 700 insertions, 539 deletions
| diff --git a/python/vyos/component_version.py b/python/vyos/component_version.py index a4e318d08..84e0ae51a 100644 --- a/python/vyos/component_version.py +++ b/python/vyos/component_version.py @@ -37,7 +37,7 @@ import re  import sys  import fileinput -from vyos.xml import component_version +from vyos.xml_ref import component_version  from vyos.version import get_version  from vyos.defaults import directories diff --git a/python/vyos/config.py b/python/vyos/config.py index 179f60c43..6fececd76 100644 --- a/python/vyos/config.py +++ b/python/vyos/config.py @@ -66,17 +66,31 @@ In operational mode, all functions return values from the running config.  import re  import json  from copy import deepcopy +from typing import Union  import vyos.configtree -from vyos.xml_ref import multi_to_list, from_source -from vyos.xml_ref import merge_defaults, relative_defaults -from vyos.utils.dict import get_sub_dict, mangle_dict_keys -from vyos.configsource import ConfigSource, ConfigSourceSession +from vyos.xml_ref import multi_to_list +from vyos.xml_ref import from_source +from vyos.xml_ref import ext_dict_merge +from vyos.xml_ref import relative_defaults +from vyos.utils.dict import get_sub_dict +from vyos.utils.dict import mangle_dict_keys +from vyos.configsource import ConfigSource +from vyos.configsource import ConfigSourceSession  class ConfigDict(dict):      _from_defaults = {} -    def from_defaults(self, path: list[str]): +    _dict_kwargs = {} +    def from_defaults(self, path: list[str]) -> bool:          return from_source(self._from_defaults, path) +    @property +    def kwargs(self) -> dict: +        return self._dict_kwargs + +def config_dict_merge(src: dict, dest: Union[dict, ConfigDict]) -> ConfigDict: +    if not isinstance(dest, ConfigDict): +        dest = ConfigDict(dest) +    return ext_dict_merge(src, dest)  class Config(object):      """ @@ -229,6 +243,13 @@ class Config(object):          return config_dict +    def verify_mangling(self, key_mangling): +        if not (isinstance(key_mangling, tuple) and \ +                (len(key_mangling) == 2) and \ +                isinstance(key_mangling[0], str) and \ +                isinstance(key_mangling[1], str)): +            raise ValueError("key_mangling must be a tuple of two strings") +      def get_config_dict(self, path=[], effective=False, key_mangling=None,                          get_first_key=False, no_multi_convert=False,                          no_tag_node_value_mangle=False, @@ -243,44 +264,37 @@ class Config(object):          Returns: a dict representation of the config under path          """ +        kwargs = locals().copy() +        del kwargs['self'] +        del kwargs['no_multi_convert'] +        del kwargs['with_defaults'] +        del kwargs['with_recursive_defaults'] +          lpath = self._make_path(path)          root_dict = self.get_cached_root_dict(effective)          conf_dict = get_sub_dict(root_dict, lpath, get_first_key=get_first_key) -        if key_mangling is None and no_multi_convert and not (with_defaults or with_recursive_defaults): -            return deepcopy(conf_dict) -          rpath = lpath if get_first_key else lpath[:-1]          if not no_multi_convert:              conf_dict = multi_to_list(rpath, conf_dict) +        if key_mangling is not None: +            self.verify_mangling(key_mangling) +            conf_dict = mangle_dict_keys(conf_dict, +                                         key_mangling[0], key_mangling[1], +                                         abs_path=rpath, +                                         no_tag_node_value_mangle=no_tag_node_value_mangle) +          if with_defaults or with_recursive_defaults: +            defaults = self.get_config_defaults(**kwargs, +                                                recursive=with_recursive_defaults) +            conf_dict = config_dict_merge(defaults, conf_dict) +        else:              conf_dict = ConfigDict(conf_dict) -            conf_dict = merge_defaults(lpath, conf_dict, -                                       get_first_key=get_first_key, -                                       recursive=with_recursive_defaults) -        if key_mangling is None: -            return conf_dict - -        if not (isinstance(key_mangling, tuple) and \ -                (len(key_mangling) == 2) and \ -                isinstance(key_mangling[0], str) and \ -                isinstance(key_mangling[1], str)): -            raise ValueError("key_mangling must be a tuple of two strings") - -        def mangle(obj): -            return mangle_dict_keys(obj, key_mangling[0], key_mangling[1], -                                    abs_path=rpath, -                                    no_tag_node_value_mangle=no_tag_node_value_mangle) - -        if isinstance(conf_dict, ConfigDict): -            from_defaults = mangle(conf_dict._from_defaults) -            conf_dict = mangle(conf_dict) -            conf_dict._from_defaults = from_defaults -        else: -            conf_dict = mangle(conf_dict) +        # save optional args for a call to get_config_defaults +        setattr(conf_dict, '_dict_kwargs', kwargs)          return conf_dict @@ -294,21 +308,29 @@ class Config(object):          defaults = relative_defaults(lpath, conf_dict,                                       get_first_key=get_first_key,                                       recursive=recursive) -        if key_mangling is None: -            return defaults          rpath = lpath if get_first_key else lpath[:-1] -        if not (isinstance(key_mangling, tuple) and \ -                (len(key_mangling) == 2) and \ -                isinstance(key_mangling[0], str) and \ -                isinstance(key_mangling[1], str)): -            raise ValueError("key_mangling must be a tuple of two strings") - -        defaults = mangle_dict_keys(defaults, key_mangling[0], key_mangling[1], abs_path=rpath, no_tag_node_value_mangle=no_tag_node_value_mangle) +        if key_mangling is not None: +            self.verify_mangling(key_mangling) +            defaults = mangle_dict_keys(defaults, +                                        key_mangling[0], key_mangling[1], +                                        abs_path=rpath, +                                        no_tag_node_value_mangle=no_tag_node_value_mangle)          return defaults +    def merge_defaults(self, config_dict: ConfigDict, recursive=False): +        if not isinstance(config_dict, ConfigDict): +            raise TypeError('argument is not of type ConfigDict') +        if not config_dict.kwargs: +            raise ValueError('argument missing metadata') + +        args = config_dict.kwargs +        d = self.get_config_defaults(**args, recursive=recursive) +        config_dict = config_dict_merge(d, config_dict) +        return config_dict +      def is_multi(self, path):          """          Args: diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py index f642d38f2..71a06b625 100644 --- a/python/vyos/configdict.py +++ b/python/vyos/configdict.py @@ -20,7 +20,6 @@ import os  import json  from vyos.utils.dict import dict_search -from vyos.xml import defaults  from vyos.utils.process import cmd  def retrieve_config(path_hash, base_path, config): @@ -177,24 +176,6 @@ def get_removed_vlans(conf, path, dict):      return dict -def T2665_set_dhcpv6pd_defaults(config_dict): -    """ Properly configure DHCPv6 default options in the dictionary. If there is -    no DHCPv6 configured at all, it is safe to remove the entire configuration. -    """ -    # As this is the same for every interface type it is safe to assume this -    # for ethernet -    pd_defaults = defaults(['interfaces', 'ethernet', 'dhcpv6-options', 'pd']) - -    # Implant default dictionary for DHCPv6-PD instances -    if dict_search('dhcpv6_options.pd.length', config_dict): -        del config_dict['dhcpv6_options']['pd']['length'] - -    for pd in (dict_search('dhcpv6_options.pd', config_dict) or []): -        config_dict['dhcpv6_options']['pd'][pd] = dict_merge(pd_defaults, -            config_dict['dhcpv6_options']['pd'][pd]) - -    return config_dict -  def is_member(conf, interface, intftype=None):      """      Checks if passed interface is member of other interface of specified type. @@ -263,6 +244,48 @@ def is_mirror_intf(conf, interface, direction=None):      return ret_val +def has_address_configured(conf, intf): +    """ +    Checks if interface has an address configured. +    Checks the following config nodes: +    'address', 'ipv6 address eui64', 'ipv6 address autoconf' + +    Returns True if interface has address configured, False if it doesn't. +    """ +    from vyos.ifconfig import Section +    ret = False + +    old_level = conf.get_level() +    conf.set_level([]) + +    intfpath = 'interfaces ' + Section.get_config_path(intf) +    if ( conf.exists(f'{intfpath} address') or +            conf.exists(f'{intfpath} ipv6 address autoconf') or +            conf.exists(f'{intfpath} ipv6 address eui64') ): +        ret = True + +    conf.set_level(old_level) +    return ret + +def has_vrf_configured(conf, intf): +    """ +    Checks if interface has a VRF configured. + +    Returns True if interface has VRF configured, False if it doesn't. +    """ +    from vyos.ifconfig import Section +    ret = False + +    old_level = conf.get_level() +    conf.set_level([]) + +    tmp = ['interfaces', Section.get_config_path(intf), 'vrf'] +    if conf.exists(tmp): +        ret = True + +    conf.set_level(old_level) +    return ret +  def has_vlan_subinterface_configured(conf, intf):      """      Checks if interface has an VLAN subinterface configured. @@ -455,6 +478,10 @@ def get_interface_dict(config, base, ifname='', recursive_defaults=True):      dhcp = is_node_changed(config, base + [ifname, 'dhcp-options'])      if dhcp: dict.update({'dhcp_options_changed' : {}}) +    # Changine interface VRF assignemnts require a DHCP restart, too +    dhcp = is_node_changed(config, base + [ifname, 'vrf']) +    if dhcp: dict.update({'dhcp_options_changed' : {}}) +      # Some interfaces come with a source_interface which must also not be part      # of any other bond or bridge interface as it is exclusivly assigned as the      # Kernels "lower" interface to this new "virtual/upper" interface. diff --git a/python/vyos/configdiff.py b/python/vyos/configdiff.py index 0caa204c3..1ec2dfafe 100644 --- a/python/vyos/configdiff.py +++ b/python/vyos/configdiff.py @@ -22,7 +22,7 @@ from vyos.configdict import list_diff  from vyos.utils.dict import get_sub_dict  from vyos.utils.dict import mangle_dict_keys  from vyos.utils.dict import dict_search_args -from vyos.xml import defaults +from vyos.xml_ref import get_defaults  class ConfigDiffError(Exception):      """ @@ -240,7 +240,9 @@ class ConfigDiff(object):                          if self._key_mangling:                              ret[k] = self._mangle_dict_keys(ret[k])                          if k in target_defaults and not no_defaults: -                            default_values = defaults(self._make_path(path)) +                            default_values = get_defaults(self._make_path(path), +                                                          get_first_key=True, +                                                          recursive=True)                              ret[k] = dict_merge(default_values, ret[k])                  return ret @@ -264,7 +266,9 @@ class ConfigDiff(object):                      ret[k] = self._mangle_dict_keys(ret[k])                  if k in target_defaults and not no_defaults: -                    default_values = defaults(self._make_path(path)) +                    default_values = get_defaults(self._make_path(path), +                                                  get_first_key=True, +                                                  recursive=True)                      ret[k] = dict_merge(default_values, ret[k])          return ret @@ -312,7 +316,9 @@ class ConfigDiff(object):                          if self._key_mangling:                              ret[k] = self._mangle_dict_keys(ret[k])                          if k in target_defaults and not no_defaults: -                            default_values = defaults(self._make_path(path)) +                            default_values = get_defaults(self._make_path(path), +                                                          get_first_key=True, +                                                          recursive=True)                              ret[k] = dict_merge(default_values, ret[k])                  return ret @@ -335,7 +341,9 @@ class ConfigDiff(object):                      ret[k] = self._mangle_dict_keys(ret[k])                  if k in target_defaults and not no_defaults: -                    default_values = defaults(self._make_path(path)) +                    default_values = get_defaults(self._make_path(path), +                                                  get_first_key=True, +                                                  recursive=True)                      ret[k] = dict_merge(default_values, ret[k])          return ret diff --git a/python/vyos/configtree.py b/python/vyos/configtree.py index e18d9817d..09cfd43d3 100644 --- a/python/vyos/configtree.py +++ b/python/vyos/configtree.py @@ -383,14 +383,16 @@ def union(left, right, libpath=LIBPATH):      return tree  def reference_tree_to_json(from_dir, to_file, libpath=LIBPATH): -    __lib = cdll.LoadLibrary(libpath) -    __reference_tree_to_json = __lib.reference_tree_to_json -    __reference_tree_to_json.argtypes = [c_char_p, c_char_p] -    __get_error = __lib.get_error -    __get_error.argtypes = [] -    __get_error.restype = c_char_p - -    res = __reference_tree_to_json(from_dir.encode(), to_file.encode()) +    try: +        __lib = cdll.LoadLibrary(libpath) +        __reference_tree_to_json = __lib.reference_tree_to_json +        __reference_tree_to_json.argtypes = [c_char_p, c_char_p] +        __get_error = __lib.get_error +        __get_error.argtypes = [] +        __get_error.restype = c_char_p +        res = __reference_tree_to_json(from_dir.encode(), to_file.encode()) +    except Exception as e: +        raise ConfigTreeError(e)      if res == 1:          msg = __get_error().decode()          raise ConfigTreeError(msg) diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py index d4ffc249e..a5314790d 100644 --- a/python/vyos/defaults.py +++ b/python/vyos/defaults.py @@ -32,7 +32,9 @@ directories = {    'api_schema': f'{base_dir}/services/api/graphql/graphql/schema/',    'api_client_op': f'{base_dir}/services/api/graphql/graphql/client_op/',    'api_templates': f'{base_dir}/services/api/graphql/session/templates/', -  'vyos_udev_dir' : '/run/udev/vyos' +  'vyos_udev_dir' : '/run/udev/vyos', +  'isc_dhclient_dir' : '/run/dhclient', +  'dhcp6_client_dir' : '/run/dhcp6c',  }  config_status = '/tmp/vyos-config-status' diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py index 903cc8535..4aa509fe2 100644 --- a/python/vyos/firewall.py +++ b/python/vyos/firewall.py @@ -41,14 +41,19 @@ def fqdn_config_parse(firewall):      firewall['ip6_fqdn'] = {}      for domain, path in dict_search_recursive(firewall, 'fqdn'): -        fw_name = path[1] # name/ipv6-name -        rule = path[3] # rule id -        suffix = path[4][0] # source/destination (1 char) -        set_name = f'{fw_name}_{rule}_{suffix}' - -        if path[0] == 'name': +        hook_name = path[1] +        priority = path[2] + +        fw_name = path[2] +        rule = path[4] +        suffix = path[5][0] +        set_name = f'{hook_name}_{priority}_{rule}_{suffix}' +             +        if (path[0] == 'ipv4') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'):              firewall['ip_fqdn'][set_name] = domain -        elif path[0] == 'ipv6_name': +        elif (path[0] == 'ipv6') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'): +            if path[1] == 'name': +                set_name = f'name6_{priority}_{rule}_{suffix}'              firewall['ip6_fqdn'][set_name] = domain  def fqdn_resolve(fqdn, ipv6=False): @@ -80,7 +85,7 @@ def nft_action(vyos_action):          return 'return'      return vyos_action -def parse_rule(rule_conf, fw_name, rule_id, ip_name): +def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):      output = []      def_suffix = '6' if ip_name == 'ip6' else '' @@ -129,16 +134,34 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):              if 'fqdn' in side_conf:                  fqdn = side_conf['fqdn'] +                hook_name = ''                  operator = ''                  if fqdn[0] == '!':                      operator = '!=' -                output.append(f'{ip_name} {prefix}addr {operator} @FQDN_{fw_name}_{rule_id}_{prefix}') +                if hook == 'FWD': +                    hook_name = 'forward' +                if hook == 'INP': +                    hook_name = 'input' +                if hook == 'OUT': +                    hook_name = 'output' +                if hook == 'NAM': +                    hook_name = f'name{def_suffix}' +                output.append(f'{ip_name} {prefix}addr {operator} @FQDN_{hook_name}_{fw_name}_{rule_id}_{prefix}')              if dict_search_args(side_conf, 'geoip', 'country_code'):                  operator = '' +                hook_name = ''                  if dict_search_args(side_conf, 'geoip', 'inverse_match') != None:                      operator = '!=' -                output.append(f'{ip_name} {prefix}addr {operator} @GEOIP_CC_{fw_name}_{rule_id}') +                if hook == 'FWD': +                    hook_name = 'forward' +                if hook == 'INP': +                    hook_name = 'input' +                if hook == 'OUT': +                    hook_name = 'output' +                if hook == 'NAM': +                    hook_name = f'name' +                output.append(f'{ip_name} {prefix}addr {operator} @GEOIP_CC{def_suffix}_{hook_name}_{fw_name}_{rule_id}')              if 'mac_address' in side_conf:                  suffix = side_conf["mac_address"] @@ -324,7 +347,7 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):      if 'recent' in rule_conf:          count = rule_conf['recent']['count']          time = rule_conf['recent']['time'] -        output.append(f'add @RECENT{def_suffix}_{fw_name}_{rule_id} {{ {ip_name} saddr limit rate over {count}/{time} burst {count} packets }}') +        output.append(f'add @RECENT{def_suffix}_{hook}_{fw_name}_{rule_id} {{ {ip_name} saddr limit rate over {count}/{time} burst {count} packets }}')      if 'time' in rule_conf:          output.append(parse_time(rule_conf['time'])) @@ -348,7 +371,9 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):          output.append(parse_policy_set(rule_conf['set'], def_suffix))      if 'action' in rule_conf: -        output.append(nft_action(rule_conf['action'])) +        # Change action=return to action=action +        # #output.append(nft_action(rule_conf['action'])) +        output.append(f'{rule_conf["action"]}')          if 'jump' in rule_conf['action']:              target = rule_conf['jump_target']              output.append(f'NAME{def_suffix}_{target}') @@ -365,7 +390,7 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name):      else:          output.append('return') -    output.append(f'comment "{fw_name}-{rule_id}"') +    output.append(f'comment "{hook}-{fw_name}-{rule_id}"')      return " ".join(output)  def parse_tcp_flags(flags): @@ -493,11 +518,12 @@ def geoip_update(firewall, force=False):          # Map country codes to set names          for codes, path in dict_search_recursive(firewall, 'country_code'): -            set_name = f'GEOIP_CC_{path[1]}_{path[3]}' -            if path[0] == 'name': +            set_name = f'GEOIP_CC_{path[1]}_{path[2]}_{path[4]}' +            if ( path[0] == 'ipv4'):                  for code in codes:                      ipv4_codes.setdefault(code, []).append(set_name) -            elif path[0] == 'ipv6_name': +            elif ( path[0] == 'ipv6' ): +                set_name = f'GEOIP_CC6_{path[1]}_{path[2]}_{path[4]}'                  for code in codes:                      ipv6_codes.setdefault(code, []).append(set_name) diff --git a/python/vyos/ifconfig/bond.py b/python/vyos/ifconfig/bond.py index e88f860be..d1d7d48c4 100644 --- a/python/vyos/ifconfig/bond.py +++ b/python/vyos/ifconfig/bond.py @@ -18,8 +18,8 @@ import os  from vyos.ifconfig.interface import Interface  from vyos.utils.process import cmd  from vyos.utils.dict import dict_search -from vyos.validate import assert_list -from vyos.validate import assert_positive +from vyos.utils.assertion import assert_list +from vyos.utils.assertion import assert_positive  @Interface.register  class BondIf(Interface): diff --git a/python/vyos/ifconfig/bridge.py b/python/vyos/ifconfig/bridge.py index b103b49d8..b29e71394 100644 --- a/python/vyos/ifconfig/bridge.py +++ b/python/vyos/ifconfig/bridge.py @@ -17,8 +17,8 @@ from netifaces import interfaces  import json  from vyos.ifconfig.interface import Interface -from vyos.validate import assert_boolean -from vyos.validate import assert_positive +from vyos.utils.assertion import assert_boolean +from vyos.utils.assertion import assert_positive  from vyos.utils.process import cmd  from vyos.utils.dict import dict_search  from vyos.configdict import get_vlan_ids diff --git a/python/vyos/ifconfig/ethernet.py b/python/vyos/ifconfig/ethernet.py index 4ff044c23..24ce3a803 100644 --- a/python/vyos/ifconfig/ethernet.py +++ b/python/vyos/ifconfig/ethernet.py @@ -23,7 +23,7 @@ from vyos.ifconfig.interface import Interface  from vyos.utils.dict import dict_search  from vyos.utils.file import read_file  from vyos.utils.process import run -from vyos.validate import assert_list +from vyos.utils.assertion import assert_list  @Interface.register  class EthernetIf(Interface): diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py index efacad902..ddac387e7 100644 --- a/python/vyos/ifconfig/interface.py +++ b/python/vyos/ifconfig/interface.py @@ -31,6 +31,7 @@ from vyos import ConfigError  from vyos.configdict import list_diff  from vyos.configdict import dict_merge  from vyos.configdict import get_vlan_ids +from vyos.defaults import directories  from vyos.template import render  from vyos.utils.network import mac2eui64  from vyos.utils.dict import dict_search @@ -40,14 +41,14 @@ from vyos.utils.network import get_interface_namespace  from vyos.utils.process import is_systemd_service_active  from vyos.template import is_ipv4  from vyos.template import is_ipv6 -from vyos.validate import is_intf_addr_assigned -from vyos.validate import is_ipv6_link_local -from vyos.validate import assert_boolean -from vyos.validate import assert_list -from vyos.validate import assert_mac -from vyos.validate import assert_mtu -from vyos.validate import assert_positive -from vyos.validate import assert_range +from vyos.utils.network import is_intf_addr_assigned +from vyos.utils.network import is_ipv6_link_local +from vyos.utils.assertion import assert_boolean +from vyos.utils.assertion import assert_list +from vyos.utils.assertion import assert_mac +from vyos.utils.assertion import assert_mtu +from vyos.utils.assertion import assert_positive +from vyos.utils.assertion import assert_range  from vyos.ifconfig.control import Control  from vyos.ifconfig.vrrp import VRRP @@ -190,6 +191,10 @@ class Interface(Control):              'validate': lambda fwd: assert_range(fwd,0,2),              'location': '/proc/sys/net/ipv6/conf/{ifname}/forwarding',          }, +        'ipv6_accept_dad': { +            'validate': lambda dad: assert_range(dad,0,3), +            'location': '/proc/sys/net/ipv6/conf/{ifname}/accept_dad', +        },          'ipv6_dad_transmits': {              'validate': assert_positive,              'location': '/proc/sys/net/ipv6/conf/{ifname}/dad_transmits', @@ -259,6 +264,9 @@ class Interface(Control):          'ipv6_forwarding': {              'location': '/proc/sys/net/ipv6/conf/{ifname}/forwarding',          }, +        'ipv6_accept_dad': { +            'location': '/proc/sys/net/ipv6/conf/{ifname}/accept_dad', +        },          'ipv6_dad_transmits': {              'location': '/proc/sys/net/ipv6/conf/{ifname}/dad_transmits',          }, @@ -853,6 +861,13 @@ class Interface(Control):              return None          return self.set_interface('ipv6_forwarding', forwarding) +    def set_ipv6_dad_accept(self, dad): +        """Whether to accept DAD (Duplicate Address Detection)""" +        tmp = self.get_interface('ipv6_accept_dad') +        if tmp == dad: +            return None +        return self.set_interface('ipv6_accept_dad', dad) +      def set_ipv6_dad_messages(self, dad):          """          The amount of Duplicate Address Detection probes to send. @@ -1248,44 +1263,49 @@ class Interface(Control):              raise ValueError()          ifname = self.ifname -        config_base = r'/var/lib/dhcp/dhclient' -        config_file = f'{config_base}_{ifname}.conf' -        options_file = f'{config_base}_{ifname}.options' -        pid_file = f'{config_base}_{ifname}.pid' -        lease_file = f'{config_base}_{ifname}.leases' +        config_base = directories['isc_dhclient_dir'] + '/dhclient' +        dhclient_config_file = f'{config_base}_{ifname}.conf' +        dhclient_lease_file = f'{config_base}_{ifname}.leases' +        systemd_override_file = f'/run/systemd/system/dhclient@{ifname}.service.d/10-override.conf'          systemd_service = f'dhclient@{ifname}.service' +        # Rendered client configuration files require the apsolute config path +        self.config['isc_dhclient_dir'] = directories['isc_dhclient_dir'] +          # 'up' check is mandatory b/c even if the interface is A/D, as soon as          # the DHCP client is started the interface will be placed in u/u state.          # This is not what we intended to do when disabling an interface. -        if enable and 'disable' not in self._config: -            if dict_search('dhcp_options.host_name', self._config) == None: +        if enable and 'disable' not in self.config: +            if dict_search('dhcp_options.host_name', self.config) == None:                  # read configured system hostname.                  # maybe change to vyos hostd client ???                  hostname = 'vyos'                  with open('/etc/hostname', 'r') as f:                      hostname = f.read().rstrip('\n')                      tmp = {'dhcp_options' : { 'host_name' : hostname}} -                    self._config = dict_merge(tmp, self._config) +                    self.config = dict_merge(tmp, self.config) -            render(options_file, 'dhcp-client/daemon-options.j2', self._config) -            render(config_file, 'dhcp-client/ipv4.j2', self._config) +            render(systemd_override_file, 'dhcp-client/override.conf.j2', self.config) +            render(dhclient_config_file, 'dhcp-client/ipv4.j2', self.config) + +            # Reload systemd unit definitons as some options are dynamically generated +            self._cmd('systemctl daemon-reload')              # When the DHCP client is restarted a brief outage will occur, as              # the old lease is released a new one is acquired (T4203). We will              # only restart DHCP client if it's option changed, or if it's not              # running, but it should be running (e.g. on system startup) -            if 'dhcp_options_changed' in self._config or not is_systemd_service_active(systemd_service): +            if 'dhcp_options_changed' in self.config or not is_systemd_service_active(systemd_service):                  return self._cmd(f'systemctl restart {systemd_service}') -            return None          else:              if is_systemd_service_active(systemd_service):                  self._cmd(f'systemctl stop {systemd_service}')              # cleanup old config files -            for file in [config_file, options_file, pid_file, lease_file]: +            for file in [dhclient_config_file, systemd_override_file, dhclient_lease_file]:                  if os.path.isfile(file):                      os.remove(file) +        return None      def set_dhcpv6(self, enable):          """ @@ -1295,13 +1315,20 @@ class Interface(Control):              raise ValueError()          ifname = self.ifname -        config_file = f'/run/dhcp6c/dhcp6c.{ifname}.conf' -        options_file = f'/run/dhcp6c/dhcp6c.{ifname}.options' +        config_base = directories['dhcp6_client_dir'] +        config_file = f'{config_base}/dhcp6c.{ifname}.conf' +        systemd_override_file = f'/run/systemd/system/dhcp6c@{ifname}.service.d/10-override.conf'          systemd_service = f'dhcp6c@{ifname}.service' -        if enable and 'disable' not in self._config: -            render(options_file, 'dhcp-client/dhcp6c_daemon-options.j2', self._config) -            render(config_file, 'dhcp-client/ipv6.j2', self._config) +        # Rendered client configuration files require the apsolute config path +        self.config['dhcp6_client_dir'] = directories['dhcp6_client_dir'] + +        if enable and 'disable' not in self.config: +            render(systemd_override_file, 'dhcp-client/ipv6.override.conf.j2', self.config) +            render(config_file, 'dhcp-client/ipv6.j2', self.config) + +            # Reload systemd unit definitons as some options are dynamically generated +            self._cmd('systemctl daemon-reload')              # We must ignore any return codes. This is required to enable              # DHCPv6-PD for interfaces which are yet not up and running. @@ -1312,26 +1339,28 @@ class Interface(Control):              if os.path.isfile(config_file):                  os.remove(config_file) +        return None +      def set_mirror_redirect(self):          # Please refer to the document for details          #   - https://man7.org/linux/man-pages/man8/tc.8.html          #   - https://man7.org/linux/man-pages/man8/tc-mirred.8.html          # Depening if we are the source or the target interface of the port          # mirror we need to setup some variables. -        source_if = self._config['ifname'] +        source_if = self.config['ifname']          mirror_config = None -        if 'mirror' in self._config: -            mirror_config = self._config['mirror'] -        if 'is_mirror_intf' in self._config: -            source_if = next(iter(self._config['is_mirror_intf'])) -            mirror_config = self._config['is_mirror_intf'][source_if].get('mirror', None) +        if 'mirror' in self.config: +            mirror_config = self.config['mirror'] +        if 'is_mirror_intf' in self.config: +            source_if = next(iter(self.config['is_mirror_intf'])) +            mirror_config = self.config['is_mirror_intf'][source_if].get('mirror', None)          redirect_config = None          # clear existing ingess - ignore errors (e.g. "Error: Cannot find specified          # qdisc on specified device") - we simply cleanup all stuff here -        if not 'traffic_policy' in self._config: +        if not 'traffic_policy' in self.config:              self._popen(f'tc qdisc del dev {source_if} parent ffff: 2>/dev/null');              self._popen(f'tc qdisc del dev {source_if} parent 1: 2>/dev/null'); @@ -1355,11 +1384,11 @@ class Interface(Control):                  if err: print('tc qdisc(filter for mirror port failed')          # Apply interface traffic redirection policy -        elif 'redirect' in self._config: +        elif 'redirect' in self.config:              _, err = self._popen(f'tc qdisc add dev {source_if} handle ffff: ingress')              if err: print(f'tc qdisc add for redirect failed!') -            target_if = self._config['redirect'] +            target_if = self.config['redirect']              _, err = self._popen(f'tc filter add dev {source_if} parent ffff: protocol '\                                   f'all prio 10 u32 match u32 0 0 flowid 1:1 action mirred '\                                   f'egress redirect dev {target_if}') @@ -1402,7 +1431,7 @@ class Interface(Control):          # Cache the configuration - it will be reused inside e.g. DHCP handler          # XXX: maybe pass the option via __init__ in the future and rename this          # method to apply()? -        self._config = config +        self.config = config          # Change interface MAC address - re-set to real hardware address (hw-id)          # if custom mac is removed. Skip if bond member. @@ -1568,10 +1597,17 @@ class Interface(Control):          value = '1' if (tmp != None) else '0'          self.set_ipv6_autoconf(value) -        # IPv6 Duplicate Address Detection (DAD) tries +        # Whether to accept IPv6 DAD (Duplicate Address Detection) packets +        tmp = dict_search('ipv6.accept_dad', config) +        # Not all interface types got this CLI option, but if they do, there +        # is an XML defaultValue available +        if (tmp != None): self.set_ipv6_dad_accept(tmp) + +        # IPv6 DAD tries          tmp = dict_search('ipv6.dup_addr_detect_transmits', config) -        value = tmp if (tmp != None) else '1' -        self.set_ipv6_dad_messages(value) +        # Not all interface types got this CLI option, but if they do, there +        # is an XML defaultValue available +        if (tmp != None): self.set_ipv6_dad_messages(tmp)          # Delete old IPv6 EUI64 addresses before changing MAC          for addr in (dict_search('ipv6.address.eui64_old', config) or []): diff --git a/python/vyos/ifconfig/pppoe.py b/python/vyos/ifconfig/pppoe.py index fd4590beb..febf1452d 100644 --- a/python/vyos/ifconfig/pppoe.py +++ b/python/vyos/ifconfig/pppoe.py @@ -14,7 +14,7 @@  # License along with this library.  If not, see <http://www.gnu.org/licenses/>.  from vyos.ifconfig.interface import Interface -from vyos.validate import assert_range +from vyos.utils.assertion import assert_range  from vyos.utils.network import get_interface_config  @Interface.register diff --git a/python/vyos/ifconfig/tunnel.py b/python/vyos/ifconfig/tunnel.py index fb2f38e2b..9ba7b31a6 100644 --- a/python/vyos/ifconfig/tunnel.py +++ b/python/vyos/ifconfig/tunnel.py @@ -18,7 +18,7 @@  from vyos.ifconfig.interface import Interface  from vyos.utils.dict import dict_search -from vyos.validate import assert_list +from vyos.utils.assertion import assert_list  def enable_to_on(value):      if value == 'enable': diff --git a/python/vyos/nat.py b/python/vyos/nat.py index 603fedb9b..b6702f7e2 100644 --- a/python/vyos/nat.py +++ b/python/vyos/nat.py @@ -94,6 +94,39 @@ def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):          if options:              translation_str += f' {",".join(options)}' +        if not ipv6 and 'backend' in rule_conf['load_balance']: +            hash_input_items = [] +            current_prob = 0 +            nat_map = [] + +            for trans_addr, addr in rule_conf['load_balance']['backend'].items(): +                item_prob = int(addr['weight']) +                upper_limit = current_prob + item_prob - 1 +                hash_val = str(current_prob) + '-' + str(upper_limit) +                element = hash_val + " : " + trans_addr +                nat_map.append(element) +                current_prob = current_prob + item_prob + +            elements = ' , '.join(nat_map) + +            if 'hash' in rule_conf['load_balance'] and 'random' in rule_conf['load_balance']['hash']: +                translation_str += ' numgen random mod 100 map ' + '{ ' + f'{elements}' + ' }' +            else: +                for input_param in rule_conf['load_balance']['hash']: +                    if input_param == 'source-address': +                        param = 'ip saddr' +                    elif input_param == 'destination-address': +                        param = 'ip daddr' +                    elif input_param == 'source-port': +                        prot = rule_conf['protocol'] +                        param = f'{prot} sport' +                    elif input_param == 'destination-port': +                        prot = rule_conf['protocol'] +                        param = f'{prot} dport' +                    hash_input_items.append(param) +                hash_input = ' . '.join(hash_input_items) +                translation_str += f' jhash ' + f'{hash_input}' + ' mod 100 map ' + '{ ' + f'{elements}' + ' }' +      for target in ['source', 'destination']:          if target not in rule_conf:              continue diff --git a/python/vyos/pki.py b/python/vyos/pki.py index cd15e3878..792e24b76 100644 --- a/python/vyos/pki.py +++ b/python/vyos/pki.py @@ -1,6 +1,6 @@  #!/usr/bin/env python3  # -# Copyright (C) 2021 VyOS maintainers and contributors +# Copyright (C) 2023 VyOS maintainers and contributors  #  # This program is free software; you can redistribute it and/or modify  # it under the terms of the GNU General Public License version 2 or later as @@ -63,6 +63,18 @@ private_format_map = {      'OpenSSH': serialization.PrivateFormat.OpenSSH  } +hash_map = { +    'sha256': hashes.SHA256, +    'sha384': hashes.SHA384, +    'sha512': hashes.SHA512, +} + +def get_certificate_fingerprint(cert, hash): +    hash_algorithm = hash_map[hash]() +    fp = cert.fingerprint(hash_algorithm) + +    return fp.hex(':').upper() +  def encode_certificate(cert):      return cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8') diff --git a/python/vyos/qos/base.py b/python/vyos/qos/base.py index 6c5a3d79c..d8bbfe970 100644 --- a/python/vyos/qos/base.py +++ b/python/vyos/qos/base.py @@ -107,7 +107,8 @@ class QoSBase:              queue_limit = dict_search('queue_limit', config)              for ii in range(1, 4): -                tmp = f'tc qdisc replace dev {self._interface} parent {handle:x}:{ii:x} pfifo limit {queue_limit}' +                tmp = f'tc qdisc replace dev {self._interface} parent {handle:x}:{ii:x} pfifo' +                if queue_limit: tmp += f' limit {queue_limit}'                  self._cmd(tmp)          elif queue_type == 'fair-queue': @@ -297,6 +298,27 @@ class QoSBase:                                  filter_cmd += f' flowid {self._parent:x}:{cls:x}'                                  self._cmd(filter_cmd) +                    if any(tmp in ['exceed', 'bandwidth', 'burst'] for tmp in cls_config): +                        filter_cmd += f' action police' + +                        if 'exceed' in cls_config: +                            action = cls_config['exceed'] +                            filter_cmd += f' conform-exceed {action}' +                        if 'not_exceed' in cls_config: +                            action = cls_config['not_exceed'] +                            filter_cmd += f'/{action}' + +                        if 'bandwidth' in cls_config: +                            rate = self._rate_convert(cls_config['bandwidth']) +                            filter_cmd += f' rate {rate}' + +                        if 'burst' in cls_config: +                            burst = cls_config['burst'] +                            filter_cmd += f' burst {burst}' +                        cls = int(cls) +                        filter_cmd += f' flowid {self._parent:x}:{cls:x}' +                        self._cmd(filter_cmd) +                  else:                      filter_cmd += ' basic' diff --git a/python/vyos/template.py b/python/vyos/template.py index 7d1c3970f..e167488c6 100644 --- a/python/vyos/template.py +++ b/python/vyos/template.py @@ -420,7 +420,7 @@ def get_dhcp_router(interface):      Returns False of no router is found, returns the IP address as string if      a router is found.      """ -    lease_file = f'/var/lib/dhcp/dhclient_{interface}.leases' +    lease_file = directories['isc_dhclient_dir'] + f'/dhclient_{interface}.leases'      if not os.path.exists(lease_file):          return None @@ -574,9 +574,9 @@ def nft_action(vyos_action):      return vyos_action  @register_filter('nft_rule') -def nft_rule(rule_conf, fw_name, rule_id, ip_name='ip'): +def nft_rule(rule_conf, fw_hook, fw_name, rule_id, ip_name='ip'):      from vyos.firewall import parse_rule -    return parse_rule(rule_conf, fw_name, rule_id, ip_name) +    return parse_rule(rule_conf, fw_hook, fw_name, rule_id, ip_name)  @register_filter('nft_default_rule')  def nft_default_rule(fw_conf, fw_name, ipv6=False): @@ -587,7 +587,8 @@ def nft_default_rule(fw_conf, fw_name, ipv6=False):          action_suffix = default_action[:1].upper()          output.append(f'log prefix "[{fw_name[:19]}-default-{action_suffix}]"') -    output.append(nft_action(default_action)) +    #output.append(nft_action(default_action)) +    output.append(f'{default_action}')      if 'default_jump_target' in fw_conf:          target = fw_conf['default_jump_target']          def_suffix = '6' if ipv6 else '' diff --git a/python/vyos/utils/__init__.py b/python/vyos/utils/__init__.py index f2783113a..12ef2d3b8 100644 --- a/python/vyos/utils/__init__.py +++ b/python/vyos/utils/__init__.py @@ -13,6 +13,7 @@  # You should have received a copy of the GNU Lesser General Public  # License along with this library.  If not, see <http://www.gnu.org/licenses/>. +from vyos.utils import assertion  from vyos.utils import auth  from vyos.utils import boot  from vyos.utils import commit diff --git a/python/vyos/utils/assertion.py b/python/vyos/utils/assertion.py new file mode 100644 index 000000000..1aaa54dff --- /dev/null +++ b/python/vyos/utils/assertion.py @@ -0,0 +1,81 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library.  If not, see <http://www.gnu.org/licenses/>. + +def assert_boolean(b): +    if int(b) not in (0, 1): +        raise ValueError(f'Value {b} out of range') + +def assert_range(value, lower=0, count=3): +    if int(value, 16) not in range(lower, lower+count): +        raise ValueError("Value out of range") + +def assert_list(s, l): +    if s not in l: +        o = ' or '.join([f'"{n}"' for n in l]) +        raise ValueError(f'state must be {o}, got {s}') + +def assert_number(n): +    if not str(n).isnumeric(): +        raise ValueError(f'{n} must be a number') + +def assert_positive(n, smaller=0): +    assert_number(n) +    if int(n) < smaller: +        raise ValueError(f'{n} is smaller than {smaller}') + +def assert_mtu(mtu, ifname): +    assert_number(mtu) + +    import json +    from vyos.utils.process import cmd +    out = cmd(f'ip -j -d link show dev {ifname}') +    # [{"ifindex":2,"ifname":"eth0","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"pfifo_fast","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"08:00:27:d9:5b:04","broadcast":"ff:ff:ff:ff:ff:ff","promiscuity":0,"min_mtu":46,"max_mtu":16110,"inet6_addr_gen_mode":"none","num_tx_queues":1,"num_rx_queues":1,"gso_max_size":65536,"gso_max_segs":65535}] +    parsed = json.loads(out)[0] +    min_mtu = int(parsed.get('min_mtu', '0')) +    # cur_mtu = parsed.get('mtu',0), +    max_mtu = int(parsed.get('max_mtu', '0')) +    cur_mtu = int(mtu) + +    if (min_mtu and cur_mtu < min_mtu) or cur_mtu < 68: +        raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} < {min_mtu}') +    if (max_mtu and cur_mtu > max_mtu) or cur_mtu > 65536: +        raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} > {max_mtu}') + +def assert_mac(m): +    split = m.split(':') +    size = len(split) + +    # a mac address consits out of 6 octets +    if size != 6: +        raise ValueError(f'wrong number of MAC octets ({size}): {m}') + +    octets = [] +    try: +        for octet in split: +            octets.append(int(octet, 16)) +    except ValueError: +        raise ValueError(f'invalid hex number "{octet}" in : {m}') + +    # validate against the first mac address byte if it's a multicast +    # address +    if octets[0] & 1: +        raise ValueError(f'{m} is a multicast MAC address') + +    # overall mac address is not allowed to be 00:00:00:00:00:00 +    if sum(octets) == 0: +        raise ValueError('00:00:00:00:00:00 is not a valid MAC address') + +    if octets[:5] == (0, 0, 94, 0, 1): +        raise ValueError(f'{m} is a VRRP MAC address') diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py index ec2333ef0..9a8a1ff7d 100644 --- a/python/vyos/utils/convert.py +++ b/python/vyos/utils/convert.py @@ -144,32 +144,54 @@ def mac_to_eui64(mac, prefix=None):          except:  # pylint: disable=bare-except              return -def convert_data(data): -    """Convert multiple types of data to types usable in CLI + +def convert_data(data) -> dict | list | tuple | str | int | float | bool | None: +    """Filter and convert multiple types of data to types usable in CLI/API + +    WARNING: Must not be used for anything except formatting output for API or CLI + +    On the output allowed everything supported in JSON.      Args: -        data (str | bytes | list | OrderedDict): input data +        data (Any): input data      Returns: -        str | list | dict: converted data +        dict | list | tuple | str | int | float | bool | None: converted data      """      from base64 import b64encode -    from collections import OrderedDict -    if isinstance(data, str): +    # return original data for types which do not require conversion +    if isinstance(data, str | int | float | bool | None):          return data -    if isinstance(data, bytes): -        try: -            return data.decode() -        except UnicodeDecodeError: -            return b64encode(data).decode() +      if isinstance(data, list):          list_tmp = []          for item in data:              list_tmp.append(convert_data(item))          return list_tmp -    if isinstance(data, OrderedDict): + +    if isinstance(data, tuple): +        list_tmp = list(data) +        tuple_tmp = tuple(convert_data(list_tmp)) +        return tuple_tmp + +    if isinstance(data, bytes | bytearray): +        try: +            return data.decode() +        except UnicodeDecodeError: +            return b64encode(data).decode() + +    if isinstance(data, set | frozenset): +        list_tmp = convert_data(list(data)) +        return list_tmp + +    if isinstance(data, dict):          dict_tmp = {}          for key, value in data.items():              dict_tmp[key] = convert_data(value)          return dict_tmp + +    # do not return anything for other types +    # which cannot be converted to JSON +    # for example: complex | range | memoryview +    return diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py index 3786caf26..3f9a3ef4b 100644 --- a/python/vyos/utils/network.py +++ b/python/vyos/utils/network.py @@ -13,7 +13,15 @@  # You should have received a copy of the GNU Lesser General Public  # License along with this library.  If not, see <http://www.gnu.org/licenses/>. -import os +def _are_same_ip(one, two): +    from socket import AF_INET +    from socket import AF_INET6 +    from socket import inet_pton +    from vyos.template import is_ipv4 +    # compare the binary representation of the IP +    f_one = AF_INET if is_ipv4(one) else AF_INET6 +    s_two = AF_INET if is_ipv4(two) else AF_INET6 +    return inet_pton(f_one, one) == inet_pton(f_one, two)  def get_protocol_by_name(protocol_name):      """Get protocol number by protocol name @@ -48,6 +56,7 @@ def get_interface_config(interface):      """ Returns the used encapsulation protocol for given interface.          If interface does not exist, None is returned.      """ +    import os      if not os.path.exists(f'/sys/class/net/{interface}'):          return None      from json import loads @@ -59,6 +68,7 @@ def get_interface_address(interface):      """ Returns the used encapsulation protocol for given interface.          If interface does not exist, None is returned.      """ +    import os      if not os.path.exists(f'/sys/class/net/{interface}'):          return None      from json import loads @@ -85,7 +95,6 @@ def get_interface_namespace(iface):              if iface == tmp["ifname"]:                  return netns -  def is_wwan_connected(interface):      """ Determine if a given WWAN interface, e.g. wwan0 is connected to the      carrier network or not """ @@ -110,6 +119,7 @@ def is_wwan_connected(interface):  def get_bridge_fdb(interface):      """ Returns the forwarding database entries for a given interface """ +    import os      if not os.path.exists(f'/sys/class/net/{interface}'):          return None      from json import loads @@ -211,3 +221,175 @@ def is_listen_port_bind_service(port: int, service: str) -> bool:          if service == pid_name and port == pid_port:              return True      return False + +def is_ipv6_link_local(addr): +    """ Check if addrsss is an IPv6 link-local address. Returns True/False """ +    from ipaddress import ip_interface +    from vyos.template import is_ipv6 +    addr = addr.split('%')[0] +    if is_ipv6(addr): +        if ip_interface(addr).is_link_local: +            return True + +    return False + +def is_addr_assigned(ip_address, vrf=None) -> bool: +    """ Verify if the given IPv4/IPv6 address is assigned to any interface """ +    from netifaces import interfaces +    from vyos.utils.network import get_interface_config +    from vyos.utils.dict import dict_search + +    for interface in interfaces(): +        # Check if interface belongs to the requested VRF, if this is not the +        # case there is no need to proceed with this data set - continue loop +        # with next element +        tmp = get_interface_config(interface) +        if dict_search('master', tmp) != vrf: +            continue + +        if is_intf_addr_assigned(interface, ip_address): +            return True + +    return False + +def is_intf_addr_assigned(intf, address) -> bool: +    """ +    Verify if the given IPv4/IPv6 address is assigned to specific interface. +    It can check both a single IP address (e.g. 192.0.2.1 or a assigned CIDR +    address 192.0.2.1/24. +    """ +    from vyos.template import is_ipv4 + +    from netifaces import ifaddresses +    from netifaces import AF_INET +    from netifaces import AF_INET6 + +    # check if the requested address type is configured at all +    # { +    # 17: [{'addr': '08:00:27:d9:5b:04', 'broadcast': 'ff:ff:ff:ff:ff:ff'}], +    # 2:  [{'addr': '10.0.2.15', 'netmask': '255.255.255.0', 'broadcast': '10.0.2.255'}], +    # 10: [{'addr': 'fe80::a00:27ff:fed9:5b04%eth0', 'netmask': 'ffff:ffff:ffff:ffff::'}] +    # } +    try: +        addresses = ifaddresses(intf) +    except ValueError as e: +        print(e) +        return False + +    # determine IP version (AF_INET or AF_INET6) depending on passed address +    addr_type = AF_INET if is_ipv4(address) else AF_INET6 + +    # Check every IP address on this interface for a match +    netmask = None +    if '/' in address: +        address, netmask = address.split('/') +    for ip in addresses.get(addr_type, []): +        # ip can have the interface name in the 'addr' field, we need to remove it +        # {'addr': 'fe80::a00:27ff:fec5:f821%eth2', 'netmask': 'ffff:ffff:ffff:ffff::'} +        ip_addr = ip['addr'].split('%')[0] + +        if not _are_same_ip(address, ip_addr): +            continue + +        # we do not have a netmask to compare against, they are the same +        if not netmask: +            return True + +        prefixlen = '' +        if is_ipv4(ip_addr): +            prefixlen = sum([bin(int(_)).count('1') for _ in ip['netmask'].split('.')]) +        else: +            prefixlen = sum([bin(int(_,16)).count('1') for _ in ip['netmask'].split('/')[0].split(':') if _]) + +        if str(prefixlen) == netmask: +            return True + +    return False + +def is_loopback_addr(addr): +    """ Check if supplied IPv4/IPv6 address is a loopback address """ +    from ipaddress import ip_address +    return ip_address(addr).is_loopback + +def is_wireguard_key_pair(private_key: str, public_key:str) -> bool: +    """ +     Checks if public/private keys are keypair +    :param private_key: Wireguard private key +    :type private_key: str +    :param public_key: Wireguard public key +    :type public_key: str +    :return: If public/private keys are keypair returns True else False +    :rtype: bool +    """ +    from vyos.utils.process import cmd +    gen_public_key = cmd('wg pubkey', input=private_key) +    if gen_public_key == public_key: +        return True +    else: +        return False + +def is_subnet_connected(subnet, primary=False): +    """ +    Verify is the given IPv4/IPv6 subnet is connected to any interface on this +    system. + +    primary check if the subnet is reachable via the primary IP address of this +    interface, or in other words has a broadcast address configured. ISC DHCP +    for instance will complain if it should listen on non broadcast interfaces. + +    Return True/False +    """ +    from ipaddress import ip_address +    from ipaddress import ip_network + +    from netifaces import ifaddresses +    from netifaces import interfaces +    from netifaces import AF_INET +    from netifaces import AF_INET6 + +    from vyos.template import is_ipv6 + +    # determine IP version (AF_INET or AF_INET6) depending on passed address +    addr_type = AF_INET +    if is_ipv6(subnet): +        addr_type = AF_INET6 + +    for interface in interfaces(): +        # check if the requested address type is configured at all +        if addr_type not in ifaddresses(interface).keys(): +            continue + +        # An interface can have multiple addresses, but some software components +        # only support the primary address :( +        if primary: +            ip = ifaddresses(interface)[addr_type][0]['addr'] +            if ip_address(ip) in ip_network(subnet): +                return True +        else: +            # Check every assigned IP address if it is connected to the subnet +            # in question +            for ip in ifaddresses(interface)[addr_type]: +                # remove interface extension (e.g. %eth0) that gets thrown on the end of _some_ addrs +                addr = ip['addr'].split('%')[0] +                if ip_address(addr) in ip_network(subnet): +                    return True + +    return False + +def is_afi_configured(interface, afi): +    """ Check if given address family is configured, or in other words - an IP +    address is assigned to the interface. """ +    from netifaces import ifaddresses +    from netifaces import AF_INET +    from netifaces import AF_INET6 + +    if afi not in [AF_INET, AF_INET6]: +        raise ValueError('Address family must be in [AF_INET, AF_INET6]') + +    try: +        addresses = ifaddresses(interface) +    except ValueError as e: +        print(e) +        return False + +    return afi in addresses diff --git a/python/vyos/validate.py b/python/vyos/validate.py deleted file mode 100644 index b149b258f..000000000 --- a/python/vyos/validate.py +++ /dev/null @@ -1,321 +0,0 @@ -# Copyright 2018-2023 VyOS maintainers and contributors <maintainers@vyos.io> -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library.  If not, see <http://www.gnu.org/licenses/>. - -# Important note when you are adding new validation functions: -# -# The Control class will analyse the signature of the function in this file -# and will build the parameters to be passed to it. -# -# The parameter names "ifname" and "self" will get the Interface name and class -# parameters with default will be left unset -# all other paramters will receive the value to check - -def is_ipv6_link_local(addr): -    """ Check if addrsss is an IPv6 link-local address. Returns True/False """ -    from ipaddress import ip_interface -    from vyos.template import is_ipv6 -    addr = addr.split('%')[0] -    if is_ipv6(addr): -        if ip_interface(addr).is_link_local: -            return True - -    return False - -def _are_same_ip(one, two): -    from socket import AF_INET -    from socket import AF_INET6 -    from socket import inet_pton -    from vyos.template import is_ipv4 -    # compare the binary representation of the IP -    f_one = AF_INET if is_ipv4(one) else AF_INET6 -    s_two = AF_INET if is_ipv4(two) else AF_INET6 -    return inet_pton(f_one, one) == inet_pton(f_one, two) - -def is_intf_addr_assigned(intf, address) -> bool: -    """ -    Verify if the given IPv4/IPv6 address is assigned to specific interface. -    It can check both a single IP address (e.g. 192.0.2.1 or a assigned CIDR -    address 192.0.2.1/24. -    """ -    from vyos.template import is_ipv4 - -    from netifaces import ifaddresses -    from netifaces import AF_INET -    from netifaces import AF_INET6 - -    # check if the requested address type is configured at all -    # { -    # 17: [{'addr': '08:00:27:d9:5b:04', 'broadcast': 'ff:ff:ff:ff:ff:ff'}], -    # 2:  [{'addr': '10.0.2.15', 'netmask': '255.255.255.0', 'broadcast': '10.0.2.255'}], -    # 10: [{'addr': 'fe80::a00:27ff:fed9:5b04%eth0', 'netmask': 'ffff:ffff:ffff:ffff::'}] -    # } -    try: -        addresses = ifaddresses(intf) -    except ValueError as e: -        print(e) -        return False - -    # determine IP version (AF_INET or AF_INET6) depending on passed address -    addr_type = AF_INET if is_ipv4(address) else AF_INET6 - -    # Check every IP address on this interface for a match -    netmask = None -    if '/' in address: -        address, netmask = address.split('/') -    for ip in addresses.get(addr_type, []): -        # ip can have the interface name in the 'addr' field, we need to remove it -        # {'addr': 'fe80::a00:27ff:fec5:f821%eth2', 'netmask': 'ffff:ffff:ffff:ffff::'} -        ip_addr = ip['addr'].split('%')[0] - -        if not _are_same_ip(address, ip_addr): -            continue - -        # we do not have a netmask to compare against, they are the same -        if not netmask: -            return True - -        prefixlen = '' -        if is_ipv4(ip_addr): -            prefixlen = sum([bin(int(_)).count('1') for _ in ip['netmask'].split('.')]) -        else: -            prefixlen = sum([bin(int(_,16)).count('1') for _ in ip['netmask'].split('/')[0].split(':') if _]) - -        if str(prefixlen) == netmask: -            return True - -    return False - -def is_addr_assigned(ip_address, vrf=None) -> bool: -    """ Verify if the given IPv4/IPv6 address is assigned to any interface """ -    from netifaces import interfaces -    from vyos.utils.network import get_interface_config -    from vyos.utils.dict import dict_search - -    for interface in interfaces(): -        # Check if interface belongs to the requested VRF, if this is not the -        # case there is no need to proceed with this data set - continue loop -        # with next element -        tmp = get_interface_config(interface) -        if dict_search('master', tmp) != vrf: -            continue - -        if is_intf_addr_assigned(interface, ip_address): -            return True - -    return False - -def is_afi_configured(interface, afi): -    """ Check if given address family is configured, or in other words - an IP -    address is assigned to the interface. """ -    from netifaces import ifaddresses -    from netifaces import AF_INET -    from netifaces import AF_INET6 - -    if afi not in [AF_INET, AF_INET6]: -        raise ValueError('Address family must be in [AF_INET, AF_INET6]') - -    try: -        addresses = ifaddresses(interface) -    except ValueError as e: -        print(e) -        return False - -    return afi in addresses - -def is_loopback_addr(addr): -    """ Check if supplied IPv4/IPv6 address is a loopback address """ -    from ipaddress import ip_address -    return ip_address(addr).is_loopback - -def is_subnet_connected(subnet, primary=False): -    """ -    Verify is the given IPv4/IPv6 subnet is connected to any interface on this -    system. - -    primary check if the subnet is reachable via the primary IP address of this -    interface, or in other words has a broadcast address configured. ISC DHCP -    for instance will complain if it should listen on non broadcast interfaces. - -    Return True/False -    """ -    from ipaddress import ip_address -    from ipaddress import ip_network - -    from netifaces import ifaddresses -    from netifaces import interfaces -    from netifaces import AF_INET -    from netifaces import AF_INET6 - -    from vyos.template import is_ipv6 - -    # determine IP version (AF_INET or AF_INET6) depending on passed address -    addr_type = AF_INET -    if is_ipv6(subnet): -        addr_type = AF_INET6 - -    for interface in interfaces(): -        # check if the requested address type is configured at all -        if addr_type not in ifaddresses(interface).keys(): -            continue - -        # An interface can have multiple addresses, but some software components -        # only support the primary address :( -        if primary: -            ip = ifaddresses(interface)[addr_type][0]['addr'] -            if ip_address(ip) in ip_network(subnet): -                return True -        else: -            # Check every assigned IP address if it is connected to the subnet -            # in question -            for ip in ifaddresses(interface)[addr_type]: -                # remove interface extension (e.g. %eth0) that gets thrown on the end of _some_ addrs -                addr = ip['addr'].split('%')[0] -                if ip_address(addr) in ip_network(subnet): -                    return True - -    return False - - -def assert_boolean(b): -    if int(b) not in (0, 1): -        raise ValueError(f'Value {b} out of range') - - -def assert_range(value, lower=0, count=3): -    if int(value, 16) not in range(lower, lower+count): -        raise ValueError("Value out of range") - - -def assert_list(s, l): -    if s not in l: -        o = ' or '.join([f'"{n}"' for n in l]) -        raise ValueError(f'state must be {o}, got {s}') - - -def assert_number(n): -    if not str(n).isnumeric(): -        raise ValueError(f'{n} must be a number') - - -def assert_positive(n, smaller=0): -    assert_number(n) -    if int(n) < smaller: -        raise ValueError(f'{n} is smaller than {smaller}') - - -def assert_mtu(mtu, ifname): -    assert_number(mtu) - -    import json -    from vyos.utils.process import cmd -    out = cmd(f'ip -j -d link show dev {ifname}') -    # [{"ifindex":2,"ifname":"eth0","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"pfifo_fast","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"08:00:27:d9:5b:04","broadcast":"ff:ff:ff:ff:ff:ff","promiscuity":0,"min_mtu":46,"max_mtu":16110,"inet6_addr_gen_mode":"none","num_tx_queues":1,"num_rx_queues":1,"gso_max_size":65536,"gso_max_segs":65535}] -    parsed = json.loads(out)[0] -    min_mtu = int(parsed.get('min_mtu', '0')) -    # cur_mtu = parsed.get('mtu',0), -    max_mtu = int(parsed.get('max_mtu', '0')) -    cur_mtu = int(mtu) - -    if (min_mtu and cur_mtu < min_mtu) or cur_mtu < 68: -        raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} < {min_mtu}') -    if (max_mtu and cur_mtu > max_mtu) or cur_mtu > 65536: -        raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} > {max_mtu}') - - -def assert_mac(m): -    split = m.split(':') -    size = len(split) - -    # a mac address consits out of 6 octets -    if size != 6: -        raise ValueError(f'wrong number of MAC octets ({size}): {m}') - -    octets = [] -    try: -        for octet in split: -            octets.append(int(octet, 16)) -    except ValueError: -        raise ValueError(f'invalid hex number "{octet}" in : {m}') - -    # validate against the first mac address byte if it's a multicast -    # address -    if octets[0] & 1: -        raise ValueError(f'{m} is a multicast MAC address') - -    # overall mac address is not allowed to be 00:00:00:00:00:00 -    if sum(octets) == 0: -        raise ValueError('00:00:00:00:00:00 is not a valid MAC address') - -    if octets[:5] == (0, 0, 94, 0, 1): -        raise ValueError(f'{m} is a VRRP MAC address') - -def has_address_configured(conf, intf): -    """ -    Checks if interface has an address configured. -    Checks the following config nodes: -    'address', 'ipv6 address eui64', 'ipv6 address autoconf' - -    Returns True if interface has address configured, False if it doesn't. -    """ -    from vyos.ifconfig import Section -    ret = False - -    old_level = conf.get_level() -    conf.set_level([]) - -    intfpath = 'interfaces ' + Section.get_config_path(intf) -    if ( conf.exists(f'{intfpath} address') or -            conf.exists(f'{intfpath} ipv6 address autoconf') or -            conf.exists(f'{intfpath} ipv6 address eui64') ): -        ret = True - -    conf.set_level(old_level) -    return ret - -def has_vrf_configured(conf, intf): -    """ -    Checks if interface has a VRF configured. - -    Returns True if interface has VRF configured, False if it doesn't. -    """ -    from vyos.ifconfig import Section -    ret = False - -    old_level = conf.get_level() -    conf.set_level([]) - -    tmp = ['interfaces', Section.get_config_path(intf), 'vrf'] -    if conf.exists(tmp): -        ret = True - -    conf.set_level(old_level) -    return ret - -def is_wireguard_key_pair(private_key: str, public_key:str) -> bool: -    """ -     Checks if public/private keys are keypair -    :param private_key: Wireguard private key -    :type private_key: str -    :param public_key: Wireguard public key -    :type public_key: str -    :return: If public/private keys are keypair returns True else False -    :rtype: bool -    """ -    from vyos.utils.process import cmd -    gen_public_key = cmd('wg pubkey', input=private_key) -    if gen_public_key == public_key: -        return True -    else: -        return False diff --git a/python/vyos/xml_ref/__init__.py b/python/vyos/xml_ref/__init__.py index ad2130dca..bf434865d 100644 --- a/python/vyos/xml_ref/__init__.py +++ b/python/vyos/xml_ref/__init__.py @@ -13,8 +13,12 @@  # You should have received a copy of the GNU Lesser General Public License  # along with this library.  If not, see <http://www.gnu.org/licenses/>. +from typing import Optional, Union, TYPE_CHECKING  from vyos.xml_ref import definition +if TYPE_CHECKING: +    from vyos.config import ConfigDict +  def load_reference(cache=[]):      if cache:          return cache[0] @@ -23,11 +27,15 @@ def load_reference(cache=[]):      try:          from vyos.xml_ref.cache import reference -        xml.define(reference) -        cache.append(xml)      except Exception:          raise ImportError('no xml reference cache !!') +    if not reference: +        raise ValueError('empty xml reference cache !!') + +    xml.define(reference) +    cache.append(xml) +      return xml  def is_tag(path: list) -> bool: @@ -48,12 +56,12 @@ def is_leaf(path: list) -> bool:  def cli_defined(path: list, node: str, non_local=False) -> bool:      return load_reference().cli_defined(path, node, non_local=non_local) -def from_source(d: dict, path: list) -> bool: -    return load_reference().from_source(d, path) -  def component_version() -> dict:      return load_reference().component_version() +def default_value(path: list) -> Optional[Union[str, list]]: +    return load_reference().default_value(path) +  def multi_to_list(rpath: list, conf: dict) -> dict:      return load_reference().multi_to_list(rpath, conf) @@ -68,8 +76,8 @@ def relative_defaults(rpath: list, conf: dict, get_first_key=False,                                                get_first_key=get_first_key,                                                recursive=recursive) -def merge_defaults(path: list, conf: dict, get_first_key=False, -                   recursive=False) -> dict: -    return load_reference().merge_defaults(path, conf, -                                           get_first_key=get_first_key, -                                           recursive=recursive) +def from_source(d: dict, path: list) -> bool: +    return definition.from_source(d, path) + +def ext_dict_merge(source: dict, destination: Union[dict, 'ConfigDict']): +    return definition.ext_dict_merge(source, destination) diff --git a/python/vyos/xml_ref/definition.py b/python/vyos/xml_ref/definition.py index d95d580e2..c90c5ddbc 100644 --- a/python/vyos/xml_ref/definition.py +++ b/python/vyos/xml_ref/definition.py @@ -20,6 +20,45 @@ from typing import Optional, Union, Any, TYPE_CHECKING  if TYPE_CHECKING:      from vyos.config import ConfigDict +def set_source_recursive(o: Union[dict, str, list], b: bool): +    d = {} +    if not isinstance(o, dict): +        d = {'_source': b} +    else: +        for k, v in o.items(): +            d[k] = set_source_recursive(v, b) +        d |= {'_source': b} +    return d + +def source_dict_merge(src: dict, dest: dict): +    from copy import deepcopy +    dst = deepcopy(dest) +    from_src = {} + +    for key, value in src.items(): +        if key not in dst: +            dst[key] = value +            from_src[key] = set_source_recursive(value, True) +        elif isinstance(src[key], dict): +            dst[key], f = source_dict_merge(src[key], dst[key]) +            f |= {'_source': False} +            from_src[key] = f + +    return dst, from_src + +def ext_dict_merge(src: dict, dest: Union[dict, 'ConfigDict']): +    d, f = source_dict_merge(src, dest) +    if hasattr(d, '_from_defaults'): +        setattr(d, '_from_defaults', f) +    return d + +def from_source(d: dict, path: list) -> bool: +    for key in path: +        d  = d[key] if key in d else {} +        if not d or not isinstance(d, dict): +            return False +    return d.get('_source', False) +  class Xml:      def __init__(self):          self.ref = {} @@ -123,7 +162,7 @@ class Xml:      def component_version(self) -> dict:          d = {} -        for k, v in self.ref['component_version']: +        for k, v in self.ref['component_version'].items():              d[k] = int(v)          return d @@ -153,6 +192,15 @@ class Xml:              return default.split()          return default +    def default_value(self, path: list) -> Optional[Union[str, list]]: +        d = self._get_ref_path(path) +        default = self._get_default_value(d) +        if default is None: +            return None +        if self._is_multi_node(d) or self._is_tag_node(d): +            return default.split() +        return default +      def get_defaults(self, path: list, get_first_key=False, recursive=False) -> dict:          """Return dict containing default values below path @@ -212,43 +260,6 @@ class Xml:              return False          return True -    def _set_source_recursive(self, o: Union[dict, str, list], b: bool): -        d = {} -        if not isinstance(o, dict): -            d = {'_source': b} -        else: -            for k, v in o.items(): -                d[k] = self._set_source_recursive(v, b) -            d |= {'_source': b} -        return d - -    # use local copy of function in module configdict, to avoid circular -    # import -    # -    # extend dict_merge to keep track of keys only in source -    def _dict_merge(self, source, destination): -        from copy import deepcopy -        dest = deepcopy(destination) -        from_source = {} - -        for key, value in source.items(): -            if key not in dest: -                dest[key] = value -                from_source[key] = self._set_source_recursive(value, True) -            elif isinstance(source[key], dict): -                dest[key], f = self._dict_merge(source[key], dest[key]) -                f |= {'_source': False} -                from_source[key] = f - -        return dest, from_source - -    def from_source(self, d: dict, path: list) -> bool: -        for key in path: -            d  = d[key] if key in d else {} -            if not d or not isinstance(d, dict): -                return False -        return d.get('_source', False) -      def _relative_defaults(self, rpath: list, conf: dict, recursive=False) -> dict:          res: dict = {}          res = self.get_defaults(rpath, recursive=recursive, @@ -289,17 +300,3 @@ class Xml:                  res = {}          return res - -    def merge_defaults(self, path: list, conf: Union[dict, 'ConfigDict'], -                       get_first_key=False, recursive=False) -> dict: -        """Return config dict with defaults non-destructively merged - -        This merges non-recursive defaults relative to the config dict. -        """ -        d = self.relative_defaults(path, conf, get_first_key=get_first_key, -                                   recursive=recursive) -        d, f = self._dict_merge(d, conf) -        d = type(conf)(d) -        if hasattr(d, '_from_defaults'): -            setattr(d, '_from_defaults', f) -        return d | 
