1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
#!/usr/bin/env python3
#
# Copyright (C) 2022 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from vyos.template import is_ip_network
from vyos.util import dict_search_args
def _split_negation(value):
    """Split a leading '!' off *value*; return (match operator, bare value)."""
    if value[:1] == '!':
        return '!=', value[1:]
    return '', value


def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False):
    """Render one NAT rule configuration dict as a single rule string.

    The pieces are emitted in this order: interface matches, layer-4
    protocol match, source/destination address, prefix and port matches,
    'counter', optional log statement, the translation statement and
    finally a comment carrying the rule's log prefix.

    Args:
        rule_conf: Configuration dict of the rule. Keys read here are
            'inbound_interface', 'outbound_interface', 'protocol',
            'exclude', 'translation', 'source', 'destination' and 'log'.
        rule_id: Rule identifier, embedded in the log prefix / comment.
        nat_type: 'destination' selects the 'DST' log prefix; any other
            value selects 'SRC'. Its first character also prefixes the
            translation keyword ('snat' / 'dnat').
        ipv6: When true, use 'ip6' matches, the 'NAT66-' log prefix and
            honour the per-target 'prefix' key.

    Returns:
        The rule fragments joined with single spaces.

    Raises:
        KeyError: If a source/destination port is configured but
            rule_conf has no 'protocol' key.
    """
    output = []
    ip_prefix = 'ip6' if ipv6 else 'ip'
    log_prefix = ('DST' if nat_type == 'destination' else 'SRC') + f'-NAT-{rule_id}'
    log_suffix = ''

    if ipv6:
        log_prefix = log_prefix.replace("NAT-", "NAT66-")

    # Set when the address of the nat_type side is already consumed by a
    # prefix map, so it must not be emitted again as a plain match below.
    ignore_type_addr = False
    translation_str = ''

    if 'inbound_interface' in rule_conf:
        ifname = rule_conf['inbound_interface']
        if ifname != 'any':
            output.append(f'iifname "{ifname}"')

    if 'outbound_interface' in rule_conf:
        ifname = rule_conf['outbound_interface']
        if ifname != 'any':
            output.append(f'oifname "{ifname}"')

    if 'protocol' in rule_conf and rule_conf['protocol'] != 'all':
        protocol = rule_conf['protocol']
        if protocol == 'tcp_udp':
            protocol = '{ tcp, udp }'
        output.append(f'meta l4proto {protocol}')

    if 'exclude' in rule_conf:
        # Excluded traffic leaves the chain untranslated.
        translation_str = 'return'
        log_suffix = '-EXCL'
    elif 'translation' in rule_conf:
        translation_prefix = nat_type[:1]
        translation_output = [f'{translation_prefix}nat']
        addr = dict_search_args(rule_conf, 'translation', 'address')
        port = dict_search_args(rule_conf, 'translation', 'port')

        if addr and is_ip_network(addr):
            if not ipv6:
                # IPv4 network translation: map the matched network onto
                # the translation network in a single statement.
                map_addr = dict_search_args(rule_conf, nat_type, 'address')
                translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} }}')
                ignore_type_addr = True
            else:
                translation_output.append(f'prefix to {addr}')
        elif addr == 'masquerade':
            if port:
                addr = f'{addr} to '
            # 'masquerade' replaces the snat/dnat keyword entirely.
            translation_output = [addr]
            log_suffix = '-MASQ'
        else:
            translation_output.append('to')
            if addr:
                translation_output.append(addr)

        options = []
        addr_mapping = dict_search_args(rule_conf, 'translation', 'options', 'address_mapping')
        port_mapping = dict_search_args(rule_conf, 'translation', 'options', 'port_mapping')
        if addr_mapping == 'persistent':
            options.append('persistent')
        if port_mapping and port_mapping != 'none':
            options.append(port_mapping)

        translation_str = " ".join(translation_output) + (f':{port}' if port else '')

        if options:
            translation_str += f' {",".join(options)}'

    for target in ['source', 'destination']:
        prefix = target[:1]

        addr = dict_search_args(rule_conf, target, 'address')
        if addr and not (ignore_type_addr and target == nat_type):
            operator, addr = _split_negation(addr)
            output.append(f'{ip_prefix} {prefix}addr {operator} {addr}')

        addr_prefix = dict_search_args(rule_conf, target, 'prefix')
        if addr_prefix and ipv6:
            # Strip the negation from the prefix itself; the previous code
            # sliced the unrelated 'addr' variable here, which emitted the
            # wrong value or raised TypeError when no address was set.
            operator, addr_prefix = _split_negation(addr_prefix)
            output.append(f'ip6 {prefix}addr {operator} {addr_prefix}')

        port = dict_search_args(rule_conf, target, 'port')
        if port:
            # NOTE(review): assumes 'protocol' is always present when a
            # port is configured - confirm against the config validation.
            protocol = rule_conf['protocol']
            if protocol == 'tcp_udp':
                protocol = 'th'

            operator, port = _split_negation(port)
            output.append(f'{protocol} {prefix}port {operator} {{ {port} }}')

    output.append('counter')

    if 'log' in rule_conf:
        output.append(f'log prefix "[{log_prefix}{log_suffix}]"')

    if translation_str:
        output.append(translation_str)

    output.append(f'comment "{log_prefix}"')

    return " ".join(output)
|