summaryrefslogtreecommitdiff
path: root/python/vyos/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/vyos/util.py')
-rw-r--r--python/vyos/util.py70
1 files changed, 68 insertions, 2 deletions
diff --git a/python/vyos/util.py b/python/vyos/util.py
index 9c4c29322..571d43754 100644
--- a/python/vyos/util.py
+++ b/python/vyos/util.py
@@ -794,6 +794,24 @@ def get_interface_address(interface):
tmp = loads(cmd(f'ip -d -j addr show {interface}'))[0]
return tmp
+def get_interface_namespace(iface):
+ """
+ Returns wich netns the interface belongs to
+ """
+ from json import loads
+ # Check if netns exist
+ tmp = loads(cmd(f'ip --json netns ls'))
+ if len(tmp) == 0:
+ return None
+
+ for ns in tmp:
+ namespace = f'{ns["name"]}'
+ # Search interface in each netns
+ data = loads(cmd(f'ip netns exec {namespace} ip -j link show'))
+ for compare in data:
+ if iface == compare["ifname"]:
+ return namespace
+
def get_all_vrfs():
""" Return a dictionary of all system wide known VRF instances """
from json import loads
@@ -856,6 +874,20 @@ def make_incremental_progressbar(increment: float):
while True:
yield
+def begin(*args):
+ """
+ Evaluate arguments in order and return the result of the *last* argument.
+ For combining multiple expressions in one statement. Useful for lambdas.
+ """
+ return args[-1]
+
+def begin0(*args):
+ """
+ Evaluate arguments in order and return the result of the *first* argument.
+ For combining multiple expressions in one statement. Useful for lambdas.
+ """
+ return args[0]
+
def is_systemd_service_active(service):
""" Test is a specified systemd service is activated.
Returns True if service is active, false otherwise.
@@ -920,14 +952,48 @@ def install_into_config(conf, config_paths, override_prompt=True):
return None
count = 0
+ failed = []
for path in config_paths:
if override_prompt and conf.exists(path) and not conf.is_multi(path):
if not ask_yes_no(f'Config node "{node}" already exists. Do you want to overwrite it?'):
continue
- cmd(f'/opt/vyatta/sbin/my_set {path}')
- count += 1
+ try:
+ cmd(f'/opt/vyatta/sbin/my_set {path}')
+ count += 1
+ except:
+ failed.append(path)
+
+ if failed:
+ print(f'Failed to install {len(failed)} value(s). Commands to manually install:')
+ for path in failed:
+ print(f'set {path}')
if count > 0:
print(f'{count} value(s) installed. Use "compare" to see the pending changes, and "commit" to apply.')
+
+def is_wwan_connected(interface):
+ """ Determine if a given WWAN interface, e.g. wwan0 is connected to the
+ carrier network or not """
+ import json
+
+ if not interface.startswith('wwan'):
+ raise ValueError(f'Specified interface "{interface}" is not a WWAN interface')
+
+ modem = interface.lstrip('wwan')
+
+ tmp = cmd(f'mmcli --modem {modem} --output-json')
+ tmp = json.loads(tmp)
+
+ # return True/False if interface is in connected state
+ return dict_search('modem.generic.state', tmp) == 'connected'
+
+def boot_configuration_complete() -> bool:
+ """ Check if the boot config loader has completed
+ """
+ from vyos.defaults import config_status
+
+ if os.path.isfile(config_status):
+ return True
+ return False