diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/vyos/configdict.py | 64 | ||||
| -rw-r--r-- | python/vyos/defaults.py | 4 | ||||
| -rw-r--r-- | python/vyos/ifconfig/bond.py | 4 | ||||
| -rw-r--r-- | python/vyos/ifconfig/bridge.py | 4 | ||||
| -rw-r--r-- | python/vyos/ifconfig/ethernet.py | 2 | ||||
| -rw-r--r-- | python/vyos/ifconfig/interface.py | 87 | ||||
| -rw-r--r-- | python/vyos/ifconfig/pppoe.py | 2 | ||||
| -rw-r--r-- | python/vyos/ifconfig/tunnel.py | 2 | ||||
| -rw-r--r-- | python/vyos/ipsec.py | 16 | ||||
| -rw-r--r-- | python/vyos/qos/base.py | 3 | ||||
| -rw-r--r-- | python/vyos/template.py | 2 | ||||
| -rw-r--r-- | python/vyos/utils/__init__.py | 1 | ||||
| -rw-r--r-- | python/vyos/utils/assertion.py | 81 | ||||
| -rw-r--r-- | python/vyos/utils/network.py | 186 | ||||
| -rw-r--r-- | python/vyos/validate.py | 304 | 
15 files changed, 386 insertions, 376 deletions
| diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py index f642d38f2..2a47e88f9 100644 --- a/python/vyos/configdict.py +++ b/python/vyos/configdict.py @@ -177,24 +177,6 @@ def get_removed_vlans(conf, path, dict):      return dict -def T2665_set_dhcpv6pd_defaults(config_dict): -    """ Properly configure DHCPv6 default options in the dictionary. If there is -    no DHCPv6 configured at all, it is safe to remove the entire configuration. -    """ -    # As this is the same for every interface type it is safe to assume this -    # for ethernet -    pd_defaults = defaults(['interfaces', 'ethernet', 'dhcpv6-options', 'pd']) - -    # Implant default dictionary for DHCPv6-PD instances -    if dict_search('dhcpv6_options.pd.length', config_dict): -        del config_dict['dhcpv6_options']['pd']['length'] - -    for pd in (dict_search('dhcpv6_options.pd', config_dict) or []): -        config_dict['dhcpv6_options']['pd'][pd] = dict_merge(pd_defaults, -            config_dict['dhcpv6_options']['pd'][pd]) - -    return config_dict -  def is_member(conf, interface, intftype=None):      """      Checks if passed interface is member of other interface of specified type. @@ -263,6 +245,48 @@ def is_mirror_intf(conf, interface, direction=None):      return ret_val +def has_address_configured(conf, intf): +    """ +    Checks if interface has an address configured. +    Checks the following config nodes: +    'address', 'ipv6 address eui64', 'ipv6 address autoconf' + +    Returns True if interface has address configured, False if it doesn't. +    """ +    from vyos.ifconfig import Section +    ret = False + +    old_level = conf.get_level() +    conf.set_level([]) + +    intfpath = 'interfaces ' + Section.get_config_path(intf) +    if ( conf.exists(f'{intfpath} address') or +            conf.exists(f'{intfpath} ipv6 address autoconf') or +            conf.exists(f'{intfpath} ipv6 address eui64') ): +        ret = True + +    conf.set_level(old_level) +    return ret + +def has_vrf_configured(conf, intf): +    """ +    Checks if interface has a VRF configured. + +    Returns True if interface has VRF configured, False if it doesn't. +    """ +    from vyos.ifconfig import Section +    ret = False + +    old_level = conf.get_level() +    conf.set_level([]) + +    tmp = ['interfaces', Section.get_config_path(intf), 'vrf'] +    if conf.exists(tmp): +        ret = True + +    conf.set_level(old_level) +    return ret +  def has_vlan_subinterface_configured(conf, intf):      """      Checks if interface has an VLAN subinterface configured. @@ -455,6 +479,10 @@ def get_interface_dict(config, base, ifname='', recursive_defaults=True):      dhcp = is_node_changed(config, base + [ifname, 'dhcp-options'])      if dhcp: dict.update({'dhcp_options_changed' : {}}) +    # Changine interface VRF assignemnts require a DHCP restart, too +    dhcp = is_node_changed(config, base + [ifname, 'vrf']) +    if dhcp: dict.update({'dhcp_options_changed' : {}}) +      # Some interfaces come with a source_interface which must also not be part      # of any other bond or bridge interface as it is exclusivly assigned as the      # Kernels "lower" interface to this new "virtual/upper" interface. diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py index d4ffc249e..a5314790d 100644 --- a/python/vyos/defaults.py +++ b/python/vyos/defaults.py @@ -32,7 +32,9 @@ directories = {    'api_schema': f'{base_dir}/services/api/graphql/graphql/schema/',    'api_client_op': f'{base_dir}/services/api/graphql/graphql/client_op/',    'api_templates': f'{base_dir}/services/api/graphql/session/templates/', -  'vyos_udev_dir' : '/run/udev/vyos' +  'vyos_udev_dir' : '/run/udev/vyos', +  'isc_dhclient_dir' : '/run/dhclient', +  'dhcp6_client_dir' : '/run/dhcp6c',  }  config_status = '/tmp/vyos-config-status' diff --git a/python/vyos/ifconfig/bond.py b/python/vyos/ifconfig/bond.py index e88f860be..d1d7d48c4 100644 --- a/python/vyos/ifconfig/bond.py +++ b/python/vyos/ifconfig/bond.py @@ -18,8 +18,8 @@ import os  from vyos.ifconfig.interface import Interface  from vyos.utils.process import cmd  from vyos.utils.dict import dict_search -from vyos.validate import assert_list -from vyos.validate import assert_positive +from vyos.utils.assertion import assert_list +from vyos.utils.assertion import assert_positive  @Interface.register  class BondIf(Interface): diff --git a/python/vyos/ifconfig/bridge.py b/python/vyos/ifconfig/bridge.py index b103b49d8..b29e71394 100644 --- a/python/vyos/ifconfig/bridge.py +++ b/python/vyos/ifconfig/bridge.py @@ -17,8 +17,8 @@ from netifaces import interfaces  import json  from vyos.ifconfig.interface import Interface -from vyos.validate import assert_boolean -from vyos.validate import assert_positive +from vyos.utils.assertion import assert_boolean +from vyos.utils.assertion import assert_positive  from vyos.utils.process import cmd  from vyos.utils.dict import dict_search  from vyos.configdict import get_vlan_ids diff --git a/python/vyos/ifconfig/ethernet.py b/python/vyos/ifconfig/ethernet.py index 4ff044c23..24ce3a803 100644 --- a/python/vyos/ifconfig/ethernet.py +++ b/python/vyos/ifconfig/ethernet.py @@ -23,7 +23,7 @@ from vyos.ifconfig.interface import Interface  from vyos.utils.dict import dict_search  from vyos.utils.file import read_file  from vyos.utils.process import run -from vyos.validate import assert_list +from vyos.utils.assertion import assert_list  @Interface.register  class EthernetIf(Interface): diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py index 99ddb2021..75c5f27a9 100644 --- a/python/vyos/ifconfig/interface.py +++ b/python/vyos/ifconfig/interface.py @@ -31,6 +31,7 @@ from vyos import ConfigError  from vyos.configdict import list_diff  from vyos.configdict import dict_merge  from vyos.configdict import get_vlan_ids +from vyos.defaults import directories  from vyos.template import render  from vyos.utils.network import mac2eui64  from vyos.utils.dict import dict_search @@ -40,14 +41,14 @@ from vyos.utils.network import get_interface_namespace  from vyos.utils.process import is_systemd_service_active  from vyos.template import is_ipv4  from vyos.template import is_ipv6 -from vyos.validate import is_intf_addr_assigned -from vyos.validate import is_ipv6_link_local -from vyos.validate import assert_boolean -from vyos.validate import assert_list -from vyos.validate import assert_mac -from vyos.validate import assert_mtu -from vyos.validate import assert_positive -from vyos.validate import assert_range +from vyos.utils.network import is_intf_addr_assigned +from vyos.utils.network import is_ipv6_link_local +from vyos.utils.assertion import assert_boolean +from vyos.utils.assertion import assert_list +from vyos.utils.assertion import assert_mac +from vyos.utils.assertion import assert_mtu +from vyos.utils.assertion import assert_positive +from vyos.utils.assertion import assert_range  from vyos.ifconfig.control import Control  from vyos.ifconfig.vrrp import VRRP @@ -1240,44 +1241,49 @@ class Interface(Control):              raise ValueError()          ifname = self.ifname -        config_base = r'/var/lib/dhcp/dhclient' -        config_file = f'{config_base}_{ifname}.conf' -        options_file = f'{config_base}_{ifname}.options' -        pid_file = f'{config_base}_{ifname}.pid' -        lease_file = f'{config_base}_{ifname}.leases' +        config_base = directories['isc_dhclient_dir'] + '/dhclient' +        dhclient_config_file = f'{config_base}_{ifname}.conf' +        dhclient_lease_file = f'{config_base}_{ifname}.leases' +        systemd_override_file = f'/run/systemd/system/dhclient@{ifname}.service.d/10-override.conf'          systemd_service = f'dhclient@{ifname}.service' +        # Rendered client configuration files require the apsolute config path +        self.config['isc_dhclient_dir'] = directories['isc_dhclient_dir'] +          # 'up' check is mandatory b/c even if the interface is A/D, as soon as          # the DHCP client is started the interface will be placed in u/u state.          # This is not what we intended to do when disabling an interface. -        if enable and 'disable' not in self._config: -            if dict_search('dhcp_options.host_name', self._config) == None: +        if enable and 'disable' not in self.config: +            if dict_search('dhcp_options.host_name', self.config) == None:                  # read configured system hostname.                  # maybe change to vyos hostd client ???                  hostname = 'vyos'                  with open('/etc/hostname', 'r') as f:                      hostname = f.read().rstrip('\n')                      tmp = {'dhcp_options' : { 'host_name' : hostname}} -                    self._config = dict_merge(tmp, self._config) +                    self.config = dict_merge(tmp, self.config) + +            render(systemd_override_file, 'dhcp-client/override.conf.j2', self.config) +            render(dhclient_config_file, 'dhcp-client/ipv4.j2', self.config) -            render(options_file, 'dhcp-client/daemon-options.j2', self._config) -            render(config_file, 'dhcp-client/ipv4.j2', self._config) +            # Reload systemd unit definitons as some options are dynamically generated +            self._cmd('systemctl daemon-reload')              # When the DHCP client is restarted a brief outage will occur, as              # the old lease is released a new one is acquired (T4203). We will              # only restart DHCP client if it's option changed, or if it's not              # running, but it should be running (e.g. on system startup) -            if 'dhcp_options_changed' in self._config or not is_systemd_service_active(systemd_service): +            if 'dhcp_options_changed' in self.config or not is_systemd_service_active(systemd_service):                  return self._cmd(f'systemctl restart {systemd_service}') -            return None          else:              if is_systemd_service_active(systemd_service):                  self._cmd(f'systemctl stop {systemd_service}')              # cleanup old config files -            for file in [config_file, options_file, pid_file, lease_file]: +            for file in [dhclient_config_file, systemd_override_file, dhclient_lease_file]:                  if os.path.isfile(file):                      os.remove(file) +        return None      def set_dhcpv6(self, enable):          """ @@ -1287,13 +1293,20 @@ class Interface(Control):              raise ValueError()          ifname = self.ifname -        config_file = f'/run/dhcp6c/dhcp6c.{ifname}.conf' -        options_file = f'/run/dhcp6c/dhcp6c.{ifname}.options' +        config_base = directories['dhcp6_client_dir'] +        config_file = f'{config_base}/dhcp6c.{ifname}.conf' +        systemd_override_file = f'/run/systemd/system/dhcp6c@{ifname}.service.d/10-override.conf'          systemd_service = f'dhcp6c@{ifname}.service' -        if enable and 'disable' not in self._config: -            render(options_file, 'dhcp-client/dhcp6c_daemon-options.j2', self._config) -            render(config_file, 'dhcp-client/ipv6.j2', self._config) +        # Rendered client configuration files require the apsolute config path +        self.config['dhcp6_client_dir'] = directories['dhcp6_client_dir'] + +        if enable and 'disable' not in self.config: +            render(systemd_override_file, 'dhcp-client/ipv6.override.conf.j2', self.config) +            render(config_file, 'dhcp-client/ipv6.j2', self.config) + +            # Reload systemd unit definitons as some options are dynamically generated +            self._cmd('systemctl daemon-reload')              # We must ignore any return codes. This is required to enable              # DHCPv6-PD for interfaces which are yet not up and running. @@ -1304,26 +1317,28 @@ class Interface(Control):              if os.path.isfile(config_file):                  os.remove(config_file) +        return None +      def set_mirror_redirect(self):          # Please refer to the document for details          #   - https://man7.org/linux/man-pages/man8/tc.8.html          #   - https://man7.org/linux/man-pages/man8/tc-mirred.8.html          # Depening if we are the source or the target interface of the port          # mirror we need to setup some variables. -        source_if = self._config['ifname'] +        source_if = self.config['ifname']          mirror_config = None -        if 'mirror' in self._config: -            mirror_config = self._config['mirror'] -        if 'is_mirror_intf' in self._config: -            source_if = next(iter(self._config['is_mirror_intf'])) -            mirror_config = self._config['is_mirror_intf'][source_if].get('mirror', None) +        if 'mirror' in self.config: +            mirror_config = self.config['mirror'] +        if 'is_mirror_intf' in self.config: +            source_if = next(iter(self.config['is_mirror_intf'])) +            mirror_config = self.config['is_mirror_intf'][source_if].get('mirror', None)          redirect_config = None          # clear existing ingess - ignore errors (e.g. "Error: Cannot find specified          # qdisc on specified device") - we simply cleanup all stuff here -        if not 'traffic_policy' in self._config: +        if not 'traffic_policy' in self.config:              self._popen(f'tc qdisc del dev {source_if} parent ffff: 2>/dev/null');              self._popen(f'tc qdisc del dev {source_if} parent 1: 2>/dev/null'); @@ -1347,11 +1362,11 @@ class Interface(Control):                  if err: print('tc qdisc(filter for mirror port failed')          # Apply interface traffic redirection policy -        elif 'redirect' in self._config: +        elif 'redirect' in self.config:              _, err = self._popen(f'tc qdisc add dev {source_if} handle ffff: ingress')              if err: print(f'tc qdisc add for redirect failed!') -            target_if = self._config['redirect'] +            target_if = self.config['redirect']              _, err = self._popen(f'tc filter add dev {source_if} parent ffff: protocol '\                                   f'all prio 10 u32 match u32 0 0 flowid 1:1 action mirred '\                                   f'egress redirect dev {target_if}') @@ -1370,7 +1385,7 @@ class Interface(Control):          # Cache the configuration - it will be reused inside e.g. DHCP handler          # XXX: maybe pass the option via __init__ in the future and rename this          # method to apply()? -        self._config = config +        self.config = config          # Change interface MAC address - re-set to real hardware address (hw-id)          # if custom mac is removed. Skip if bond member. diff --git a/python/vyos/ifconfig/pppoe.py b/python/vyos/ifconfig/pppoe.py index fd4590beb..febf1452d 100644 --- a/python/vyos/ifconfig/pppoe.py +++ b/python/vyos/ifconfig/pppoe.py @@ -14,7 +14,7 @@  # License along with this library.  If not, see <http://www.gnu.org/licenses/>.  from vyos.ifconfig.interface import Interface -from vyos.validate import assert_range +from vyos.utils.assertion import assert_range  from vyos.utils.network import get_interface_config  @Interface.register diff --git a/python/vyos/ifconfig/tunnel.py b/python/vyos/ifconfig/tunnel.py index fb2f38e2b..9ba7b31a6 100644 --- a/python/vyos/ifconfig/tunnel.py +++ b/python/vyos/ifconfig/tunnel.py @@ -18,7 +18,7 @@  from vyos.ifconfig.interface import Interface  from vyos.utils.dict import dict_search -from vyos.validate import assert_list +from vyos.utils.assertion import assert_list  def enable_to_on(value):      if value == 'enable': diff --git a/python/vyos/ipsec.py b/python/vyos/ipsec.py index bb5611025..4603aab22 100644 --- a/python/vyos/ipsec.py +++ b/python/vyos/ipsec.py @@ -33,9 +33,11 @@ def get_vici_sas():          session = vici_session()      except Exception:          raise ViciInitiateError("IPsec not initialized") -    sas = list(session.list_sas()) -    return sas - +    try: +        sas = list(session.list_sas()) +        return sas +    except Exception: +        raise ViciCommandError(f'Failed to get SAs')  def get_vici_connections():      from vici import Session as vici_session @@ -44,9 +46,11 @@ def get_vici_connections():          session = vici_session()      except Exception:          raise ViciInitiateError("IPsec not initialized") -    connections = list(session.list_conns()) -    return connections - +    try: +        connections = list(session.list_conns()) +        return connections +    except Exception: +        raise ViciCommandError(f'Failed to get connections')  def get_vici_sas_by_name(ike_name: str, tunnel: str) -> list:      """ diff --git a/python/vyos/qos/base.py b/python/vyos/qos/base.py index 6c5a3d79c..1eac0d1ee 100644 --- a/python/vyos/qos/base.py +++ b/python/vyos/qos/base.py @@ -107,7 +107,8 @@ class QoSBase:              queue_limit = dict_search('queue_limit', config)              for ii in range(1, 4): -                tmp = f'tc qdisc replace dev {self._interface} parent {handle:x}:{ii:x} pfifo limit {queue_limit}' +                tmp = f'tc qdisc replace dev {self._interface} parent {handle:x}:{ii:x} pfifo' +                if queue_limit: tmp += f' limit {queue_limit}'                  self._cmd(tmp)          elif queue_type == 'fair-queue': diff --git a/python/vyos/template.py b/python/vyos/template.py index 7d1c3970f..6469623fd 100644 --- a/python/vyos/template.py +++ b/python/vyos/template.py @@ -420,7 +420,7 @@ def get_dhcp_router(interface):      Returns False of no router is found, returns the IP address as string if      a router is found.      """ -    lease_file = f'/var/lib/dhcp/dhclient_{interface}.leases' +    lease_file = directories['isc_dhclient_dir'] + f'/dhclient_{interface}.leases'      if not os.path.exists(lease_file):          return None diff --git a/python/vyos/utils/__init__.py b/python/vyos/utils/__init__.py index f2783113a..12ef2d3b8 100644 --- a/python/vyos/utils/__init__.py +++ b/python/vyos/utils/__init__.py @@ -13,6 +13,7 @@  # You should have received a copy of the GNU Lesser General Public  # License along with this library.  If not, see <http://www.gnu.org/licenses/>. +from vyos.utils import assertion  from vyos.utils import auth  from vyos.utils import boot  from vyos.utils import commit diff --git a/python/vyos/utils/assertion.py b/python/vyos/utils/assertion.py new file mode 100644 index 000000000..1aaa54dff --- /dev/null +++ b/python/vyos/utils/assertion.py @@ -0,0 +1,81 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library.  If not, see <http://www.gnu.org/licenses/>. + +def assert_boolean(b): +    if int(b) not in (0, 1): +        raise ValueError(f'Value {b} out of range') + +def assert_range(value, lower=0, count=3): +    if int(value, 16) not in range(lower, lower+count): +        raise ValueError("Value out of range") + +def assert_list(s, l): +    if s not in l: +        o = ' or '.join([f'"{n}"' for n in l]) +        raise ValueError(f'state must be {o}, got {s}') + +def assert_number(n): +    if not str(n).isnumeric(): +        raise ValueError(f'{n} must be a number') + +def assert_positive(n, smaller=0): +    assert_number(n) +    if int(n) < smaller: +        raise ValueError(f'{n} is smaller than {smaller}') + +def assert_mtu(mtu, ifname): +    assert_number(mtu) + +    import json +    from vyos.utils.process import cmd +    out = cmd(f'ip -j -d link show dev {ifname}') +    # [{"ifindex":2,"ifname":"eth0","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"pfifo_fast","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"08:00:27:d9:5b:04","broadcast":"ff:ff:ff:ff:ff:ff","promiscuity":0,"min_mtu":46,"max_mtu":16110,"inet6_addr_gen_mode":"none","num_tx_queues":1,"num_rx_queues":1,"gso_max_size":65536,"gso_max_segs":65535}] +    parsed = json.loads(out)[0] +    min_mtu = int(parsed.get('min_mtu', '0')) +    # cur_mtu = parsed.get('mtu',0), +    max_mtu = int(parsed.get('max_mtu', '0')) +    cur_mtu = int(mtu) + +    if (min_mtu and cur_mtu < min_mtu) or cur_mtu < 68: +        raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} < {min_mtu}') +    if (max_mtu and cur_mtu > max_mtu) or cur_mtu > 65536: +        raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} > {max_mtu}') + +def assert_mac(m): +    split = m.split(':') +    size = len(split) + +    # a mac address consits out of 6 octets +    if size != 6: +        raise ValueError(f'wrong number of MAC octets ({size}): {m}') + +    octets = [] +    try: +        for octet in split: +            octets.append(int(octet, 16)) +    except ValueError: +        raise ValueError(f'invalid hex number "{octet}" in : {m}') + +    # validate against the first mac address byte if it's a multicast +    # address +    if octets[0] & 1: +        raise ValueError(f'{m} is a multicast MAC address') + +    # overall mac address is not allowed to be 00:00:00:00:00:00 +    if sum(octets) == 0: +        raise ValueError('00:00:00:00:00:00 is not a valid MAC address') + +    if octets[:5] == (0, 0, 94, 0, 1): +        raise ValueError(f'{m} is a VRRP MAC address') diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py index 3786caf26..3f9a3ef4b 100644 --- a/python/vyos/utils/network.py +++ b/python/vyos/utils/network.py @@ -13,7 +13,15 @@  # You should have received a copy of the GNU Lesser General Public  # License along with this library.  If not, see <http://www.gnu.org/licenses/>. -import os +def _are_same_ip(one, two): +    from socket import AF_INET +    from socket import AF_INET6 +    from socket import inet_pton +    from vyos.template import is_ipv4 +    # compare the binary representation of the IP +    f_one = AF_INET if is_ipv4(one) else AF_INET6 +    s_two = AF_INET if is_ipv4(two) else AF_INET6 +    return inet_pton(f_one, one) == inet_pton(f_one, two)  def get_protocol_by_name(protocol_name):      """Get protocol number by protocol name @@ -48,6 +56,7 @@ def get_interface_config(interface):      """ Returns the used encapsulation protocol for given interface.          If interface does not exist, None is returned.      """ +    import os      if not os.path.exists(f'/sys/class/net/{interface}'):          return None      from json import loads @@ -59,6 +68,7 @@ def get_interface_address(interface):      """ Returns the used encapsulation protocol for given interface.          If interface does not exist, None is returned.      """ +    import os      if not os.path.exists(f'/sys/class/net/{interface}'):          return None      from json import loads @@ -85,7 +95,6 @@ def get_interface_namespace(iface):              if iface == tmp["ifname"]:                  return netns -  def is_wwan_connected(interface):      """ Determine if a given WWAN interface, e.g. wwan0 is connected to the      carrier network or not """ @@ -110,6 +119,7 @@ def is_wwan_connected(interface):  def get_bridge_fdb(interface):      """ Returns the forwarding database entries for a given interface """ +    import os      if not os.path.exists(f'/sys/class/net/{interface}'):          return None      from json import loads @@ -211,3 +221,175 @@ def is_listen_port_bind_service(port: int, service: str) -> bool:          if service == pid_name and port == pid_port:              return True      return False + +def is_ipv6_link_local(addr): +    """ Check if addrsss is an IPv6 link-local address. Returns True/False """ +    from ipaddress import ip_interface +    from vyos.template import is_ipv6 +    addr = addr.split('%')[0] +    if is_ipv6(addr): +        if ip_interface(addr).is_link_local: +            return True + +    return False + +def is_addr_assigned(ip_address, vrf=None) -> bool: +    """ Verify if the given IPv4/IPv6 address is assigned to any interface """ +    from netifaces import interfaces +    from vyos.utils.network import get_interface_config +    from vyos.utils.dict import dict_search + +    for interface in interfaces(): +        # Check if interface belongs to the requested VRF, if this is not the +        # case there is no need to proceed with this data set - continue loop +        # with next element +        tmp = get_interface_config(interface) +        if dict_search('master', tmp) != vrf: +            continue + +        if is_intf_addr_assigned(interface, ip_address): +            return True + +    return False + +def is_intf_addr_assigned(intf, address) -> bool: +    """ +    Verify if the given IPv4/IPv6 address is assigned to specific interface. +    It can check both a single IP address (e.g. 192.0.2.1 or a assigned CIDR +    address 192.0.2.1/24. +    """ +    from vyos.template import is_ipv4 + +    from netifaces import ifaddresses +    from netifaces import AF_INET +    from netifaces import AF_INET6 + +    # check if the requested address type is configured at all +    # { +    # 17: [{'addr': '08:00:27:d9:5b:04', 'broadcast': 'ff:ff:ff:ff:ff:ff'}], +    # 2:  [{'addr': '10.0.2.15', 'netmask': '255.255.255.0', 'broadcast': '10.0.2.255'}], +    # 10: [{'addr': 'fe80::a00:27ff:fed9:5b04%eth0', 'netmask': 'ffff:ffff:ffff:ffff::'}] +    # } +    try: +        addresses = ifaddresses(intf) +    except ValueError as e: +        print(e) +        return False + +    # determine IP version (AF_INET or AF_INET6) depending on passed address +    addr_type = AF_INET if is_ipv4(address) else AF_INET6 + +    # Check every IP address on this interface for a match +    netmask = None +    if '/' in address: +        address, netmask = address.split('/') +    for ip in addresses.get(addr_type, []): +        # ip can have the interface name in the 'addr' field, we need to remove it +        # {'addr': 'fe80::a00:27ff:fec5:f821%eth2', 'netmask': 'ffff:ffff:ffff:ffff::'} +        ip_addr = ip['addr'].split('%')[0] + +        if not _are_same_ip(address, ip_addr): +            continue + +        # we do not have a netmask to compare against, they are the same +        if not netmask: +            return True + +        prefixlen = '' +        if is_ipv4(ip_addr): +            prefixlen = sum([bin(int(_)).count('1') for _ in ip['netmask'].split('.')]) +        else: +            prefixlen = sum([bin(int(_,16)).count('1') for _ in ip['netmask'].split('/')[0].split(':') if _]) + +        if str(prefixlen) == netmask: +            return True + +    return False + +def is_loopback_addr(addr): +    """ Check if supplied IPv4/IPv6 address is a loopback address """ +    from ipaddress import ip_address +    return ip_address(addr).is_loopback + +def is_wireguard_key_pair(private_key: str, public_key:str) -> bool: +    """ +     Checks if public/private keys are keypair +    :param private_key: Wireguard private key +    :type private_key: str +    :param public_key: Wireguard public key +    :type public_key: str +    :return: If public/private keys are keypair returns True else False +    :rtype: bool +    """ +    from vyos.utils.process import cmd +    gen_public_key = cmd('wg pubkey', input=private_key) +    if gen_public_key == public_key: +        return True +    else: +        return False + +def is_subnet_connected(subnet, primary=False): +    """ +    Verify is the given IPv4/IPv6 subnet is connected to any interface on this +    system. + +    primary check if the subnet is reachable via the primary IP address of this +    interface, or in other words has a broadcast address configured. ISC DHCP +    for instance will complain if it should listen on non broadcast interfaces. + +    Return True/False +    """ +    from ipaddress import ip_address +    from ipaddress import ip_network + +    from netifaces import ifaddresses +    from netifaces import interfaces +    from netifaces import AF_INET +    from netifaces import AF_INET6 + +    from vyos.template import is_ipv6 + +    # determine IP version (AF_INET or AF_INET6) depending on passed address +    addr_type = AF_INET +    if is_ipv6(subnet): +        addr_type = AF_INET6 + +    for interface in interfaces(): +        # check if the requested address type is configured at all +        if addr_type not in ifaddresses(interface).keys(): +            continue + +        # An interface can have multiple addresses, but some software components +        # only support the primary address :( +        if primary: +            ip = ifaddresses(interface)[addr_type][0]['addr'] +            if ip_address(ip) in ip_network(subnet): +                return True +        else: +            # Check every assigned IP address if it is connected to the subnet +            # in question +            for ip in ifaddresses(interface)[addr_type]: +                # remove interface extension (e.g. %eth0) that gets thrown on the end of _some_ addrs +                addr = ip['addr'].split('%')[0] +                if ip_address(addr) in ip_network(subnet): +                    return True + +    return False + +def is_afi_configured(interface, afi): +    """ Check if given address family is configured, or in other words - an IP +    address is assigned to the interface. """ +    from netifaces import ifaddresses +    from netifaces import AF_INET +    from netifaces import AF_INET6 + +    if afi not in [AF_INET, AF_INET6]: +        raise ValueError('Address family must be in [AF_INET, AF_INET6]') + +    try: +        addresses = ifaddresses(interface) +    except ValueError as e: +        print(e) +        return False + +    return afi in addresses diff --git a/python/vyos/validate.py b/python/vyos/validate.py deleted file mode 100644 index 567f4c972..000000000 --- a/python/vyos/validate.py +++ /dev/null @@ -1,304 +0,0 @@ -# Copyright 2018-2021 VyOS maintainers and contributors <maintainers@vyos.io> -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library.  If not, see <http://www.gnu.org/licenses/>. - -# Important note when you are adding new validation functions: -# -# The Control class will analyse the signature of the function in this file -# and will build the parameters to be passed to it. -# -# The parameter names "ifname" and "self" will get the Interface name and class -# parameters with default will be left unset -# all other paramters will receive the value to check - -def is_ipv6_link_local(addr): -    """ Check if addrsss is an IPv6 link-local address. Returns True/False """ -    from ipaddress import ip_interface -    from vyos.template import is_ipv6 -    addr = addr.split('%')[0] -    if is_ipv6(addr): -        if ip_interface(addr).is_link_local: -            return True - -    return False - -def _are_same_ip(one, two): -    from socket import AF_INET -    from socket import AF_INET6 -    from socket import inet_pton -    from vyos.template import is_ipv4 -    # compare the binary representation of the IP -    f_one = AF_INET if is_ipv4(one) else AF_INET6 -    s_two = AF_INET if is_ipv4(two) else AF_INET6 -    return inet_pton(f_one, one) == inet_pton(f_one, two) - -def is_intf_addr_assigned(intf, address) -> bool: -    """ -    Verify if the given IPv4/IPv6 address is assigned to specific interface. -    It can check both a single IP address (e.g. 192.0.2.1 or a assigned CIDR -    address 192.0.2.1/24. -    """ -    from vyos.template import is_ipv4 - -    from netifaces import ifaddresses -    from netifaces import AF_INET -    from netifaces import AF_INET6 - -    # check if the requested address type is configured at all -    # { -    # 17: [{'addr': '08:00:27:d9:5b:04', 'broadcast': 'ff:ff:ff:ff:ff:ff'}], -    # 2:  [{'addr': '10.0.2.15', 'netmask': '255.255.255.0', 'broadcast': '10.0.2.255'}], -    # 10: [{'addr': 'fe80::a00:27ff:fed9:5b04%eth0', 'netmask': 'ffff:ffff:ffff:ffff::'}] -    # } -    try: -        addresses = ifaddresses(intf) -    except ValueError as e: -        print(e) -        return False - -    # determine IP version (AF_INET or AF_INET6) depending on passed address -    addr_type = AF_INET if is_ipv4(address) else AF_INET6 - -    # Check every IP address on this interface for a match -    netmask = None -    if '/' in address: -        address, netmask = address.split('/') -    for ip in addresses.get(addr_type, []): -        # ip can have the interface name in the 'addr' field, we need to remove it -        # {'addr': 'fe80::a00:27ff:fec5:f821%eth2', 'netmask': 'ffff:ffff:ffff:ffff::'} -        ip_addr = ip['addr'].split('%')[0] - -        if not _are_same_ip(address, ip_addr): -            continue - -        # we do not have a netmask to compare against, they are the same -        if not netmask: -            return True - -        prefixlen = '' -        if is_ipv4(ip_addr): -            prefixlen = sum([bin(int(_)).count('1') for _ in ip['netmask'].split('.')]) -        else: -            prefixlen = sum([bin(int(_,16)).count('1') for _ in ip['netmask'].split('/')[0].split(':') if _]) - -        if str(prefixlen) == netmask: -            return True - -    return False - -def is_addr_assigned(ip_address, vrf=None) -> bool: -    """ Verify if the given IPv4/IPv6 address is assigned to any interface """ -    from netifaces import interfaces -    from vyos.utils.network import get_interface_config -    from vyos.utils.dict import dict_search - -    for interface in interfaces(): -        # Check if interface belongs to the requested VRF, if this is not the -        # case there is no need to proceed with this data set - continue loop -        # with next element -        tmp = get_interface_config(interface) -        if dict_search('master', tmp) != vrf: -            continue - -        if is_intf_addr_assigned(interface, ip_address): -            return True - -    return False - -def is_afi_configured(interface, afi): -    """ Check if given address family is configured, or in other words - an IP -    address is assigned to the interface. """ -    from netifaces import ifaddresses -    from netifaces import AF_INET -    from netifaces import AF_INET6 - -    if afi not in [AF_INET, AF_INET6]: -        raise ValueError('Address family must be in [AF_INET, AF_INET6]') - -    try: -        addresses = ifaddresses(interface) -    except ValueError as e: -        print(e) -        return False - -    return afi in addresses - -def is_loopback_addr(addr): -    """ Check if supplied IPv4/IPv6 address is a loopback address """ -    from ipaddress import ip_address -    return ip_address(addr).is_loopback - -def is_subnet_connected(subnet, primary=False): -    """ -    Verify is the given IPv4/IPv6 subnet is connected to any interface on this -    system. - -    primary check if the subnet is reachable via the primary IP address of this -    interface, or in other words has a broadcast address configured. ISC DHCP -    for instance will complain if it should listen on non broadcast interfaces. - -    Return True/False -    """ -    from ipaddress import ip_address -    from ipaddress import ip_network - -    from netifaces import ifaddresses -    from netifaces import interfaces -    from netifaces import AF_INET -    from netifaces import AF_INET6 - -    from vyos.template import is_ipv6 - -    # determine IP version (AF_INET or AF_INET6) depending on passed address -    addr_type = AF_INET -    if is_ipv6(subnet): -        addr_type = AF_INET6 - -    for interface in interfaces(): -        # check if the requested address type is configured at all -        if addr_type not in ifaddresses(interface).keys(): -            continue - -        # An interface can have multiple addresses, but some software components -        # only support the primary address :( -        if primary: -            ip = ifaddresses(interface)[addr_type][0]['addr'] -            if ip_address(ip) in ip_network(subnet): -                return True -        else: -            # Check every assigned IP address if it is connected to the subnet -            # in question -            for ip in ifaddresses(interface)[addr_type]: -                # remove interface extension (e.g. %eth0) that gets thrown on the end of _some_ addrs -                addr = ip['addr'].split('%')[0] -                if ip_address(addr) in ip_network(subnet): -                    return True - -    return False - - -def assert_boolean(b): -    if int(b) not in (0, 1): -        raise ValueError(f'Value {b} out of range') - - -def assert_range(value, lower=0, count=3): -    if int(value, 16) not in range(lower, lower+count): -        raise ValueError("Value out of range") - - -def assert_list(s, l): -    if s not in l: -        o = ' or '.join([f'"{n}"' for n in l]) -        raise ValueError(f'state must be {o}, got {s}') - - -def assert_number(n): -    if not str(n).isnumeric(): -        raise ValueError(f'{n} must be a number') - - -def assert_positive(n, smaller=0): -    assert_number(n) -    if int(n) < smaller: -        raise ValueError(f'{n} is smaller than {smaller}') - - -def assert_mtu(mtu, ifname): -    assert_number(mtu) - -    import json -    from vyos.utils.process import cmd -    out = cmd(f'ip -j -d link show dev {ifname}') -    # [{"ifindex":2,"ifname":"eth0","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"pfifo_fast","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"08:00:27:d9:5b:04","broadcast":"ff:ff:ff:ff:ff:ff","promiscuity":0,"min_mtu":46,"max_mtu":16110,"inet6_addr_gen_mode":"none","num_tx_queues":1,"num_rx_queues":1,"gso_max_size":65536,"gso_max_segs":65535}] -    parsed = json.loads(out)[0] -    min_mtu = int(parsed.get('min_mtu', '0')) -    # cur_mtu = parsed.get('mtu',0), -    max_mtu = int(parsed.get('max_mtu', '0')) -    cur_mtu = int(mtu) - -    if (min_mtu and cur_mtu < min_mtu) or cur_mtu < 68: -        raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} < {min_mtu}') -    if (max_mtu and cur_mtu > max_mtu) or cur_mtu > 65536: -        raise ValueError(f'MTU is too small for interface "{ifname}": {mtu} > {max_mtu}') - - -def assert_mac(m): -    split = m.split(':') -    size = len(split) - -    # a mac address consits out of 6 octets -    if size != 6: -        raise ValueError(f'wrong number of MAC octets ({size}): {m}') - -    octets = [] -    try: -        for octet in split: -            octets.append(int(octet, 16)) -    except ValueError: -        raise ValueError(f'invalid hex number "{octet}" in : {m}') - -    # validate against the first mac address byte if it's a multicast -    # address -    if octets[0] & 1: -        raise ValueError(f'{m} is a multicast MAC address') - -    # overall mac address is not allowed to be 00:00:00:00:00:00 -    if sum(octets) == 0: -        raise ValueError('00:00:00:00:00:00 is not a valid MAC address') - -    if octets[:5] == (0, 0, 94, 0, 1): -        raise ValueError(f'{m} is a VRRP MAC address') - -def has_address_configured(conf, intf): -    """ -    Checks if interface has an address configured. -    Checks the following config nodes: -    'address', 'ipv6 address eui64', 'ipv6 address autoconf' - -    Returns True if interface has address configured, False if it doesn't. -    """ -    from vyos.ifconfig import Section -    ret = False - -    old_level = conf.get_level() -    conf.set_level([]) - -    intfpath = 'interfaces ' + Section.get_config_path(intf) -    if ( conf.exists(f'{intfpath} address') or -            conf.exists(f'{intfpath} ipv6 address autoconf') or -            conf.exists(f'{intfpath} ipv6 address eui64') ): -        ret = True - -    conf.set_level(old_level) -    return ret - -def has_vrf_configured(conf, intf): -    """ -    Checks if interface has a VRF configured. - -    Returns True if interface has VRF configured, False if it doesn't. -    """ -    from vyos.ifconfig import Section -    ret = False - -    old_level = conf.get_level() -    conf.set_level([]) - -    tmp = ['interfaces', Section.get_config_path(intf), 'vrf'] -    if conf.exists(tmp): -        ret = True - -    conf.set_level(old_level) -    return ret | 
