summaryrefslogtreecommitdiff
path: root/src/op_mode
diff options
context:
space:
mode:
Diffstat (limited to 'src/op_mode')
-rwxr-xr-xsrc/op_mode/pki.py201
-rwxr-xr-xsrc/op_mode/show_neigh.py162
-rwxr-xr-xsrc/op_mode/show_openconnect_otp.py109
-rwxr-xr-xsrc/op_mode/show_uptime.py17
4 files changed, 394 insertions, 95 deletions
diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py
index bc7813052..1e78c3a03 100755
--- a/src/op_mode/pki.py
+++ b/src/op_mode/pki.py
@@ -17,6 +17,7 @@
import argparse
import ipaddress
import os
+import re
import sys
import tabulate
@@ -30,7 +31,8 @@ from vyos.pki import encode_certificate, encode_public_key, encode_private_key,
from vyos.pki import create_certificate, create_certificate_request, create_certificate_revocation_list
from vyos.pki import create_private_key
from vyos.pki import create_dh_parameters
-from vyos.pki import load_certificate, load_certificate_request, load_private_key, load_crl
+from vyos.pki import load_certificate, load_certificate_request, load_private_key
+from vyos.pki import load_crl, load_dh_parameters, load_public_key
from vyos.pki import verify_certificate
from vyos.xml import defaults
from vyos.util import ask_input, ask_yes_no
@@ -183,13 +185,13 @@ def install_ssh_key(name, public_key, private_key, passphrase=None):
])
print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase))
-def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None):
+def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True):
# Show/install conf commands for key-pair
config_paths = []
if public_key:
- install_public_key = ask_yes_no('Do you want to install the public key?', default=True)
+ install_public_key = not prompt or ask_yes_no('Do you want to install the public key?', default=True)
public_key_pem = encode_public_key(public_key)
if install_public_key:
@@ -200,7 +202,7 @@ def install_keypair(name, key_type, private_key=None, public_key=None, passphras
print(public_key_pem)
if private_key:
- install_private_key = ask_yes_no('Do you want to install the private key?', default=True)
+ install_private_key = not prompt or ask_yes_no('Do you want to install the private key?', default=True)
private_key_pem = encode_private_key(private_key, passphrase=passphrase)
if install_private_key:
@@ -214,6 +216,13 @@ def install_keypair(name, key_type, private_key=None, public_key=None, passphras
install_into_config(conf, config_paths)
+def install_openvpn_key(name, key_data, key_version='1'):
+ config_paths = [
+ f"pki openvpn shared-secret {name} key '{key_data}'",
+ f"pki openvpn shared-secret {name} version '{key_version}'"
+ ]
+ install_into_config(conf, config_paths)
+
def install_wireguard_key(interface, private_key, public_key):
# Show conf commands for installing wireguard key pairs
from vyos.ifconfig import Section
@@ -640,15 +649,11 @@ def generate_openvpn_key(name, install=False, file=False):
key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings
key_version = '1'
- import re
version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully)
if version_search:
key_version = version_search[1]
- base = f"set pki openvpn shared-secret {name}"
- print("Configure mode commands to install OpenVPN key:")
- print(f"{base} key '{key_data}'")
- print(f"{base} version '{key_version}'")
+ install_openvpn_key(name, key_data, key_version)
if file:
write_file(f'{name}.key', result)
@@ -670,6 +675,167 @@ def generate_wireguard_psk(interface=None, peer=None, install=False):
else:
print(f'Pre-shared key: {psk}')
+# Import functions
+def import_ca_certificate(name, path=None, key_path=None):
+ if path:
+ if not os.path.exists(path):
+ print(f'File not found: {path}')
+ return
+
+ cert = None
+
+ with open(path) as f:
+ cert_data = f.read()
+ cert = load_certificate(cert_data, wrap_tags=False)
+
+ if not cert:
+ print(f'Invalid certificate: {path}')
+ return
+
+ install_certificate(name, cert, is_ca=True)
+
+ if key_path:
+ if not os.path.exists(key_path):
+ print(f'File not found: {key_path}')
+ return
+
+ key = None
+ passphrase = ask_input('Enter private key passphrase: ') or None
+
+ with open(key_path) as f:
+ key_data = f.read()
+ key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False)
+
+ if not key:
+ print(f'Invalid private key or passphrase: {path}')
+ return
+
+ install_certificate(name, private_key=key, is_ca=True)
+
+def import_certificate(name, path=None, key_path=None):
+ if path:
+ if not os.path.exists(path):
+ print(f'File not found: {path}')
+ return
+
+ cert = None
+
+ with open(path) as f:
+ cert_data = f.read()
+ cert = load_certificate(cert_data, wrap_tags=False)
+
+ if not cert:
+ print(f'Invalid certificate: {path}')
+ return
+
+ install_certificate(name, cert, is_ca=False)
+
+ if key_path:
+ if not os.path.exists(key_path):
+ print(f'File not found: {key_path}')
+ return
+
+ key = None
+ passphrase = ask_input('Enter private key passphrase: ') or None
+
+ with open(key_path) as f:
+ key_data = f.read()
+ key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False)
+
+ if not key:
+ print(f'Invalid private key or passphrase: {path}')
+ return
+
+ install_certificate(name, private_key=key, is_ca=False)
+
+def import_crl(name, path):
+ if not os.path.exists(path):
+ print(f'File not found: {path}')
+ return
+
+ crl = None
+
+ with open(path) as f:
+ crl_data = f.read()
+ crl = load_crl(crl_data, wrap_tags=False)
+
+ if not crl:
+ print(f'Invalid certificate: {path}')
+ return
+
+ install_crl(name, crl)
+
+def import_dh_parameters(name, path):
+ if not os.path.exists(path):
+ print(f'File not found: {path}')
+ return
+
+ dh = None
+
+ with open(path) as f:
+ dh_data = f.read()
+ dh = load_dh_parameters(dh_data, wrap_tags=False)
+
+ if not dh:
+ print(f'Invalid DH parameters: {path}')
+ return
+
+ install_dh_parameters(name, dh)
+
+def import_keypair(name, path=None, key_path=None):
+ if path:
+ if not os.path.exists(path):
+ print(f'File not found: {path}')
+ return
+
+ key = None
+
+ with open(path) as f:
+ key_data = f.read()
+ key = load_public_key(key_data, wrap_tags=False)
+
+ if not key:
+ print(f'Invalid public key: {path}')
+ return
+
+ install_keypair(name, None, public_key=key, prompt=False)
+
+ if key_path:
+ if not os.path.exists(key_path):
+ print(f'File not found: {key_path}')
+ return
+
+ key = None
+ passphrase = ask_input('Enter private key passphrase: ') or None
+
+ with open(key_path) as f:
+ key_data = f.read()
+ key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False)
+
+ if not key:
+ print(f'Invalid private key or passphrase: {path}')
+ return
+
+ install_keypair(name, None, private_key=key, prompt=False)
+
+def import_openvpn_secret(name, path):
+ if not os.path.exists(path):
+ print(f'File not found: {path}')
+ return
+
+ key_data = None
+ key_version = '1'
+
+ with open(path) as f:
+ key_lines = f.read().split("\n")
+ key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings
+
+ version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', key_lines[0]) # Future-proofing (hopefully)
+ if version_search:
+ key_version = version_search[1]
+
+ install_openvpn_key(name, key_data, key_version)
+
# Show functions
def show_certificate_authority(name=None):
headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent']
@@ -799,6 +965,9 @@ if __name__ == '__main__':
parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true')
parser.add_argument('--install', help='Install generated keys into running-config', action='store_true')
+ parser.add_argument('--filename', help='Write certificate into specified filename', action='store')
+ parser.add_argument('--key-filename', help='Write key into specified filename', action='store')
+
args = parser.parse_args()
try:
@@ -840,7 +1009,19 @@ if __name__ == '__main__':
generate_wireguard_key(args.interface, install=args.install)
if args.psk:
generate_wireguard_psk(args.interface, peer=args.peer, install=args.install)
-
+ elif args.action == 'import':
+ if args.ca:
+ import_ca_certificate(args.ca, path=args.filename, key_path=args.key_filename)
+ elif args.certificate:
+ import_certificate(args.certificate, path=args.filename, key_path=args.key_filename)
+ elif args.crl:
+ import_crl(args.crl, args.filename)
+ elif args.dh:
+ import_dh_parameters(args.dh, args.filename)
+ elif args.keypair:
+ import_keypair(args.keypair, path=args.filename, key_path=args.key_filename)
+ elif args.openvpn:
+ import_openvpn_secret(args.openvpn, args.filename)
elif args.action == 'show':
if args.ca:
ca_name = None if args.ca == 'all' else args.ca
diff --git a/src/op_mode/show_neigh.py b/src/op_mode/show_neigh.py
index 94e745493..d874bd544 100755
--- a/src/op_mode/show_neigh.py
+++ b/src/op_mode/show_neigh.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2020 VyOS maintainers and contributors
+# Copyright (C) 2022 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -14,83 +14,89 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#ip -j -f inet neigh list | jq
-#[
- #{
- #"dst": "192.168.101.8",
- #"dev": "enp0s25",
- #"lladdr": "78:d2:94:72:77:7e",
- #"state": [
- #"STALE"
- #]
- #},
- #{
- #"dst": "192.168.101.185",
- #"dev": "enp0s25",
- #"lladdr": "34:46:ec:76:f8:9b",
- #"state": [
- #"STALE"
- #]
- #},
- #{
- #"dst": "192.168.101.225",
- #"dev": "enp0s25",
- #"lladdr": "c2:cb:fa:bf:a0:35",
- #"state": [
- #"STALE"
- #]
- #},
- #{
- #"dst": "192.168.101.1",
- #"dev": "enp0s25",
- #"lladdr": "00:98:2b:f8:3f:11",
- #"state": [
- #"REACHABLE"
- #]
- #},
- #{
- #"dst": "192.168.101.181",
- #"dev": "enp0s25",
- #"lladdr": "d8:9b:3b:d5:88:22",
- #"state": [
- #"STALE"
- #]
- #}
-#]
+# Sample output of `ip --json neigh list`:
+#
+# [
+# {
+# "dst": "192.168.1.1",
+# "dev": "eth0", # Missing if `dev ...` option is used
+# "lladdr": "00:aa:bb:cc:dd:ee", # May be missing for failed entries
+# "state": [
+# "REACHABLE"
+# ]
+# },
+# ]
import sys
-import argparse
-import json
-from vyos.util import cmd
-
-def main():
- #parese args
- parser = argparse.ArgumentParser()
- parser.add_argument('--family', help='Protocol family', required=True)
- args = parser.parse_args()
-
- neigh_raw_json = cmd(f'ip -j -f {args.family} neigh list')
- neigh_raw_json = neigh_raw_json.lower()
- neigh_json = json.loads(neigh_raw_json)
-
- format_neigh = '%-50s %-10s %-20s %s'
- print(format_neigh % ("IP Address", "Device", "State", "LLADDR"))
- print(format_neigh % ("----------", "------", "-----", "------"))
-
- if neigh_json is not None:
- for neigh_item in neigh_json:
- dev = neigh_item['dev']
- dst = neigh_item['dst']
- lladdr = neigh_item['lladdr'] if 'lladdr' in neigh_item else ''
- state = neigh_item['state']
-
- i = 0
- for state_item in state:
- if i == 0:
- print(format_neigh % (dst, dev, state_item, lladdr))
- else:
- print(format_neigh % ('', '', state_item, ''))
- i+=1
-
+
+
+def get_raw_data(family, device=None, state=None):
+ from json import loads
+ from vyos.util import cmd
+
+ if device:
+ device = f"dev {device}"
+ else:
+ device = ""
+
+ if state:
+ state = f"nud {state}"
+ else:
+ state = ""
+
+ neigh_cmd = f"ip --family {family} --json neighbor list {device} {state}"
+
+ data = loads(cmd(neigh_cmd))
+
+ return data
+
+def get_formatted_output(family, device=None, state=None):
+ from tabulate import tabulate
+
+ def entry_to_list(e, intf=None):
+ dst = e["dst"]
+
+ # State is always a list in the iproute2 output
+ state = ", ".join(e["state"])
+
+ # Link layer address is absent from e.g. FAILED entries
+ if "lladdr" in e:
+ lladdr = e["lladdr"]
+ else:
+ lladdr = None
+
+ # Device field is absent from outputs of `ip neigh list dev ...`
+ if "dev" in e:
+ dev = e["dev"]
+ elif device:
+ dev = device
+ else:
+ raise ValueError("interface is not defined")
+
+ return [dst, dev, lladdr, state]
+
+ neighs = get_raw_data(family, device=device, state=state)
+ neighs = map(entry_to_list, neighs)
+
+ headers = ["Address", "Interface", "Link layer address", "State"]
+ return tabulate(neighs, headers)
+
if __name__ == '__main__':
- main()
+ from argparse import ArgumentParser
+
+ parser = ArgumentParser()
+ parser.add_argument("-f", "--family", type=str, default="inet", help="Address family")
+ parser.add_argument("-i", "--interface", type=str, help="Network interface")
+ parser.add_argument("-s", "--state", type=str, help="Neighbor table entry state")
+
+ args = parser.parse_args()
+
+ if args.state:
+ if args.state not in ["reachable", "failed", "stale", "permanent"]:
+ raise ValueError(f"""Incorrect state "{args.state}"! Must be one of: reachable, stale, failed, permanent""")
+
+ try:
+ print(get_formatted_output(args.family, device=args.interface, state=args.state))
+ except ValueError as e:
+ print(e)
+ sys.exit(1)
diff --git a/src/op_mode/show_openconnect_otp.py b/src/op_mode/show_openconnect_otp.py
new file mode 100755
index 000000000..ae532ccc9
--- /dev/null
+++ b/src/op_mode/show_openconnect_otp.py
@@ -0,0 +1,109 @@
+#!/usr/bin/env python3
+
+# Copyright 2017, 2022 VyOS maintainers and contributors <maintainers@vyos.io>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+import argparse
+import os
+
+from vyos.config import Config
+from vyos.xml import defaults
+from vyos.configdict import dict_merge
+from vyos.util import popen
+from base64 import b32encode
+
+otp_file = '/run/ocserv/users.oath'
+
+def check_uname_otp(username):
+ """
+ Check if "username" exists and have an OTP key
+ """
+ config = Config()
+ base_key = ['vpn', 'openconnect', 'authentication', 'local-users', 'username', username, 'otp', 'key']
+ if not config.exists(base_key):
+ return None
+ return True
+
+def get_otp_ocserv(username):
+ config = Config()
+ base = ['vpn', 'openconnect']
+ if not config.exists(base):
+ return None
+ ocserv = config.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True)
+ # We have gathered the dict representation of the CLI, but there are default
+ # options which we need to update into the dictionary retrived.
+ default_values = defaults(base)
+ ocserv = dict_merge(default_values, ocserv)
+ # workaround a "know limitation" - https://phabricator.vyos.net/T2665
+ del ocserv['authentication']['local_users']['username']['otp']
+ if not ocserv["authentication"]["local_users"]["username"]:
+ return None
+ default_ocserv_usr_values = default_values['authentication']['local_users']['username']['otp']
+ for user, params in ocserv['authentication']['local_users']['username'].items():
+ # Not every configuration requires OTP settings
+ if ocserv['authentication']['local_users']['username'][user].get('otp'):
+ ocserv['authentication']['local_users']['username'][user]['otp'] = dict_merge(default_ocserv_usr_values, ocserv['authentication']['local_users']['username'][user]['otp'])
+ result = ocserv['authentication']['local_users']['username'][username]
+ return result
+
+def display_otp_ocserv(username, params, info):
+ hostname = os.uname()[1]
+ key_hex = params['otp']['key']
+ otp_length = params['otp']['otp_length']
+ interval = params['otp']['interval']
+ token_type = params['otp']['token_type']
+ if token_type == 'hotp-time':
+ token_type_acrn = 'totp'
+ key_base32 = b32encode(bytes.fromhex(key_hex)).decode()
+ otp_url = ''.join(["otpauth://",token_type_acrn,"/",username,"@",hostname,"?secret=",key_base32,"&digits=",otp_length,"&period=",interval])
+ qrcode,err = popen('qrencode -t ansiutf8', input=otp_url)
+
+ if info == 'full':
+ print("# You can share it with the user, he just needs to scan the QR in his OTP app")
+ print("# username: ", username)
+ print("# OTP KEY: ", key_base32)
+ print("# OTP URL: ", otp_url)
+ print(qrcode)
+ print('# To add this OTP key to configuration, run the following commands:')
+ print(f"set vpn openconnect authentication local-users username {username} otp key '{key_hex}'")
+ if interval != "30":
+ print(f"set vpn openconnect authentication local-users username {username} otp interval '{interval}'")
+ if otp_length != "6":
+ print(f"set vpn openconnect authentication local-users username {username} otp otp-length '{otp_length}'")
+ elif info == 'key-hex':
+ print("# OTP key in hexadecimal: ")
+ print(key_hex)
+ elif info == 'key-b32':
+ print("# OTP key in Base32: ")
+ print(key_base32)
+ elif info == 'qrcode':
+ print(f"# QR code for OpenConnect user '{username}'")
+ print(qrcode)
+ elif info == 'uri':
+ print(f"# URI for OpenConnect user '{username}'")
+ print(otp_url)
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(add_help=False, description='Show OTP authentication information for selected user')
+ parser.add_argument('--user', action="store", type=str, default='', help='Username')
+ parser.add_argument('--info', action="store", type=str, default='full', help='Wich information to display')
+
+ args = parser.parse_args()
+ check_otp = check_uname_otp(args.user)
+ if check_otp:
+ user_otp_params = get_otp_ocserv(args.user)
+ display_otp_ocserv(args.user, user_otp_params, args.info)
+ else:
+ print(f'There is no such user ("{args.user}") with an OTP key configured')
diff --git a/src/op_mode/show_uptime.py b/src/op_mode/show_uptime.py
index 1b5e33fa9..b70c60cf8 100755
--- a/src/op_mode/show_uptime.py
+++ b/src/op_mode/show_uptime.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2021 VyOS maintainers and contributors
+# Copyright (C) 2021-2022 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@@ -26,14 +26,17 @@ def get_uptime_seconds():
def get_load_averages():
from re import search
from vyos.util import cmd
+ from vyos.cpu import get_core_count
data = cmd("uptime")
matches = search(r"load average:\s*(?P<one>[0-9\.]+)\s*,\s*(?P<five>[0-9\.]+)\s*,\s*(?P<fifteen>[0-9\.]+)\s*", data)
+ core_count = get_core_count()
+
res = {}
- res[1] = float(matches["one"])
- res[5] = float(matches["five"])
- res[15] = float(matches["fifteen"])
+ res[1] = float(matches["one"]) / core_count
+ res[5] = float(matches["five"]) / core_count
+ res[15] = float(matches["fifteen"]) / core_count
return res
@@ -53,9 +56,9 @@ def get_formatted_output():
out = "Uptime: {}\n\n".format(data["uptime"])
avgs = data["load_average"]
out += "Load averages:\n"
- out += "1 minute: {:.02f}%\n".format(avgs[1]*100)
- out += "5 minutes: {:.02f}%\n".format(avgs[5]*100)
- out += "15 minutes: {:.02f}%\n".format(avgs[15]*100)
+ out += "1 minute: {:.01f}%\n".format(avgs[1]*100)
+ out += "5 minutes: {:.01f}%\n".format(avgs[5]*100)
+ out += "15 minutes: {:.01f}%\n".format(avgs[15]*100)
return out