summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/setup.py11
-rw-r--r--python/vyos/component_version.py63
-rw-r--r--python/vyos/config.py12
-rw-r--r--python/vyos/configdep.py9
-rw-r--r--python/vyos/configdict.py25
-rw-r--r--python/vyos/configsession.py55
-rw-r--r--python/vyos/configtree.py29
-rw-r--r--python/vyos/configverify.py3
-rw-r--r--python/vyos/frrender.py8
-rw-r--r--python/vyos/ifconfig/bridge.py14
-rw-r--r--python/vyos/ifconfig/interface.py31
-rw-r--r--python/vyos/kea.py75
-rwxr-xr-xpython/vyos/proto/generate_dataclass.py178
-rw-r--r--python/vyos/proto/vyconf_client.py89
-rw-r--r--python/vyos/system/grub_util.py5
-rwxr-xr-xpython/vyos/template.py98
-rw-r--r--python/vyos/vyconf_session.py123
17 files changed, 725 insertions, 103 deletions
diff --git a/python/setup.py b/python/setup.py
index 96dc211f7..571b956ee 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -7,6 +7,9 @@ from setuptools.command.build_py import build_py
sys.path.append('./vyos')
from defaults import directories
+def desc_out(f):
+ return os.path.splitext(f)[0] + '.desc'
+
def packages(directory):
return [
_[0].replace('/','.')
@@ -37,9 +40,17 @@ class GenerateProto(build_py):
'protoc',
'--python_out=vyos/proto',
f'--proto_path={self.proto_path}/',
+ f'--descriptor_set_out=vyos/proto/{desc_out(proto_file)}',
proto_file,
]
)
+ subprocess.check_call(
+ [
+ 'vyos/proto/generate_dataclass.py',
+ 'vyos/proto/vyconf.desc',
+ '--out-dir=vyos/proto',
+ ]
+ )
build_py.run(self)
diff --git a/python/vyos/component_version.py b/python/vyos/component_version.py
index 94215531d..81d986658 100644
--- a/python/vyos/component_version.py
+++ b/python/vyos/component_version.py
@@ -49,7 +49,9 @@ DEFAULT_CONFIG_PATH = os.path.join(directories['config'], 'config.boot')
REGEX_WARN_VYOS = r'(// Warning: Do not remove the following line.)'
REGEX_WARN_VYATTA = r'(/\* Warning: Do not remove the following line. \*/)'
REGEX_COMPONENT_VERSION_VYOS = r'// vyos-config-version:\s+"([\w@:-]+)"\s*'
-REGEX_COMPONENT_VERSION_VYATTA = r'/\* === vyatta-config-version:\s+"([\w@:-]+)"\s+=== \*/'
+REGEX_COMPONENT_VERSION_VYATTA = (
+ r'/\* === vyatta-config-version:\s+"([\w@:-]+)"\s+=== \*/'
+)
REGEX_RELEASE_VERSION_VYOS = r'// Release version:\s+(\S*)\s*'
REGEX_RELEASE_VERSION_VYATTA = r'/\* Release version:\s+(\S*)\s*\*/'
@@ -62,16 +64,31 @@ CONFIG_FILE_VERSION = """\
warn_filter_vyos = re.compile(REGEX_WARN_VYOS)
warn_filter_vyatta = re.compile(REGEX_WARN_VYATTA)
-regex_filter = { 'vyos': dict(zip(['component', 'release'],
- [re.compile(REGEX_COMPONENT_VERSION_VYOS),
- re.compile(REGEX_RELEASE_VERSION_VYOS)])),
- 'vyatta': dict(zip(['component', 'release'],
- [re.compile(REGEX_COMPONENT_VERSION_VYATTA),
- re.compile(REGEX_RELEASE_VERSION_VYATTA)])) }
+regex_filter = {
+ 'vyos': dict(
+ zip(
+ ['component', 'release'],
+ [
+ re.compile(REGEX_COMPONENT_VERSION_VYOS),
+ re.compile(REGEX_RELEASE_VERSION_VYOS),
+ ],
+ )
+ ),
+ 'vyatta': dict(
+ zip(
+ ['component', 'release'],
+ [
+ re.compile(REGEX_COMPONENT_VERSION_VYATTA),
+ re.compile(REGEX_RELEASE_VERSION_VYATTA),
+ ],
+ )
+ ),
+}
+
@dataclass
class VersionInfo:
- component: Optional[dict[str,int]] = None
+ component: Optional[dict[str, int]] = None
release: str = get_version()
vintage: str = 'vyos'
config_body: Optional[str] = None
@@ -84,8 +101,9 @@ class VersionInfo:
return bool(self.config_body is None)
def update_footer(self):
- f = CONFIG_FILE_VERSION.format(component_to_string(self.component),
- self.release)
+ f = CONFIG_FILE_VERSION.format(
+ component_to_string(self.component), self.release
+ )
self.footer_lines = f.splitlines()
def update_syntax(self):
@@ -121,13 +139,16 @@ class VersionInfo:
except Exception as e:
raise ValueError(e) from e
+
def component_to_string(component: dict) -> str:
- l = [f'{k}@{v}' for k, v in sorted(component.items(), key=lambda x: x[0])]
+ l = [f'{k}@{v}' for k, v in sorted(component.items(), key=lambda x: x[0])] # noqa: E741
return ':'.join(l)
+
def component_from_string(string: str) -> dict:
return {k: int(v) for k, v in re.findall(r'([\w,-]+)@(\d+)', string)}
+
def version_info_from_file(config_file) -> VersionInfo:
"""Return config file component and release version info."""
version_info = VersionInfo()
@@ -166,27 +187,27 @@ def version_info_from_file(config_file) -> VersionInfo:
return version_info
+
def version_info_from_system() -> VersionInfo:
"""Return system component and release version info."""
d = component_version()
sort_d = dict(sorted(d.items(), key=lambda x: x[0]))
- version_info = VersionInfo(
- component = sort_d,
- release = get_version(),
- vintage = 'vyos'
- )
+ version_info = VersionInfo(component=sort_d, release=get_version(), vintage='vyos')
return version_info
+
def version_info_copy(v: VersionInfo) -> VersionInfo:
"""Make a copy of dataclass."""
return replace(v)
+
def version_info_prune_component(x: VersionInfo, y: VersionInfo) -> VersionInfo:
"""In place pruning of component keys of x not in y."""
if x.component is None or y.component is None:
return
- x.component = { k: v for k,v in x.component.items() if k in y.component }
+ x.component = {k: v for k, v in x.component.items() if k in y.component}
+
def add_system_version(config_str: str = None, out_file: str = None):
"""Wrap config string with system version and write to out_file.
@@ -202,3 +223,11 @@ def add_system_version(config_str: str = None, out_file: str = None):
version_info.write(out_file)
else:
sys.stdout.write(version_info.write_string())
+
+
+def append_system_version(file: str):
+ """Append system version data to existing file"""
+ version_info = version_info_from_system()
+ version_info.update_footer()
+ with open(file, 'a') as f:
+ f.write(version_info.write_string())
diff --git a/python/vyos/config.py b/python/vyos/config.py
index 1fab46761..546eeceab 100644
--- a/python/vyos/config.py
+++ b/python/vyos/config.py
@@ -149,6 +149,18 @@ class Config(object):
return self._running_config
return self._session_config
+ def get_bool_attr(self, attr) -> bool:
+ if not hasattr(self, attr):
+ return False
+ else:
+ tmp = getattr(self, attr)
+ if not isinstance(tmp, bool):
+ return False
+ return tmp
+
+ def set_bool_attr(self, attr, val):
+ setattr(self, attr, val)
+
def _make_path(self, path):
# Backwards-compatibility stuff: original implementation used string paths
# libvyosconfig paths are lists, but since node names cannot contain whitespace,
diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py
index cf7c9d543..747af8dbe 100644
--- a/python/vyos/configdep.py
+++ b/python/vyos/configdep.py
@@ -102,11 +102,16 @@ def run_config_mode_script(target: str, config: 'Config'):
mod = load_as_module(name, path)
config.set_level([])
+ dry_run = config.get_bool_attr('dry_run')
try:
c = mod.get_config(config)
mod.verify(c)
- mod.generate(c)
- mod.apply(c)
+ if not dry_run:
+ mod.generate(c)
+ mod.apply(c)
+ else:
+ if hasattr(mod, 'call_dependents'):
+ mod.call_dependents()
except (VyOSError, ConfigError) as e:
raise ConfigError(str(e)) from e
diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py
index 78b98a3eb..ff0a15933 100644
--- a/python/vyos/configdict.py
+++ b/python/vyos/configdict.py
@@ -517,6 +517,14 @@ def get_interface_dict(config, base, ifname='', recursive_defaults=True, with_pk
else:
dict['ipv6']['address'].update({'eui64_old': eui64})
+ interface_identifier = leaf_node_changed(config, base + [ifname, 'ipv6', 'address', 'interface-identifier'])
+ if interface_identifier:
+ tmp = dict_search('ipv6.address', dict)
+ if not tmp:
+ dict.update({'ipv6': {'address': {'interface_identifier_old': interface_identifier}}})
+ else:
+ dict['ipv6']['address'].update({'interface_identifier_old': interface_identifier})
+
for vif, vif_config in dict.get('vif', {}).items():
# Add subinterface name to dictionary
dict['vif'][vif].update({'ifname' : f'{ifname}.{vif}'})
@@ -626,6 +634,23 @@ def get_vlan_ids(interface):
return vlan_ids
+def get_vlans_ids_and_range(interface):
+ vlan_ids = set()
+
+ vlan_filter_status = json.loads(cmd(f'bridge -j -d vlan show dev {interface}'))
+
+ if vlan_filter_status is not None:
+ for interface_status in vlan_filter_status:
+ for vlan_entry in interface_status.get("vlans", []):
+ start = vlan_entry["vlan"]
+ end = vlan_entry.get("vlanEnd")
+ if end:
+ vlan_ids.add(f"{start}-{end}")
+ else:
+ vlan_ids.add(str(start))
+
+ return vlan_ids
+
def get_accel_dict(config, base, chap_secrets, with_pki=False):
"""
Common utility function to retrieve and mangle the Accel-PPP configuration
diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py
index 90b96b88c..a3be29881 100644
--- a/python/vyos/configsession.py
+++ b/python/vyos/configsession.py
@@ -21,6 +21,10 @@ import subprocess
from vyos.defaults import directories
from vyos.utils.process import is_systemd_service_running
from vyos.utils.dict import dict_to_paths
+from vyos.utils.boot import boot_configuration_complete
+from vyos.vyconf_session import VyconfSession
+
+vyconf_backend = False
CLI_SHELL_API = '/bin/cli-shell-api'
SET = '/opt/vyatta/sbin/my_set'
@@ -165,6 +169,11 @@ class ConfigSession(object):
self.__run_command([CLI_SHELL_API, 'setupSession'])
+ if vyconf_backend and boot_configuration_complete():
+ self._vyconf_session = VyconfSession(on_error=ConfigSessionError)
+ else:
+ self._vyconf_session = None
+
def __del__(self):
try:
output = (
@@ -209,7 +218,10 @@ class ConfigSession(object):
value = []
else:
value = [value]
- self.__run_command([SET] + path + value)
+ if self._vyconf_session is None:
+ self.__run_command([SET] + path + value)
+ else:
+ self._vyconf_session.set(path + value)
def set_section(self, path: list, d: dict):
try:
@@ -223,7 +235,10 @@ class ConfigSession(object):
value = []
else:
value = [value]
- self.__run_command([DELETE] + path + value)
+ if self._vyconf_session is None:
+ self.__run_command([DELETE] + path + value)
+ else:
+ self._vyconf_session.delete(path + value)
def load_section(self, path: list, d: dict):
try:
@@ -261,20 +276,34 @@ class ConfigSession(object):
self.__run_command([COMMENT] + path + value)
def commit(self):
- out = self.__run_command([COMMIT])
+ if self._vyconf_session is None:
+ out = self.__run_command([COMMIT])
+ else:
+ out, _ = self._vyconf_session.commit()
+
return out
def discard(self):
- self.__run_command([DISCARD])
+ if self._vyconf_session is None:
+ self.__run_command([DISCARD])
+ else:
+ out, _ = self._vyconf_session.discard()
def show_config(self, path, format='raw'):
- config_data = self.__run_command(SHOW_CONFIG + path)
+ if self._vyconf_session is None:
+ config_data = self.__run_command(SHOW_CONFIG + path)
+ else:
+ config_data, _ = self._vyconf_session.show_config()
if format == 'raw':
return config_data
def load_config(self, file_path):
- out = self.__run_command(LOAD_CONFIG + [file_path])
+ if self._vyconf_session is None:
+ out = self.__run_command(LOAD_CONFIG + [file_path])
+ else:
+ out, _ = self._vyconf_session.load_config(file=file_path)
+
return out
def load_explicit(self, file_path):
@@ -287,11 +316,21 @@ class ConfigSession(object):
raise ConfigSessionError(e) from e
def migrate_and_load_config(self, file_path):
- out = self.__run_command(MIGRATE_LOAD_CONFIG + [file_path])
+ if self._vyconf_session is None:
+ out = self.__run_command(MIGRATE_LOAD_CONFIG + [file_path])
+ else:
+ out, _ = self._vyconf_session.load_config(file=file_path, migrate=True)
+
return out
def save_config(self, file_path):
- out = self.__run_command(SAVE_CONFIG + [file_path])
+ if self._vyconf_session is None:
+ out = self.__run_command(SAVE_CONFIG + [file_path])
+ else:
+ out, _ = self._vyconf_session.save_config(
+ file=file_path, append_version=True
+ )
+
return out
def install_image(self, url):
diff --git a/python/vyos/configtree.py b/python/vyos/configtree.py
index dade852c7..ff40fbad0 100644
--- a/python/vyos/configtree.py
+++ b/python/vyos/configtree.py
@@ -523,35 +523,6 @@ def mask_inclusive(left, right, libpath=LIBPATH):
return tree
-def show_commit_data(active_tree, proposed_tree, libpath=LIBPATH):
- if not (
- isinstance(active_tree, ConfigTree) and isinstance(proposed_tree, ConfigTree)
- ):
- raise TypeError('Arguments must be instances of ConfigTree')
-
- __lib = cdll.LoadLibrary(libpath)
- __show_commit_data = __lib.show_commit_data
- __show_commit_data.argtypes = [c_void_p, c_void_p]
- __show_commit_data.restype = c_char_p
-
- res = __show_commit_data(active_tree._get_config(), proposed_tree._get_config())
-
- return res.decode()
-
-
-def test_commit(active_tree, proposed_tree, libpath=LIBPATH):
- if not (
- isinstance(active_tree, ConfigTree) and isinstance(proposed_tree, ConfigTree)
- ):
- raise TypeError('Arguments must be instances of ConfigTree')
-
- __lib = cdll.LoadLibrary(libpath)
- __test_commit = __lib.test_commit
- __test_commit.argtypes = [c_void_p, c_void_p]
-
- __test_commit(active_tree._get_config(), proposed_tree._get_config())
-
-
def reference_tree_to_json(from_dir, to_file, internal_cache='', libpath=LIBPATH):
try:
__lib = cdll.LoadLibrary(libpath)
diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py
index 4084425b1..c93d9faac 100644
--- a/python/vyos/configverify.py
+++ b/python/vyos/configverify.py
@@ -92,6 +92,9 @@ def verify_mtu_ipv6(config):
tmp = dict_search('ipv6.address.eui64', config)
if tmp != None: raise ConfigError(error_msg)
+ tmp = dict_search('ipv6.address.interface_identifier', config)
+ if tmp != None: raise ConfigError(error_msg)
+
def verify_vrf(config):
"""
Common helper function used by interface implementations to perform
diff --git a/python/vyos/frrender.py b/python/vyos/frrender.py
index ba44978d1..524167d8b 100644
--- a/python/vyos/frrender.py
+++ b/python/vyos/frrender.py
@@ -60,6 +60,10 @@ def get_frrender_dict(conf, argv=None) -> dict:
from vyos.configdict import get_dhcp_interfaces
from vyos.configdict import get_pppoe_interfaces
+ # We need to re-set the CLI path to the root level, as this function uses
+ # conf.exists() with an absolute path form the CLI root
+ conf.set_level([])
+
# Create an empty dictionary which will be filled down the code path and
# returned to the caller
dict = {}
@@ -88,7 +92,7 @@ def get_frrender_dict(conf, argv=None) -> dict:
if dict_search(f'area.{area_num}.area_type.nssa', ospf) is None:
del default_values['area'][area_num]['area_type']['nssa']
- for protocol in ['babel', 'bgp', 'connected', 'isis', 'kernel', 'rip', 'static']:
+ for protocol in ['babel', 'bgp', 'connected', 'isis', 'kernel', 'nhrp', 'rip', 'static']:
if dict_search(f'redistribute.{protocol}', ospf) is None:
del default_values['redistribute'][protocol]
if not bool(default_values['redistribute']):
@@ -599,8 +603,10 @@ def get_frrender_dict(conf, argv=None) -> dict:
dict.update({'vrf' : vrf})
if os.path.exists(frr_debug_enable):
+ print(f'---- get_frrender_dict({conf}) ----')
import pprint
pprint.pprint(dict)
+ print('-----------------------------------')
return dict
diff --git a/python/vyos/ifconfig/bridge.py b/python/vyos/ifconfig/bridge.py
index d534dade7..f81026965 100644
--- a/python/vyos/ifconfig/bridge.py
+++ b/python/vyos/ifconfig/bridge.py
@@ -19,7 +19,7 @@ from vyos.utils.assertion import assert_list
from vyos.utils.assertion import assert_positive
from vyos.utils.dict import dict_search
from vyos.utils.network import interface_exists
-from vyos.configdict import get_vlan_ids
+from vyos.configdict import get_vlans_ids_and_range
from vyos.configdict import list_diff
@Interface.register
@@ -380,7 +380,7 @@ class BridgeIf(Interface):
add_vlan = []
native_vlan_id = None
allowed_vlan_ids= []
- cur_vlan_ids = get_vlan_ids(interface)
+ cur_vlan_ids = get_vlans_ids_and_range(interface)
if 'native_vlan' in interface_config:
vlan_id = interface_config['native_vlan']
@@ -389,14 +389,8 @@ class BridgeIf(Interface):
if 'allowed_vlan' in interface_config:
for vlan in interface_config['allowed_vlan']:
- vlan_range = vlan.split('-')
- if len(vlan_range) == 2:
- for vlan_add in range(int(vlan_range[0]),int(vlan_range[1]) + 1):
- add_vlan.append(str(vlan_add))
- allowed_vlan_ids.append(str(vlan_add))
- else:
- add_vlan.append(vlan)
- allowed_vlan_ids.append(vlan)
+ add_vlan.append(vlan)
+ allowed_vlan_ids.append(vlan)
# Remove redundant VLANs from the system
for vlan in list_diff(cur_vlan_ids, add_vlan):
diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py
index 979b62578..9a45ae66e 100644
--- a/python/vyos/ifconfig/interface.py
+++ b/python/vyos/ifconfig/interface.py
@@ -937,6 +937,20 @@ class Interface(Control):
prefixlen = prefix.split('/')[1]
self.del_addr(f'{eui64}/{prefixlen}')
+ def set_ipv6_interface_identifier(self, identifier):
+ """
+ Set the interface identifier for IPv6 autoconf.
+ """
+ cmd = f'ip token set {identifier} dev {self.ifname}'
+ self._cmd(cmd)
+
+ def del_ipv6_interface_identifier(self):
+ """
+ Delete the interface identifier for IPv6 autoconf.
+ """
+ cmd = f'ip token delete dev {self.ifname}'
+ self._cmd(cmd)
+
def set_ipv6_forwarding(self, forwarding):
"""
Configure IPv6 interface-specific Host/Router behaviour.
@@ -1792,6 +1806,23 @@ class Interface(Control):
value = '0' if (tmp != None) else '1'
self.set_ipv6_forwarding(value)
+ # Delete old interface identifier
+ # This should be before setting the accept_ra value
+ old = dict_search('ipv6.address.interface_identifier_old', config)
+ now = dict_search('ipv6.address.interface_identifier', config)
+ if old and not now:
+ # accept_ra of ra is required to delete the interface identifier
+ self.set_ipv6_accept_ra('2')
+ self.del_ipv6_interface_identifier()
+
+ # Set IPv6 Interface identifier
+ # This should be before setting the accept_ra value
+ tmp = dict_search('ipv6.address.interface_identifier', config)
+ if tmp:
+ # accept_ra is required to set the interface identifier
+ self.set_ipv6_accept_ra('2')
+ self.set_ipv6_interface_identifier(tmp)
+
# IPv6 router advertisements
tmp = dict_search('ipv6.address.autoconf', config)
value = '2' if (tmp != None) else '1'
diff --git a/python/vyos/kea.py b/python/vyos/kea.py
index c7947af3e..5eecbbaad 100644
--- a/python/vyos/kea.py
+++ b/python/vyos/kea.py
@@ -20,8 +20,8 @@ import socket
from datetime import datetime
from datetime import timezone
+from vyos import ConfigError
from vyos.template import is_ipv6
-from vyos.template import isc_static_route
from vyos.template import netmask_from_cidr
from vyos.utils.dict import dict_search_args
from vyos.utils.file import file_permissions
@@ -44,6 +44,7 @@ kea4_options = {
'wpad_url': 'wpad-url',
'ipv6_only_preferred': 'v6-only-preferred',
'captive_portal': 'v4-captive-portal',
+ 'capwap_controller': 'capwap-ac-v4',
}
kea6_options = {
@@ -56,6 +57,7 @@ kea6_options = {
'nisplus_server': 'nisp-servers',
'sntp_server': 'sntp-servers',
'captive_portal': 'v6-captive-portal',
+ 'capwap_controller': 'capwap-ac-v6',
}
kea_ctrl_socket = '/run/kea/dhcp{inet}-ctrl-socket'
@@ -111,22 +113,21 @@ def kea_parse_options(config):
default_route = ''
if 'default_router' in config:
- default_route = isc_static_route('0.0.0.0/0', config['default_router'])
+ default_route = f'0.0.0.0/0 - {config["default_router"]}'
routes = [
- isc_static_route(route, route_options['next_hop'])
+ f'{route} - {route_options["next_hop"]}'
for route, route_options in config['static_route'].items()
]
options.append(
{
- 'name': 'rfc3442-static-route',
+ 'name': 'classless-static-route',
'data': ', '.join(
routes if not default_route else routes + [default_route]
),
}
)
- options.append({'name': 'windows-static-route', 'data': ', '.join(routes)})
if 'time_zone' in config:
with open('/usr/share/zoneinfo/' + config['time_zone'], 'rb') as f:
@@ -147,7 +148,7 @@ def kea_parse_options(config):
def kea_parse_subnet(subnet, config):
- out = {'subnet': subnet, 'id': int(config['subnet_id'])}
+ out = {'subnet': subnet, 'id': int(config['subnet_id']), 'user-context': {}}
if 'option' in config:
out['option-data'] = kea_parse_options(config['option'])
@@ -165,6 +166,9 @@ def kea_parse_subnet(subnet, config):
out['valid-lifetime'] = int(config['lease'])
out['max-valid-lifetime'] = int(config['lease'])
+ if 'ping_check' in config:
+ out['user-context']['enable-ping-check'] = True
+
if 'range' in config:
pools = []
for num, range_config in config['range'].items():
@@ -218,6 +222,9 @@ def kea_parse_subnet(subnet, config):
reservations.append(reservation)
out['reservations'] = reservations
+ if 'dynamic_dns_update' in config:
+ out.update(kea_parse_ddns_settings(config['dynamic_dns_update']))
+
return out
@@ -347,6 +354,54 @@ def kea6_parse_subnet(subnet, config):
return out
+def kea_parse_tsig_algo(algo_spec):
+ translate = {
+ 'md5': 'HMAC-MD5',
+ 'sha1': 'HMAC-SHA1',
+ 'sha224': 'HMAC-SHA224',
+ 'sha256': 'HMAC-SHA256',
+ 'sha384': 'HMAC-SHA384',
+ 'sha512': 'HMAC-SHA512'
+ }
+ if algo_spec not in translate:
+ raise ConfigError(f'Unsupported TSIG algorithm: {algo_spec}')
+ return translate[algo_spec]
+
+def kea_parse_enable_disable(value):
+ return True if value == 'enable' else False
+
+def kea_parse_ddns_settings(config):
+ data = {}
+
+ if send_updates := config.get('send_updates'):
+ data['ddns-send-updates'] = kea_parse_enable_disable(send_updates)
+
+ if override_client_update := config.get('override_client_update'):
+ data['ddns-override-client-update'] = kea_parse_enable_disable(override_client_update)
+
+ if override_no_update := config.get('override_no_update'):
+ data['ddns-override-no-update'] = kea_parse_enable_disable(override_no_update)
+
+ if update_on_renew := config.get('update_on_renew'):
+ data['ddns-update-on-renew'] = kea_parse_enable_disable(update_on_renew)
+
+ if conflict_resolution := config.get('conflict_resolution'):
+ data['ddns-use-conflict-resolution'] = kea_parse_enable_disable(conflict_resolution)
+
+ if 'replace_client_name' in config:
+ data['ddns-replace-client-name'] = config['replace_client_name']
+ if 'generated_prefix' in config:
+ data['ddns-generated-prefix'] = config['generated_prefix']
+ if 'qualifying_suffix' in config:
+ data['ddns-qualifying-suffix'] = config['qualifying_suffix']
+ if 'ttl_percent' in config:
+ data['ddns-ttl-percent'] = int(config['ttl_percent']) / 100
+ if 'hostname_char_set' in config:
+ data['hostname-char-set'] = config['hostname_char_set']
+ if 'hostname_char_replacement' in config:
+ data['hostname-char-replacement'] = config['hostname_char_replacement']
+
+ return data
def _ctrl_socket_command(inet, command, args=None):
path = kea_ctrl_socket.format(inet=inet)
@@ -483,10 +538,10 @@ def kea_get_domain_from_subnet_id(config, inet, subnet_id):
if option['name'] == 'domain-name':
return option['data']
- # domain-name is not found in subnet, fallback to shared-network pool option
- for option in network['option-data']:
- if option['name'] == 'domain-name':
- return option['data']
+ # domain-name is not found in subnet, fallback to shared-network pool option
+ for option in network['option-data']:
+ if option['name'] == 'domain-name':
+ return option['data']
return None
diff --git a/python/vyos/proto/generate_dataclass.py b/python/vyos/proto/generate_dataclass.py
new file mode 100755
index 000000000..c6296c568
--- /dev/null
+++ b/python/vyos/proto/generate_dataclass.py
@@ -0,0 +1,178 @@
+#!/usr/bin/env python3
+#
+# Copyright (C) 2025 VyOS maintainers and contributors
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 or later as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+import argparse
+import os
+
+from google.protobuf.descriptor_pb2 import FileDescriptorSet # pylint: disable=no-name-in-module
+from google.protobuf.descriptor_pb2 import FieldDescriptorProto # pylint: disable=no-name-in-module
+from humps import decamelize
+
+HEADER = """\
+from enum import IntEnum
+from dataclasses import dataclass
+from dataclasses import field
+"""
+
+
+def normalize(s: str) -> str:
+ """Decamelize and avoid syntactic collision"""
+ t = decamelize(s)
+ return t + '_' if t in ['from'] else t
+
+
+def generate_dataclass(descriptor_proto):
+ class_name = descriptor_proto.name
+ fields = []
+ for field_p in descriptor_proto.field:
+ field_name = field_p.name
+ field_type, field_default = get_type(field_p.type, field_p.type_name)
+ match field_p.label:
+ case FieldDescriptorProto.LABEL_REPEATED:
+ field_type = f'list[{field_type}] = field(default_factory=list)'
+ case FieldDescriptorProto.LABEL_OPTIONAL:
+ field_type = f'{field_type} = None'
+ case _:
+ field_type = f'{field_type} = {field_default}'
+
+ fields.append(f' {field_name}: {field_type}')
+
+ code = f"""
+@dataclass
+class {class_name}:
+{chr(10).join(fields) if fields else ' pass'}
+"""
+
+ return code
+
+
+def generate_request(descriptor_proto):
+ class_name = descriptor_proto.name
+ fields = []
+ f_vars = []
+ for field_p in descriptor_proto.field:
+ field_name = field_p.name
+ field_type, field_default = get_type(field_p.type, field_p.type_name)
+ match field_p.label:
+ case FieldDescriptorProto.LABEL_REPEATED:
+ field_type = f'list[{field_type}] = []'
+ case FieldDescriptorProto.LABEL_OPTIONAL:
+ field_type = f'{field_type} = None'
+ case _:
+ field_type = f'{field_type} = {field_default}'
+
+ fields.append(f'{normalize(field_name)}: {field_type}')
+ f_vars.append(f'{normalize(field_name)}')
+
+ fields.insert(0, 'token: str = None')
+
+ code = f"""
+def set_request_{decamelize(class_name)}({', '.join(fields)}):
+ reqi = {class_name} ({', '.join(f_vars)})
+ req = Request({decamelize(class_name)}=reqi)
+ req_env = RequestEnvelope(token, req)
+ return req_env
+"""
+
+ return code
+
+
+def generate_nested_dataclass(descriptor_proto):
+ out = ''
+ for nested_p in descriptor_proto.nested_type:
+ out = out + generate_dataclass(nested_p)
+
+ return out
+
+
+def generate_nested_request(descriptor_proto):
+ out = ''
+ for nested_p in descriptor_proto.nested_type:
+ out = out + generate_request(nested_p)
+
+ return out
+
+
+def generate_enum_dataclass(descriptor_proto):
+ code = ''
+ for enum_p in descriptor_proto.enum_type:
+ enums = []
+ enum_name = enum_p.name
+ for enum_val in enum_p.value:
+ enums.append(f' {enum_val.name} = {enum_val.number}')
+
+ code += f"""
+class {enum_name}(IntEnum):
+{chr(10).join(enums)}
+"""
+
+ return code
+
+
+def get_type(field_type, type_name):
+ res = 'Any', None
+ match field_type:
+ case FieldDescriptorProto.TYPE_STRING:
+ res = 'str', '""'
+ case FieldDescriptorProto.TYPE_INT32 | FieldDescriptorProto.TYPE_INT64:
+ res = 'int', 0
+ case FieldDescriptorProto.TYPE_FLOAT | FieldDescriptorProto.TYPE_DOUBLE:
+ res = 'float', 0.0
+ case FieldDescriptorProto.TYPE_BOOL:
+ res = 'bool', False
+ case FieldDescriptorProto.TYPE_MESSAGE | FieldDescriptorProto.TYPE_ENUM:
+ res = type_name.split('.')[-1], None
+ case _:
+ pass
+
+ return res
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument('descriptor_file', help='protobuf .desc file')
+ parser.add_argument('--out-dir', help='directory to write generated file')
+ args = parser.parse_args()
+ desc_file = args.descriptor_file
+ out_dir = args.out_dir
+
+ with open(desc_file, 'rb') as f:
+ descriptor_set_data = f.read()
+
+ descriptor_set = FileDescriptorSet()
+ descriptor_set.ParseFromString(descriptor_set_data)
+
+ for file_proto in descriptor_set.file:
+ f = f'{file_proto.name.replace(".", "_")}.py'
+ f = os.path.join(out_dir, f)
+ dataclass_code = ''
+ nested_code = ''
+ enum_code = ''
+ request_code = ''
+ with open(f, 'w') as f:
+ enum_code += generate_enum_dataclass(file_proto)
+ for message_proto in file_proto.message_type:
+ dataclass_code += generate_dataclass(message_proto)
+ nested_code += generate_nested_dataclass(message_proto)
+ enum_code += generate_enum_dataclass(message_proto)
+ request_code += generate_nested_request(message_proto)
+
+ f.write(HEADER)
+ f.write(enum_code)
+ f.write(nested_code)
+ f.write(dataclass_code)
+ f.write(request_code)
diff --git a/python/vyos/proto/vyconf_client.py b/python/vyos/proto/vyconf_client.py
new file mode 100644
index 000000000..b385f0951
--- /dev/null
+++ b/python/vyos/proto/vyconf_client.py
@@ -0,0 +1,89 @@
+# Copyright 2025 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import socket
+from dataclasses import asdict
+
+from vyos.proto import vyconf_proto
+from vyos.proto import vyconf_pb2
+
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.json_format import ParseDict
+
+socket_path = '/var/run/vyconfd.sock'
+
+
+def send_socket(msg: bytearray) -> bytes:
+ data = bytes()
+ client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ client.connect(socket_path)
+ client.sendall(msg)
+
+ data_length = client.recv(4)
+ if data_length:
+ length = int.from_bytes(data_length)
+ data = client.recv(length)
+
+ client.close()
+
+ return data
+
+
+def request_to_msg(req: vyconf_proto.RequestEnvelope) -> vyconf_pb2.RequestEnvelope:
+ # pylint: disable=no-member
+
+ msg = vyconf_pb2.RequestEnvelope()
+ msg = ParseDict(asdict(req), msg, ignore_unknown_fields=True)
+ return msg
+
+
+def msg_to_response(msg: vyconf_pb2.Response) -> vyconf_proto.Response:
+ # pylint: disable=no-member
+
+ d = MessageToDict(
+ msg, preserving_proto_field_name=True, use_integers_for_enums=True
+ )
+
+ response = vyconf_proto.Response(**d)
+ return response
+
+
+def write_request(req: vyconf_proto.RequestEnvelope) -> bytearray:
+ req_msg = request_to_msg(req)
+ encoded_data = req_msg.SerializeToString()
+ byte_size = req_msg.ByteSize()
+ length_bytes = byte_size.to_bytes(4)
+ arr = bytearray(length_bytes)
+ arr.extend(encoded_data)
+
+ return arr
+
+
+def read_response(msg: bytes) -> vyconf_proto.Response:
+ response_msg = vyconf_pb2.Response() # pylint: disable=no-member
+ response_msg.ParseFromString(msg)
+ response = msg_to_response(response_msg)
+
+ return response
+
+
+def send_request(name, *args, **kwargs):
+ func = getattr(vyconf_proto, f'set_request_{name}')
+ request_env = func(*args, **kwargs)
+ msg = write_request(request_env)
+ response_msg = send_socket(msg)
+ response = read_response(response_msg)
+
+ return response
diff --git a/python/vyos/system/grub_util.py b/python/vyos/system/grub_util.py
index 4a3d8795e..ad95bb4f9 100644
--- a/python/vyos/system/grub_util.py
+++ b/python/vyos/system/grub_util.py
@@ -56,13 +56,12 @@ def set_kernel_cmdline_options(cmdline_options: str, version: str = '',
@image.if_not_live_boot
def update_kernel_cmdline_options(cmdline_options: str,
- root_dir: str = '') -> None:
+ root_dir: str = '',
+ version = image.get_running_image()) -> None:
"""Update Kernel custom cmdline options"""
if not root_dir:
root_dir = disk.find_persistence()
- version = image.get_running_image()
-
boot_opts_current = grub.get_boot_opts(version, root_dir)
boot_opts_proposed = grub.BOOT_OPTS_STEM + f'{version} {cmdline_options}'
diff --git a/python/vyos/template.py b/python/vyos/template.py
index e75db1a8d..d79e1183f 100755
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -390,28 +390,6 @@ def compare_netmask(netmask1, netmask2):
except:
return False
-@register_filter('isc_static_route')
-def isc_static_route(subnet, router):
- # https://ercpe.de/blog/pushing-static-routes-with-isc-dhcp-server
- # Option format is:
- # <netmask>, <network-byte1>, <network-byte2>, <network-byte3>, <router-byte1>, <router-byte2>, <router-byte3>
- # where bytes with the value 0 are omitted.
- from ipaddress import ip_network
- net = ip_network(subnet)
- # add netmask
- string = str(net.prefixlen) + ','
- # add network bytes
- if net.prefixlen:
- width = net.prefixlen // 8
- if net.prefixlen % 8:
- width += 1
- string += ','.join(map(str,tuple(net.network_address.packed)[:width])) + ','
-
- # add router bytes
- string += ','.join(router.split('.'))
-
- return string
-
@register_filter('is_file')
def is_file(filename):
if os.path.exists(filename):
@@ -881,10 +859,77 @@ def kea_high_availability_json(config):
return dumps(data)
+@register_filter('kea_dynamic_dns_update_main_json')
+def kea_dynamic_dns_update_main_json(config):
+ from vyos.kea import kea_parse_ddns_settings
+ from json import dumps
+
+ data = kea_parse_ddns_settings(config)
+
+ if len(data) == 0:
+ return ''
+
+ return dumps(data, indent=8)[1:-1] + ','
+
+@register_filter('kea_dynamic_dns_update_tsig_key_json')
+def kea_dynamic_dns_update_tsig_key_json(config):
+ from vyos.kea import kea_parse_tsig_algo
+ from json import dumps
+ out = []
+
+ if 'tsig_key' not in config:
+ return dumps(out)
+
+ tsig_keys = config['tsig_key']
+
+ for tsig_key_name, tsig_key_config in tsig_keys.items():
+ tsig_key = {
+ 'name': tsig_key_name,
+ 'algorithm': kea_parse_tsig_algo(tsig_key_config['algorithm']),
+ 'secret': tsig_key_config['secret']
+ }
+ out.append(tsig_key)
+
+ return dumps(out, indent=12)
+
+@register_filter('kea_dynamic_dns_update_domains')
+def kea_dynamic_dns_update_domains(config, type_key):
+ from json import dumps
+ out = []
+
+ if type_key not in config:
+ return dumps(out)
+
+ domains = config[type_key]
+
+ for domain_name, domain_config in domains.items():
+ domain = {
+ 'name': domain_name,
+
+ }
+ if 'key_name' in domain_config:
+ domain['key-name'] = domain_config['key_name']
+
+ if 'dns_server' in domain_config:
+ dns_servers = []
+ for dns_server_config in domain_config['dns_server'].values():
+ dns_server = {
+ 'ip-address': dns_server_config['address']
+ }
+ if 'port' in dns_server_config:
+ dns_server['port'] = int(dns_server_config['port'])
+ dns_servers.append(dns_server)
+ domain['dns-servers'] = dns_servers
+
+ out.append(domain)
+
+ return dumps(out, indent=12)
+
@register_filter('kea_shared_network_json')
def kea_shared_network_json(shared_networks):
from vyos.kea import kea_parse_options
from vyos.kea import kea_parse_subnet
+ from vyos.kea import kea_parse_ddns_settings
from json import dumps
out = []
@@ -895,9 +940,13 @@ def kea_shared_network_json(shared_networks):
network = {
'name': name,
'authoritative': ('authoritative' in config),
- 'subnet4': []
+ 'subnet4': [],
+ 'user-context': {}
}
+ if 'dynamic_dns_update' in config:
+ network.update(kea_parse_ddns_settings(config['dynamic_dns_update']))
+
if 'option' in config:
network['option-data'] = kea_parse_options(config['option'])
@@ -907,6 +956,9 @@ def kea_shared_network_json(shared_networks):
if 'bootfile_server' in config['option']:
network['next-server'] = config['option']['bootfile_server']
+ if 'ping_check' in config:
+ network['user-context']['enable-ping-check'] = True
+
if 'subnet' in config:
for subnet, subnet_config in config['subnet'].items():
if 'disable' in subnet_config:
diff --git a/python/vyos/vyconf_session.py b/python/vyos/vyconf_session.py
new file mode 100644
index 000000000..506095625
--- /dev/null
+++ b/python/vyos/vyconf_session.py
@@ -0,0 +1,123 @@
+# Copyright 2025 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+
+import tempfile
+import shutil
+from functools import wraps
+from typing import Type
+
+from vyos.proto import vyconf_client
+from vyos.migrate import ConfigMigrate
+from vyos.migrate import ConfigMigrateError
+from vyos.component_version import append_system_version
+
+
+def output(o):
+ out = ''
+ for res in (o.output, o.error, o.warning):
+ if res is not None:
+ out = out + res
+ return out
+
+
+class VyconfSession:
+ def __init__(self, token: str = None, on_error: Type[Exception] = None):
+ if token is None:
+ out = vyconf_client.send_request('setup_session')
+ self.__token = out.output
+ else:
+ self.__token = token
+
+ self.on_error = on_error
+
+ @staticmethod
+ def raise_exception(f):
+ @wraps(f)
+ def wrapped(self, *args, **kwargs):
+ if self.on_error is None:
+ return f(self, *args, **kwargs)
+ o, e = f(self, *args, **kwargs)
+ if e:
+ raise self.on_error(o)
+ return o, e
+
+ return wrapped
+
+ @raise_exception
+ def set(self, path: list[str]) -> tuple[str, int]:
+ out = vyconf_client.send_request('set', token=self.__token, path=path)
+ return output(out), out.status
+
+ @raise_exception
+ def delete(self, path: list[str]) -> tuple[str, int]:
+ out = vyconf_client.send_request('delete', token=self.__token, path=path)
+ return output(out), out.status
+
+ @raise_exception
+ def commit(self) -> tuple[str, int]:
+ out = vyconf_client.send_request('commit', token=self.__token)
+ return output(out), out.status
+
+ @raise_exception
+ def discard(self) -> tuple[str, int]:
+ out = vyconf_client.send_request('discard', token=self.__token)
+ return output(out), out.status
+
+ def session_changed(self) -> bool:
+ out = vyconf_client.send_request('session_changed', token=self.__token)
+ return not bool(out.status)
+
+ @raise_exception
+ def load_config(self, file: str, migrate: bool = False) -> tuple[str, int]:
+ # pylint: disable=consider-using-with
+ if migrate:
+ tmp = tempfile.NamedTemporaryFile()
+ shutil.copy2(file, tmp.name)
+ config_migrate = ConfigMigrate(tmp.name)
+ try:
+ config_migrate.run()
+ except ConfigMigrateError as e:
+ tmp.close()
+ return repr(e), 1
+ file = tmp.name
+ else:
+ tmp = ''
+
+ out = vyconf_client.send_request('load', token=self.__token, location=file)
+ if tmp:
+ tmp.close()
+
+ return output(out), out.status
+
+ @raise_exception
+ def save_config(self, file: str, append_version: bool = False) -> tuple[str, int]:
+ out = vyconf_client.send_request('save', token=self.__token, location=file)
+ if append_version:
+ append_system_version(file)
+ return output(out), out.status
+
+ @raise_exception
+ def show_config(self, path: list[str] = None) -> tuple[str, int]:
+ if path is None:
+ path = []
+ out = vyconf_client.send_request('show_config', token=self.__token, path=path)
+ return output(out), out.status
+
+ def __del__(self):
+ out = vyconf_client.send_request('teardown', token=self.__token)
+ if out.status:
+ print(f'Could not tear down session {self.__token}: {output(out)}')