summaryrefslogtreecommitdiff
path: root/python/vyos/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/vyos/util.py')
-rw-r--r--python/vyos/util.py76
1 files changed, 50 insertions, 26 deletions
diff --git a/python/vyos/util.py b/python/vyos/util.py
index 2a3f6a228..16fcbf10b 100644
--- a/python/vyos/util.py
+++ b/python/vyos/util.py
@@ -1,4 +1,4 @@
-# Copyright 2020 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2020-2021 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -22,25 +22,13 @@ import sys
# where it is used so it is as local as possible to the execution
#
-
-def _need_sudo(command):
- return os.path.basename(command.split()[0]) in ('systemctl', )
-
-
-def _add_sudo(command):
- if _need_sudo(command):
- return 'sudo ' + command
- return command
-
-
from subprocess import Popen
from subprocess import PIPE
from subprocess import STDOUT
from subprocess import DEVNULL
-
def popen(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=PIPE, stderr=PIPE, decode='utf-8', autosudo=True):
+ stdout=PIPE, stderr=PIPE, decode='utf-8'):
"""
popen is a wrapper helper aound subprocess.Popen
with it default setting it will return a tuple (out, err)
@@ -79,9 +67,6 @@ def popen(command, flag='', shell=None, input=None, timeout=None, env=None,
if not debug.enabled(flag):
flag = 'command'
- if autosudo:
- command = _add_sudo(command)
-
cmd_msg = f"cmd '{command}'"
debug.message(cmd_msg, flag)
@@ -98,11 +83,8 @@ def popen(command, flag='', shell=None, input=None, timeout=None, env=None,
stdin = PIPE
input = input.encode() if type(input) is str else input
- p = Popen(
- command,
- stdin=stdin, stdout=stdout, stderr=stderr,
- env=env, shell=use_shell,
- )
+ p = Popen(command, stdin=stdin, stdout=stdout, stderr=stderr,
+ env=env, shell=use_shell)
pipe = p.communicate(input, timeout)
@@ -135,7 +117,7 @@ def popen(command, flag='', shell=None, input=None, timeout=None, env=None,
def run(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=DEVNULL, stderr=PIPE, decode='utf-8', autosudo=True):
+ stdout=DEVNULL, stderr=PIPE, decode='utf-8'):
"""
A wrapper around popen, which discard the stdout and
will return the error code of a command
@@ -151,8 +133,8 @@ def run(command, flag='', shell=None, input=None, timeout=None, env=None,
def cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=PIPE, stderr=PIPE, decode='utf-8', autosudo=True,
- raising=None, message='', expect=[0]):
+ stdout=PIPE, stderr=PIPE, decode='utf-8', raising=None, message='',
+ expect=[0]):
"""
A wrapper around popen, which returns the stdout and
will raise the error code of a command
@@ -183,7 +165,7 @@ def cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
def call(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=PIPE, stderr=PIPE, decode='utf-8', autosudo=True):
+ stdout=PIPE, stderr=PIPE, decode='utf-8'):
"""
A wrapper around popen, which print the stdout and
will return the error code of a command
@@ -682,6 +664,16 @@ def get_interface_config(interface):
tmp = loads(cmd(f'ip -d -j link show {interface}'))[0]
return tmp
+def get_interface_address(interface):
+ """ Returns the used encapsulation protocol for given interface.
+ If interface does not exist, None is returned.
+ """
+ if not os.path.exists(f'/sys/class/net/{interface}'):
+ return None
+ from json import loads
+ tmp = loads(cmd(f'ip -d -j addr show {interface}'))[0]
+ return tmp
+
def get_all_vrfs():
""" Return a dictionary of all system wide known VRF instances """
from json import loads
@@ -694,3 +686,35 @@ def get_all_vrfs():
name = entry.pop('name')
data[name] = entry
return data
+
+def cidr_fit(cidr_a, cidr_b, both_directions = False):
+ """
+ Does CIDR A fit inside of CIDR B?
+
+ Credit: https://gist.github.com/magnetikonline/686fde8ee0bce4d4930ce8738908a009
+ """
+ def split_cidr(cidr):
+ part_list = cidr.split("/")
+ if len(part_list) == 1:
+ # if just an IP address, assume /32
+ part_list.append("32")
+
+ # return address and prefix size
+ return part_list[0].strip(), int(part_list[1])
+ def address_to_bits(address):
+ # convert each octet of IP address to binary
+ bit_list = [bin(int(part)) for part in address.split(".")]
+
+ # join binary parts together
+ # note: part[2:] to slice off the leading "0b" from bin() results
+ return "".join([part[2:].zfill(8) for part in bit_list])
+ def binary_network_prefix(cidr):
+ # return CIDR as bits, to the length of the prefix size only (drop the rest)
+ address, prefix_size = split_cidr(cidr)
+ return address_to_bits(address)[:prefix_size]
+
+ prefix_a = binary_network_prefix(cidr_a)
+ prefix_b = binary_network_prefix(cidr_b)
+ if both_directions:
+ return prefix_a.startswith(prefix_b) or prefix_b.startswith(prefix_a)
+ return prefix_a.startswith(prefix_b)