From f0aab13bb4bf111b3b47f34cb554873e1db1d44d Mon Sep 17 00:00:00 2001 From: zsdc Date: Wed, 18 Dec 2019 23:57:52 +0200 Subject: flow-accounting: T1890: flow-accounting rewritten with Python and XML This patch keep compatibility with old configuration and software, but now it is much easier to add a lot of other useful things Completely replaces vyatta-netflow package (except some outdated and not available via CLI parts) --- src/conf_mode/flow_accounting_conf.py | 440 ++++++++++++++++++++++++++++++++++ src/op_mode/flow_accounting_op.py | 233 ++++++++++++++++++ 2 files changed, 673 insertions(+) create mode 100644 src/conf_mode/flow_accounting_conf.py create mode 100644 src/op_mode/flow_accounting_op.py (limited to 'src') diff --git a/src/conf_mode/flow_accounting_conf.py b/src/conf_mode/flow_accounting_conf.py new file mode 100644 index 000000000..75dee4e64 --- /dev/null +++ b/src/conf_mode/flow_accounting_conf.py @@ -0,0 +1,440 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2018 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import sys +import re +import ipaddress +import subprocess + +from vyos.config import Config +from vyos import ConfigError +import vyos.interfaces +from vyos.ifconfig import Interface +from jinja2 import Template + +# default values +default_sflow_server_port = 6343 +default_netflow_server_port = 2055 +default_plugin_pipe_size = 10 +default_captured_packet_size = 128 +default_netflow_version = '9' +default_sflow_agentip = 'auto' +uacctd_conf_path = '/etc/pmacct/uacctd.conf' +iptables_nflog_table = 'raw' +iptables_nflog_chain = 'VYATTA_CT_PREROUTING_HOOK' + +# pmacct config template +uacct_conf_jinja = '''# Genereated from VyOS configuration +daemonize: true +promisc: false +pidfile: /var/run/uacctd.pid +uacctd_group: 2 +uacctd_nl_size: 2097152 +snaplen: {{ snaplen }} +aggregate: in_iface,src_mac,dst_mac,vlan,src_host,dst_host,src_port,dst_port,proto,tos,flows +plugin_pipe_size: {{ templatecfg['plugin_pipe_size'] }} +plugin_buffer_size: {{ templatecfg['plugin_buffer_size'] }} +{%- if templatecfg['syslog-facility'] != none %} +syslog: {{ templatecfg['syslog-facility'] }} +{%- endif %} +{%- if templatecfg['disable-imt'] == none %} +imt_path: /tmp/uacctd.pipe +imt_mem_pools_number: 169 +{%- endif %} +plugins: +{%- if templatecfg['netflow']['servers'] != none -%} + {% for server in templatecfg['netflow']['servers'] %} + {%- if loop.last -%}nfprobe[nf_{{ server['address'] }}]{%- else %}nfprobe[nf_{{ server['address'] }}],{%- endif %} + {%- endfor -%} + {% set plugins_presented = true %} +{%- endif %} +{%- if templatecfg['sflow']['servers'] != none -%} + {% if plugins_presented -%} + {%- for server in templatecfg['sflow']['servers'] -%} + ,sfprobe[sf_{{ server['address'] }}] + {%- endfor %} + {%- else %} + {%- for server in templatecfg['sflow']['servers'] %} + {%- if loop.last -%}sfprobe[sf_{{ server['address'] }}]{%- else %}sfprobe[sf_{{ server['address'] }}],{%- endif %} + {%- endfor %} + {%- endif -%} + {% set plugins_presented = true %} +{%- endif %} +{%- if templatecfg['disable-imt'] == none %} + {%- if plugins_presented -%},memory{%- else %}memory{%- endif %} +{%- endif %} +{%- if templatecfg['netflow']['servers'] != none %} +{%- for server in templatecfg['netflow']['servers'] %} +nfprobe_receiver[nf_{{ server['address'] }}]: {{ server['address'] }}:{{ server['port'] }} +nfprobe_version[nf_{{ server['address'] }}]: {{ templatecfg['netflow']['version'] }} +{%- if templatecfg['netflow']['engine-id'] != none %} +nfprobe_engine[nf_{{ server['address'] }}]: {{ templatecfg['netflow']['engine-id'] }} +{%- endif %} +{%- if templatecfg['netflow']['max-flows'] != none %} +nfprobe_maxflows[nf_{{ server['address'] }}]: {{ templatecfg['netflow']['max-flows'] }} +{%- endif %} +{%- if templatecfg['netflow']['sampling-rate'] != none %} +sampling_rate[nf_{{ server['address'] }}]: {{ templatecfg['netflow']['sampling-rate'] }} +{%- endif %} +{%- if templatecfg['netflow']['source-ip'] != none %} +nfprobe_source_ip[nf_{{ server['address'] }}]: {{ templatecfg['netflow']['source-ip'] }} +{%- endif %} +{%- if templatecfg['netflow']['timeout_string'] != '' %} +nfprobe_timeouts[nf_{{ server['address'] }}]: {{ templatecfg['netflow']['timeout_string'] }} +{%- endif %} +{%- endfor %} +{%- endif %} +{%- if templatecfg['sflow']['servers'] != none %} +{%- for server in templatecfg['sflow']['servers'] %} +sfprobe_receiver[sf_{{ server['address'] }}]: {{ server['address'] }}:{{ server['port'] }} +sfprobe_agentip[sf_{{ server['address'] }}]: {{ templatecfg['sflow']['agent-address'] }} +{%- if templatecfg['sflow']['sampling-rate'] != none %} +sampling_rate[sf_{{ server['address'] }}]: {{ templatecfg['sflow']['sampling-rate'] }} +{%- endif %} +{%- endfor %} +{% endif %} +''' + +# helper functions +# check if node exists and return True if this is true +def _node_exists(path): + vyos_config = Config() + if vyos_config.exists(path): + return True + +# get sFlow agent-ip if agent-address is "auto" (default behaviour) +def _sflow_default_agentip(config): + # check if any of BGP, OSPF, OSPFv3 protocols are configured and use router-id from there + if config.exists('protocols bgp'): + bgp_router_id = config.return_value("protocols bgp {} parameters router-id".format(config.list_nodes('protocols bgp')[0])) + if bgp_router_id: + return bgp_router_id + if config.return_value('protocols ospf parameters router-id'): + return config.return_value('protocols ospf parameters router-id') + if config.return_value('protocols ospfv3 parameters router-id'): + return config.return_value('protocols ospfv3 parameters router-id') + + # if router-id was not found, use first available ip of any interface + for iface in vyos.interfaces.list_interfaces(): + for address in Interface(iface).get_addr(): + # return an IP, if this is not loopback + regex_filter = re.compile('^(?!(127)|(::1)|(fe80))(?P[a-f\d\.:]+)/\d+$') + if regex_filter.search(address): + return regex_filter.search(address).group('ipaddr') + + # return nothing by default + return None + +# get iptables rule dict for chain in table +def _iptables_get_nflog(): + # define list with rules + rules = [] + + # prepare regex for parsing rules + rule_pattern = "^-A (?P{0} -i (?P[\w\.\*\-]+).*--comment FLOW_ACCOUNTING_RULE.* -j NFLOG.*$)".format(iptables_nflog_chain) + rule_re = re.compile(rule_pattern) + + for iptables_variant in ['iptables', 'ip6tables']: + # run iptables, save output and split it by lines + iptables_command = "sudo {0} -t {1} -S {2}".format(iptables_variant, iptables_nflog_table, iptables_nflog_chain) + process = subprocess.Popen(iptables_command, stdout=subprocess.PIPE, shell=True, universal_newlines=True) + stdout, stderr = process.communicate() + if not process.returncode == 0: + print("Failed to get flows list: command \"{}\" returned exit code: {}\nError: {}".format(command, process.returncode(), stderr)) + sys.exit(1) + iptables_out = stdout.splitlines() + + # parse each line and add information to list + for current_rule in iptables_out: + current_rule_parsed = rule_re.search(current_rule) + if current_rule_parsed: + rules.append({ 'interface': current_rule_parsed.groupdict()["interface"], 'iptables_variant': iptables_variant, 'table': iptables_nflog_table, 'rule_definition': current_rule_parsed.groupdict()["rule_definition"] }) + + # return list with rules + return rules + +# modify iptables rules +def _iptables_config(configured_ifaces): + # define list of iptables commands to modify settings + iptable_commands = [] + + # prepare extended list with configured interfaces + configured_ifaces_extended = [] + for iface in configured_ifaces: + configured_ifaces_extended.append({ 'iface': iface, 'iptables_variant': 'iptables' }) + configured_ifaces_extended.append({ 'iface': iface, 'iptables_variant': 'ip6tables' }) + + # get currently configured interfaces with iptables rules + active_nflog_rules = _iptables_get_nflog() + + # compare current active list with configured one and delete excessive interfaces, add missed + active_nflog_ifaces = [] + for rule in active_nflog_rules: + if rule['interface'] not in configured_ifaces: + iptable_commands.append("sudo {0} -t {1} -D {2}".format(rule['iptables_variant'], rule['table'], rule['rule_definition'])) + else: + active_nflog_ifaces.append({ 'iface': rule['interface'], 'iptables_variant': rule['iptables_variant'] }) + + # do not create new rules for already configured interfaces + for iface in active_nflog_ifaces: + if iface in active_nflog_ifaces: + configured_ifaces_extended.remove(iface) + + # create missed rules + for iface_extended in configured_ifaces_extended: + rule_definition = "{0} -i {1} -m comment --comment FLOW_ACCOUNTING_RULE -j NFLOG --nflog-group 2 --nflog-range {2} --nflog-threshold 100".format(iptables_nflog_chain, iface_extended['iface'], default_captured_packet_size) + iptable_commands.append("sudo {0} -t {1} -I {2}".format(iface_extended['iptables_variant'], iptables_nflog_table, rule_definition)) + + # change iptables + for command in iptable_commands: + return_code = subprocess.call(command.split(' ')) + if not return_code == 0: + raise ConfigError("Failed to run command: {}\nExit code {}".format(command, return_code)) + + +def get_config(): + vc = Config() + vc.set_level('') + # Convert the VyOS config to an abstract internal representation + flow_config = { + 'flow-accounting-configured': vc.exists('system flow-accounting'), + 'buffer-size': vc.return_value('system flow-accounting buffer-size'), + 'disable-imt': _node_exists('system flow-accounting disable-imt'), + 'syslog-facility': vc.return_value('system flow-accounting syslog-facility'), + 'interfaces': None, + 'sflow': { + 'configured': vc.exists('system flow-accounting sflow'), + 'agent-address': vc.return_value('system flow-accounting sflow agent-address'), + 'sampling-rate': vc.return_value('system flow-accounting sflow sampling-rate'), + 'servers': None + }, + 'netflow': { + 'configured': vc.exists('system flow-accounting netflow'), + 'engine-id': vc.return_value('system flow-accounting netflow engine-id'), + 'max-flows': vc.return_value('system flow-accounting netflow max-flows'), + 'sampling-rate': vc.return_value('system flow-accounting netflow sampling-rate'), + 'source-ip': vc.return_value('system flow-accounting netflow source-ip'), + 'version': vc.return_value('system flow-accounting netflow version'), + 'timeout': { + 'expint': vc.return_value('system flow-accounting netflow timeout expiry-interval'), + 'general': vc.return_value('system flow-accounting netflow timeout flow-generic'), + 'icmp': vc.return_value('system flow-accounting netflow timeout icmp'), + 'maxlife': vc.return_value('system flow-accounting netflow timeout max-active-life'), + 'tcp.fin': vc.return_value('system flow-accounting netflow timeout tcp-fin'), + 'tcp': vc.return_value('system flow-accounting netflow timeout tcp-generic'), + 'tcp.rst': vc.return_value('system flow-accounting netflow timeout tcp-rst'), + 'udp': vc.return_value('system flow-accounting netflow timeout udp') + }, + 'servers': None + } + } + + # get interfaces list + if vc.exists('system flow-accounting interface'): + flow_config['interfaces'] = vc.return_values('system flow-accounting interface') + + # get sFlow collectors list + if vc.exists('system flow-accounting sflow server'): + flow_config['sflow']['servers'] = [] + sflow_collectors = vc.list_nodes('system flow-accounting sflow server') + for collector in sflow_collectors: + port = default_sflow_server_port + if vc.return_value("system flow-accounting sflow server {} port".format(collector)): + port = vc.return_value("system flow-accounting sflow server {} port".format(collector)) + flow_config['sflow']['servers'].append({ 'address': collector, 'port': port }) + + # get NetFlow collectors list + if vc.exists('system flow-accounting netflow server'): + flow_config['netflow']['servers'] = [] + netflow_collectors = vc.list_nodes('system flow-accounting netflow server') + for collector in netflow_collectors: + port = default_netflow_server_port + if vc.return_value("system flow-accounting netflow server {} port".format(collector)): + port = vc.return_value("system flow-accounting netflow server {} port".format(collector)) + flow_config['netflow']['servers'].append({ 'address': collector, 'port': port }) + + # get sflow agent-id + if flow_config['sflow']['agent-address'] == None or flow_config['sflow']['agent-address'] == 'auto': + flow_config['sflow']['agent-address'] = _sflow_default_agentip(vc) + + # get NetFlow version + if not flow_config['netflow']['version']: + flow_config['netflow']['version'] = default_netflow_version + + # convert NetFlow engine-id format, if this is necessary + if flow_config['netflow']['engine-id'] and flow_config['netflow']['version'] == '5': + regex_filter = re.compile('^\d+$') + if regex_filter.search(flow_config['netflow']['engine-id']): + flow_config['netflow']['engine-id'] = "{}:0".format(flow_config['netflow']['engine-id']) + + # return dict with flow-accounting configuration + return flow_config + +def verify(config): + # Verify that configuration is valid + # skip all checks if flow-accounting was removed + if not config['flow-accounting-configured']: + return True + + # check if at least one collector is enabled + if not (config['sflow']['configured'] or config['netflow']['configured'] or not config['disable-imt']): + raise ConfigError("You need to configure at least one sFlow or NetFlow protocol, or not set \"disable-imt\" for flow-accounting") + + # Check if at least one interface is configured + if not config['interfaces']: + raise ConfigError("You need to configure at least one interface for flow-accounting") + + # check that all configured interfaces exists in the system + for iface in config['interfaces']: + if not iface in vyos.interfaces.list_interfaces(): + # chnged from error to warning to allow adding dynamic interfaces and interface templates + # raise ConfigError("The {} interface is not presented in the system".format(iface)) + print("Warning: the {} interface is not presented in the system".format(iface)) + + # check sFlow configuration + if config['sflow']['configured']: + # check if at least one sFlow collector is configured if sFlow configuration is presented + if not config['sflow']['servers']: + raise ConfigError("You need to configure at least one sFlow server") + + # check that all sFlow collectors use the same IP protocol version + sflow_collector_ipver = None + for sflow_collector in config['sflow']['servers']: + if sflow_collector_ipver: + if sflow_collector_ipver != ipaddress.ip_address(sflow_collector['address']).version: + raise ConfigError("All sFlow servers must use the same IP protocol") + else: + sflow_collector_ipver = ipaddress.ip_address(sflow_collector['address']).version + + + # check agent-id for sFlow: we should avoid mixing IPv4 agent-id with IPv6 collectors and vice-versa + for sflow_collector in config['sflow']['servers']: + if ipaddress.ip_address(sflow_collector['address']).version != ipaddress.ip_address(config['sflow']['agent-address']).version: + raise ConfigError("Different IP address versions cannot be mixed in \"sflow agent-address\" and \"sflow server\". You need to set manually the same IP version for \"agent-address\" as for all sFlow servers") + + # check if configured sFlow agent-id exist in the system + agent_id_presented = None + for iface in vyos.interfaces.list_interfaces(): + for address in Interface(iface).get_addr(): + # check an IP, if this is not loopback + regex_filter = re.compile('^(?!(127)|(::1)|(fe80))(?P[a-f\d\.:]+)/\d+$') + if regex_filter.search(address): + if regex_filter.search(address).group('ipaddr') == config['sflow']['agent-address']: + agent_id_presented = True + break + if not agent_id_presented: + raise ConfigError("Your \"sflow agent-address\" does not exist in the system") + + # check NetFlow configuration + if config['netflow']['configured']: + # check if at least one NetFlow collector is configured if NetFlow configuration is presented + if not config['netflow']['servers']: + raise ConfigError("You need to configure at least one NetFlow server") + + # check if configured netflow source-ip exist in the system + if config['netflow']['source-ip']: + source_ip_presented = None + for iface in vyos.interfaces.list_interfaces(): + for address in Interface(iface).get_addr(): + # check an IP + regex_filter = re.compile('^(?!(127)|(::1)|(fe80))(?P[a-f\d\.:]+)/\d+$') + if regex_filter.search(address): + if regex_filter.search(address).group('ipaddr') == config['netflow']['source-ip']: + source_ip_presented = True + break + if not source_ip_presented: + raise ConfigError("Your \"netflow source-ip\" does not exist in the system") + + # check if engine-id compatible with selected protocol version + if config['netflow']['engine-id']: + v5_filter = '^(\d|[1-9]\d|1\d{2}|2[0-4]\d|25[0-5]):(\d|[1-9]\d|1\d{2}|2[0-4]\d|25[0-5])$' + v9v10_filter = '^(\d|[1-9]\d{1,8}|[1-3]\d{9}|4[01]\d{8}|42[0-8]\d{7}|429[0-3]\d{6}|4294[0-8]\d{5}|42949[0-5]\d{4}|429496[0-6]\d{3}|4294967[01]\d{2}|42949672[0-8]\d|429496729[0-5])$' + if config['netflow']['version'] == '5': + regex_filter = re.compile(v5_filter) + if not regex_filter.search(config['netflow']['engine-id']): + raise ConfigError("You cannot use NetFlow engine-id {} together with NetFlow protocol version {}".format(config['netflow']['engine-id'], config['netflow']['version'])) + else: + regex_filter = re.compile(v9v10_filter) + if not regex_filter.search(config['netflow']['engine-id']): + raise ConfigError("You cannot use NetFlow engine-id {} together with NetFlow protocol version {}".format(config['netflow']['engine-id'], config['netflow']['version'])) + + # return True if all checks were passed + return True + +def generate(config): + # skip all checks if flow-accounting was removed + if not config['flow-accounting-configured']: + return True + + # Calculate all necessary values + if config['buffer-size']: + # circular queue size + config['plugin_pipe_size'] = int(config['buffer-size']) * 1024**2 + else: + config['plugin_pipe_size'] = default_plugin_pipe_size * 1024**2 + # transfer buffer size + # recommended value from pmacct developers 1/1000 of pipe size + config['plugin_buffer_size'] = int(config['plugin_pipe_size'] / 1000) + + # Prepare a timeouts string + timeout_string = '' + for timeout_type, timeout_value in config['netflow']['timeout'].items(): + if timeout_value: + if timeout_string == '': + timeout_string = "{}{}={}".format(timeout_string, timeout_type, timeout_value) + else: + timeout_string = "{}:{}={}".format(timeout_string, timeout_type, timeout_value) + config['netflow']['timeout_string'] = timeout_string + + # Generate daemon configs + uacct_conf_template = Template(uacct_conf_jinja) + uacct_conf_file_data = uacct_conf_template.render(templatecfg = config, snaplen = default_captured_packet_size) + + # save generated config to uacctd.conf + with open(uacctd_conf_path, 'w') as file: + file.write(uacct_conf_file_data) + + +def apply(config): + # define variables + command = None + # Check if flow-accounting was removed and define command + if not config['flow-accounting-configured']: + command = '/usr/bin/sudo /bin/systemctl stop uacctd' + else: + command = '/usr/bin/sudo /bin/systemctl restart uacctd' + + # run command to start or stop flow-accounting + return_code = subprocess.call(command.split(' ')) + if not return_code == 0: + raise ConfigError("Failed to start/stop flow-accounting: command {} returned exit code {}".format(command, return_code)) + + # configure iptables rules for defined interfaces + if config['interfaces']: + _iptables_config(config['interfaces']) + else: + _iptables_config([]) + +if __name__ == '__main__': + try: + config = get_config() + verify(config) + generate(config) + apply(config) + except ConfigError as e: + print(e) + sys.exit(1) diff --git a/src/op_mode/flow_accounting_op.py b/src/op_mode/flow_accounting_op.py new file mode 100644 index 000000000..caaf22b31 --- /dev/null +++ b/src/op_mode/flow_accounting_op.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2018 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import sys +import argparse +import re +import ipaddress +import subprocess +from tabulate import tabulate + +# some default values +uacctd_pidfile = '/var/run/uacctd.pid' +uacctd_pipefile = '/tmp/uacctd.pipe' + + +# check if ports argument have correct format +def _is_ports(ports): + # define regex for checking + regex_filter = re.compile('^(\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])$|^(\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])-(\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])$|^((\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5]),)+(\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])$') + if not regex_filter.search(ports): + raise argparse.ArgumentTypeError("Invalid ports: {}".format(ports)) + + # check which type nitation is used: single port, ports list, ports range + # single port + regex_filter = re.compile('^(\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])$') + if regex_filter.search(ports): + filter_ports = { 'type': 'single', 'value': int(ports) } + + # ports list + regex_filter = re.compile('^((\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5]),)+(\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])') + if regex_filter.search(ports): + filter_ports = { 'type': 'list', 'value': list(map(int, ports.split(','))) } + + # ports range + regex_filter = re.compile('^(?P\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])-(?P\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])$') + if regex_filter.search(ports): + # check if second number is greater than the first + if int(regex_filter.search(ports).group('first')) >= int(regex_filter.search(ports).group('second')): + raise argparse.ArgumentTypeError("Invalid ports: {}".format(ports)) + filter_ports = { 'type': 'range', 'value': range(int(regex_filter.search(ports).group('first')), int(regex_filter.search(ports).group('second'))) } + + # if all above failed + if not filter_ports: + raise argparse.ArgumentTypeError("Failed to parse: {}".format(ports)) + else: + return filter_ports + +# check if host argument have correct format +def _is_host(host): + # define regex for checking + if not ipaddress.ip_address(host): + raise argparse.ArgumentTypeError("Invalid host: {}".format(host)) + return host + +# check if flow-accounting running +def _uacctd_running(): + command = "/usr/bin/sudo /bin/systemctl status uacctd > /dev/null" + return_code = subprocess.call(command, shell=True) + if not return_code == 0: + return False + + # return True if all checks were passed + return True + +# get list of interfaces +def _get_ifaces_dict(): + # run command to get ifaces list + command = "/bin/ip link show" + process = subprocess.Popen(command.split(' '), stdout=subprocess.PIPE, universal_newlines=True) + stdout, stderr = process.communicate() + if not process.returncode == 0: + print("Failed to get interfaces list: command \"{}\" returned exit code: {}".format(command, process.returncode())) + sys.exit(1) + + # read output + ifaces_out = stdout.splitlines() + + # make a dictionary with interfaces and indexes + ifaces_dict = {} + regex_filter = re.compile('^(?P\d+):\ (?P[\w\d\.]+)[:@].*$') + for iface_line in ifaces_out: + if regex_filter.search(iface_line): + ifaces_dict[int(regex_filter.search(iface_line).group('iface_index'))] = regex_filter.search(iface_line).group('iface_name') + + # return dictioanry + return ifaces_dict + +# get list of flows +def _get_flows_list(): + # run command to get flows list + command = "/usr/bin/pmacct -s -O json -T flows -p {}".format(uacctd_pipefile) + process = subprocess.Popen(command.split(' '), stdout=subprocess.PIPE, universal_newlines=True) + stdout, stderr = process.communicate() + if not process.returncode == 0: + print("Failed to get flows list: command \"{}\" returned exit code: {}\nError: {}".format(command, process.returncode(), stderr)) + sys.exit(1) + + # read output + flows_out = stdout.splitlines() + + # make a list with flows + flows_list = [] + for flow_line in flows_out: + flows_list.append(eval(flow_line)) + + # return list of flows + return flows_list + +# filter and format flows +def _flows_filter(flows, ifaces): + # predefine filtered flows list + flows_filtered = [] + + # add interface names to flows + for flow in flows: + if flow['iface_in'] in ifaces: + flow['iface_in_name'] = ifaces[flow['iface_in']] + else: + flow['iface_in_name'] = 'unknown' + + # iterate through flows list + for flow in flows: + # filter by interface + if cmd_args.interface: + if flow['iface_in_name'] != cmd_args.interface: + continue + # filter by host + if cmd_args.host: + if flow['ip_src'] != cmd_args.host and flow['ip_dst'] != cmd_args.host: + continue + # filter by ports + if cmd_args.ports: + if cmd_args.ports['type'] == 'single': + if flow['port_src'] != cmd_args.ports['value'] and flow['port_dst'] != cmd_args.ports['value']: + continue + else: + if flow['port_src'] not in cmd_args.ports['value'] and flow['port_dst'] not in cmd_args.ports['value']: + continue + # add filtered flows to new list + flows_filtered.append(flow) + + # stop adding if we already reached top count + if cmd_args.top: + if len(flows_filtered) == cmd_args.top: + break + + # return filtered flows + return flows_filtered + +# print flow table +def _flows_table_print(flows): + #define headers and body + table_headers = [ 'IN_IFACE', 'SRC_MAC', 'DST_MAC', 'SRC_IP', 'DST_IP', 'SRC_PORT', 'DST_PORT', 'PROTOCOL', 'TOS', 'PACKETS', 'FLOWS', 'BYTES' ] + table_body = [] + # convert flows to list + for flow in flows: + table_body.append([flow['iface_in_name'], flow['mac_src'], flow['mac_dst'], flow['ip_src'], flow['ip_dst'], flow['port_src'], flow['port_dst'], flow['ip_proto'], flow['tos'], flow['packets'], flow['flows'], flow['bytes'] ]) + # configure and fill table + table = tabulate(table_body, table_headers, tablefmt="simple") + + # print formatted table + try: + print(table) + except IOError: + sys.exit(0) + except KeyboardInterrupt: + sys.exit(0) + + +# define program arguments +cmd_args_parser = argparse.ArgumentParser(description='show flow-accounting') +cmd_args_parser.add_argument('--action', choices=['show', 'clear', 'restart'], required=True, help='command to flow-accounting daemon') +cmd_args_parser.add_argument('--filter', choices=['interface', 'host', 'ports', 'top'], required=False, nargs='*', help='filter flows to display') +cmd_args_parser.add_argument('--interface', required=False, help='interface name for output filtration') +cmd_args_parser.add_argument('--host', type=_is_host, required=False, help='host address for output filtration') +cmd_args_parser.add_argument('--ports', type=_is_ports, required=False, help='ports number for output filtration') +cmd_args_parser.add_argument('--top', type=int, required=False, help='top records for output filtration') +# parse arguments +cmd_args = cmd_args_parser.parse_args() + + +# main logic +# do nothing if uacctd daemon is not running +if not _uacctd_running(): + print("flow-accounting is not active") + sys.exit(1) + +# restart pmacct daemon +if cmd_args.action == 'restart': + # run command to restart flow-accounting + command = '/usr/bin/sudo /bin/systemctl restart uacctd' + return_code = subprocess.call(command.split(' ')) + if not return_code == 0: + print("Failed to restart flow-accounting: command \"{}\" returned exit code: {}".format(command, return_code)) + sys.exit(1) + +# clear in-memory collected flows +if cmd_args.action == 'clear': + # run command to clear flows + command = "/usr/bin/pmacct -e -p {}".format(uacctd_pipefile) + return_code = subprocess.call(command.split(' ')) + if not return_code == 0: + print("Failed to clear flows: command \"{}\" returned exit code: {}".format(command, return_code)) + sys.exit(1) + +# show table with flows +if cmd_args.action == 'show': + # get interfaces index and names + ifaces_dict = _get_ifaces_dict() + # get flows + flows_list = _get_flows_list() + + # filter and format flows + tabledata = _flows_filter(flows_list, ifaces_dict) + + # print flows + _flows_table_print(tabledata) + +sys.exit(0) -- cgit v1.2.3