diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/setup.py | 49 | ||||
-rw-r--r-- | python/vyos/config.py | 12 | ||||
-rw-r--r-- | python/vyos/config_mgmt.py | 10 | ||||
-rw-r--r-- | python/vyos/configdep.py | 9 | ||||
-rw-r--r-- | python/vyos/configsource.py | 10 | ||||
-rw-r--r-- | python/vyos/configtree.py | 49 | ||||
-rw-r--r-- | python/vyos/defaults.py | 5 | ||||
-rwxr-xr-x | python/vyos/firewall.py | 7 | ||||
-rw-r--r-- | python/vyos/frrender.py | 6 | ||||
-rw-r--r-- | python/vyos/kea.py | 8 | ||||
-rw-r--r-- | python/vyos/proto/__init__.py | 0 | ||||
-rwxr-xr-x | python/vyos/proto/generate_dataclass.py | 178 | ||||
-rw-r--r-- | python/vyos/proto/vyconf_client.py | 87 | ||||
-rw-r--r-- | python/vyos/utils/auth.py | 70 | ||||
-rw-r--r-- | python/vyos/utils/network.py | 16 |
15 files changed, 497 insertions, 19 deletions
diff --git a/python/setup.py b/python/setup.py index 2d614e724..571b956ee 100644 --- a/python/setup.py +++ b/python/setup.py @@ -1,5 +1,14 @@ import os +import sys +import subprocess from setuptools import setup +from setuptools.command.build_py import build_py + +sys.path.append('./vyos') +from defaults import directories + +def desc_out(f): + return os.path.splitext(f)[0] + '.desc' def packages(directory): return [ @@ -8,6 +17,43 @@ def packages(directory): if os.path.isfile(os.path.join(_[0], '__init__.py')) ] + +class GenerateProto(build_py): + ver = os.environ.get('OCAML_VERSION') + if ver: + proto_path = f'/opt/opam/{ver}/share/vyconf' + else: + proto_path = directories['proto_path'] + + def run(self): + # find all .proto files in vyconf proto_path + proto_files = [] + for _, _, files in os.walk(self.proto_path): + for file in files: + if file.endswith('.proto'): + proto_files.append(file) + + # compile each .proto file to Python + for proto_file in proto_files: + subprocess.check_call( + [ + 'protoc', + '--python_out=vyos/proto', + f'--proto_path={self.proto_path}/', + f'--descriptor_set_out=vyos/proto/{desc_out(proto_file)}', + proto_file, + ] + ) + subprocess.check_call( + [ + 'vyos/proto/generate_dataclass.py', + 'vyos/proto/vyconf.desc', + '--out-dir=vyos/proto', + ] + ) + + build_py.run(self) + setup( name = "vyos", version = "1.3.0", @@ -29,4 +75,7 @@ setup( "config-mgmt = vyos.config_mgmt:run", ], }, + cmdclass={ + 'build_py': GenerateProto, + }, ) diff --git a/python/vyos/config.py b/python/vyos/config.py index 1fab46761..546eeceab 100644 --- a/python/vyos/config.py +++ b/python/vyos/config.py @@ -149,6 +149,18 @@ class Config(object): return self._running_config return self._session_config + def get_bool_attr(self, attr) -> bool: + if not hasattr(self, attr): + return False + else: + tmp = getattr(self, attr) + if not isinstance(tmp, bool): + return False + return tmp + + def set_bool_attr(self, attr, val): + setattr(self, attr, val) + def _make_path(self, path): # Backwards-compatibility stuff: original implementation used string paths # libvyosconfig paths are lists, but since node names cannot contain whitespace, diff --git a/python/vyos/config_mgmt.py b/python/vyos/config_mgmt.py index 1c2b70fdf..dd8910afb 100644 --- a/python/vyos/config_mgmt.py +++ b/python/vyos/config_mgmt.py @@ -287,7 +287,7 @@ Proceed ?""" # commits under commit-confirm are not added to revision list unless # confirmed, hence a soft revert is to revision 0 - revert_ct = self._get_config_tree_revision(0) + revert_ct = self.get_config_tree_revision(0) message = '[commit-confirm] Reverting to previous config now' os.system('wall -n ' + message) @@ -351,7 +351,7 @@ Proceed ?""" ) return msg, 1 - rollback_ct = self._get_config_tree_revision(rev) + rollback_ct = self.get_config_tree_revision(rev) try: load(rollback_ct, switch='explicit') print('Rollback diff has been applied.') @@ -382,7 +382,7 @@ Proceed ?""" if rev1 is not None: if not self._check_revision_number(rev1): return f'Invalid revision number {rev1}', 1 - ct1 = self._get_config_tree_revision(rev1) + ct1 = self.get_config_tree_revision(rev1) ct2 = self.working_config msg = f'No changes between working and revision {rev1} configurations.\n' if rev2 is not None: @@ -390,7 +390,7 @@ Proceed ?""" return f'Invalid revision number {rev2}', 1 # compare older to newer ct2 = ct1 - ct1 = self._get_config_tree_revision(rev2) + ct1 = self.get_config_tree_revision(rev2) msg = f'No changes between revisions {rev2} and {rev1} configurations.\n' out = '' @@ -575,7 +575,7 @@ Proceed ?""" r = f.read().decode() return r - def _get_config_tree_revision(self, rev: int): + def get_config_tree_revision(self, rev: int): c = self._get_file_revision(rev) return ConfigTree(c) diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py index cf7c9d543..747af8dbe 100644 --- a/python/vyos/configdep.py +++ b/python/vyos/configdep.py @@ -102,11 +102,16 @@ def run_config_mode_script(target: str, config: 'Config'): mod = load_as_module(name, path) config.set_level([]) + dry_run = config.get_bool_attr('dry_run') try: c = mod.get_config(config) mod.verify(c) - mod.generate(c) - mod.apply(c) + if not dry_run: + mod.generate(c) + mod.apply(c) + else: + if hasattr(mod, 'call_dependents'): + mod.call_dependents() except (VyOSError, ConfigError) as e: raise ConfigError(str(e)) from e diff --git a/python/vyos/configsource.py b/python/vyos/configsource.py index 59e5ac8a1..65cef5333 100644 --- a/python/vyos/configsource.py +++ b/python/vyos/configsource.py @@ -319,3 +319,13 @@ class ConfigSourceString(ConfigSource): self._session_config = ConfigTree(session_config_text) if session_config_text else None except ValueError: raise ConfigSourceError(f"Init error in {type(self)}") + +class ConfigSourceCache(ConfigSource): + def __init__(self, running_config_cache=None, session_config_cache=None): + super().__init__() + + try: + self._running_config = ConfigTree(internal=running_config_cache) if running_config_cache else None + self._session_config = ConfigTree(internal=session_config_cache) if session_config_cache else None + except ValueError: + raise ConfigSourceError(f"Init error in {type(self)}") diff --git a/python/vyos/configtree.py b/python/vyos/configtree.py index 4ad0620a5..ff40fbad0 100644 --- a/python/vyos/configtree.py +++ b/python/vyos/configtree.py @@ -50,7 +50,7 @@ def unescape_backslash(string: str) -> str: def extract_version(s): """Extract the version string from the config string""" t = re.split('(^//)', s, maxsplit=1, flags=re.MULTILINE) - return (s, ''.join(t[1:])) + return (t[0], ''.join(t[1:])) def check_path(path): @@ -66,9 +66,14 @@ class ConfigTreeError(Exception): class ConfigTree(object): - def __init__(self, config_string=None, address=None, libpath=LIBPATH): - if config_string is None and address is None: - raise TypeError("ConfigTree() requires one of 'config_string' or 'address'") + def __init__( + self, config_string=None, address=None, internal=None, libpath=LIBPATH + ): + if config_string is None and address is None and internal is None: + raise TypeError( + "ConfigTree() requires one of 'config_string', 'address', or 'internal'" + ) + self.__config = None self.__lib = cdll.LoadLibrary(libpath) @@ -89,6 +94,13 @@ class ConfigTree(object): self.__to_commands.argtypes = [c_void_p, c_char_p] self.__to_commands.restype = c_char_p + self.__read_internal = self.__lib.read_internal + self.__read_internal.argtypes = [c_char_p] + self.__read_internal.restype = c_void_p + + self.__write_internal = self.__lib.write_internal + self.__write_internal.argtypes = [c_void_p, c_char_p] + self.__to_json = self.__lib.to_json self.__to_json.argtypes = [c_void_p] self.__to_json.restype = c_char_p @@ -168,7 +180,21 @@ class ConfigTree(object): self.__destroy = self.__lib.destroy self.__destroy.argtypes = [c_void_p] - if address is None: + self.__equal = self.__lib.equal + self.__equal.argtypes = [c_void_p, c_void_p] + self.__equal.restype = c_bool + + if address is not None: + self.__config = address + self.__version = '' + elif internal is not None: + config = self.__read_internal(internal.encode()) + if config is None: + msg = self.__get_error().decode() + raise ValueError('Failed to read internal rep: {0}'.format(msg)) + else: + self.__config = config + elif config_string is not None: config_section, version_section = extract_version(config_string) config_section = escape_backslash(config_section) config = self.__from_string(config_section.encode()) @@ -179,8 +205,9 @@ class ConfigTree(object): self.__config = config self.__version = version_section else: - self.__config = address - self.__version = '' + raise TypeError( + "ConfigTree() requires one of 'config_string', 'address', or 'internal'" + ) self.__migration = os.environ.get('VYOS_MIGRATION') if self.__migration: @@ -190,6 +217,11 @@ class ConfigTree(object): if self.__config is not None: self.__destroy(self.__config) + def __eq__(self, other): + if isinstance(other, ConfigTree): + return self.__equal(self._get_config(), other._get_config()) + return False + def __str__(self): return self.to_string() @@ -199,6 +231,9 @@ class ConfigTree(object): def get_version_string(self): return self.__version + def write_cache(self, file_name): + self.__write_internal(self._get_config(), file_name) + def to_string(self, ordered_values=False, no_version=False): config_string = self.__to_string(self.__config, ordered_values).decode() config_string = unescape_backslash(config_string) diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py index 86194cd55..2b08ff68e 100644 --- a/python/vyos/defaults.py +++ b/python/vyos/defaults.py @@ -38,7 +38,8 @@ directories = { 'vyos_configdir' : '/opt/vyatta/config', 'completion_dir' : f'{base_dir}/completion', 'ca_certificates' : '/usr/local/share/ca-certificates/vyos', - 'ppp_nexthop_dir' : '/run/ppp_nexthop' + 'ppp_nexthop_dir' : '/run/ppp_nexthop', + 'proto_path' : '/usr/share/vyos/vyconf' } systemd_services = { @@ -69,3 +70,5 @@ rt_symbolic_names = { rt_global_vrf = rt_symbolic_names['main'] rt_global_table = rt_symbolic_names['main'] + +vyconfd_conf = '/etc/vyos/vyconfd.conf' diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py index 314e8dfe3..9f01f8be1 100755 --- a/python/vyos/firewall.py +++ b/python/vyos/firewall.py @@ -310,6 +310,13 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name): operator = '!=' group_name = group_name[1:] output.append(f'{ip_name} {prefix}addr {operator} @D_{group_name}') + elif 'remote_group' in group: + group_name = group['remote_group'] + operator = '' + if group_name[0] == '!': + operator = '!=' + group_name = group_name[1:] + output.append(f'{ip_name} {prefix}addr {operator} @R_{group_name}') if 'mac_group' in group: group_name = group['mac_group'] operator = '' diff --git a/python/vyos/frrender.py b/python/vyos/frrender.py index ba44978d1..8d469e3e2 100644 --- a/python/vyos/frrender.py +++ b/python/vyos/frrender.py @@ -60,6 +60,10 @@ def get_frrender_dict(conf, argv=None) -> dict: from vyos.configdict import get_dhcp_interfaces from vyos.configdict import get_pppoe_interfaces + # We need to re-set the CLI path to the root level, as this function uses + # conf.exists() with an absolute path form the CLI root + conf.set_level([]) + # Create an empty dictionary which will be filled down the code path and # returned to the caller dict = {} @@ -599,8 +603,10 @@ def get_frrender_dict(conf, argv=None) -> dict: dict.update({'vrf' : vrf}) if os.path.exists(frr_debug_enable): + print(f'---- get_frrender_dict({conf}) ----') import pprint pprint.pprint(dict) + print('-----------------------------------') return dict diff --git a/python/vyos/kea.py b/python/vyos/kea.py index c7947af3e..9fc5dde3d 100644 --- a/python/vyos/kea.py +++ b/python/vyos/kea.py @@ -483,10 +483,10 @@ def kea_get_domain_from_subnet_id(config, inet, subnet_id): if option['name'] == 'domain-name': return option['data'] - # domain-name is not found in subnet, fallback to shared-network pool option - for option in network['option-data']: - if option['name'] == 'domain-name': - return option['data'] + # domain-name is not found in subnet, fallback to shared-network pool option + for option in network['option-data']: + if option['name'] == 'domain-name': + return option['data'] return None diff --git a/python/vyos/proto/__init__.py b/python/vyos/proto/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/python/vyos/proto/__init__.py diff --git a/python/vyos/proto/generate_dataclass.py b/python/vyos/proto/generate_dataclass.py new file mode 100755 index 000000000..c6296c568 --- /dev/null +++ b/python/vyos/proto/generate_dataclass.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2025 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# +import argparse +import os + +from google.protobuf.descriptor_pb2 import FileDescriptorSet # pylint: disable=no-name-in-module +from google.protobuf.descriptor_pb2 import FieldDescriptorProto # pylint: disable=no-name-in-module +from humps import decamelize + +HEADER = """\ +from enum import IntEnum +from dataclasses import dataclass +from dataclasses import field +""" + + +def normalize(s: str) -> str: + """Decamelize and avoid syntactic collision""" + t = decamelize(s) + return t + '_' if t in ['from'] else t + + +def generate_dataclass(descriptor_proto): + class_name = descriptor_proto.name + fields = [] + for field_p in descriptor_proto.field: + field_name = field_p.name + field_type, field_default = get_type(field_p.type, field_p.type_name) + match field_p.label: + case FieldDescriptorProto.LABEL_REPEATED: + field_type = f'list[{field_type}] = field(default_factory=list)' + case FieldDescriptorProto.LABEL_OPTIONAL: + field_type = f'{field_type} = None' + case _: + field_type = f'{field_type} = {field_default}' + + fields.append(f' {field_name}: {field_type}') + + code = f""" +@dataclass +class {class_name}: +{chr(10).join(fields) if fields else ' pass'} +""" + + return code + + +def generate_request(descriptor_proto): + class_name = descriptor_proto.name + fields = [] + f_vars = [] + for field_p in descriptor_proto.field: + field_name = field_p.name + field_type, field_default = get_type(field_p.type, field_p.type_name) + match field_p.label: + case FieldDescriptorProto.LABEL_REPEATED: + field_type = f'list[{field_type}] = []' + case FieldDescriptorProto.LABEL_OPTIONAL: + field_type = f'{field_type} = None' + case _: + field_type = f'{field_type} = {field_default}' + + fields.append(f'{normalize(field_name)}: {field_type}') + f_vars.append(f'{normalize(field_name)}') + + fields.insert(0, 'token: str = None') + + code = f""" +def set_request_{decamelize(class_name)}({', '.join(fields)}): + reqi = {class_name} ({', '.join(f_vars)}) + req = Request({decamelize(class_name)}=reqi) + req_env = RequestEnvelope(token, req) + return req_env +""" + + return code + + +def generate_nested_dataclass(descriptor_proto): + out = '' + for nested_p in descriptor_proto.nested_type: + out = out + generate_dataclass(nested_p) + + return out + + +def generate_nested_request(descriptor_proto): + out = '' + for nested_p in descriptor_proto.nested_type: + out = out + generate_request(nested_p) + + return out + + +def generate_enum_dataclass(descriptor_proto): + code = '' + for enum_p in descriptor_proto.enum_type: + enums = [] + enum_name = enum_p.name + for enum_val in enum_p.value: + enums.append(f' {enum_val.name} = {enum_val.number}') + + code += f""" +class {enum_name}(IntEnum): +{chr(10).join(enums)} +""" + + return code + + +def get_type(field_type, type_name): + res = 'Any', None + match field_type: + case FieldDescriptorProto.TYPE_STRING: + res = 'str', '""' + case FieldDescriptorProto.TYPE_INT32 | FieldDescriptorProto.TYPE_INT64: + res = 'int', 0 + case FieldDescriptorProto.TYPE_FLOAT | FieldDescriptorProto.TYPE_DOUBLE: + res = 'float', 0.0 + case FieldDescriptorProto.TYPE_BOOL: + res = 'bool', False + case FieldDescriptorProto.TYPE_MESSAGE | FieldDescriptorProto.TYPE_ENUM: + res = type_name.split('.')[-1], None + case _: + pass + + return res + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('descriptor_file', help='protobuf .desc file') + parser.add_argument('--out-dir', help='directory to write generated file') + args = parser.parse_args() + desc_file = args.descriptor_file + out_dir = args.out_dir + + with open(desc_file, 'rb') as f: + descriptor_set_data = f.read() + + descriptor_set = FileDescriptorSet() + descriptor_set.ParseFromString(descriptor_set_data) + + for file_proto in descriptor_set.file: + f = f'{file_proto.name.replace(".", "_")}.py' + f = os.path.join(out_dir, f) + dataclass_code = '' + nested_code = '' + enum_code = '' + request_code = '' + with open(f, 'w') as f: + enum_code += generate_enum_dataclass(file_proto) + for message_proto in file_proto.message_type: + dataclass_code += generate_dataclass(message_proto) + nested_code += generate_nested_dataclass(message_proto) + enum_code += generate_enum_dataclass(message_proto) + request_code += generate_nested_request(message_proto) + + f.write(HEADER) + f.write(enum_code) + f.write(nested_code) + f.write(dataclass_code) + f.write(request_code) diff --git a/python/vyos/proto/vyconf_client.py b/python/vyos/proto/vyconf_client.py new file mode 100644 index 000000000..f34549309 --- /dev/null +++ b/python/vyos/proto/vyconf_client.py @@ -0,0 +1,87 @@ +# Copyright 2025 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library. If not, see <http://www.gnu.org/licenses/>. + +import socket +from dataclasses import asdict + +from vyos.proto import vyconf_proto +from vyos.proto import vyconf_pb2 + +from google.protobuf.json_format import MessageToDict +from google.protobuf.json_format import ParseDict + +socket_path = '/var/run/vyconfd.sock' + + +def send_socket(msg: bytearray) -> bytes: + data = bytes() + client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + client.connect(socket_path) + client.sendall(msg) + + data_length = client.recv(4) + if data_length: + length = int.from_bytes(data_length) + data = client.recv(length) + + client.close() + + return data + + +def request_to_msg(req: vyconf_proto.RequestEnvelope) -> vyconf_pb2.RequestEnvelope: + # pylint: disable=no-member + + msg = vyconf_pb2.RequestEnvelope() + msg = ParseDict(asdict(req), msg, ignore_unknown_fields=True) + return msg + + +def msg_to_response(msg: vyconf_pb2.Response) -> vyconf_proto.Response: + # pylint: disable=no-member + + d = MessageToDict(msg, preserving_proto_field_name=True) + + response = vyconf_proto.Response(**d) + return response + + +def write_request(req: vyconf_proto.RequestEnvelope) -> bytearray: + req_msg = request_to_msg(req) + encoded_data = req_msg.SerializeToString() + byte_size = req_msg.ByteSize() + length_bytes = byte_size.to_bytes(4) + arr = bytearray(length_bytes) + arr.extend(encoded_data) + + return arr + + +def read_response(msg: bytes) -> vyconf_proto.Response: + response_msg = vyconf_pb2.Response() # pylint: disable=no-member + response_msg.ParseFromString(msg) + response = msg_to_response(response_msg) + + return response + + +def send_request(name, *args, **kwargs): + func = getattr(vyconf_proto, f'set_request_{name}') + request_env = func(*args, **kwargs) + msg = write_request(request_env) + response_msg = send_socket(msg) + response = read_response(response_msg) + + return response diff --git a/python/vyos/utils/auth.py b/python/vyos/utils/auth.py index a0b3e1cae..5d0e3464a 100644 --- a/python/vyos/utils/auth.py +++ b/python/vyos/utils/auth.py @@ -13,10 +13,80 @@ # You should have received a copy of the GNU Lesser General Public License along with this library; # if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +import cracklib +import math import re +import string +from enum import StrEnum +from decimal import Decimal from vyos.utils.process import cmd + +DEFAULT_PASSWORD: str = 'vyos' +LOW_ENTROPY_MSG: str = 'should be at least 8 characters long;' +WEAK_PASSWORD_MSG: str = 'The password complexity is too low - @MSG@' +CRACKLIB_ERROR_MSG: str = 'A following error occurred: @MSG@\n' \ + 'Possibly the cracklib database is corrupted or is missing. ' \ + 'Try reinstalling the python3-cracklib package.' + +class EPasswdStrength(StrEnum): + WEAK = 'Weak' + DECENT = 'Decent' + STRONG = 'Strong' + ERROR = 'Cracklib Error' + + +def calculate_entropy(charset: str, passwd: str) -> float: + """ + Calculate the entropy of a password based on the set of characters used + Uses E = log2(R**L) formula, where + - R is the range (length) of the character set + - L is the length of password + """ + return math.log(math.pow(len(charset), len(passwd)), 2) + +def evaluate_strength(passwd: str) -> dict[str, str]: + """ Evaluates password strength and returns a check result dict """ + charset = (cracklib.ASCII_UPPERCASE + cracklib.ASCII_LOWERCASE + + string.punctuation + string.digits) + + result = { + 'strength': '', + 'error': '', + } + + try: + cracklib.FascistCheck(passwd) + except ValueError as e: + # The password is vulnerable to dictionary attack no matter the entropy + if 'is' in str(e): + msg = str(e).replace('is', 'should not be') + else: + msg = f'should not be {e}' + result.update(strength=EPasswdStrength.WEAK) + result.update(error=WEAK_PASSWORD_MSG.replace('@MSG@', msg)) + except Exception as e: + result.update(strength=EPasswdStrength.ERROR) + result.update(error=CRACKLIB_ERROR_MSG.replace('@MSG@', str(e))) + else: + # Now check the password's entropy + # Cast to Decimal for more precise rounding + entropy = Decimal.from_float(calculate_entropy(charset, passwd)) + + match round(entropy): + case e if e in range(0, 59): + result.update(strength=EPasswdStrength.WEAK) + result.update( + error=WEAK_PASSWORD_MSG.replace('@MSG@', LOW_ENTROPY_MSG) + ) + case e if e in range(60, 119): + result.update(strength=EPasswdStrength.DECENT) + case e if e >= 120: + result.update(strength=EPasswdStrength.STRONG) + + return result + def make_password_hash(password): """ Makes a password hash for /etc/shadow using mkpasswd """ diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py index dc0c0a6d6..2f666f0ee 100644 --- a/python/vyos/utils/network.py +++ b/python/vyos/utils/network.py @@ -599,3 +599,19 @@ def get_nft_vrf_zone_mapping() -> dict: for (vrf_name, vrf_id) in vrf_list: output.append({'interface' : vrf_name, 'vrf_tableid' : vrf_id}) return output + +def is_valid_ipv4_address_or_range(addr: str) -> bool: + """ + Validates if the provided address is a valid IPv4, CIDR or IPv4 range + :param addr: address to test + :return: bool: True if provided address is valid + """ + from ipaddress import ip_network + try: + if '-' in addr: # If we are checking a range, validate both address's individually + split = addr.split('-') + return is_valid_ipv4_address_or_range(split[0]) and is_valid_ipv4_address_or_range(split[1]) + else: + return ip_network(addr).version == 4 + except: + return False |