summaryrefslogtreecommitdiff
path: root/python/vyos/utils
diff options
context:
space:
mode:
Diffstat (limited to 'python/vyos/utils')
-rw-r--r--python/vyos/utils/config.py9
-rw-r--r--python/vyos/utils/convert.py5
-rw-r--r--python/vyos/utils/dict.py59
-rw-r--r--python/vyos/utils/file.py41
-rw-r--r--python/vyos/utils/io.py34
-rw-r--r--python/vyos/utils/network.py106
-rw-r--r--python/vyos/utils/process.py27
7 files changed, 256 insertions, 25 deletions
diff --git a/python/vyos/utils/config.py b/python/vyos/utils/config.py
index bd363ce46..33047010b 100644
--- a/python/vyos/utils/config.py
+++ b/python/vyos/utils/config.py
@@ -1,4 +1,4 @@
-# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -31,4 +31,9 @@ def read_saved_value(path: list):
if not ct.exists(path):
return ''
res = ct.return_values(path)
- return res[0] if len(res) == 1 else res
+ if len(res) == 1:
+ return res[0]
+ res = ct.list_nodes(path)
+ if len(res) == 1:
+ return ' '.join(res)
+ return res
diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py
index 9a8a1ff7d..c02f0071e 100644
--- a/python/vyos/utils/convert.py
+++ b/python/vyos/utils/convert.py
@@ -52,7 +52,8 @@ def seconds_to_human(s, separator=""):
return result
-def bytes_to_human(bytes, initial_exponent=0, precision=2):
+def bytes_to_human(bytes, initial_exponent=0, precision=2,
+ int_below_exponent=0):
""" Converts a value in bytes to a human-readable size string like 640 KB
The initial_exponent parameter is the exponent of 2,
@@ -68,6 +69,8 @@ def bytes_to_human(bytes, initial_exponent=0, precision=2):
# log2 is a float, while range checking requires an int
exponent = int(log2(bytes))
+ if exponent < int_below_exponent:
+ precision = 0
if exponent < 10:
value = bytes
diff --git a/python/vyos/utils/dict.py b/python/vyos/utils/dict.py
index 9484eacdd..d36b6fcfb 100644
--- a/python/vyos/utils/dict.py
+++ b/python/vyos/utils/dict.py
@@ -199,6 +199,31 @@ def dict_search_recursive(dict_object, key, path=[]):
for x in dict_search_recursive(j, key, new_path):
yield x
+
+def dict_set(key_path, value, dict_object):
+ """ Set value to Python dictionary (dict_object) using path to key delimited by dot (.).
+ The key will be added if it does not exist.
+ """
+ path_list = key_path.split(".")
+ dynamic_dict = dict_object
+ if len(path_list) > 0:
+ for i in range(0, len(path_list)-1):
+ dynamic_dict = dynamic_dict[path_list[i]]
+ dynamic_dict[path_list[len(path_list)-1]] = value
+
+def dict_delete(key_path, dict_object):
+ """ Delete key in Python dictionary (dict_object) using path to key delimited by dot (.).
+ """
+ path_dict = dict_object
+ path_list = key_path.split('.')
+ inside = path_list[:-1]
+ if not inside:
+ del dict_object[path_list]
+ else:
+ for key in path_list[:-1]:
+ path_dict = path_dict[key]
+ del path_dict[path_list[len(path_list)-1]]
+
def dict_to_list(d, save_key_to=None):
""" Convert a dict to a list of dicts.
@@ -228,6 +253,39 @@ def dict_to_list(d, save_key_to=None):
return collect
+def dict_to_paths_values(conf: dict) -> dict:
+ """
+ Convert nested dictionary to simple dictionary, where key is a path is delimited by dot (.).
+ """
+ list_of_paths = []
+ dict_of_options ={}
+ for path in dict_to_key_paths(conf):
+ str_path = '.'.join(path)
+ list_of_paths.append(str_path)
+
+ for path in list_of_paths:
+ dict_of_options[path] = dict_search(path,conf)
+
+ return dict_of_options
+def dict_to_key_paths(d: dict) -> list:
+ """ Generator to return list of key paths from dict of list[str]|str
+ """
+ def func(d, path):
+ if isinstance(d, dict):
+ if not d:
+ yield path
+ for k, v in d.items():
+ for r in func(v, path + [k]):
+ yield r
+ elif isinstance(d, list):
+ yield path
+ elif isinstance(d, str):
+ yield path
+ else:
+ raise ValueError('object is not a dict of strings/list of strings')
+ for r in func(d, []):
+ yield r
+
def dict_to_paths(d: dict) -> list:
""" Generator to return list of paths from dict of list[str]|str
"""
@@ -305,3 +363,4 @@ class FixedDict(dict):
if k not in self._allowed:
raise ConfigError(f'Option "{k}" has no defined default')
super().__setitem__(k, v)
+
diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py
index 667a2464b..c566f0334 100644
--- a/python/vyos/utils/file.py
+++ b/python/vyos/utils/file.py
@@ -83,21 +83,34 @@ def read_json(fname, defaultonfailure=None):
return defaultonfailure
raise e
-def chown(path, user, group):
+def chown(path, user=None, group=None, recursive=False):
""" change file/directory owner """
from pwd import getpwnam
from grp import getgrnam
- if user is None or group is None:
+ if user is None and group is None:
return False
# path may also be an open file descriptor
if not isinstance(path, int) and not os.path.exists(path):
return False
- uid = getpwnam(user).pw_uid
- gid = getgrnam(group).gr_gid
- os.chown(path, uid, gid)
+ # keep current value if not specified otherwise
+ uid = -1
+ gid = -1
+
+ if user:
+ uid = getpwnam(user).pw_uid
+ if group:
+ gid = getgrnam(group).gr_gid
+
+ if recursive:
+ for dirpath, dirnames, filenames in os.walk(path):
+ os.chown(dirpath, uid, gid)
+ for filename in filenames:
+ os.chown(os.path.join(dirpath, filename), uid, gid)
+ else:
+ os.chown(path, uid, gid)
return True
@@ -134,6 +147,24 @@ def chmod_755(path):
S_IROTH | S_IXOTH
chmod(path, bitmask)
+def chmod_2775(path):
+ """ user/group permissions with set-group-id bit set """
+ from stat import S_ISGID, S_IRWXU, S_IRWXG, S_IROTH, S_IXOTH
+
+ bitmask = S_ISGID | S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH
+ chmod(path, bitmask)
+
+def chmod_775(path):
+ """ Make file executable by all """
+ from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IWGRP, S_IXGRP, S_IROTH, S_IXOTH
+
+ bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP | \
+ S_IROTH | S_IXOTH
+ chmod(path, bitmask)
+
+def file_permissions(path):
+ """ Return file permissions in string format, e.g '0755' """
+ return oct(os.stat(path).st_mode)[4:]
def makedir(path, user=None, group=None):
if os.path.exists(path):
diff --git a/python/vyos/utils/io.py b/python/vyos/utils/io.py
index 8790cbaac..0afaf695c 100644
--- a/python/vyos/utils/io.py
+++ b/python/vyos/utils/io.py
@@ -13,6 +13,8 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+from typing import Callable
+
def print_error(str='', end='\n'):
"""
Print `str` to stderr, terminated with `end`.
@@ -24,13 +26,18 @@ def print_error(str='', end='\n'):
sys.stderr.write(end)
sys.stderr.flush()
-def ask_input(question, default='', numeric_only=False, valid_responses=[]):
+def ask_input(question, default='', numeric_only=False, valid_responses=[],
+ no_echo=False):
+ from getpass import getpass
question_out = question
if default:
question_out += f' (Default: {default})'
response = ''
while True:
- response = input(question_out + ' ').strip()
+ if not no_echo:
+ response = input(question_out + ' ').strip()
+ else:
+ response = getpass(question_out + ' ').strip()
if not response and default:
return default
if numeric_only:
@@ -72,3 +79,26 @@ def is_dumb_terminal():
"""Check if the current TTY is dumb, so that we can disable advanced terminal features."""
import os
return os.getenv('TERM') in ['vt100', 'dumb']
+
+def select_entry(l: list, list_msg: str = '', prompt_msg: str = '',
+ list_format: Callable = None,) -> str:
+ """Select an entry from a list
+
+ Args:
+ l (list): a list of entries
+ list_msg (str): a message to print before listing the entries
+ prompt_msg (str): a message to print as prompt for selection
+
+ Returns:
+ str: a selected entry
+ """
+ en = list(enumerate(l, 1))
+ print(list_msg)
+ for i, e in en:
+ if list_format:
+ print(f'\t{i}: {list_format(e)}')
+ else:
+ print(f'\t{i}: {e}')
+ select = ask_input(prompt_msg, numeric_only=True,
+ valid_responses=range(1, len(l)+1))
+ return next(filter(lambda x: x[0] == select, en))[1]
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
index 9354bd495..cac59475d 100644
--- a/python/vyos/utils/network.py
+++ b/python/vyos/utils/network.py
@@ -61,14 +61,17 @@ def get_vrf_members(vrf: str) -> list:
"""
import json
from vyos.utils.process import cmd
- if not interface_exists(vrf):
- raise ValueError(f'VRF "{vrf}" does not exist!')
- output = cmd(f'ip --json --brief link show master {vrf}')
- answer = json.loads(output)
interfaces = []
- for data in answer:
- if 'ifname' in data:
- interfaces.append(data.get('ifname'))
+ try:
+ if not interface_exists(vrf):
+ raise ValueError(f'VRF "{vrf}" does not exist!')
+ output = cmd(f'ip --json --brief link show vrf {vrf}')
+ answer = json.loads(output)
+ for data in answer:
+ if 'ifname' in data:
+ interfaces.append(data.get('ifname'))
+ except:
+ pass
return interfaces
def get_interface_vrf(interface):
@@ -156,7 +159,9 @@ def is_wwan_connected(interface):
""" Determine if a given WWAN interface, e.g. wwan0 is connected to the
carrier network or not """
import json
+ from vyos.utils.dict import dict_search
from vyos.utils.process import cmd
+ from vyos.utils.process import is_systemd_service_active
if not interface.startswith('wwan'):
raise ValueError(f'Specified interface "{interface}" is not a WWAN interface')
@@ -197,6 +202,22 @@ def get_all_vrfs():
data[name] = entry
return data
+def interface_list() -> list:
+ from vyos.ifconfig import Section
+ """
+ Get list of interfaces in system
+ :rtype: list
+ """
+ return Section.interfaces()
+
+
+def vrf_list() -> list:
+ """
+ Get list of VRFs in system
+ :rtype: list
+ """
+ return list(get_all_vrfs().keys())
+
def mac2eui64(mac, prefix=None):
"""
Convert a MAC address to a EUI64 address or, with prefix provided, a full
@@ -289,7 +310,7 @@ def is_ipv6_link_local(addr):
return False
-def is_addr_assigned(ip_address, vrf=None) -> bool:
+def is_addr_assigned(ip_address, vrf=None, return_ifname=False) -> bool | str:
""" Verify if the given IPv4/IPv6 address is assigned to any interface """
from netifaces import interfaces
from vyos.utils.network import get_interface_config
@@ -304,7 +325,7 @@ def is_addr_assigned(ip_address, vrf=None) -> bool:
continue
if is_intf_addr_assigned(interface, ip_address):
- return True
+ return interface if return_ifname else True
return False
@@ -468,3 +489,70 @@ def get_vxlan_vlan_tunnels(interface: str) -> list:
os_configured_vlan_ids.append(str(vlanStart))
return os_configured_vlan_ids
+
+def get_vxlan_vni_filter(interface: str) -> list:
+ """ Return a list of strings with VNIs configured in the Kernel"""
+ from json import loads
+ from vyos.utils.process import cmd
+
+ if not interface.startswith('vxlan'):
+ raise ValueError('Only applicable for VXLAN interfaces!')
+
+ # Determine current OS Kernel configured VNI filters in VXLAN interface
+ #
+ # $ bridge -j vni show dev vxlan1
+ # [{"ifname":"vxlan1","vnis":[{"vni":100},{"vni":200},{"vni":300,"vniEnd":399}]}]
+ #
+ # Example output: ['10010', '10020', '10021', '10022']
+ os_configured_vnis = []
+ tmp = loads(cmd(f'bridge --json vni show dev {interface}'))
+ if tmp:
+ for tunnel in tmp[0].get('vnis', {}):
+ vniStart = tunnel['vni']
+ if 'vniEnd' in tunnel:
+ vniEnd = tunnel['vniEnd']
+ # Build a real list for user VNIs
+ vni_list = list(range(vniStart, vniEnd +1))
+ # Convert list of integers to list or strings
+ os_configured_vnis.extend(map(str, vni_list))
+ # Proceed with next tunnel - this one is complete
+ continue
+
+ # Add single tunel id - not part of a range
+ os_configured_vnis.append(str(vniStart))
+
+ return os_configured_vnis
+
+# Calculate prefix length of an IPv6 range, where possible
+# Python-ified from source: https://gitlab.isc.org/isc-projects/dhcp/-/blob/master/keama/confparse.c#L4591
+def ipv6_prefix_length(low, high):
+ import socket
+
+ bytemasks = [0x80, 0xc0, 0xe0, 0xf0, 0xf8, 0xfc, 0xfe, 0xff]
+
+ try:
+ lo = bytearray(socket.inet_pton(socket.AF_INET6, low))
+ hi = bytearray(socket.inet_pton(socket.AF_INET6, high))
+ except:
+ return None
+
+ xor = bytearray(a ^ b for a, b in zip(lo, hi))
+
+ plen = 0
+ while plen < 128 and xor[plen // 8] == 0:
+ plen += 8
+
+ if plen == 128:
+ return plen
+
+ for i in range((plen // 8) + 1, 16):
+ if xor[i] != 0:
+ return None
+
+ for i in range(8):
+ msk = ~xor[plen // 8] & 0xff
+
+ if msk == bytemasks[i]:
+ return plen + i + 1
+
+ return None
diff --git a/python/vyos/utils/process.py b/python/vyos/utils/process.py
index e09c7d86d..bd0644bc0 100644
--- a/python/vyos/utils/process.py
+++ b/python/vyos/utils/process.py
@@ -204,17 +204,32 @@ def process_running(pid_file):
pid = f.read().strip()
return pid_exists(int(pid))
-def process_named_running(name, cmdline: str=None):
+def process_named_running(name: str, cmdline: str=None, timeout: int=0):
""" Checks if process with given name is running and returns its PID.
If Process is not running, return None
"""
from psutil import process_iter
- for p in process_iter(['name', 'pid', 'cmdline']):
- if cmdline:
- if p.info['name'] == name and cmdline in p.info['cmdline']:
+ def check_process(name, cmdline):
+ for p in process_iter(['name', 'pid', 'cmdline']):
+ if cmdline:
+ if name in p.info['name'] and cmdline in p.info['cmdline']:
+ return p.info['pid']
+ elif name in p.info['name']:
return p.info['pid']
- elif p.info['name'] == name:
- return p.info['pid']
+ return None
+ if timeout:
+ import time
+ time_expire = time.time() + timeout
+ while True:
+ tmp = check_process(name, cmdline)
+ if not tmp:
+ if time.time() > time_expire:
+ break
+ time.sleep(0.100) # wait 250ms
+ continue
+ return tmp
+ else:
+ return check_process(name, cmdline)
return None
def is_systemd_service_active(service):