From 884f68b25455c547f7b0e7dea4e543daea99f3c2 Mon Sep 17 00:00:00 2001 From: sarthurdev <965089+sarthurdev@users.noreply.github.com> Date: Sat, 2 Jul 2022 01:10:47 +0200 Subject: firewall: T4299: Add ability to inverse match country codes --- interface-definitions/include/firewall/geoip.xml.i | 6 ++++++ python/vyos/firewall.py | 24 ++++++++-------------- smoketest/scripts/cli/test_firewall.py | 22 ++++++++++++++++++-- 3 files changed, 35 insertions(+), 17 deletions(-) diff --git a/interface-definitions/include/firewall/geoip.xml.i b/interface-definitions/include/firewall/geoip.xml.i index f6208f718..9fb37a574 100644 --- a/interface-definitions/include/firewall/geoip.xml.i +++ b/interface-definitions/include/firewall/geoip.xml.i @@ -17,6 +17,12 @@ + + + Inverse match of country-codes + + + diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py index 7d1278d0e..3e2de4c3f 100644 --- a/python/vyos/firewall.py +++ b/python/vyos/firewall.py @@ -152,7 +152,10 @@ def parse_rule(rule_conf, fw_name, rule_id, ip_name): output.append(f'{ip_name} {prefix}addr {suffix}') if dict_search_args(side_conf, 'geoip', 'country_code'): - output.append(f'{ip_name} {prefix}addr @GEOIP_CC_{fw_name}_{rule_id}') + operator = '' + if dict_search_args(side_conf, 'geoip', 'inverse_match') != None: + operator = '!=' + output.append(f'{ip_name} {prefix}addr {operator} @GEOIP_CC_{fw_name}_{rule_id}') if 'mac_address' in side_conf: suffix = side_conf["mac_address"] @@ -429,22 +432,13 @@ def geoip_update(firewall, force=False): # Map country codes to set names for codes, path in dict_search_recursive(firewall, 'country_code'): + set_name = f'GEOIP_CC_{path[1]}_{path[3]}' if path[0] == 'name': - set_name = f'GEOIP_CC_{path[1]}_{path[3]}' - ipv4_sets[set_name] = [] for code in codes: - if code not in ipv4_codes: - ipv4_codes[code] = [set_name] - else: - ipv4_codes[code].append(set_n) + ipv4_codes.setdefault(code, []).append(set_name) elif path[0] == 'ipv6_name': - set_name = f'GEOIP_CC_{path[1]}_{path[3]}' - ipv6_sets[set_name] = [] for code in codes: - if code not in ipv6_codes: - ipv6_codes[code] = [set_name] - else: - ipv6_codes[code].append(set_name) + ipv6_codes.setdefault(code, []).append(set_name) if not ipv4_codes and not ipv6_codes: if force: @@ -459,11 +453,11 @@ def geoip_update(firewall, force=False): if code in ipv4_codes and ipv4: ip_range = f'{start}-{end}' if start != end else start for setname in ipv4_codes[code]: - ipv4_sets[setname].append(ip_range) + ipv4_sets.setdefault(setname, []).append(ip_range) if code in ipv6_codes and not ipv4: ip_range = f'{start}-{end}' if start != end else start for setname in ipv6_codes[code]: - ipv6_sets[setname].append(ip_range) + ipv6_sets.setdefault(setname, []).append(ip_range) render(nftables_geoip_conf, 'firewall/nftables-geoip-update.j2', { 'ipv4_sets': ipv4_sets, diff --git a/smoketest/scripts/cli/test_firewall.py b/smoketest/scripts/cli/test_firewall.py index ce06b9074..4de90e1ec 100755 --- a/smoketest/scripts/cli/test_firewall.py +++ b/smoketest/scripts/cli/test_firewall.py @@ -69,8 +69,8 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase): self.verify_nftables(nftables_search, 'ip filter', inverse=True) - def verify_nftables(self, nftables_search, table, inverse=False): - nftables_output = cmd(f'sudo nft list table {table}') + def verify_nftables(self, nftables_search, table, inverse=False, args=''): + nftables_output = cmd(f'sudo nft {args} list table {table}') for search in nftables_search: matched = False @@ -80,6 +80,24 @@ class TestFirewall(VyOSUnitTestSHIM.TestCase): break self.assertTrue(not matched if inverse else matched, msg=search) + def test_geoip(self): + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'action', 'drop']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'source', 'geoip', 'country-code', 'se']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'source', 'geoip', 'country-code', 'gb']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'action', 'accept']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'source', 'geoip', 'country-code', 'de']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'source', 'geoip', 'country-code', 'fr']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'source', 'geoip', 'inverse-match']) + + self.cli_commit() + + nftables_search = [ + ['ip saddr @GEOIP_CC_smoketest_1', 'drop'], + ['ip saddr != @GEOIP_CC_smoketest_2', 'return'] + ] + # -t prevents 1000+ GeoIP elements being returned + self.verify_nftables(nftables_search, 'ip filter', args='-t') + def test_groups(self): hostmap_path = ['system', 'static-host-mapping', 'host-name'] example_org = ['192.0.2.8', '192.0.2.10', '192.0.2.11'] -- cgit v1.2.3