diff options
| author | Daniil Baturin <daniil@vyos.io> | 2020-07-16 00:35:19 +0300 | 
|---|---|---|
| committer | Daniil Baturin <daniil@vyos.io> | 2020-07-16 00:35:19 +0300 | 
| commit | 69a795b1a24d111b9bedc6a9e2c86f95ad2bee21 (patch) | |
| tree | e5bb240d45427c97374352b0830f4911e7f76135 /src | |
| parent | 305c0910f08c8b6772be0247cfc03fa0282478c8 (diff) | |
| download | vyos-1x-69a795b1a24d111b9bedc6a9e2c86f95ad2bee21.tar.gz vyos-1x-69a795b1a24d111b9bedc6a9e2c86f95ad2bee21.zip | |
Improve the readability of the port validation. Also fixes T2708.
Diffstat (limited to 'src')
| -rwxr-xr-x | src/op_mode/flow_accounting_op.py | 82 | 
1 files changed, 43 insertions, 39 deletions
| diff --git a/src/op_mode/flow_accounting_op.py b/src/op_mode/flow_accounting_op.py index 9d0417cd4..6586cbceb 100755 --- a/src/op_mode/flow_accounting_op.py +++ b/src/op_mode/flow_accounting_op.py @@ -29,47 +29,42 @@ from vyos.logger import syslog  uacctd_pidfile = '/var/run/uacctd.pid'  uacctd_pipefile = '/tmp/uacctd.pipe' - -# check if ports argument have correct format -def _is_ports(ports): -    # define regex for checking -    regex_filter = re.compile(r'^(\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])$|^(\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])-(\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])$|^((\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5]),)+(\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])$') -    if not regex_filter.search(ports): -        raise argparse.ArgumentTypeError("Invalid ports: {}".format(ports)) - -    # check which type nitation is used: single port, ports list, ports range -    # single port -    regex_filter = re.compile(r'^(\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])$') -    if regex_filter.search(ports): -        filter_ports = {'type': 'single', 'value': int(ports)} - -    # ports list -    regex_filter = re.compile(r'^((\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5]),)+(\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])') -    if regex_filter.search(ports): -        filter_ports = {'type': 'list', 'value': list(map(int, ports.split(',')))} - -    # ports range -    regex_filter = re.compile(r'^(?P<first>\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])-(?P<second>\d|[1-9]\d{1,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5])$') -    if regex_filter.search(ports): -        # check if second number is greater than the first -        if int(regex_filter.search(ports).group('first')) >= int(regex_filter.search(ports).group('second')): -            raise argparse.ArgumentTypeError("Invalid ports: {}".format(ports)) -        filter_ports = {'type': 'range', 'value': range(int(regex_filter.search(ports).group('first')), int(regex_filter.search(ports).group('second')))} - -    # if all above failed -    if not filter_ports: -        raise argparse.ArgumentTypeError("Failed to parse: {}".format(ports)) +def parse_port(port): +    try: +        port_num = int(port) +        if (port_num >= 0) and (port_num <= 65535): +            return port_num +        else: +            raise ValueError("out of the 0-65535 range".format(port)) +    except ValueError as e: +        raise ValueError("Incorrect port number \'{0}\': {1}".format(port, e)) + +def parse_ports(arg): +    if re.match(r'^\d+$', arg): +        # Single port +        port = parse_port(arg) +        return {"type": "single", "value": port} +    elif re.match(r'^\d+\-\d+$', arg): +        # Port range +        ports = arg.split("-") +        ports = list(map(parse_port, ports)) +        if ports[0] > ports[1]: +            raise ValueError("Malformed port range \'{0}\': lower end is greater than the higher".format(arg)) +        else: +            return {"type": "range", "value": (ports[0], ports[1])} +    elif re.match(r'^\d+,.*\d$', arg): +        # Port list +        ports = re.split(r',+', arg) # This allows duplicate commad like '1,,2,3,4' +        ports = list(map(parse_port, ports)) +        return {"type": "list", "value": ports}      else: -        return filter_ports - +        raise ValueError("Malformed port spec \'{0}\'".format(arg))  # check if host argument have correct format -def _is_host(host): +def check_host(host):      # define regex for checking      if not ipaddress.ip_address(host): -        raise argparse.ArgumentTypeError("Invalid host: {}".format(host)) -    return host - +        raise ValueError("Invalid host \'{}\', must be a valid IP or IPv6 address".format(host))  # check if flow-accounting running  def _uacctd_running(): @@ -205,12 +200,21 @@ cmd_args_parser = argparse.ArgumentParser(description='show flow-accounting')  cmd_args_parser.add_argument('--action', choices=['show', 'clear', 'restart'], required=True, help='command to flow-accounting daemon')  cmd_args_parser.add_argument('--filter', choices=['interface', 'host', 'ports', 'top'], required=False,  nargs='*', help='filter flows to display')  cmd_args_parser.add_argument('--interface', required=False, help='interface name for output filtration') -cmd_args_parser.add_argument('--host', type=_is_host, required=False, help='host address for output filtration') -cmd_args_parser.add_argument('--ports', type=_is_ports, required=False, help='ports number for output filtration') -cmd_args_parser.add_argument('--top', type=int, required=False, help='top records for output filtration') +cmd_args_parser.add_argument('--host', type=str, required=False, help='host address for output filtering') +cmd_args_parser.add_argument('--ports', type=str, required=False, help='port number, range or list for output filtering') +cmd_args_parser.add_argument('--top', type=int, required=False, help='top records for output filtering')  # parse arguments  cmd_args = cmd_args_parser.parse_args() +try: +    if cmd_args.host: +        check_host(cmd_args.host) + +    if cmd_args.ports: +        cmd_args.ports = parse_ports(cmd_args.ports) +except ValueError as e: +    print(e) +    sys.exit(1)  # main logic  # do nothing if uacctd daemon is not running | 
