summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Breunig <christian@breunig.cc>2023-07-15 20:12:56 +0200
committerChristian Breunig <christian@breunig.cc>2023-07-15 20:13:12 +0200
commit5f77ccf91eb402c548fc91b2e080a4b2b86f4181 (patch)
tree9b926dedc07ef547ad8bbe539f89990249552414
parent9285b9a571ee944daf6f17847a62f115146834a4 (diff)
downloadvyos-1x-5f77ccf91eb402c548fc91b2e080a4b2b86f4181.tar.gz
vyos-1x-5f77ccf91eb402c548fc91b2e080a4b2b86f4181.zip
T5195: vyos.util -> vyos.utils package refactoring part #2
-rw-r--r--python/vyos/configdep.py2
-rw-r--r--python/vyos/configdict.py2
-rw-r--r--python/vyos/ifconfig/vrrp.py19
-rw-r--r--python/vyos/remote.py5
-rw-r--r--python/vyos/template.py6
-rw-r--r--python/vyos/util.py347
-rw-r--r--python/vyos/utils/__init__.py3
-rw-r--r--python/vyos/utils/convert.py30
-rw-r--r--python/vyos/utils/kernel.py25
-rw-r--r--python/vyos/utils/list.py20
-rw-r--r--python/vyos/utils/misc.py66
-rw-r--r--python/vyos/utils/network.py24
-rw-r--r--python/vyos/utils/permission.py15
-rw-r--r--python/vyos/utils/system.py33
-rw-r--r--python/vyos/validate.py1
-rw-r--r--smoketest/scripts/cli/base_accel_ppp_test.py2
-rwxr-xr-xsmoketest/scripts/cli/test_interfaces_wireless.py2
-rwxr-xr-xsrc/conf_mode/https.py2
-rwxr-xr-xsrc/conf_mode/interfaces-l2tpv3.py2
-rwxr-xr-xsrc/conf_mode/interfaces-openvpn.py2
-rwxr-xr-xsrc/conf_mode/interfaces-wireguard.py2
-rwxr-xr-xsrc/conf_mode/load-balancing-haproxy.py2
-rwxr-xr-xsrc/conf_mode/nat.py2
-rwxr-xr-xsrc/conf_mode/nat66.py2
-rwxr-xr-xsrc/conf_mode/system_lcd.py2
-rwxr-xr-xsrc/conf_mode/vpn_l2tp.py4
-rwxr-xr-xsrc/conf_mode/vpn_openconnect.py2
-rwxr-xr-xsrc/conf_mode/vpn_pptp.py2
-rwxr-xr-xsrc/conf_mode/vpn_sstp.py2
-rwxr-xr-xsrc/helpers/vyos-interface-rescan.py2
-rwxr-xr-xsrc/helpers/vyos-sudo.py2
-rwxr-xr-xsrc/op_mode/igmp-proxy.py9
-rwxr-xr-xsrc/op_mode/ipsec.py4
-rwxr-xr-xsrc/op_mode/memory.py2
-rwxr-xr-xsrc/op_mode/openvpn.py2
-rwxr-xr-xsrc/op_mode/pki.py8
-rwxr-xr-xsrc/op_mode/storage.py2
-rwxr-xr-xsrc/op_mode/uptime.py2
-rwxr-xr-xsrc/services/api/graphql/generate/schema_from_op_mode.py2
-rw-r--r--src/services/api/graphql/libs/op_mode.py2
-rwxr-xr-xsrc/tests/test_find_device_file.py2
-rw-r--r--src/tests/test_util.py2
42 files changed, 265 insertions, 404 deletions
diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py
index d4b2cc78f..7a8559839 100644
--- a/python/vyos/configdep.py
+++ b/python/vyos/configdep.py
@@ -18,7 +18,7 @@ import json
import typing
from inspect import stack
-from vyos.util import load_as_module
+from vyos.utils.system import load_as_module
from vyos.defaults import directories
from vyos.configsource import VyOSError
from vyos import ConfigError
diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py
index 2411a04c4..f642d38f2 100644
--- a/python/vyos/configdict.py
+++ b/python/vyos/configdict.py
@@ -590,7 +590,7 @@ def get_accel_dict(config, base, chap_secrets):
Return a dictionary with the necessary interface config keys.
"""
- from vyos.util import get_half_cpus
+ from vyos.utils.system import get_half_cpus
from vyos.template import is_ipv4
dict = config.get_config_dict(base, key_mangling=('-', '_'),
diff --git a/python/vyos/ifconfig/vrrp.py b/python/vyos/ifconfig/vrrp.py
index 47aaadecd..fde903a53 100644
--- a/python/vyos/ifconfig/vrrp.py
+++ b/python/vyos/ifconfig/vrrp.py
@@ -1,4 +1,4 @@
-# Copyright 2019 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -21,8 +21,11 @@ from time import time
from time import sleep
from tabulate import tabulate
-from vyos import util
from vyos.configquery import ConfigTreeQuery
+from vyos.utils.convert import seconds_to_human
+from vyos.utils.file import read_file
+from vyos.utils.file import wait_for_file_write_complete
+from vyos.utils.process import process_running
class VRRPError(Exception):
pass
@@ -84,21 +87,21 @@ class VRRP(object):
def is_running(cls):
if not os.path.exists(cls.location['pid']):
return False
- return util.process_running(cls.location['pid'])
+ return process_running(cls.location['pid'])
@classmethod
def collect(cls, what):
fname = cls.location[what]
try:
# send signal to generate the configuration file
- pid = util.read_file(cls.location['pid'])
- util.wait_for_file_write_complete(fname,
+ pid = read_file(cls.location['pid'])
+ wait_for_file_write_complete(fname,
pre_hook=(lambda: os.kill(int(pid), cls._signal[what])),
timeout=30)
- return util.read_file(fname)
+ return read_file(fname)
except OSError:
- # raised by vyos.util.read_file
+ # raised by vyos.utils.file.read_file
raise VRRPNoData("VRRP data is not available (wait time exceeded)")
except FileNotFoundError:
raise VRRPNoData("VRRP data is not available (process not running or no active groups)")
@@ -145,7 +148,7 @@ class VRRP(object):
priority = data['effective_priority']
since = int(time() - float(data['last_transition']))
- last = util.seconds_to_human(since)
+ last = seconds_to_human(since)
groups.append([name, intf, vrid, state, priority, last])
diff --git a/python/vyos/remote.py b/python/vyos/remote.py
index 15939a523..ef8c23da4 100644
--- a/python/vyos/remote.py
+++ b/python/vyos/remote.py
@@ -33,14 +33,13 @@ from requests.adapters import HTTPAdapter
from requests.packages.urllib3 import PoolManager
from vyos.utils.io import ask_yes_no
-from vyos.util import begin
-from vyos.utils.process import cmd
from vyos.utils.io import make_incremental_progressbar
from vyos.utils.io import make_progressbar
from vyos.utils.io import print_error
+from vyos.utils.misc import begin
+from vyos.utils.process import cm
from vyos.version import get_version
-
CHUNK_SIZE = 8192
class InteractivePolicy(MissingHostKeyPolicy):
diff --git a/python/vyos/template.py b/python/vyos/template.py
index 3cf60cea9..7d1c3970f 100644
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -162,19 +162,19 @@ def force_to_list(value):
@register_filter('seconds_to_human')
def seconds_to_human(seconds, separator=""):
""" Convert seconds to human-readable values like 1d6h15m23s """
- from vyos.util import seconds_to_human
+ from vyos.utils.convert import seconds_to_human
return seconds_to_human(seconds, separator=separator)
@register_filter('bytes_to_human')
def bytes_to_human(bytes, initial_exponent=0, precision=2):
""" Convert bytes to human-readable values like 1.44M """
- from vyos.util import bytes_to_human
+ from vyos.utils.convert import bytes_to_human
return bytes_to_human(bytes, initial_exponent=initial_exponent, precision=precision)
@register_filter('human_to_bytes')
def human_to_bytes(value):
""" Convert a data amount with a unit suffix to bytes, like 2K to 2048 """
- from vyos.util import human_to_bytes
+ from vyos.utils.convert import human_to_bytes
return human_to_bytes(value)
@register_filter('ip_from_cidr')
diff --git a/python/vyos/util.py b/python/vyos/util.py
index 0d7985d54..63539e897 100644
--- a/python/vyos/util.py
+++ b/python/vyos/util.py
@@ -13,16 +13,6 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-import os
-import re
-import sys
-
-#
-# NOTE: Do not import full classes here, move your import to the function
-# where it is used so it is as local as possible to the execution
-#
-
-
def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False, mod=0):
""" Mangles dict keys according to a regex and replacement character.
Some libraries like Jinja2 do not like certain characters in dict keys.
@@ -34,6 +24,7 @@ def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_m
Returns: dict
"""
+ import re
from vyos.xml import is_tag
new_dict = {}
@@ -68,339 +59,3 @@ def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_m
def mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False):
return _mangle_dict_keys(data, regex, replacement, abs_path=abs_path, no_tag_node_value_mangle=no_tag_node_value_mangle, mod=0)
-
-def is_list_equal(first: list, second: list) -> bool:
- """ Check if 2 lists are equal and list not empty """
- if len(first) != len(second) or len(first) == 0:
- return False
- return sorted(first) == sorted(second)
-
-def is_listen_port_bind_service(port: int, service: str) -> bool:
- """Check if listen port bound to expected program name
- :param port: Bind port
- :param service: Program name
- :return: bool
-
- Example:
- % is_listen_port_bind_service(443, 'nginx')
- True
- % is_listen_port_bind_service(443, 'ocserv-main')
- False
- """
- from psutil import net_connections as connections
- from psutil import Process as process
- for connection in connections():
- addr = connection.laddr
- pid = connection.pid
- pid_name = process(pid).name()
- pid_port = addr.port
- if service == pid_name and port == pid_port:
- return True
- return False
-
-def seconds_to_human(s, separator=""):
- """ Converts number of seconds passed to a human-readable
- interval such as 1w4d18h35m59s
- """
- s = int(s)
-
- week = 60 * 60 * 24 * 7
- day = 60 * 60 * 24
- hour = 60 * 60
-
- remainder = 0
- result = ""
-
- weeks = s // week
- if weeks > 0:
- result = "{0}w".format(weeks)
- s = s % week
-
- days = s // day
- if days > 0:
- result = "{0}{1}{2}d".format(result, separator, days)
- s = s % day
-
- hours = s // hour
- if hours > 0:
- result = "{0}{1}{2}h".format(result, separator, hours)
- s = s % hour
-
- minutes = s // 60
- if minutes > 0:
- result = "{0}{1}{2}m".format(result, separator, minutes)
- s = s % 60
-
- seconds = s
- if seconds > 0:
- result = "{0}{1}{2}s".format(result, separator, seconds)
-
- return result
-
-def bytes_to_human(bytes, initial_exponent=0, precision=2):
- """ Converts a value in bytes to a human-readable size string like 640 KB
-
- The initial_exponent parameter is the exponent of 2,
- e.g. 10 (1024) for kilobytes, 20 (1024 * 1024) for megabytes.
- """
-
- if bytes == 0:
- return "0 B"
-
- from math import log2
-
- bytes = bytes * (2**initial_exponent)
-
- # log2 is a float, while range checking requires an int
- exponent = int(log2(bytes))
-
- if exponent < 10:
- value = bytes
- suffix = "B"
- elif exponent in range(10, 20):
- value = bytes / 1024
- suffix = "KB"
- elif exponent in range(20, 30):
- value = bytes / 1024**2
- suffix = "MB"
- elif exponent in range(30, 40):
- value = bytes / 1024**3
- suffix = "GB"
- else:
- value = bytes / 1024**4
- suffix = "TB"
- # Add a new case when the first machine with petabyte RAM
- # hits the market.
-
- size_string = "{0:.{1}f} {2}".format(value, precision, suffix)
- return size_string
-
-def human_to_bytes(value):
- """ Converts a data amount with a unit suffix to bytes, like 2K to 2048 """
-
- from re import match as re_match
-
- res = re_match(r'^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*$', value)
-
- if not res:
- raise ValueError(f"'{value}' is not a valid data amount")
- else:
- amount = float(res.group(1))
- unit = res.group(2).lower()
-
- if unit == 'b':
- res = amount
- elif (unit == 'k') or (unit == 'kb'):
- res = amount * 1024
- elif (unit == 'm') or (unit == 'mb'):
- res = amount * 1024**2
- elif (unit == 'g') or (unit == 'gb'):
- res = amount * 1024**3
- elif (unit == 't') or (unit == 'tb'):
- res = amount * 1024**4
- else:
- raise ValueError(f"Unsupported data unit '{unit}'")
-
- # There cannot be fractional bytes, so we convert them to integer.
- # However, truncating causes problems with conversion back to human unit,
- # so we round instead -- that seems to work well enough.
- return round(res)
-
-def get_cfg_group_id():
- from grp import getgrnam
- from vyos.defaults import cfg_group
-
- group_data = getgrnam(cfg_group)
- return group_data.gr_gid
-
-def wait_for_inotify(file_path, pre_hook=None, event_type=None, timeout=None, sleep_interval=0.1):
- """ Waits for an inotify event to occur """
- if not os.path.dirname(file_path):
- raise ValueError(
- "File path {} does not have a directory part (required for inotify watching)".format(file_path))
- if not os.path.basename(file_path):
- raise ValueError(
- "File path {} does not have a file part, do not know what to watch for".format(file_path))
-
- from inotify.adapters import Inotify
- from time import time
- from time import sleep
-
- time_start = time()
-
- i = Inotify()
- i.add_watch(os.path.dirname(file_path))
-
- if pre_hook:
- pre_hook()
-
- for event in i.event_gen(yield_nones=True):
- if (timeout is not None) and ((time() - time_start) > timeout):
- # If the function didn't return until this point,
- # the file failed to have been written to and closed within the timeout
- raise OSError("Waiting for file {} to be written has failed".format(file_path))
-
- # Most such events don't take much time, so it's better to check right away
- # and sleep later.
- if event is not None:
- (_, type_names, path, filename) = event
- if filename == os.path.basename(file_path):
- if event_type in type_names:
- return
- sleep(sleep_interval)
-
-def wait_for_file_write_complete(file_path, pre_hook=None, timeout=None, sleep_interval=0.1):
- """ Waits for a process to close a file after opening it in write mode. """
- wait_for_inotify(file_path,
- event_type='IN_CLOSE_WRITE', pre_hook=pre_hook, timeout=timeout, sleep_interval=sleep_interval)
-
-def is_admin() -> bool:
- """Look if current user is in sudo group"""
- from getpass import getuser
- from grp import getgrnam
- current_user = getuser()
- (_, _, _, admin_group_members) = getgrnam('sudo')
- return current_user in admin_group_members
-
-def mac2eui64(mac, prefix=None):
- """
- Convert a MAC address to a EUI64 address or, with prefix provided, a full
- IPv6 address.
- Thankfully copied from https://gist.github.com/wido/f5e32576bb57b5cc6f934e177a37a0d3
- """
- import re
- from ipaddress import ip_network
- # http://tools.ietf.org/html/rfc4291#section-2.5.1
- eui64 = re.sub(r'[.:-]', '', mac).lower()
- eui64 = eui64[0:6] + 'fffe' + eui64[6:]
- eui64 = hex(int(eui64[0:2], 16) ^ 2)[2:].zfill(2) + eui64[2:]
-
- if prefix is None:
- return ':'.join(re.findall(r'.{4}', eui64))
- else:
- try:
- net = ip_network(prefix, strict=False)
- euil = int('0x{0}'.format(eui64), 16)
- return str(net[euil])
- except: # pylint: disable=bare-except
- return
-
-def get_half_cpus():
- """ return 1/2 of the numbers of available CPUs """
- cpu = os.cpu_count()
- if cpu > 1:
- cpu /= 2
- return int(cpu)
-
-def check_kmod(k_mod):
- """ Common utility function to load required kernel modules on demand """
- from vyos import ConfigError
- from vyos.utils.process import call
- if isinstance(k_mod, str):
- k_mod = k_mod.split()
- for module in k_mod:
- if not os.path.exists(f'/sys/module/{module}'):
- if call(f'modprobe {module}') != 0:
- raise ConfigError(f'Loading Kernel module {module} failed')
-
-def find_device_file(device):
- """ Recurively search /dev for the given device file and return its full path.
- If no device file was found 'None' is returned """
- from fnmatch import fnmatch
-
- for root, dirs, files in os.walk('/dev'):
- for basename in files:
- if fnmatch(basename, device):
- return os.path.join(root, basename)
-
- return None
-
-def convert_data(data):
- """Convert multiple types of data to types usable in CLI
-
- Args:
- data (str | bytes | list | OrderedDict): input data
-
- Returns:
- str | list | dict: converted data
- """
- from base64 import b64encode
- from collections import OrderedDict
-
- if isinstance(data, str):
- return data
- if isinstance(data, bytes):
- try:
- return data.decode()
- except UnicodeDecodeError:
- return b64encode(data).decode()
- if isinstance(data, list):
- list_tmp = []
- for item in data:
- list_tmp.append(convert_data(item))
- return list_tmp
- if isinstance(data, OrderedDict):
- dict_tmp = {}
- for key, value in data.items():
- dict_tmp[key] = convert_data(value)
- return dict_tmp
-
-def begin(*args):
- """
- Evaluate arguments in order and return the result of the *last* argument.
- For combining multiple expressions in one statement. Useful for lambdas.
- """
- return args[-1]
-
-def begin0(*args):
- """
- Evaluate arguments in order and return the result of the *first* argument.
- For combining multiple expressions in one statement. Useful for lambdas.
- """
- return args[0]
-
-def install_into_config(conf, config_paths, override_prompt=True):
- # Allows op-mode scripts to install values if called from an active config session
- # config_paths: dict of config paths
- # override_prompt: if True, user will be prompted before existing nodes are overwritten
- if not config_paths:
- return None
-
- from vyos.config import Config
- from vyos.utils.io import ask_yes_no
- from vyos.utils.process import cmd
- if not Config().in_session():
- print('You are not in configure mode, commands to install manually from configure mode:')
- for path in config_paths:
- print(f'set {path}')
- return None
-
- count = 0
- failed = []
-
- for path in config_paths:
- if override_prompt and conf.exists(path) and not conf.is_multi(path):
- if not ask_yes_no(f'Config node "{node}" already exists. Do you want to overwrite it?'):
- continue
-
- try:
- cmd(f'/opt/vyatta/sbin/my_set {path}')
- count += 1
- except:
- failed.append(path)
-
- if failed:
- print(f'Failed to install {len(failed)} value(s). Commands to manually install:')
- for path in failed:
- print(f'set {path}')
-
- if count > 0:
- print(f'{count} value(s) installed. Use "compare" to see the pending changes, and "commit" to apply.')
-
-def load_as_module(name: str, path: str):
- import importlib.util
-
- spec = importlib.util.spec_from_file_location(name, path)
- mod = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(mod)
- return mod
diff --git a/python/vyos/utils/__init__.py b/python/vyos/utils/__init__.py
index abc9af5da..6cca4e935 100644
--- a/python/vyos/utils/__init__.py
+++ b/python/vyos/utils/__init__.py
@@ -19,6 +19,9 @@ from vyos.utils import convert
from vyos.utils import dict
from vyos.utils import file
from vyos.utils import io
+from vyos.utils import kernel
+from vyos.utils import list
+from vyos.utils import misc
from vyos.utils import network
from vyos.utils import permission
from vyos.utils import process
diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py
index 975c67e0a..ec2333ef0 100644
--- a/python/vyos/utils/convert.py
+++ b/python/vyos/utils/convert.py
@@ -143,3 +143,33 @@ def mac_to_eui64(mac, prefix=None):
return str(net[euil])
except: # pylint: disable=bare-except
return
+
+def convert_data(data):
+ """Convert multiple types of data to types usable in CLI
+
+ Args:
+ data (str | bytes | list | OrderedDict): input data
+
+ Returns:
+ str | list | dict: converted data
+ """
+ from base64 import b64encode
+ from collections import OrderedDict
+
+ if isinstance(data, str):
+ return data
+ if isinstance(data, bytes):
+ try:
+ return data.decode()
+ except UnicodeDecodeError:
+ return b64encode(data).decode()
+ if isinstance(data, list):
+ list_tmp = []
+ for item in data:
+ list_tmp.append(convert_data(item))
+ return list_tmp
+ if isinstance(data, OrderedDict):
+ dict_tmp = {}
+ for key, value in data.items():
+ dict_tmp[key] = convert_data(value)
+ return dict_tmp
diff --git a/python/vyos/utils/kernel.py b/python/vyos/utils/kernel.py
new file mode 100644
index 000000000..d950b8e75
--- /dev/null
+++ b/python/vyos/utils/kernel.py
@@ -0,0 +1,25 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+def check_kmod(k_mod):
+ """ Common utility function to load required kernel modules on demand """
+ from vyos import ConfigError
+ from vyos.utils.process import call
+ if isinstance(k_mod, str):
+ k_mod = k_mod.split()
+ for module in k_mod:
+ if not os.path.exists(f'/sys/module/{module}'):
+ if call(f'modprobe {module}') != 0:
+ raise ConfigError(f'Loading Kernel module {module} failed')
diff --git a/python/vyos/utils/list.py b/python/vyos/utils/list.py
new file mode 100644
index 000000000..63ef720ab
--- /dev/null
+++ b/python/vyos/utils/list.py
@@ -0,0 +1,20 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+def is_list_equal(first: list, second: list) -> bool:
+ """ Check if 2 lists are equal and list not empty """
+ if len(first) != len(second) or len(first) == 0:
+ return False
+ return sorted(first) == sorted(second)
diff --git a/python/vyos/utils/misc.py b/python/vyos/utils/misc.py
new file mode 100644
index 000000000..d82655914
--- /dev/null
+++ b/python/vyos/utils/misc.py
@@ -0,0 +1,66 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+def begin(*args):
+ """
+ Evaluate arguments in order and return the result of the *last* argument.
+ For combining multiple expressions in one statement. Useful for lambdas.
+ """
+ return args[-1]
+
+def begin0(*args):
+ """
+ Evaluate arguments in order and return the result of the *first* argument.
+ For combining multiple expressions in one statement. Useful for lambdas.
+ """
+ return args[0]
+
+def install_into_config(conf, config_paths, override_prompt=True):
+ # Allows op-mode scripts to install values if called from an active config session
+ # config_paths: dict of config paths
+ # override_prompt: if True, user will be prompted before existing nodes are overwritten
+ if not config_paths:
+ return None
+
+ from vyos.config import Config
+ from vyos.utils.io import ask_yes_no
+ from vyos.utils.process import cmd
+ if not Config().in_session():
+ print('You are not in configure mode, commands to install manually from configure mode:')
+ for path in config_paths:
+ print(f'set {path}')
+ return None
+
+ count = 0
+ failed = []
+
+ for path in config_paths:
+ if override_prompt and conf.exists(path) and not conf.is_multi(path):
+ if not ask_yes_no(f'Config node "{node}" already exists. Do you want to overwrite it?'):
+ continue
+
+ try:
+ cmd(f'/opt/vyatta/sbin/my_set {path}')
+ count += 1
+ except:
+ failed.append(path)
+
+ if failed:
+ print(f'Failed to install {len(failed)} value(s). Commands to manually install:')
+ for path in failed:
+ print(f'set {path}')
+
+ if count > 0:
+ print(f'{count} value(s) installed. Use "compare" to see the pending changes, and "commit" to apply.')
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
index 8db598f05..3786caf26 100644
--- a/python/vyos/utils/network.py
+++ b/python/vyos/utils/network.py
@@ -154,7 +154,6 @@ def mac2eui64(mac, prefix=None):
except: # pylint: disable=bare-except
return
-
def check_port_availability(ipaddress, port, protocol):
"""
Check if port is available and not used by any service
@@ -189,3 +188,26 @@ def check_port_availability(ipaddress, port, protocol):
return False
return True
+
+def is_listen_port_bind_service(port: int, service: str) -> bool:
+ """Check if listen port bound to expected program name
+ :param port: Bind port
+ :param service: Program name
+ :return: bool
+
+ Example:
+ % is_listen_port_bind_service(443, 'nginx')
+ True
+ % is_listen_port_bind_service(443, 'ocserv-main')
+ False
+ """
+ from psutil import net_connections as connections
+ from psutil import Process as process
+ for connection in connections():
+ addr = connection.laddr
+ pid = connection.pid
+ pid_name = process(pid).name()
+ pid_port = addr.port
+ if service == pid_name and port == pid_port:
+ return True
+ return False
diff --git a/python/vyos/utils/permission.py b/python/vyos/utils/permission.py
index 8c2d72b83..d938b494f 100644
--- a/python/vyos/utils/permission.py
+++ b/python/vyos/utils/permission.py
@@ -61,3 +61,18 @@ def chmod_755(path):
bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP | \
S_IROTH | S_IXOTH
chmod(path, bitmask)
+
+def is_admin() -> bool:
+ """Look if current user is in sudo group"""
+ from getpass import getuser
+ from grp import getgrnam
+ current_user = getuser()
+ (_, _, _, admin_group_members) = getgrnam('sudo')
+ return current_user in admin_group_members
+
+def get_cfg_group_id():
+ from grp import getgrnam
+ from vyos.defaults import cfg_group
+
+ group_data = getgrnam(cfg_group)
+ return group_data.gr_gid
diff --git a/python/vyos/utils/system.py b/python/vyos/utils/system.py
index 7102d5985..5d41c0c05 100644
--- a/python/vyos/utils/system.py
+++ b/python/vyos/utils/system.py
@@ -13,9 +13,9 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+import os
from subprocess import run
-
def sysctl_read(name: str) -> str:
"""Read and return current value of sysctl() option
@@ -28,7 +28,6 @@ def sysctl_read(name: str) -> str:
tmp = run(['sysctl', '-nb', name], capture_output=True)
return tmp.stdout.decode()
-
def sysctl_write(name: str, value: str | int) -> bool:
"""Change value via sysctl()
@@ -56,13 +55,12 @@ def sysctl_write(name: str, value: str | int) -> bool:
# False in other cases
return False
-
def sysctl_apply(sysctl_dict: dict[str, str], revert: bool = True) -> bool:
"""Apply sysctl values.
Args:
sysctl_dict (dict[str, str]): dictionary with sysctl keys with values
- revert (bool, optional): Revert to original values if new were not
+ revert (bool, optional): Revert to original values if new were not
applied. Defaults to True.
Returns:
@@ -80,3 +78,30 @@ def sysctl_apply(sysctl_dict: dict[str, str], revert: bool = True) -> bool:
return False
# everything applied
return True
+
+def get_half_cpus():
+ """ return 1/2 of the numbers of available CPUs """
+ cpu = os.cpu_count()
+ if cpu > 1:
+ cpu /= 2
+ return int(cpu)
+
+def find_device_file(device):
+ """ Recurively search /dev for the given device file and return its full path.
+ If no device file was found 'None' is returned """
+ from fnmatch import fnmatch
+
+ for root, dirs, files in os.walk('/dev'):
+ for basename in files:
+ if fnmatch(basename, device):
+ return os.path.join(root, basename)
+
+ return None
+
+def load_as_module(name: str, path: str):
+ import importlib.util
+
+ spec = importlib.util.spec_from_file_location(name, path)
+ mod = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(mod)
+ return mod
diff --git a/python/vyos/validate.py b/python/vyos/validate.py
index 7afbe81c9..567f4c972 100644
--- a/python/vyos/validate.py
+++ b/python/vyos/validate.py
@@ -102,6 +102,7 @@ def is_addr_assigned(ip_address, vrf=None) -> bool:
from netifaces import interfaces
from vyos.utils.network import get_interface_config
from vyos.utils.dict import dict_search
+
for interface in interfaces():
# Check if interface belongs to the requested VRF, if this is not the
# case there is no need to proceed with this data set - continue loop
diff --git a/smoketest/scripts/cli/base_accel_ppp_test.py b/smoketest/scripts/cli/base_accel_ppp_test.py
index 105c09f6e..989028f64 100644
--- a/smoketest/scripts/cli/base_accel_ppp_test.py
+++ b/smoketest/scripts/cli/base_accel_ppp_test.py
@@ -21,7 +21,7 @@ from configparser import ConfigParser
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.template import is_ipv4
-from vyos.util import get_half_cpus
+from vyos.utils.system import get_half_cpus
from vyos.utils.process import process_named_running
from vyos.utils.process import cmd
diff --git a/smoketest/scripts/cli/test_interfaces_wireless.py b/smoketest/scripts/cli/test_interfaces_wireless.py
index a13a7531f..875ca9dc6 100755
--- a/smoketest/scripts/cli/test_interfaces_wireless.py
+++ b/smoketest/scripts/cli/test_interfaces_wireless.py
@@ -23,7 +23,7 @@ from glob import glob
from vyos.configsession import ConfigSessionError
from vyos.utils.process import process_named_running
-from vyos.util import check_kmod
+from vyos.utils.kernel import check_kmod
from vyos.utils.file import read_file
def get_config_value(interface, key):
diff --git a/src/conf_mode/https.py b/src/conf_mode/https.py
index 26f515d1c..010490c7e 100755
--- a/src/conf_mode/https.py
+++ b/src/conf_mode/https.py
@@ -30,7 +30,7 @@ from vyos.pki import wrap_private_key
from vyos.template import render
from vyos.utils.process import call
from vyos.utils.network import check_port_availability
-from vyos.util import is_listen_port_bind_service
+from vyos.utils.network import is_listen_port_bind_service
from vyos.utils.file import write_file
from vyos import airbag
diff --git a/src/conf_mode/interfaces-l2tpv3.py b/src/conf_mode/interfaces-l2tpv3.py
index ca321e01d..6efeac302 100755
--- a/src/conf_mode/interfaces-l2tpv3.py
+++ b/src/conf_mode/interfaces-l2tpv3.py
@@ -28,7 +28,7 @@ from vyos.configverify import verify_mtu_ipv6
from vyos.configverify import verify_mirror_redirect
from vyos.configverify import verify_bond_bridge_member
from vyos.ifconfig import L2TPv3If
-from vyos.util import check_kmod
+from vyos.utils.kernel import check_kmod
from vyos.validate import is_addr_assigned
from vyos import ConfigError
from vyos import airbag
diff --git a/src/conf_mode/interfaces-openvpn.py b/src/conf_mode/interfaces-openvpn.py
index b447e0a67..607a19385 100755
--- a/src/conf_mode/interfaces-openvpn.py
+++ b/src/conf_mode/interfaces-openvpn.py
@@ -52,7 +52,7 @@ from vyos.template import is_ipv4
from vyos.template import is_ipv6
from vyos.utils.dict import dict_search
from vyos.utils.dict import dict_search_args
-from vyos.util import is_list_equal
+from vyos.utils.list import is_list_equal
from vyos.utils.file import makedir
from vyos.utils.file import read_file
from vyos.utils.file import write_file
diff --git a/src/conf_mode/interfaces-wireguard.py b/src/conf_mode/interfaces-wireguard.py
index 490751bb4..a02baba82 100755
--- a/src/conf_mode/interfaces-wireguard.py
+++ b/src/conf_mode/interfaces-wireguard.py
@@ -27,7 +27,7 @@ from vyos.configverify import verify_mtu_ipv6
from vyos.configverify import verify_mirror_redirect
from vyos.configverify import verify_bond_bridge_member
from vyos.ifconfig import WireGuardIf
-from vyos.util import check_kmod
+from vyos.utils.kernel import check_kmod
from vyos.utils.network import check_port_availability
from vyos import ConfigError
from vyos import airbag
diff --git a/src/conf_mode/load-balancing-haproxy.py b/src/conf_mode/load-balancing-haproxy.py
index 151d9bcbc..2fb0edf8e 100755
--- a/src/conf_mode/load-balancing-haproxy.py
+++ b/src/conf_mode/load-balancing-haproxy.py
@@ -23,7 +23,7 @@ from vyos.config import Config
from vyos.configdict import dict_merge
from vyos.utils.process import call
from vyos.utils.network import check_port_availability
-from vyos.util import is_listen_port_bind_service
+from vyos.utils.network import is_listen_port_bind_service
from vyos.pki import wrap_certificate
from vyos.pki import wrap_private_key
from vyos.template import render
diff --git a/src/conf_mode/nat.py b/src/conf_mode/nat.py
index e17dee835..5f4b658f8 100755
--- a/src/conf_mode/nat.py
+++ b/src/conf_mode/nat.py
@@ -28,7 +28,7 @@ from vyos.config import Config
from vyos.configdict import dict_merge
from vyos.template import render
from vyos.template import is_ip_network
-from vyos.util import check_kmod
+from vyos.utils.kernel import check_kmod
from vyos.utils.dict import dict_search
from vyos.utils.dict import dict_search_args
from vyos.utils.process import cmd
diff --git a/src/conf_mode/nat66.py b/src/conf_mode/nat66.py
index 12d503ef9..25f625b84 100755
--- a/src/conf_mode/nat66.py
+++ b/src/conf_mode/nat66.py
@@ -26,7 +26,7 @@ from vyos.config import Config
from vyos.configdict import dict_merge
from vyos.template import render
from vyos.utils.process import cmd
-from vyos.util import check_kmod
+from vyos.utils.kernel import check_kmod
from vyos.utils.dict import dict_search
from vyos.template import is_ipv6
from vyos.xml import defaults
diff --git a/src/conf_mode/system_lcd.py b/src/conf_mode/system_lcd.py
index 582933069..eb88224d1 100755
--- a/src/conf_mode/system_lcd.py
+++ b/src/conf_mode/system_lcd.py
@@ -20,7 +20,7 @@ from sys import exit
from vyos.config import Config
from vyos.utils.process import call
-from vyos.util import find_device_file
+from vyos.utils.system import find_device_file
from vyos.template import render
from vyos import ConfigError
from vyos import airbag
diff --git a/src/conf_mode/vpn_l2tp.py b/src/conf_mode/vpn_l2tp.py
index 0645854c9..6232ce64a 100755
--- a/src/conf_mode/vpn_l2tp.py
+++ b/src/conf_mode/vpn_l2tp.py
@@ -27,9 +27,9 @@ from vyos.config import Config
from vyos.template import is_ipv4
from vyos.template import render
from vyos.utils.process import call
-from vyos.util import get_half_cpus
+from vyos.utils.system import get_half_cpus
from vyos.utils.network import check_port_availability
-from vyos.util import is_listen_port_bind_service
+from vyos.utils.network import is_listen_port_bind_service
from vyos import ConfigError
from vyos import airbag
diff --git a/src/conf_mode/vpn_openconnect.py b/src/conf_mode/vpn_openconnect.py
index 71cd9040a..e82862fa3 100755
--- a/src/conf_mode/vpn_openconnect.py
+++ b/src/conf_mode/vpn_openconnect.py
@@ -26,7 +26,7 @@ from vyos.template import render
from vyos.utils.process import call
from vyos.utils.network import check_port_availability
from vyos.utils.process import is_systemd_service_running
-from vyos.util import is_listen_port_bind_service
+from vyos.utils.network import is_listen_port_bind_service
from vyos.utils.dict import dict_search
from vyos.xml import defaults
from vyos import ConfigError
diff --git a/src/conf_mode/vpn_pptp.py b/src/conf_mode/vpn_pptp.py
index e209d674a..d542f57fe 100755
--- a/src/conf_mode/vpn_pptp.py
+++ b/src/conf_mode/vpn_pptp.py
@@ -23,7 +23,7 @@ from sys import exit
from vyos.config import Config
from vyos.template import render
-from vyos.util import get_half_cpus
+from vyos.utils.system import get_half_cpus
from vyos.utils.process import call
from vyos import ConfigError
diff --git a/src/conf_mode/vpn_sstp.py b/src/conf_mode/vpn_sstp.py
index 5a1e7da87..e98d8385b 100755
--- a/src/conf_mode/vpn_sstp.py
+++ b/src/conf_mode/vpn_sstp.py
@@ -28,7 +28,7 @@ from vyos.template import render
from vyos.utils.process import call
from vyos.utils.network import check_port_availability
from vyos.utils.dict import dict_search
-from vyos.util import is_listen_port_bind_service
+from vyos.utils.network import is_listen_port_bind_service
from vyos.utils.file import write_file
from vyos import ConfigError
from vyos import airbag
diff --git a/src/helpers/vyos-interface-rescan.py b/src/helpers/vyos-interface-rescan.py
index 1ac1810e0..012357259 100755
--- a/src/helpers/vyos-interface-rescan.py
+++ b/src/helpers/vyos-interface-rescan.py
@@ -24,7 +24,7 @@ import netaddr
from vyos.configtree import ConfigTree
from vyos.defaults import directories
-from vyos.util import get_cfg_group_id
+from vyos.utils.permission import get_cfg_group_id
debug = False
diff --git a/src/helpers/vyos-sudo.py b/src/helpers/vyos-sudo.py
index 3e4c196d9..75dd7f29d 100755
--- a/src/helpers/vyos-sudo.py
+++ b/src/helpers/vyos-sudo.py
@@ -18,7 +18,7 @@
import os
import sys
-from vyos.util import is_admin
+from vyos.utils.permission import is_admin
if __name__ == '__main__':
diff --git a/src/op_mode/igmp-proxy.py b/src/op_mode/igmp-proxy.py
index a640b0bbf..709e25915 100755
--- a/src/op_mode/igmp-proxy.py
+++ b/src/op_mode/igmp-proxy.py
@@ -28,17 +28,14 @@ import tabulate
import vyos.config
import vyos.opmode
-from vyos.util import bytes_to_human
+from vyos.utils.convert import bytes_to_human
from vyos.utils.io import print_error
+from vyos.utils.process import process_named_running
def _is_configured():
"""Check if IGMP proxy is configured"""
return vyos.config.Config().exists_effective('protocols igmp-proxy')
-def _is_running():
- """Check if IGMP proxy is currently running"""
- return not vyos.util.run('ps -C igmpproxy')
-
def _kernel_to_ip(addr):
"""
Convert any given address from Linux kernel to a proper, IPv4 address
@@ -85,7 +82,7 @@ def show_interface(raw: bool):
if not _is_configured():
print_error('IGMP proxy is not configured.')
sys.exit(0)
-if not _is_running():
+if not process_named_running('igmpproxy'):
print_error('IGMP proxy is not running.')
sys.exit(0)
diff --git a/src/op_mode/ipsec.py b/src/op_mode/ipsec.py
index 774459771..57d3cfed9 100755
--- a/src/op_mode/ipsec.py
+++ b/src/op_mode/ipsec.py
@@ -21,8 +21,8 @@ from hurry import filesize
from re import split as re_split
from tabulate import tabulate
-from vyos.util import convert_data
-from vyos.util import seconds_to_human
+from vyos.utils.convert import convert_data
+from vyos.utils.convert import seconds_to_human
from vyos.utils.process import cmd
from vyos.configquery import ConfigTreeQuery
diff --git a/src/op_mode/memory.py b/src/op_mode/memory.py
index 7666de646..eb530035b 100755
--- a/src/op_mode/memory.py
+++ b/src/op_mode/memory.py
@@ -54,7 +54,7 @@ def _get_raw_data():
return mem_data
def _get_formatted_output(mem):
- from vyos.util import bytes_to_human
+ from vyos.utils.convert import bytes_to_human
# For human-readable outputs, we convert bytes to more convenient units
# (100M, 1.3G...)
diff --git a/src/op_mode/openvpn.py b/src/op_mode/openvpn.py
index f59206330..fd9d2db92 100755
--- a/src/op_mode/openvpn.py
+++ b/src/op_mode/openvpn.py
@@ -23,7 +23,7 @@ import typing
from tabulate import tabulate
import vyos.opmode
-from vyos.util import bytes_to_human
+from vyos.utils.convert import bytes_to_human
from vyos.utils.commit import commit_in_progress
from vyos.utils.process import call
from vyos.utils.process import rc_cmd
diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py
index 7a8195a89..4c31291ad 100755
--- a/src/op_mode/pki.py
+++ b/src/op_mode/pki.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021 VyOS maintainers and contributors
+# Copyright (C) 2021-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -34,11 +34,11 @@ from vyos.pki import create_dh_parameters
from vyos.pki import load_certificate, load_certificate_request, load_private_key
from vyos.pki import load_crl, load_dh_parameters, load_public_key
from vyos.pki import verify_certificate
-from vyos.xml import defaults
from vyos.utils.io import ask_input
from vyos.utils.io import ask_yes_no
+from vyos.utils.misc import install_into_config
from vyos.utils.process import cmd
-from vyos.util import install_into_config
+from vyos.xml import defaults
CERT_REQ_END = '-----END CERTIFICATE REQUEST-----'
auth_dir = '/config/auth'
@@ -191,7 +191,7 @@ def install_ssh_key(name, public_key, private_key, passphrase=None):
def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True):
# Show/install conf commands for key-pair
-
+
config_paths = []
if public_key:
diff --git a/src/op_mode/storage.py b/src/op_mode/storage.py
index 9704a9cf5..6bc3d3a2d 100755
--- a/src/op_mode/storage.py
+++ b/src/op_mode/storage.py
@@ -43,7 +43,7 @@ def _get_system_storage(only_persistent=False):
def _get_raw_data():
from re import sub as re_sub
- from vyos.util import human_to_bytes
+ from vyos.utils.convert import human_to_bytes
out = _get_system_storage(only_persistent=True)
lines = out.splitlines()
diff --git a/src/op_mode/uptime.py b/src/op_mode/uptime.py
index 62908164e..d6adf6f4d 100755
--- a/src/op_mode/uptime.py
+++ b/src/op_mode/uptime.py
@@ -45,7 +45,7 @@ def _get_load_averages():
return res
def _get_raw_data():
- from vyos.util import seconds_to_human
+ from vyos.utils.convert import seconds_to_human
res = {}
res["uptime_seconds"] = _get_uptime_seconds()
diff --git a/src/services/api/graphql/generate/schema_from_op_mode.py b/src/services/api/graphql/generate/schema_from_op_mode.py
index 229ccf90f..ab7cb691f 100755
--- a/src/services/api/graphql/generate/schema_from_op_mode.py
+++ b/src/services/api/graphql/generate/schema_from_op_mode.py
@@ -27,7 +27,7 @@ from jinja2 import Template
from vyos.defaults import directories
from vyos.opmode import _is_op_mode_function_name as is_op_mode_function_name
from vyos.opmode import _get_literal_values as get_literal_values
-from vyos.util import load_as_module
+from vyos.utils.system import load_as_module
if __package__ is None or __package__ == '':
sys.path.append(os.path.join(directories['services'], 'api'))
from graphql.libs.op_mode import is_show_function_name
diff --git a/src/services/api/graphql/libs/op_mode.py b/src/services/api/graphql/libs/op_mode.py
index e91d8bd0f..5022f7d4e 100644
--- a/src/services/api/graphql/libs/op_mode.py
+++ b/src/services/api/graphql/libs/op_mode.py
@@ -20,7 +20,7 @@ from typing import Union, Tuple, Optional
from humps import decamelize
from vyos.defaults import directories
-from vyos.util import load_as_module
+from vyos.utils.system import load_as_module
from vyos.opmode import _normalize_field_names
from vyos.opmode import _is_literal_type, _get_literal_values
diff --git a/src/tests/test_find_device_file.py b/src/tests/test_find_device_file.py
index 43c80dc76..f18043d65 100755
--- a/src/tests/test_find_device_file.py
+++ b/src/tests/test_find_device_file.py
@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from unittest import TestCase
-from vyos.util import find_device_file
+from vyos.utils.system import find_device_file
class TestDeviceFile(TestCase):
""" used to find USB devices on target """
diff --git a/src/tests/test_util.py b/src/tests/test_util.py
index 2e8f1b6cc..66c27e824 100644
--- a/src/tests/test_util.py
+++ b/src/tests/test_util.py
@@ -17,7 +17,7 @@
from unittest import TestCase
class TestVyOSUtil(TestCase):
- def test_key_mangline(self):
+ def test_key_mangling(self):
from vyos.util import mangle_dict_keys
data = {"foo-bar": {"baz-quux": None}}
expected_data = {"foo_bar": {"baz_quux": None}}