diff options
Diffstat (limited to 'src')
| -rwxr-xr-x | src/conf_mode/lldp.py | 235 | 
1 files changed, 59 insertions, 176 deletions
diff --git a/src/conf_mode/lldp.py b/src/conf_mode/lldp.py index 082c3e128..db8328259 100755 --- a/src/conf_mode/lldp.py +++ b/src/conf_mode/lldp.py @@ -1,6 +1,6 @@  #!/usr/bin/env python3  # -# Copyright (C) 2017-2020 VyOS maintainers and contributors +# Copyright (C) 2017-2022 VyOS maintainers and contributors  #  # This program is free software; you can redistribute it and/or modify  # it under the terms of the GNU General Public License version 2 or later as @@ -15,19 +15,19 @@  # along with this program.  If not, see <http://www.gnu.org/licenses/>.  import os -import re -from copy import deepcopy  from sys import exit  from vyos.config import Config +from vyos.configdict import dict_merge  from vyos.validate import is_addr_assigned  from vyos.validate import is_loopback_addr  from vyos.version import get_version_data -from vyos import ConfigError  from vyos.util import call +from vyos.util import dict_search +from vyos.xml import defaults  from vyos.template import render - +from vyos import ConfigError  from vyos import airbag  airbag.enable() @@ -35,178 +35,73 @@ config_file = "/etc/default/lldpd"  vyos_config_file = "/etc/lldpd.d/01-vyos.conf"  base = ['service', 'lldp'] -default_config_data = { -    "options": '', -    "interface_list": '', -    "location": '' -} - -def get_options(config): -    options = {} -    config.set_level(base) - -    options['listen_vlan'] = config.exists('listen-vlan') -    options['mgmt_addr'] = [] -    for addr in config.return_values('management-address'): -        if is_addr_assigned(addr) and not is_loopback_addr(addr): -            options['mgmt_addr'].append(addr) -        else: -            message = 'WARNING: LLDP management address {0} invalid - '.format(addr) -            if is_loopback_addr(addr): -                message += '(loopback address).' -            else: -                message += 'address not found.' -            print(message) - -    snmp = config.exists('snmp enable') -    options["snmp"] = snmp -    if snmp: -        config.set_level('') -        options["sys_snmp"] = config.exists('service snmp') -        config.set_level(base) - -    config.set_level(base + ['legacy-protocols']) -    options['cdp'] = config.exists('cdp') -    options['edp'] = config.exists('edp') -    options['fdp'] = config.exists('fdp') -    options['sonmp'] = config.exists('sonmp') - -    # start with an unknown version information -    version_data = get_version_data() -    options['description'] = version_data['version'] -    options['listen_on'] = [] - -    return options - -def get_interface_list(config): -    config.set_level(base) -    intfs_names = config.list_nodes(['interface']) -    if len(intfs_names) < 0: -        return 0 - -    interface_list = [] -    for name in intfs_names: -        config.set_level(base + ['interface', name]) -        disable = config.exists(['disable']) -        intf = { -            'name': name, -            'disable': disable -        } -        interface_list.append(intf) -    return interface_list - - -def get_location_intf(config, name): -    path = base + ['interface', name] -    config.set_level(path) - -    config.set_level(path + ['location']) -    elin = '' -    coordinate_based = {} - -    if config.exists('elin'): -        elin = config.return_value('elin') - -    if config.exists('coordinate-based'): -        config.set_level(path + ['location', 'coordinate-based']) - -        coordinate_based['latitude'] = config.return_value(['latitude']) -        coordinate_based['longitude'] = config.return_value(['longitude']) - -        coordinate_based['altitude'] = '0' -        if config.exists(['altitude']): -            coordinate_based['altitude'] = config.return_value(['altitude']) - -        coordinate_based['datum'] = 'WGS84' -        if config.exists(['datum']): -            coordinate_based['datum'] = config.return_value(['datum']) - -    intf = { -        'name': name, -        'elin': elin, -        'coordinate_based': coordinate_based - -    } -    return intf - - -def get_location(config): -    config.set_level(base) -    intfs_names = config.list_nodes(['interface']) -    if len(intfs_names) < 0: -        return 0 - -    if config.exists('disable'): -        return 0 - -    intfs_location = [] -    for name in intfs_names: -        intf = get_location_intf(config, name) -        intfs_location.append(intf) - -    return intfs_location - -  def get_config(config=None): -    lldp = deepcopy(default_config_data)      if config:          conf = config      else:          conf = Config() +      if not conf.exists(base): -        return None -    else: -        lldp['options'] = get_options(conf) -        lldp['interface_list'] = get_interface_list(conf) -        lldp['location'] = get_location(conf) +        return {} -        return lldp +    lldp = conf.get_config_dict(base, key_mangling=('-', '_'), +                                get_first_key=True, no_tag_node_value_mangle=True) +    if conf.exists(['service', 'snmp']): +        lldp['system_snmp_enabled'] = '' + +    version_data = get_version_data() +    lldp['version'] = version_data['version'] + +    # We have gathered the dict representation of the CLI, but there are default +    # options which we need to update into the dictionary retrived. +    # location coordinates have a default value +    if 'interface' in lldp: +        for interface, interface_config in lldp['interface'].items(): +            default_values = defaults(base + ['interface']) +            if dict_search('location.coordinate_based', interface_config) == None: +                # no location specified - no need to add defaults +                del default_values['location']['coordinate_based']['datum'] +                del default_values['location']['coordinate_based']['altitude'] + +            # cleanup default_values dictionary from inner to outer +            # this might feel overkill here, but it does support easy extension +            # in the future with additional default values +            if len(default_values['location']['coordinate_based']) == 0: +                del default_values['location']['coordinate_based'] +            if len(default_values['location']) == 0: +                del default_values['location'] + +            lldp['interface'][interface] = dict_merge(default_values, +                                                   lldp['interface'][interface]) + +    return lldp  def verify(lldp):      # bail out early - looks like removal from running config      if lldp is None:          return -    # check location -    for location in lldp['location']: -        # check coordinate-based -        if len(location['coordinate_based']) > 0: -            # check longitude and latitude -            if not location['coordinate_based']['longitude']: -                raise ConfigError('Must define longitude for interface {0}'.format(location['name'])) - -            if not location['coordinate_based']['latitude']: -                raise ConfigError('Must define latitude for interface {0}'.format(location['name'])) - -            if not re.match(r'^(\d+)(\.\d+)?[nNsS]$', location['coordinate_based']['latitude']): -                raise ConfigError('Invalid location for interface {0}:\n' \ -                                  'latitude should be a number followed by S or N'.format(location['name'])) - -            if not re.match(r'^(\d+)(\.\d+)?[eEwW]$', location['coordinate_based']['longitude']): -                raise ConfigError('Invalid location for interface {0}:\n' \ -                                  'longitude should be a number followed by E or W'.format(location['name'])) - -            # check altitude and datum if exist -            if location['coordinate_based']['altitude']: -                if not re.match(r'^[-+0-9\.]+$', location['coordinate_based']['altitude']): -                    raise ConfigError('Invalid location for interface {0}:\n' \ -                                      'altitude should be a positive or negative number'.format(location['name'])) - -            if location['coordinate_based']['datum']: -                if not re.match(r'^(WGS84|NAD83|MLLW)$', location['coordinate_based']['datum']): -                    raise ConfigError("Invalid location for interface {0}:\n' \ -                                      'datum should be WGS84, NAD83, or MLLW".format(location['name'])) - -        # check elin -        elif location['elin']: -            if not re.match(r'^[0-9]{10,25}$', location['elin']): -                raise ConfigError('Invalid location for interface {0}:\n' \ -                                  'ELIN number must be between 10-25 numbers'.format(location['name'])) +    if 'management_address' in lldp: +        for address in lldp['management_address']: +            message = f'WARNING: LLDP management address "{address}" is invalid' +            if is_loopback_addr(address): +                print(f'{message} - loopback address') +            elif not is_addr_assigned(address): +                print(f'{message} - not assigned to any interface') + +    if 'interface' in lldp: +        for interface, interface_config in lldp['interface'].items(): +            # bail out early if no location info present in interface config +            if 'location' not in interface_config: +                continue +            if 'coordinate_based' in interface_config['location']: +                if not {'latitude', 'latitude'} <= set(interface_config['location']['coordinate_based']): +                    raise ConfigError(f'Must define both longitude and latitude for "{interface}" location!')      # check options -    if lldp['options']['snmp']: -        if not lldp['options']['sys_snmp']: +    if 'snmp' in lldp and 'enable' in lldp['snmp']: +        if 'system_snmp_enabled' not in lldp:              raise ConfigError('SNMP must be configured to enable LLDP SNMP') @@ -215,29 +110,17 @@ def generate(lldp):      if lldp is None:          return -    # generate listen on interfaces -    for intf in lldp['interface_list']: -        tmp = '' -        # add exclamation mark if interface is disabled -        if intf['disable']: -            tmp = '!' - -        tmp += intf['name'] -        lldp['options']['listen_on'].append(tmp) - -    # generate /etc/default/lldpd      render(config_file, 'lldp/lldpd.tmpl', lldp) -    # generate /etc/lldpd.d/01-vyos.conf      render(vyos_config_file, 'lldp/vyos.conf.tmpl', lldp) -  def apply(lldp): +    systemd_service = 'lldpd.service'      if lldp:          # start/restart lldp service -        call('systemctl restart lldpd.service') +        call(f'systemctl restart {systemd_service}')      else:          # LLDP service has been terminated -        call('systemctl stop lldpd.service') +        call(f'systemctl stop {systemd_service}')          if os.path.isfile(config_file):              os.unlink(config_file)          if os.path.isfile(vyos_config_file):  | 
