summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorDaniil Baturin <daniil@vyos.io>2024-11-07 17:46:27 +0000
committerGitHub <noreply@github.com>2024-11-07 17:46:27 +0000
commit3eb66e36b81c835f802900c4bc2e973e2657fa31 (patch)
tree39e8061bcd16155b80b6ab1747b27aa68b43f9ad /python
parentc97980d18610d8d48a726986139fed29ad03137a (diff)
parent18a9cec3deb6cc2dc49020a89208dc70defe9822 (diff)
downloadvyos-1x-current.tar.gz
vyos-1x-current.zip
Merge pull request #4151 from natali-rs1985/T6695HEADcurrent
T6695: Machine-readable operational mode support for traceroute
Diffstat (limited to 'python')
-rw-r--r--python/vyos/configsession.py14
-rw-r--r--python/vyos/opmode.py165
2 files changed, 117 insertions, 62 deletions
diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py
index 5876dc5b0..90b96b88c 100644
--- a/python/vyos/configsession.py
+++ b/python/vyos/configsession.py
@@ -66,6 +66,16 @@ REBOOT = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'reboot']
POWEROFF = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'poweroff']
OP_CMD_ADD = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'add']
OP_CMD_DELETE = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'delete']
+TRACEROUTE = [
+ '/usr/libexec/vyos/op_mode/mtr_execute.py',
+ 'mtr',
+ '--for-api',
+ '--report-mode',
+ '--report-cycles',
+ '1',
+ '--json',
+ '--host',
+]
# Default "commit via" string
APP = 'vyos-http-api'
@@ -335,3 +345,7 @@ class ConfigSession(object):
def show_container_image(self):
out = self.__run_command(SHOW + ['container', 'image'])
return out
+
+ def traceroute(self, host):
+ out = self.__run_command(TRACEROUTE + [host])
+ return out
diff --git a/python/vyos/opmode.py b/python/vyos/opmode.py
index 136ac35e2..7b11d36dd 100644
--- a/python/vyos/opmode.py
+++ b/python/vyos/opmode.py
@@ -20,89 +20,110 @@ from humps import decamelize
class Error(Exception):
- """ Any error that makes requested operation impossible to complete
- for reasons unrelated to the user input or script logic.
+ """Any error that makes requested operation impossible to complete
+ for reasons unrelated to the user input or script logic.
- This is the base class, scripts should not use it directly
- and should raise more specific errors instead,
- whenever possible.
+ This is the base class, scripts should not use it directly
+ and should raise more specific errors instead,
+ whenever possible.
"""
+
pass
+
class UnconfiguredSubsystem(Error):
- """ Requested operation is valid, but cannot be completed
- because corresponding subsystem is not configured
- and thus is not running.
+ """Requested operation is valid, but cannot be completed
+ because corresponding subsystem is not configured
+ and thus is not running.
"""
+
pass
+
class UnconfiguredObject(UnconfiguredSubsystem):
- """ Requested operation is valid but cannot be completed
- because its parameter refers to an object that does not exist
- in the system configuration.
+ """Requested operation is valid but cannot be completed
+ because its parameter refers to an object that does not exist
+ in the system configuration.
"""
+
pass
+
class DataUnavailable(Error):
- """ Requested operation is valid, but cannot be completed
- because data for it is not available.
- This error MAY be treated as temporary because such issues
- are often caused by transient events such as service restarts.
+ """Requested operation is valid, but cannot be completed
+ because data for it is not available.
+ This error MAY be treated as temporary because such issues
+ are often caused by transient events such as service restarts.
"""
+
pass
+
class PermissionDenied(Error):
- """ Requested operation is valid, but the caller has no permission
- to perform it.
+ """Requested operation is valid, but the caller has no permission
+ to perform it.
"""
+
pass
+
class InsufficientResources(Error):
- """ Requested operation and its arguments are valid but the system
- does not have enough resources (such as drive space or memory)
- to complete it.
+ """Requested operation and its arguments are valid but the system
+ does not have enough resources (such as drive space or memory)
+ to complete it.
"""
+
pass
+
class UnsupportedOperation(Error):
- """ Requested operation is technically valid but is not implemented yet. """
+ """Requested operation is technically valid but is not implemented yet."""
+
pass
+
class IncorrectValue(Error):
- """ Requested operation is valid, but an argument provided has an
- incorrect value, preventing successful completion.
+ """Requested operation is valid, but an argument provided has an
+ incorrect value, preventing successful completion.
"""
+
pass
+
class CommitInProgress(Error):
- """ Requested operation is valid, but not possible at the time due
+ """Requested operation is valid, but not possible at the time due
to a commit being in progress.
"""
+
pass
+
class InternalError(Error):
- """ Any situation when VyOS detects that it could not perform
- an operation correctly due to logic errors in its own code
- or errors in underlying software.
+ """Any situation when VyOS detects that it could not perform
+ an operation correctly due to logic errors in its own code
+ or errors in underlying software.
"""
+
pass
def _is_op_mode_function_name(name):
if re.match(
- r'^(show|clear|reset|restart|add|update|delete|generate|set|renew|release|execute|import)',
+ r'^(show|clear|reset|restart|add|update|delete|generate|set|renew|release|execute|import|mtr)',
name,
):
return True
else:
return False
+
def _capture_output(name):
- if re.match(r"^(show|generate)", name):
+ if re.match(r'^(show|generate)', name):
return True
else:
return False
+
def _get_op_mode_functions(module):
from inspect import getmembers, isfunction
@@ -113,32 +134,35 @@ def _get_op_mode_functions(module):
funcs = list(filter(lambda ft: _is_op_mode_function_name(ft[0]), funcs))
funcs_dict = {}
- for (name, thunk) in funcs:
+ for name, thunk in funcs:
funcs_dict[name] = thunk
return funcs_dict
+
def _is_optional_type(t):
# Optional[t] is internally an alias for Union[t, NoneType]
# and there's no easy way to get union members it seems
- if (type(t) == typing._UnionGenericAlias):
- if (len(t.__args__) == 2):
- if t.__args__[1] == type(None):
+ if type(t) is typing._UnionGenericAlias:
+ if len(t.__args__) == 2:
+ if t.__args__[1] is type(None):
return True
return False
+
def _get_arg_type(t):
- """ Returns the type itself if it's a primitive type,
- or the "real" type of typing.Optional
+ """Returns the type itself if it's a primitive type,
+ or the "real" type of typing.Optional
- Doesn't work with anything else at the moment!
+ Doesn't work with anything else at the moment!
"""
if _is_optional_type(t):
return t.__args__[0]
else:
return t
+
def _is_literal_type(t):
if _is_optional_type(t):
t = _get_arg_type(t)
@@ -148,9 +172,9 @@ def _is_literal_type(t):
return False
+
def _get_literal_values(t):
- """ Returns the tuple of allowed values for a Literal type
- """
+ """Returns the tuple of allowed values for a Literal type"""
if not _is_literal_type(t):
return tuple()
if _is_optional_type(t):
@@ -158,6 +182,7 @@ def _get_literal_values(t):
return typing.get_args(t)
+
def _normalize_field_name(name):
# Convert the name to string if it is not
# (in some cases they may be numbers)
@@ -182,6 +207,7 @@ def _normalize_field_name(name):
return name
+
def _normalize_dict_field_names(old_dict):
new_dict = {}
@@ -191,10 +217,11 @@ def _normalize_dict_field_names(old_dict):
# Sanity check
if len(old_dict) != len(new_dict):
- raise InternalError("Dictionary fields do not allow unique normalization")
+ raise InternalError('Dictionary fields do not allow unique normalization')
else:
return new_dict
+
def _normalize_field_names(value):
if isinstance(value, dict):
return _normalize_dict_field_names(value)
@@ -203,16 +230,19 @@ def _normalize_field_names(value):
else:
return value
+
def run(module):
from argparse import ArgumentParser
functions = _get_op_mode_functions(module)
parser = ArgumentParser()
- subparsers = parser.add_subparsers(dest="subcommand")
+ subparsers = parser.add_subparsers(dest='subcommand')
for function_name in functions:
- subparser = subparsers.add_parser(function_name, help=functions[function_name].__doc__)
+ subparser = subparsers.add_parser(
+ function_name, help=functions[function_name].__doc__
+ )
type_hints = typing.get_type_hints(functions[function_name])
if 'return' in type_hints:
@@ -225,62 +255,73 @@ def run(module):
# Without this, we'd get options like "--foo_bar"
opt = re.sub(r'_', '-', opt)
- if _get_arg_type(th) == bool:
- subparser.add_argument(f"--{opt}", action='store_true')
+ if _get_arg_type(th) is bool:
+ subparser.add_argument(f'--{opt}', action='store_true')
else:
if _is_optional_type(th):
if _is_literal_type(th):
- subparser.add_argument(f"--{opt}",
- choices=list(_get_literal_values(th)),
- default=None)
+ subparser.add_argument(
+ f'--{opt}',
+ choices=list(_get_literal_values(th)),
+ default=None,
+ )
else:
- subparser.add_argument(f"--{opt}",
- type=_get_arg_type(th), default=None)
+ subparser.add_argument(
+ f'--{opt}',
+ type=_get_arg_type(th),
+ default=None,
+ )
else:
if _is_literal_type(th):
- subparser.add_argument(f"--{opt}",
- choices=list(_get_literal_values(th)),
- required=True)
+ subparser.add_argument(
+ f'--{opt}',
+ choices=list(_get_literal_values(th)),
+ required=True,
+ )
else:
- subparser.add_argument(f"--{opt}",
- type=_get_arg_type(th), required=True)
+ subparser.add_argument(
+ f'--{opt}', type=_get_arg_type(th), required=True
+ )
# Get options as a dict rather than a namespace,
# so that we can modify it and pack for passing to functions
args = vars(parser.parse_args())
- if not args["subcommand"]:
- print("Subcommand required!")
+ if not args['subcommand']:
+ print('Subcommand required!')
parser.print_usage()
sys.exit(1)
- function_name = args["subcommand"]
+ function_name = args['subcommand']
func = functions[function_name]
# Remove the subcommand from the arguments,
# it would cause an extra argument error when we pass the dict to a function
- del args["subcommand"]
+ del args['subcommand']
# Show and generate commands must always get the "raw" argument,
# but other commands (clear/reset/restart/add/delete) should not,
# because they produce no output and it makes no sense for them.
- if ("raw" not in args) and _capture_output(function_name):
- args["raw"] = False
+ if ('raw' not in args) and _capture_output(function_name):
+ args['raw'] = False
if _capture_output(function_name):
# Show and generate commands are slightly special:
# they may return human-formatted output
# or a raw dict that we need to serialize in JSON for printing
res = func(**args)
- if not args["raw"]:
+ if not args['raw']:
return res
else:
if not isinstance(res, dict) and not isinstance(res, list):
- raise InternalError(f"Bare literal is not an acceptable raw output, must be a list or an object.\
- The output was:{res}")
+ raise InternalError(
+ f'Bare literal is not an acceptable raw output, must be a list or an object.\
+ The output was:{res}'
+ )
res = decamelize(res)
res = _normalize_field_names(res)
from json import dumps
+
return dumps(res, indent=4)
else:
# Other functions should not return anything,