Diffstat (limited to 'src/services')
-rw-r--r--   src/services/api/rest/models.py          |    7
-rwxr-xr-x   src/services/vyos-commitd                |  457
-rwxr-xr-x   src/services/vyos-configd                |   58
-rwxr-xr-x   src/services/vyos-conntrack-logger       |    2
-rwxr-xr-x   src/services/vyos-domain-resolver        |  313
-rwxr-xr-x   src/services/vyos-hostsd                 |    4
-rwxr-xr-x   src/services/vyos-http-api-server        |   46
-rw-r--r--   src/services/vyos-network-event-logger   | 1218
8 files changed, 2073 insertions, 32 deletions
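
The new vyos-commitd daemon introduced below exchanges length-prefixed protobuf messages over a unix socket: each message is a 4-byte big-endian length header followed by a serialized vycall_pb2.Commit (see write_reply() and the accept loop in run_server()). A minimal client sketch of that framing follows; the socket path and the send_commit helper are illustrative assumptions, with the real path coming from vyconfd.conf:

#!/usr/bin/env python3
# Sketch of a vyos-commitd client, assuming the framing used by the daemon
# in this commit: 4-byte big-endian length header + serialized Commit message.
# COMMITD_SOCKET is a placeholder; the daemon reads the real path from vyconfd.conf.
import socket

from vyos.proto import vycall_pb2

COMMITD_SOCKET = '/run/vyos-commitd.sock'  # assumed path, for illustration only


def send_commit(msg: 'vycall_pb2.Commit') -> 'vycall_pb2.Commit':
    payload = msg.SerializeToString()
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.connect(COMMITD_SOCKET)
        # int.to_bytes(4) defaults to big-endian (Python 3.11+), matching
        # byte_size.to_bytes(4) on the server side
        sock.sendall(len(payload).to_bytes(4) + payload)
        # reply uses the same framing: length header first, then protobuf body
        length = int.from_bytes(sock.recv(4))
        reply = vycall_pb2.Commit()
        reply.ParseFromString(sock.recv(length))
    return reply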
diff --git a/src/services/api/rest/models.py b/src/services/api/rest/models.py index 27d9fb5ee..dda50010f 100644 --- a/src/services/api/rest/models.py +++ b/src/services/api/rest/models.py @@ -293,6 +293,13 @@ class TracerouteModel(ApiModel): } +class InfoQueryParams(BaseModel): + model_config = {"extra": "forbid"} + + version: bool = True + hostname: bool = True + + class Success(BaseModel): success: bool data: Union[str, bool, Dict] diff --git a/src/services/vyos-commitd b/src/services/vyos-commitd new file mode 100755 index 000000000..e7f2d82c7 --- /dev/null +++ b/src/services/vyos-commitd @@ -0,0 +1,457 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2025 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# +import os +import sys +import grp +import json +import signal +import socket +import typing +import logging +import traceback +import importlib.util +import io +from contextlib import redirect_stdout +from dataclasses import dataclass +from dataclasses import fields +from dataclasses import field +from dataclasses import asdict +from pathlib import Path + +import tomli + +from google.protobuf.json_format import MessageToDict +from google.protobuf.json_format import ParseDict + +from vyos.defaults import directories +from vyos.utils.boot import boot_configuration_complete +from vyos.configsource import ConfigSourceCache +from vyos.configsource import ConfigSourceError +from vyos.config import Config +from vyos.frrender import FRRender +from vyos.frrender import get_frrender_dict +from vyos import ConfigError + +from vyos.proto import vycall_pb2 + + +@dataclass +class Status: + success: bool = False + out: str = '' + + +@dataclass +class Call: + script_name: str = '' + tag_value: str = None + arg_value: str = None + reply: Status = None + + def set_reply(self, success: bool, out: str): + self.reply = Status(success=success, out=out) + + +@dataclass +class Session: + # pylint: disable=too-many-instance-attributes + + session_id: str = '' + dry_run: bool = False + atomic: bool = False + background: bool = False + config: Config = None + init: Status = None + calls: list[Call] = field(default_factory=list) + + def set_init(self, success: bool, out: str): + self.init = Status(success=success, out=out) + + +@dataclass +class ServerConf: + commitd_socket: str = '' + session_dir: str = '' + running_cache: str = '' + session_cache: str = '' + + +server_conf = None +SOCKET_PATH = None +conf_mode_scripts = None +frr = None + +CFG_GROUP = 'vyattacfg' + +script_stdout_log = '/tmp/vyos-commitd-script-stdout' + +debug = True + +logger = logging.getLogger(__name__) +logs_handler = logging.StreamHandler() +logger.addHandler(logs_handler) + +if debug: + logger.setLevel(logging.DEBUG) +else: + logger.setLevel(logging.INFO) + + +vyos_conf_scripts_dir = directories['conf_mode'] +commitd_include_file = os.path.join(directories['data'], 'configd-include.json') + + +def key_name_from_file_name(f): + return os.path.splitext(f)[0] + + +def 
module_name_from_key(k): + return k.replace('-', '_') + + +def path_from_file_name(f): + return os.path.join(vyos_conf_scripts_dir, f) + + +def load_conf_mode_scripts(): + with open(commitd_include_file) as f: + try: + include = json.load(f) + except OSError as e: + logger.critical(f'configd include file error: {e}') + sys.exit(1) + except json.JSONDecodeError as e: + logger.critical(f'JSON load error: {e}') + sys.exit(1) + + # import conf_mode scripts + (_, _, filenames) = next(iter(os.walk(vyos_conf_scripts_dir))) + filenames.sort() + + # this is redundant, as all scripts are currently in the include file; + # leave it as an inexpensive check for future changes + load_filenames = [f for f in filenames if f in include] + imports = [key_name_from_file_name(f) for f in load_filenames] + module_names = [module_name_from_key(k) for k in imports] + paths = [path_from_file_name(f) for f in load_filenames] + to_load = list(zip(module_names, paths)) + + modules = [] + + for x in to_load: + spec = importlib.util.spec_from_file_location(x[0], x[1]) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + modules.append(module) + + scripts = dict(zip(imports, modules)) + + return scripts + + +def get_session_out(session: Session) -> str: + out = '' + if session.init and session.init.out: + out = f'{out} + init: {session.init.out} + \n' + for call in session.calls: + reply = call.reply + if reply and reply.out: + out = f'{out} + {call.script_name}: {reply.out} + \n' + return out + + +def write_stdout_log(file_name, session): + if boot_configuration_complete(): + return + with open(file_name, 'a') as f: + f.write(get_session_out(session)) + + +def msg_to_commit_data(msg: vycall_pb2.Commit) -> Session: + # pylint: disable=no-member + + d = MessageToDict(msg, preserving_proto_field_name=True) + + # wrap in dataclasses + session = Session(**d) + session.init = Status(**session.init) if session.init else None + session.calls = list(map(lambda x: Call(**x), session.calls)) + for call in session.calls: + call.reply = Status(**call.reply) if call.reply else None + + return session + + +def commit_data_to_msg(obj: Session) -> vycall_pb2.Commit: + # pylint: disable=no-member + + # avoid asdict attempt of deepcopy on Config obj + obj.config = None + + msg = vycall_pb2.Commit() + msg = ParseDict(asdict(obj), msg, ignore_unknown_fields=True) + + return msg + + +def initialization(session: Session) -> Session: + running_cache = os.path.join(server_conf.session_dir, server_conf.running_cache) + session_cache = os.path.join(server_conf.session_dir, server_conf.session_cache) + try: + configsource = ConfigSourceCache( + running_config_cache=running_cache, + session_config_cache=session_cache, + ) + except ConfigSourceError as e: + fail_msg = f'Failed to read config caches: {e}' + logger.critical(fail_msg) + session.set_init(False, fail_msg) + return session + + session.set_init(True, '') + + config = Config(config_source=configsource) + + dependent_func: dict[str, list[typing.Callable]] = {} + setattr(config, 'dependent_func', dependent_func) + + scripts_called = [] + setattr(config, 'scripts_called', scripts_called) + + dry_run = session.dry_run + config.set_bool_attr('dry_run', dry_run) + logger.debug(f'commit dry_run is {dry_run}') + + session.config = config + + return session + + +def run_script(script_name: str, config: Config, args: list) -> tuple[bool, str]: + # pylint: disable=broad-exception-caught + + script = conf_mode_scripts[script_name] + script.argv = args + 
config.set_level([]) + dry_run = config.get_bool_attr('dry_run') + try: + c = script.get_config(config) + script.verify(c) + if not dry_run: + script.generate(c) + script.apply(c) + else: + if hasattr(script, 'call_dependents'): + script.call_dependents() + except ConfigError as e: + logger.error(e) + return False, str(e) + except Exception: + tb = traceback.format_exc() + logger.error(tb) + return False, tb + + return True, '' + + +def process_call_data(call: Call, config: Config, last: bool = False) -> None: + # pylint: disable=too-many-locals + + script_name = key_name_from_file_name(call.script_name) + + if script_name not in conf_mode_scripts: + fail_msg = f'No such script: {call.script_name}' + logger.critical(fail_msg) + call.set_reply(False, fail_msg) + return + + config.dependency_list.clear() + + tag_value = call.tag_value if call.tag_value is not None else '' + os.environ['VYOS_TAGNODE_VALUE'] = tag_value + + args = call.arg_value.split() if call.arg_value else [] + args.insert(0, f'{script_name}.py') + + tag_ext = f'_{tag_value}' if tag_value else '' + script_record = f'{script_name}{tag_ext}' + scripts_called = getattr(config, 'scripts_called', []) + scripts_called.append(script_record) + + with redirect_stdout(io.StringIO()) as o: + success, err_out = run_script(script_name, config, args) + amb_out = o.getvalue() + o.close() + + out = amb_out + err_out + + call.set_reply(success, out) + + logger.info(f'[{script_name}] {out}') + + if last: + scripts_called = getattr(config, 'scripts_called', []) + logger.debug(f'scripts_called: {scripts_called}') + + if last and success: + tmp = get_frrender_dict(config) + if frr.generate(tmp): + # only apply a new FRR configuration if anything changed + # in comparison to the previous applied configuration + frr.apply() + + +def process_session_data(session: Session) -> Session: + if session.init is None or not session.init.success: + return session + + config = session.config + len_calls = len(session.calls) + for index, call in enumerate(session.calls): + process_call_data(call, config, last=len_calls == index + 1) + + return session + + +def read_message(msg: bytes) -> Session: + """Read message into Session instance""" + + message = vycall_pb2.Commit() # pylint: disable=no-member + message.ParseFromString(msg) + session = msg_to_commit_data(message) + + session = initialization(session) + session = process_session_data(session) + + write_stdout_log(script_stdout_log, session) + + return session + + +def write_reply(session: Session) -> bytearray: + """Serialize modified object to bytearray, prepending data length + header""" + + reply = commit_data_to_msg(session) + encoded_data = reply.SerializeToString() + byte_size = reply.ByteSize() + length_bytes = byte_size.to_bytes(4) + arr = bytearray(length_bytes) + arr.extend(encoded_data) + + return arr + + +def load_server_conf() -> ServerConf: + # pylint: disable=import-outside-toplevel + # pylint: disable=broad-exception-caught + from vyos.defaults import vyconfd_conf + + try: + with open(vyconfd_conf, 'rb') as f: + vyconfd_conf_d = tomli.load(f) + + except Exception as e: + logger.critical(f'Failed to open the vyconfd.conf file {vyconfd_conf}: {e}') + sys.exit(1) + + app = vyconfd_conf_d.get('appliance', {}) + + conf_data = { + k: v for k, v in app.items() if k in [_.name for _ in fields(ServerConf)] + } + + conf = ServerConf(**conf_data) + + return conf + + +def remove_if_exists(f: str): + try: + os.unlink(f) + except FileNotFoundError: + pass + + +def sig_handler(_signum, _frame): + 
logger.info('stopping server') + raise KeyboardInterrupt + + +def run_server(): + # pylint: disable=global-statement + + global server_conf + global SOCKET_PATH + global conf_mode_scripts + global frr + + signal.signal(signal.SIGTERM, sig_handler) + signal.signal(signal.SIGINT, sig_handler) + + logger.info('starting server') + + server_conf = load_server_conf() + SOCKET_PATH = server_conf.commitd_socket + conf_mode_scripts = load_conf_mode_scripts() + + cfg_group = grp.getgrnam(CFG_GROUP) + os.setgid(cfg_group.gr_gid) + + server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + + remove_if_exists(SOCKET_PATH) + server_socket.bind(SOCKET_PATH) + Path(SOCKET_PATH).chmod(0o775) + + # We only need one long-lived instance of FRRender + frr = FRRender() + + server_socket.listen(2) + while True: + try: + conn, _ = server_socket.accept() + logger.debug('connection accepted') + while True: + # receive size of data + data_length = conn.recv(4) + if not data_length: + logger.debug('no data') + # if no data break + break + + length = int.from_bytes(data_length) + # receive data + data = conn.recv(length) + + session = read_message(data) + reply = write_reply(session) + conn.sendall(reply) + + conn.close() + logger.debug('connection closed') + + except KeyboardInterrupt: + break + + server_socket.close() + sys.exit(0) + + +if __name__ == '__main__': + run_server() diff --git a/src/services/vyos-configd b/src/services/vyos-configd index d558e8c26..28acccd2c 100755 --- a/src/services/vyos-configd +++ b/src/services/vyos-configd @@ -28,6 +28,7 @@ import traceback import importlib.util import io from contextlib import redirect_stdout +from enum import Enum import zmq @@ -60,11 +61,14 @@ SOCKET_PATH = 'ipc:///run/vyos-configd.sock' MAX_MSG_SIZE = 65535 PAD_MSG_SIZE = 6 + # Response error codes -R_SUCCESS = 1 -R_ERROR_COMMIT = 2 -R_ERROR_DAEMON = 4 -R_PASS = 8 +class Response(Enum): + SUCCESS = 1 + ERROR_COMMIT = 2 + ERROR_DAEMON = 4 + PASS = 8 + vyos_conf_scripts_dir = directories['conf_mode'] configd_include_file = os.path.join(directories['data'], 'configd-include.json') @@ -73,12 +77,15 @@ configd_env_unset_file = os.path.join(directories['data'], 'vyos-configd-env-uns # sourced on entering config session configd_env_file = '/etc/default/vyos-configd-env' + def key_name_from_file_name(f): return os.path.splitext(f)[0] + def module_name_from_key(k): return k.replace('-', '_') + def path_from_file_name(f): return os.path.join(vyos_conf_scripts_dir, f) @@ -126,7 +133,7 @@ def write_stdout_log(file_name, msg): f.write(msg) -def run_script(script_name, config, args) -> tuple[int, str]: +def run_script(script_name, config, args) -> tuple[Response, str]: # pylint: disable=broad-exception-caught script = conf_mode_scripts[script_name] @@ -139,13 +146,13 @@ def run_script(script_name, config, args) -> tuple[int, str]: script.apply(c) except ConfigError as e: logger.error(e) - return R_ERROR_COMMIT, str(e) + return Response.ERROR_COMMIT, str(e) except Exception: tb = traceback.format_exc() logger.error(tb) - return R_ERROR_COMMIT, tb + return Response.ERROR_COMMIT, tb - return R_SUCCESS, '' + return Response.SUCCESS, '' def initialization(socket): @@ -195,8 +202,9 @@ def initialization(socket): os.environ['VYATTA_CHANGES_ONLY_DIR'] = changes_only_dir_string try: - configsource = ConfigSourceString(running_config_text=active_string, - session_config_text=session_string) + configsource = ConfigSourceString( + running_config_text=active_string, session_config_text=session_string + ) except 
ConfigSourceError as e: logger.debug(e) return None @@ -211,17 +219,14 @@ def initialization(socket): scripts_called = [] setattr(config, 'scripts_called', scripts_called) - if not hasattr(config, 'frrender_cls'): - setattr(config, 'frrender_cls', FRRender()) - return config -def process_node_data(config, data, _last: bool = False) -> tuple[int, str]: +def process_node_data(config, data, _last: bool = False) -> tuple[Response, str]: if not config: out = 'Empty config' logger.critical(out) - return R_ERROR_DAEMON, out + return Response.ERROR_DAEMON, out script_name = None os.environ['VYOS_TAGNODE_VALUE'] = '' @@ -237,7 +242,7 @@ def process_node_data(config, data, _last: bool = False) -> tuple[int, str]: if not script_name: out = 'Missing script_name' logger.critical(out) - return R_ERROR_DAEMON, out + return Response.ERROR_DAEMON, out if res.group(3): args = res.group(3).split() args.insert(0, f'{script_name}.py') @@ -249,7 +254,7 @@ def process_node_data(config, data, _last: bool = False) -> tuple[int, str]: scripts_called.append(script_record) if script_name not in include_set: - return R_PASS, '' + return Response.PASS, '' with redirect_stdout(io.StringIO()) as o: result, err_out = run_script(script_name, config, args) @@ -262,13 +267,15 @@ def process_node_data(config, data, _last: bool = False) -> tuple[int, str]: def send_result(sock, err, msg): + err_no = err.value + err_name = err.name msg = msg if msg else '' msg_size = min(MAX_MSG_SIZE, len(msg)) - err_rep = err.to_bytes(1) + err_rep = err_no.to_bytes(1) msg_size_rep = f'{msg_size:#0{PAD_MSG_SIZE}x}' - logger.debug(f'Sending reply: error_code {err} with output') + logger.debug(f'Sending reply: {err_name} with output') sock.send_multipart([err_rep, msg_size_rep.encode(), msg.encode()]) write_stdout_log(script_stdout_log, msg) @@ -312,8 +319,10 @@ if __name__ == '__main__': remove_if_file(configd_env_file) os.symlink(configd_env_set_file, configd_env_file) - config = None + # We only need one long-lived instance of FRRender + frr = FRRender() + config = None while True: # Wait for next request from client msg = socket.recv().decode() @@ -332,10 +341,11 @@ if __name__ == '__main__': scripts_called = getattr(config, 'scripts_called', []) logger.debug(f'scripts_called: {scripts_called}') - if hasattr(config, 'frrender_cls') and res == R_SUCCESS: - frrender_cls = getattr(config, 'frrender_cls') + if res == Response.SUCCESS: tmp = get_frrender_dict(config) - frrender_cls.generate(tmp) - frrender_cls.apply() + if frr.generate(tmp): + # only apply a new FRR configuration if anything changed + # in comparison to the previous applied configuration + frr.apply() else: logger.critical(f'Unexpected message: {message}') diff --git a/src/services/vyos-conntrack-logger b/src/services/vyos-conntrack-logger index 9c31b465f..ec0e1f717 100755 --- a/src/services/vyos-conntrack-logger +++ b/src/services/vyos-conntrack-logger @@ -15,10 +15,8 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. 
import argparse -import grp import logging import multiprocessing -import os import queue import signal import socket diff --git a/src/services/vyos-domain-resolver b/src/services/vyos-domain-resolver new file mode 100755 index 000000000..fb18724af --- /dev/null +++ b/src/services/vyos-domain-resolver @@ -0,0 +1,313 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2022-2024 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +import json +import time +import logging +import os + +from vyos.configdict import dict_merge +from vyos.configquery import ConfigTreeQuery +from vyos.firewall import fqdn_config_parse +from vyos.firewall import fqdn_resolve +from vyos.ifconfig import WireGuardIf +from vyos.remote import download +from vyos.utils.commit import commit_in_progress +from vyos.utils.dict import dict_search_args +from vyos.utils.kernel import WIREGUARD_REKEY_AFTER_TIME +from vyos.utils.file import makedir, chmod_775, write_file, read_file +from vyos.utils.network import is_valid_ipv4_address_or_range, is_valid_ipv6_address_or_range +from vyos.utils.process import cmd +from vyos.utils.process import run +from vyos.xml_ref import get_defaults + +base = ['firewall'] +timeout = 300 +cache = False +base_firewall = ['firewall'] +base_nat = ['nat'] +base_interfaces = ['interfaces'] + +firewall_config_dir = "/config/firewall" + +domain_state = {} + +ipv4_tables = { + 'ip vyos_mangle', + 'ip vyos_filter', + 'ip vyos_nat', + 'ip raw' +} + +ipv6_tables = { + 'ip6 vyos_mangle', + 'ip6 vyos_filter', + 'ip6 raw' +} + +logger = logging.getLogger(__name__) +logs_handler = logging.StreamHandler() +logger.addHandler(logs_handler) +logger.setLevel(logging.INFO) + +def get_config(conf, node): + node_config = conf.get_config_dict(node, key_mangling=('-', '_'), get_first_key=True, + no_tag_node_value_mangle=True) + + default_values = get_defaults(node, get_first_key=True) + + node_config = dict_merge(default_values, node_config) + + if node == base_firewall and 'global_options' in node_config: + global_config = node_config['global_options'] + global timeout, cache + + if 'resolver_interval' in global_config: + timeout = int(global_config['resolver_interval']) + + if 'resolver_cache' in global_config: + cache = True + + fqdn_config_parse(node_config, node[0]) + + return node_config + +def resolve(domains, ipv6=False): + global domain_state + + ip_list = set() + + for domain in domains: + resolved = fqdn_resolve(domain, ipv6=ipv6) + + cache_key = f'{domain}_ipv6' if ipv6 else domain + + if resolved and cache: + domain_state[cache_key] = resolved + elif not resolved: + if cache_key not in domain_state: + continue + resolved = domain_state[cache_key] + + ip_list = ip_list | resolved + return ip_list + +def nft_output(table, set_name, ip_list): + output = [f'flush set {table} {set_name}'] + if ip_list: + ip_str = ','.join(ip_list) + output.append(f'add element {table} {set_name} {{ {ip_str} }}') + return output + +def nft_valid_sets(): + try: + valid_sets 
= [] + sets_json = cmd('nft --json list sets') + sets_obj = json.loads(sets_json) + + for obj in sets_obj['nftables']: + if 'set' in obj: + family = obj['set']['family'] + table = obj['set']['table'] + name = obj['set']['name'] + valid_sets.append((f'{family} {table}', name)) + + return valid_sets + except: + return [] + +def update_remote_group(config): + conf_lines = [] + count = 0 + valid_sets = nft_valid_sets() + + remote_groups = dict_search_args(config, 'group', 'remote_group') + if remote_groups: + # Create directory for list files if necessary + if not os.path.isdir(firewall_config_dir): + makedir(firewall_config_dir, group='vyattacfg') + chmod_775(firewall_config_dir) + + for set_name, remote_config in remote_groups.items(): + if 'url' not in remote_config: + continue + nft_ip_set_name = f'R_{set_name}' + nft_ip6_set_name = f'R6_{set_name}' + + # Create list file if necessary + list_file = os.path.join(firewall_config_dir, f"{nft_ip_set_name}.txt") + if not os.path.exists(list_file): + write_file(list_file, '', user="root", group="vyattacfg", mode=0o644) + + # Attempt to download file, use cached version if download fails + try: + download(list_file, remote_config['url'], raise_error=True) + except: + logger.error(f'Failed to download list-file for {set_name} remote group') + logger.info(f'Using cached list-file for {set_name} remote group') + + # Read list file + ip_list = [] + ip6_list = [] + invalid_list = [] + for line in read_file(list_file).splitlines(): + line_first_word = line.strip().partition(' ')[0] + + if is_valid_ipv4_address_or_range(line_first_word): + ip_list.append(line_first_word) + elif is_valid_ipv6_address_or_range(line_first_word): + ip6_list.append(line_first_word) + else: + if line_first_word[0].isalnum(): + invalid_list.append(line_first_word) + + # Load ip tables + for table in ipv4_tables: + if (table, nft_ip_set_name) in valid_sets: + conf_lines += nft_output(table, nft_ip_set_name, ip_list) + + # Load ip6 tables + for table in ipv6_tables: + if (table, nft_ip6_set_name) in valid_sets: + conf_lines += nft_output(table, nft_ip6_set_name, ip6_list) + + invalid_str = ", ".join(invalid_list) + if invalid_str: + logger.info(f'Invalid address for set {set_name}: {invalid_str}') + + count += 1 + + nft_conf_str = "\n".join(conf_lines) + "\n" + code = run(f'nft --file -', input=nft_conf_str) + + logger.info(f'Updated {count} remote-groups in firewall - result: {code}') + + +def update_fqdn(config, node): + conf_lines = [] + count = 0 + valid_sets = nft_valid_sets() + + if node == 'firewall': + domain_groups = dict_search_args(config, 'group', 'domain_group') + if domain_groups: + for set_name, domain_config in domain_groups.items(): + if 'address' not in domain_config: + continue + nft_set_name = f'D_{set_name}' + domains = domain_config['address'] + + ip_list = resolve(domains, ipv6=False) + for table in ipv4_tables: + if (table, nft_set_name) in valid_sets: + conf_lines += nft_output(table, nft_set_name, ip_list) + ip6_list = resolve(domains, ipv6=True) + for table in ipv6_tables: + if (table, nft_set_name) in valid_sets: + conf_lines += nft_output(table, nft_set_name, ip6_list) + count += 1 + + for set_name, domain in config['ip_fqdn'].items(): + table = 'ip vyos_filter' + nft_set_name = f'FQDN_{set_name}' + ip_list = resolve([domain], ipv6=False) + if (table, nft_set_name) in valid_sets: + conf_lines += nft_output(table, nft_set_name, ip_list) + count += 1 + + for set_name, domain in config['ip6_fqdn'].items(): + table = 'ip6 vyos_filter' + nft_set_name = 
f'FQDN_{set_name}' + ip_list = resolve([domain], ipv6=True) + if (table, nft_set_name) in valid_sets: + conf_lines += nft_output(table, nft_set_name, ip_list) + count += 1 + + else: + # It's NAT + for set_name, domain in config['ip_fqdn'].items(): + table = 'ip vyos_nat' + nft_set_name = f'FQDN_nat_{set_name}' + ip_list = resolve([domain], ipv6=False) + if (table, nft_set_name) in valid_sets: + conf_lines += nft_output(table, nft_set_name, ip_list) + count += 1 + + nft_conf_str = "\n".join(conf_lines) + "\n" + code = run(f'nft --file -', input=nft_conf_str) + + logger.info(f'Updated {count} sets in {node} - result: {code}') + +def update_interfaces(config, node): + if node == 'interfaces': + wg_interfaces = dict_search_args(config, 'wireguard') + if wg_interfaces: + + peer_public_keys = {} + # for each wireguard interfaces + for interface, wireguard in wg_interfaces.items(): + peer_public_keys[interface] = [] + for peer, peer_config in wireguard['peer'].items(): + # check peer if peer host-name or address is set + if 'host_name' in peer_config or 'address' in peer_config: + # check latest handshake + peer_public_keys[interface].append( + peer_config['public_key'] + ) + + now_time = time.time() + for (interface, check_peer_public_keys) in peer_public_keys.items(): + if len(check_peer_public_keys) == 0: + continue + + intf = WireGuardIf(interface, create=False, debug=False) + handshakes = intf.operational.get_latest_handshakes() + + # WireGuard performs a handshake every WIREGUARD_REKEY_AFTER_TIME + # if data is being transmitted between the peers. If no data is + # transmitted, the handshake will not be initiated unless new + # data begins to flow. Each handshake generates a new session + # key, and the key is rotated at least every 120 seconds or + # upon data transmission after a prolonged silence. 
+ for public_key, handshake_time in handshakes.items(): + if public_key in check_peer_public_keys and ( + handshake_time == 0 + or (now_time - handshake_time > 3*WIREGUARD_REKEY_AFTER_TIME) + ): + intf.operational.reset_peer(public_key=public_key) + +if __name__ == '__main__': + logger.info('VyOS domain resolver') + + count = 1 + while commit_in_progress(): + if ( count % 60 == 0 ): + logger.info(f'Commit still in progress after {count}s - waiting') + count += 1 + time.sleep(1) + + conf = ConfigTreeQuery() + firewall = get_config(conf, base_firewall) + nat = get_config(conf, base_nat) + interfaces = get_config(conf, base_interfaces) + + logger.info(f'interval: {timeout}s - cache: {cache}') + + while True: + update_fqdn(firewall, 'firewall') + update_fqdn(nat, 'nat') + update_remote_group(firewall) + update_interfaces(interfaces, 'interfaces') + time.sleep(timeout) diff --git a/src/services/vyos-hostsd b/src/services/vyos-hostsd index 1ba90471e..44f03586c 100755 --- a/src/services/vyos-hostsd +++ b/src/services/vyos-hostsd @@ -233,10 +233,7 @@ # } import os -import sys -import time import json -import signal import traceback import re import logging @@ -245,7 +242,6 @@ import zmq from voluptuous import Schema, MultipleInvalid, Required, Any from collections import OrderedDict from vyos.utils.file import makedir -from vyos.utils.permission import chown from vyos.utils.permission import chmod_755 from vyos.utils.process import popen from vyos.utils.process import process_named_running diff --git a/src/services/vyos-http-api-server b/src/services/vyos-http-api-server index 558561182..be3dd5051 100755 --- a/src/services/vyos-http-api-server +++ b/src/services/vyos-http-api-server @@ -20,18 +20,22 @@ import grp import json import logging import signal +import traceback from time import sleep +from typing import Annotated -from fastapi import FastAPI +from fastapi import FastAPI, Query from fastapi.exceptions import RequestValidationError from uvicorn import Config as UvicornConfig from uvicorn import Server as UvicornServer from vyos.configsession import ConfigSession from vyos.defaults import api_config_state +from vyos.utils.file import read_file +from vyos.version import get_version from api.session import SessionState -from api.rest.models import error +from api.rest.models import error, InfoQueryParams, success CFG_GROUP = 'vyattacfg' @@ -57,11 +61,49 @@ app = FastAPI(debug=True, title="VyOS API", version="0.1.0") + @app.exception_handler(RequestValidationError) async def validation_exception_handler(_request, exc): return error(400, str(exc.errors()[0])) +@app.get('/info') +def info(q: Annotated[InfoQueryParams, Query()]): + show_version = q.version + show_hostname = q.hostname + + prelogin_file = r'/etc/issue' + hostname_file = r'/etc/hostname' + default = 'Welcome to VyOS' + + try: + res = { + 'banner': '', + 'hostname': '', + 'version': '' + } + if show_version: + res.update(version=get_version()) + + if show_hostname: + try: + hostname = read_file(hostname_file) + except Exception: + hostname = 'vyos' + res.update(hostname=hostname) + + banner = read_file(prelogin_file, defaultonfailure=default) + if banner == f'{default} - \\n \\l': + banner = banner.partition(default)[1] + + res.update(banner=banner) + except Exception: + LOG.critical(traceback.format_exc()) + return error(500, 'An internal error occured. 
Check the logs for details.') + + return success(res) + + ### # Modify uvicorn to allow reloading server within the configsession ### diff --git a/src/services/vyos-network-event-logger b/src/services/vyos-network-event-logger new file mode 100644 index 000000000..840ff3cda --- /dev/null +++ b/src/services/vyos-network-event-logger @@ -0,0 +1,1218 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2025 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import argparse +import logging +import multiprocessing +import queue +import signal +import socket +import threading +from pathlib import Path +from time import sleep +from typing import Dict, AnyStr, List, Union + +from pyroute2.common import AF_MPLS +from pyroute2.iproute import IPRoute +from pyroute2.netlink import rtnl, nlmsg +from pyroute2.netlink.nfnetlink.nfctsocket import nfct_msg +from pyroute2.netlink.rtnl import (rt_proto as RT_PROTO, rt_type as RT_TYPES, + rtypes as RTYPES + ) +from pyroute2.netlink.rtnl.fibmsg import FR_ACT_GOTO, FR_ACT_NOP, FR_ACT_TO_TBL, \ + fibmsg +from pyroute2.netlink.rtnl import ifaddrmsg +from pyroute2.netlink.rtnl import ifinfmsg +from pyroute2.netlink.rtnl import ndmsg +from pyroute2.netlink.rtnl import rtmsg +from pyroute2.netlink.rtnl.rtmsg import nh, rtmsg_base + +from vyos.include.uapi.linux.fib_rules import * +from vyos.include.uapi.linux.icmpv6 import * +from vyos.include.uapi.linux.if_arp import * +from vyos.include.uapi.linux.lwtunnel import * +from vyos.include.uapi.linux.neighbour import * +from vyos.include.uapi.linux.rtnetlink import * + +from vyos.utils.file import read_json + + +manager = multiprocessing.Manager() +cache = manager.dict() + + +class UnsupportedMessageType(Exception): + pass + +shutdown_event = multiprocessing.Event() + +logging.basicConfig(level=logging.INFO, format='%(message)s') +logger = logging.getLogger(__name__) + + +class DebugFormatter(logging.Formatter): + def format(self, record): + self._style._fmt = '[%(asctime)s] %(levelname)s: %(message)s' + return super().format(record) + + +def set_log_level(level: str) -> None: + if level == 'debug': + logger.setLevel(logging.DEBUG) + logger.parent.handlers[0].setFormatter(DebugFormatter()) + else: + logger.setLevel(logging.INFO) + +IFF_FLAGS = { + 'RUNNING': ifinfmsg.IFF_RUNNING, + 'LOOPBACK': ifinfmsg.IFF_LOOPBACK, + 'BROADCAST': ifinfmsg.IFF_BROADCAST, + 'POINTOPOINT': ifinfmsg.IFF_POINTOPOINT, + 'MULTICAST': ifinfmsg.IFF_MULTICAST, + 'NOARP': ifinfmsg.IFF_NOARP, + 'ALLMULTI': ifinfmsg.IFF_ALLMULTI, + 'PROMISC': ifinfmsg.IFF_PROMISC, + 'MASTER': ifinfmsg.IFF_MASTER, + 'SLAVE': ifinfmsg.IFF_SLAVE, + 'DEBUG': ifinfmsg.IFF_DEBUG, + 'DYNAMIC': ifinfmsg.IFF_DYNAMIC, + 'AUTOMEDIA': ifinfmsg.IFF_AUTOMEDIA, + 'PORTSEL': ifinfmsg.IFF_PORTSEL, + 'NOTRAILERS': ifinfmsg.IFF_NOTRAILERS, + 'UP': ifinfmsg.IFF_UP, + 'LOWER_UP': ifinfmsg.IFF_LOWER_UP, + 'DORMANT': ifinfmsg.IFF_DORMANT, + 'ECHO': ifinfmsg.IFF_ECHO, +} + +NEIGH_STATE_FLAGS = { + 'INCOMPLETE': 
ndmsg.NUD_INCOMPLETE, + 'REACHABLE': ndmsg.NUD_REACHABLE, + 'STALE': ndmsg.NUD_STALE, + 'DELAY': ndmsg.NUD_DELAY, + 'PROBE': ndmsg.NUD_PROBE, + 'FAILED': ndmsg.NUD_FAILED, + 'NOARP': ndmsg.NUD_NOARP, + 'PERMANENT': ndmsg.NUD_PERMANENT, +} + +IFA_FLAGS = { + 'secondary': ifaddrmsg.IFA_F_SECONDARY, + 'temporary': ifaddrmsg.IFA_F_SECONDARY, + 'nodad': ifaddrmsg.IFA_F_NODAD, + 'optimistic': ifaddrmsg.IFA_F_OPTIMISTIC, + 'dadfailed': ifaddrmsg.IFA_F_DADFAILED, + 'home': ifaddrmsg.IFA_F_HOMEADDRESS, + 'deprecated': ifaddrmsg.IFA_F_DEPRECATED, + 'tentative': ifaddrmsg.IFA_F_TENTATIVE, + 'permanent': ifaddrmsg.IFA_F_PERMANENT, + 'mngtmpaddr': ifaddrmsg.IFA_F_MANAGETEMPADDR, + 'noprefixroute': ifaddrmsg.IFA_F_NOPREFIXROUTE, + 'autojoin': ifaddrmsg.IFA_F_MCAUTOJOIN, + 'stable-privacy': ifaddrmsg.IFA_F_STABLE_PRIVACY, +} + +RT_SCOPE_TO_NAME = { + rtmsg.RT_SCOPE_UNIVERSE: 'global', + rtmsg.RT_SCOPE_SITE: 'site', + rtmsg.RT_SCOPE_LINK: 'link', + rtmsg.RT_SCOPE_HOST: 'host', + rtmsg.RT_SCOPE_NOWHERE: 'nowhere', +} + +FAMILY_TO_NAME = { + socket.AF_INET: 'inet', + socket.AF_INET6: 'inet6', + socket.AF_PACKET: 'link', + AF_MPLS: 'mpls', + socket.AF_BRIDGE: 'bridge', +} + +_INFINITY = 4294967295 + + +def _get_iif_name(idx: int) -> str: + """ + Retrieves the interface name associated with a given index. + """ + try: + if_info = IPRoute().link("get", index=idx) + if if_info: + return if_info[0].get_attr('IFLA_IFNAME') + except Exception as e: + pass + + return '' + + +def remember_if_index(idx: int, event_type: int) -> None: + """ + Manages the caching of network interface names based on their index and event type. + + - For RTM_DELLINK event, the interface name is removed from the cache if exists. + - For RTM_NEWLINK event, the interface name is retrieved and updated in the cache. + """ + name = cache.get(idx) + if name: + if event_type == rtnl.RTM_DELLINK: + del cache[idx] + else: + name = _get_iif_name(idx) + if name: + cache[idx] = name + else: + cache[idx] = _get_iif_name(idx) + + +class BaseFormatter: + """ + A base class providing utility methods for formatting network message data. + """ + def _get_if_name_by_index(self, idx: int) -> str: + """ + Retrieves the name of a network interface based on its index. + + Uses a cached lookup for efficiency. If the name is not found in the cache, + it queries the system and updates the cache. + """ + if_name = cache.get(idx) + if not if_name: + if_name = _get_iif_name(idx) + cache[idx] = if_name + + return if_name + + def _format_rttable(self, idx: int) -> str: + """ + Formats a route table identifier into a readable name. + """ + return f'{RT_TABLE_TO_NAME.get(idx, idx)}' + + def _parse_flag(self, data: int, flags: dict) -> list: + """ + Extracts and returns flag names equal the bits set in a numeric value. + """ + result = list() + if data: + for key, val in flags.items(): + if data & val: + result.append(key) + data &= ~val + + if data: + result.append(f"{data:#x}") + + return result + + def af_bit_len(self, af: int) -> int: + """ + Gets the bit length of a given address family. + Supports common address families like IPv4, IPv6, and MPLS. + """ + _map = { + socket.AF_INET6: 128, + socket.AF_INET: 32, + AF_MPLS: 20, + } + + return _map.get(af) + + def _format_simple_field(self, data: str, prefix: str='') -> str: + """ + Formats a simple field with an optional prefix. + + A simple field represents a value that does not require additional + parsing and is used as is. 
+ """ + return self._output(f'{prefix} {data}') if data is not None else '' + + def _output(self, data: str) -> str: + """ + Standardizes the output format. + + Ensures that the output is enclosed with single spaces and has no leading + or trailing whitespace. + """ + return f' {data.strip()} ' if data else '' + + +class BaseMSGFormatter(BaseFormatter): + """ + A base formatter class for network messages. + This class provides common methods for formatting network-related messages, + """ + + def _prepare_start_message(self, event: str) -> str: + """ + Prepares a starting message string based on the event type. + """ + if event in ['RTM_DELROUTE', 'RTM_DELLINK', 'RTM_DELNEIGH', + 'RTM_DELADDR', 'RTM_DELADDRLABEL', 'RTM_DELRULE', + 'RTM_DELNETCONF']: + return 'Deleted ' + if event == 'RTM_GETNEIGH': + return 'Miss ' + return '' + + def _format_flow_field(self, data: int) -> str: + """ + Formats a flow field to represent traffic realms. + """ + to = data & 0xFFFF + from_ = data >> 16 + result = f"realm{'s' if from_ else ''} " + if from_: + result += f'{from_}/' + result += f'{to}' + + return self._output(result) + + def format(self, msg: nlmsg) -> str: + """ + Abstract method to format a complete message. + + This method must be implemented by subclasses to provide specific formatting + logic for different types of messages. + """ + raise NotImplementedError(f'{msg.get("event")}: {msg}') + + +class LinkFormatter(BaseMSGFormatter): + """ + A formatter class for handling link-related network messages + `RTM_NEWLINK` and `RTM_DELLINK`. + """ + def _format_iff_flags(self, flags: int) -> str: + """ + Formats interface flags into a human-readable string. + """ + result = list() + if flags: + if flags & IFF_FLAGS['UP'] and not flags & IFF_FLAGS['RUNNING']: + result.append('NO-CARRIER') + + flags &= ~IFF_FLAGS['RUNNING'] + + result.extend(self._parse_flag(flags, IFF_FLAGS)) + + return self._output(f'<{(",").join(result)}>') + + def _format_if_props(self, data: ifinfmsg.ifinfbase.proplist) -> str: + """ + Formats interface alternative name properties. + """ + result = '' + for rec in data.altnames(): + result += f'[altname {rec}] ' + return self._output(result) + + def _format_link(self, msg: ifinfmsg.ifinfmsg) -> str: + """ + Formats the link attribute of a network interface message. + """ + if msg.get_attr("IFLA_LINK") is not None: + iflink = msg.get_attr("IFLA_LINK") + if iflink: + if msg.get_attr("IFLA_LINK_NETNSID"): + return f'if{iflink}' + else: + return self._get_if_name_by_index(iflink) + return 'NONE' + + def _format_link_info(self, msg: ifinfmsg.ifinfmsg) -> str: + """ + Formats detailed information about the link, including type, address, + broadcast address, and permanent address. + """ + result = f'link/{ARPHRD_TO_NAME.get(msg.get("ifi_type"), msg.get("ifi_type"))}' + result += self._format_simple_field(msg.get_attr('IFLA_ADDRESS')) + + if msg.get_attr("IFLA_BROADCAST"): + if msg.get('flags') & ifinfmsg.IFF_POINTOPOINT: + result += f' peer' + else: + result += f' brd' + result += f' {msg.get_attr("IFLA_BROADCAST")}' + + if msg.get_attr("IFLA_PERM_ADDRESS"): + if not msg.get_attr("IFLA_ADDRESS") or \ + msg.get_attr("IFLA_ADDRESS") != msg.get_attr("IFLA_PERM_ADDRESS"): + result += f' permaddr {msg.get_attr("IFLA_PERM_ADDRESS")}' + + return self._output(result) + + def format(self, msg: ifinfmsg.ifinfmsg): + """ + Formats a network link message into a structured output string. 
+ """ + if msg.get("family") not in [socket.AF_UNSPEC, socket.AF_BRIDGE]: + return None + + message = self._prepare_start_message(msg.get('event')) + + link = self._format_link(msg) + + message += f'{msg.get("index")}: {msg.get_attr("IFLA_IFNAME")}' + message += f'@{link}' if link else '' + message += f': {self._format_iff_flags(msg.get("flags"))}' + + message += self._format_simple_field(msg.get_attr('IFLA_MTU'), prefix='mtu') + message += self._format_simple_field(msg.get_attr('IFLA_QDISC'), prefix='qdisc') + message += self._format_simple_field(msg.get_attr('IFLA_OPERSTATE'), prefix='state') + message += self._format_simple_field(msg.get_attr('IFLA_GROUP'), prefix='group') + message += self._format_simple_field(msg.get_attr('IFLA_MASTER'), prefix='master') + + message += self._format_link_info(msg) + + if msg.get_attr('IFLA_PROP_LIST'): + message += self._format_if_props(msg.get_attr('IFLA_PROP_LIST')) + + return self._output(message) + + +class EncapFormatter(BaseFormatter): + """ + A formatter class for handling encapsulation attributes in routing messages. + """ + # TODO: implement other lwtunnel decoder in pyroute2 + # https://github.com/svinota/pyroute2/blob/78cfe838bec8d96324811a3962bda15fb028e0ce/pyroute2/netlink/rtnl/rtmsg.py#L657 + def __init__(self): + """ + Initializes the EncapFormatter with supported encapsulation types. + """ + self.formatters = { + rtmsg.LWTUNNEL_ENCAP_MPLS: self.mpls_format, + rtmsg.LWTUNNEL_ENCAP_SEG6: self.seg6_format, + rtmsg.LWTUNNEL_ENCAP_BPF: self.bpf_format, + rtmsg.LWTUNNEL_ENCAP_SEG6_LOCAL: self.seg6local_format, + } + + def _format_srh(self, data: rtmsg_base.seg6_encap_info.ipv6_sr_hdr): + """ + Formats Segment Routing Header (SRH) attributes. + """ + result = '' + # pyroute2 decode mode only as inline or encap (encap, l2encap, encap.red, l2encap.red") + # https://github.com/svinota/pyroute2/blob/78cfe838bec8d96324811a3962bda15fb028e0ce/pyroute2/netlink/rtnl/rtmsg.py#L220 + for key in ['mode', 'segs']: + + val = data.get(key) + + if val: + if key == 'segs': + result += f'{key} {len(val)} {val} ' + else: + result += f'{key} {val} ' + + return self._output(result) + + def _format_bpf_object(self, data: rtmsg_base.bpf_encap_info, attr_name: str, attr_key: str): + """ + Formats eBPF program attributes. + """ + attr = data.get_attr(attr_name) + if not attr: + return '' + result = '' + if attr.get_attr("LWT_BPF_PROG_NAME"): + result += f'{attr.get_attr("LWT_BPF_PROG_NAME")} ' + if attr.get_attr("LWT_BPF_PROG_FD"): + result += f'{attr.get_attr("LWT_BPF_PROG_FD")} ' + + return self._output(f'{attr_key} {result.strip()}') + + def mpls_format(self, data: rtmsg_base.mpls_encap_info): + """ + Formats MPLS encapsulation attributes. + """ + result = '' + if data.get_attr("MPLS_IPTUNNEL_DST"): + for rec in data.get_attr("MPLS_IPTUNNEL_DST"): + for key, val in rec.items(): + if val: + result += f'{key} {val} ' + + if data.get_attr("MPLS_IPTUNNEL_TTL"): + result += f' ttl {data.get_attr("MPLS_IPTUNNEL_TTL")}' + + return self._output(result) + + def bpf_format(self, data: rtmsg_base.bpf_encap_info): + """ + Formats eBPF encapsulation attributes. 
+ """ + result = '' + result += self._format_bpf_object(data, 'LWT_BPF_IN', 'in') + result += self._format_bpf_object(data, 'LWT_BPF_OUT', 'out') + result += self._format_bpf_object(data, 'LWT_BPF_XMIT', 'xmit') + + if data.get_attr('LWT_BPF_XMIT_HEADROOM'): + result += f'headroom {data.get_attr("LWT_BPF_XMIT_HEADROOM")} ' + + return self._output(result) + + def seg6_format(self, data: rtmsg_base.seg6_encap_info): + """ + Formats Segment Routing (SEG6) encapsulation attributes. + """ + result = '' + if data.get_attr("SEG6_IPTUNNEL_SRH"): + result += self._format_srh(data.get_attr("SEG6_IPTUNNEL_SRH")) + + return self._output(result) + + def seg6local_format(self, data: rtmsg_base.seg6local_encap_info): + """ + Formats SEG6 local encapsulation attributes. + """ + result = '' + formatters = { + 'SEG6_LOCAL_ACTION': lambda val: f' action {next((k for k, v in data.action.actions.items() if v == val), "unknown")}', + 'SEG6_LOCAL_SRH': lambda val: f' {self._format_srh(val)}', + 'SEG6_LOCAL_TABLE': lambda val: f' table {self._format_rttable(val)}', + 'SEG6_LOCAL_NH4': lambda val: f' nh4 {val}', + 'SEG6_LOCAL_NH6': lambda val: f' nh6 {val}', + 'SEG6_LOCAL_IIF': lambda val: f' iif {self._get_if_name_by_index(val)}', + 'SEG6_LOCAL_OIF': lambda val: f' oif {self._get_if_name_by_index(val)}', + 'SEG6_LOCAL_BPF': lambda val: f' endpoint {val.get("LWT_BPF_PROG_NAME")}', + 'SEG6_LOCAL_VRFTABLE': lambda val: f' vrftable {self._format_rttable(val)}', + } + + for rec in data.get('attrs'): + if rec[0] in formatters: + result += formatters[rec[0]](rec[1]) + + return self._output(result) + + def format(self, type: int, data: Union[rtmsg_base.mpls_encap_info, + rtmsg_base.bpf_encap_info, + rtmsg_base.seg6_encap_info, + rtmsg_base.seg6local_encap_info]): + """ + Formats encapsulation attributes based on their type. + """ + result = '' + formatter = self.formatters.get(type) + + result += f'encap {ENCAP_TO_NAME.get(type, "unknown")}' + + if formatter: + result += f' {formatter(data)}' + + return self._output(result) + + +class RouteFormatter(BaseMSGFormatter): + """ + A formatter class for handling network routing messages + `RTM_NEWROUTE` and `RTM_DELROUTE`. + """ + + def _format_rt_flags(self, flags: int) -> str: + """ + Formats route flags into a comma-separated string. + """ + result = list() + result.extend(self._parse_flag(flags, RT_FlAGS)) + + return self._output(",".join(result)) + + def _format_rta_encap(self, type: int, data: Union[rtmsg_base.mpls_encap_info, + rtmsg_base.bpf_encap_info, + rtmsg_base.seg6_encap_info, + rtmsg_base.seg6local_encap_info]) -> str: + """ + Formats encapsulation attributes. + """ + return EncapFormatter().format(type, data) + + def _format_rta_newdest(self, data: str) -> str: + """ + Formats a new destination attribute. + """ + return self._output(f'as to {data}') + + def _format_rta_gateway(self, data: str) -> str: + """ + Formats a gateway attribute. + """ + return self._output(f'via {data}') + + def _format_rta_via(self, data: str) -> str: + """ + Formats a 'via' route attribute. + """ + return self._output(f'{data}') + + def _format_rta_metrics(self, data: rtmsg_base.metrics): + """ + Formats routing metrics. 
+ """ + result = '' + + def __format_metric_time(_val: int) -> str: + """Formats metric time values into seconds or milliseconds.""" + return f"{_val / 1000}s" if _val >= 1000 else f"{_val}ms" + + def __format_reatures(_val: int) -> str: + """Parse and formats routing feature flags.""" + result = self._parse_flag(_val, {'ecn': RTAX_FEATURE_ECN, + 'tcp_usec_ts': RTAX_FEATURE_TCP_USEC_TS}) + return ",".join(result) + + formatters = { + 'RTAX_MTU': lambda val: f' mtu {val}', + 'RTAX_WINDOW': lambda val: f' window {val}', + 'RTAX_RTT': lambda val: f' rtt {__format_metric_time(val / 8)}', + 'RTAX_RTTVAR': lambda val: f' rttvar {__format_metric_time(val / 4)}', + 'RTAX_SSTHRESH': lambda val: f' ssthresh {val}', + 'RTAX_CWND': lambda val: f' cwnd {val}', + 'RTAX_ADVMSS': lambda val: f' advmss {val}', + 'RTAX_REORDERING': lambda val: f' reordering {val}', + 'RTAX_HOPLIMIT': lambda val: f' hoplimit {val}', + 'RTAX_INITCWND': lambda val: f' initcwnd {val}', + 'RTAX_FEATURES': lambda val: f' features {__format_reatures(val)}', + 'RTAX_RTO_MIN': lambda val: f' rto_min {__format_metric_time(val)}', + 'RTAX_INITRWND': lambda val: f' initrwnd {val}', + 'RTAX_QUICKACK': lambda val: f' quickack {val}', + } + + for rec in data.get('attrs'): + if rec[0] in formatters: + result += formatters[rec[0]](rec[1]) + + return self._output(result) + + def _format_rta_pref(self, data: int) -> str: + """ + Formats a pref attribute. + """ + pref = { + ICMPV6_ROUTER_PREF_LOW: "low", + ICMPV6_ROUTER_PREF_MEDIUM: "medium", + ICMPV6_ROUTER_PREF_HIGH: "high", + } + + return self._output(f' pref {pref.get(data, data)}') + + def _format_rta_multipath(self, mcast_cloned: bool, family: int, data: List[nh]) -> str: + """ + Formats multipath route attributes. + """ + result = '' + first = True + for rec in data: + if mcast_cloned: + if first: + result += ' Oifs: ' + first = False + else: + result += ' ' + else: + result += ' nexthop ' + + if rec.get_attr('RTA_ENCAP'): + result += self._format_rta_encap(rec.get_attr('RTA_ENCAP_TYPE'), + rec.get_attr('RTA_ENCAP')) + + if rec.get_attr('RTA_NEWDST'): + result += self._format_rta_newdest(rec.get_attr('RTA_NEWDST')) + + if rec.get_attr('RTA_GATEWAY'): + result += self._format_rta_gateway(rec.get_attr('RTA_GATEWAY')) + + if rec.get_attr('RTA_VIA'): + result += self._format_rta_via(rec.get_attr('RTA_VIA')) + + if rec.get_attr('RTA_FLOW'): + result += self._format_flow_field(rec.get_attr('RTA_FLOW')) + + result += f' dev {self._get_if_name_by_index(rec.get("oif"))}' + if mcast_cloned: + if rec.get("hops") != 1: + result += f' (ttl>{rec.get("hops")})' + else: + if family != AF_MPLS: + result += f' weight {rec.get("hops") + 1}' + + result += self._format_rt_flags(rec.get("flags")) + + return self._output(result) + + def format(self, msg: rtmsg.rtmsg) -> str: + """ + Formats a network route message into a human-readable string representation. 
+ """ + message = self._prepare_start_message(msg.get('event')) + + message += RT_TYPES.get(msg.get('type')) + + if msg.get_attr('RTA_DST'): + host_len = self.af_bit_len(msg.get('family')) + if msg.get('dst_len') != host_len: + message += f' {msg.get_attr("RTA_DST")}/{msg.get("dst_len")}' + else: + message += f' {msg.get_attr("RTA_DST")}' + elif msg.get('dst_len'): + message += f' 0/{msg.get("dst_len")}' + else: + message += ' default' + + if msg.get_attr('RTA_SRC'): + message += f' from {msg.get_attr("RTA_SRC")}' + elif msg.get('src_len'): + message += f' from 0/{msg.get("src_len")}' + + message += self._format_simple_field(msg.get_attr('RTA_NH_ID'), prefix='nhid') + + if msg.get_attr('RTA_NEWDST'): + message += self._format_rta_newdest(msg.get_attr('RTA_NEWDST')) + + if msg.get_attr('RTA_ENCAP'): + message += self._format_rta_encap(msg.get_attr('RTA_ENCAP_TYPE'), + msg.get_attr('RTA_ENCAP')) + + message += self._format_simple_field(msg.get('tos'), prefix='tos') + + if msg.get_attr('RTA_GATEWAY'): + message += self._format_rta_gateway(msg.get_attr('RTA_GATEWAY')) + + if msg.get_attr('RTA_VIA'): + message += self._format_rta_via(msg.get_attr('RTA_VIA')) + + if msg.get_attr('RTA_OIF') is not None: + message += f' dev {self._get_if_name_by_index(msg.get_attr("RTA_OIF"))}' + + if msg.get_attr("RTA_TABLE"): + message += f' table {self._format_rttable(msg.get_attr("RTA_TABLE"))}' + + if not msg.get('flags') & RTM_F_CLONED: + message += f' proto {RT_PROTO.get(msg.get("proto"))}' + + if not msg.get('scope') == rtmsg.RT_SCOPE_UNIVERSE: + message += f' scope {RT_SCOPE_TO_NAME.get(msg.get("scope"))}' + + message += self._format_simple_field(msg.get_attr('RTA_PREFSRC'), prefix='src') + message += self._format_simple_field(msg.get_attr('RTA_PRIORITY'), prefix='metric') + + message += self._format_rt_flags(msg.get("flags")) + + if msg.get_attr('RTA_MARK'): + mark = msg.get_attr("RTA_MARK") + if mark >= 16: + message += f' mark 0x{mark:x}' + else: + message += f' mark {mark}' + + if msg.get_attr('RTA_FLOW'): + message += self._format_flow_field(msg.get_attr('RTA_FLOW')) + + message += self._format_simple_field(msg.get_attr('RTA_UID'), prefix='uid') + + if msg.get_attr('RTA_METRICS'): + message += self._format_rta_metrics(msg.get_attr("RTA_METRICS")) + + if msg.get_attr('RTA_IIF') is not None: + message += f' iif {self._get_if_name_by_index(msg.get_attr("RTA_IIF"))}' + + if msg.get_attr('RTA_PREF') is not None: + message += self._format_rta_pref(msg.get_attr("RTA_PREF")) + + if msg.get_attr('RTA_TTL_PROPAGATE') is not None: + message += f' ttl-propogate {"enabled" if msg.get_attr("RTA_TTL_PROPAGATE") else "disabled"}' + + if msg.get_attr('RTA_MULTIPATH') is not None: + _tmp = self._format_rta_multipath( + mcast_cloned=msg.get('flags') & RTM_F_CLONED and msg.get('type') == RTYPES['RTN_MULTICAST'], + family=msg.get('family'), + data=msg.get_attr("RTA_MULTIPATH")) + message += f' {_tmp}' + + return self._output(message) + + +class AddrFormatter(BaseMSGFormatter): + """ + A formatter class for handling address-related network messages + `RTM_NEWADDR` and `RTM_DELADDR`. + """ + INFINITY_LIFE_TIME = _INFINITY + + def _format_ifa_flags(self, flags: int, family: int) -> str: + """ + Formats address flags into a human-readable string. 
+ """ + result = list() + if flags: + if not flags & IFA_FLAGS['permanent']: + result.append('dynamic') + flags &= ~IFA_FLAGS['permanent'] + + if flags & IFA_FLAGS['temporary'] and family == socket.AF_INET6: + result.append('temporary') + flags &= ~IFA_FLAGS['temporary'] + + result.extend(self._parse_flag(flags, IFA_FLAGS)) + + return self._output(",".join(result)) + + def _format_ifa_addr(self, local: str, addr: str, preflen: int, priority: int) -> str: + """ + Formats address information into a shuman-readable string. + """ + result = '' + local = local or addr + addr = addr or local + + if local: + result += f'{local}' + if addr and addr != local: + result += f' peer {addr}' + result += f'/{preflen}' + + if priority: + result += f' {priority}' + + return self._output(result) + + def _format_ifa_cacheinfo(self, data: ifaddrmsg.ifaddrmsg.cacheinfo) -> str: + """ + Formats cache information for an address. + """ + result = '' + _map = { + 'ifa_valid': 'valid_lft', + 'ifa_preferred': 'preferred_lft', + } + + for key in ['ifa_valid', 'ifa_preferred']: + val = data.get(key) + if val == self.INFINITY_LIFE_TIME: + result += f'{_map.get(key)} forever ' + else: + result += f'{_map.get(key)} {val}sec ' + + return self._output(result) + + def format(self, msg: ifaddrmsg.ifaddrmsg) -> str: + """ + Formats a full network address message. + Combine attributes such as index, family, address, flags, and cache + information into a structured output string. + """ + message = self._prepare_start_message(msg.get('event')) + + message += f'{msg.get("index")}: {self._get_if_name_by_index(msg.get("index"))} ' + message += f'{FAMILY_TO_NAME.get(msg.get("family"), msg.get("family"))} ' + + message += self._format_ifa_addr( + msg.get_attr('IFA_LOCAL'), + msg.get_attr('IFA_ADDRESS'), + msg.get('prefixlen'), + msg.get_attr('IFA_RT_PRIORITY') + ) + message += self._format_simple_field(msg.get_attr('IFA_BROADCAST'), prefix='brd') + message += self._format_simple_field(msg.get_attr('IFA_ANYCAST'), prefix='any') + + if msg.get('scope') is not None: + message += f' scope {RT_SCOPE_TO_NAME.get(msg.get("scope"))}' + + message += self._format_ifa_flags(msg.get_attr("IFA_FLAGS"), msg.get("family")) + message += self._format_simple_field(msg.get_attr('IFA_LABEL'), prefix='label:') + + if msg.get_attr('IFA_CACHEINFO'): + message += self._format_ifa_cacheinfo(msg.get_attr('IFA_CACHEINFO')) + + return self._output(message) + + +class NeighFormatter(BaseMSGFormatter): + """ + A formatter class for handling neighbor-related network messages + `RTM_NEWNEIGH`, `RTM_DELNEIGH` and `RTM_GETNEIGH` + """ + def _format_ntf_flags(self, flags: int) -> str: + """ + Formats neighbor table entry flags into a human-readable string. + """ + result = list() + result.extend(self._parse_flag(flags, NTF_FlAGS)) + + return self._output(",".join(result)) + + def _format_neigh_state(self, data: int) -> str: + """ + Formats the state of a neighbor entry. + """ + result = list() + result.extend(self._parse_flag(data, NEIGH_STATE_FLAGS)) + + return self._output(",".join(result)) + + def format(self, msg: ndmsg.ndmsg) -> str: + """ + Formats a full neighbor-related network message. + Combine attributes such as destination, device, link-layer address, + flags, state, and protocol into a structured output string. 
+ """ + message = self._prepare_start_message(msg.get('event')) + message += self._format_simple_field(msg.get_attr('NDA_DST'), prefix='') + + if msg.get("ifindex") is not None: + message += f' dev {self._get_if_name_by_index(msg.get("ifindex"))}' + + message += self._format_simple_field(msg.get_attr('NDA_LLADDR'), prefix='lladdr') + message += f' {self._format_ntf_flags(msg.get("flags"))}' + message += f' {self._format_neigh_state(msg.get("state"))}' + + if msg.get_attr('NDA_PROTOCOL'): + message += f' proto {RT_PROTO.get(msg.get_attr("NDA_PROTOCOL"), msg.get_attr("NDA_PROTOCOL"))}' + + return self._output(message) + + +class RuleFormatter(BaseMSGFormatter): + """ + A formatter class for handling ruting tule network messages + `RTM_NEWRULE` and `RTM_DELRULE` + """ + def _format_direction(self, data: str, length: int, host_len: int): + """ + Formats the direction of traffic based on source or destination and prefix length. + """ + result = '' + if data: + result += f' {data}' + if length != host_len: + result += f'/{length}' + elif length: + result += f' 0/{length}' + + return self._output(result) + + def _format_fra_interface(self, data: str, flags: int, prefix: str): + """ + Formats interface-related attributes. + """ + result = f'{prefix} {data}' + if flags & FIB_RULE_IIF_DETACHED: + result += '[detached]' + + return self._output(result) + + def _format_fra_range(self, data: [str, dict], prefix: str): + """ + Formats a range of values (e.g., UID, sport, or dport). + """ + result = '' + if data: + if isinstance(data, str): + result += f' {prefix} {data}' + else: + result += f' {prefix} {data.get("start")}:{data.get("end")}' + return self._output(result) + + def _format_fra_table(self, msg: fibmsg): + """ + Formats the lookup table and associated attributes in the message. + """ + def __format_field(data: int, prefix: str): + if data and data not in [-1, _INFINITY]: + return f' {prefix} {data}' + return '' + + result = '' + table = msg.get_attr('FRA_TABLE') or msg.get('table') + if table: + result += f' lookup {self._format_rttable(table)}' + result += __format_field(msg.get_attr('FRA_SUPPRESS_PREFIXLEN'), 'suppress_prefixlength') + result += __format_field(msg.get_attr('FRA_SUPPRESS_IFGROUP'), 'suppress_ifgroup') + + return self._output(result) + + def _format_fra_action(self, msg: fibmsg): + """ + Formats the action associated with the rule. + """ + result = '' + if msg.get('action') == RTYPES.get('RTN_NAT'): + if msg.get_attr('RTA_GATEWAY'): # looks like deprecated but still use in iproute2 + result += f' map-to {msg.get_attr("RTA_GATEWAY")}' + else: + result += ' masquerade' + + elif msg.get('action') == FR_ACT_GOTO: + result += f' goto {msg.get_attr("FRA_GOTO") or "none"}' + if msg.get('flags') & FIB_RULE_UNRESOLVED: + result += ' [unresolved]' + + elif msg.get('action') == FR_ACT_NOP: + result += ' nop' + + elif msg.get('action') != FR_ACT_TO_TBL: + result += f' {RTYPES.get(msg.get("action"))}' + + return self._output(result) + + def format(self, msg: fibmsg): + """ + Formats a complete routing rule message. + Combines information about source, destination, interfaces, actions, + and other attributes into a single formatted string. 
+ """ + message = self._prepare_start_message(msg.get('event')) + host_len = self.af_bit_len(msg.get('family')) + message += self._format_simple_field(msg.get_attr('FRA_PRIORITY'), prefix='') + + if msg.get('flags') & FIB_RULE_INVERT: + message += ' not' + + tmp = self._format_direction(msg.get_attr('FRA_SRC'), msg.get('src_len'), host_len) + message += ' from' + (tmp if tmp else ' all ') + + if msg.get_attr('FRA_DST'): + tmp = self._format_direction(msg.get_attr('FRA_DST'), msg.get('dst_len'), host_len) + message += ' to' + tmp + + if msg.get('tos'): + message += f' tos {hex(msg.get("tos"))}' + + if msg.get_attr('FRA_FWMARK') or msg.get_attr('FRA_FWMASK'): + mark = msg.get_attr('FRA_FWMARK') or 0 + mask = msg.get_attr('FRA_FWMASK') or 0 + if mask != 0xFFFFFFFF: + message += f' fwmark {mark}/{mask}' + else: + message += f' fwmark {mark}' + + if msg.get_attr('FRA_IIFNAME'): + message += self._format_fra_interface( + msg.get_attr('FRA_IIFNAME'), + msg.get('flags'), + 'iif' + ) + + if msg.get_attr('FRA_OIFNAME'): + message += self._format_fra_interface( + msg.get_attr('FRA_OIFNAME'), + msg.get('flags'), + 'oif' + ) + + if msg.get_attr('FRA_L3MDEV'): + message += f' lookup [l3mdev-table]' + + if msg.get_attr('FRA_UID_RANGE'): + message += self._format_fra_range(msg.get_attr('FRA_UID_RANGE'), 'uidrange') + + message += self._format_simple_field(msg.get_attr('FRA_IP_PROTO'), prefix='ipproto') + + if msg.get_attr('FRA_SPORT_RANGE'): + message += self._format_fra_range(msg.get_attr('FRA_SPORT_RANGE'), 'sport') + + if msg.get_attr('FRA_DPORT_RANGE'): + message += self._format_fra_range(msg.get_attr('FRA_DPORT_RANGE'), 'dport') + + message += self._format_simple_field(msg.get_attr('FRA_TUN_ID'), prefix='tun_id') + + message += self._format_fra_table(msg) + + if msg.get_attr('FRA_FLOW'): + message += self._format_flow_field(msg.get_attr('FRA_FLOW')) + + message += self._format_fra_action(msg) + + if msg.get_attr('FRA_PROTOCOL'): + message += f' proto {RT_PROTO.get(msg.get_attr("FRA_PROTOCOL"), msg.get_attr("FRA_PROTOCOL"))}' + + return self._output(message) + + +class AddrlabelFormatter(BaseMSGFormatter): + # Not implemented decoder on pytroute2 but ip monitor use it message + pass + + +class PrefixFormatter(BaseMSGFormatter): + # Not implemented decoder on pytroute2 but ip monitor use it message + pass + + +class NetconfFormatter(BaseMSGFormatter): + # Not implemented decoder on pytroute2 but ip monitor use it message + pass + + +EVENT_MAP = { + rtnl.RTM_NEWROUTE: {'parser': RouteFormatter, 'event': 'route'}, + rtnl.RTM_DELROUTE: {'parser': RouteFormatter, 'event': 'route'}, + rtnl.RTM_NEWLINK: {'parser': LinkFormatter, 'event': 'link'}, + rtnl.RTM_DELLINK: {'parser': LinkFormatter, 'event': 'link'}, + rtnl.RTM_NEWADDR: {'parser': AddrFormatter, 'event': 'addr'}, + rtnl.RTM_DELADDR: {'parser': AddrFormatter, 'event': 'addr'}, + # rtnl.RTM_NEWADDRLABEL: {'parser': AddrlabelFormatter, 'event': 'addrlabel'}, + # rtnl.RTM_DELADDRLABEL: {'parser': AddrlabelFormatter, 'event': 'addrlabel'}, + rtnl.RTM_NEWNEIGH: {'parser': NeighFormatter, 'event': 'neigh'}, + rtnl.RTM_DELNEIGH: {'parser': NeighFormatter, 'event': 'neigh'}, + rtnl.RTM_GETNEIGH: {'parser': NeighFormatter, 'event': 'neigh'}, + # rtnl.RTM_NEWPREFIX: {'parser': PrefixFormatter, 'event': 'prefix'}, + rtnl.RTM_NEWRULE: {'parser': RuleFormatter, 'event': 'rule'}, + rtnl.RTM_DELRULE: {'parser': RuleFormatter, 'event': 'rule'}, + # rtnl.RTM_NEWNETCONF: {'parser': NetconfFormatter, 'event': 'netconf'}, + # rtnl.RTM_DELNETCONF: {'parser': 
+}
+
+
+def sig_handler(signum, frame):
+ process_name = multiprocessing.current_process().name
+ logger.debug(
+ f'[{process_name}]: {"Shutdown" if signum == signal.SIGTERM else "Reload"} signal received...'
+ )
+ shutdown_event.set()
+
+
+def parse_event_type(header: Dict) -> tuple:
+ """
+ Extract the event type and formatter class from the message header.
+ """
+ event_type = EVENT_MAP.get(header['type'], {}).get('event', 'unknown')
+ _parser = EVENT_MAP.get(header['type'], {}).get('parser')
+
+ if _parser is None:
+ raise UnsupportedMessageType(f'Unsupported message type: {header["type"]}')
+
+ return event_type, _parser
+
+
+def is_need_to_log(event_type: AnyStr, conf_event: Dict):
+ """
+ Check whether messages of this event type should be logged.
+ """
+ conf = conf_event.get(event_type)
+ if conf == {}:
+ return True
+ return False
+
+
+def parse_event(msg: nfct_msg, conf_event: Dict) -> str:
+ """
+ Convert a netlink message into a formatted log string.
+ """
+ data = ''
+ event_type, parser = parse_event_type(msg['header'])
+ if event_type == 'link':
+ remember_if_index(idx=msg.get('index'), event_type=msg['header'].get('type'))
+
+ if not is_need_to_log(event_type, conf_event):
+ return data
+
+ message = parser().format(msg)
+ if message:
+ data = f'{f"[{event_type}]".upper():<{7}} {message}'
+
+ return data
+
+
+def worker(ct: IPRoute, shutdown_event: multiprocessing.Event, conf_event: Dict) -> None:
+ """
+ Main loop of a parser worker process.
+ """
+ process_name = multiprocessing.current_process().name
+ logger.debug(f'[{process_name}] started')
+ timeout = 0.1
+ while not shutdown_event.is_set():
+ if not ct.buffer_queue.empty():
+ msg = None
+ try:
+ for msg in ct.get():
+ message = parse_event(msg, conf_event)
+ if message:
+ if logger.level == logging.DEBUG:
+ logger.debug(f'[{process_name}]: {message} raw: {msg}')
+ else:
+ logger.info(message)
+ except queue.Full:
+ logger.error('IPRoute message queue is full.')
+ except UnsupportedMessageType as e:
+ logger.debug(f'{e} =====> raw msg: {msg}')
+ except Exception as e:
+ logger.error(f'Unexpected error: {e.__class__} {e} [{msg}]')
+ else:
+ sleep(timeout)
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '-c',
+ '--config',
+ action='store',
+ help='Path to vyos-network-event-logger configuration',
+ required=True,
+ type=Path,
+ )
+
+ args = parser.parse_args()
+ try:
+ config = read_json(args.config)
+ except Exception as err:
+ logger.error(f'Configuration file "{args.config}" does not exist or is malformed: {err}')
+ exit(1)
+
+ set_log_level(config.get('log_level', 'info'))
+
+ signal.signal(signal.SIGHUP, sig_handler)
+ signal.signal(signal.SIGTERM, sig_handler)
+
+ if 'event' in config:
+ event_groups = list(config.get('event').keys())
+ else:
+ logger.error('Configuration is wrong: the event filter is empty.')
+ exit(1)
+
+ conf_event = config['event']
+ qsize = config.get('queue_size')
+ ct = IPRoute(async_qsize=int(qsize) if qsize else None)
+ ct.buffer_queue = multiprocessing.Queue(ct.async_qsize)
+ ct.bind(async_cache=True)
+
+ processes = list()
+ try:
+ for _ in range(multiprocessing.cpu_count()):
+ p = multiprocessing.Process(target=worker, args=(ct, shutdown_event, conf_event))
+ processes.append(p)
+ p.start()
+ logger.info('IPRoute socket bound and listening for messages.')
+
+ while not shutdown_event.is_set():
+ if not ct.pthread.is_alive():
+ if ct.buffer_queue.qsize() / ct.async_qsize < 0.9:
+ if not shutdown_event.is_set():
+ logger.debug('Restart listener thread')
+ # restart the listener thread after a queue overload, once the queue has drained below 90% of capacity
+ ct.pthread = threading.Thread(name='Netlink async cache', target=ct.async_recv)
+ ct.pthread.daemon = True
+ ct.pthread.start()
+ else:
+ sleep(0.1)
+ finally:
+ for p in processes:
+ p.join()
+ if not p.is_alive():
+ logger.debug(f'[{p.name}]: finished')
+ ct.close()
+ logger.info('IPRoute socket closed.')
+ exit()
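
For context when reviewing parse_event() above: the standalone sketch below reproduces the same type-to-formatter dispatch in isolation; look the netlink message type up in a map, reject unknown types, and let the resolved formatter build the log line. It is an illustrative sketch only, not part of the patch: DummyRouteFormatter and the literal message-type value are hypothetical stand-ins for the real formatter classes and the pyroute2 rtnl constants.

# Illustrative sketch (not part of the patch): the dispatch pattern used by
# parse_event()/EVENT_MAP above. DummyRouteFormatter and the literal value of
# RTM_NEWROUTE are hypothetical stand-ins for the real formatters and constants.

RTM_NEWROUTE = 24  # numeric netlink message type, for illustration only


class UnsupportedMessageType(Exception):
    """Raised when no formatter is registered for a message type."""


class DummyRouteFormatter:
    def format(self, msg: dict) -> str:
        # A real formatter would decode netlink attributes here.
        return f'dst {msg.get("dst")} dev {msg.get("dev")}'


EVENT_MAP = {
    RTM_NEWROUTE: {'parser': DummyRouteFormatter, 'event': 'route'},
}


def parse_event_type(header: dict) -> tuple:
    # Same contract as in the daemon: return (event type, formatter class),
    # or raise if the message type is not registered in the map.
    entry = EVENT_MAP.get(header['type'], {})
    parser = entry.get('parser')
    if parser is None:
        raise UnsupportedMessageType(f'Unsupported message type: {header["type"]}')
    return entry.get('event', 'unknown'), parser


if __name__ == '__main__':
    event_type, parser = parse_event_type({'type': RTM_NEWROUTE})
    # Mirrors the '[ROUTE] <message>' prefix produced by parse_event().
    print(f'{f"[{event_type}]".upper():<7} {parser().format({"dst": "10.0.0.0/8", "dev": "eth0"})}')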