summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/vyos/base.py21
-rw-r--r--python/vyos/defaults.py5
-rw-r--r--python/vyos/frrender.py3
-rwxr-xr-xpython/vyos/template.py46
-rw-r--r--python/vyos/utils/network.py60
-rw-r--r--python/vyos/utils/process.py48
6 files changed, 130 insertions, 53 deletions
diff --git a/python/vyos/base.py b/python/vyos/base.py
index ca96d96ce..3173ddc20 100644
--- a/python/vyos/base.py
+++ b/python/vyos/base.py
@@ -1,4 +1,4 @@
-# Copyright 2018-2022 VyOS maintainers and contributors <maintainers@vyos.io>
+# Copyright 2018-2025 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -15,8 +15,7 @@
from textwrap import fill
-
-class BaseWarning:
+class UserMessage:
def __init__(self, header, message, **kwargs):
self.message = message
self.kwargs = kwargs
@@ -33,7 +32,6 @@ class BaseWarning:
messages = self.message.split('\n')
isfirstmessage = True
initial_indent = self.textinitindent
- print('')
for mes in messages:
mes = fill(mes, initial_indent=initial_indent,
subsequent_indent=self.standardindent, **self.kwargs)
@@ -44,17 +42,24 @@ class BaseWarning:
print('', flush=True)
+class Message():
+ def __init__(self, message, **kwargs):
+ self.Message = UserMessage('', message, **kwargs)
+ self.Message.print()
+
class Warning():
def __init__(self, message, **kwargs):
- self.BaseWarn = BaseWarning('WARNING: ', message, **kwargs)
- self.BaseWarn.print()
+ print('')
+ self.UserMessage = UserMessage('WARNING: ', message, **kwargs)
+ self.UserMessage.print()
class DeprecationWarning():
def __init__(self, message, **kwargs):
# Reformat the message and trim it to 72 characters in length
- self.BaseWarn = BaseWarning('DEPRECATION WARNING: ', message, **kwargs)
- self.BaseWarn.print()
+ print('')
+ self.UserMessage = UserMessage('DEPRECATION WARNING: ', message, **kwargs)
+ self.UserMessage.print()
class ConfigError(Exception):
diff --git a/python/vyos/defaults.py b/python/vyos/defaults.py
index 7efccded6..c1e5ddc04 100644
--- a/python/vyos/defaults.py
+++ b/python/vyos/defaults.py
@@ -43,10 +43,15 @@ directories = {
}
systemd_services = {
+ 'haproxy' : 'haproxy.service',
'syslog' : 'syslog.service',
'snmpd' : 'snmpd.service',
}
+internal_ports = {
+ 'certbot_haproxy' : 65080, # Certbot running behing haproxy
+}
+
config_status = '/tmp/vyos-config-status'
api_config_state = '/run/http-api-state'
frr_debug_enable = '/tmp/vyos.frr.debug'
diff --git a/python/vyos/frrender.py b/python/vyos/frrender.py
index 524167d8b..73d6dd5f0 100644
--- a/python/vyos/frrender.py
+++ b/python/vyos/frrender.py
@@ -697,6 +697,9 @@ class FRRender:
debug('FRR: START CONFIGURATION RENDERING')
# we can not reload an empty file, thus we always embed the marker
output = '!\n'
+ # Enable FRR logging
+ output += 'log syslog\n'
+ output += 'log facility local7\n'
# Enable SNMP agentx support
# SNMP AgentX support cannot be disabled once enabled
if 'snmp' in config_dict:
diff --git a/python/vyos/template.py b/python/vyos/template.py
index d79e1183f..11e1cc50f 100755
--- a/python/vyos/template.py
+++ b/python/vyos/template.py
@@ -36,6 +36,7 @@ DEFAULT_TEMPLATE_DIR = directories["templates"]
# Holds template filters registered via register_filter()
_FILTERS = {}
_TESTS = {}
+_CLEVER_FUNCTIONS = {}
# reuse Environments with identical settings to improve performance
@functools.lru_cache(maxsize=2)
@@ -58,6 +59,7 @@ def _get_environment(location=None):
)
env.filters.update(_FILTERS)
env.tests.update(_TESTS)
+ env.globals.update(_CLEVER_FUNCTIONS)
return env
@@ -77,7 +79,7 @@ def register_filter(name, func=None):
"Filters can only be registered before rendering the first template"
)
if name in _FILTERS:
- raise ValueError(f"A filter with name {name!r} was registered already")
+ raise ValueError(f"A filter with name {name!r} was already registered")
_FILTERS[name] = func
return func
@@ -97,10 +99,30 @@ def register_test(name, func=None):
"Tests can only be registered before rendering the first template"
)
if name in _TESTS:
- raise ValueError(f"A test with name {name!r} was registered already")
+ raise ValueError(f"A test with name {name!r} was already registered")
_TESTS[name] = func
return func
+def register_clever_function(name, func=None):
+ """Register a function to be available as test in templates under given name.
+
+ It can also be used as a decorator, see below in this module for examples.
+
+ :raise RuntimeError:
+ when trying to register a test after a template has been rendered already
+ :raise ValueError: when trying to register a name which was taken already
+ """
+ if func is None:
+ return functools.partial(register_clever_function, name)
+ if _get_environment.cache_info().currsize:
+ raise RuntimeError(
+ "Clever functions can only be registered before rendering the" \
+ "first template")
+ if name in _CLEVER_FUNCTIONS:
+ raise ValueError(f"A clever function with name {name!r} was already "\
+ "registered")
+ _CLEVER_FUNCTIONS[name] = func
+ return func
def render_to_string(template, content, formater=None, location=None):
"""Render a template from the template directory, raise on any errors.
@@ -150,6 +172,8 @@ def render(
# As we are opening the file with 'w', we are performing the rendering before
# calling open() to not accidentally erase the file if rendering fails
rendered = render_to_string(template, content, formater, location)
+ # Remove any trailing character and always add a new line at the end
+ rendered = rendered.rstrip() + "\n"
# Write to file
with open(destination, "w") as file:
@@ -1050,3 +1074,21 @@ def vyos_defined(value, test_value=None, var_type=None):
else:
# Valid value and is matching optional argument if provided - return true
return True
+
+@register_clever_function('get_default_port')
+def get_default_port(service):
+ """
+ Jinja2 plugin to retrieve common service port number from vyos.defaults
+ class form a Jinja2 template. This removes the need to hardcode, or pass in
+ the data using the general dictionary.
+
+ Added to remove code complexity and make it easier to read.
+
+ Example:
+ {{ get_default_port('certbot_haproxy') }}
+ """
+ from vyos.defaults import internal_ports
+ if service not in internal_ports:
+ raise RuntimeError(f'Service "{service}" not found in internal ' \
+ 'vyos.defaults.internal_ports dict!')
+ return internal_ports[service]
diff --git a/python/vyos/utils/network.py b/python/vyos/utils/network.py
index 2f666f0ee..67d247fba 100644
--- a/python/vyos/utils/network.py
+++ b/python/vyos/utils/network.py
@@ -256,40 +256,60 @@ def mac2eui64(mac, prefix=None):
except: # pylint: disable=bare-except
return
-def check_port_availability(ipaddress, port, protocol):
+def check_port_availability(address: str=None, port: int=0, protocol: str='tcp') -> bool:
"""
- Check if port is available and not used by any service
- Return False if a port is busy or IP address does not exists
+ Check if given port is available and not used by any service.
+
Should be used carefully for services that can start listening
dynamically, because IP address may be dynamic too
+
+ Args:
+ address: IPv4 or IPv6 address - if None, checks on all interfaces
+ port: TCP/UDP port number.
+
+
+ Returns:
+ False if a port is busy or IP address does not exists
+ True if a port is free and IP address exists
"""
- from socketserver import TCPServer, UDPServer
+ import socket
from ipaddress import ip_address
+ # treat None as "any address"
+ address = address or '::'
+
# verify arguments
try:
- ipaddress = ip_address(ipaddress).compressed
- except:
- raise ValueError(f'The {ipaddress} is not a valid IPv4 or IPv6 address')
+ address = ip_address(address).compressed
+ except ValueError:
+ raise ValueError(f'{address} is not a valid IPv4 or IPv6 address')
if port not in range(1, 65536):
- raise ValueError(f'The port number {port} is not in the 1-65535 range')
+ raise ValueError(f'Port {port} is not in range 1-65535')
if protocol not in ['tcp', 'udp']:
- raise ValueError(f'The protocol {protocol} is not supported. Only tcp and udp are allowed')
+ raise ValueError(f'{protocol} is not supported - only tcp and udp are allowed')
- # check port availability
+ protocol = socket.SOCK_STREAM if protocol == 'tcp' else socket.SOCK_DGRAM
try:
- if protocol == 'tcp':
- server = TCPServer((ipaddress, port), None, bind_and_activate=True)
- if protocol == 'udp':
- server = UDPServer((ipaddress, port), None, bind_and_activate=True)
- server.server_close()
- except Exception as e:
- # errno.h:
- #define EADDRINUSE 98 /* Address already in use */
- if e.errno == 98:
+ addr_info = socket.getaddrinfo(address, port, socket.AF_UNSPEC, protocol)
+ except socket.gaierror as e:
+ print(f'Invalid address: {address}')
+ return False
+
+ for family, socktype, proto, canonname, sockaddr in addr_info:
+ try:
+ with socket.socket(family, socktype, proto) as s:
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ s.bind(sockaddr)
+ # port is free to use
+ return True
+ except OSError:
+ # port is already in use
return False
- return True
+ # if we reach this point, no socket was tested and we assume the port is
+ # already in use - better safe then sorry
+ return False
+
def is_listen_port_bind_service(port: int, service: str) -> bool:
"""Check if listen port bound to expected program name
diff --git a/python/vyos/utils/process.py b/python/vyos/utils/process.py
index 121b6e240..21335e6b3 100644
--- a/python/vyos/utils/process.py
+++ b/python/vyos/utils/process.py
@@ -14,6 +14,7 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
import os
+import shlex
from subprocess import Popen
from subprocess import PIPE
@@ -21,20 +22,17 @@ from subprocess import STDOUT
from subprocess import DEVNULL
-def get_wrapper(vrf, netns, auth):
- wrapper = ''
+def get_wrapper(vrf, netns):
+ wrapper = None
if vrf:
- wrapper = f'ip vrf exec {vrf} '
+ wrapper = ['ip', 'vrf', 'exec', vrf]
elif netns:
- wrapper = f'ip netns exec {netns} '
- if auth:
- wrapper = f'{auth} {wrapper}'
+ wrapper = ['ip', 'netns', 'exec', netns]
return wrapper
def popen(command, flag='', shell=None, input=None, timeout=None, env=None,
- stdout=PIPE, stderr=PIPE, decode='utf-8', auth='', vrf=None,
- netns=None):
+ stdout=PIPE, stderr=PIPE, decode='utf-8', vrf=None, netns=None):
"""
popen is a wrapper helper around subprocess.Popen
with it default setting it will return a tuple (out, err)
@@ -75,28 +73,33 @@ def popen(command, flag='', shell=None, input=None, timeout=None, env=None,
if not debug.enabled(flag):
flag = 'command'
+ use_shell = shell
+ stdin = None
+ if shell is None:
+ use_shell = False
+ if ' ' in command:
+ use_shell = True
+ if env:
+ use_shell = True
+
# Must be run as root to execute command in VRF or network namespace
+ wrapper = get_wrapper(vrf, netns)
if vrf or netns:
if os.getuid() != 0:
raise OSError(
'Permission denied: cannot execute commands in VRF and netns contexts as an unprivileged user'
)
- wrapper = get_wrapper(vrf, netns, auth)
- command = f'{wrapper} {command}' if wrapper else command
+ if use_shell:
+ command = f'{shlex.join(wrapper)} {command}'
+ else:
+ if type(command) is not list:
+ command = [command]
+ command = wrapper + command
- cmd_msg = f"cmd '{command}'"
+ cmd_msg = f"cmd '{command}'" if use_shell else f"cmd '{shlex.join(command)}'"
debug.message(cmd_msg, flag)
- use_shell = shell
- stdin = None
- if shell is None:
- use_shell = False
- if ' ' in command:
- use_shell = True
- if env:
- use_shell = True
-
if input:
stdin = PIPE
input = input.encode() if type(input) is str else input
@@ -155,7 +158,7 @@ def run(command, flag='', shell=None, input=None, timeout=None, env=None,
def cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
stdout=PIPE, stderr=PIPE, decode='utf-8', raising=None, message='',
- expect=[0], auth='', vrf=None, netns=None):
+ expect=[0], vrf=None, netns=None):
"""
A wrapper around popen, which returns the stdout and
will raise the error code of a command
@@ -171,12 +174,11 @@ def cmd(command, flag='', shell=None, input=None, timeout=None, env=None,
input=input, timeout=timeout,
env=env, shell=shell,
decode=decode,
- auth=auth,
vrf=vrf,
netns=netns,
)
if code not in expect:
- wrapper = get_wrapper(vrf, netns, auth='')
+ wrapper = get_wrapper(vrf, netns)
command = f'{wrapper} {command}'
feedback = message + '\n' if message else ''
feedback += f'failed to run command: {command}\n'