#!/usr/bin/env python3 # # Copyright (C) 2025 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import argparse import logging import multiprocessing import queue import signal import socket import threading from pathlib import Path from time import sleep from typing import Dict, AnyStr, List, Union from pyroute2.common import AF_MPLS from pyroute2.iproute import IPRoute from pyroute2.netlink import rtnl, nlmsg from pyroute2.netlink.nfnetlink.nfctsocket import nfct_msg from pyroute2.netlink.rtnl import (rt_proto as RT_PROTO, rt_type as RT_TYPES, rtypes as RTYPES ) from pyroute2.netlink.rtnl.fibmsg import FR_ACT_GOTO, FR_ACT_NOP, FR_ACT_TO_TBL, \ fibmsg from pyroute2.netlink.rtnl import ifaddrmsg from pyroute2.netlink.rtnl import ifinfmsg from pyroute2.netlink.rtnl import ndmsg from pyroute2.netlink.rtnl import rtmsg from pyroute2.netlink.rtnl.rtmsg import nh, rtmsg_base from vyos.include.uapi.linux.fib_rules import * from vyos.include.uapi.linux.icmpv6 import * from vyos.include.uapi.linux.if_arp import * from vyos.include.uapi.linux.lwtunnel import * from vyos.include.uapi.linux.neighbour import * from vyos.include.uapi.linux.rtnetlink import * from vyos.utils.file import read_json manager = multiprocessing.Manager() cache = manager.dict() class UnsupportedMessageType(Exception): pass shutdown_event = multiprocessing.Event() logging.basicConfig(level=logging.INFO, format='%(message)s') logger = logging.getLogger(__name__) class DebugFormatter(logging.Formatter): def format(self, record): self._style._fmt = '[%(asctime)s] %(levelname)s: %(message)s' return super().format(record) def set_log_level(level: str) -> None: if level == 'debug': logger.setLevel(logging.DEBUG) logger.parent.handlers[0].setFormatter(DebugFormatter()) else: logger.setLevel(logging.INFO) IFF_FLAGS = { 'RUNNING': ifinfmsg.IFF_RUNNING, 'LOOPBACK': ifinfmsg.IFF_LOOPBACK, 'BROADCAST': ifinfmsg.IFF_BROADCAST, 'POINTOPOINT': ifinfmsg.IFF_POINTOPOINT, 'MULTICAST': ifinfmsg.IFF_MULTICAST, 'NOARP': ifinfmsg.IFF_NOARP, 'ALLMULTI': ifinfmsg.IFF_ALLMULTI, 'PROMISC': ifinfmsg.IFF_PROMISC, 'MASTER': ifinfmsg.IFF_MASTER, 'SLAVE': ifinfmsg.IFF_SLAVE, 'DEBUG': ifinfmsg.IFF_DEBUG, 'DYNAMIC': ifinfmsg.IFF_DYNAMIC, 'AUTOMEDIA': ifinfmsg.IFF_AUTOMEDIA, 'PORTSEL': ifinfmsg.IFF_PORTSEL, 'NOTRAILERS': ifinfmsg.IFF_NOTRAILERS, 'UP': ifinfmsg.IFF_UP, 'LOWER_UP': ifinfmsg.IFF_LOWER_UP, 'DORMANT': ifinfmsg.IFF_DORMANT, 'ECHO': ifinfmsg.IFF_ECHO, } NEIGH_STATE_FLAGS = { 'INCOMPLETE': ndmsg.NUD_INCOMPLETE, 'REACHABLE': ndmsg.NUD_REACHABLE, 'STALE': ndmsg.NUD_STALE, 'DELAY': ndmsg.NUD_DELAY, 'PROBE': ndmsg.NUD_PROBE, 'FAILED': ndmsg.NUD_FAILED, 'NOARP': ndmsg.NUD_NOARP, 'PERMANENT': ndmsg.NUD_PERMANENT, } IFA_FLAGS = { 'secondary': ifaddrmsg.IFA_F_SECONDARY, 'temporary': ifaddrmsg.IFA_F_SECONDARY, 'nodad': ifaddrmsg.IFA_F_NODAD, 'optimistic': ifaddrmsg.IFA_F_OPTIMISTIC, 'dadfailed': ifaddrmsg.IFA_F_DADFAILED, 'home': ifaddrmsg.IFA_F_HOMEADDRESS, 'deprecated': ifaddrmsg.IFA_F_DEPRECATED, 'tentative': ifaddrmsg.IFA_F_TENTATIVE, 'permanent': ifaddrmsg.IFA_F_PERMANENT, 'mngtmpaddr': ifaddrmsg.IFA_F_MANAGETEMPADDR, 'noprefixroute': ifaddrmsg.IFA_F_NOPREFIXROUTE, 'autojoin': ifaddrmsg.IFA_F_MCAUTOJOIN, 'stable-privacy': ifaddrmsg.IFA_F_STABLE_PRIVACY, } RT_SCOPE_TO_NAME = { rtmsg.RT_SCOPE_UNIVERSE: 'global', rtmsg.RT_SCOPE_SITE: 'site', rtmsg.RT_SCOPE_LINK: 'link', rtmsg.RT_SCOPE_HOST: 'host', rtmsg.RT_SCOPE_NOWHERE: 'nowhere', } FAMILY_TO_NAME = { socket.AF_INET: 'inet', socket.AF_INET6: 'inet6', socket.AF_PACKET: 'link', AF_MPLS: 'mpls', socket.AF_BRIDGE: 'bridge', } _INFINITY = 4294967295 def _get_iif_name(idx: int) -> str: """ Retrieves the interface name associated with a given index. """ try: if_info = IPRoute().link("get", index=idx) if if_info: return if_info[0].get_attr('IFLA_IFNAME') except Exception as e: pass return '' def remember_if_index(idx: int, event_type: int) -> None: """ Manages the caching of network interface names based on their index and event type. - For RTM_DELLINK event, the interface name is removed from the cache if exists. - For RTM_NEWLINK event, the interface name is retrieved and updated in the cache. """ name = cache.get(idx) if name: if event_type == rtnl.RTM_DELLINK: del cache[idx] else: name = _get_iif_name(idx) if name: cache[idx] = name else: cache[idx] = _get_iif_name(idx) class BaseFormatter: """ A base class providing utility methods for formatting network message data. """ def _get_if_name_by_index(self, idx: int) -> str: """ Retrieves the name of a network interface based on its index. Uses a cached lookup for efficiency. If the name is not found in the cache, it queries the system and updates the cache. """ if_name = cache.get(idx) if not if_name: if_name = _get_iif_name(idx) cache[idx] = if_name return if_name def _format_rttable(self, idx: int) -> str: """ Formats a route table identifier into a readable name. """ return f'{RT_TABLE_TO_NAME.get(idx, idx)}' def _parse_flag(self, data: int, flags: dict) -> list: """ Extracts and returns flag names equal the bits set in a numeric value. """ result = list() if data: for key, val in flags.items(): if data & val: result.append(key) data &= ~val if data: result.append(f"{data:#x}") return result def af_bit_len(self, af: int) -> int: """ Gets the bit length of a given address family. Supports common address families like IPv4, IPv6, and MPLS. """ _map = { socket.AF_INET6: 128, socket.AF_INET: 32, AF_MPLS: 20, } return _map.get(af) def _format_simple_field(self, data: str, prefix: str='') -> str: """ Formats a simple field with an optional prefix. A simple field represents a value that does not require additional parsing and is used as is. """ return self._output(f'{prefix} {data}') if data is not None else '' def _output(self, data: str) -> str: """ Standardizes the output format. Ensures that the output is enclosed with single spaces and has no leading or trailing whitespace. """ return f' {data.strip()} ' if data else '' class BaseMSGFormatter(BaseFormatter): """ A base formatter class for network messages. This class provides common methods for formatting network-related messages, """ def _prepare_start_message(self, event: str) -> str: """ Prepares a starting message string based on the event type. """ if event in ['RTM_DELROUTE', 'RTM_DELLINK', 'RTM_DELNEIGH', 'RTM_DELADDR', 'RTM_DELADDRLABEL', 'RTM_DELRULE', 'RTM_DELNETCONF']: return 'Deleted ' if event == 'RTM_GETNEIGH': return 'Miss ' return '' def _format_flow_field(self, data: int) -> str: """ Formats a flow field to represent traffic realms. """ to = data & 0xFFFF from_ = data >> 16 result = f"realm{'s' if from_ else ''} " if from_: result += f'{from_}/' result += f'{to}' return self._output(result) def format(self, msg: nlmsg) -> str: """ Abstract method to format a complete message. This method must be implemented by subclasses to provide specific formatting logic for different types of messages. """ raise NotImplementedError(f'{msg.get("event")}: {msg}') class LinkFormatter(BaseMSGFormatter): """ A formatter class for handling link-related network messages `RTM_NEWLINK` and `RTM_DELLINK`. """ def _format_iff_flags(self, flags: int) -> str: """ Formats interface flags into a human-readable string. """ result = list() if flags: if flags & IFF_FLAGS['UP'] and not flags & IFF_FLAGS['RUNNING']: result.append('NO-CARRIER') flags &= ~IFF_FLAGS['RUNNING'] result.extend(self._parse_flag(flags, IFF_FLAGS)) return self._output(f'<{(",").join(result)}>') def _format_if_props(self, data: ifinfmsg.ifinfbase.proplist) -> str: """ Formats interface alternative name properties. """ result = '' for rec in data.altnames(): result += f'[altname {rec}] ' return self._output(result) def _format_link(self, msg: ifinfmsg.ifinfmsg) -> str: """ Formats the link attribute of a network interface message. """ if msg.get_attr("IFLA_LINK") is not None: iflink = msg.get_attr("IFLA_LINK") if iflink: if msg.get_attr("IFLA_LINK_NETNSID"): return f'if{iflink}' else: return self._get_if_name_by_index(iflink) return 'NONE' def _format_link_info(self, msg: ifinfmsg.ifinfmsg) -> str: """ Formats detailed information about the link, including type, address, broadcast address, and permanent address. """ result = f'link/{ARPHRD_TO_NAME.get(msg.get("ifi_type"), msg.get("ifi_type"))}' result += self._format_simple_field(msg.get_attr('IFLA_ADDRESS')) if msg.get_attr("IFLA_BROADCAST"): if msg.get('flags') & ifinfmsg.IFF_POINTOPOINT: result += f' peer' else: result += f' brd' result += f' {msg.get_attr("IFLA_BROADCAST")}' if msg.get_attr("IFLA_PERM_ADDRESS"): if not msg.get_attr("IFLA_ADDRESS") or \ msg.get_attr("IFLA_ADDRESS") != msg.get_attr("IFLA_PERM_ADDRESS"): result += f' permaddr {msg.get_attr("IFLA_PERM_ADDRESS")}' return self._output(result) def format(self, msg: ifinfmsg.ifinfmsg): """ Formats a network link message into a structured output string. """ if msg.get("family") not in [socket.AF_UNSPEC, socket.AF_BRIDGE]: return None message = self._prepare_start_message(msg.get('event')) link = self._format_link(msg) message += f'{msg.get("index")}: {msg.get_attr("IFLA_IFNAME")}' message += f'@{link}' if link else '' message += f': {self._format_iff_flags(msg.get("flags"))}' message += self._format_simple_field(msg.get_attr('IFLA_MTU'), prefix='mtu') message += self._format_simple_field(msg.get_attr('IFLA_QDISC'), prefix='qdisc') message += self._format_simple_field(msg.get_attr('IFLA_OPERSTATE'), prefix='state') message += self._format_simple_field(msg.get_attr('IFLA_GROUP'), prefix='group') message += self._format_simple_field(msg.get_attr('IFLA_MASTER'), prefix='master') message += self._format_link_info(msg) if msg.get_attr('IFLA_PROP_LIST'): message += self._format_if_props(msg.get_attr('IFLA_PROP_LIST')) return self._output(message) class EncapFormatter(BaseFormatter): """ A formatter class for handling encapsulation attributes in routing messages. """ # TODO: implement other lwtunnel decoder in pyroute2 # https://github.com/svinota/pyroute2/blob/78cfe838bec8d96324811a3962bda15fb028e0ce/pyroute2/netlink/rtnl/rtmsg.py#L657 def __init__(self): """ Initializes the EncapFormatter with supported encapsulation types. """ self.formatters = { rtmsg.LWTUNNEL_ENCAP_MPLS: self.mpls_format, rtmsg.LWTUNNEL_ENCAP_SEG6: self.seg6_format, rtmsg.LWTUNNEL_ENCAP_BPF: self.bpf_format, rtmsg.LWTUNNEL_ENCAP_SEG6_LOCAL: self.seg6local_format, } def _format_srh(self, data: rtmsg_base.seg6_encap_info.ipv6_sr_hdr): """ Formats Segment Routing Header (SRH) attributes. """ result = '' # pyroute2 decode mode only as inline or encap (encap, l2encap, encap.red, l2encap.red") # https://github.com/svinota/pyroute2/blob/78cfe838bec8d96324811a3962bda15fb028e0ce/pyroute2/netlink/rtnl/rtmsg.py#L220 for key in ['mode', 'segs']: val = data.get(key) if val: if key == 'segs': result += f'{key} {len(val)} {val} ' else: result += f'{key} {val} ' return self._output(result) def _format_bpf_object(self, data: rtmsg_base.bpf_encap_info, attr_name: str, attr_key: str): """ Formats eBPF program attributes. """ attr = data.get_attr(attr_name) if not attr: return '' result = '' if attr.get_attr("LWT_BPF_PROG_NAME"): result += f'{attr.get_attr("LWT_BPF_PROG_NAME")} ' if attr.get_attr("LWT_BPF_PROG_FD"): result += f'{attr.get_attr("LWT_BPF_PROG_FD")} ' return self._output(f'{attr_key} {result.strip()}') def mpls_format(self, data: rtmsg_base.mpls_encap_info): """ Formats MPLS encapsulation attributes. """ result = '' if data.get_attr("MPLS_IPTUNNEL_DST"): for rec in data.get_attr("MPLS_IPTUNNEL_DST"): for key, val in rec.items(): if val: result += f'{key} {val} ' if data.get_attr("MPLS_IPTUNNEL_TTL"): result += f' ttl {data.get_attr("MPLS_IPTUNNEL_TTL")}' return self._output(result) def bpf_format(self, data: rtmsg_base.bpf_encap_info): """ Formats eBPF encapsulation attributes. """ result = '' result += self._format_bpf_object(data, 'LWT_BPF_IN', 'in') result += self._format_bpf_object(data, 'LWT_BPF_OUT', 'out') result += self._format_bpf_object(data, 'LWT_BPF_XMIT', 'xmit') if data.get_attr('LWT_BPF_XMIT_HEADROOM'): result += f'headroom {data.get_attr("LWT_BPF_XMIT_HEADROOM")} ' return self._output(result) def seg6_format(self, data: rtmsg_base.seg6_encap_info): """ Formats Segment Routing (SEG6) encapsulation attributes. """ result = '' if data.get_attr("SEG6_IPTUNNEL_SRH"): result += self._format_srh(data.get_attr("SEG6_IPTUNNEL_SRH")) return self._output(result) def seg6local_format(self, data: rtmsg_base.seg6local_encap_info): """ Formats SEG6 local encapsulation attributes. """ result = '' formatters = { 'SEG6_LOCAL_ACTION': lambda val: f' action {next((k for k, v in data.action.actions.items() if v == val), "unknown")}', 'SEG6_LOCAL_SRH': lambda val: f' {self._format_srh(val)}', 'SEG6_LOCAL_TABLE': lambda val: f' table {self._format_rttable(val)}', 'SEG6_LOCAL_NH4': lambda val: f' nh4 {val}', 'SEG6_LOCAL_NH6': lambda val: f' nh6 {val}', 'SEG6_LOCAL_IIF': lambda val: f' iif {self._get_if_name_by_index(val)}', 'SEG6_LOCAL_OIF': lambda val: f' oif {self._get_if_name_by_index(val)}', 'SEG6_LOCAL_BPF': lambda val: f' endpoint {val.get("LWT_BPF_PROG_NAME")}', 'SEG6_LOCAL_VRFTABLE': lambda val: f' vrftable {self._format_rttable(val)}', } for rec in data.get('attrs'): if rec[0] in formatters: result += formatters[rec[0]](rec[1]) return self._output(result) def format(self, type: int, data: Union[rtmsg_base.mpls_encap_info, rtmsg_base.bpf_encap_info, rtmsg_base.seg6_encap_info, rtmsg_base.seg6local_encap_info]): """ Formats encapsulation attributes based on their type. """ result = '' formatter = self.formatters.get(type) result += f'encap {ENCAP_TO_NAME.get(type, "unknown")}' if formatter: result += f' {formatter(data)}' return self._output(result) class RouteFormatter(BaseMSGFormatter): """ A formatter class for handling network routing messages `RTM_NEWROUTE` and `RTM_DELROUTE`. """ def _format_rt_flags(self, flags: int) -> str: """ Formats route flags into a comma-separated string. """ result = list() result.extend(self._parse_flag(flags, RT_FlAGS)) return self._output(",".join(result)) def _format_rta_encap(self, type: int, data: Union[rtmsg_base.mpls_encap_info, rtmsg_base.bpf_encap_info, rtmsg_base.seg6_encap_info, rtmsg_base.seg6local_encap_info]) -> str: """ Formats encapsulation attributes. """ return EncapFormatter().format(type, data) def _format_rta_newdest(self, data: str) -> str: """ Formats a new destination attribute. """ return self._output(f'as to {data}') def _format_rta_gateway(self, data: str) -> str: """ Formats a gateway attribute. """ return self._output(f'via {data}') def _format_rta_via(self, data: str) -> str: """ Formats a 'via' route attribute. """ return self._output(f'{data}') def _format_rta_metrics(self, data: rtmsg_base.metrics): """ Formats routing metrics. """ result = '' def __format_metric_time(_val: int) -> str: """Formats metric time values into seconds or milliseconds.""" return f"{_val / 1000}s" if _val >= 1000 else f"{_val}ms" def __format_reatures(_val: int) -> str: """Parse and formats routing feature flags.""" result = self._parse_flag(_val, {'ecn': RTAX_FEATURE_ECN, 'tcp_usec_ts': RTAX_FEATURE_TCP_USEC_TS}) return ",".join(result) formatters = { 'RTAX_MTU': lambda val: f' mtu {val}', 'RTAX_WINDOW': lambda val: f' window {val}', 'RTAX_RTT': lambda val: f' rtt {__format_metric_time(val / 8)}', 'RTAX_RTTVAR': lambda val: f' rttvar {__format_metric_time(val / 4)}', 'RTAX_SSTHRESH': lambda val: f' ssthresh {val}', 'RTAX_CWND': lambda val: f' cwnd {val}', 'RTAX_ADVMSS': lambda val: f' advmss {val}', 'RTAX_REORDERING': lambda val: f' reordering {val}', 'RTAX_HOPLIMIT': lambda val: f' hoplimit {val}', 'RTAX_INITCWND': lambda val: f' initcwnd {val}', 'RTAX_FEATURES': lambda val: f' features {__format_reatures(val)}', 'RTAX_RTO_MIN': lambda val: f' rto_min {__format_metric_time(val)}', 'RTAX_INITRWND': lambda val: f' initrwnd {val}', 'RTAX_QUICKACK': lambda val: f' quickack {val}', } for rec in data.get('attrs'): if rec[0] in formatters: result += formatters[rec[0]](rec[1]) return self._output(result) def _format_rta_pref(self, data: int) -> str: """ Formats a pref attribute. """ pref = { ICMPV6_ROUTER_PREF_LOW: "low", ICMPV6_ROUTER_PREF_MEDIUM: "medium", ICMPV6_ROUTER_PREF_HIGH: "high", } return self._output(f' pref {pref.get(data, data)}') def _format_rta_multipath(self, mcast_cloned: bool, family: int, data: List[nh]) -> str: """ Formats multipath route attributes. """ result = '' first = True for rec in data: if mcast_cloned: if first: result += ' Oifs: ' first = False else: result += ' ' else: result += ' nexthop ' if rec.get_attr('RTA_ENCAP'): result += self._format_rta_encap(rec.get_attr('RTA_ENCAP_TYPE'), rec.get_attr('RTA_ENCAP')) if rec.get_attr('RTA_NEWDST'): result += self._format_rta_newdest(rec.get_attr('RTA_NEWDST')) if rec.get_attr('RTA_GATEWAY'): result += self._format_rta_gateway(rec.get_attr('RTA_GATEWAY')) if rec.get_attr('RTA_VIA'): result += self._format_rta_via(rec.get_attr('RTA_VIA')) if rec.get_attr('RTA_FLOW'): result += self._format_flow_field(rec.get_attr('RTA_FLOW')) result += f' dev {self._get_if_name_by_index(rec.get("oif"))}' if mcast_cloned: if rec.get("hops") != 1: result += f' (ttl>{rec.get("hops")})' else: if family != AF_MPLS: result += f' weight {rec.get("hops") + 1}' result += self._format_rt_flags(rec.get("flags")) return self._output(result) def format(self, msg: rtmsg.rtmsg) -> str: """ Formats a network route message into a human-readable string representation. """ message = self._prepare_start_message(msg.get('event')) message += RT_TYPES.get(msg.get('type')) if msg.get_attr('RTA_DST'): host_len = self.af_bit_len(msg.get('family')) if msg.get('dst_len') != host_len: message += f' {msg.get_attr("RTA_DST")}/{msg.get("dst_len")}' else: message += f' {msg.get_attr("RTA_DST")}' elif msg.get('dst_len'): message += f' 0/{msg.get("dst_len")}' else: message += ' default' if msg.get_attr('RTA_SRC'): message += f' from {msg.get_attr("RTA_SRC")}' elif msg.get('src_len'): message += f' from 0/{msg.get("src_len")}' message += self._format_simple_field(msg.get_attr('RTA_NH_ID'), prefix='nhid') if msg.get_attr('RTA_NEWDST'): message += self._format_rta_newdest(msg.get_attr('RTA_NEWDST')) if msg.get_attr('RTA_ENCAP'): message += self._format_rta_encap(msg.get_attr('RTA_ENCAP_TYPE'), msg.get_attr('RTA_ENCAP')) message += self._format_simple_field(msg.get('tos'), prefix='tos') if msg.get_attr('RTA_GATEWAY'): message += self._format_rta_gateway(msg.get_attr('RTA_GATEWAY')) if msg.get_attr('RTA_VIA'): message += self._format_rta_via(msg.get_attr('RTA_VIA')) if msg.get_attr('RTA_OIF') is not None: message += f' dev {self._get_if_name_by_index(msg.get_attr("RTA_OIF"))}' if msg.get_attr("RTA_TABLE"): message += f' table {self._format_rttable(msg.get_attr("RTA_TABLE"))}' if not msg.get('flags') & RTM_F_CLONED: message += f' proto {RT_PROTO.get(msg.get("proto"))}' if not msg.get('scope') == rtmsg.RT_SCOPE_UNIVERSE: message += f' scope {RT_SCOPE_TO_NAME.get(msg.get("scope"))}' message += self._format_simple_field(msg.get_attr('RTA_PREFSRC'), prefix='src') message += self._format_simple_field(msg.get_attr('RTA_PRIORITY'), prefix='metric') message += self._format_rt_flags(msg.get("flags")) if msg.get_attr('RTA_MARK'): mark = msg.get_attr("RTA_MARK") if mark >= 16: message += f' mark 0x{mark:x}' else: message += f' mark {mark}' if msg.get_attr('RTA_FLOW'): message += self._format_flow_field(msg.get_attr('RTA_FLOW')) message += self._format_simple_field(msg.get_attr('RTA_UID'), prefix='uid') if msg.get_attr('RTA_METRICS'): message += self._format_rta_metrics(msg.get_attr("RTA_METRICS")) if msg.get_attr('RTA_IIF') is not None: message += f' iif {self._get_if_name_by_index(msg.get_attr("RTA_IIF"))}' if msg.get_attr('RTA_PREF') is not None: message += self._format_rta_pref(msg.get_attr("RTA_PREF")) if msg.get_attr('RTA_TTL_PROPAGATE') is not None: message += f' ttl-propogate {"enabled" if msg.get_attr("RTA_TTL_PROPAGATE") else "disabled"}' if msg.get_attr('RTA_MULTIPATH') is not None: _tmp = self._format_rta_multipath( mcast_cloned=msg.get('flags') & RTM_F_CLONED and msg.get('type') == RTYPES['RTN_MULTICAST'], family=msg.get('family'), data=msg.get_attr("RTA_MULTIPATH")) message += f' {_tmp}' return self._output(message) class AddrFormatter(BaseMSGFormatter): """ A formatter class for handling address-related network messages `RTM_NEWADDR` and `RTM_DELADDR`. """ INFINITY_LIFE_TIME = _INFINITY def _format_ifa_flags(self, flags: int, family: int) -> str: """ Formats address flags into a human-readable string. """ result = list() if flags: if not flags & IFA_FLAGS['permanent']: result.append('dynamic') flags &= ~IFA_FLAGS['permanent'] if flags & IFA_FLAGS['temporary'] and family == socket.AF_INET6: result.append('temporary') flags &= ~IFA_FLAGS['temporary'] result.extend(self._parse_flag(flags, IFA_FLAGS)) return self._output(",".join(result)) def _format_ifa_addr(self, local: str, addr: str, preflen: int, priority: int) -> str: """ Formats address information into a shuman-readable string. """ result = '' local = local or addr addr = addr or local if local: result += f'{local}' if addr and addr != local: result += f' peer {addr}' result += f'/{preflen}' if priority: result += f' {priority}' return self._output(result) def _format_ifa_cacheinfo(self, data: ifaddrmsg.ifaddrmsg.cacheinfo) -> str: """ Formats cache information for an address. """ result = '' _map = { 'ifa_valid': 'valid_lft', 'ifa_preferred': 'preferred_lft', } for key in ['ifa_valid', 'ifa_preferred']: val = data.get(key) if val == self.INFINITY_LIFE_TIME: result += f'{_map.get(key)} forever ' else: result += f'{_map.get(key)} {val}sec ' return self._output(result) def format(self, msg: ifaddrmsg.ifaddrmsg) -> str: """ Formats a full network address message. Combine attributes such as index, family, address, flags, and cache information into a structured output string. """ message = self._prepare_start_message(msg.get('event')) message += f'{msg.get("index")}: {self._get_if_name_by_index(msg.get("index"))} ' message += f'{FAMILY_TO_NAME.get(msg.get("family"), msg.get("family"))} ' message += self._format_ifa_addr( msg.get_attr('IFA_LOCAL'), msg.get_attr('IFA_ADDRESS'), msg.get('prefixlen'), msg.get_attr('IFA_RT_PRIORITY') ) message += self._format_simple_field(msg.get_attr('IFA_BROADCAST'), prefix='brd') message += self._format_simple_field(msg.get_attr('IFA_ANYCAST'), prefix='any') if msg.get('scope') is not None: message += f' scope {RT_SCOPE_TO_NAME.get(msg.get("scope"))}' message += self._format_ifa_flags(msg.get_attr("IFA_FLAGS"), msg.get("family")) message += self._format_simple_field(msg.get_attr('IFA_LABEL'), prefix='label:') if msg.get_attr('IFA_CACHEINFO'): message += self._format_ifa_cacheinfo(msg.get_attr('IFA_CACHEINFO')) return self._output(message) class NeighFormatter(BaseMSGFormatter): """ A formatter class for handling neighbor-related network messages `RTM_NEWNEIGH`, `RTM_DELNEIGH` and `RTM_GETNEIGH` """ def _format_ntf_flags(self, flags: int) -> str: """ Formats neighbor table entry flags into a human-readable string. """ result = list() result.extend(self._parse_flag(flags, NTF_FlAGS)) return self._output(",".join(result)) def _format_neigh_state(self, data: int) -> str: """ Formats the state of a neighbor entry. """ result = list() result.extend(self._parse_flag(data, NEIGH_STATE_FLAGS)) return self._output(",".join(result)) def format(self, msg: ndmsg.ndmsg) -> str: """ Formats a full neighbor-related network message. Combine attributes such as destination, device, link-layer address, flags, state, and protocol into a structured output string. """ message = self._prepare_start_message(msg.get('event')) message += self._format_simple_field(msg.get_attr('NDA_DST'), prefix='') if msg.get("ifindex") is not None: message += f' dev {self._get_if_name_by_index(msg.get("ifindex"))}' message += self._format_simple_field(msg.get_attr('NDA_LLADDR'), prefix='lladdr') message += f' {self._format_ntf_flags(msg.get("flags"))}' message += f' {self._format_neigh_state(msg.get("state"))}' if msg.get_attr('NDA_PROTOCOL'): message += f' proto {RT_PROTO.get(msg.get_attr("NDA_PROTOCOL"), msg.get_attr("NDA_PROTOCOL"))}' return self._output(message) class RuleFormatter(BaseMSGFormatter): """ A formatter class for handling ruting tule network messages `RTM_NEWRULE` and `RTM_DELRULE` """ def _format_direction(self, data: str, length: int, host_len: int): """ Formats the direction of traffic based on source or destination and prefix length. """ result = '' if data: result += f' {data}' if length != host_len: result += f'/{length}' elif length: result += f' 0/{length}' return self._output(result) def _format_fra_interface(self, data: str, flags: int, prefix: str): """ Formats interface-related attributes. """ result = f'{prefix} {data}' if flags & FIB_RULE_IIF_DETACHED: result += '[detached]' return self._output(result) def _format_fra_range(self, data: [str, dict], prefix: str): """ Formats a range of values (e.g., UID, sport, or dport). """ result = '' if data: if isinstance(data, str): result += f' {prefix} {data}' else: result += f' {prefix} {data.get("start")}:{data.get("end")}' return self._output(result) def _format_fra_table(self, msg: fibmsg): """ Formats the lookup table and associated attributes in the message. """ def __format_field(data: int, prefix: str): if data and data not in [-1, _INFINITY]: return f' {prefix} {data}' return '' result = '' table = msg.get_attr('FRA_TABLE') or msg.get('table') if table: result += f' lookup {self._format_rttable(table)}' result += __format_field(msg.get_attr('FRA_SUPPRESS_PREFIXLEN'), 'suppress_prefixlength') result += __format_field(msg.get_attr('FRA_SUPPRESS_IFGROUP'), 'suppress_ifgroup') return self._output(result) def _format_fra_action(self, msg: fibmsg): """ Formats the action associated with the rule. """ result = '' if msg.get('action') == RTYPES.get('RTN_NAT'): if msg.get_attr('RTA_GATEWAY'): # looks like deprecated but still use in iproute2 result += f' map-to {msg.get_attr("RTA_GATEWAY")}' else: result += ' masquerade' elif msg.get('action') == FR_ACT_GOTO: result += f' goto {msg.get_attr("FRA_GOTO") or "none"}' if msg.get('flags') & FIB_RULE_UNRESOLVED: result += ' [unresolved]' elif msg.get('action') == FR_ACT_NOP: result += ' nop' elif msg.get('action') != FR_ACT_TO_TBL: result += f' {RTYPES.get(msg.get("action"))}' return self._output(result) def format(self, msg: fibmsg): """ Formats a complete routing rule message. Combines information about source, destination, interfaces, actions, and other attributes into a single formatted string. """ message = self._prepare_start_message(msg.get('event')) host_len = self.af_bit_len(msg.get('family')) message += self._format_simple_field(msg.get_attr('FRA_PRIORITY'), prefix='') if msg.get('flags') & FIB_RULE_INVERT: message += ' not' tmp = self._format_direction(msg.get_attr('FRA_SRC'), msg.get('src_len'), host_len) message += ' from' + (tmp if tmp else ' all ') if msg.get_attr('FRA_DST'): tmp = self._format_direction(msg.get_attr('FRA_DST'), msg.get('dst_len'), host_len) message += ' to' + tmp if msg.get('tos'): message += f' tos {hex(msg.get("tos"))}' if msg.get_attr('FRA_FWMARK') or msg.get_attr('FRA_FWMASK'): mark = msg.get_attr('FRA_FWMARK') or 0 mask = msg.get_attr('FRA_FWMASK') or 0 if mask != 0xFFFFFFFF: message += f' fwmark {mark}/{mask}' else: message += f' fwmark {mark}' if msg.get_attr('FRA_IIFNAME'): message += self._format_fra_interface( msg.get_attr('FRA_IIFNAME'), msg.get('flags'), 'iif' ) if msg.get_attr('FRA_OIFNAME'): message += self._format_fra_interface( msg.get_attr('FRA_OIFNAME'), msg.get('flags'), 'oif' ) if msg.get_attr('FRA_L3MDEV'): message += f' lookup [l3mdev-table]' if msg.get_attr('FRA_UID_RANGE'): message += self._format_fra_range(msg.get_attr('FRA_UID_RANGE'), 'uidrange') message += self._format_simple_field(msg.get_attr('FRA_IP_PROTO'), prefix='ipproto') if msg.get_attr('FRA_SPORT_RANGE'): message += self._format_fra_range(msg.get_attr('FRA_SPORT_RANGE'), 'sport') if msg.get_attr('FRA_DPORT_RANGE'): message += self._format_fra_range(msg.get_attr('FRA_DPORT_RANGE'), 'dport') message += self._format_simple_field(msg.get_attr('FRA_TUN_ID'), prefix='tun_id') message += self._format_fra_table(msg) if msg.get_attr('FRA_FLOW'): message += self._format_flow_field(msg.get_attr('FRA_FLOW')) message += self._format_fra_action(msg) if msg.get_attr('FRA_PROTOCOL'): message += f' proto {RT_PROTO.get(msg.get_attr("FRA_PROTOCOL"), msg.get_attr("FRA_PROTOCOL"))}' return self._output(message) class AddrlabelFormatter(BaseMSGFormatter): # Not implemented decoder on pytroute2 but ip monitor use it message pass class PrefixFormatter(BaseMSGFormatter): # Not implemented decoder on pytroute2 but ip monitor use it message pass class NetconfFormatter(BaseMSGFormatter): # Not implemented decoder on pytroute2 but ip monitor use it message pass EVENT_MAP = { rtnl.RTM_NEWROUTE: {'parser': RouteFormatter, 'event': 'route'}, rtnl.RTM_DELROUTE: {'parser': RouteFormatter, 'event': 'route'}, rtnl.RTM_NEWLINK: {'parser': LinkFormatter, 'event': 'link'}, rtnl.RTM_DELLINK: {'parser': LinkFormatter, 'event': 'link'}, rtnl.RTM_NEWADDR: {'parser': AddrFormatter, 'event': 'addr'}, rtnl.RTM_DELADDR: {'parser': AddrFormatter, 'event': 'addr'}, # rtnl.RTM_NEWADDRLABEL: {'parser': AddrlabelFormatter, 'event': 'addrlabel'}, # rtnl.RTM_DELADDRLABEL: {'parser': AddrlabelFormatter, 'event': 'addrlabel'}, rtnl.RTM_NEWNEIGH: {'parser': NeighFormatter, 'event': 'neigh'}, rtnl.RTM_DELNEIGH: {'parser': NeighFormatter, 'event': 'neigh'}, rtnl.RTM_GETNEIGH: {'parser': NeighFormatter, 'event': 'neigh'}, # rtnl.RTM_NEWPREFIX: {'parser': PrefixFormatter, 'event': 'prefix'}, rtnl.RTM_NEWRULE: {'parser': RuleFormatter, 'event': 'rule'}, rtnl.RTM_DELRULE: {'parser': RuleFormatter, 'event': 'rule'}, # rtnl.RTM_NEWNETCONF: {'parser': NetconfFormatter, 'event': 'netconf'}, # rtnl.RTM_DELNETCONF: {'parser': NetconfFormatter, 'event': 'netconf'}, } def sig_handler(signum, frame): process_name = multiprocessing.current_process().name logger.debug( f'[{process_name}]: {"Shutdown" if signum == signal.SIGTERM else "Reload"} signal received...' ) shutdown_event.set() def parse_event_type(header: Dict) -> tuple: """ Extract event type and parser. """ event_type = EVENT_MAP.get(header['type'], {}).get('event', 'unknown') _parser = EVENT_MAP.get(header['type'], {}).get('parser') if _parser is None: raise UnsupportedMessageType(f'Unsupported message type: {header["type"]}') return event_type, _parser def is_need_to_log(event_type: AnyStr, conf_event: Dict): """ Filter message by event type and protocols """ conf = conf_event.get(event_type) if conf == {}: return True return False def parse_event(msg: nfct_msg, conf_event: Dict) -> str: """ Convert nfct_msg to internal data dict. """ data = '' event_type, parser = parse_event_type(msg['header']) if event_type == 'link': remember_if_index(idx=msg.get('index'), event_type=msg['header'].get('type')) if not is_need_to_log(event_type, conf_event): return data message = parser().format(msg) if message: data = f'{f"[{event_type}]".upper():<{7}} {message}' return data def worker(ct: IPRoute, shutdown_event: multiprocessing.Event, conf_event: Dict) -> None: """ Main function of parser worker process """ process_name = multiprocessing.current_process().name logger.debug(f'[{process_name}] started') timeout = 0.1 while not shutdown_event.is_set(): if not ct.buffer_queue.empty(): msg = None try: for msg in ct.get(): message = parse_event(msg, conf_event) if message: if logger.level == logging.DEBUG: logger.debug(f'[{process_name}]: {message} raw: {msg}') else: logger.info(message) except queue.Full: logger.error('IPRoute message queue if full.') except UnsupportedMessageType as e: logger.debug(f'{e} =====> raw msg: {msg}') except Exception as e: logger.error(f'Unexpected error: {e.__class__} {e} [{msg}]') else: sleep(timeout) if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument( '-c', '--config', action='store', help='Path to vyos-network-event-logger configuration', required=True, type=Path, ) args = parser.parse_args() try: config = read_json(args.config) except Exception as err: logger.error(f'Configuration file "{args.config}" does not exist or malformed: {err}') exit(1) set_log_level(config.get('log_level', 'info')) signal.signal(signal.SIGHUP, sig_handler) signal.signal(signal.SIGTERM, sig_handler) if 'event' in config: event_groups = list(config.get('event').keys()) else: logger.error(f'Configuration is wrong. Event filter is empty.') exit(1) conf_event = config['event'] qsize = config.get('queue_size') ct = IPRoute(async_qsize=int(qsize) if qsize else None) ct.buffer_queue = multiprocessing.Queue(ct.async_qsize) ct.bind(async_cache=True) processes = list() try: for _ in range(multiprocessing.cpu_count()): p = multiprocessing.Process(target=worker, args=(ct, shutdown_event, conf_event)) processes.append(p) p.start() logger.info('IPRoute socket bound and listening for messages.') while not shutdown_event.is_set(): if not ct.pthread.is_alive(): if ct.buffer_queue.qsize() / ct.async_qsize < 0.9: if not shutdown_event.is_set(): logger.debug('Restart listener thread') # restart listener thread after queue overloaded when queue size low than 90% ct.pthread = threading.Thread(name='Netlink async cache', target=ct.async_recv) ct.pthread.daemon = True ct.pthread.start() else: sleep(0.1) finally: for p in processes: p.join() if not p.is_alive(): logger.debug(f'[{p.name}]: finished') ct.close() logging.info('IPRoute socket closed.') exit()