summaryrefslogtreecommitdiff
path: root/src/op_mode/neighbor.py
blob: 8b3c45c7c34d4b380c93d74dced479120f084d16 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python3
#
# Copyright (C) 2022-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# Sample output of `ip --json neigh list`:
#
# [
#   {
#     "dst": "192.168.1.1",
#     "dev": "eth0",                 # Missing if `dev ...` option is used
#     "lladdr": "00:aa:bb:cc:dd:ee", # May be missing for failed entries
#     "state": [
#       "REACHABLE"
#     ]
#  },
# ]

import sys
import typing

import vyos.opmode
from vyos.utils.network import interface_exists

ArgFamily = typing.Literal['inet', 'inet6']
ArgState = typing.Literal['reachable', 'stale', 'failed', 'permanent']

def get_raw_data(family, interface=None, state=None):
    from json import loads
    from vyos.utils.process import cmd

    if interface:
        if not interface_exists(interface):
            raise ValueError(f"Interface '{interface}' does not exist in the system")
        interface = f"dev {interface}"
    else:
        interface = ""

    if state:
        state = f"nud {state}"
    else:
        state = ""

    neigh_cmd = f"ip --family {family} --json neighbor list {interface} {state}"

    data = loads(cmd(neigh_cmd))

    return data

def format_neighbors(neighs, interface=None):
    from tabulate import tabulate

    def entry_to_list(e, intf=None):
        dst = e["dst"]

        # State is always a list in the iproute2 output
        state = ", ".join(e["state"])

        # Link layer address is absent from e.g. FAILED entries
        if "lladdr" in e:
            lladdr = e["lladdr"]
        else:
            lladdr = None

        # Device field is absent from outputs of `ip neigh list dev ...`
        if "dev" in e:
            dev = e["dev"]
        elif interface:
            dev = interface
        else:
            raise ValueError("interface is not defined")

        return [dst, dev, lladdr, state]

    neighs = map(entry_to_list, neighs)

    headers = ["Address", "Interface", "Link layer address",  "State"]
    return tabulate(neighs, headers)

def show(raw: bool, family: ArgFamily, interface: typing.Optional[str],
         state: typing.Optional[ArgState]):
    """ Display neighbor table contents """
    data = get_raw_data(family, interface, state=state)

    if raw:
        return data
    else:
        return format_neighbors(data, interface)

def reset(family: ArgFamily, interface: typing.Optional[str], address: typing.Optional[str]):
    from vyos.utils.process import run

    if address and interface:
        raise ValueError("interface and address parameters are mutually exclusive")
    elif address:
        run(f"""ip --family {family} neighbor flush to {address}""")
    elif interface:
        run(f"""ip --family {family} neighbor flush dev {interface}""")
    else:
        # Flush an entire neighbor table
        run(f"""ip --family {family} neighbor flush""")

if __name__ == '__main__':
    try:
        res = vyos.opmode.run(sys.modules[__name__])
        if res:
            print(res)
    except (ValueError, vyos.opmode.Error) as e:
        print(e)
        sys.exit(1)