summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorNataliia Solomko <natalirs1985@gmail.com>2024-09-20 13:13:26 +0300
committerNataliia Solomko <natalirs1985@gmail.com>2024-09-20 13:46:10 +0300
commitc6062ef100f20e2cc3e5ed9c4d145d4d969741e4 (patch)
treee66cd75c830c5b2da5b706b2569cc4647c08231c /python
parent394c2ad60b9d78b516facd9509493f719643323c (diff)
downloadvyos-1x-c6062ef100f20e2cc3e5ed9c4d145d4d969741e4.tar.gz
vyos-1x-c6062ef100f20e2cc3e5ed9c4d145d4d969741e4.zip
op-mode: T4833: Include wireguard peer name in interface summary report
Diffstat (limited to 'python')
-rw-r--r--python/vyos/ifconfig/wireguard.py110
1 files changed, 63 insertions, 47 deletions
diff --git a/python/vyos/ifconfig/wireguard.py b/python/vyos/ifconfig/wireguard.py
index 5b5f25323..9030b1302 100644
--- a/python/vyos/ifconfig/wireguard.py
+++ b/python/vyos/ifconfig/wireguard.py
@@ -26,6 +26,7 @@ from vyos.ifconfig import Interface
from vyos.ifconfig import Operational
from vyos.template import is_ipv6
+
class WireGuardOperational(Operational):
def _dump(self):
"""Dump wireguard data in a python friendly way."""
@@ -54,7 +55,17 @@ class WireGuardOperational(Operational):
}
else:
# We are entering a peer
- device, public_key, preshared_key, endpoint, allowed_ips, latest_handshake, transfer_rx, transfer_tx, persistent_keepalive = items
+ (
+ device,
+ public_key,
+ preshared_key,
+ endpoint,
+ allowed_ips,
+ latest_handshake,
+ transfer_rx,
+ transfer_tx,
+ persistent_keepalive,
+ ) = items
if allowed_ips == '(none)':
allowed_ips = []
else:
@@ -72,75 +83,78 @@ class WireGuardOperational(Operational):
def show_interface(self):
from vyos.config import Config
+
c = Config()
wgdump = self._dump().get(self.config['ifname'], None)
- c.set_level(["interfaces", "wireguard", self.config['ifname']])
- description = c.return_effective_value(["description"])
- ips = c.return_effective_values(["address"])
+ c.set_level(['interfaces', 'wireguard', self.config['ifname']])
+ description = c.return_effective_value(['description'])
+ ips = c.return_effective_values(['address'])
- answer = "interface: {}\n".format(self.config['ifname'])
- if (description):
- answer += " description: {}\n".format(description)
- if (ips):
- answer += " address: {}\n".format(", ".join(ips))
+ answer = 'interface: {}\n'.format(self.config['ifname'])
+ if description:
+ answer += ' description: {}\n'.format(description)
+ if ips:
+ answer += ' address: {}\n'.format(', '.join(ips))
- answer += " public key: {}\n".format(wgdump['public_key'])
- answer += " private key: (hidden)\n"
- answer += " listening port: {}\n".format(wgdump['listen_port'])
- answer += "\n"
+ answer += ' public key: {}\n'.format(wgdump['public_key'])
+ answer += ' private key: (hidden)\n'
+ answer += ' listening port: {}\n'.format(wgdump['listen_port'])
+ answer += '\n'
- for peer in c.list_effective_nodes(["peer"]):
+ for peer in c.list_effective_nodes(['peer']):
if wgdump['peers']:
- pubkey = c.return_effective_value(["peer", peer, "public_key"])
+ pubkey = c.return_effective_value(['peer', peer, 'public-key'])
if pubkey in wgdump['peers']:
wgpeer = wgdump['peers'][pubkey]
- answer += " peer: {}\n".format(peer)
- answer += " public key: {}\n".format(pubkey)
+ answer += ' peer: {}\n'.format(peer)
+ answer += ' public key: {}\n'.format(pubkey)
""" figure out if the tunnel is recently active or not """
- status = "inactive"
- if (wgpeer['latest_handshake'] is None):
+ status = 'inactive'
+ if wgpeer['latest_handshake'] is None:
""" no handshake ever """
- status = "inactive"
+ status = 'inactive'
else:
if int(wgpeer['latest_handshake']) > 0:
- delta = timedelta(seconds=int(
- time.time() - wgpeer['latest_handshake']))
- answer += " latest handshake: {}\n".format(delta)
- if (time.time() - int(wgpeer['latest_handshake']) < (60*5)):
+ delta = timedelta(
+ seconds=int(time.time() - wgpeer['latest_handshake'])
+ )
+ answer += ' latest handshake: {}\n'.format(delta)
+ if time.time() - int(wgpeer['latest_handshake']) < (60 * 5):
""" Five minutes and the tunnel is still active """
- status = "active"
+ status = 'active'
else:
""" it's been longer than 5 minutes """
- status = "inactive"
+ status = 'inactive'
elif int(wgpeer['latest_handshake']) == 0:
""" no handshake ever """
- status = "inactive"
- answer += " status: {}\n".format(status)
+ status = 'inactive'
+ answer += ' status: {}\n'.format(status)
if wgpeer['endpoint'] is not None:
- answer += " endpoint: {}\n".format(wgpeer['endpoint'])
+ answer += ' endpoint: {}\n'.format(wgpeer['endpoint'])
if wgpeer['allowed_ips'] is not None:
- answer += " allowed ips: {}\n".format(
- ",".join(wgpeer['allowed_ips']).replace(",", ", "))
+ answer += ' allowed ips: {}\n'.format(
+ ','.join(wgpeer['allowed_ips']).replace(',', ', ')
+ )
if wgpeer['transfer_rx'] > 0 or wgpeer['transfer_tx'] > 0:
- rx_size = size(
- wgpeer['transfer_rx'], system=alternative)
- tx_size = size(
- wgpeer['transfer_tx'], system=alternative)
- answer += " transfer: {} received, {} sent\n".format(
- rx_size, tx_size)
+ rx_size = size(wgpeer['transfer_rx'], system=alternative)
+ tx_size = size(wgpeer['transfer_tx'], system=alternative)
+ answer += ' transfer: {} received, {} sent\n'.format(
+ rx_size, tx_size
+ )
if wgpeer['persistent_keepalive'] is not None:
- answer += " persistent keepalive: every {} seconds\n".format(
- wgpeer['persistent_keepalive'])
+ answer += ' persistent keepalive: every {} seconds\n'.format(
+ wgpeer['persistent_keepalive']
+ )
answer += '\n'
- return answer + super().formated_stats()
+ return answer
@Interface.register
@@ -151,27 +165,29 @@ class WireGuardIf(Interface):
**Interface.definition,
**{
'section': 'wireguard',
- 'prefixes': ['wg', ],
+ 'prefixes': [
+ 'wg',
+ ],
'bridgeable': False,
- }
+ },
}
def get_mac(self):
- """ Get a synthetic MAC address. """
+ """Get a synthetic MAC address."""
return self.get_mac_synthetic()
def update(self, config):
- """ General helper function which works on a dictionary retrived by
+ """General helper function which works on a dictionary retrived by
get_config_dict(). It's main intention is to consolidate the scattered
interface setup code and provide a single point of entry when workin
- on any interface. """
+ on any interface."""
tmp_file = NamedTemporaryFile('w')
tmp_file.write(config['private_key'])
tmp_file.flush()
# Wireguard base command is identical for every peer
- base_cmd = 'wg set {ifname}'
+ base_cmd = 'wg set {ifname}'
if 'port' in config:
base_cmd += ' listen-port {port}'
if 'fwmark' in config:
@@ -201,7 +217,7 @@ class WireGuardIf(Interface):
cmd += f' preshared-key {psk_file}'
# Persistent keepalive is optional
- if 'persistent_keepalive'in peer_config:
+ if 'persistent_keepalive' in peer_config:
cmd += ' persistent-keepalive {persistent_keepalive}'
# Multiple allowed-ip ranges can be defined - ensure we are always