1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
#!/usr/bin/env python3
#
# Copyright (C) 2022 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import re
import tabulate
from vyos.configquery import ConfigTreeQuery
from vyos.util import cmd
def get_firewall_bridge_name():
"""
Return firewall policy bridge name
Example:
set firewall policy bridge forward ipv4 'FOO'
returns 'FOO'
"""
config = ConfigTreeQuery()
name = ['firewall', 'policy', 'bridge', 'forward', 'ipv4']
if config.exists(name):
name = config.value(name)
return name
def get_firewall_config(name):
"""
Return dict firewall rules for required name
"""
config = ConfigTreeQuery()
base = ['firewall', 'name', name]
if config.exists(base):
firewall = config.get_config_dict(base, key_mangling=('-', '_'),
get_first_key=True, no_tag_node_value_mangle=True)
return firewall
def get_nftables_details(name):
command = f'sudo nft list chain bridge filter {name}'
try:
results = cmd(command)
except:
return {}
out = {}
for line in results.split('\n'):
comment_search = re.search(rf'{name}[\- ](\d+|default-action)', line)
if not comment_search:
continue
rule = {}
rule_id = comment_search[1]
counter_search = re.search(r'counter packets (\d+) bytes (\d+)', line)
if counter_search:
rule['packets'] = counter_search[1]
rule['bytes'] = counter_search[2]
rule['conditions'] = re.sub(r'(\b(counter packets \d+ bytes \d+|drop|return|log)\b|comment "[\w\-]+")', '', line).strip()
out[rule_id] = rule
return out
def output_firewall_name(name):
print(f'\n---------------------------------\nBridge Firewall "{name}"\n')
details = get_nftables_details(name)
firewall = get_firewall_config(name)
rows = []
for rule, rule_conf in firewall['rule'].items():
row = [rule, rule_conf['action'], rule_conf['protocol'] if 'protocol' in rule_conf else 'all']
rule_details = details[rule]
row.append(rule_details.get('packets', 0))
row.append(rule_details.get('bytes', 0))
row.append(rule_details['conditions'])
rows.append(row)
if 'default_action' in firewall:
row = ['default']
rule_details = details['default-action']
row.append(firewall['default_action'])
row.append('all')
row.append(rule_details.get('packets', 0))
row.append(rule_details.get('bytes', 0))
else:
row.append('0')
row.append('0')
row.append('src 0.0.0.0/0 dst 0.0.0.0/0')
rows.append(row)
if rows:
header = ['Rule', 'Action', 'Protocol', 'Packets', 'Bytes', 'Conditions']
print(tabulate.tabulate(rows, header) + '\n')
return
parser = argparse.ArgumentParser()
parser.add_argument("--summary", action="store_true", help="Show all containers")
config = ConfigTreeQuery()
base = ['firewall', 'policy']
if not config.exists(base):
print('Firewall policy not configured')
exit(0)
if __name__ == '__main__':
args = parser.parse_args()
fw_name = get_firewall_bridge_name()
if args.summary:
output_firewall_name(fw_name)
exit(0)
|