summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/setup.py49
-rw-r--r--python/vyos/base.py21
-rw-r--r--python/vyos/component_version.py63
-rw-r--r--python/vyos/config.py12
-rw-r--r--python/vyos/config_mgmt.py10
-rw-r--r--python/vyos/configdep.py9
-rw-r--r--python/vyos/configdict.py25
-rw-r--r--python/vyos/configsession.py55
-rw-r--r--python/vyos/configsource.py10
-rw-r--r--python/vyos/configtree.py49
-rw-r--r--python/vyos/configverify.py6
-rw-r--r--python/vyos/defaults.py12
-rwxr-xr-xpython/vyos/firewall.py93
-rw-r--r--python/vyos/frrender.py11
-rw-r--r--python/vyos/ifconfig/bridge.py14
-rw-r--r--python/vyos/ifconfig/interface.py126
-rw-r--r--python/vyos/kea.py75
-rw-r--r--python/vyos/proto/__init__.py0
-rwxr-xr-xpython/vyos/proto/generate_dataclass.py178
-rw-r--r--python/vyos/proto/vyconf_client.py89
-rw-r--r--python/vyos/system/grub_util.py5
-rwxr-xr-xpython/vyos/template.py144
-rw-r--r--python/vyos/utils/auth.py70
-rw-r--r--python/vyos/utils/network.py92
-rw-r--r--python/vyos/utils/process.py48
-rw-r--r--python/vyos/vyconf_session.py123
26 files changed, 1207 insertions, 182 deletions
diff --git a/python/setup.py b/python/setup.py
index 2d614e724..571b956ee 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -1,5 +1,14 @@
import os
+import sys
+import subprocess
from setuptools import setup
+from setuptools.command.build_py import build_py
+
+sys.path.append('./vyos')
+from defaults import directories
+
+def desc_out(f):
+ return os.path.splitext(f)[0] + '.desc'
def packages(directory):
return [
@@ -8,6 +17,43 @@ def packages(directory):
if os.path.isfile(os.path.join(_[0], '__init__.py'))
]
+
+class GenerateProto(build_py):
+ ver = os.environ.get('OCAML_VERSION')
+ if ver:
+ proto_path = f'/opt/opam/{ver}/share/vyconf'
+ else:
+ proto_path = directories['proto_path']
+
+ def run(self):
+ # find all .proto files in vyconf proto_path
+ proto_files = []
+ for _, _, files in os.walk(self.proto_path):
+ for file in files:
+ if file.endswith('.proto'):
+ proto_files.append(file)
+
+ # compile each .proto file to Python
+ for proto_file in proto_files:
+ subprocess.check_call(
+ [
+ 'protoc',
+ '--python_out=vyos/proto',
+ f'--proto_path={self.proto_path}/',
+ f'--descriptor_set_out=vyos/proto/{desc_out(proto_file)}',
+ proto_file,
+ ]
+ )
+ subprocess.check_call(
+ [
+ 'vyos/proto/generate_dataclass.py',
+ 'vyos/proto/vyconf.desc',
+ '--out-dir=vyos/proto',
+ ]
+ )
+
+ build_py.run(self)
+
setup(
name = "vyos",
version = "1.3.0",
@@ -29,4 +75,7 @@ setup(
"config-mgmt = vyos.config_mgmt:run",
],
},
+ cmdclass={
+ 'build_py': GenerateProto,
+ },
)
diff --git a/python/vyos/base.py b/python/vyos/base.py
index ca96d96ce..3173ddc20 100644
--- a/python/vyos/base.py
+++ b/python/vyos/base.py
@@ -1,4 +1,4 @@
-# Copyright 2018-2022 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2018-2025 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -15,8 +15,7 @@
from textwrap import fill
-
-class BaseWarning:
+class UserMessage:
def __init__(self, header, message, **kwargs):
self.message = message
self.kwargs = kwargs
@@ -33,7 +32,6 @@ class BaseWarning:
messages = self.message.split('\n')
isfirstmessage = True
initial_indent = self.textinitindent
- print('')
for mes in messages:
mes = fill(mes, initial_indent=initial_indent,
subsequent_indent=self.standardindent, **self.kwargs)
@@ -44,17 +42,24 @@ class BaseWarning:
print('', flush=True)
+class Message():
+ def __init__(self, message, **kwargs):
+ self.Message = UserMessage('', message, **kwargs)
+ self.Message.print()
+
class Warning():
def __init__(self, message, **kwargs):
- self.BaseWarn = BaseWarning('WARNING: ', message, **kwargs)
- self.BaseWarn.print()
+ print('')
+ self.UserMessage = UserMessage('WARNING: ', message, **kwargs)
+ self.UserMessage.print()
class DeprecationWarning():
def __init__(self, message, **kwargs):
# Reformat the message and trim it to 72 characters in length
- self.BaseWarn = BaseWarning('DEPRECATION WARNING: ', message, **kwargs)
- self.BaseWarn.print()
+ print('')
+ self.UserMessage = UserMessage('DEPRECATION WARNING: ', message, **kwargs)
+ self.UserMessage.print()
class ConfigError(Exception):
diff --git a/python/vyos/component_version.py b/python/vyos/component_version.py
index 94215531d..81d986658 100644
--- a/python/vyos/component_version.py
+++ b/python/vyos/component_version.py
@@ -49,7 +49,9 @@ DEFAULT_CONFIG_PATH = os.path.join(directories['config'], 'config.boot')
REGEX_WARN_VYOS = r'(// Warning: Do not remove the following line.)'
REGEX_WARN_VYATTA = r'(/\* Warning: Do not remove the following line. \*/)'
REGEX_COMPONENT_VERSION_VYOS = r'// vyos-config-version:\s+"([\w@:-]+)"\s*'
-REGEX_COMPONENT_VERSION_VYATTA = r'/\* === vyatta-config-version:\s+"([\w@:-]+)"\s+=== \*/'
+REGEX_COMPONENT_VERSION_VYATTA = (
+ r'/\* === vyatta-config-version:\s+"([\w@:-]+)"\s+=== \*/'
+)
REGEX_RELEASE_VERSION_VYOS = r'// Release version:\s+(\S*)\s*'
REGEX_RELEASE_VERSION_VYATTA = r'/\* Release version:\s+(\S*)\s*\*/'
@@ -62,16 +64,31 @@ CONFIG_FILE_VERSION = """\
warn_filter_vyos = re.compile(REGEX_WARN_VYOS)
warn_filter_vyatta = re.compile(REGEX_WARN_VYATTA)
-regex_filter = { 'vyos': dict(zip(['component', 'release'],
- [re.compile(REGEX_COMPONENT_VERSION_VYOS),
- re.compile(REGEX_RELEASE_VERSION_VYOS)])),
- 'vyatta': dict(zip(['component', 'release'],
- [re.compile(REGEX_COMPONENT_VERSION_VYATTA),
- re.compile(REGEX_RELEASE_VERSION_VYATTA)])) }
+regex_filter = {
+ 'vyos': dict(
+ zip(
+ ['component', 'release'],
+ [
+ re.compile(REGEX_COMPONENT_VERSION_VYOS),
+ re.compile(REGEX_RELEASE_VERSION_VYOS),
+ ],
+ )
+ ),
+ 'vyatta': dict(
+ zip(
+ ['component', 'release'],
+ [
+ re.compile(REGEX_COMPONENT_VERSION_VYATTA),
+ re.compile(REGEX_RELEASE_VERSION_VYATTA),
+ ],
+ )
+ ),
+}
+
@dataclass
class VersionInfo:
- component: Optional[dict[str,int]] = None
+ component: Optional[dict[str, int]] = None
release: str = get_version()
vintage: str = 'vyos'
config_body: Optional[str] = None
@@ -84,8 +101,9 @@ class VersionInfo:
return bool(self.config_body is None)
def update_footer(self):
- f = CONFIG_FILE_VERSION.format(component_to_string(self.component),
- self.release)
+ f = CONFIG_FILE_VERSION.format(
+ component_to_string(self.component), self.release
+ )
self.footer_lines = f.splitlines()
def update_syntax(self):
@@ -121,13 +139,16 @@ class VersionInfo:
except Exception as e:
raise ValueError(e) from e
+
def component_to_string(component: dict) -> str:
- l = [f'{k}@{v}' for k, v in sorted(component.items(), key=lambda x: x[0])]
+ l = [f'{k}@{v}' for k, v in sorted(component.items(), key=lambda x: x[0])] # noqa: E741
return ':'.join(l)
+
def component_from_string(string: str) -> dict:
return {k: int(v) for k, v in re.findall(r'([\w,-]+)@(\d+)', string)}
+
def version_info_from_file(config_file) -> VersionInfo:
"""Return config file component and release version info."""
version_info = VersionInfo()
@@ -166,27 +187,27 @@ def version_info_from_file(config_file) -> VersionInfo:
return version_info
+
def version_info_from_system() -> VersionInfo:
"""Return system component and release version info."""
d = component_version()
sort_d = dict(sorted(d.items(), key=lambda x: x[0]))
- version_info = VersionInfo(
- component = sort_d,
- release = get_version(),
- vintage = 'vyos'
- )
+ version_info = VersionInfo(component=sort_d, release=get_version(), vintage='vyos')
return version_info
+
def version_info_copy(v: VersionInfo) -> VersionInfo:
"""Make a copy of dataclass."""
return replace(v)
+
def version_info_prune_component(x: VersionInfo, y: VersionInfo) -> VersionInfo:
"""In place pruning of component keys of x not in y."""
if x.component is None or y.component is None:
return
- x.component = { k: v for k,v in x.component.items() if k in y.component }
+ x.component = {k: v for k, v in x.component.items() if k in y.component}
+
def add_system_version(config_str: str = None, out_file: str = None):
"""Wrap config string with system version and write to out_file.
@@ -202,3 +223,11 @@ def add_system_version(config_str: str = None, out_file: str = None):
version_info.write(out_file)
else:
sys.stdout.write(version_info.write_string())
+
+
+def append_system_version(file: str):
+ """Append system version data to existing file"""
+ version_info = version_info_from_system()
+ version_info.update_footer()
+ with open(file, 'a') as f:
+ f.write(version_info.write_string())
diff --git a/python/vyos/config.py b/python/vyos/config.py
index 1fab46761..546eeceab 100644
--- a/python/vyos/config.py
+++ b/python/vyos/config.py
@@ -149,6 +149,18 @@ class Config(object):
return self._running_config
return self._session_config
+ def get_bool_attr(self, attr) -> bool:
+ if not hasattr(self, attr):
+ return False
+ else:
+ tmp = getattr(self, attr)
+ if not isinstance(tmp, bool):
+ return False
+ return tmp
+
+ def set_bool_attr(self, attr, val):
+ setattr(self, attr, val)
+
def _make_path(self, path):
# Backwards-compatibility stuff: original implementation used string paths
# libvyosconfig paths are lists, but since node names cannot contain whitespace,
diff --git a/python/vyos/config_mgmt.py b/python/vyos/config_mgmt.py
index 1c2b70fdf..dd8910afb 100644
--- a/python/vyos/config_mgmt.py
+++ b/python/vyos/config_mgmt.py
@@ -287,7 +287,7 @@ Proceed ?"""
# commits under commit-confirm are not added to revision list unless
# confirmed, hence a soft revert is to revision 0
- revert_ct = self._get_config_tree_revision(0)
+ revert_ct = self.get_config_tree_revision(0)
message = '[commit-confirm] Reverting to previous config now'
os.system('wall -n ' + message)
@@ -351,7 +351,7 @@ Proceed ?"""
)
return msg, 1
- rollback_ct = self._get_config_tree_revision(rev)
+ rollback_ct = self.get_config_tree_revision(rev)
try:
load(rollback_ct, switch='explicit')
print('Rollback diff has been applied.')
@@ -382,7 +382,7 @@ Proceed ?"""
if rev1 is not None:
if not self._check_revision_number(rev1):
return f'Invalid revision number {rev1}', 1
- ct1 = self._get_config_tree_revision(rev1)
+ ct1 = self.get_config_tree_revision(rev1)
ct2 = self.working_config
msg = f'No changes between working and revision {rev1} configurations.\n'
if rev2 is not None:
@@ -390,7 +390,7 @@ Proceed ?"""
return f'Invalid revision number {rev2}', 1
# compare older to newer
ct2 = ct1
- ct1 = self._get_config_tree_revision(rev2)
+ ct1 = self.get_config_tree_revision(rev2)
msg = f'No changes between revisions {rev2} and {rev1} configurations.\n'
out = ''
@@ -575,7 +575,7 @@ Proceed ?"""
r = f.read().decode()
return r
- def _get_config_tree_revision(self, rev: int):
+ def get_config_tree_revision(self, rev: int):
c = self._get_file_revision(rev)
return ConfigTree(c)
diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py
index cf7c9d543..747af8dbe 100644
--- a/python/vyos/configdep.py
+++ b/python/vyos/configdep.py
@@ -102,11 +102,16 @@ def run_config_mode_script(target: str, config: 'Config'):
mod = load_as_module(name, path)
config.set_level([])
+ dry_run = config.get_bool_attr('dry_run')
try:
c = mod.get_config(config)
mod.verify(c)
- mod.generate(c)
- mod.apply(c)
+ if not dry_run:
+ mod.generate(c)
+ mod.apply(c)
+ else:
+ if hasattr(mod, 'call_dependents'):
+ mod.call_dependents()
except (VyOSError, ConfigError) as e:
raise ConfigError(str(e)) from e
diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py
index 78b98a3eb..ff0a15933 100644
--- a/python/vyos/configdict.py
+++ b/python/vyos/configdict.py
@@ -517,6 +517,14 @@ def get_interface_dict(config, base, ifname='', recursive_defaults=True, with_pk
else:
dict['ipv6']['address'].update({'eui64_old': eui64})
+ interface_identifier = leaf_node_changed(config, base + [ifname, 'ipv6', 'address', 'interface-identifier'])
+ if interface_identifier:
+ tmp = dict_search('ipv6.address', dict)
+ if not tmp:
+ dict.update({'ipv6': {'address': {'interface_identifier_old': interface_identifier}}})
+ else:
+ dict['ipv6']['address'].update({'interface_identifier_old': interface_identifier})
+
for vif, vif_config in dict.get('vif', {}).items():
# Add subinterface name to dictionary
dict['vif'][vif].update({'ifname' : f'{ifname}.{vif}'})
@@ -626,6 +634,23 @@ def get_vlan_ids(interface):
return vlan_ids
+def get_vlans_ids_and_range(interface):
+ vlan_ids = set()
+
+ vlan_filter_status = json.loads(cmd(f'bridge -j -d vlan show dev {interface}'))
+
+ if vlan_filter_status is not None:
+ for interface_status in vlan_filter_status:
+ for vlan_entry in interface_status.get("vlans", []):
+ start = vlan_entry["vlan"]
+ end = vlan_entry.get("vlanEnd")
+ if end:
+ vlan_ids.add(f"{start}-{end}")
+ else:
+ vlan_ids.add(str(start))
+
+ return vlan_ids
+
def get_accel_dict(config, base, chap_secrets, with_pki=False):
"""
Common utility function to retrieve and mangle the Accel-PPP configuration
diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py
index 90b96b88c..a3be29881 100644
--- a/python/vyos/configsession.py
+++ b/python/vyos/configsession.py
@@ -21,6 +21,10 @@ import subprocess
from vyos.defaults import directories
from vyos.utils.process import is_systemd_service_running
from vyos.utils.dict import dict_to_paths
+from vyos.utils.boot import boot_configuration_complete
+from vyos.vyconf_session import VyconfSession
+
+vyconf_backend = False
CLI_SHELL_API = '/bin/cli-shell-api'
SET = '/opt/vyatta/sbin/my_set'
@@ -165,6 +169,11 @@ class ConfigSession(object):
self.__run_command([CLI_SHELL_API, 'setupSession'])
+ if vyconf_backend and boot_configuration_complete():
+ self._vyconf_session = VyconfSession(on_error=ConfigSessionError)
+ else:
+ self._vyconf_session = None
+
def __del__(self):
try:
output = (
@@ -209,7 +218,10 @@ class ConfigSession(object):
value = []
else:
value = [value]
- self.__run_command([SET] + path + value)
+ if self._vyconf_session is None:
+ self.__run_command([SET] + path + value)
+ else:
+ self._vyconf_session.set(path + value)
def set_section(self, path: list, d: dict):
try:
@@ -223,7 +235,10 @@ class ConfigSession(object):
value = []
else:
value = [value]
- self.__run_command([DELETE] + path + value)
+ if self._vyconf_session is None:
+ self.__run_command([DELETE] + path + value)
+ else:
+ self._vyconf_session.delete(path + value)
def load_section(self, path: list, d: dict):
try:
@@ -261,20 +276,34 @@ class ConfigSession(object):
self.__run_command([COMMENT] + path + value)
def commit(self):
- out = self.__run_command([COMMIT])
+ if self._vyconf_session is None:
+ out = self.__run_command([COMMIT])
+ else:
+ out, _ = self._vyconf_session.commit()
+
return out
def discard(self):
- self.__run_command([DISCARD])
+ if self._vyconf_session is None:
+ self.__run_command([DISCARD])
+ else:
+ out, _ = self._vyconf_session.discard()
def show_config(self, path, format='raw'):
- config_data = self.__run_command(SHOW_CONFIG + path)
+ if self._vyconf_session is None:
+ config_data = self.__run_command(SHOW_CONFIG + path)
+ else:
+ config_data, _ = self._vyconf_session.show_config()
if format == 'raw':
return config_data
def load_config(self, file_path):
- out = self.__run_command(LOAD_CONFIG + [file_path])
+ if self._vyconf_session is None:
+ out = self.__run_command(LOAD_CONFIG + [file_path])
+ else:
+ out, _ = self._vyconf_session.load_config(file=file_path)
+
return out
def load_explicit(self, file_path):
@@ -287,11 +316,21 @@ class ConfigSession(object):
raise ConfigSessionError(e) from e
def migrate_and_load_config(self, file_path):
- out = self.__run_command(MIGRATE_LOAD_CONFIG + [file_path])
+ if self._vyconf_session is None:
+ out = self.__run_command(MIGRATE_LOAD_CONFIG + [file_path])
+ else:
+ out, _ = self._vyconf_session.load_config(file=file_path, migrate=True)
+
return out
def save_config(self, file_path):
- out = self.__run_command(SAVE_CONFIG + [file_path])
+ if self._vyconf_session is None:
+ out = self.__run_command(SAVE_CONFIG + [file_path])
+ else:
+ out, _ = self._vyconf_session.save_config(
+ file=file_path, append_version=True
+ )
+
return out
def install_image(self, url):
diff --git a/python/vyos/configsource.py b/python/vyos/configsource.py
index 59e5ac8a1..65cef5333 100644
--- a/python/vyos/configsource.py
+++ b/python/vyos/configsource.py
@@ -319,3 +319,13 @@ class ConfigSourceString(ConfigSource):
self._session_config = ConfigTree(session_config_text) if session_config_text else None
except ValueError:
raise ConfigSourceError(f"Init error in {type(self)}")
+
+class ConfigSourceCache(ConfigSource):
+ def __init__(self, running_config_cache=None, session_config_cache=None):
+ super().__init__()
+
+ try:
+ self._running_config = ConfigTree(internal=running_config_cache) if running_config_cache else None
+ self._session_config = ConfigTree(internal=session_config_cache) if session_config_cache else None
+ except ValueError:
+ raise ConfigSourceError(f"Init error in {type(self)}")
diff --git a/python/vyos/configtree.py b/python/vyos/configtree.py
index 4ad0620a5..ff40fbad0 100644
--- a/python/vyos/configtree.py
+++ b/python/vyos/configtree.py
@@ -50,7 +50,7 @@ def unescape_backslash(string: str) -> str:
def extract_version(s):
"""Extract the version string from the config string"""
t = re.split('(^//)', s, maxsplit=1, flags=re.MULTILINE)
- return (s, ''.join(t[1:]))
+ return (t[0], ''.join(t[1:]))
def check_path(path):
@@ -66,9 +66,14 @@ class ConfigTreeError(Exception):
class ConfigTree(object):
- def __init__(self, config_string=None, address=None, libpath=LIBPATH):
- if config_string is None and address is None:
- raise TypeError("ConfigTree() requires one of 'config_string' or 'address'")
+ def __init__(
+ self, config_string=None, address=None, internal=None, libpath=LIBPATH
+ ):
+ if config_string is None and address is None and internal is None:
+ raise TypeError(
+ "ConfigTree() requires one of 'config_string', 'address', or 'internal'"
+ )
+
self.__config = None
self.__lib = cdll.LoadLibrary(libpath)
@@ -89,6 +94,13 @@ class ConfigTree(object):
self.__to_commands.argtypes = [c_void_p, c_char_p]
self.__to_commands.restype = c_char_p
+ self.__read_internal = self.__lib.read_internal
+ self.__read_internal.argtypes = [c_char_p]
+ self.__read_internal.restype = c_void_p
+
+ self.__write_internal = self.__lib.write_internal
+ self.__write_internal.argtypes = [c_void_p, c_char_p]
+
self.__to_json = self.__lib.to_json
self.__to_json.argtypes = [c_void_p]
self.__to_json.restype = c_char_p
@@ -168,7 +180,21 @@ class ConfigTree(object):
self.__destroy = self.__lib.destroy
self.__destroy.argtypes = [c_void_p]
- if address is None:
+ self.__equal = self.__lib.equal
+ self.__equal.argtypes = [c_void_p, c_void_p]
+ self.__equal.restype = c_bool
+
+ if address is not None:
+ self.__config = address
+ self.__version = ''
+ elif internal is not None:
+ config = self.__read_internal(internal.encode())
+ if config is None:
+ msg = self.__get_error().decode()
+ raise ValueError('Failed to read internal rep: {0}'.format(msg))
+ else:
+ self.__config = config
+ elif config_string is not None:
config_section, version_section = extract_version(config_string)
config_section = escape_backslash(config_section)
config = self.__from_string(config_section.encode())
@@ -179,8 +205,9 @@ class ConfigTree(object):
self.__config = config
self.__version = version_section
else:
- self.__config = address
- self.__version = ''
+ raise TypeError(
+ "ConfigTree() requires one of 'config_string', 'address', or 'internal'"
+ )
self.__migration = os.environ.get('VYOS_MIGRATION')
if self.__migration:
@@ -190,6 +217,11 @@ class ConfigTree(object):
if self.__config is not None:
self.__destroy(self.__config)
+ def __eq__(self, other):
+ if isinstance(other, ConfigTree):
+ return self.__equal(self._get_config(), other._get_config())
+ return False
+
def __str__(self):
return self.to_string()
@@ -199,6 +231,9 @@ class ConfigTree(object):
def get_version_string(self):
return self.__version
+ def write_cache(self, file_name):
+ self.__write_internal(self._get_config(), file_name)
+
def to_string(self, ordered_values=False, no_version=False):
config_string = self.__to_string(self.__config, ordered_values).decode()
config_string = unescape_backslash(config_string)
diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py
index 4084425b1..d5f443f15 100644
--- a/python/vyos/configverify.py
+++ b/python/vyos/configverify.py
@@ -92,6 +92,9 @@ def verify_mtu_ipv6(config):
tmp = dict_search('ipv6.address.eui64', config)
if tmp != None: raise ConfigError(error_msg)
+ tmp = dict_search('ipv6.address.interface_identifier', config)
+ if tmp != None: raise ConfigError(error_msg)
+
def verify_vrf(config):
"""
Common helper function used by interface implementations to perform
@@ -356,6 +359,7 @@ def verify_vlan_config(config):
verify_vrf(vlan)
verify_mirror_redirect(vlan)
verify_mtu_parent(vlan, config)
+ verify_mtu_ipv6(vlan)
# 802.1ad (Q-in-Q) VLANs
for s_vlan_id in config.get('vif_s', {}):
@@ -367,6 +371,7 @@ def verify_vlan_config(config):
verify_vrf(s_vlan)
verify_mirror_redirect(s_vlan)
verify_mtu_parent(s_vlan, config)
+ verify_mtu_ipv6(s_vlan)
for c_vlan_id in s_vlan.get('vif_c', {}):
c_vlan = s_vlan['vif_c'][c_vlan_id]
@@ -378,6 +383,7 @@ def verify_vlan_config(config):
verify_mirror_redirect(c_vlan)
verify_mtu_parent(c_vlan, config)
verify_mtu_parent(c_vlan, s_vlan)
+ verify_mtu_ipv6(c_vlan)
def verify_diffie_hellman_length(file, min_keysize):
diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py
index 86194cd55..c1e5ddc04 100644
--- a/python/vyos/defaults.py
+++ b/python/vyos/defaults.py
@@ -38,14 +38,20 @@ directories = {
'vyos_configdir' : '/opt/vyatta/config',
'completion_dir' : f'{base_dir}/completion',
'ca_certificates' : '/usr/local/share/ca-certificates/vyos',
- 'ppp_nexthop_dir' : '/run/ppp_nexthop'
+ 'ppp_nexthop_dir' : '/run/ppp_nexthop',
+ 'proto_path' : '/usr/share/vyos/vyconf'
}
systemd_services = {
- 'rsyslog' : 'rsyslog.service',
+ 'haproxy' : 'haproxy.service',
+ 'syslog' : 'syslog.service',
'snmpd' : 'snmpd.service',
}
+internal_ports = {
+ 'certbot_haproxy' : 65080, # Certbot running behing haproxy
+}
+
config_status = '/tmp/vyos-config-status'
api_config_state = '/run/http-api-state'
frr_debug_enable = '/tmp/vyos.frr.debug'
@@ -69,3 +75,5 @@ rt_symbolic_names = {
rt_global_vrf = rt_symbolic_names['main']
rt_global_table = rt_symbolic_names['main']
+
+vyconfd_conf = '/etc/vyos/vyconfd.conf'
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py
index 314e8dfe3..64022db84 100755
--- a/python/vyos/firewall.py
+++ b/python/vyos/firewall.py
@@ -233,6 +233,9 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
hook_name = 'prerouting'
if hook == 'NAM':
hook_name = f'name'
+ # for policy
+ if hook == 'route' or hook == 'route6':
+ hook_name = hook
output.append(f'{ip_name} {prefix}addr {operator} @GEOIP_CC{def_suffix}_{hook_name}_{fw_name}_{rule_id}')
if 'mac_address' in side_conf:
@@ -310,6 +313,16 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
operator = '!='
group_name = group_name[1:]
output.append(f'{ip_name} {prefix}addr {operator} @D_{group_name}')
+ elif 'remote_group' in group:
+ group_name = group['remote_group']
+ operator = ''
+ if group_name[0] == '!':
+ operator = '!='
+ group_name = group_name[1:]
+ if ip_name == 'ip':
+ output.append(f'{ip_name} {prefix}addr {operator} @R_{group_name}')
+ elif ip_name == 'ip6':
+ output.append(f'{ip_name} {prefix}addr {operator} @R6_{group_name}')
if 'mac_group' in group:
group_name = group['mac_group']
operator = ''
@@ -461,14 +474,14 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
output.append('gre version 1')
if gre_key:
- # The offset of the key within the packet shifts depending on the C-flag.
- # nftables cannot handle complex enough expressions to match multiple
+ # The offset of the key within the packet shifts depending on the C-flag.
+ # nftables cannot handle complex enough expressions to match multiple
# offsets based on bitfields elsewhere.
- # We enforce a specific match for the checksum flag in validation, so the
- # gre_flags dict will always have a 'checksum' key when gre_key is populated.
- if not gre_flags['checksum']:
+ # We enforce a specific match for the checksum flag in validation, so the
+ # gre_flags dict will always have a 'checksum' key when gre_key is populated.
+ if not gre_flags['checksum']:
# No "unset" child node means C is set, we offset key lookup +32 bits
- output.append(f'@th,64,32 == {gre_key}')
+ output.append(f'@th,64,32 == {gre_key}')
else:
output.append(f'@th,32,32 == {gre_key}')
@@ -627,7 +640,7 @@ def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name):
return " ".join(output)
def parse_gre_flags(flags, force_keyed=False):
- flag_map = { # nft does not have symbolic names for these.
+ flag_map = { # nft does not have symbolic names for these.
'checksum': 1<<0,
'routing': 1<<1,
'key': 1<<2,
@@ -638,7 +651,7 @@ def parse_gre_flags(flags, force_keyed=False):
include = 0
exclude = 0
for fl_name, fl_state in flags.items():
- if not fl_state:
+ if not fl_state:
include |= flag_map[fl_name]
else: # 'unset' child tag
exclude |= flag_map[fl_name]
@@ -731,14 +744,14 @@ class GeoIPLock(object):
def __exit__(self, exc_type, exc_value, tb):
os.unlink(self.file)
-def geoip_update(firewall, force=False):
+def geoip_update(firewall=None, policy=None, force=False):
with GeoIPLock(geoip_lock_file) as lock:
if not lock:
print("Script is already running")
return False
- if not firewall:
- print("Firewall is not configured")
+ if not firewall and not policy:
+ print("Firewall and policy are not configured")
return True
if not os.path.exists(geoip_database):
@@ -753,23 +766,41 @@ def geoip_update(firewall, force=False):
ipv4_sets = {}
ipv6_sets = {}
+ ipv4_codes_policy = {}
+ ipv6_codes_policy = {}
+
+ ipv4_sets_policy = {}
+ ipv6_sets_policy = {}
+
# Map country codes to set names
- for codes, path in dict_search_recursive(firewall, 'country_code'):
- set_name = f'GEOIP_CC_{path[1]}_{path[2]}_{path[4]}'
- if ( path[0] == 'ipv4'):
- for code in codes:
- ipv4_codes.setdefault(code, []).append(set_name)
- elif ( path[0] == 'ipv6' ):
- set_name = f'GEOIP_CC6_{path[1]}_{path[2]}_{path[4]}'
- for code in codes:
- ipv6_codes.setdefault(code, []).append(set_name)
-
- if not ipv4_codes and not ipv6_codes:
+ if firewall:
+ for codes, path in dict_search_recursive(firewall, 'country_code'):
+ set_name = f'GEOIP_CC_{path[1]}_{path[2]}_{path[4]}'
+ if ( path[0] == 'ipv4'):
+ for code in codes:
+ ipv4_codes.setdefault(code, []).append(set_name)
+ elif ( path[0] == 'ipv6' ):
+ set_name = f'GEOIP_CC6_{path[1]}_{path[2]}_{path[4]}'
+ for code in codes:
+ ipv6_codes.setdefault(code, []).append(set_name)
+
+ if policy:
+ for codes, path in dict_search_recursive(policy, 'country_code'):
+ set_name = f'GEOIP_CC_{path[0]}_{path[1]}_{path[3]}'
+ if ( path[0] == 'route'):
+ for code in codes:
+ ipv4_codes_policy.setdefault(code, []).append(set_name)
+ elif ( path[0] == 'route6' ):
+ set_name = f'GEOIP_CC6_{path[0]}_{path[1]}_{path[3]}'
+ for code in codes:
+ ipv6_codes_policy.setdefault(code, []).append(set_name)
+
+ if not ipv4_codes and not ipv6_codes and not ipv4_codes_policy and not ipv6_codes_policy:
if force:
- print("GeoIP not in use by firewall")
+ print("GeoIP not in use by firewall and policy")
return True
- geoip_data = geoip_load_data([*ipv4_codes, *ipv6_codes])
+ geoip_data = geoip_load_data([*ipv4_codes, *ipv6_codes, *ipv4_codes_policy, *ipv6_codes_policy])
# Iterate IP blocks to assign to sets
for start, end, code in geoip_data:
@@ -778,19 +809,29 @@ def geoip_update(firewall, force=False):
ip_range = f'{start}-{end}' if start != end else start
for setname in ipv4_codes[code]:
ipv4_sets.setdefault(setname, []).append(ip_range)
+ if code in ipv4_codes_policy and ipv4:
+ ip_range = f'{start}-{end}' if start != end else start
+ for setname in ipv4_codes_policy[code]:
+ ipv4_sets_policy.setdefault(setname, []).append(ip_range)
if code in ipv6_codes and not ipv4:
ip_range = f'{start}-{end}' if start != end else start
for setname in ipv6_codes[code]:
ipv6_sets.setdefault(setname, []).append(ip_range)
+ if code in ipv6_codes_policy and not ipv4:
+ ip_range = f'{start}-{end}' if start != end else start
+ for setname in ipv6_codes_policy[code]:
+ ipv6_sets_policy.setdefault(setname, []).append(ip_range)
render(nftables_geoip_conf, 'firewall/nftables-geoip-update.j2', {
'ipv4_sets': ipv4_sets,
- 'ipv6_sets': ipv6_sets
+ 'ipv6_sets': ipv6_sets,
+ 'ipv4_sets_policy': ipv4_sets_policy,
+ 'ipv6_sets_policy': ipv6_sets_policy,
})
result = run(f'nft --file {nftables_geoip_conf}')
if result != 0:
- print('Error: GeoIP failed to update firewall')
+ print('Error: GeoIP failed to update firewall/policy')
return False
return True
diff --git a/python/vyos/frrender.py b/python/vyos/frrender.py
index ba44978d1..73d6dd5f0 100644
--- a/python/vyos/frrender.py
+++ b/python/vyos/frrender.py
@@ -60,6 +60,10 @@ def get_frrender_dict(conf, argv=None) -> dict:
from vyos.configdict import get_dhcp_interfaces
from vyos.configdict import get_pppoe_interfaces
+ # We need to re-set the CLI path to the root level, as this function uses
+ # conf.exists() with an absolute path form the CLI root
+ conf.set_level([])
+
# Create an empty dictionary which will be filled down the code path and
# returned to the caller
dict = {}
@@ -88,7 +92,7 @@ def get_frrender_dict(conf, argv=None) -> dict:
if dict_search(f'area.{area_num}.area_type.nssa', ospf) is None:
del default_values['area'][area_num]['area_type']['nssa']
- for protocol in ['babel', 'bgp', 'connected', 'isis', 'kernel', 'rip', 'static']:
+ for protocol in ['babel', 'bgp', 'connected', 'isis', 'kernel', 'nhrp', 'rip', 'static']:
if dict_search(f'redistribute.{protocol}', ospf) is None:
del default_values['redistribute'][protocol]
if not bool(default_values['redistribute']):
@@ -599,8 +603,10 @@ def get_frrender_dict(conf, argv=None) -> dict:
dict.update({'vrf' : vrf})
if os.path.exists(frr_debug_enable):
+ print(f'---- get_frrender_dict({conf}) ----')
import pprint
pprint.pprint(dict)
+ print('-----------------------------------')
return dict
@@ -691,6 +697,9 @@ class FRRender:
debug('FRR: START CONFIGURATION RENDERING')
# we can not reload an empty file, thus we always embed the marker
output = '!\n'
+ # Enable FRR logging
+ output += 'log syslog\n'
+ output += 'log facility local7\n'
# Enable SNMP agentx support
# SNMP AgentX support cannot be disabled once enabled
if 'snmp' in config_dict:
diff --git a/python/vyos/ifconfig/bridge.py b/python/vyos/ifconfig/bridge.py
index d534dade7..f81026965 100644
--- a/python/vyos/ifconfig/bridge.py
+++ b/python/vyos/ifconfig/bridge.py
@@ -19,7 +19,7 @@ from vyos.utils.assertion import assert_list
from vyos.utils.assertion import assert_positive
from vyos.utils.dict import dict_search
from vyos.utils.network import interface_exists
-from vyos.configdict import get_vlan_ids
+from vyos.configdict import get_vlans_ids_and_range
from vyos.configdict import list_diff
@Interface.register
@@ -380,7 +380,7 @@ class BridgeIf(Interface):
add_vlan = []
native_vlan_id = None
allowed_vlan_ids= []
- cur_vlan_ids = get_vlan_ids(interface)
+ cur_vlan_ids = get_vlans_ids_and_range(interface)
if 'native_vlan' in interface_config:
vlan_id = interface_config['native_vlan']
@@ -389,14 +389,8 @@ class BridgeIf(Interface):
if 'allowed_vlan' in interface_config:
for vlan in interface_config['allowed_vlan']:
- vlan_range = vlan.split('-')
- if len(vlan_range) == 2:
- for vlan_add in range(int(vlan_range[0]),int(vlan_range[1]) + 1):
- add_vlan.append(str(vlan_add))
- allowed_vlan_ids.append(str(vlan_add))
- else:
- add_vlan.append(vlan)
- allowed_vlan_ids.append(vlan)
+ add_vlan.append(vlan)
+ allowed_vlan_ids.append(vlan)
# Remove redundant VLANs from the system
for vlan in list_diff(cur_vlan_ids, add_vlan):
diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py
index 979b62578..003a273c0 100644
--- a/python/vyos/ifconfig/interface.py
+++ b/python/vyos/ifconfig/interface.py
@@ -22,6 +22,7 @@ from copy import deepcopy
from glob import glob
from ipaddress import IPv4Network
+from ipaddress import IPv6Interface
from netifaces import ifaddresses
# this is not the same as socket.AF_INET/INET6
from netifaces import AF_INET
@@ -909,7 +910,11 @@ class Interface(Control):
tmp = self.get_interface('ipv6_autoconf')
if tmp == autoconf:
return None
- return self.set_interface('ipv6_autoconf', autoconf)
+ rc = self.set_interface('ipv6_autoconf', autoconf)
+ if autoconf == '0':
+ flushed = self.flush_ipv6_slaac_addrs()
+ self.flush_ipv6_slaac_routes(ra_addrs=flushed)
+ return rc
def add_ipv6_eui64_address(self, prefix):
"""
@@ -937,6 +942,20 @@ class Interface(Control):
prefixlen = prefix.split('/')[1]
self.del_addr(f'{eui64}/{prefixlen}')
+ def set_ipv6_interface_identifier(self, identifier):
+ """
+ Set the interface identifier for IPv6 autoconf.
+ """
+ cmd = f'ip token set {identifier} dev {self.ifname}'
+ self._cmd(cmd)
+
+ def del_ipv6_interface_identifier(self):
+ """
+ Delete the interface identifier for IPv6 autoconf.
+ """
+ cmd = f'ip token delete dev {self.ifname}'
+ self._cmd(cmd)
+
def set_ipv6_forwarding(self, forwarding):
"""
Configure IPv6 interface-specific Host/Router behaviour.
@@ -1310,6 +1329,71 @@ class Interface(Control):
# flush all addresses
self._cmd(cmd)
+ def flush_ipv6_slaac_addrs(self) -> list:
+ """
+ Flush all IPv6 addresses installed in response to router advertisement
+ messages from this interface.
+
+ Will raise an exception on error.
+ Will return a list of flushed IPv6 addresses.
+ """
+ netns = get_interface_namespace(self.ifname)
+ netns_cmd = f'ip netns exec {netns}' if netns else ''
+ tmp = get_interface_address(self.ifname)
+ if not tmp or 'addr_info' not in tmp:
+ return
+
+ # Parse interface IP addresses. Example data:
+ # {'family': 'inet6', 'local': '2001:db8:1111:0:250:56ff:feb3:38c5',
+ # 'prefixlen': 64, 'scope': 'global', 'dynamic': True,
+ # 'mngtmpaddr': True, 'protocol': 'kernel_ra',
+ # 'valid_life_time': 2591987, 'preferred_life_time': 14387}
+ flushed = []
+ for addr_info in tmp['addr_info']:
+ if 'protocol' not in addr_info:
+ continue
+ if (addr_info['protocol'] == 'kernel_ra' and
+ addr_info['scope'] == 'global'):
+ # Flush IPv6 addresses installed by router advertisement
+ ra_addr = f"{addr_info['local']}/{addr_info['prefixlen']}"
+ flushed.append(ra_addr)
+ cmd = f'{netns_cmd} ip -6 addr del dev {self.ifname} {ra_addr}'
+ self._cmd(cmd)
+ return flushed
+
+ def flush_ipv6_slaac_routes(self, ra_addrs: list=[]) -> None:
+ """
+ Flush IPv6 default routes installed in response to router advertisement
+ messages from this interface.
+
+ Will raise an exception on error.
+ """
+ # Find IPv6 connected prefixes for flushed SLAAC addresses
+ connected = []
+ for addr in ra_addrs if isinstance(ra_addrs, list) else []:
+ connected.append(str(IPv6Interface(addr).network))
+
+ netns = get_interface_namespace(self.ifname)
+ netns_cmd = f'ip netns exec {netns}' if netns else ''
+
+ tmp = self._cmd(f'{netns_cmd} ip -j -6 route show dev {self.ifname}')
+ tmp = json.loads(tmp)
+ # Parse interface routes. Example data:
+ # {'dst': 'default', 'gateway': 'fe80::250:56ff:feb3:cdba',
+ # 'protocol': 'ra', 'metric': 1024, 'flags': [], 'expires': 1398,
+ # 'metrics': [{'hoplimit': 64}], 'pref': 'medium'}
+ for route in tmp:
+ # If it's a default route received from RA, delete it
+ if (dict_search('dst', route) == 'default' and
+ dict_search('protocol', route) == 'ra'):
+ self._cmd(f'{netns_cmd} ip -6 route del default via {route["gateway"]} dev {self.ifname}')
+ # Remove connected prefixes received from RA
+ if dict_search('dst', route) in connected:
+ # If it's a connected prefix, delete it
+ self._cmd(f'{netns_cmd} ip -6 route del {route["dst"]} dev {self.ifname}')
+
+ return None
+
def add_to_bridge(self, bridge_dict):
"""
Adds the interface to the bridge with the passed port config.
@@ -1320,8 +1404,6 @@ class Interface(Control):
# drop all interface addresses first
self.flush_addrs()
- ifname = self.ifname
-
for bridge, bridge_config in bridge_dict.items():
# add interface to bridge - use Section.klass to get BridgeIf class
Section.klass(bridge)(bridge, create=True).add_port(self.ifname)
@@ -1337,7 +1419,7 @@ class Interface(Control):
bridge_vlan_filter = Section.klass(bridge)(bridge, create=True).get_vlan_filter()
if int(bridge_vlan_filter):
- cur_vlan_ids = get_vlan_ids(ifname)
+ cur_vlan_ids = get_vlan_ids(self.ifname)
add_vlan = []
native_vlan_id = None
allowed_vlan_ids= []
@@ -1360,15 +1442,15 @@ class Interface(Control):
# Remove redundant VLANs from the system
for vlan in list_diff(cur_vlan_ids, add_vlan):
- cmd = f'bridge vlan del dev {ifname} vid {vlan} master'
+ cmd = f'bridge vlan del dev {self.ifname} vid {vlan} master'
self._cmd(cmd)
for vlan in allowed_vlan_ids:
- cmd = f'bridge vlan add dev {ifname} vid {vlan} master'
+ cmd = f'bridge vlan add dev {self.ifname} vid {vlan} master'
self._cmd(cmd)
# Setting native VLAN to system
if native_vlan_id:
- cmd = f'bridge vlan add dev {ifname} vid {native_vlan_id} pvid untagged master'
+ cmd = f'bridge vlan add dev {self.ifname} vid {native_vlan_id} pvid untagged master'
self._cmd(cmd)
def set_dhcp(self, enable: bool, vrf_changed: bool=False):
@@ -1447,12 +1529,11 @@ class Interface(Control):
if enable not in [True, False]:
raise ValueError()
- ifname = self.ifname
config_base = directories['dhcp6_client_dir']
- config_file = f'{config_base}/dhcp6c.{ifname}.conf'
- script_file = f'/etc/wide-dhcpv6/dhcp6c.{ifname}.script' # can not live under /run b/c of noexec mount option
- systemd_override_file = f'/run/systemd/system/dhcp6c@{ifname}.service.d/10-override.conf'
- systemd_service = f'dhcp6c@{ifname}.service'
+ config_file = f'{config_base}/dhcp6c.{self.ifname}.conf'
+ script_file = f'/etc/wide-dhcpv6/dhcp6c.{self.ifname}.script' # can not live under /run b/c of noexec mount option
+ systemd_override_file = f'/run/systemd/system/dhcp6c@{self.ifname}.service.d/10-override.conf'
+ systemd_service = f'dhcp6c@{self.ifname}.service'
# Rendered client configuration files require additional settings
config = deepcopy(self.config)
@@ -1792,11 +1873,26 @@ class Interface(Control):
value = '0' if (tmp != None) else '1'
self.set_ipv6_forwarding(value)
+ # Delete old interface identifier
+ # This should be before setting the accept_ra value
+ old = dict_search('ipv6.address.interface_identifier_old', config)
+ now = dict_search('ipv6.address.interface_identifier', config)
+ if old and not now:
+ # accept_ra of ra is required to delete the interface identifier
+ self.set_ipv6_accept_ra('2')
+ self.del_ipv6_interface_identifier()
+
+ # Set IPv6 Interface identifier
+ # This should be before setting the accept_ra value
+ tmp = dict_search('ipv6.address.interface_identifier', config)
+ if tmp:
+ # accept_ra is required to set the interface identifier
+ self.set_ipv6_accept_ra('2')
+ self.set_ipv6_interface_identifier(tmp)
+
# IPv6 router advertisements
tmp = dict_search('ipv6.address.autoconf', config)
- value = '2' if (tmp != None) else '1'
- if 'dhcpv6' in new_addr:
- value = '2'
+ value = '2' if (tmp != None) else '0'
self.set_ipv6_accept_ra(value)
# IPv6 address autoconfiguration
diff --git a/python/vyos/kea.py b/python/vyos/kea.py
index c7947af3e..5eecbbaad 100644
--- a/python/vyos/kea.py
+++ b/python/vyos/kea.py
@@ -20,8 +20,8 @@ import socket
from datetime import datetime
from datetime import timezone
+from vyos import ConfigError
from vyos.template import is_ipv6
-from vyos.template import isc_static_route
from vyos.template import netmask_from_cidr
from vyos.utils.dict import dict_search_args
from vyos.utils.file import file_permissions
@@ -44,6 +44,7 @@ kea4_options = {
'wpad_url': 'wpad-url',
'ipv6_only_preferred': 'v6-only-preferred',
'captive_portal': 'v4-captive-portal',
+ 'capwap_controller': 'capwap-ac-v4',
}
kea6_options = {
@@ -56,6 +57,7 @@ kea6_options = {
'nisplus_server': 'nisp-servers',
'sntp_server': 'sntp-servers',
'captive_portal': 'v6-captive-portal',
+ 'capwap_controller': 'capwap-ac-v6',
}
kea_ctrl_socket = '/run/kea/dhcp{inet}-ctrl-socket'
@@ -111,22 +113,21 @@ def kea_parse_options(config):
default_route = ''
if 'default_router' in config:
- default_route = isc_static_route('0.0.0.0/0', config['default_router'])
+ default_route = f'0.0.0.0/0 - {config["default_router"]}'
routes = [
- isc_static_route(route, route_options['next_hop'])
+ f'{route} - {route_options["next_hop"]}'
for route, route_options in config['static_route'].items()
]
options.append(
{
- 'name': 'rfc3442-static-route',
+ 'name': 'classless-static-route',
'data': ', '.join(
routes if not default_route else routes + [default_route]
),
}
)
- options.append({'name': 'windows-static-route', 'data': ', '.join(routes)})
if 'time_zone' in config:
with open('/usr/share/zoneinfo/' + config['time_zone'], 'rb') as f:
@@ -147,7 +148,7 @@ def kea_parse_options(config):
def kea_parse_subnet(subnet, config):
- out = {'subnet': subnet, 'id': int(config['subnet_id'])}
+ out = {'subnet': subnet, 'id': int(config['subnet_id']), 'user-context': {}}
if 'option' in config:
out['option-data'] = kea_parse_options(config['option'])
@@ -165,6 +166,9 @@ def kea_parse_subnet(subnet, config):
out['valid-lifetime'] = int(config['lease'])
out['max-valid-lifetime'] = int(config['lease'])
+ if 'ping_check' in config:
+ out['user-context']['enable-ping-check'] = True
+
if 'range' in config:
pools = []
for num, range_config in config['range'].items():
@@ -218,6 +222,9 @@ def kea_parse_subnet(subnet, config):
reservations.append(reservation)
out['reservations'] = reservations
+ if 'dynamic_dns_update' in config:
+ out.update(kea_parse_ddns_settings(config['dynamic_dns_update']))
+
return out
@@ -347,6 +354,54 @@ def kea6_parse_subnet(subnet, config):
return out
+def kea_parse_tsig_algo(algo_spec):
+ translate = {
+ 'md5': 'HMAC-MD5',
+ 'sha1': 'HMAC-SHA1',
+ 'sha224': 'HMAC-SHA224',
+ 'sha256': 'HMAC-SHA256',
+ 'sha384': 'HMAC-SHA384',
+ 'sha512': 'HMAC-SHA512'
+ }
+ if algo_spec not in translate:
+ raise ConfigError(f'Unsupported TSIG algorithm: {algo_spec}')
+ return translate[algo_spec]
+
+def kea_parse_enable_disable(value):
+ return True if value == 'enable' else False
+
+def kea_parse_ddns_settings(config):
+ data = {}
+
+ if send_updates := config.get('send_updates'):
+ data['ddns-send-updates'] = kea_parse_enable_disable(send_updates)
+
+ if override_client_update := config.get('override_client_update'):
+ data['ddns-override-client-update'] = kea_parse_enable_disable(override_client_update)
+
+ if override_no_update := config.get('override_no_update'):
+ data['ddns-override-no-update'] = kea_parse_enable_disable(override_no_update)
+
+ if update_on_renew := config.get('update_on_renew'):
+ data['ddns-update-on-renew'] = kea_parse_enable_disable(update_on_renew)
+
+ if conflict_resolution := config.get('conflict_resolution'):
+ data['ddns-use-conflict-resolution'] = kea_parse_enable_disable(conflict_resolution)
+
+ if 'replace_client_name' in config:
+ data['ddns-replace-client-name'] = config['replace_client_name']
+ if 'generated_prefix' in config:
+ data['ddns-generated-prefix'] = config['generated_prefix']
+ if 'qualifying_suffix' in config:
+ data['ddns-qualifying-suffix'] = config['qualifying_suffix']
+ if 'ttl_percent' in config:
+ data['ddns-ttl-percent'] = int(config['ttl_percent']) / 100
+ if 'hostname_char_set' in config:
+ data['hostname-char-set'] = config['hostname_char_set']
+ if 'hostname_char_replacement' in config:
+ data['hostname-char-replacement'] = config['hostname_char_replacement']
+
+ return data
def _ctrl_socket_command(inet, command, args=None):
path = kea_ctrl_socket.format(inet=inet)
@@ -483,10 +538,10 @@ def kea_get_domain_from_subnet_id(config, inet, subnet_id):
if option['name'] == 'domain-name':
return option['data']
- # domain-name is not found in subnet, fallback to shared-network pool option
- for option in network['option-data']:
- if option['name'] == 'domain-name':
- return option['data']
+ # domain-name is not found in subnet, fallback to shared-network pool option
+ for option in network['option-data']:
+ if option['name'] == 'domain-name':
+ return option['data']
return None
diff --git a/python/vyos/proto/__init__.py b/python/vyos/proto/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/python/vyos/proto/__init__.py
diff --git a/python/vyos/proto/generate_dataclass.py b/python/vyos/proto/generate_dataclass.py
new file mode 100755
index 000000000..c6296c568
--- /dev/null
+++ b/python/vyos/proto/generate_dataclass.py
@@ -0,0 +1,178 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2025 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+import argparse
+import os
+
+from google.protobuf.descriptor_pb2 import FileDescriptorSet # pylint: disable=no-name-in-module
+from google.protobuf.descriptor_pb2 import FieldDescriptorProto # pylint: disable=no-name-in-module
+from humps import decamelize
+
+HEADER = """\
+from enum import IntEnum
+from dataclasses import dataclass
+from dataclasses import field
+"""
+
+
+def normalize(s: str) -> str:
+ """Decamelize and avoid syntactic collision"""
+ t = decamelize(s)
+ return t + '_' if t in ['from'] else t
+
+
+def generate_dataclass(descriptor_proto):
+ class_name = descriptor_proto.name
+ fields = []
+ for field_p in descriptor_proto.field:
+ field_name = field_p.name
+ field_type, field_default = get_type(field_p.type, field_p.type_name)
+ match field_p.label:
+ case FieldDescriptorProto.LABEL_REPEATED:
+ field_type = f'list[{field_type}] = field(default_factory=list)'
+ case FieldDescriptorProto.LABEL_OPTIONAL:
+ field_type = f'{field_type} = None'
+ case _:
+ field_type = f'{field_type} = {field_default}'
+
+ fields.append(f' {field_name}: {field_type}')
+
+ code = f"""
+@dataclass
+class {class_name}:
+{chr(10).join(fields) if fields else ' pass'}
+"""
+
+ return code
+
+
+def generate_request(descriptor_proto):
+ class_name = descriptor_proto.name
+ fields = []
+ f_vars = []
+ for field_p in descriptor_proto.field:
+ field_name = field_p.name
+ field_type, field_default = get_type(field_p.type, field_p.type_name)
+ match field_p.label:
+ case FieldDescriptorProto.LABEL_REPEATED:
+ field_type = f'list[{field_type}] = []'
+ case FieldDescriptorProto.LABEL_OPTIONAL:
+ field_type = f'{field_type} = None'
+ case _:
+ field_type = f'{field_type} = {field_default}'
+
+ fields.append(f'{normalize(field_name)}: {field_type}')
+ f_vars.append(f'{normalize(field_name)}')
+
+ fields.insert(0, 'token: str = None')
+
+ code = f"""
+def set_request_{decamelize(class_name)}({', '.join(fields)}):
+ reqi = {class_name} ({', '.join(f_vars)})
+ req = Request({decamelize(class_name)}=reqi)
+ req_env = RequestEnvelope(token, req)
+ return req_env
+"""
+
+ return code
+
+
+def generate_nested_dataclass(descriptor_proto):
+ out = ''
+ for nested_p in descriptor_proto.nested_type:
+ out = out + generate_dataclass(nested_p)
+
+ return out
+
+
+def generate_nested_request(descriptor_proto):
+ out = ''
+ for nested_p in descriptor_proto.nested_type:
+ out = out + generate_request(nested_p)
+
+ return out
+
+
+def generate_enum_dataclass(descriptor_proto):
+ code = ''
+ for enum_p in descriptor_proto.enum_type:
+ enums = []
+ enum_name = enum_p.name
+ for enum_val in enum_p.value:
+ enums.append(f' {enum_val.name} = {enum_val.number}')
+
+ code += f"""
+class {enum_name}(IntEnum):
+{chr(10).join(enums)}
+"""
+
+ return code
+
+
+def get_type(field_type, type_name):
+ res = 'Any', None
+ match field_type:
+ case FieldDescriptorProto.TYPE_STRING:
+ res = 'str', '""'
+ case FieldDescriptorProto.TYPE_INT32 | FieldDescriptorProto.TYPE_INT64:
+ res = 'int', 0
+ case FieldDescriptorProto.TYPE_FLOAT | FieldDescriptorProto.TYPE_DOUBLE:
+ res = 'float', 0.0
+ case FieldDescriptorProto.TYPE_BOOL:
+ res = 'bool', False
+ case FieldDescriptorProto.TYPE_MESSAGE | FieldDescriptorProto.TYPE_ENUM:
+ res = type_name.split('.')[-1], None
+ case _:
+ pass
+
+ return res
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument('descriptor_file', help='protobuf .desc file')
+ parser.add_argument('--out-dir', help='directory to write generated file')
+ args = parser.parse_args()
+ desc_file = args.descriptor_file
+ out_dir = args.out_dir
+
+ with open(desc_file, 'rb') as f:
+ descriptor_set_data = f.read()
+
+ descriptor_set = FileDescriptorSet()
+ descriptor_set.ParseFromString(descriptor_set_data)
+
+ for file_proto in descriptor_set.file:
+ f = f'{file_proto.name.replace(".", "_")}.py'
+ f = os.path.join(out_dir, f)
+ dataclass_code = ''
+ nested_code = ''
+ enum_code = ''
+ request_code = ''
+ with open(f, 'w') as f:
+ enum_code += generate_enum_dataclass(file_proto)
+ for message_proto in file_proto.message_type:
+ dataclass_code += generate_dataclass(message_proto)
+ nested_code += generate_nested_dataclass(message_proto)
+ enum_code += generate_enum_dataclass(message_proto)
+ request_code += generate_nested_request(message_proto)
+
+ f.write(HEADER)
+ f.write(enum_code)
+ f.write(nested_code)
+ f.write(dataclass_code)
+ f.write(request_code)
diff --git a/python/vyos/proto/vyconf_client.py b/python/vyos/proto/vyconf_client.py
new file mode 100644
index 000000000..b385f0951
--- /dev/null
+++ b/python/vyos/proto/vyconf_client.py
@@ -0,0 +1,89 @@
+# Copyright 2025 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import socket
+from dataclasses import asdict
+
+from vyos.proto import vyconf_proto
+from vyos.proto import vyconf_pb2
+
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.json_format import ParseDict
+
+socket_path = '/var/run/vyconfd.sock'
+
+
+def send_socket(msg: bytearray) -> bytes:
+ data = bytes()
+ client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ client.connect(socket_path)
+ client.sendall(msg)
+
+ data_length = client.recv(4)
+ if data_length:
+ length = int.from_bytes(data_length)
+ data = client.recv(length)
+
+ client.close()
+
+ return data
+
+
+def request_to_msg(req: vyconf_proto.RequestEnvelope) -> vyconf_pb2.RequestEnvelope:
+ # pylint: disable=no-member
+
+ msg = vyconf_pb2.RequestEnvelope()
+ msg = ParseDict(asdict(req), msg, ignore_unknown_fields=True)
+ return msg
+
+
+def msg_to_response(msg: vyconf_pb2.Response) -> vyconf_proto.Response:
+ # pylint: disable=no-member
+
+ d = MessageToDict(
+ msg, preserving_proto_field_name=True, use_integers_for_enums=True
+ )
+
+ response = vyconf_proto.Response(**d)
+ return response
+
+
+def write_request(req: vyconf_proto.RequestEnvelope) -> bytearray:
+ req_msg = request_to_msg(req)
+ encoded_data = req_msg.SerializeToString()
+ byte_size = req_msg.ByteSize()
+ length_bytes = byte_size.to_bytes(4)
+ arr = bytearray(length_bytes)
+ arr.extend(encoded_data)
+
+ return arr
+
+
+def read_response(msg: bytes) -> vyconf_proto.Response:
+ response_msg = vyconf_pb2.Response() # pylint: disable=no-member
+ response_msg.ParseFromString(msg)
+ response = msg_to_response(response_msg)
+
+ return response
+
+
+def send_request(name, *args, **kwargs):
+ func = getattr(vyconf_proto, f'set_request_{name}')
+ request_env = func(*args, **kwargs)
+ msg = write_request(request_env)
+ response_msg = send_socket(msg)
+ response = read_response(response_msg)
+
+ return response
diff --git a/python/vyos/system/grub_util.py b/python/vyos/system/grub_util.py
index 4a3d8795e..ad95bb4f9 100644
--- a/python/vyos/system/grub_util.py
+++ b/python/vyos/system/grub_util.py
@@ -56,13 +56,12 @@ def set_kernel_cmdline_options(cmdline_options: str, version: str = '',
@image.if_not_live_boot
def update_kernel_cmdline_options(cmdline_options: str,
- root_dir: str = '') -> None:
+ root_dir: str = '',
+ version = image.get_running_image()) -> None:
"""Update Kernel custom cmdline options"""
if not root_dir:
root_dir = disk.find_persistence()
- version = image.get_running_image()
-
boot_opts_current = grub.get_boot_opts(version, root_dir)
boot_opts_proposed = grub.BOOT_OPTS_STEM + f'{version} {cmdline_options}'
diff --git a/python/vyos/template.py b/python/vyos/template.py
index e75db1a8d..11e1cc50f 100755
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -36,6 +36,7 @@ DEFAULT_TEMPLATE_DIR = directories["templates"]
# Holds template filters registered via register_filter()
_FILTERS = {}
_TESTS = {}
+_CLEVER_FUNCTIONS = {}
# reuse Environments with identical settings to improve performance
@functools.lru_cache(maxsize=2)
@@ -58,6 +59,7 @@ def _get_environment(location=None):
)
env.filters.update(_FILTERS)
env.tests.update(_TESTS)
+ env.globals.update(_CLEVER_FUNCTIONS)
return env
@@ -77,7 +79,7 @@ def register_filter(name, func=None):
"Filters can only be registered before rendering the first template"
)
if name in _FILTERS:
- raise ValueError(f"A filter with name {name!r} was registered already")
+ raise ValueError(f"A filter with name {name!r} was already registered")
_FILTERS[name] = func
return func
@@ -97,10 +99,30 @@ def register_test(name, func=None):
"Tests can only be registered before rendering the first template"
)
if name in _TESTS:
- raise ValueError(f"A test with name {name!r} was registered already")
+ raise ValueError(f"A test with name {name!r} was already registered")
_TESTS[name] = func
return func
+def register_clever_function(name, func=None):
+ """Register a function to be available as test in templates under given name.
+
+ It can also be used as a decorator, see below in this module for examples.
+
+ :raise RuntimeError:
+ when trying to register a test after a template has been rendered already
+ :raise ValueError: when trying to register a name which was taken already
+ """
+ if func is None:
+ return functools.partial(register_clever_function, name)
+ if _get_environment.cache_info().currsize:
+ raise RuntimeError(
+ "Clever functions can only be registered before rendering the" \
+ "first template")
+ if name in _CLEVER_FUNCTIONS:
+ raise ValueError(f"A clever function with name {name!r} was already "\
+ "registered")
+ _CLEVER_FUNCTIONS[name] = func
+ return func
def render_to_string(template, content, formater=None, location=None):
"""Render a template from the template directory, raise on any errors.
@@ -150,6 +172,8 @@ def render(
# As we are opening the file with 'w', we are performing the rendering before
# calling open() to not accidentally erase the file if rendering fails
rendered = render_to_string(template, content, formater, location)
+ # Remove any trailing character and always add a new line at the end
+ rendered = rendered.rstrip() + "\n"
# Write to file
with open(destination, "w") as file:
@@ -390,28 +414,6 @@ def compare_netmask(netmask1, netmask2):
except:
return False
-@register_filter('isc_static_route')
-def isc_static_route(subnet, router):
- # https://ercpe.de/blog/pushing-static-routes-with-isc-dhcp-server
- # Option format is:
- # <netmask>, <network-byte1>, <network-byte2>, <network-byte3>, <router-byte1>, <router-byte2>, <router-byte3>
- # where bytes with the value 0 are omitted.
- from ipaddress import ip_network
- net = ip_network(subnet)
- # add netmask
- string = str(net.prefixlen) + ','
- # add network bytes
- if net.prefixlen:
- width = net.prefixlen // 8
- if net.prefixlen % 8:
- width += 1
- string += ','.join(map(str,tuple(net.network_address.packed)[:width])) + ','
-
- # add router bytes
- string += ','.join(router.split('.'))
-
- return string
-
@register_filter('is_file')
def is_file(filename):
if os.path.exists(filename):
@@ -881,10 +883,77 @@ def kea_high_availability_json(config):
return dumps(data)
+@register_filter('kea_dynamic_dns_update_main_json')
+def kea_dynamic_dns_update_main_json(config):
+ from vyos.kea import kea_parse_ddns_settings
+ from json import dumps
+
+ data = kea_parse_ddns_settings(config)
+
+ if len(data) == 0:
+ return ''
+
+ return dumps(data, indent=8)[1:-1] + ','
+
+@register_filter('kea_dynamic_dns_update_tsig_key_json')
+def kea_dynamic_dns_update_tsig_key_json(config):
+ from vyos.kea import kea_parse_tsig_algo
+ from json import dumps
+ out = []
+
+ if 'tsig_key' not in config:
+ return dumps(out)
+
+ tsig_keys = config['tsig_key']
+
+ for tsig_key_name, tsig_key_config in tsig_keys.items():
+ tsig_key = {
+ 'name': tsig_key_name,
+ 'algorithm': kea_parse_tsig_algo(tsig_key_config['algorithm']),
+ 'secret': tsig_key_config['secret']
+ }
+ out.append(tsig_key)
+
+ return dumps(out, indent=12)
+
+@register_filter('kea_dynamic_dns_update_domains')
+def kea_dynamic_dns_update_domains(config, type_key):
+ from json import dumps
+ out = []
+
+ if type_key not in config:
+ return dumps(out)
+
+ domains = config[type_key]
+
+ for domain_name, domain_config in domains.items():
+ domain = {
+ 'name': domain_name,
+
+ }
+ if 'key_name' in domain_config:
+ domain['key-name'] = domain_config['key_name']
+
+ if 'dns_server' in domain_config:
+ dns_servers = []
+ for dns_server_config in domain_config['dns_server'].values():
+ dns_server = {
+ 'ip-address': dns_server_config['address']
+ }
+ if 'port' in dns_server_config:
+ dns_server['port'] = int(dns_server_config['port'])
+ dns_servers.append(dns_server)
+ domain['dns-servers'] = dns_servers
+
+ out.append(domain)
+
+ return dumps(out, indent=12)
+
@register_filter('kea_shared_network_json')
def kea_shared_network_json(shared_networks):
from vyos.kea import kea_parse_options
from vyos.kea import kea_parse_subnet
+ from vyos.kea import kea_parse_ddns_settings
from json import dumps
out = []
@@ -895,9 +964,13 @@ def kea_shared_network_json(shared_networks):
network = {
'name': name,
'authoritative': ('authoritative' in config),
- 'subnet4': []
+ 'subnet4': [],
+ 'user-context': {}
}
+ if 'dynamic_dns_update' in config:
+ network.update(kea_parse_ddns_settings(config['dynamic_dns_update']))
+
if 'option' in config:
network['option-data'] = kea_parse_options(config['option'])
@@ -907,6 +980,9 @@ def kea_shared_network_json(shared_networks):
if 'bootfile_server' in config['option']:
network['next-server'] = config['option']['bootfile_server']
+ if 'ping_check' in config:
+ network['user-context']['enable-ping-check'] = True
+
if 'subnet' in config:
for subnet, subnet_config in config['subnet'].items():
if 'disable' in subnet_config:
@@ -998,3 +1074,21 @@ def vyos_defined(value, test_value=None, var_type=None):
else:
# Valid value and is matching optional argument if provided - return true
return True
+
+@register_clever_function('get_default_port')
+def get_default_port(service):
+ """
+ Jinja2 plugin to retrieve common service port number from vyos.defaults
+ class form a Jinja2 template. This removes the need to hardcode, or pass in
+ the data using the general dictionary.
+
+ Added to remove code complexity and make it easier to read.
+
+ Example:
+ {{ get_default_port('certbot_haproxy') }}
+ """
+ from vyos.defaults import internal_ports
+ if service not in internal_ports:
+ raise RuntimeError(f'Service "{service}" not found in internal ' \
+ 'vyos.defaults.internal_ports dict!')
+ return internal_ports[service]
diff --git a/python/vyos/utils/auth.py b/python/vyos/utils/auth.py
index a0b3e1cae..5d0e3464a 100644
--- a/python/vyos/utils/auth.py
+++ b/python/vyos/utils/auth.py
@@ -13,10 +13,80 @@
# You should have received a copy of the GNU Lesser General Public License along with this library;
# if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+import cracklib
+import math
import re
+import string
+from enum import StrEnum
+from decimal import Decimal
from vyos.utils.process import cmd
+
+DEFAULT_PASSWORD: str = 'vyos'
+LOW_ENTROPY_MSG: str = 'should be at least 8 characters long;'
+WEAK_PASSWORD_MSG: str = 'The password complexity is too low - @MSG@'
+CRACKLIB_ERROR_MSG: str = 'A following error occurred: @MSG@\n' \
+ 'Possibly the cracklib database is corrupted or is missing. ' \
+ 'Try reinstalling the python3-cracklib package.'
+
+class EPasswdStrength(StrEnum):
+ WEAK = 'Weak'
+ DECENT = 'Decent'
+ STRONG = 'Strong'
+ ERROR = 'Cracklib Error'
+
+
+def calculate_entropy(charset: str, passwd: str) -> float:
+ """
+ Calculate the entropy of a password based on the set of characters used
+ Uses E = log2(R**L) formula, where
+ - R is the range (length) of the character set
+ - L is the length of password
+ """
+ return math.log(math.pow(len(charset), len(passwd)), 2)
+
+def evaluate_strength(passwd: str) -> dict[str, str]:
+ """ Evaluates password strength and returns a check result dict """
+ charset = (cracklib.ASCII_UPPERCASE + cracklib.ASCII_LOWERCASE +
+ string.punctuation + string.digits)
+
+ result = {
+ 'strength': '',
+ 'error': '',
+ }
+
+ try:
+ cracklib.FascistCheck(passwd)
+ except ValueError as e:
+ # The password is vulnerable to dictionary attack no matter the entropy
+ if 'is' in str(e):
+ msg = str(e).replace('is', 'should not be')
+ else:
+ msg = f'should not be {e}'
+ result.update(strength=EPasswdStrength.WEAK)
+ result.update(error=WEAK_PASSWORD_MSG.replace('@MSG@', msg))
+ except Exception as e:
+ result.update(strength=EPasswdStrength.ERROR)
+ result.update(error=CRACKLIB_ERROR_MSG.replace('@MSG@', str(e)))
+ else:
+ # Now check the password's entropy
+ # Cast to Decimal for more precise rounding
+ entropy = Decimal.from_float(calculate_entropy(charset, passwd))
+
+ match round(entropy):
+ case e if e in range(0, 59):
+ result.update(strength=EPasswdStrength.WEAK)
+ result.update(
+ error=WEAK_PASSWORD_MSG.replace('@MSG@', LOW_ENTROPY_MSG)
+ )
+ case e if e in range(60, 119):
+ result.update(strength=EPasswdStrength.DECENT)
+ case e if e >= 120:
+ result.update(strength=EPasswdStrength.STRONG)
+
+ return result
+
def make_password_hash(password):
""" Makes a password hash for /etc/shadow using mkpasswd """
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
index dc0c0a6d6..20b6a3c9e 100644
--- a/python/vyos/utils/network.py
+++ b/python/vyos/utils/network.py
@@ -256,40 +256,60 @@ def mac2eui64(mac, prefix=None):
except: # pylint: disable=bare-except
return
-def check_port_availability(ipaddress, port, protocol):
+def check_port_availability(address: str=None, port: int=0, protocol: str='tcp') -> bool:
"""
- Check if port is available and not used by any service
- Return False if a port is busy or IP address does not exists
+ Check if given port is available and not used by any service.
+
Should be used carefully for services that can start listening
dynamically, because IP address may be dynamic too
+
+ Args:
+ address: IPv4 or IPv6 address - if None, checks on all interfaces
+ port: TCP/UDP port number.
+
+
+ Returns:
+ False if a port is busy or IP address does not exists
+ True if a port is free and IP address exists
"""
- from socketserver import TCPServer, UDPServer
+ import socket
from ipaddress import ip_address
+ # treat None as "any address"
+ address = address or '::'
+
# verify arguments
try:
- ipaddress = ip_address(ipaddress).compressed
- except:
- raise ValueError(f'The {ipaddress} is not a valid IPv4 or IPv6 address')
+ address = ip_address(address).compressed
+ except ValueError:
+ raise ValueError(f'{address} is not a valid IPv4 or IPv6 address')
if port not in range(1, 65536):
- raise ValueError(f'The port number {port} is not in the 1-65535 range')
+ raise ValueError(f'Port {port} is not in range 1-65535')
if protocol not in ['tcp', 'udp']:
- raise ValueError(f'The protocol {protocol} is not supported. Only tcp and udp are allowed')
+ raise ValueError(f'{protocol} is not supported - only tcp and udp are allowed')
- # check port availability
+ protocol = socket.SOCK_STREAM if protocol == 'tcp' else socket.SOCK_DGRAM
try:
- if protocol == 'tcp':
- server = TCPServer((ipaddress, port), None, bind_and_activate=True)
- if protocol == 'udp':
- server = UDPServer((ipaddress, port), None, bind_and_activate=True)
- server.server_close()
- except Exception as e:
- # errno.h:
- #define EADDRINUSE 98 /* Address already in use */
- if e.errno == 98:
+ addr_info = socket.getaddrinfo(address, port, socket.AF_UNSPEC, protocol)
+ except socket.gaierror as e:
+ print(f'Invalid address: {address}')
+ return False
+
+ for family, socktype, proto, canonname, sockaddr in addr_info:
+ try:
+ with socket.socket(family, socktype, proto) as s:
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ s.bind(sockaddr)
+ # port is free to use
+ return True
+ except OSError:
+ # port is already in use
return False
- return True
+ # if we reach this point, no socket was tested and we assume the port is
+ # already in use - better safe then sorry
+ return False
+
def is_listen_port_bind_service(port: int, service: str) -> bool:
"""Check if listen port bound to expected program name
@@ -599,3 +619,35 @@ def get_nft_vrf_zone_mapping() -> dict:
for (vrf_name, vrf_id) in vrf_list:
output.append({'interface' : vrf_name, 'vrf_tableid' : vrf_id})
return output
+
+def is_valid_ipv4_address_or_range(addr: str) -> bool:
+ """
+ Validates if the provided address is a valid IPv4, CIDR or IPv4 range
+ :param addr: address to test
+ :return: bool: True if provided address is valid
+ """
+ from ipaddress import ip_network
+ try:
+ if '-' in addr: # If we are checking a range, validate both address's individually
+ split = addr.split('-')
+ return is_valid_ipv4_address_or_range(split[0]) and is_valid_ipv4_address_or_range(split[1])
+ else:
+ return ip_network(addr).version == 4
+ except:
+ return False
+
+def is_valid_ipv6_address_or_range(addr: str) -> bool:
+ """
+ Validates if the provided address is a valid IPv4, CIDR or IPv4 range
+ :param addr: address to test
+ :return: bool: True if provided address is valid
+ """
+ from ipaddress import ip_network
+ try:
+ if '-' in addr: # If we are checking a range, validate both address's individually
+ split = addr.split('-')
+ return is_valid_ipv6_address_or_range(split[0]) and is_valid_ipv6_address_or_range(split[1])
+ else:
+ return ip_network(addr).version == 6
+ except:
+ return False
diff --git a/python/vyos/utils/process.py b/python/vyos/utils/process.py
index 121b6e240..21335e6b3 100644
--- a/python/vyos/utils/process.py
+++ b/python/vyos/utils/process.py
@@ -14,6 +14,7 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
import os
+import shlex
from subprocess import Popen
from subprocess import PIPE
@@ -21,20 +22,17 @@ from subprocess import STDOUT
from subprocess import DEVNULL
-def get_wrapper(vrf, netns, auth):
- wrapper = ''
+def get_wrapper(vrf, netns):
+ wrapper = None
if vrf:
- wrapper = f'ip vrf exec {vrf} '
+ wrapper = ['ip', 'vrf', 'exec', vrf]
elif netns:
- wrapper = f'ip netns exec {netns} '
- if auth:
- wrapper = f'{auth} {wrapper}'
+ wrapper = ['ip', 'netns', 'exec', netns]
return wrapper
def popen(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=PIPE, stderr=PIPE, decode='utf-8', auth='', vrf=None,
- netns=None):
+ stdout=PIPE, stderr=PIPE, decode='utf-8', vrf=None, netns=None):
"""
popen is a wrapper helper around subprocess.Popen
with it default setting it will return a tuple (out, err)
@@ -75,28 +73,33 @@ def popen(command, flag='', shell=None, input=None, timeout=None, env=None,
if not debug.enabled(flag):
flag = 'command'
+ use_shell = shell
+ stdin = None
+ if shell is None:
+ use_shell = False
+ if ' ' in command:
+ use_shell = True
+ if env:
+ use_shell = True
+
# Must be run as root to execute command in VRF or network namespace
+ wrapper = get_wrapper(vrf, netns)
if vrf or netns:
if os.getuid() != 0:
raise OSError(
'Permission denied: cannot execute commands in VRF and netns contexts as an unprivileged user'
)
- wrapper = get_wrapper(vrf, netns, auth)
- command = f'{wrapper} {command}' if wrapper else command
+ if use_shell:
+ command = f'{shlex.join(wrapper)} {command}'
+ else:
+ if type(command) is not list:
+ command = [command]
+ command = wrapper + command
- cmd_msg = f"cmd '{command}'"
+ cmd_msg = f"cmd '{command}'" if use_shell else f"cmd '{shlex.join(command)}'"
debug.message(cmd_msg, flag)
- use_shell = shell
- stdin = None
- if shell is None:
- use_shell = False
- if ' ' in command:
- use_shell = True
- if env:
- use_shell = True
-
if input:
stdin = PIPE
input = input.encode() if type(input) is str else input
@@ -155,7 +158,7 @@ def run(command, flag='', shell=None, input=None, timeout=None, env=None,
def cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
stdout=PIPE, stderr=PIPE, decode='utf-8', raising=None, message='',
- expect=[0], auth='', vrf=None, netns=None):
+ expect=[0], vrf=None, netns=None):
"""
A wrapper around popen, which returns the stdout and
will raise the error code of a command
@@ -171,12 +174,11 @@ def cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
input=input, timeout=timeout,
env=env, shell=shell,
decode=decode,
- auth=auth,
vrf=vrf,
netns=netns,
)
if code not in expect:
- wrapper = get_wrapper(vrf, netns, auth='')
+ wrapper = get_wrapper(vrf, netns)
command = f'{wrapper} {command}'
feedback = message + '\n' if message else ''
feedback += f'failed to run command: {command}\n'
diff --git a/python/vyos/vyconf_session.py b/python/vyos/vyconf_session.py
new file mode 100644
index 000000000..506095625
--- /dev/null
+++ b/python/vyos/vyconf_session.py
@@ -0,0 +1,123 @@
+# Copyright 2025 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+
+import tempfile
+import shutil
+from functools import wraps
+from typing import Type
+
+from vyos.proto import vyconf_client
+from vyos.migrate import ConfigMigrate
+from vyos.migrate import ConfigMigrateError
+from vyos.component_version import append_system_version
+
+
+def output(o):
+ out = ''
+ for res in (o.output, o.error, o.warning):
+ if res is not None:
+ out = out + res
+ return out
+
+
+class VyconfSession:
+ def __init__(self, token: str = None, on_error: Type[Exception] = None):
+ if token is None:
+ out = vyconf_client.send_request('setup_session')
+ self.__token = out.output
+ else:
+ self.__token = token
+
+ self.on_error = on_error
+
+ @staticmethod
+ def raise_exception(f):
+ @wraps(f)
+ def wrapped(self, *args, **kwargs):
+ if self.on_error is None:
+ return f(self, *args, **kwargs)
+ o, e = f(self, *args, **kwargs)
+ if e:
+ raise self.on_error(o)
+ return o, e
+
+ return wrapped
+
+ @raise_exception
+ def set(self, path: list[str]) -> tuple[str, int]:
+ out = vyconf_client.send_request('set', token=self.__token, path=path)
+ return output(out), out.status
+
+ @raise_exception
+ def delete(self, path: list[str]) -> tuple[str, int]:
+ out = vyconf_client.send_request('delete', token=self.__token, path=path)
+ return output(out), out.status
+
+ @raise_exception
+ def commit(self) -> tuple[str, int]:
+ out = vyconf_client.send_request('commit', token=self.__token)
+ return output(out), out.status
+
+ @raise_exception
+ def discard(self) -> tuple[str, int]:
+ out = vyconf_client.send_request('discard', token=self.__token)
+ return output(out), out.status
+
+ def session_changed(self) -> bool:
+ out = vyconf_client.send_request('session_changed', token=self.__token)
+ return not bool(out.status)
+
+ @raise_exception
+ def load_config(self, file: str, migrate: bool = False) -> tuple[str, int]:
+ # pylint: disable=consider-using-with
+ if migrate:
+ tmp = tempfile.NamedTemporaryFile()
+ shutil.copy2(file, tmp.name)
+ config_migrate = ConfigMigrate(tmp.name)
+ try:
+ config_migrate.run()
+ except ConfigMigrateError as e:
+ tmp.close()
+ return repr(e), 1
+ file = tmp.name
+ else:
+ tmp = ''
+
+ out = vyconf_client.send_request('load', token=self.__token, location=file)
+ if tmp:
+ tmp.close()
+
+ return output(out), out.status
+
+ @raise_exception
+ def save_config(self, file: str, append_version: bool = False) -> tuple[str, int]:
+ out = vyconf_client.send_request('save', token=self.__token, location=file)
+ if append_version:
+ append_system_version(file)
+ return output(out), out.status
+
+ @raise_exception
+ def show_config(self, path: list[str] = None) -> tuple[str, int]:
+ if path is None:
+ path = []
+ out = vyconf_client.send_request('show_config', token=self.__token, path=path)
+ return output(out), out.status
+
+ def __del__(self):
+ out = vyconf_client.send_request('teardown', token=self.__token)
+ if out.status:
+ print(f'Could not tear down session {self.__token}: {output(out)}')