diff options
Diffstat (limited to 'src/op_mode')
-rwxr-xr-x | src/op_mode/cgnat.py | 96 | ||||
-rw-r--r-- | src/op_mode/evpn.py | 46 | ||||
-rwxr-xr-x | src/op_mode/ikev2_profile_generator.py | 19 | ||||
-rwxr-xr-x | src/op_mode/image_installer.py | 34 | ||||
-rwxr-xr-x | src/op_mode/nat.py | 35 | ||||
-rwxr-xr-x | src/op_mode/pki.py | 13 | ||||
-rwxr-xr-x | src/op_mode/snmp_v3.py | 3 | ||||
-rwxr-xr-x | src/op_mode/version.py | 6 |
8 files changed, 219 insertions, 33 deletions
diff --git a/src/op_mode/cgnat.py b/src/op_mode/cgnat.py new file mode 100755 index 000000000..9ad8f92f9 --- /dev/null +++ b/src/op_mode/cgnat.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2024 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import json +import sys +import typing + +from tabulate import tabulate + +import vyos.opmode + +from vyos.configquery import ConfigTreeQuery +from vyos.utils.process import cmd + +CGNAT_TABLE = 'cgnat' + + +def _get_raw_data(external_address: str = '', internal_address: str = '') -> list[dict]: + """Get CGNAT dictionary and filter by external or internal address if provided.""" + cmd_output = cmd(f'nft --json list table ip {CGNAT_TABLE}') + data = json.loads(cmd_output) + + elements = data['nftables'][2]['map']['elem'] + allocations = [] + for elem in elements: + internal = elem[0] # internal + external = elem[1]['concat'][0] # external + start_port = elem[1]['concat'][1]['range'][0] + end_port = elem[1]['concat'][1]['range'][1] + port_range = f'{start_port}-{end_port}' + + if (internal_address and internal != internal_address) or ( + external_address and external != external_address + ): + continue + + allocations.append( + { + 'internal_address': internal, + 'external_address': external, + 'port_range': port_range, + } + ) + + return allocations + + +def _get_formatted_output(allocations: list[dict]) -> str: + # Convert the list of dictionaries to a list of tuples for tabulate + headers = ['Internal IP', 'External IP', 'Port range'] + data = [ + (alloc['internal_address'], alloc['external_address'], alloc['port_range']) + for alloc in allocations + ] + output = tabulate(data, headers, numalign="left") + return output + + +def show_allocation( + raw: bool, + external_address: typing.Optional[str], + internal_address: typing.Optional[str], +) -> str: + config = ConfigTreeQuery() + if not config.exists('nat cgnat'): + raise vyos.opmode.UnconfiguredSubsystem('CGNAT is not configured') + + if raw: + return _get_raw_data(external_address, internal_address) + + else: + raw_data = _get_raw_data(external_address, internal_address) + return _get_formatted_output(raw_data) + + +if __name__ == '__main__': + try: + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/op_mode/evpn.py b/src/op_mode/evpn.py new file mode 100644 index 000000000..cae4ab9f5 --- /dev/null +++ b/src/op_mode/evpn.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2016-2024 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# This script is a helper to run VTYSH commands for "show evpn", allowing for the --raw flag to output JSON + +import sys +import typing +import json + +import vyos.opmode +from vyos.utils.process import cmd + +def show_evpn(raw: bool, command: typing.Optional[str]): + if raw: + command = f"{command} json" + evpnDict = {} + try: + evpnDict['evpn'] = json.loads(cmd(f"vtysh -c '{command}'")) + except: + raise vyos.opmode.DataUnavailable(f"\"{command.replace(' json', '')}\" is invalid or has no JSON option") + + return evpnDict + else: + return cmd(f"vtysh -c '{command}'") + +if __name__ == '__main__': + try: + res = vyos.opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, vyos.opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/op_mode/ikev2_profile_generator.py b/src/op_mode/ikev2_profile_generator.py index 2b29f94bf..4ac4fb14a 100755 --- a/src/op_mode/ikev2_profile_generator.py +++ b/src/op_mode/ikev2_profile_generator.py @@ -144,15 +144,22 @@ tmp = reversed(tmp) data['rfqdn'] = '.'.join(tmp) pki = conf.get_config_dict(pki_base, get_first_key=True) -ca_name = data['authentication']['x509']['ca_certificate'] cert_name = data['authentication']['x509']['certificate'] -ca_cert = load_certificate(pki['ca'][ca_name]['certificate']) -cert = load_certificate(pki['certificate'][cert_name]['certificate']) +data['certs'] = [] + +for ca_name in data['authentication']['x509']['ca_certificate']: + tmp = {} + ca_cert = load_certificate(pki['ca'][ca_name]['certificate']) + cert = load_certificate(pki['certificate'][cert_name]['certificate']) + + + tmp['ca_cn'] = ca_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + tmp['cert_cn'] = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + tmp['ca_cert'] = conf.value(pki_base + ['ca', ca_name, 'certificate']) + + data['certs'].append(tmp) -data['ca_cn'] = ca_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value -data['cert_cn'] = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value -data['ca_cert'] = conf.value(pki_base + ['ca', ca_name, 'certificate']) esp_proposals = conf.get_config_dict(ipsec_base + ['esp-group', data['esp_group'], 'proposal'], key_mangling=('-', '_'), get_first_key=True) diff --git a/src/op_mode/image_installer.py b/src/op_mode/image_installer.py index 0d2d7076c..bdc16de15 100755 --- a/src/op_mode/image_installer.py +++ b/src/op_mode/image_installer.py @@ -40,13 +40,14 @@ from vyos.template import render from vyos.utils.io import ask_input, ask_yes_no, select_entry from vyos.utils.file import chmod_2775 from vyos.utils.process import cmd, run -from vyos.version import get_remote_version +from vyos.version import get_remote_version, get_version_data # define text messages MSG_ERR_NOT_LIVE: str = 'The system is already installed. Please use "add system image" instead.' MSG_ERR_LIVE: str = 'The system is in live-boot mode. Please use "install image" instead.' MSG_ERR_NO_DISK: str = 'No suitable disk was found. There must be at least one disk of 2GB or greater size.' MSG_ERR_IMPROPER_IMAGE: str = 'Missing sha256sum.txt.\nEither this image is corrupted, or of era 1.2.x (md5sum) and would downgrade image tools;\ndisallowed in either case.' +MSG_ERR_ARCHITECTURE_MISMATCH: str = 'Upgrading to a different image architecture will break your system.' MSG_INFO_INSTALL_WELCOME: str = 'Welcome to VyOS installation!\nThis command will install VyOS to your permanent storage.' MSG_INFO_INSTALL_EXIT: str = 'Exiting from VyOS installation' MSG_INFO_INSTALL_SUCCESS: str = 'The image installed successfully; please reboot now.' @@ -79,6 +80,9 @@ MSG_WARN_ROOT_SIZE_TOOSMALL: str = 'The size is too small. Try again' MSG_WARN_IMAGE_NAME_WRONG: str = 'The suggested name is unsupported!\n'\ 'It must be between 1 and 64 characters long and contains only the next characters: .+-_ a-z A-Z 0-9' MSG_WARN_PASSWORD_CONFIRM: str = 'The entered values did not match. Try again' +MSG_WARN_FLAVOR_MISMATCH: str = 'The running image flavor is "{0}". The new image flavor is "{1}".\n' \ +'Installing a different image flavor may cause functionality degradation or break your system.\n' \ +'Do you want to continue with installation?' CONST_MIN_DISK_SIZE: int = 2147483648 # 2 GB CONST_MIN_ROOT_SIZE: int = 1610612736 # 1.5 GB # a reserved space: 2MB for header, 1 MB for BIOS partition, 256 MB for EFI @@ -693,6 +697,31 @@ def is_raid_install(install_object: Union[disk.DiskDetails, raid.RaidDetails]) - return False +def validate_compatibility(iso_path: str) -> None: + """Check architecture and flavor compatibility with the running image + + Args: + iso_path (str): a path to the mounted ISO image + """ + old_data = get_version_data() + old_flavor = old_data.get('flavor', '') + old_architecture = old_data.get('architecture') or cmd('dpkg --print-architecture') + + new_data = get_version_data(f'{iso_path}/version.json') + new_flavor = new_data.get('flavor', '') + new_architecture = new_data.get('architecture', '') + + if not old_architecture == new_architecture: + print(MSG_ERR_ARCHITECTURE_MISMATCH) + cleanup() + exit(MSG_INFO_INSTALL_EXIT) + + if not old_flavor == new_flavor: + if not ask_yes_no(MSG_WARN_FLAVOR_MISMATCH.format(old_flavor, new_flavor), default=False): + cleanup() + exit(MSG_INFO_INSTALL_EXIT) + + def install_image() -> None: """Install an image to a disk """ @@ -876,6 +905,9 @@ def add_image(image_path: str, vrf: str = None, username: str = '', Path(DIR_ISO_MOUNT).mkdir(mode=0o755, parents=True) disk.partition_mount(iso_path, DIR_ISO_MOUNT, 'iso9660') + print('Validating image compatibility') + validate_compatibility(DIR_ISO_MOUNT) + # check sums print('Validating image checksums') if not Path(DIR_ISO_MOUNT).joinpath('sha256sum.txt').exists(): diff --git a/src/op_mode/nat.py b/src/op_mode/nat.py index 2bc7e24fe..16a545cda 100755 --- a/src/op_mode/nat.py +++ b/src/op_mode/nat.py @@ -99,6 +99,23 @@ def _get_raw_translation(direction, family, address=None): def _get_formatted_output_rules(data, direction, family): + def _get_ports_for_output(my_dict): + # Get and insert all configured ports or port ranges into output string + for index, port in enumerate(my_dict['set']): + if 'range' in str(my_dict['set'][index]): + output = my_dict['set'][index]['range'] + output = '-'.join(map(str, output)) + else: + output = str(port) + if index == 0: + output = str(output) + else: + output = ','.join([output,output]) + # Handle case where configured ports are a negated list + if my_dict['op'] == '!=': + output = '!' + output + return(output) + # Add default values before loop sport, dport, proto = 'any', 'any', 'any' saddr = '::/0' if family == 'inet6' else '0.0.0.0/0' @@ -126,21 +143,9 @@ def _get_formatted_output_rules(data, direction, family): elif my_dict['field'] == 'daddr': daddr = f'{op}{my_dict["prefix"]["addr"]}/{my_dict["prefix"]["len"]}' elif my_dict['field'] == 'sport': - # Port range or single port - if jmespath.search('set[*].range', my_dict): - sport = my_dict['set'][0]['range'] - sport = '-'.join(map(str, sport)) - else: - sport = my_dict.get('set') - sport = ','.join(map(str, sport)) + sport = _get_ports_for_output(my_dict) elif my_dict['field'] == 'dport': - # Port range or single port - if jmespath.search('set[*].range', my_dict): - dport = my_dict["set"][0]["range"] - dport = '-'.join(map(str, dport)) - else: - dport = my_dict.get('set') - dport = ','.join(map(str, dport)) + dport = _get_ports_for_output(my_dict) else: field = jmespath.search('left.payload.field', match) if field == 'saddr': @@ -263,7 +268,7 @@ def _get_formatted_translation(dict_data, nat_direction, family, verbose): proto = meta['layer4']['protoname'] if direction == 'independent': conn_id = meta['id'] - timeout = meta['timeout'] + timeout = meta.get('timeout', 'n/a') orig_src = f'{orig_src}:{orig_sport}' if orig_sport else orig_src orig_dst = f'{orig_dst}:{orig_dport}' if orig_dport else orig_dst reply_src = f'{reply_src}:{reply_sport}' if reply_sport else reply_src diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index b1ca6ee29..361b60e0e 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -876,7 +876,7 @@ def show_certificate_authority(name=None, pem=False): print("Certificate Authorities:") print(tabulate.tabulate(data, headers)) -def show_certificate(name=None, pem=False): +def show_certificate(name=None, pem=False, fingerprint_hash=None): headers = ['Name', 'Type', 'Subject CN', 'Issuer CN', 'Issued', 'Expiry', 'Revoked', 'Private Key', 'CA Present'] data = [] certs = get_config_certificate() @@ -897,6 +897,9 @@ def show_certificate(name=None, pem=False): if name and pem: print(encode_certificate(cert)) return + elif name and fingerprint_hash: + print(get_certificate_fingerprint(cert, fingerprint_hash)) + return ca_name = get_certificate_ca(cert, ca_certs) cert_subject_cn = cert.subject.rfc4514_string().split(",")[0] @@ -923,12 +926,6 @@ def show_certificate(name=None, pem=False): print("Certificates:") print(tabulate.tabulate(data, headers)) -def show_certificate_fingerprint(name, hash): - cert = get_config_certificate(name=name) - cert = load_certificate(cert['certificate']) - - print(get_certificate_fingerprint(cert, hash)) - def show_crl(name=None, pem=False): headers = ['CA Name', 'Updated', 'Revokes'] data = [] @@ -1074,7 +1071,7 @@ if __name__ == '__main__': if args.fingerprint is None: show_certificate(None if args.certificate == 'all' else args.certificate, args.pem) else: - show_certificate_fingerprint(args.certificate, args.fingerprint) + show_certificate(args.certificate, fingerprint_hash=args.fingerprint) elif args.crl: show_crl(None if args.crl == 'all' else args.crl, args.pem) else: diff --git a/src/op_mode/snmp_v3.py b/src/op_mode/snmp_v3.py index a1f76f0bc..abeb524dd 100755 --- a/src/op_mode/snmp_v3.py +++ b/src/op_mode/snmp_v3.py @@ -85,7 +85,7 @@ if __name__ == '__main__': 'user': [], 'view': [] } - + if c.exists_effective('service snmp v3 group'): for g in c.list_effective_nodes('service snmp v3 group'): group = { @@ -146,7 +146,6 @@ if __name__ == '__main__': data['trap'].append(trap) - print(data) if args.all: # Special case, print all templates ! tmpl = jinja2.Template(GROUP_OUTP_TMPL_SRC) diff --git a/src/op_mode/version.py b/src/op_mode/version.py index ad0293aca..09d69ad1d 100755 --- a/src/op_mode/version.py +++ b/src/op_mode/version.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2016-2022 VyOS maintainers and contributors +# Copyright (C) 2016-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as @@ -30,11 +30,15 @@ from jinja2 import Template version_output_tmpl = """ Version: VyOS {{version}} Release train: {{release_train}} +Release flavor: {{flavor}} Built by: {{built_by}} Built on: {{built_on}} Build UUID: {{build_uuid}} Build commit ID: {{build_git}} +{%- if build_comment %} +Build comment: {{build_comment}} +{% endif %} Architecture: {{system_arch}} Boot via: {{boot_via}} |