summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/setup.py49
-rw-r--r--python/vyos/config.py12
-rw-r--r--python/vyos/config_mgmt.py10
-rw-r--r--python/vyos/configdep.py9
-rw-r--r--python/vyos/configsource.py10
-rw-r--r--python/vyos/configtree.py49
-rw-r--r--python/vyos/defaults.py5
-rwxr-xr-xpython/vyos/firewall.py7
-rw-r--r--python/vyos/frrender.py6
-rw-r--r--python/vyos/kea.py8
-rw-r--r--python/vyos/proto/__init__.py0
-rwxr-xr-xpython/vyos/proto/generate_dataclass.py178
-rw-r--r--python/vyos/proto/vyconf_client.py87
-rw-r--r--python/vyos/utils/auth.py70
-rw-r--r--python/vyos/utils/network.py16
15 files changed, 497 insertions, 19 deletions
diff --git a/python/setup.py b/python/setup.py
index 2d614e724..571b956ee 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -1,5 +1,14 @@
import os
+import sys
+import subprocess
from setuptools import setup
+from setuptools.command.build_py import build_py
+
+sys.path.append('./vyos')
+from defaults import directories
+
+def desc_out(f):
+ return os.path.splitext(f)[0] + '.desc'
def packages(directory):
return [
@@ -8,6 +17,43 @@ def packages(directory):
if os.path.isfile(os.path.join(_[0], '__init__.py'))
]
+
+class GenerateProto(build_py):
+ ver = os.environ.get('OCAML_VERSION')
+ if ver:
+ proto_path = f'/opt/opam/{ver}/share/vyconf'
+ else:
+ proto_path = directories['proto_path']
+
+ def run(self):
+ # find all .proto files in vyconf proto_path
+ proto_files = []
+ for _, _, files in os.walk(self.proto_path):
+ for file in files:
+ if file.endswith('.proto'):
+ proto_files.append(file)
+
+ # compile each .proto file to Python
+ for proto_file in proto_files:
+ subprocess.check_call(
+ [
+ 'protoc',
+ '--python_out=vyos/proto',
+ f'--proto_path={self.proto_path}/',
+ f'--descriptor_set_out=vyos/proto/{desc_out(proto_file)}',
+ proto_file,
+ ]
+ )
+ subprocess.check_call(
+ [
+ 'vyos/proto/generate_dataclass.py',
+ 'vyos/proto/vyconf.desc',
+ '--out-dir=vyos/proto',
+ ]
+ )
+
+ build_py.run(self)
+
setup(
name = "vyos",
version = "1.3.0",
@@ -29,4 +75,7 @@ setup(
"config-mgmt = vyos.config_mgmt:run",
],
},
+ cmdclass={
+ 'build_py': GenerateProto,
+ },
)
diff --git a/python/vyos/config.py b/python/vyos/config.py
index 1fab46761..546eeceab 100644
--- a/python/vyos/config.py
+++ b/python/vyos/config.py
@@ -149,6 +149,18 @@ class Config(object):
return self._running_config
return self._session_config
+ def get_bool_attr(self, attr) -> bool:
+ if not hasattr(self, attr):
+ return False
+ else:
+ tmp = getattr(self, attr)
+ if not isinstance(tmp, bool):
+ return False
+ return tmp
+
+ def set_bool_attr(self, attr, val):
+ setattr(self, attr, val)
+
def _make_path(self, path):
# Backwards-compatibility stuff: original implementation used string paths
# libvyosconfig paths are lists, but since node names cannot contain whitespace,
diff --git a/python/vyos/config_mgmt.py b/python/vyos/config_mgmt.py
index 1c2b70fdf..dd8910afb 100644
--- a/python/vyos/config_mgmt.py
+++ b/python/vyos/config_mgmt.py
@@ -287,7 +287,7 @@ Proceed ?"""
# commits under commit-confirm are not added to revision list unless
# confirmed, hence a soft revert is to revision 0
- revert_ct = self._get_config_tree_revision(0)
+ revert_ct = self.get_config_tree_revision(0)
message = '[commit-confirm] Reverting to previous config now'
os.system('wall -n ' + message)
@@ -351,7 +351,7 @@ Proceed ?"""
)
return msg, 1
- rollback_ct = self._get_config_tree_revision(rev)
+ rollback_ct = self.get_config_tree_revision(rev)
try:
load(rollback_ct, switch='explicit')
print('Rollback diff has been applied.')
@@ -382,7 +382,7 @@ Proceed ?"""
if rev1 is not None:
if not self._check_revision_number(rev1):
return f'Invalid revision number {rev1}', 1
- ct1 = self._get_config_tree_revision(rev1)
+ ct1 = self.get_config_tree_revision(rev1)
ct2 = self.working_config
msg = f'No changes between working and revision {rev1} configurations.\n'
if rev2 is not None:
@@ -390,7 +390,7 @@ Proceed ?"""
return f'Invalid revision number {rev2}', 1
# compare older to newer
ct2 = ct1
- ct1 = self._get_config_tree_revision(rev2)
+ ct1 = self.get_config_tree_revision(rev2)
msg = f'No changes between revisions {rev2} and {rev1} configurations.\n'
out = ''
@@ -575,7 +575,7 @@ Proceed ?"""
r = f.read().decode()
return r
- def _get_config_tree_revision(self, rev: int):
+ def get_config_tree_revision(self, rev: int):
c = self._get_file_revision(rev)
return ConfigTree(c)
diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py
index cf7c9d543..747af8dbe 100644
--- a/python/vyos/configdep.py
+++ b/python/vyos/configdep.py
@@ -102,11 +102,16 @@ def run_config_mode_script(target: str, config: 'Config'):
mod = load_as_module(name, path)
config.set_level([])
+ dry_run = config.get_bool_attr('dry_run')
try:
c = mod.get_config(config)
mod.verify(c)
- mod.generate(c)
- mod.apply(c)
+ if not dry_run:
+ mod.generate(c)
+ mod.apply(c)
+ else:
+ if hasattr(mod, 'call_dependents'):
+ mod.call_dependents()
except (VyOSError, ConfigError) as e:
raise ConfigError(str(e)) from e
diff --git a/python/vyos/configsource.py b/python/vyos/configsource.py
index 59e5ac8a1..65cef5333 100644
--- a/python/vyos/configsource.py
+++ b/python/vyos/configsource.py
@@ -319,3 +319,13 @@ class ConfigSourceString(ConfigSource):
self._session_config = ConfigTree(session_config_text) if session_config_text else None
except ValueError:
raise ConfigSourceError(f"Init error in {type(self)}")
+
+class ConfigSourceCache(ConfigSource):
+ def __init__(self, running_config_cache=None, session_config_cache=None):
+ super().__init__()
+
+ try:
+ self._running_config = ConfigTree(internal=running_config_cache) if running_config_cache else None
+ self._session_config = ConfigTree(internal=session_config_cache) if session_config_cache else None
+ except ValueError:
+ raise ConfigSourceError(f"Init error in {type(self)}")
diff --git a/python/vyos/configtree.py b/python/vyos/configtree.py
index 4ad0620a5..ff40fbad0 100644
--- a/python/vyos/configtree.py
+++ b/python/vyos/configtree.py
@@ -50,7 +50,7 @@ def unescape_backslash(string: str) -> str:
def extract_version(s):
"""Extract the version string from the config string"""
t = re.split('(^//)', s, maxsplit=1, flags=re.MULTILINE)
- return (s, ''.join(t[1:]))
+ return (t[0], ''.join(t[1:]))
def check_path(path):
@@ -66,9 +66,14 @@ class ConfigTreeError(Exception):
class ConfigTree(object):
- def __init__(self, config_string=None, address=None, libpath=LIBPATH):
- if config_string is None and address is None:
- raise TypeError("ConfigTree() requires one of 'config_string' or 'address'")
+ def __init__(
+ self, config_string=None, address=None, internal=None, libpath=LIBPATH
+ ):
+ if config_string is None and address is None and internal is None:
+ raise TypeError(
+ "ConfigTree() requires one of 'config_string', 'address', or 'internal'"
+ )
+
self.__config = None
self.__lib = cdll.LoadLibrary(libpath)
@@ -89,6 +94,13 @@ class ConfigTree(object):
self.__to_commands.argtypes = [c_void_p, c_char_p]
self.__to_commands.restype = c_char_p
+ self.__read_internal = self.__lib.read_internal
+ self.__read_internal.argtypes = [c_char_p]
+ self.__read_internal.restype = c_void_p
+
+ self.__write_internal = self.__lib.write_internal
+ self.__write_internal.argtypes = [c_void_p, c_char_p]
+
self.__to_json = self.__lib.to_json
self.__to_json.argtypes = [c_void_p]
self.__to_json.restype = c_char_p
@@ -168,7 +180,21 @@ class ConfigTree(object):
self.__destroy = self.__lib.destroy
self.__destroy.argtypes = [c_void_p]
- if address is None:
+ self.__equal = self.__lib.equal
+ self.__equal.argtypes = [c_void_p, c_void_p]
+ self.__equal.restype = c_bool
+
+ if address is not None:
+ self.__config = address
+ self.__version = ''
+ elif internal is not None:
+ config = self.__read_internal(internal.encode())
+ if config is None:
+ msg = self.__get_error().decode()
+ raise ValueError('Failed to read internal rep: {0}'.format(msg))
+ else:
+ self.__config = config
+ elif config_string is not None:
config_section, version_section = extract_version(config_string)
config_section = escape_backslash(config_section)
config = self.__from_string(config_section.encode())
@@ -179,8 +205,9 @@ class ConfigTree(object):
self.__config = config
self.__version = version_section
else:
- self.__config = address
- self.__version = ''
+ raise TypeError(
+ "ConfigTree() requires one of 'config_string', 'address', or 'internal'"
+ )
self.__migration = os.environ.get('VYOS_MIGRATION')
if self.__migration:
@@ -190,6 +217,11 @@ class ConfigTree(object):
if self.__config is not None:
self.__destroy(self.__config)
+ def __eq__(self, other):
+ if isinstance(other, ConfigTree):
+ return self.__equal(self._get_config(), other._get_config())
+ return False
+
def __str__(self):
return self.to_string()
@@ -199,6 +231,9 @@ class ConfigTree(object):
def get_version_string(self):
return self.__version
+ def write_cache(self, file_name):
+ self.__write_internal(self._get_config(), file_name)
+
def to_string(self, ordered_values=False, no_version=False):
config_string = self.__to_string(self.__config, ordered_values).decode()
config_string = unescape_backslash(config_string)
diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py
index 86194cd55..2b08ff68e 100644
--- a/python/vyos/defaults.py
+++ b/python/vyos/defaults.py
@@ -38,7 +38,8 @@ directories = {
'vyos_configdir' : '/opt/vyatta/config',
'completion_dir' : f'{base_dir}/completion',
'ca_certificates' : '/usr/local/share/ca-certificates/vyos',
- 'ppp_nexthop_dir' : '/run/ppp_nexthop'
+ 'ppp_nexthop_dir' : '/run/ppp_nexthop',
+ 'proto_path' : '/usr/share/vyos/vyconf'
}
systemd_services = {
@@ -69,3 +70,5 @@ rt_symbolic_names = {
rt_global_vrf = rt_symbolic_names['main']
rt_global_table = rt_symbolic_names['main']
+
+vyconfd_conf = '/etc/vyos/vyconfd.conf'
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py
index 314e8dfe3..9f01f8be1 100755
--- a/python/vyos/firewall.py
+++ b/python/vyos/firewall.py
@@ -310,6 +310,13 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
operator = '!='
group_name = group_name[1:]
output.append(f'{ip_name} {prefix}addr {operator} @D_{group_name}')
+ elif 'remote_group' in group:
+ group_name = group['remote_group']
+ operator = ''
+ if group_name[0] == '!':
+ operator = '!='
+ group_name = group_name[1:]
+ output.append(f'{ip_name} {prefix}addr {operator} @R_{group_name}')
if 'mac_group' in group:
group_name = group['mac_group']
operator = ''
diff --git a/python/vyos/frrender.py b/python/vyos/frrender.py
index ba44978d1..8d469e3e2 100644
--- a/python/vyos/frrender.py
+++ b/python/vyos/frrender.py
@@ -60,6 +60,10 @@ def get_frrender_dict(conf, argv=None) -> dict:
from vyos.configdict import get_dhcp_interfaces
from vyos.configdict import get_pppoe_interfaces
+ # We need to re-set the CLI path to the root level, as this function uses
+ # conf.exists() with an absolute path form the CLI root
+ conf.set_level([])
+
# Create an empty dictionary which will be filled down the code path and
# returned to the caller
dict = {}
@@ -599,8 +603,10 @@ def get_frrender_dict(conf, argv=None) -> dict:
dict.update({'vrf' : vrf})
if os.path.exists(frr_debug_enable):
+ print(f'---- get_frrender_dict({conf}) ----')
import pprint
pprint.pprint(dict)
+ print('-----------------------------------')
return dict
diff --git a/python/vyos/kea.py b/python/vyos/kea.py
index c7947af3e..9fc5dde3d 100644
--- a/python/vyos/kea.py
+++ b/python/vyos/kea.py
@@ -483,10 +483,10 @@ def kea_get_domain_from_subnet_id(config, inet, subnet_id):
if option['name'] == 'domain-name':
return option['data']
- # domain-name is not found in subnet, fallback to shared-network pool option
- for option in network['option-data']:
- if option['name'] == 'domain-name':
- return option['data']
+ # domain-name is not found in subnet, fallback to shared-network pool option
+ for option in network['option-data']:
+ if option['name'] == 'domain-name':
+ return option['data']
return None
diff --git a/python/vyos/proto/__init__.py b/python/vyos/proto/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/python/vyos/proto/__init__.py
diff --git a/python/vyos/proto/generate_dataclass.py b/python/vyos/proto/generate_dataclass.py
new file mode 100755
index 000000000..c6296c568
--- /dev/null
+++ b/python/vyos/proto/generate_dataclass.py
@@ -0,0 +1,178 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2025 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+import argparse
+import os
+
+from google.protobuf.descriptor_pb2 import FileDescriptorSet # pylint: disable=no-name-in-module
+from google.protobuf.descriptor_pb2 import FieldDescriptorProto # pylint: disable=no-name-in-module
+from humps import decamelize
+
+HEADER = """\
+from enum import IntEnum
+from dataclasses import dataclass
+from dataclasses import field
+"""
+
+
+def normalize(s: str) -> str:
+ """Decamelize and avoid syntactic collision"""
+ t = decamelize(s)
+ return t + '_' if t in ['from'] else t
+
+
+def generate_dataclass(descriptor_proto):
+ class_name = descriptor_proto.name
+ fields = []
+ for field_p in descriptor_proto.field:
+ field_name = field_p.name
+ field_type, field_default = get_type(field_p.type, field_p.type_name)
+ match field_p.label:
+ case FieldDescriptorProto.LABEL_REPEATED:
+ field_type = f'list[{field_type}] = field(default_factory=list)'
+ case FieldDescriptorProto.LABEL_OPTIONAL:
+ field_type = f'{field_type} = None'
+ case _:
+ field_type = f'{field_type} = {field_default}'
+
+ fields.append(f' {field_name}: {field_type}')
+
+ code = f"""
+@dataclass
+class {class_name}:
+{chr(10).join(fields) if fields else ' pass'}
+"""
+
+ return code
+
+
+def generate_request(descriptor_proto):
+ class_name = descriptor_proto.name
+ fields = []
+ f_vars = []
+ for field_p in descriptor_proto.field:
+ field_name = field_p.name
+ field_type, field_default = get_type(field_p.type, field_p.type_name)
+ match field_p.label:
+ case FieldDescriptorProto.LABEL_REPEATED:
+ field_type = f'list[{field_type}] = []'
+ case FieldDescriptorProto.LABEL_OPTIONAL:
+ field_type = f'{field_type} = None'
+ case _:
+ field_type = f'{field_type} = {field_default}'
+
+ fields.append(f'{normalize(field_name)}: {field_type}')
+ f_vars.append(f'{normalize(field_name)}')
+
+ fields.insert(0, 'token: str = None')
+
+ code = f"""
+def set_request_{decamelize(class_name)}({', '.join(fields)}):
+ reqi = {class_name} ({', '.join(f_vars)})
+ req = Request({decamelize(class_name)}=reqi)
+ req_env = RequestEnvelope(token, req)
+ return req_env
+"""
+
+ return code
+
+
+def generate_nested_dataclass(descriptor_proto):
+ out = ''
+ for nested_p in descriptor_proto.nested_type:
+ out = out + generate_dataclass(nested_p)
+
+ return out
+
+
+def generate_nested_request(descriptor_proto):
+ out = ''
+ for nested_p in descriptor_proto.nested_type:
+ out = out + generate_request(nested_p)
+
+ return out
+
+
+def generate_enum_dataclass(descriptor_proto):
+ code = ''
+ for enum_p in descriptor_proto.enum_type:
+ enums = []
+ enum_name = enum_p.name
+ for enum_val in enum_p.value:
+ enums.append(f' {enum_val.name} = {enum_val.number}')
+
+ code += f"""
+class {enum_name}(IntEnum):
+{chr(10).join(enums)}
+"""
+
+ return code
+
+
+def get_type(field_type, type_name):
+ res = 'Any', None
+ match field_type:
+ case FieldDescriptorProto.TYPE_STRING:
+ res = 'str', '""'
+ case FieldDescriptorProto.TYPE_INT32 | FieldDescriptorProto.TYPE_INT64:
+ res = 'int', 0
+ case FieldDescriptorProto.TYPE_FLOAT | FieldDescriptorProto.TYPE_DOUBLE:
+ res = 'float', 0.0
+ case FieldDescriptorProto.TYPE_BOOL:
+ res = 'bool', False
+ case FieldDescriptorProto.TYPE_MESSAGE | FieldDescriptorProto.TYPE_ENUM:
+ res = type_name.split('.')[-1], None
+ case _:
+ pass
+
+ return res
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument('descriptor_file', help='protobuf .desc file')
+ parser.add_argument('--out-dir', help='directory to write generated file')
+ args = parser.parse_args()
+ desc_file = args.descriptor_file
+ out_dir = args.out_dir
+
+ with open(desc_file, 'rb') as f:
+ descriptor_set_data = f.read()
+
+ descriptor_set = FileDescriptorSet()
+ descriptor_set.ParseFromString(descriptor_set_data)
+
+ for file_proto in descriptor_set.file:
+ f = f'{file_proto.name.replace(".", "_")}.py'
+ f = os.path.join(out_dir, f)
+ dataclass_code = ''
+ nested_code = ''
+ enum_code = ''
+ request_code = ''
+ with open(f, 'w') as f:
+ enum_code += generate_enum_dataclass(file_proto)
+ for message_proto in file_proto.message_type:
+ dataclass_code += generate_dataclass(message_proto)
+ nested_code += generate_nested_dataclass(message_proto)
+ enum_code += generate_enum_dataclass(message_proto)
+ request_code += generate_nested_request(message_proto)
+
+ f.write(HEADER)
+ f.write(enum_code)
+ f.write(nested_code)
+ f.write(dataclass_code)
+ f.write(request_code)
diff --git a/python/vyos/proto/vyconf_client.py b/python/vyos/proto/vyconf_client.py
new file mode 100644
index 000000000..f34549309
--- /dev/null
+++ b/python/vyos/proto/vyconf_client.py
@@ -0,0 +1,87 @@
+# Copyright 2025 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import socket
+from dataclasses import asdict
+
+from vyos.proto import vyconf_proto
+from vyos.proto import vyconf_pb2
+
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.json_format import ParseDict
+
+socket_path = '/var/run/vyconfd.sock'
+
+
+def send_socket(msg: bytearray) -> bytes:
+ data = bytes()
+ client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ client.connect(socket_path)
+ client.sendall(msg)
+
+ data_length = client.recv(4)
+ if data_length:
+ length = int.from_bytes(data_length)
+ data = client.recv(length)
+
+ client.close()
+
+ return data
+
+
+def request_to_msg(req: vyconf_proto.RequestEnvelope) -> vyconf_pb2.RequestEnvelope:
+ # pylint: disable=no-member
+
+ msg = vyconf_pb2.RequestEnvelope()
+ msg = ParseDict(asdict(req), msg, ignore_unknown_fields=True)
+ return msg
+
+
+def msg_to_response(msg: vyconf_pb2.Response) -> vyconf_proto.Response:
+ # pylint: disable=no-member
+
+ d = MessageToDict(msg, preserving_proto_field_name=True)
+
+ response = vyconf_proto.Response(**d)
+ return response
+
+
+def write_request(req: vyconf_proto.RequestEnvelope) -> bytearray:
+ req_msg = request_to_msg(req)
+ encoded_data = req_msg.SerializeToString()
+ byte_size = req_msg.ByteSize()
+ length_bytes = byte_size.to_bytes(4)
+ arr = bytearray(length_bytes)
+ arr.extend(encoded_data)
+
+ return arr
+
+
+def read_response(msg: bytes) -> vyconf_proto.Response:
+ response_msg = vyconf_pb2.Response() # pylint: disable=no-member
+ response_msg.ParseFromString(msg)
+ response = msg_to_response(response_msg)
+
+ return response
+
+
+def send_request(name, *args, **kwargs):
+ func = getattr(vyconf_proto, f'set_request_{name}')
+ request_env = func(*args, **kwargs)
+ msg = write_request(request_env)
+ response_msg = send_socket(msg)
+ response = read_response(response_msg)
+
+ return response
diff --git a/python/vyos/utils/auth.py b/python/vyos/utils/auth.py
index a0b3e1cae..5d0e3464a 100644
--- a/python/vyos/utils/auth.py
+++ b/python/vyos/utils/auth.py
@@ -13,10 +13,80 @@
# You should have received a copy of the GNU Lesser General Public License along with this library;
# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+import cracklib
+import math
import re
+import string
+from enum import StrEnum
+from decimal import Decimal
from vyos.utils.process import cmd
+
+DEFAULT_PASSWORD: str = 'vyos'
+LOW_ENTROPY_MSG: str = 'should be at least 8 characters long;'
+WEAK_PASSWORD_MSG: str = 'The password complexity is too low - @MSG@'
+CRACKLIB_ERROR_MSG: str = 'A following error occurred: @MSG@\n' \
+ 'Possibly the cracklib database is corrupted or is missing. ' \
+ 'Try reinstalling the python3-cracklib package.'
+
+class EPasswdStrength(StrEnum):
+ WEAK = 'Weak'
+ DECENT = 'Decent'
+ STRONG = 'Strong'
+ ERROR = 'Cracklib Error'
+
+
+def calculate_entropy(charset: str, passwd: str) -> float:
+ """
+ Calculate the entropy of a password based on the set of characters used
+ Uses E = log2(R**L) formula, where
+ - R is the range (length) of the character set
+ - L is the length of password
+ """
+ return math.log(math.pow(len(charset), len(passwd)), 2)
+
+def evaluate_strength(passwd: str) -> dict[str, str]:
+ """ Evaluates password strength and returns a check result dict """
+ charset = (cracklib.ASCII_UPPERCASE + cracklib.ASCII_LOWERCASE +
+ string.punctuation + string.digits)
+
+ result = {
+ 'strength': '',
+ 'error': '',
+ }
+
+ try:
+ cracklib.FascistCheck(passwd)
+ except ValueError as e:
+ # The password is vulnerable to dictionary attack no matter the entropy
+ if 'is' in str(e):
+ msg = str(e).replace('is', 'should not be')
+ else:
+ msg = f'should not be {e}'
+ result.update(strength=EPasswdStrength.WEAK)
+ result.update(error=WEAK_PASSWORD_MSG.replace('@MSG@', msg))
+ except Exception as e:
+ result.update(strength=EPasswdStrength.ERROR)
+ result.update(error=CRACKLIB_ERROR_MSG.replace('@MSG@', str(e)))
+ else:
+ # Now check the password's entropy
+ # Cast to Decimal for more precise rounding
+ entropy = Decimal.from_float(calculate_entropy(charset, passwd))
+
+ match round(entropy):
+ case e if e in range(0, 59):
+ result.update(strength=EPasswdStrength.WEAK)
+ result.update(
+ error=WEAK_PASSWORD_MSG.replace('@MSG@', LOW_ENTROPY_MSG)
+ )
+ case e if e in range(60, 119):
+ result.update(strength=EPasswdStrength.DECENT)
+ case e if e >= 120:
+ result.update(strength=EPasswdStrength.STRONG)
+
+ return result
+
def make_password_hash(password):
""" Makes a password hash for /etc/shadow using mkpasswd """
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
index dc0c0a6d6..2f666f0ee 100644
--- a/python/vyos/utils/network.py
+++ b/python/vyos/utils/network.py
@@ -599,3 +599,19 @@ def get_nft_vrf_zone_mapping() -> dict:
for (vrf_name, vrf_id) in vrf_list:
output.append({'interface' : vrf_name, 'vrf_tableid' : vrf_id})
return output
+
+def is_valid_ipv4_address_or_range(addr: str) -> bool:
+ """
+ Validates if the provided address is a valid IPv4, CIDR or IPv4 range
+ :param addr: address to test
+ :return: bool: True if provided address is valid
+ """
+ from ipaddress import ip_network
+ try:
+ if '-' in addr: # If we are checking a range, validate both address's individually
+ split = addr.split('-')
+ return is_valid_ipv4_address_or_range(split[0]) and is_valid_ipv4_address_or_range(split[1])
+ else:
+ return ip_network(addr).version == 4
+ except:
+ return False