diff options
Diffstat (limited to 'src')
| -rwxr-xr-x | src/op_mode/firewall.py | 113 | 
1 files changed, 87 insertions, 26 deletions
| diff --git a/src/op_mode/firewall.py b/src/op_mode/firewall.py index 7a3ab921d..086536e4e 100755 --- a/src/op_mode/firewall.py +++ b/src/op_mode/firewall.py @@ -148,6 +148,38 @@ def get_nftables_group_members(family, table, name):      return out +def get_nftables_remote_group_members(family, table, name): +    prefix = 'ip6' if family == 'ipv6' else 'ip' +    out = [] + +    try: +        results_str = cmd(f'nft -j list set {prefix} {table} {name}') +        results = json.loads(results_str) +    except: +        return out + +    if 'nftables' not in results: +        return out + +    for obj in results['nftables']: +        if 'set' not in obj: +            continue + +        set_obj = obj['set'] +        if 'elem' in set_obj: +            for elem in set_obj['elem']: +                # search for single IP elements +                if isinstance(elem, str): +                    out.append(elem) +                # search for prefix elements +                elif isinstance(elem, dict) and 'prefix' in elem: +                    out.append(f"{elem['prefix']['addr']}/{elem['prefix']['len']}") +                # search for IP range elements +                elif isinstance(elem, dict) and 'range' in elem: +                    out.append(f"{elem['range'][0]}-{elem['range'][1]}") + +    return out +  def output_firewall_vertical(rules, headers, adjust=True):      for rule in rules:          adjusted_rule = rule + [""] * (len(headers) - len(rule)) if adjust else rule # account for different header length, like default-action @@ -556,32 +588,8 @@ def show_firewall_group(name=None):      header_tail = []      for group_type, group_type_conf in firewall['group'].items(): -        ## -        if group_type != 'dynamic_group': - -            for group_name, group_conf in group_type_conf.items(): -                if name and name != group_name: -                    continue - -                references = find_references(group_type, group_name) -                row = [group_name,  textwrap.fill(group_conf.get('description') or '', 50), group_type, '\n'.join(references) or 'N/D'] -                if 'address' in group_conf: -                    row.append("\n".join(sorted(group_conf['address']))) -                elif 'network' in group_conf: -                    row.append("\n".join(sorted(group_conf['network'], key=ipaddress.ip_network))) -                elif 'mac_address' in group_conf: -                    row.append("\n".join(sorted(group_conf['mac_address']))) -                elif 'port' in group_conf: -                    row.append("\n".join(sorted(group_conf['port']))) -                elif 'interface' in group_conf: -                    row.append("\n".join(sorted(group_conf['interface']))) -                elif 'url' in group_conf: -                    row.append(group_conf['url']) -                else: -                    row.append('N/D') -                rows.append(row) - -        else: +        # interate over dynamic-groups +        if group_type == 'dynamic_group':              if not args.detail:                  header_tail = ['Timeout', 'Expires'] @@ -628,6 +636,59 @@ def show_firewall_group(name=None):                              header_tail += [""] * (len(members) - 1)                              rows.append(row) +        # iterate over remote-groups +        elif group_type == 'remote_group': +            for remote_name, remote_conf in group_type_conf.items(): +                if name and name != remote_name: +                    continue + +                references = find_references(group_type, remote_name) +                row = [remote_name, textwrap.fill(remote_conf.get('description') or '', 50), group_type, '\n'.join(references) or 'N/D'] +                members = get_nftables_remote_group_members("ipv4", 'vyos_filter', f'R_{remote_name}') + +                if 'url' in remote_conf: +                    # display only the url if no members are found for both views +                    if not members: +                        if args.detail: +                            header_tail = ['Remote URL'] +                            row.append('N/D') +                            row.append(remote_conf['url']) +                        else: +                            row.append(remote_conf['url']) +                        rows.append(row) +                    else: +                        # display all table elements in detail view +                        if args.detail: +                            header_tail = ['Remote URL'] +                            row += [' '.join(members)] +                            row.append(remote_conf['url']) +                            rows.append(row) +                        else: +                            row.append(remote_conf['url']) +                            rows.append(row) + +        # catch the rest of the group types +        else: +            for group_name, group_conf in group_type_conf.items(): +                if name and name != group_name: +                    continue + +                references = find_references(group_type, group_name) +                row = [group_name,  textwrap.fill(group_conf.get('description') or '', 50), group_type, '\n'.join(references) or 'N/D'] +                if 'address' in group_conf: +                    row.append("\n".join(sorted(group_conf['address']))) +                elif 'network' in group_conf: +                    row.append("\n".join(sorted(group_conf['network'], key=ipaddress.ip_network))) +                elif 'mac_address' in group_conf: +                    row.append("\n".join(sorted(group_conf['mac_address']))) +                elif 'port' in group_conf: +                    row.append("\n".join(sorted(group_conf['port']))) +                elif 'interface' in group_conf: +                    row.append("\n".join(sorted(group_conf['interface']))) +                else: +                    row.append('N/D') +                rows.append(row) +      if rows:          print('Firewall Groups\n')          if args.detail: | 
