summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/accel_ppp.py2
-rw-r--r--python/vyos/authutils.py2
-rw-r--r--python/vyos/config_mgmt.py5
-rw-r--r--python/vyos/configdict.py4
-rw-r--r--python/vyos/configdiff.py5
-rw-r--r--python/vyos/configquery.py5
-rw-r--r--python/vyos/configsession.py2
-rw-r--r--python/vyos/configverify.py8
-rw-r--r--python/vyos/ethtool.py2
-rw-r--r--python/vyos/firewall.py14
-rw-r--r--python/vyos/frr.py18
-rw-r--r--python/vyos/ifconfig/bond.py4
-rw-r--r--python/vyos/ifconfig/bridge.py4
-rw-r--r--python/vyos/ifconfig/control.py8
-rw-r--r--python/vyos/ifconfig/ethernet.py6
-rw-r--r--python/vyos/ifconfig/geneve.py2
-rw-r--r--python/vyos/ifconfig/interface.py12
-rw-r--r--python/vyos/ifconfig/l2tpv3.py5
-rw-r--r--python/vyos/ifconfig/pppoe.py2
-rw-r--r--python/vyos/ifconfig/tunnel.py2
-rw-r--r--python/vyos/ifconfig/vti.py2
-rw-r--r--python/vyos/ifconfig/vxlan.py2
-rw-r--r--python/vyos/migrator.py2
-rw-r--r--python/vyos/nat.py2
-rw-r--r--python/vyos/qos/base.py6
-rw-r--r--python/vyos/qos/priority.py2
-rw-r--r--python/vyos/remote.py10
-rw-r--r--python/vyos/template.py10
-rw-r--r--python/vyos/util.py741
-rw-r--r--python/vyos/utils/__init__.py10
-rw-r--r--python/vyos/utils/commit.py60
-rw-r--r--python/vyos/utils/file.py12
-rw-r--r--python/vyos/utils/network.py154
-rw-r--r--python/vyos/utils/permission.py63
-rw-r--r--python/vyos/utils/process.py230
-rw-r--r--python/vyos/validate.py6
-rw-r--r--python/vyos/version.py12
37 files changed, 616 insertions, 820 deletions
diff --git a/python/vyos/accel_ppp.py b/python/vyos/accel_ppp.py
index 0af311e57..0b4f8a9fe 100644
--- a/python/vyos/accel_ppp.py
+++ b/python/vyos/accel_ppp.py
@@ -18,7 +18,7 @@
import sys
import vyos.opmode
-from vyos.util import rc_cmd
+from vyos.utils.process import rc_cmd
def get_server_statistics(accel_statistics, pattern, sep=':') -> dict:
diff --git a/python/vyos/authutils.py b/python/vyos/authutils.py
index 66b5f4a74..a59858d72 100644
--- a/python/vyos/authutils.py
+++ b/python/vyos/authutils.py
@@ -15,7 +15,7 @@
import re
-from vyos.util import cmd
+from vyos.utils.process import cmd
def make_password_hash(password):
diff --git a/python/vyos/config_mgmt.py b/python/vyos/config_mgmt.py
index e1870aa0a..4ddabd6c2 100644
--- a/python/vyos/config_mgmt.py
+++ b/python/vyos/config_mgmt.py
@@ -18,6 +18,7 @@ import re
import sys
import gzip
import logging
+
from typing import Optional, Tuple, Union
from filecmp import cmp
from datetime import datetime
@@ -29,7 +30,9 @@ from vyos.config import Config
from vyos.configtree import ConfigTree, ConfigTreeError, show_diff
from vyos.defaults import directories
from vyos.version import get_full_version_data
-from vyos.util import is_systemd_service_active, ask_yes_no, rc_cmd
+from vyos.utils.io import ask_yes_no
+from vyos.utils.process import is_systemd_service_active
+from vyos.utils.process import rc_cmd
SAVE_CONFIG = '/opt/vyatta/sbin/vyatta-save-config.pl'
diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py
index 1205342df..2411a04c4 100644
--- a/python/vyos/configdict.py
+++ b/python/vyos/configdict.py
@@ -19,9 +19,9 @@ A library for retrieving value dicts from VyOS configs in a declarative fashion.
import os
import json
-from vyos.util import dict_search
+from vyos.utils.dict import dict_search
from vyos.xml import defaults
-from vyos.util import cmd
+from vyos.utils.process import cmd
def retrieve_config(path_hash, base_path, config):
"""
diff --git a/python/vyos/configdiff.py b/python/vyos/configdiff.py
index ac86af09c..5d30e9b66 100644
--- a/python/vyos/configdiff.py
+++ b/python/vyos/configdiff.py
@@ -19,8 +19,9 @@ from vyos.config import Config
from vyos.configtree import DiffTree
from vyos.configdict import dict_merge
from vyos.configdict import list_diff
-from vyos.util import get_sub_dict, mangle_dict_keys
-from vyos.util import dict_search_args
+from vyos.utils.dict import get_sub_dict
+from vyos.util import mangle_dict_keys
+from vyos.utils.dict import dict_search_args
from vyos.xml import defaults
class ConfigDiffError(Exception):
diff --git a/python/vyos/configquery.py b/python/vyos/configquery.py
index 9260da568..71ad5b4f0 100644
--- a/python/vyos/configquery.py
+++ b/python/vyos/configquery.py
@@ -19,9 +19,10 @@ settings from op mode, and execution of arbitrary op mode commands.
'''
import os
-from subprocess import STDOUT
-from vyos.util import popen
+from vyos.utils.process import STDOUT
+from vyos.utils.process import popen
+
from vyos.utils.boot import boot_configuration_complete
from vyos.config import Config
from vyos.configsource import ConfigSourceSession, ConfigSourceString
diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py
index decb82437..e8918d577 100644
--- a/python/vyos/configsession.py
+++ b/python/vyos/configsession.py
@@ -17,7 +17,7 @@ import re
import sys
import subprocess
-from vyos.util import is_systemd_service_running
+from vyos.utils.process import is_systemd_service_running
from vyos.utils.dict import dict_to_paths
CLI_SHELL_API = '/bin/cli-shell-api'
diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py
index 94dcdf4d9..5b94bd98b 100644
--- a/python/vyos/configverify.py
+++ b/python/vyos/configverify.py
@@ -22,8 +22,8 @@
# makes use of it!
from vyos import ConfigError
-from vyos.util import dict_search
-from vyos.util import dict_search_recursive
+from vyos.utils.dict import dict_search
+from vyos.utils.dict import dict_search_recursive
def verify_mtu(config):
"""
@@ -314,8 +314,6 @@ def verify_dhcpv6(config):
recurring validation of DHCPv6 options which are mutually exclusive.
"""
if 'dhcpv6_options' in config:
- from vyos.util import dict_search
-
if {'parameters_only', 'temporary'} <= set(config['dhcpv6_options']):
raise ConfigError('DHCPv6 temporary and parameters-only options '
'are mutually exclusive!')
@@ -460,7 +458,7 @@ def verify_diffie_hellman_length(file, min_keysize):
then or equal to min_keysize """
import os
import re
- from vyos.util import cmd
+ from vyos.utils.process import cmd
try:
keysize = str(min_keysize)
diff --git a/python/vyos/ethtool.py b/python/vyos/ethtool.py
index 9b7da89fa..ca3bcfc3d 100644
--- a/python/vyos/ethtool.py
+++ b/python/vyos/ethtool.py
@@ -16,7 +16,7 @@
import os
import re
-from vyos.util import popen
+from vyos.utils.process import popen
# These drivers do not support using ethtool to change the speed, duplex, or
# flow control settings
diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py
index 919032a41..2793b201c 100644
--- a/python/vyos/firewall.py
+++ b/python/vyos/firewall.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021-2022 VyOS maintainers and contributors
+# Copyright (C) 2021-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -28,11 +28,11 @@ from time import strftime
from vyos.remote import download
from vyos.template import is_ipv4
from vyos.template import render
-from vyos.util import call
-from vyos.util import cmd
-from vyos.util import dict_search_args
-from vyos.util import dict_search_recursive
-from vyos.util import run
+from vyos.utils.dict import dict_search_args
+from vyos.utils.dict import dict_search_recursive
+from vyos.utils.process import call
+from vyos.utils.process import cmd
+from vyos.utils.process import run
# Domain Resolver
@@ -45,7 +45,7 @@ def fqdn_config_parse(firewall):
rule = path[3] # rule id
suffix = path[4][0] # source/destination (1 char)
set_name = f'{fw_name}_{rule}_{suffix}'
-
+
if path[0] == 'name':
firewall['ip_fqdn'][set_name] = domain
elif path[0] == 'ipv6_name':
diff --git a/python/vyos/frr.py b/python/vyos/frr.py
index a84f183ef..58213d197 100644
--- a/python/vyos/frr.py
+++ b/python/vyos/frr.py
@@ -67,9 +67,13 @@ Apply the new configuration:
import tempfile
import re
+
from vyos import util
-from vyos.util import chown
-from vyos.util import cmd
+from vyos.utils.permission import chown
+from vyos.utils.process import cmd
+from vyos.utils.process import popen
+from vyos.utils.process import STDOUT
+
import logging
from logging.handlers import SysLogHandler
import os
@@ -144,7 +148,7 @@ def get_configuration(daemon=None, marked=False):
if daemon:
cmd += f' -d {daemon}'
- output, code = util.popen(cmd, stderr=util.STDOUT)
+ output, code = popen(cmd, stderr=STDOUT)
if code:
raise OSError(code, output)
@@ -166,7 +170,7 @@ def mark_configuration(config):
config: The configuration string to mark/test
return: The marked configuration from FRR
"""
- output, code = util.popen(f"{path_vtysh} -m -f -", stderr=util.STDOUT, input=config)
+ output, code = popen(f"{path_vtysh} -m -f -", stderr=STDOUT, input=config)
if code == 2:
raise ConfigurationNotValid(str(output))
@@ -206,7 +210,7 @@ def reload_configuration(config, daemon=None):
cmd += f' {f.name}'
LOG.debug(f'reload_configuration: Executing command against frr-reload: "{cmd}"')
- output, code = util.popen(cmd, stderr=util.STDOUT)
+ output, code = popen(cmd, stderr=STDOUT)
f.close()
for i, e in enumerate(output.split('\n')):
LOG.debug(f'frr-reload output: {i:3} {e}')
@@ -235,7 +239,7 @@ def execute(command):
cmd = f"{path_vtysh} -c '{command}'"
- output, code = util.popen(cmd, stderr=util.STDOUT)
+ output, code = popen(cmd, stderr=STDOUT)
if code:
raise OSError(code, output)
@@ -267,7 +271,7 @@ def configure(lines, daemon=False):
for x in lines:
cmd += f" -c '{x}'"
- output, code = util.popen(cmd, stderr=util.STDOUT)
+ output, code = popen(cmd, stderr=STDOUT)
if code == 1:
raise ConfigurationNotValid(f'Configuration FRR failed: {repr(output)}')
elif code:
diff --git a/python/vyos/ifconfig/bond.py b/python/vyos/ifconfig/bond.py
index 0edd17055..e88f860be 100644
--- a/python/vyos/ifconfig/bond.py
+++ b/python/vyos/ifconfig/bond.py
@@ -16,8 +16,8 @@
import os
from vyos.ifconfig.interface import Interface
-from vyos.util import cmd
-from vyos.util import dict_search
+from vyos.utils.process import cmd
+from vyos.utils.dict import dict_search
from vyos.validate import assert_list
from vyos.validate import assert_positive
diff --git a/python/vyos/ifconfig/bridge.py b/python/vyos/ifconfig/bridge.py
index aa818bc5f..b103b49d8 100644
--- a/python/vyos/ifconfig/bridge.py
+++ b/python/vyos/ifconfig/bridge.py
@@ -19,8 +19,8 @@ import json
from vyos.ifconfig.interface import Interface
from vyos.validate import assert_boolean
from vyos.validate import assert_positive
-from vyos.util import cmd
-from vyos.util import dict_search
+from vyos.utils.process import cmd
+from vyos.utils.dict import dict_search
from vyos.configdict import get_vlan_ids
from vyos.configdict import list_diff
diff --git a/python/vyos/ifconfig/control.py b/python/vyos/ifconfig/control.py
index 7a6b36e7c..c8366cb58 100644
--- a/python/vyos/ifconfig/control.py
+++ b/python/vyos/ifconfig/control.py
@@ -19,10 +19,10 @@ from inspect import signature
from inspect import _empty
from vyos.ifconfig.section import Section
-from vyos.util import popen
-from vyos.util import cmd
-from vyos.util import read_file
-from vyos.util import write_file
+from vyos.utils.process import popen
+from vyos.utils.process import cmd
+from vyos.utils.file import read_file
+from vyos.utils.file import write_file
from vyos import debug
class Control(Section):
diff --git a/python/vyos/ifconfig/ethernet.py b/python/vyos/ifconfig/ethernet.py
index 30bea3b86..4ff044c23 100644
--- a/python/vyos/ifconfig/ethernet.py
+++ b/python/vyos/ifconfig/ethernet.py
@@ -20,9 +20,9 @@ from glob import glob
from vyos.base import Warning
from vyos.ethtool import Ethtool
from vyos.ifconfig.interface import Interface
-from vyos.util import run
-from vyos.util import dict_search
-from vyos.util import read_file
+from vyos.utils.dict import dict_search
+from vyos.utils.file import read_file
+from vyos.utils.process import run
from vyos.validate import assert_list
@Interface.register
diff --git a/python/vyos/ifconfig/geneve.py b/python/vyos/ifconfig/geneve.py
index 7a05e47a7..fbb261a35 100644
--- a/python/vyos/ifconfig/geneve.py
+++ b/python/vyos/ifconfig/geneve.py
@@ -14,7 +14,7 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
from vyos.ifconfig import Interface
-from vyos.util import dict_search
+from vyos.utils.dict import dict_search
@Interface.register
class GeneveIf(Interface):
diff --git a/python/vyos/ifconfig/interface.py b/python/vyos/ifconfig/interface.py
index f6289a6e6..120f2131b 100644
--- a/python/vyos/ifconfig/interface.py
+++ b/python/vyos/ifconfig/interface.py
@@ -32,12 +32,12 @@ from vyos.configdict import list_diff
from vyos.configdict import dict_merge
from vyos.configdict import get_vlan_ids
from vyos.template import render
-from vyos.util import mac2eui64
-from vyos.util import dict_search
-from vyos.util import read_file
-from vyos.util import get_interface_config
-from vyos.util import get_interface_namespace
-from vyos.util import is_systemd_service_active
+from vyos.utils.network import mac2eui64
+from vyos.utils.dict import dict_search
+from vyos.utils.file import read_file
+from vyos.utils.network import get_interface_config
+from vyos.utils.network import get_interface_namespace
+from vyos.utils.process import is_systemd_service_active
from vyos.template import is_ipv4
from vyos.template import is_ipv6
from vyos.validate import is_intf_addr_assigned
diff --git a/python/vyos/ifconfig/l2tpv3.py b/python/vyos/ifconfig/l2tpv3.py
index fcd1fbf81..85a89ef8b 100644
--- a/python/vyos/ifconfig/l2tpv3.py
+++ b/python/vyos/ifconfig/l2tpv3.py
@@ -1,4 +1,4 @@
-# Copyright 2019-2021 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -15,7 +15,8 @@
from time import sleep
from time import time
-from vyos.util import run
+
+from vyos.utils.process import run
from vyos.ifconfig.interface import Interface
def wait_for_add_l2tpv3(timeout=10, sleep_interval=1, cmd=None):
diff --git a/python/vyos/ifconfig/pppoe.py b/python/vyos/ifconfig/pppoe.py
index 437fe0cae..fd4590beb 100644
--- a/python/vyos/ifconfig/pppoe.py
+++ b/python/vyos/ifconfig/pppoe.py
@@ -15,7 +15,7 @@
from vyos.ifconfig.interface import Interface
from vyos.validate import assert_range
-from vyos.util import get_interface_config
+from vyos.utils.network import get_interface_config
@Interface.register
class PPPoEIf(Interface):
diff --git a/python/vyos/ifconfig/tunnel.py b/python/vyos/ifconfig/tunnel.py
index b7bf7d982..fb2f38e2b 100644
--- a/python/vyos/ifconfig/tunnel.py
+++ b/python/vyos/ifconfig/tunnel.py
@@ -17,7 +17,7 @@
# https://community.hetzner.com/tutorials/linux-setup-gre-tunnel
from vyos.ifconfig.interface import Interface
-from vyos.util import dict_search
+from vyos.utils.dict import dict_search
from vyos.validate import assert_list
def enable_to_on(value):
diff --git a/python/vyos/ifconfig/vti.py b/python/vyos/ifconfig/vti.py
index dc99d365a..9ebbeb9ed 100644
--- a/python/vyos/ifconfig/vti.py
+++ b/python/vyos/ifconfig/vti.py
@@ -14,7 +14,7 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
from vyos.ifconfig.interface import Interface
-from vyos.util import dict_search
+from vyos.utils.dict import dict_search
@Interface.register
class VTIIf(Interface):
diff --git a/python/vyos/ifconfig/vxlan.py b/python/vyos/ifconfig/vxlan.py
index 5baff10a9..6a9911588 100644
--- a/python/vyos/ifconfig/vxlan.py
+++ b/python/vyos/ifconfig/vxlan.py
@@ -15,7 +15,7 @@
from vyos import ConfigError
from vyos.ifconfig import Interface
-from vyos.util import dict_search
+from vyos.utils.dict import dict_search
@Interface.register
class VXLANIf(Interface):
diff --git a/python/vyos/migrator.py b/python/vyos/migrator.py
index 87c74e1ea..872682bc0 100644
--- a/python/vyos/migrator.py
+++ b/python/vyos/migrator.py
@@ -20,7 +20,7 @@ import logging
import vyos.defaults
import vyos.component_version as component_version
-from vyos.util import cmd
+from vyos.utils.process import cmd
log_file = os.path.join(vyos.defaults.directories['config'], 'vyos-migrate.log')
diff --git a/python/vyos/nat.py b/python/vyos/nat.py
index 53fd7fb33..5b8d5d1a3 100644
--- a/python/vyos/nat.py
+++ b/python/vyos/nat.py
@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from vyos.template import is_ip_network
-from vyos.util import dict_search_args
+from vyos.utils.dict import dict_search_args
from vyos.template import bracketize_ipv6
diff --git a/python/vyos/qos/base.py b/python/vyos/qos/base.py
index 3983b1bc0..6c5a3d79c 100644
--- a/python/vyos/qos/base.py
+++ b/python/vyos/qos/base.py
@@ -16,9 +16,9 @@
import os
from vyos.base import Warning
-from vyos.util import cmd
-from vyos.util import dict_search
-from vyos.util import read_file
+from vyos.utils.process import cmd
+from vyos.utils.dict import dict_search
+from vyos.utils.file import read_file
from vyos.utils.network import get_protocol_by_name
diff --git a/python/vyos/qos/priority.py b/python/vyos/qos/priority.py
index 6d4a60a43..8182400f9 100644
--- a/python/vyos/qos/priority.py
+++ b/python/vyos/qos/priority.py
@@ -14,7 +14,7 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
from vyos.qos.base import QoSBase
-from vyos.util import dict_search
+from vyos.utils.dict import dict_search
class Priority(QoSBase):
_parent = 1
diff --git a/python/vyos/remote.py b/python/vyos/remote.py
index 66044fa52..15939a523 100644
--- a/python/vyos/remote.py
+++ b/python/vyos/remote.py
@@ -32,12 +32,12 @@ from requests import Session
from requests.adapters import HTTPAdapter
from requests.packages.urllib3 import PoolManager
-from vyos.util import ask_yes_no
+from vyos.utils.io import ask_yes_no
from vyos.util import begin
-from vyos.util import cmd
-from vyos.util import make_incremental_progressbar
-from vyos.util import make_progressbar
-from vyos.util import print_error
+from vyos.utils.process import cmd
+from vyos.utils.io import make_incremental_progressbar
+from vyos.utils.io import make_progressbar
+from vyos.utils.io import print_error
from vyos.version import get_version
diff --git a/python/vyos/template.py b/python/vyos/template.py
index 254a15e3a..3cf60cea9 100644
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -20,10 +20,10 @@ from jinja2 import Environment
from jinja2 import FileSystemLoader
from jinja2 import ChainableUndefined
from vyos.defaults import directories
-from vyos.util import chmod
-from vyos.util import chown
-from vyos.util import dict_search_args
-from vyos.util import makedir
+from vyos.utils.dict import dict_search_args
+from vyos.utils.file import makedir
+from vyos.utils.permission import chmod
+from vyos.utils.permission import chown
# Holds template filters registered via register_filter()
_FILTERS = {}
@@ -424,7 +424,7 @@ def get_dhcp_router(interface):
if not os.path.exists(lease_file):
return None
- from vyos.util import read_file
+ from vyos.utils.file import read_file
for line in read_file(lease_file).splitlines():
if 'option routers' in line:
(_, _, address) = line.split()
diff --git a/python/vyos/util.py b/python/vyos/util.py
index fac3a920b..0d7985d54 100644
--- a/python/vyos/util.py
+++ b/python/vyos/util.py
@@ -1,4 +1,4 @@
-# Copyright 2020-2022 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2020-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -22,351 +22,6 @@ import sys
# where it is used so it is as local as possible to the execution
#
-from subprocess import Popen
-from subprocess import PIPE
-from subprocess import STDOUT
-from subprocess import DEVNULL
-
-def popen(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=PIPE, stderr=PIPE, decode='utf-8'):
- """
- popen is a wrapper helper aound subprocess.Popen
- with it default setting it will return a tuple (out, err)
- out: the output of the program run
- err: the error code returned by the program
-
- it can be affected by the following flags:
- shell: do not try to auto-detect if a shell is required
- for example if a pipe (|) or redirection (>, >>) is used
- input: data to sent to the child process via STDIN
- the data should be bytes but string will be converted
- timeout: time after which the command will be considered to have failed
- env: mapping that defines the environment variables for the new process
- stdout: define how the output of the program should be handled
- - PIPE (default), sends stdout to the output
- - DEVNULL, discard the output
- stderr: define how the output of the program should be handled
- - None (default), send/merge the data to/with stderr
- - PIPE, popen will append it to output
- - STDOUT, send the data to be merged with stdout
- - DEVNULL, discard the output
- decode: specify the expected text encoding (utf-8, ascii, ...)
- the default is explicitely utf-8 which is python's own default
-
- usage:
- get both stdout and stderr: popen('command', stdout=PIPE, stderr=STDOUT)
- discard stdout and get stderr: popen('command', stdout=DEVNUL, stderr=PIPE)
- """
-
- # airbag must be left as an import in the function as otherwise we have a
- # a circual import dependency
- from vyos import debug
- from vyos import airbag
-
- # log if the flag is set, otherwise log if command is set
- if not debug.enabled(flag):
- flag = 'command'
-
- cmd_msg = f"cmd '{command}'"
- debug.message(cmd_msg, flag)
-
- use_shell = shell
- stdin = None
- if shell is None:
- use_shell = False
- if ' ' in command:
- use_shell = True
- if env:
- use_shell = True
-
- if input:
- stdin = PIPE
- input = input.encode() if type(input) is str else input
-
- p = Popen(command, stdin=stdin, stdout=stdout, stderr=stderr,
- env=env, shell=use_shell)
-
- pipe = p.communicate(input, timeout)
-
- pipe_out = b''
- if stdout == PIPE:
- pipe_out = pipe[0]
-
- pipe_err = b''
- if stderr == PIPE:
- pipe_err = pipe[1]
-
- str_out = pipe_out.decode(decode).replace('\r\n', '\n').strip()
- str_err = pipe_err.decode(decode).replace('\r\n', '\n').strip()
-
- out_msg = f"returned (out):\n{str_out}"
- if str_out:
- debug.message(out_msg, flag)
-
- if str_err:
- err_msg = f"returned (err):\n{str_err}"
- # this message will also be send to syslog via airbag
- debug.message(err_msg, flag, destination=sys.stderr)
-
- # should something go wrong, report this too via airbag
- airbag.noteworthy(cmd_msg)
- airbag.noteworthy(out_msg)
- airbag.noteworthy(err_msg)
-
- return str_out, p.returncode
-
-
-def run(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=DEVNULL, stderr=PIPE, decode='utf-8'):
- """
- A wrapper around popen, which discard the stdout and
- will return the error code of a command
- """
- _, code = popen(
- command, flag,
- stdout=stdout, stderr=stderr,
- input=input, timeout=timeout,
- env=env, shell=shell,
- decode=decode,
- )
- return code
-
-
-def cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=PIPE, stderr=PIPE, decode='utf-8', raising=None, message='',
- expect=[0]):
- """
- A wrapper around popen, which returns the stdout and
- will raise the error code of a command
-
- raising: specify which call should be used when raising
- the class should only require a string as parameter
- (default is OSError) with the error code
- expect: a list of error codes to consider as normal
- """
- decoded, code = popen(
- command, flag,
- stdout=stdout, stderr=stderr,
- input=input, timeout=timeout,
- env=env, shell=shell,
- decode=decode,
- )
- if code not in expect:
- feedback = message + '\n' if message else ''
- feedback += f'failed to run command: {command}\n'
- feedback += f'returned: {decoded}\n'
- feedback += f'exit code: {code}'
- if raising is None:
- # error code can be recovered with .errno
- raise OSError(code, feedback)
- else:
- raise raising(feedback)
- return decoded
-
-
-def rc_cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=PIPE, stderr=STDOUT, decode='utf-8'):
- """
- A wrapper around popen, which returns the return code
- of a command and stdout
-
- % rc_cmd('uname')
- (0, 'Linux')
- % rc_cmd('ip link show dev eth99')
- (1, 'Device "eth99" does not exist.')
- """
- out, code = popen(
- command, flag,
- stdout=stdout, stderr=stderr,
- input=input, timeout=timeout,
- env=env, shell=shell,
- decode=decode,
- )
- return code, out
-
-
-def call(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=PIPE, stderr=PIPE, decode='utf-8'):
- """
- A wrapper around popen, which print the stdout and
- will return the error code of a command
- """
- out, code = popen(
- command, flag,
- stdout=stdout, stderr=stderr,
- input=input, timeout=timeout,
- env=env, shell=shell,
- decode=decode,
- )
- if out:
- print(out)
- return code
-
-
-def read_file(fname, defaultonfailure=None):
- """
- read the content of a file, stripping any end characters (space, newlines)
- should defaultonfailure be not None, it is returned on failure to read
- """
- try:
- """ Read a file to string """
- with open(fname, 'r') as f:
- data = f.read().strip()
- return data
- except Exception as e:
- if defaultonfailure is not None:
- return defaultonfailure
- raise e
-
-def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=None, append=False):
- """
- Write content of data to given fname, should defaultonfailure be not None,
- it is returned on failure to read.
-
- If directory of file is not present, it is auto-created.
- """
- dirname = os.path.dirname(fname)
- if not os.path.isdir(dirname):
- os.makedirs(dirname, mode=0o755, exist_ok=False)
- chown(dirname, user, group)
-
- try:
- """ Write a file to string """
- bytes = 0
- with open(fname, 'w' if not append else 'a') as f:
- bytes = f.write(data)
- chown(fname, user, group)
- chmod(fname, mode)
- return bytes
- except Exception as e:
- if defaultonfailure is not None:
- return defaultonfailure
- raise e
-
-def read_json(fname, defaultonfailure=None):
- """
- read and json decode the content of a file
- should defaultonfailure be not None, it is returned on failure to read
- """
- import json
- try:
- with open(fname, 'r') as f:
- data = json.load(f)
- return data
- except Exception as e:
- if defaultonfailure is not None:
- return defaultonfailure
- raise e
-
-
-def chown(path, user, group):
- """ change file/directory owner """
- from pwd import getpwnam
- from grp import getgrnam
-
- if user is None or group is None:
- return False
-
- # path may also be an open file descriptor
- if not isinstance(path, int) and not os.path.exists(path):
- return False
-
- uid = getpwnam(user).pw_uid
- gid = getgrnam(group).gr_gid
- os.chown(path, uid, gid)
- return True
-
-
-def chmod(path, bitmask):
- # path may also be an open file descriptor
- if not isinstance(path, int) and not os.path.exists(path):
- return
- if bitmask is None:
- return
- os.chmod(path, bitmask)
-
-
-def chmod_600(path):
- """ make file only read/writable by owner """
- from stat import S_IRUSR, S_IWUSR
-
- bitmask = S_IRUSR | S_IWUSR
- chmod(path, bitmask)
-
-
-def chmod_750(path):
- """ make file/directory only executable to user and group """
- from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP
-
- bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP
- chmod(path, bitmask)
-
-
-def chmod_755(path):
- """ make file executable by all """
- from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP, S_IROTH, S_IXOTH
-
- bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP | \
- S_IROTH | S_IXOTH
- chmod(path, bitmask)
-
-
-def makedir(path, user=None, group=None):
- if os.path.exists(path):
- return
- os.makedirs(path, mode=0o755)
- chown(path, user, group)
-
-def colon_separated_to_dict(data_string, uniquekeys=False):
- """ Converts a string containing newline-separated entries
- of colon-separated key-value pairs into a dict.
-
- Such files are common in Linux /proc filesystem
-
- Args:
- data_string (str): data string
- uniquekeys (bool): whether to insist that keys are unique or not
-
- Returns: dict
-
- Raises:
- ValueError: if uniquekeys=True and the data string has
- duplicate keys.
-
- Note:
- If uniquekeys=True, then dict entries are always strings,
- otherwise they are always lists of strings.
- """
- import re
- key_value_re = re.compile('([^:]+)\s*\:\s*(.*)')
-
- data_raw = re.split('\n', data_string)
-
- data = {}
-
- for l in data_raw:
- l = l.strip()
- if l:
- match = re.match(key_value_re, l)
- if match and (len(match.groups()) == 2):
- key = match.groups()[0].strip()
- value = match.groups()[1].strip()
- else:
- raise ValueError(f"""Line "{l}" could not be parsed a colon-separated pair """, l)
- if key in data.keys():
- if uniquekeys:
- raise ValueError("Data string has duplicate keys: {0}".format(key))
- else:
- data[key].append(value)
- else:
- if uniquekeys:
- data[key] = value
- else:
- data[key] = [value]
- else:
- pass
-
- return data
def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False, mod=0):
""" Mangles dict keys according to a regex and replacement character.
@@ -414,68 +69,6 @@ def _mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_m
def mangle_dict_keys(data, regex, replacement, abs_path=[], no_tag_node_value_mangle=False):
return _mangle_dict_keys(data, regex, replacement, abs_path=abs_path, no_tag_node_value_mangle=no_tag_node_value_mangle, mod=0)
-def _get_sub_dict(d, lpath):
- k = lpath[0]
- if k not in d.keys():
- return {}
- c = {k: d[k]}
- lpath = lpath[1:]
- if not lpath:
- return c
- elif not isinstance(c[k], dict):
- return {}
- return _get_sub_dict(c[k], lpath)
-
-def get_sub_dict(source, lpath, get_first_key=False):
- """ Returns the sub-dict of a nested dict, defined by path of keys.
-
- Args:
- source (dict): Source dict to extract from
- lpath (list[str]): sequence of keys
-
- Returns: source, if lpath is empty, else
- {key : source[..]..[key]} for key the last element of lpath, if exists
- {} otherwise
- """
- if not isinstance(source, dict):
- raise TypeError("source must be of type dict")
- if not isinstance(lpath, list):
- raise TypeError("path must be of type list")
- if not lpath:
- return source
-
- ret = _get_sub_dict(source, lpath)
-
- if get_first_key and lpath and ret:
- tmp = next(iter(ret.values()))
- if not isinstance(tmp, dict):
- raise TypeError("Data under node is not of type dict")
- ret = tmp
-
- return ret
-
-def process_running(pid_file):
- """ Checks if a process with PID in pid_file is running """
- from psutil import pid_exists
- if not os.path.isfile(pid_file):
- return False
- with open(pid_file, 'r') as f:
- pid = f.read().strip()
- return pid_exists(int(pid))
-
-def process_named_running(name, cmdline: str=None):
- """ Checks if process with given name is running and returns its PID.
- If Process is not running, return None
- """
- from psutil import process_iter
- for p in process_iter(['name', 'pid', 'cmdline']):
- if cmdline:
- if p.info['name'] == name and cmdline in p.info['cmdline']:
- return p.info['pid']
- elif p.info['name'] == name:
- return p.info['pid']
- return None
-
def is_list_equal(first: list, second: list) -> bool:
""" Check if 2 lists are equal and list not empty """
if len(first) != len(second) or len(first) == 0:
@@ -620,13 +213,6 @@ def get_cfg_group_id():
group_data = getgrnam(cfg_group)
return group_data.gr_gid
-
-def file_is_persistent(path):
- import re
- location = r'^(/config|/opt/vyatta/etc/config)'
- absolute = os.path.abspath(os.path.dirname(path))
- return re.match(location,absolute)
-
def wait_for_inotify(file_path, pre_hook=None, event_type=None, timeout=None, sleep_interval=0.1):
""" Waits for an inotify event to occur """
if not os.path.dirname(file_path):
@@ -668,91 +254,6 @@ def wait_for_file_write_complete(file_path, pre_hook=None, timeout=None, sleep_i
wait_for_inotify(file_path,
event_type='IN_CLOSE_WRITE', pre_hook=pre_hook, timeout=timeout, sleep_interval=sleep_interval)
-def commit_in_progress():
- """ Not to be used in normal op mode scripts! """
-
- # The CStore backend locks the config by opening a file
- # The file is not removed after commit, so just checking
- # if it exists is insufficient, we need to know if it's open by anyone
-
- # There are two ways to check if any other process keeps a file open.
- # The first one is to try opening it and see if the OS objects.
- # That's faster but prone to race conditions and can be intrusive.
- # The other one is to actually check if any process keeps it open.
- # It's non-intrusive but needs root permissions, else you can't check
- # processes of other users.
- #
- # Since this will be used in scripts that modify the config outside of the CLI
- # framework, those knowingly have root permissions.
- # For everything else, we add a safeguard.
- from psutil import process_iter
- from psutil import NoSuchProcess
- from getpass import getuser
- from vyos.defaults import commit_lock
-
- if getuser() != 'root':
- raise OSError('This functions needs to be run as root to return correct results!')
-
- for proc in process_iter():
- try:
- files = proc.open_files()
- if files:
- for f in files:
- if f.path == commit_lock:
- return True
- except NoSuchProcess as err:
- # Process died before we could examine it
- pass
- # Default case
- return False
-
-
-def wait_for_commit_lock():
- """ Not to be used in normal op mode scripts! """
- from time import sleep
- # Very synchronous approach to multiprocessing
- while commit_in_progress():
- sleep(1)
-
-def ask_input(question, default='', numeric_only=False, valid_responses=[]):
- question_out = question
- if default:
- question_out += f' (Default: {default})'
- response = ''
- while True:
- response = input(question_out + ' ').strip()
- if not response and default:
- return default
- if numeric_only:
- if not response.isnumeric():
- print("Invalid value, try again.")
- continue
- response = int(response)
- if valid_responses and response not in valid_responses:
- print("Invalid value, try again.")
- continue
- break
- return response
-
-def ask_yes_no(question, default=False) -> bool:
- """Ask a yes/no question via input() and return their answer."""
- from sys import stdout
- default_msg = "[Y/n]" if default else "[y/N]"
- while True:
- try:
- stdout.write("%s %s " % (question, default_msg))
- c = input().lower()
- if c == '':
- return default
- elif c in ("y", "ye", "yes"):
- return True
- elif c in ("n", "no"):
- return False
- else:
- stdout.write("Please respond with yes/y or no/n\n")
- except EOFError:
- stdout.write("\nPlease respond with yes/y or no/n\n")
-
def is_admin() -> bool:
"""Look if current user is in sudo group"""
from getpass import getuser
@@ -761,7 +262,6 @@ def is_admin() -> bool:
(_, _, _, admin_group_members) = getgrnam('sudo')
return current_user in admin_group_members
-
def mac2eui64(mac, prefix=None):
"""
Convert a MAC address to a EUI64 address or, with prefix provided, a full
@@ -795,6 +295,7 @@ def get_half_cpus():
def check_kmod(k_mod):
""" Common utility function to load required kernel modules on demand """
from vyos import ConfigError
+ from vyos.utils.process import call
if isinstance(k_mod, str):
k_mod = k_mod.split()
for module in k_mod:
@@ -814,60 +315,6 @@ def find_device_file(device):
return None
-def dict_search(path, dict_object):
- """ Traverse Python dictionary (dict_object) delimited by dot (.).
- Return value of key if found, None otherwise.
-
- This is faster implementation then jmespath.search('foo.bar', dict_object)"""
- if not isinstance(dict_object, dict) or not path:
- return None
-
- parts = path.split('.')
- inside = parts[:-1]
- if not inside:
- if path not in dict_object:
- return None
- return dict_object[path]
- c = dict_object
- for p in parts[:-1]:
- c = c.get(p, {})
- return c.get(parts[-1], None)
-
-def dict_search_args(dict_object, *path):
- # Traverse dictionary using variable arguments
- # Added due to above function not allowing for '.' in the key names
- # Example: dict_search_args(some_dict, 'key', 'subkey', 'subsubkey', ...)
- if not isinstance(dict_object, dict) or not path:
- return None
-
- for item in path:
- if item not in dict_object:
- return None
- dict_object = dict_object[item]
- return dict_object
-
-def dict_search_recursive(dict_object, key, path=[]):
- """ Traverse a dictionary recurisvely and return the value of the key
- we are looking for.
-
- Thankfully copied from https://stackoverflow.com/a/19871956
-
- Modified to yield optional path to found keys
- """
- if isinstance(dict_object, list):
- for i in dict_object:
- new_path = path + [i]
- for x in dict_search_recursive(i, key, new_path):
- yield x
- elif isinstance(dict_object, dict):
- if key in dict_object:
- new_path = path + [key]
- yield dict_object[key], new_path
- for k, j in dict_object.items():
- new_path = path + [k]
- for x in dict_search_recursive(j, key, new_path):
- yield x
-
def convert_data(data):
"""Convert multiple types of data to types usable in CLI
@@ -898,114 +345,6 @@ def convert_data(data):
dict_tmp[key] = convert_data(value)
return dict_tmp
-def get_bridge_fdb(interface):
- """ Returns the forwarding database entries for a given interface """
- if not os.path.exists(f'/sys/class/net/{interface}'):
- return None
- from json import loads
- tmp = loads(cmd(f'bridge -j fdb show dev {interface}'))
- return tmp
-
-def get_interface_config(interface):
- """ Returns the used encapsulation protocol for given interface.
- If interface does not exist, None is returned.
- """
- if not os.path.exists(f'/sys/class/net/{interface}'):
- return None
- from json import loads
- tmp = loads(cmd(f'ip --detail --json link show dev {interface}'))[0]
- return tmp
-
-def get_interface_address(interface):
- """ Returns the used encapsulation protocol for given interface.
- If interface does not exist, None is returned.
- """
- if not os.path.exists(f'/sys/class/net/{interface}'):
- return None
- from json import loads
- tmp = loads(cmd(f'ip --detail --json addr show dev {interface}'))[0]
- return tmp
-
-def get_interface_namespace(iface):
- """
- Returns wich netns the interface belongs to
- """
- from json import loads
- # Check if netns exist
- tmp = loads(cmd(f'ip --json netns ls'))
- if len(tmp) == 0:
- return None
-
- for ns in tmp:
- netns = f'{ns["name"]}'
- # Search interface in each netns
- data = loads(cmd(f'ip netns exec {netns} ip --json link show'))
- for tmp in data:
- if iface == tmp["ifname"]:
- return netns
-
-def get_all_vrfs():
- """ Return a dictionary of all system wide known VRF instances """
- from json import loads
- tmp = loads(cmd('ip --json vrf list'))
- # Result is of type [{"name":"red","table":1000},{"name":"blue","table":2000}]
- # so we will re-arrange it to a more nicer representation:
- # {'red': {'table': 1000}, 'blue': {'table': 2000}}
- data = {}
- for entry in tmp:
- name = entry.pop('name')
- data[name] = entry
- return data
-
-def print_error(str='', end='\n'):
- """
- Print `str` to stderr, terminated with `end`.
- Used for warnings and out-of-band messages to avoid mangling precious
- stdout output.
- """
- sys.stderr.write(str)
- sys.stderr.write(end)
- sys.stderr.flush()
-
-def make_progressbar():
- """
- Make a procedure that takes two arguments `done` and `total` and prints a
- progressbar based on the ratio thereof, whose length is determined by the
- width of the terminal.
- """
- import shutil, math
- col, _ = shutil.get_terminal_size()
- col = max(col - 15, 20)
- def print_progressbar(done, total):
- if done <= total:
- increment = total / col
- length = math.ceil(done / increment)
- percentage = str(math.ceil(100 * done / total)).rjust(3)
- print_error(f'[{length * "#"}{(col - length) * "_"}] {percentage}%', '\r')
- # Print a newline so that the subsequent prints don't overwrite the full bar.
- if done == total:
- print_error()
- return print_progressbar
-
-def make_incremental_progressbar(increment: float):
- """
- Make a generator that displays a progressbar that grows monotonically with
- every iteration.
- First call displays it at 0% and every subsequent iteration displays it
- at `increment` increments where 0.0 < `increment` < 1.0.
- Intended for FTP and HTTP transfers with stateless callbacks.
- """
- print_progressbar = make_progressbar()
- total = 0.0
- while total < 1.0:
- print_progressbar(total, 1.0)
- yield
- total += increment
- print_progressbar(1, 1)
- # Ignore further calls.
- while True:
- yield
-
def begin(*args):
"""
Evaluate arguments in order and return the result of the *last* argument.
@@ -1020,67 +359,16 @@ def begin0(*args):
"""
return args[0]
-def is_systemd_service_active(service):
- """ Test is a specified systemd service is activated.
- Returns True if service is active, false otherwise.
- Copied from: https://unix.stackexchange.com/a/435317 """
- tmp = cmd(f'systemctl show --value -p ActiveState {service}')
- return bool((tmp == 'active'))
-
-def is_systemd_service_running(service):
- """ Test is a specified systemd service is actually running.
- Returns True if service is running, false otherwise.
- Copied from: https://unix.stackexchange.com/a/435317 """
- tmp = cmd(f'systemctl show --value -p SubState {service}')
- return bool((tmp == 'running'))
-
-def check_port_availability(ipaddress, port, protocol):
- """
- Check if port is available and not used by any service
- Return False if a port is busy or IP address does not exists
- Should be used carefully for services that can start listening
- dynamically, because IP address may be dynamic too
- """
- from socketserver import TCPServer, UDPServer
- from ipaddress import ip_address
-
- # verify arguments
- try:
- ipaddress = ip_address(ipaddress).compressed
- except:
- raise ValueError(f'The {ipaddress} is not a valid IPv4 or IPv6 address')
- if port not in range(1, 65536):
- raise ValueError(f'The port number {port} is not in the 1-65535 range')
- if protocol not in ['tcp', 'udp']:
- raise ValueError(
- f'The protocol {protocol} is not supported. Only tcp and udp are allowed'
- )
-
- # check port availability
- try:
- if protocol == 'tcp':
- server = TCPServer((ipaddress, port), None, bind_and_activate=True)
- if protocol == 'udp':
- server = UDPServer((ipaddress, port), None, bind_and_activate=True)
- server.server_close()
- except Exception as e:
- # errno.h:
- #define EADDRINUSE 98 /* Address already in use */
- if e.errno == 98:
- return False
-
- return True
-
def install_into_config(conf, config_paths, override_prompt=True):
# Allows op-mode scripts to install values if called from an active config session
# config_paths: dict of config paths
# override_prompt: if True, user will be prompted before existing nodes are overwritten
-
if not config_paths:
return None
from vyos.config import Config
-
+ from vyos.utils.io import ask_yes_no
+ from vyos.utils.process import cmd
if not Config().in_session():
print('You are not in configure mode, commands to install manually from configure mode:')
for path in config_paths:
@@ -1109,27 +397,6 @@ def install_into_config(conf, config_paths, override_prompt=True):
if count > 0:
print(f'{count} value(s) installed. Use "compare" to see the pending changes, and "commit" to apply.')
-def is_wwan_connected(interface):
- """ Determine if a given WWAN interface, e.g. wwan0 is connected to the
- carrier network or not """
- import json
-
- if not interface.startswith('wwan'):
- raise ValueError(f'Specified interface "{interface}" is not a WWAN interface')
-
- # ModemManager is required for connection(s) - if service is not running,
- # there won't be any connection at all!
- if not is_systemd_service_active('ModemManager.service'):
- return False
-
- modem = interface.lstrip('wwan')
-
- tmp = cmd(f'mmcli --modem {modem} --output-json')
- tmp = json.loads(tmp)
-
- # return True/False if interface is in connected state
- return dict_search('modem.generic.state', tmp) == 'connected'
-
def load_as_module(name: str, path: str):
import importlib.util
diff --git a/python/vyos/utils/__init__.py b/python/vyos/utils/__init__.py
index 5c7a9ecb8..abc9af5da 100644
--- a/python/vyos/utils/__init__.py
+++ b/python/vyos/utils/__init__.py
@@ -13,5 +13,13 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-from vyos.utils import network
from vyos.utils import boot
+from vyos.utils import commit
+from vyos.utils import convert
+from vyos.utils import dict
+from vyos.utils import file
+from vyos.utils import io
+from vyos.utils import network
+from vyos.utils import permission
+from vyos.utils import process
+from vyos.utils import system
diff --git a/python/vyos/utils/commit.py b/python/vyos/utils/commit.py
new file mode 100644
index 000000000..105aed8c2
--- /dev/null
+++ b/python/vyos/utils/commit.py
@@ -0,0 +1,60 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+def commit_in_progress():
+ """ Not to be used in normal op mode scripts! """
+
+ # The CStore backend locks the config by opening a file
+ # The file is not removed after commit, so just checking
+ # if it exists is insufficient, we need to know if it's open by anyone
+
+ # There are two ways to check if any other process keeps a file open.
+ # The first one is to try opening it and see if the OS objects.
+ # That's faster but prone to race conditions and can be intrusive.
+ # The other one is to actually check if any process keeps it open.
+ # It's non-intrusive but needs root permissions, else you can't check
+ # processes of other users.
+ #
+ # Since this will be used in scripts that modify the config outside of the CLI
+ # framework, those knowingly have root permissions.
+ # For everything else, we add a safeguard.
+ from psutil import process_iter
+ from psutil import NoSuchProcess
+ from getpass import getuser
+ from vyos.defaults import commit_lock
+
+ if getuser() != 'root':
+ raise OSError('This functions needs to be run as root to return correct results!')
+
+ for proc in process_iter():
+ try:
+ files = proc.open_files()
+ if files:
+ for f in files:
+ if f.path == commit_lock:
+ return True
+ except NoSuchProcess as err:
+ # Process died before we could examine it
+ pass
+ # Default case
+ return False
+
+
+def wait_for_commit_lock():
+ """ Not to be used in normal op mode scripts! """
+ from time import sleep
+ # Very synchronous approach to multiprocessing
+ while commit_in_progress():
+ sleep(1)
diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py
index 2560a35be..667a2464b 100644
--- a/python/vyos/utils/file.py
+++ b/python/vyos/utils/file.py
@@ -14,7 +14,19 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
import os
+from vyos.utils.permission import chown
+def makedir(path, user=None, group=None):
+ if os.path.exists(path):
+ return
+ os.makedirs(path, mode=0o755)
+ chown(path, user, group)
+
+def file_is_persistent(path):
+ import re
+ location = r'^(/config|/opt/vyatta/etc/config)'
+ absolute = os.path.abspath(os.path.dirname(path))
+ return re.match(location,absolute)
def read_file(fname, defaultonfailure=None):
"""
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
index 209bc9ecc..8db598f05 100644
--- a/python/vyos/utils/network.py
+++ b/python/vyos/utils/network.py
@@ -13,6 +13,8 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+import os
+
def get_protocol_by_name(protocol_name):
"""Get protocol number by protocol name
@@ -27,7 +29,7 @@ def get_protocol_by_name(protocol_name):
return protocol_name
def interface_exists_in_netns(interface_name, netns):
- from vyos.util import rc_cmd
+ from vyos.utils.process import rc_cmd
rc, out = rc_cmd(f'ip netns exec {netns} ip link show dev {interface_name}')
if rc == 0:
return True
@@ -35,9 +37,155 @@ def interface_exists_in_netns(interface_name, netns):
def get_interface_vrf(interface):
""" Returns VRF of given interface """
- from vyos.util import dict_search
- from vyos.util import get_interface_config
+ from vyos.utils.dict import dict_search
+ from vyos.utils.network import get_interface_config
tmp = get_interface_config(interface)
if dict_search('linkinfo.info_slave_kind', tmp) == 'vrf':
return tmp['master']
return 'default'
+
+def get_interface_config(interface):
+ """ Returns the used encapsulation protocol for given interface.
+ If interface does not exist, None is returned.
+ """
+ if not os.path.exists(f'/sys/class/net/{interface}'):
+ return None
+ from json import loads
+ from vyos.utils.process import cmd
+ tmp = loads(cmd(f'ip --detail --json link show dev {interface}'))[0]
+ return tmp
+
+def get_interface_address(interface):
+ """ Returns the used encapsulation protocol for given interface.
+ If interface does not exist, None is returned.
+ """
+ if not os.path.exists(f'/sys/class/net/{interface}'):
+ return None
+ from json import loads
+ from vyos.utils.process import cmd
+ tmp = loads(cmd(f'ip --detail --json addr show dev {interface}'))[0]
+ return tmp
+
+def get_interface_namespace(iface):
+ """
+ Returns wich netns the interface belongs to
+ """
+ from json import loads
+ from vyos.utils.process import cmd
+ # Check if netns exist
+ tmp = loads(cmd(f'ip --json netns ls'))
+ if len(tmp) == 0:
+ return None
+
+ for ns in tmp:
+ netns = f'{ns["name"]}'
+ # Search interface in each netns
+ data = loads(cmd(f'ip netns exec {netns} ip --json link show'))
+ for tmp in data:
+ if iface == tmp["ifname"]:
+ return netns
+
+
+def is_wwan_connected(interface):
+ """ Determine if a given WWAN interface, e.g. wwan0 is connected to the
+ carrier network or not """
+ import json
+ from vyos.utils.process import cmd
+
+ if not interface.startswith('wwan'):
+ raise ValueError(f'Specified interface "{interface}" is not a WWAN interface')
+
+ # ModemManager is required for connection(s) - if service is not running,
+ # there won't be any connection at all!
+ if not is_systemd_service_active('ModemManager.service'):
+ return False
+
+ modem = interface.lstrip('wwan')
+
+ tmp = cmd(f'mmcli --modem {modem} --output-json')
+ tmp = json.loads(tmp)
+
+ # return True/False if interface is in connected state
+ return dict_search('modem.generic.state', tmp) == 'connected'
+
+def get_bridge_fdb(interface):
+ """ Returns the forwarding database entries for a given interface """
+ if not os.path.exists(f'/sys/class/net/{interface}'):
+ return None
+ from json import loads
+ from vyos.utils.process import cmd
+ tmp = loads(cmd(f'bridge -j fdb show dev {interface}'))
+ return tmp
+
+def get_all_vrfs():
+ """ Return a dictionary of all system wide known VRF instances """
+ from json import loads
+ from vyos.utils.process import cmd
+ tmp = loads(cmd('ip --json vrf list'))
+ # Result is of type [{"name":"red","table":1000},{"name":"blue","table":2000}]
+ # so we will re-arrange it to a more nicer representation:
+ # {'red': {'table': 1000}, 'blue': {'table': 2000}}
+ data = {}
+ for entry in tmp:
+ name = entry.pop('name')
+ data[name] = entry
+ return data
+
+def mac2eui64(mac, prefix=None):
+ """
+ Convert a MAC address to a EUI64 address or, with prefix provided, a full
+ IPv6 address.
+ Thankfully copied from https://gist.github.com/wido/f5e32576bb57b5cc6f934e177a37a0d3
+ """
+ import re
+ from ipaddress import ip_network
+ # http://tools.ietf.org/html/rfc4291#section-2.5.1
+ eui64 = re.sub(r'[.:-]', '', mac).lower()
+ eui64 = eui64[0:6] + 'fffe' + eui64[6:]
+ eui64 = hex(int(eui64[0:2], 16) ^ 2)[2:].zfill(2) + eui64[2:]
+
+ if prefix is None:
+ return ':'.join(re.findall(r'.{4}', eui64))
+ else:
+ try:
+ net = ip_network(prefix, strict=False)
+ euil = int('0x{0}'.format(eui64), 16)
+ return str(net[euil])
+ except: # pylint: disable=bare-except
+ return
+
+
+def check_port_availability(ipaddress, port, protocol):
+ """
+ Check if port is available and not used by any service
+ Return False if a port is busy or IP address does not exists
+ Should be used carefully for services that can start listening
+ dynamically, because IP address may be dynamic too
+ """
+ from socketserver import TCPServer, UDPServer
+ from ipaddress import ip_address
+
+ # verify arguments
+ try:
+ ipaddress = ip_address(ipaddress).compressed
+ except:
+ raise ValueError(f'The {ipaddress} is not a valid IPv4 or IPv6 address')
+ if port not in range(1, 65536):
+ raise ValueError(f'The port number {port} is not in the 1-65535 range')
+ if protocol not in ['tcp', 'udp']:
+ raise ValueError(f'The protocol {protocol} is not supported. Only tcp and udp are allowed')
+
+ # check port availability
+ try:
+ if protocol == 'tcp':
+ server = TCPServer((ipaddress, port), None, bind_and_activate=True)
+ if protocol == 'udp':
+ server = UDPServer((ipaddress, port), None, bind_and_activate=True)
+ server.server_close()
+ except Exception as e:
+ # errno.h:
+ #define EADDRINUSE 98 /* Address already in use */
+ if e.errno == 98:
+ return False
+
+ return True
diff --git a/python/vyos/utils/permission.py b/python/vyos/utils/permission.py
new file mode 100644
index 000000000..8c2d72b83
--- /dev/null
+++ b/python/vyos/utils/permission.py
@@ -0,0 +1,63 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+
+def chown(path, user, group):
+ """ change file/directory owner """
+ from pwd import getpwnam
+ from grp import getgrnam
+
+ if user is None or group is None:
+ return False
+
+ # path may also be an open file descriptor
+ if not isinstance(path, int) and not os.path.exists(path):
+ return False
+
+ uid = getpwnam(user).pw_uid
+ gid = getgrnam(group).gr_gid
+ os.chown(path, uid, gid)
+ return True
+
+def chmod(path, bitmask):
+ # path may also be an open file descriptor
+ if not isinstance(path, int) and not os.path.exists(path):
+ return
+ if bitmask is None:
+ return
+ os.chmod(path, bitmask)
+
+def chmod_600(path):
+ """ make file only read/writable by owner """
+ from stat import S_IRUSR, S_IWUSR
+
+ bitmask = S_IRUSR | S_IWUSR
+ chmod(path, bitmask)
+
+def chmod_750(path):
+ """ make file/directory only executable to user and group """
+ from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP
+
+ bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP
+ chmod(path, bitmask)
+
+def chmod_755(path):
+ """ make file executable by all """
+ from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP, S_IROTH, S_IXOTH
+
+ bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP | \
+ S_IROTH | S_IXOTH
+ chmod(path, bitmask)
diff --git a/python/vyos/utils/process.py b/python/vyos/utils/process.py
new file mode 100644
index 000000000..15b26f4eb
--- /dev/null
+++ b/python/vyos/utils/process.py
@@ -0,0 +1,230 @@
+# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from subprocess import Popen
+from subprocess import PIPE
+from subprocess import STDOUT
+from subprocess import DEVNULL
+
+def popen(command, flag='', shell=None, input=None, timeout=None, env=None,
+ stdout=PIPE, stderr=PIPE, decode='utf-8'):
+ """
+ popen is a wrapper helper aound subprocess.Popen
+ with it default setting it will return a tuple (out, err)
+ out: the output of the program run
+ err: the error code returned by the program
+
+ it can be affected by the following flags:
+ shell: do not try to auto-detect if a shell is required
+ for example if a pipe (|) or redirection (>, >>) is used
+ input: data to sent to the child process via STDIN
+ the data should be bytes but string will be converted
+ timeout: time after which the command will be considered to have failed
+ env: mapping that defines the environment variables for the new process
+ stdout: define how the output of the program should be handled
+ - PIPE (default), sends stdout to the output
+ - DEVNULL, discard the output
+ stderr: define how the output of the program should be handled
+ - None (default), send/merge the data to/with stderr
+ - PIPE, popen will append it to output
+ - STDOUT, send the data to be merged with stdout
+ - DEVNULL, discard the output
+ decode: specify the expected text encoding (utf-8, ascii, ...)
+ the default is explicitely utf-8 which is python's own default
+
+ usage:
+ get both stdout and stderr: popen('command', stdout=PIPE, stderr=STDOUT)
+ discard stdout and get stderr: popen('command', stdout=DEVNUL, stderr=PIPE)
+ """
+
+ # airbag must be left as an import in the function as otherwise we have a
+ # a circual import dependency
+ from vyos import debug
+ from vyos import airbag
+
+ # log if the flag is set, otherwise log if command is set
+ if not debug.enabled(flag):
+ flag = 'command'
+
+ cmd_msg = f"cmd '{command}'"
+ debug.message(cmd_msg, flag)
+
+ use_shell = shell
+ stdin = None
+ if shell is None:
+ use_shell = False
+ if ' ' in command:
+ use_shell = True
+ if env:
+ use_shell = True
+
+ if input:
+ stdin = PIPE
+ input = input.encode() if type(input) is str else input
+
+ p = Popen(command, stdin=stdin, stdout=stdout, stderr=stderr,
+ env=env, shell=use_shell)
+
+ pipe = p.communicate(input, timeout)
+
+ pipe_out = b''
+ if stdout == PIPE:
+ pipe_out = pipe[0]
+
+ pipe_err = b''
+ if stderr == PIPE:
+ pipe_err = pipe[1]
+
+ str_out = pipe_out.decode(decode).replace('\r\n', '\n').strip()
+ str_err = pipe_err.decode(decode).replace('\r\n', '\n').strip()
+
+ out_msg = f"returned (out):\n{str_out}"
+ if str_out:
+ debug.message(out_msg, flag)
+
+ if str_err:
+ from sys import stderr
+ err_msg = f"returned (err):\n{str_err}"
+ # this message will also be send to syslog via airbag
+ debug.message(err_msg, flag, destination=stderr)
+
+ # should something go wrong, report this too via airbag
+ airbag.noteworthy(cmd_msg)
+ airbag.noteworthy(out_msg)
+ airbag.noteworthy(err_msg)
+
+ return str_out, p.returncode
+
+
+def run(command, flag='', shell=None, input=None, timeout=None, env=None,
+ stdout=DEVNULL, stderr=PIPE, decode='utf-8'):
+ """
+ A wrapper around popen, which discard the stdout and
+ will return the error code of a command
+ """
+ _, code = popen(
+ command, flag,
+ stdout=stdout, stderr=stderr,
+ input=input, timeout=timeout,
+ env=env, shell=shell,
+ decode=decode,
+ )
+ return code
+
+
+def cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
+ stdout=PIPE, stderr=PIPE, decode='utf-8', raising=None, message='',
+ expect=[0]):
+ """
+ A wrapper around popen, which returns the stdout and
+ will raise the error code of a command
+
+ raising: specify which call should be used when raising
+ the class should only require a string as parameter
+ (default is OSError) with the error code
+ expect: a list of error codes to consider as normal
+ """
+ decoded, code = popen(
+ command, flag,
+ stdout=stdout, stderr=stderr,
+ input=input, timeout=timeout,
+ env=env, shell=shell,
+ decode=decode,
+ )
+ if code not in expect:
+ feedback = message + '\n' if message else ''
+ feedback += f'failed to run command: {command}\n'
+ feedback += f'returned: {decoded}\n'
+ feedback += f'exit code: {code}'
+ if raising is None:
+ # error code can be recovered with .errno
+ raise OSError(code, feedback)
+ else:
+ raise raising(feedback)
+ return decoded
+
+
+def rc_cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
+ stdout=PIPE, stderr=STDOUT, decode='utf-8'):
+ """
+ A wrapper around popen, which returns the return code
+ of a command and stdout
+
+ % rc_cmd('uname')
+ (0, 'Linux')
+ % rc_cmd('ip link show dev eth99')
+ (1, 'Device "eth99" does not exist.')
+ """
+ out, code = popen(
+ command, flag,
+ stdout=stdout, stderr=stderr,
+ input=input, timeout=timeout,
+ env=env, shell=shell,
+ decode=decode,
+ )
+ return code, out
+
+def call(command, flag='', shell=None, input=None, timeout=None, env=None,
+ stdout=PIPE, stderr=PIPE, decode='utf-8'):
+ """
+ A wrapper around popen, which print the stdout and
+ will return the error code of a command
+ """
+ out, code = popen(
+ command, flag,
+ stdout=stdout, stderr=stderr,
+ input=input, timeout=timeout,
+ env=env, shell=shell,
+ decode=decode,
+ )
+ if out:
+ print(out)
+ return code
+
+def process_running(pid_file):
+ """ Checks if a process with PID in pid_file is running """
+ from psutil import pid_exists
+ if not os.path.isfile(pid_file):
+ return False
+ with open(pid_file, 'r') as f:
+ pid = f.read().strip()
+ return pid_exists(int(pid))
+
+def process_named_running(name, cmdline: str=None):
+ """ Checks if process with given name is running and returns its PID.
+ If Process is not running, return None
+ """
+ from psutil import process_iter
+ for p in process_iter(['name', 'pid', 'cmdline']):
+ if cmdline:
+ if p.info['name'] == name and cmdline in p.info['cmdline']:
+ return p.info['pid']
+ elif p.info['name'] == name:
+ return p.info['pid']
+ return None
+
+def is_systemd_service_active(service):
+ """ Test is a specified systemd service is activated.
+ Returns True if service is active, false otherwise.
+ Copied from: https://unix.stackexchange.com/a/435317 """
+ tmp = cmd(f'systemctl show --value -p ActiveState {service}')
+ return bool((tmp == 'active'))
+
+def is_systemd_service_running(service):
+ """ Test is a specified systemd service is actually running.
+ Returns True if service is running, false otherwise.
+ Copied from: https://unix.stackexchange.com/a/435317 """
+ tmp = cmd(f'systemctl show --value -p SubState {service}')
+ return bool((tmp == 'running'))
diff --git a/python/vyos/validate.py b/python/vyos/validate.py
index e5d8c6043..7afbe81c9 100644
--- a/python/vyos/validate.py
+++ b/python/vyos/validate.py
@@ -100,8 +100,8 @@ def is_intf_addr_assigned(intf, address) -> bool:
def is_addr_assigned(ip_address, vrf=None) -> bool:
""" Verify if the given IPv4/IPv6 address is assigned to any interface """
from netifaces import interfaces
- from vyos.util import get_interface_config
- from vyos.util import dict_search
+ from vyos.utils.network import get_interface_config
+ from vyos.utils.dict import dict_search
for interface in interfaces():
# Check if interface belongs to the requested VRF, if this is not the
# case there is no need to proceed with this data set - continue loop
@@ -218,7 +218,7 @@ def assert_mtu(mtu, ifname):
assert_number(mtu)
import json
- from vyos.util import cmd
+ from vyos.utils.process import cmd
out = cmd(f'ip -j -d link show dev {ifname}')
# [{"ifindex":2,"ifname":"eth0","flags":["BROADCAST","MULTICAST","UP","LOWER_UP"],"mtu":1500,"qdisc":"pfifo_fast","operstate":"UP","linkmode":"DEFAULT","group":"default","txqlen":1000,"link_type":"ether","address":"08:00:27:d9:5b:04","broadcast":"ff:ff:ff:ff:ff:ff","promiscuity":0,"min_mtu":46,"max_mtu":16110,"inet6_addr_gen_mode":"none","num_tx_queues":1,"num_rx_queues":1,"gso_max_size":65536,"gso_max_segs":65535}]
parsed = json.loads(out)[0]
diff --git a/python/vyos/version.py b/python/vyos/version.py
index fb706ad44..1c5651c83 100644
--- a/python/vyos/version.py
+++ b/python/vyos/version.py
@@ -1,4 +1,4 @@
-# Copyright 2017-2020 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2017-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -34,11 +34,11 @@ import json
import requests
import vyos.defaults
-from vyos.util import read_file
-from vyos.util import read_json
-from vyos.util import popen
-from vyos.util import run
-from vyos.util import DEVNULL
+from vyos.utils.file import read_file
+from vyos.utils.file import read_json
+from vyos.utils.process import popen
+from vyos.utils.process import run
+from vyos.utils.process import DEVNULL
version_file = os.path.join(vyos.defaults.directories['data'], 'version.json')