diff options
Diffstat (limited to 'python/vyos/util.py')
-rw-r--r-- | python/vyos/util.py | 76 |
1 files changed, 50 insertions, 26 deletions
diff --git a/python/vyos/util.py b/python/vyos/util.py index 2a3f6a228..16fcbf10b 100644 --- a/python/vyos/util.py +++ b/python/vyos/util.py @@ -1,4 +1,4 @@ -# Copyright 2020 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2020-2021 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -22,25 +22,13 @@ import sys # where it is used so it is as local as possible to the execution # - -def _need_sudo(command): - return os.path.basename(command.split()[0]) in ('systemctl', ) - - -def _add_sudo(command): - if _need_sudo(command): - return 'sudo ' + command - return command - - from subprocess import Popen from subprocess import PIPE from subprocess import STDOUT from subprocess import DEVNULL - def popen(command, flag='', shell=None, input=None, timeout=None, env=None, - stdout=PIPE, stderr=PIPE, decode='utf-8', autosudo=True): + stdout=PIPE, stderr=PIPE, decode='utf-8'): """ popen is a wrapper helper aound subprocess.Popen with it default setting it will return a tuple (out, err) @@ -79,9 +67,6 @@ def popen(command, flag='', shell=None, input=None, timeout=None, env=None, if not debug.enabled(flag): flag = 'command' - if autosudo: - command = _add_sudo(command) - cmd_msg = f"cmd '{command}'" debug.message(cmd_msg, flag) @@ -98,11 +83,8 @@ def popen(command, flag='', shell=None, input=None, timeout=None, env=None, stdin = PIPE input = input.encode() if type(input) is str else input - p = Popen( - command, - stdin=stdin, stdout=stdout, stderr=stderr, - env=env, shell=use_shell, - ) + p = Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, + env=env, shell=use_shell) pipe = p.communicate(input, timeout) @@ -135,7 +117,7 @@ def popen(command, flag='', shell=None, input=None, timeout=None, env=None, def run(command, flag='', shell=None, input=None, timeout=None, env=None, - stdout=DEVNULL, stderr=PIPE, decode='utf-8', autosudo=True): + stdout=DEVNULL, stderr=PIPE, decode='utf-8'): """ A wrapper around popen, which discard the stdout and will return the error code of a command @@ -151,8 +133,8 @@ def run(command, flag='', shell=None, input=None, timeout=None, env=None, def cmd(command, flag='', shell=None, input=None, timeout=None, env=None, - stdout=PIPE, stderr=PIPE, decode='utf-8', autosudo=True, - raising=None, message='', expect=[0]): + stdout=PIPE, stderr=PIPE, decode='utf-8', raising=None, message='', + expect=[0]): """ A wrapper around popen, which returns the stdout and will raise the error code of a command @@ -183,7 +165,7 @@ def cmd(command, flag='', shell=None, input=None, timeout=None, env=None, def call(command, flag='', shell=None, input=None, timeout=None, env=None, - stdout=PIPE, stderr=PIPE, decode='utf-8', autosudo=True): + stdout=PIPE, stderr=PIPE, decode='utf-8'): """ A wrapper around popen, which print the stdout and will return the error code of a command @@ -682,6 +664,16 @@ def get_interface_config(interface): tmp = loads(cmd(f'ip -d -j link show {interface}'))[0] return tmp +def get_interface_address(interface): + """ Returns the used encapsulation protocol for given interface. + If interface does not exist, None is returned. + """ + if not os.path.exists(f'/sys/class/net/{interface}'): + return None + from json import loads + tmp = loads(cmd(f'ip -d -j addr show {interface}'))[0] + return tmp + def get_all_vrfs(): """ Return a dictionary of all system wide known VRF instances """ from json import loads @@ -694,3 +686,35 @@ def get_all_vrfs(): name = entry.pop('name') data[name] = entry return data + +def cidr_fit(cidr_a, cidr_b, both_directions = False): + """ + Does CIDR A fit inside of CIDR B? + + Credit: https://gist.github.com/magnetikonline/686fde8ee0bce4d4930ce8738908a009 + """ + def split_cidr(cidr): + part_list = cidr.split("/") + if len(part_list) == 1: + # if just an IP address, assume /32 + part_list.append("32") + + # return address and prefix size + return part_list[0].strip(), int(part_list[1]) + def address_to_bits(address): + # convert each octet of IP address to binary + bit_list = [bin(int(part)) for part in address.split(".")] + + # join binary parts together + # note: part[2:] to slice off the leading "0b" from bin() results + return "".join([part[2:].zfill(8) for part in bit_list]) + def binary_network_prefix(cidr): + # return CIDR as bits, to the length of the prefix size only (drop the rest) + address, prefix_size = split_cidr(cidr) + return address_to_bits(address)[:prefix_size] + + prefix_a = binary_network_prefix(cidr_a) + prefix_b = binary_network_prefix(cidr_b) + if both_directions: + return prefix_a.startswith(prefix_b) or prefix_b.startswith(prefix_a) + return prefix_a.startswith(prefix_b) |