summaryrefslogtreecommitdiff
path: root/src/migration-scripts/firewall/6-to-7
diff options
context:
space:
mode:
authorDaniil Baturin <daniil@vyos.io>2024-09-12 13:59:18 +0100
committerGitHub <noreply@github.com>2024-09-12 13:59:18 +0100
commit205d957d092ade5708cc2182381864c04e4c0aff (patch)
treee78636efaa1332c5d49e1c2f023721dc030f8d6a /src/migration-scripts/firewall/6-to-7
parent9652bfda0a7f3e7932aecb32262c34f3fede72b2 (diff)
parenteaa9c82670fa5ee90835266e6f7a24f81c49d17e (diff)
downloadvyos-1x-205d957d092ade5708cc2182381864c04e4c0aff.tar.gz
vyos-1x-205d957d092ade5708cc2182381864c04e4c0aff.zip
Merge pull request #4050 from jestabro/revise-migration-circinus
T6007: revise migration system
Diffstat (limited to 'src/migration-scripts/firewall/6-to-7')
-rw-r--r--[-rwxr-xr-x]src/migration-scripts/firewall/6-to-7444
1 files changed, 213 insertions, 231 deletions
diff --git a/src/migration-scripts/firewall/6-to-7 b/src/migration-scripts/firewall/6-to-7
index 938044c6d..1afbc780b 100755..100644
--- a/src/migration-scripts/firewall/6-to-7
+++ b/src/migration-scripts/firewall/6-to-7
@@ -1,18 +1,17 @@
-#!/usr/bin/env python3
+# Copyright 2021-2024 VyOS maintainers and contributors <maintainers@vyos.io>
#
-# Copyright (C) 2021-2024 VyOS maintainers and contributors
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 2 or later as
-# published by the Free Software Foundation.
-#
-# This program is distributed in the hope that it will be useful,
+# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library. If not, see <http://www.gnu.org/licenses/>.
# T2199: Remove unavailable nodes due to XML/Python implementation using nftables
# monthdays: nftables does not have a monthdays equivalent
@@ -23,28 +22,11 @@
import re
-from sys import argv
-from sys import exit
-
from vyos.configtree import ConfigTree
-if len(argv) < 2:
- print("Must specify file name!")
- exit(1)
-
-file_name = argv[1]
-
-with open(file_name, 'r') as f:
- config_file = f.read()
-
max_len_description = 255
base = ['firewall']
-config = ConfigTree(config_file)
-
-if not config.exists(base):
- # Nothing to do
- exit(0)
icmp_remove = ['any']
icmp_translations = {
@@ -107,216 +89,216 @@ icmpv6_translations = {
'unknown-option': [4, 2]
}
-v4_found = False
-v6_found = False
v4_groups = ["address-group", "network-group", "port-group"]
v6_groups = ["ipv6-address-group", "ipv6-network-group", "port-group"]
-translated_dict = {}
-if config.exists(base + ['group']):
- for group_type in config.list_nodes(base + ['group']):
- for group_name in config.list_nodes(base + ['group', group_type]):
- name_description = base + ['group', group_type, group_name, 'description']
+def migrate(config: ConfigTree) -> None:
+ if not config.exists(base):
+ # Nothing to do
+ return
+
+ v4_found = False
+ v6_found = False
+ translated_dict = {}
+
+ if config.exists(base + ['group']):
+ for group_type in config.list_nodes(base + ['group']):
+ for group_name in config.list_nodes(base + ['group', group_type]):
+ name_description = base + ['group', group_type, group_name, 'description']
+ if config.exists(name_description):
+ tmp = config.return_value(name_description)
+ config.set(name_description, value=tmp[:max_len_description])
+ if '+' in group_name:
+ replacement_string = "_"
+ if group_type in v4_groups and not v4_found:
+ v4_found = True
+ if group_type in v6_groups and not v6_found:
+ v6_found = True
+ new_group_name = group_name.replace('+', replacement_string)
+ while config.exists(base + ['group', group_type, new_group_name]):
+ replacement_string = replacement_string + "_"
+ new_group_name = group_name.replace('+', replacement_string)
+ translated_dict[group_name] = new_group_name
+ config.copy(base + ['group', group_type, group_name], base + ['group', group_type, new_group_name])
+ config.delete(base + ['group', group_type, group_name])
+
+ if config.exists(base + ['name']):
+ for name in config.list_nodes(base + ['name']):
+ name_description = base + ['name', name, 'description']
if config.exists(name_description):
tmp = config.return_value(name_description)
config.set(name_description, value=tmp[:max_len_description])
- if '+' in group_name:
- replacement_string = "_"
- if group_type in v4_groups and not v4_found:
- v4_found = True
- if group_type in v6_groups and not v6_found:
- v6_found = True
- new_group_name = group_name.replace('+', replacement_string)
- while config.exists(base + ['group', group_type, new_group_name]):
- replacement_string = replacement_string + "_"
- new_group_name = group_name.replace('+', replacement_string)
- translated_dict[group_name] = new_group_name
- config.copy(base + ['group', group_type, group_name], base + ['group', group_type, new_group_name])
- config.delete(base + ['group', group_type, group_name])
-
-if config.exists(base + ['name']):
- for name in config.list_nodes(base + ['name']):
- name_description = base + ['name', name, 'description']
- if config.exists(name_description):
- tmp = config.return_value(name_description)
- config.set(name_description, value=tmp[:max_len_description])
-
- if not config.exists(base + ['name', name, 'rule']):
- continue
-
- for rule in config.list_nodes(base + ['name', name, 'rule']):
- rule_description = base + ['name', name, 'rule', rule, 'description']
- if config.exists(rule_description):
- tmp = config.return_value(rule_description)
- config.set(rule_description, value=tmp[:max_len_description])
-
- rule_recent = base + ['name', name, 'rule', rule, 'recent']
- rule_time = base + ['name', name, 'rule', rule, 'time']
- rule_tcp_flags = base + ['name', name, 'rule', rule, 'tcp', 'flags']
- rule_icmp = base + ['name', name, 'rule', rule, 'icmp']
-
- if config.exists(rule_time + ['monthdays']):
- config.delete(rule_time + ['monthdays'])
-
- if config.exists(rule_time + ['utc']):
- config.delete(rule_time + ['utc'])
-
- if config.exists(rule_recent + ['time']):
- tmp = int(config.return_value(rule_recent + ['time']))
- unit = 'minute'
- if tmp > 600:
- unit = 'hour'
- elif tmp < 10:
- unit = 'second'
- config.set(rule_recent + ['time'], value=unit)
-
- if config.exists(rule_tcp_flags):
- tmp = config.return_value(rule_tcp_flags)
- config.delete(rule_tcp_flags)
- for flag in tmp.split(","):
- if flag[0] == '!':
- config.set(rule_tcp_flags + ['not', flag[1:].lower()])
- else:
- config.set(rule_tcp_flags + [flag.lower()])
-
- if config.exists(rule_icmp + ['type-name']):
- tmp = config.return_value(rule_icmp + ['type-name'])
- if tmp in icmp_remove:
- config.delete(rule_icmp + ['type-name'])
- elif tmp in icmp_translations:
- translate = icmp_translations[tmp]
- if isinstance(translate, str):
- config.set(rule_icmp + ['type-name'], value=translate)
- elif isinstance(translate, list):
+
+ if not config.exists(base + ['name', name, 'rule']):
+ continue
+
+ for rule in config.list_nodes(base + ['name', name, 'rule']):
+ rule_description = base + ['name', name, 'rule', rule, 'description']
+ if config.exists(rule_description):
+ tmp = config.return_value(rule_description)
+ config.set(rule_description, value=tmp[:max_len_description])
+
+ rule_recent = base + ['name', name, 'rule', rule, 'recent']
+ rule_time = base + ['name', name, 'rule', rule, 'time']
+ rule_tcp_flags = base + ['name', name, 'rule', rule, 'tcp', 'flags']
+ rule_icmp = base + ['name', name, 'rule', rule, 'icmp']
+
+ if config.exists(rule_time + ['monthdays']):
+ config.delete(rule_time + ['monthdays'])
+
+ if config.exists(rule_time + ['utc']):
+ config.delete(rule_time + ['utc'])
+
+ if config.exists(rule_recent + ['time']):
+ tmp = int(config.return_value(rule_recent + ['time']))
+ unit = 'minute'
+ if tmp > 600:
+ unit = 'hour'
+ elif tmp < 10:
+ unit = 'second'
+ config.set(rule_recent + ['time'], value=unit)
+
+ if config.exists(rule_tcp_flags):
+ tmp = config.return_value(rule_tcp_flags)
+ config.delete(rule_tcp_flags)
+ for flag in tmp.split(","):
+ if flag[0] == '!':
+ config.set(rule_tcp_flags + ['not', flag[1:].lower()])
+ else:
+ config.set(rule_tcp_flags + [flag.lower()])
+
+ if config.exists(rule_icmp + ['type-name']):
+ tmp = config.return_value(rule_icmp + ['type-name'])
+ if tmp in icmp_remove:
config.delete(rule_icmp + ['type-name'])
- config.set(rule_icmp + ['type'], value=translate[0])
- config.set(rule_icmp + ['code'], value=translate[1])
-
- for direction in ['destination', 'source']:
- if config.exists(base + ['name', name, 'rule', rule, direction]):
- if config.exists(base + ['name', name, 'rule', rule, direction, 'group']) and v4_found:
- for group_type in config.list_nodes(base + ['name', name, 'rule', rule, direction, 'group']):
- group_name = config.return_value(base + ['name', name, 'rule', rule, direction, 'group', group_type])
- if '+' in group_name:
- if group_name[0] == "!":
- new_group_name = "!" + translated_dict[group_name[1:]]
- else:
- new_group_name = translated_dict[group_name]
- config.set(base + ['name', name, 'rule', rule, direction, 'group', group_type], value=new_group_name)
-
- pg_base = base + ['name', name, 'rule', rule, direction, 'group', 'port-group']
- proto_base = base + ['name', name, 'rule', rule, 'protocol']
- if config.exists(pg_base) and not config.exists(proto_base):
- config.set(proto_base, value='tcp_udp')
-
- if '+' in name:
- replacement_string = "_"
- new_name = name.replace('+', replacement_string)
- while config.exists(base + ['name', new_name]):
- replacement_string = replacement_string + "_"
+ elif tmp in icmp_translations:
+ translate = icmp_translations[tmp]
+ if isinstance(translate, str):
+ config.set(rule_icmp + ['type-name'], value=translate)
+ elif isinstance(translate, list):
+ config.delete(rule_icmp + ['type-name'])
+ config.set(rule_icmp + ['type'], value=translate[0])
+ config.set(rule_icmp + ['code'], value=translate[1])
+
+ for direction in ['destination', 'source']:
+ if config.exists(base + ['name', name, 'rule', rule, direction]):
+ if config.exists(base + ['name', name, 'rule', rule, direction, 'group']) and v4_found:
+ for group_type in config.list_nodes(base + ['name', name, 'rule', rule, direction, 'group']):
+ group_name = config.return_value(base + ['name', name, 'rule', rule, direction, 'group', group_type])
+ if '+' in group_name:
+ if group_name[0] == "!":
+ new_group_name = "!" + translated_dict[group_name[1:]]
+ else:
+ new_group_name = translated_dict[group_name]
+ config.set(base + ['name', name, 'rule', rule, direction, 'group', group_type], value=new_group_name)
+
+ pg_base = base + ['name', name, 'rule', rule, direction, 'group', 'port-group']
+ proto_base = base + ['name', name, 'rule', rule, 'protocol']
+ if config.exists(pg_base) and not config.exists(proto_base):
+ config.set(proto_base, value='tcp_udp')
+
+ if '+' in name:
+ replacement_string = "_"
new_name = name.replace('+', replacement_string)
- config.copy(base + ['name', name], base + ['name', new_name])
- config.delete(base + ['name', name])
-
-if config.exists(base + ['ipv6-name']):
- for name in config.list_nodes(base + ['ipv6-name']):
- name_description = base + ['ipv6-name', name, 'description']
- if config.exists(name_description):
- tmp = config.return_value(name_description)
- config.set(name_description, value=tmp[:max_len_description])
-
- if not config.exists(base + ['ipv6-name', name, 'rule']):
- continue
-
- for rule in config.list_nodes(base + ['ipv6-name', name, 'rule']):
- rule_description = base + ['ipv6-name', name, 'rule', rule, 'description']
- if config.exists(rule_description):
- tmp = config.return_value(rule_description)
- config.set(rule_description, value=tmp[:max_len_description])
-
- rule_recent = base + ['ipv6-name', name, 'rule', rule, 'recent']
- rule_time = base + ['ipv6-name', name, 'rule', rule, 'time']
- rule_tcp_flags = base + ['ipv6-name', name, 'rule', rule, 'tcp', 'flags']
- rule_icmp = base + ['ipv6-name', name, 'rule', rule, 'icmpv6']
-
- if config.exists(rule_time + ['monthdays']):
- config.delete(rule_time + ['monthdays'])
-
- if config.exists(rule_time + ['utc']):
- config.delete(rule_time + ['utc'])
-
- if config.exists(rule_recent + ['time']):
- tmp = int(config.return_value(rule_recent + ['time']))
- unit = 'minute'
- if tmp > 600:
- unit = 'hour'
- elif tmp < 10:
- unit = 'second'
- config.set(rule_recent + ['time'], value=unit)
-
- if config.exists(rule_tcp_flags):
- tmp = config.return_value(rule_tcp_flags)
- config.delete(rule_tcp_flags)
- for flag in tmp.split(","):
- if flag[0] == '!':
- config.set(rule_tcp_flags + ['not', flag[1:].lower()])
- else:
- config.set(rule_tcp_flags + [flag.lower()])
-
- if config.exists(base + ['ipv6-name', name, 'rule', rule, 'protocol']):
- tmp = config.return_value(base + ['ipv6-name', name, 'rule', rule, 'protocol'])
- if tmp == 'icmpv6':
- config.set(base + ['ipv6-name', name, 'rule', rule, 'protocol'], value='ipv6-icmp')
-
- if config.exists(rule_icmp + ['type']):
- tmp = config.return_value(rule_icmp + ['type'])
- type_code_match = re.match(r'^(\d+)(?:/(\d+))?$', tmp)
-
- if type_code_match:
- config.set(rule_icmp + ['type'], value=type_code_match[1])
- if type_code_match[2]:
- config.set(rule_icmp + ['code'], value=type_code_match[2])
- elif tmp in icmpv6_remove:
- config.delete(rule_icmp + ['type'])
- elif tmp in icmpv6_translations:
- translate = icmpv6_translations[tmp]
- if isinstance(translate, str):
+ while config.exists(base + ['name', new_name]):
+ replacement_string = replacement_string + "_"
+ new_name = name.replace('+', replacement_string)
+ config.copy(base + ['name', name], base + ['name', new_name])
+ config.delete(base + ['name', name])
+
+ if config.exists(base + ['ipv6-name']):
+ for name in config.list_nodes(base + ['ipv6-name']):
+ name_description = base + ['ipv6-name', name, 'description']
+ if config.exists(name_description):
+ tmp = config.return_value(name_description)
+ config.set(name_description, value=tmp[:max_len_description])
+
+ if not config.exists(base + ['ipv6-name', name, 'rule']):
+ continue
+
+ for rule in config.list_nodes(base + ['ipv6-name', name, 'rule']):
+ rule_description = base + ['ipv6-name', name, 'rule', rule, 'description']
+ if config.exists(rule_description):
+ tmp = config.return_value(rule_description)
+ config.set(rule_description, value=tmp[:max_len_description])
+
+ rule_recent = base + ['ipv6-name', name, 'rule', rule, 'recent']
+ rule_time = base + ['ipv6-name', name, 'rule', rule, 'time']
+ rule_tcp_flags = base + ['ipv6-name', name, 'rule', rule, 'tcp', 'flags']
+ rule_icmp = base + ['ipv6-name', name, 'rule', rule, 'icmpv6']
+
+ if config.exists(rule_time + ['monthdays']):
+ config.delete(rule_time + ['monthdays'])
+
+ if config.exists(rule_time + ['utc']):
+ config.delete(rule_time + ['utc'])
+
+ if config.exists(rule_recent + ['time']):
+ tmp = int(config.return_value(rule_recent + ['time']))
+ unit = 'minute'
+ if tmp > 600:
+ unit = 'hour'
+ elif tmp < 10:
+ unit = 'second'
+ config.set(rule_recent + ['time'], value=unit)
+
+ if config.exists(rule_tcp_flags):
+ tmp = config.return_value(rule_tcp_flags)
+ config.delete(rule_tcp_flags)
+ for flag in tmp.split(","):
+ if flag[0] == '!':
+ config.set(rule_tcp_flags + ['not', flag[1:].lower()])
+ else:
+ config.set(rule_tcp_flags + [flag.lower()])
+
+ if config.exists(base + ['ipv6-name', name, 'rule', rule, 'protocol']):
+ tmp = config.return_value(base + ['ipv6-name', name, 'rule', rule, 'protocol'])
+ if tmp == 'icmpv6':
+ config.set(base + ['ipv6-name', name, 'rule', rule, 'protocol'], value='ipv6-icmp')
+
+ if config.exists(rule_icmp + ['type']):
+ tmp = config.return_value(rule_icmp + ['type'])
+ type_code_match = re.match(r'^(\d+)(?:/(\d+))?$', tmp)
+
+ if type_code_match:
+ config.set(rule_icmp + ['type'], value=type_code_match[1])
+ if type_code_match[2]:
+ config.set(rule_icmp + ['code'], value=type_code_match[2])
+ elif tmp in icmpv6_remove:
config.delete(rule_icmp + ['type'])
- config.set(rule_icmp + ['type-name'], value=translate)
- elif isinstance(translate, list):
- config.set(rule_icmp + ['type'], value=translate[0])
- config.set(rule_icmp + ['code'], value=translate[1])
- else:
- config.rename(rule_icmp + ['type'], 'type-name')
-
- for direction in ['destination', 'source']:
- if config.exists(base + ['ipv6-name', name, 'rule', rule, direction]):
- if config.exists(base + ['ipv6-name', name, 'rule', rule, direction, 'group']) and v6_found:
- for group_type in config.list_nodes(base + ['ipv6-name', name, 'rule', rule, direction, 'group']):
- group_name = config.return_value(base + ['ipv6-name', name, 'rule', rule, direction, 'group', group_type])
- if '+' in group_name:
- if group_name[0] == "!":
- new_group_name = "!" + translated_dict[group_name[1:]]
- else:
- new_group_name = translated_dict[group_name]
- config.set(base + ['ipv6-name', name, 'rule', rule, direction, 'group', group_type], value=new_group_name)
-
- pg_base = base + ['ipv6-name', name, 'rule', rule, direction, 'group', 'port-group']
- proto_base = base + ['ipv6-name', name, 'rule', rule, 'protocol']
- if config.exists(pg_base) and not config.exists(proto_base):
- config.set(proto_base, value='tcp_udp')
-
- if '+' in name:
- replacement_string = "_"
- new_name = name.replace('+', replacement_string)
- while config.exists(base + ['ipv6-name', new_name]):
- replacement_string = replacement_string + "_"
+ elif tmp in icmpv6_translations:
+ translate = icmpv6_translations[tmp]
+ if isinstance(translate, str):
+ config.delete(rule_icmp + ['type'])
+ config.set(rule_icmp + ['type-name'], value=translate)
+ elif isinstance(translate, list):
+ config.set(rule_icmp + ['type'], value=translate[0])
+ config.set(rule_icmp + ['code'], value=translate[1])
+ else:
+ config.rename(rule_icmp + ['type'], 'type-name')
+
+ for direction in ['destination', 'source']:
+ if config.exists(base + ['ipv6-name', name, 'rule', rule, direction]):
+ if config.exists(base + ['ipv6-name', name, 'rule', rule, direction, 'group']) and v6_found:
+ for group_type in config.list_nodes(base + ['ipv6-name', name, 'rule', rule, direction, 'group']):
+ group_name = config.return_value(base + ['ipv6-name', name, 'rule', rule, direction, 'group', group_type])
+ if '+' in group_name:
+ if group_name[0] == "!":
+ new_group_name = "!" + translated_dict[group_name[1:]]
+ else:
+ new_group_name = translated_dict[group_name]
+ config.set(base + ['ipv6-name', name, 'rule', rule, direction, 'group', group_type], value=new_group_name)
+
+ pg_base = base + ['ipv6-name', name, 'rule', rule, direction, 'group', 'port-group']
+ proto_base = base + ['ipv6-name', name, 'rule', rule, 'protocol']
+ if config.exists(pg_base) and not config.exists(proto_base):
+ config.set(proto_base, value='tcp_udp')
+
+ if '+' in name:
+ replacement_string = "_"
new_name = name.replace('+', replacement_string)
- config.copy(base + ['ipv6-name', name], base + ['ipv6-name', new_name])
- config.delete(base + ['ipv6-name', name])
-try:
- with open(file_name, 'w') as f:
- f.write(config.to_string())
-except OSError as e:
- print("Failed to save the modified config: {}".format(e))
- exit(1)
+ while config.exists(base + ['ipv6-name', new_name]):
+ replacement_string = replacement_string + "_"
+ new_name = name.replace('+', replacement_string)
+ config.copy(base + ['ipv6-name', name], base + ['ipv6-name', new_name])
+ config.delete(base + ['ipv6-name', name])