diff options
Diffstat (limited to 'src')
| -rwxr-xr-x | src/conf_mode/firewall.py | 11 | ||||
| -rwxr-xr-x | src/conf_mode/interfaces_wireguard.py | 28 | ||||
| -rwxr-xr-x | src/conf_mode/nat.py | 6 | ||||
| -rwxr-xr-x | src/conf_mode/nat66.py | 4 | ||||
| -rwxr-xr-x | src/conf_mode/policy.py | 16 | ||||
| -rwxr-xr-x | src/conf_mode/protocols_bgp.py | 5 | ||||
| -rwxr-xr-x | src/conf_mode/system_option.py | 45 | ||||
| -rwxr-xr-x | src/conf_mode/vpn_ipsec.py | 2 | ||||
| -rwxr-xr-x | src/conf_mode/vpn_openconnect.py | 2 | ||||
| -rw-r--r-- | src/etc/systemd/system/frr.service.d/override.conf | 6 | ||||
| -rwxr-xr-x | src/migration-scripts/reverse-proxy/2-to-3 | 66 | ||||
| -rwxr-xr-x | src/op_mode/firewall.py | 21 | ||||
| -rwxr-xr-x | src/services/vyos-domain-resolver | 29 | ||||
| -rwxr-xr-x | src/validators/bgp-large-community-list | 21 | ||||
| -rwxr-xr-x | src/validators/cpu | 43 | 
15 files changed, 260 insertions, 45 deletions
| diff --git a/src/conf_mode/firewall.py b/src/conf_mode/firewall.py index 274ca2ce6..348eaeba3 100755 --- a/src/conf_mode/firewall.py +++ b/src/conf_mode/firewall.py @@ -17,6 +17,8 @@  import os  import re +from glob import glob +  from sys import exit  from vyos.base import Warning  from vyos.config import Config @@ -30,6 +32,7 @@ from vyos.firewall import geoip_update  from vyos.template import render  from vyos.utils.dict import dict_search_args  from vyos.utils.dict import dict_search_recursive +from vyos.utils.file import write_file  from vyos.utils.process import call  from vyos.utils.process import cmd  from vyos.utils.process import rc_cmd @@ -37,7 +40,6 @@ from vyos.utils.network import get_vrf_members  from vyos.utils.network import get_interface_vrf  from vyos import ConfigError  from vyos import airbag -from pathlib import Path  from subprocess import run as subp_run  airbag.enable() @@ -626,10 +628,11 @@ def apply(firewall):      domain_action = 'restart'      if dict_search_args(firewall, 'group', 'remote_group') or dict_search_args(firewall, 'group', 'domain_group') or firewall['ip_fqdn'].items() or firewall['ip6_fqdn'].items():          text = f'# Automatically generated by firewall.py\nThis file indicates that vyos-domain-resolver service is used by the firewall.\n' -        Path(domain_resolver_usage).write_text(text) +        write_file(domain_resolver_usage, text)      else: -        Path(domain_resolver_usage).unlink(missing_ok=True) -        if not Path('/run').glob('use-vyos-domain-resolver*'): +        if os.path.exists(domain_resolver_usage): +            os.unlink(domain_resolver_usage) +        if not glob('/run/use-vyos-domain-resolver*'):              domain_action = 'stop'      call(f'systemctl {domain_action} vyos-domain-resolver.service') diff --git a/src/conf_mode/interfaces_wireguard.py b/src/conf_mode/interfaces_wireguard.py index 3ca6ecdca..770667df1 100755 --- a/src/conf_mode/interfaces_wireguard.py +++ b/src/conf_mode/interfaces_wireguard.py @@ -14,6 +14,9 @@  # You should have received a copy of the GNU General Public License  # along with this program.  If not, see <http://www.gnu.org/licenses/>. +import os + +from glob import glob  from sys import exit  from vyos.config import Config @@ -35,7 +38,6 @@ from vyos.utils.network import is_wireguard_key_pair  from vyos.utils.process import call  from vyos import ConfigError  from vyos import airbag -from pathlib import Path  airbag.enable() @@ -145,19 +147,11 @@ def generate(wireguard):  def apply(wireguard):      check_kmod('wireguard') -    if 'rebuild_required' in wireguard or 'deleted' in wireguard: -        wg = WireGuardIf(**wireguard) -        # WireGuard only supports peer removal based on the configured public-key, -        # by deleting the entire interface this is the shortcut instead of parsing -        # out all peers and removing them one by one. -        # -        # Peer reconfiguration will always come with a short downtime while the -        # WireGuard interface is recreated (see below) -        wg.remove() +    wg = WireGuardIf(**wireguard) -    # Create the new interface if required -    if 'deleted' not in wireguard: -        wg = WireGuardIf(**wireguard) +    if 'deleted' in wireguard: +        wg.remove() +    else:          wg.update(wireguard)      domain_resolver_usage = '/run/use-vyos-domain-resolver-interfaces-wireguard-' + wireguard['ifname'] @@ -168,12 +162,12 @@ def apply(wireguard):          from vyos.utils.file import write_file          text = f'# Automatically generated by interfaces_wireguard.py\nThis file indicates that vyos-domain-resolver service is used by the interfaces_wireguard.\n' -        text += "intefaces:\n" + "".join([f"  - {peer}\n" for peer in wireguard['peers_need_resolve']]) -        Path(domain_resolver_usage).write_text(text) +        text += "interfaces:\n" + "".join([f"  - {peer}\n" for peer in wireguard['peers_need_resolve']])          write_file(domain_resolver_usage, text)      else: -        Path(domain_resolver_usage).unlink(missing_ok=True) -        if not Path('/run').glob('use-vyos-domain-resolver*'): +        if os.path.exists(domain_resolver_usage): +            os.unlink(domain_resolver_usage) +        if not glob('/run/use-vyos-domain-resolver*'):              domain_action = 'stop'      call(f'systemctl {domain_action} vyos-domain-resolver.service') diff --git a/src/conf_mode/nat.py b/src/conf_mode/nat.py index 504b3e82a..6c88e5cfd 100755 --- a/src/conf_mode/nat.py +++ b/src/conf_mode/nat.py @@ -16,8 +16,8 @@  import os +from glob import glob  from sys import exit -from pathlib import Path  from vyos.base import Warning  from vyos.config import Config @@ -265,9 +265,9 @@ def apply(nat):              text = f'# Automatically generated by nat.py\nThis file indicates that vyos-domain-resolver service is used by nat.\n'              write_file(domain_resolver_usage, text)          elif os.path.exists(domain_resolver_usage): -            Path(domain_resolver_usage).unlink(missing_ok=True) +            os.unlink(domain_resolver_usage) -            if not Path('/run').glob('use-vyos-domain-resolver*'): +            if not glob('/run/use-vyos-domain-resolver*'):                  domain_action = 'stop'          call(f'systemctl {domain_action} vyos-domain-resolver.service') diff --git a/src/conf_mode/nat66.py b/src/conf_mode/nat66.py index 95dfae3a5..c65950c9e 100755 --- a/src/conf_mode/nat66.py +++ b/src/conf_mode/nat66.py @@ -92,6 +92,10 @@ def verify(nat):              if prefix != None:                  if not is_ipv6(prefix):                      raise ConfigError(f'{err_msg} source-prefix not specified') +                 +            if 'destination' in config and 'group' in config['destination']: +                if len({'address_group', 'network_group', 'domain_group'} & set(config['destination']['group'])) > 1: +                    raise ConfigError('Only one address-group, network-group or domain-group can be specified')      if dict_search('destination.rule', nat):          for rule, config in dict_search('destination.rule', nat).items(): diff --git a/src/conf_mode/policy.py b/src/conf_mode/policy.py index a90e33e81..ec9005890 100755 --- a/src/conf_mode/policy.py +++ b/src/conf_mode/policy.py @@ -14,6 +14,7 @@  # You should have received a copy of the GNU General Public License  # along with this program.  If not, see <http://www.gnu.org/licenses/>. +import re  from sys import exit  from vyos.config import Config @@ -24,9 +25,20 @@ from vyos.frrender import get_frrender_dict  from vyos.utils.dict import dict_search  from vyos.utils.process import is_systemd_service_running  from vyos import ConfigError +from vyos.base import Warning  from vyos import airbag  airbag.enable() +# Sanity checks for large-community-list regex: +# * Require complete 3-tuples, no blank members. Catch missed & doubled colons.  +# * Permit appropriate community separators (whitespace, underscore) +# * Permit common regex between tuples while requiring at least one separator +#   (eg, "1:1:1_.*_4:4:4", matching "1:1:1 4:4:4" and "1:1:1 2:2:2 4:4:4", +#        but not "1:1:13 24:4:4") +# Best practice: stick with basic patterns, mind your wildcards and whitespace. +# Regex that doesn't match this pattern will be allowed with a warning.  +large_community_regex_pattern = r'([^: _]+):([^: _]+):([^: _]+)([ _]([^:]+):([^: _]+):([^: _]+))*' +  def community_action_compatibility(actions: dict) -> bool:      """      Check compatibility of values in community and large community sections @@ -147,6 +159,10 @@ def verify(config_dict):                      if 'regex' not in rule_config:                          raise ConfigError(f'A regex {mandatory_error}') +                if policy_type == 'large_community_list': +                    if not re.fullmatch(large_community_regex_pattern, rule_config['regex']): +                        Warning(f'"policy large-community-list {instance} rule {rule} regex" does not follow expected form and may not match as expected.') +                  if policy_type in ['prefix_list', 'prefix_list6']:                      if 'prefix' not in rule_config:                          raise ConfigError(f'A prefix {mandatory_error}') diff --git a/src/conf_mode/protocols_bgp.py b/src/conf_mode/protocols_bgp.py index 99d8eb9d1..e29f3358a 100755 --- a/src/conf_mode/protocols_bgp.py +++ b/src/conf_mode/protocols_bgp.py @@ -527,6 +527,10 @@ def verify(config_dict):                          raise ConfigError(                              'Please unconfigure import vrf commands before using vpn commands in dependent VRFs!') +                # Verify if the route-map exists +                if dict_search('route_map.vrf.import', afi_config) is not None: +                    verify_route_map(afi_config['route_map']['vrf']['import'], bgp) +                  if (dict_search('route_map.vrf.import', afi_config) is not None                          or  dict_search('import.vrf', afi_config) is not None):                      # FRR error: please unconfigure vpn to vrf commands before @@ -541,7 +545,6 @@ def verify(config_dict):                          raise ConfigError('Please unconfigure route-map VPN to VRF commands before '\                                            'using "import vrf" commands!') -                  # Verify that the export/import route-maps do exist                  for export_import in ['export', 'import']:                      tmp = dict_search(f'route_map.vpn.{export_import}', afi_config) diff --git a/src/conf_mode/system_option.py b/src/conf_mode/system_option.py index 3d76a1eaa..5acad6599 100755 --- a/src/conf_mode/system_option.py +++ b/src/conf_mode/system_option.py @@ -127,6 +127,9 @@ def generate(options):      # occurance is used for having the appropriate options passed to GRUB      # when re-configuring options on the CLI.      cmdline_options = [] +    kernel_opts = options.get('kernel', {}) +    k_cpu_opts = kernel_opts.get('cpu', {}) +    k_memory_opts = kernel_opts.get('memory', {})      if 'kernel' in options:          if 'disable_mitigations' in options['kernel']:              cmdline_options.append('mitigations=off') @@ -138,6 +141,48 @@ def generate(options):                  f'initcall_blacklist=acpi_cpufreq_init amd_pstate={mode}')          if 'quiet' in options['kernel']:              cmdline_options.append('quiet') + +    if 'disable_hpet' in kernel_opts: +        cmdline_options.append('hpet=disable') + +    if 'disable_mce' in kernel_opts: +        cmdline_options.append('mce=off') + +    if 'disable_softlockup' in kernel_opts: +        cmdline_options.append('nosoftlockup') + +    # CPU options +    isol_cpus = k_cpu_opts.get('isolate_cpus') +    if isol_cpus: +        cmdline_options.append(f'isolcpus={isol_cpus}') + +    nohz_full = k_cpu_opts.get('nohz_full') +    if nohz_full: +        cmdline_options.append(f'nohz_full={nohz_full}') + +    rcu_nocbs = k_cpu_opts.get('rcu_no_cbs') +    if rcu_nocbs: +        cmdline_options.append(f'rcu_nocbs={rcu_nocbs}') + +    if 'disable_nmi_watchdog' in k_cpu_opts: +        cmdline_options.append('nmi_watchdog=0') + +    # Memory options +    if 'disable_numa_balancing' in k_memory_opts: +        cmdline_options.append('numa_balancing=disable') + +    default_hp_size = k_memory_opts.get('default_hugepage_size') +    if default_hp_size: +        cmdline_options.append(f'default_hugepagesz={default_hp_size}') + +    hp_sizes = k_memory_opts.get('hugepage_size') +    if hp_sizes: +        for size, settings in hp_sizes.items(): +            cmdline_options.append(f'hugepagesz={size}') +            count = settings.get('hugepage_count') +            if count: +                cmdline_options.append(f'hugepages={count}') +      grub_util.update_kernel_cmdline_options(' '.join(cmdline_options))      return None diff --git a/src/conf_mode/vpn_ipsec.py b/src/conf_mode/vpn_ipsec.py index 2754314f7..ac25cd671 100755 --- a/src/conf_mode/vpn_ipsec.py +++ b/src/conf_mode/vpn_ipsec.py @@ -727,7 +727,7 @@ def generate(ipsec):                          for remote_prefix in remote_prefixes:                              local_net = ipaddress.ip_network(local_prefix)                              remote_net = ipaddress.ip_network(remote_prefix) -                            if local_net.overlaps(remote_net): +                            if local_net.subnet_of(remote_net):                                  if passthrough is None:                                      passthrough = []                                  passthrough.append(local_prefix) diff --git a/src/conf_mode/vpn_openconnect.py b/src/conf_mode/vpn_openconnect.py index 42785134f..0346c7819 100755 --- a/src/conf_mode/vpn_openconnect.py +++ b/src/conf_mode/vpn_openconnect.py @@ -93,7 +93,7 @@ def verify(ocserv):                  "radius" in ocserv["authentication"]["mode"]):                      raise ConfigError('OpenConnect authentication modes are mutually-exclusive, remove either local or radius from your configuration')              if "radius" in ocserv["authentication"]["mode"]: -                if not ocserv["authentication"]['radius']['server']: +                if 'server' not in ocserv['authentication']['radius']:                      raise ConfigError('OpenConnect authentication mode radius requires at least one RADIUS server')              if "local" in ocserv["authentication"]["mode"]:                  if not ocserv.get("authentication", {}).get("local_users"): diff --git a/src/etc/systemd/system/frr.service.d/override.conf b/src/etc/systemd/system/frr.service.d/override.conf index 614b4f7ed..a4a73ecd9 100644 --- a/src/etc/systemd/system/frr.service.d/override.conf +++ b/src/etc/systemd/system/frr.service.d/override.conf @@ -3,9 +3,11 @@ After=vyos-router.service  [Service]  LimitNOFILE=4096 -ExecStartPre=/bin/bash -c 'mkdir -p /run/frr/config; \ +ExecStartPre=/bin/bash -c 'if [ ! -f /run/frr/config/frr.conf ]; then \ +             mkdir -p /run/frr/config; \               echo "log syslog" > /run/frr/config/frr.conf; \               echo "log facility local7" >> /run/frr/config/frr.conf; \               chown frr:frr /run/frr/config/frr.conf; \               chmod 664 /run/frr/config/frr.conf; \ -             mount --bind /run/frr/config/frr.conf /etc/frr/frr.conf' +             mount --bind /run/frr/config/frr.conf /etc/frr/frr.conf; \ +fi;' diff --git a/src/migration-scripts/reverse-proxy/2-to-3 b/src/migration-scripts/reverse-proxy/2-to-3 new file mode 100755 index 000000000..ac539618e --- /dev/null +++ b/src/migration-scripts/reverse-proxy/2-to-3 @@ -0,0 +1,66 @@ +# Copyright 2025 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library.  If not, see <http://www.gnu.org/licenses/>. + +# T7429: logging facility "all" unavailable in code + +from vyos.configtree import ConfigTree + +base = ['load-balancing', 'haproxy'] +unsupported_facilities = ['all', 'authpriv', 'mark'] + +def config_migrator(config, config_path: list) -> None: +    if not config.exists(config_path): +        return +    # Remove unsupported backend HAProxy syslog facilities form CLI +    # Works for both backend and service CLI nodes +    for service_backend in config.list_nodes(config_path): +        log_path = config_path + [service_backend, 'logging', 'facility'] +        if not config.exists(log_path): +            continue +        # Remove unsupported syslog facilities form CLI +        for facility in config.list_nodes(log_path): +            if facility in unsupported_facilities: +                config.delete(log_path + [facility]) +                continue +            # Remove unsupported facility log level form CLI. VyOS will fallback +            # to default log level if not set +            if config.exists(log_path + [facility, 'level']): +                tmp = config.return_value(log_path + [facility, 'level']) +                if tmp == 'all': +                    config.delete(log_path + [facility, 'level']) + +def migrate(config: ConfigTree) -> None: +    if not config.exists(base): +        # Nothing to do +        return + +    # Remove unsupported syslog facilities form CLI +    global_path = base + ['global-parameters', 'logging', 'facility'] +    if config.exists(global_path): +        for facility in config.list_nodes(global_path): +            if facility in unsupported_facilities: +                config.delete(global_path + [facility]) +                continue +            # Remove unsupported facility log level form CLI. VyOS will fallback +            # to default log level if not set +            if config.exists(global_path + [facility, 'level']): +                tmp = config.return_value(global_path + [facility, 'level']) +                if tmp == 'all': +                    config.delete(global_path + [facility, 'level']) + +    # Remove unsupported backend HAProxy syslog facilities from CLI +    config_migrator(config, base + ['backend']) +    # Remove unsupported service HAProxy syslog facilities from CLI +    config_migrator(config, base + ['service']) diff --git a/src/op_mode/firewall.py b/src/op_mode/firewall.py index ac47e3273..f3309ee34 100755 --- a/src/op_mode/firewall.py +++ b/src/op_mode/firewall.py @@ -18,6 +18,7 @@ import argparse  import ipaddress  import json  import re +from signal import signal, SIGPIPE, SIG_DFL  import tabulate  import textwrap @@ -25,6 +26,9 @@ from vyos.config import Config  from vyos.utils.process import cmd  from vyos.utils.dict import dict_search_args +signal(SIGPIPE, SIG_DFL) + +  def get_config_node(conf, node=None, family=None, hook=None, priority=None):      if node == 'nat':          if family == 'ipv6': @@ -648,12 +652,14 @@ def show_firewall_group(name=None):                  references = find_references(group_type, remote_name)                  row = [remote_name, textwrap.fill(remote_conf.get('description') or '', 50), group_type, '\n'.join(references) or 'N/D']                  members = get_nftables_remote_group_members("ipv4", 'vyos_filter', f'R_{remote_name}') +                members6 = get_nftables_remote_group_members("ipv6", 'vyos_filter', f'R6_{remote_name}')                  if 'url' in remote_conf:                      # display only the url if no members are found for both views -                    if not members: +                    if not members and not members6:                          if args.detail: -                            header_tail = ['Remote URL'] +                            header_tail = ['IPv6 Members', 'Remote URL'] +                            row.append('N/D')                              row.append('N/D')                              row.append(remote_conf['url'])                          else: @@ -662,8 +668,15 @@ def show_firewall_group(name=None):                      else:                          # display all table elements in detail view                          if args.detail: -                            header_tail = ['Remote URL'] -                            row += [' '.join(members)] +                            header_tail = ['IPv6 Members', 'Remote URL'] +                            if members: +                                row.append(' '.join(members)) +                            else: +                                row.append('N/D') +                            if members6: +                                row.append(' '.join(members6)) +                            else: +                                row.append('N/D')                              row.append(remote_conf['url'])                              rows.append(row)                          else: diff --git a/src/services/vyos-domain-resolver b/src/services/vyos-domain-resolver index 4419fc4a7..fb18724af 100755 --- a/src/services/vyos-domain-resolver +++ b/src/services/vyos-domain-resolver @@ -28,7 +28,7 @@ from vyos.utils.commit import commit_in_progress  from vyos.utils.dict import dict_search_args  from vyos.utils.kernel import WIREGUARD_REKEY_AFTER_TIME  from vyos.utils.file import makedir, chmod_775, write_file, read_file -from vyos.utils.network import is_valid_ipv4_address_or_range +from vyos.utils.network import is_valid_ipv4_address_or_range, is_valid_ipv6_address_or_range  from vyos.utils.process import cmd  from vyos.utils.process import run  from vyos.xml_ref import get_defaults @@ -143,10 +143,11 @@ def update_remote_group(config):          for set_name, remote_config in remote_groups.items():              if 'url' not in remote_config:                  continue -            nft_set_name = f'R_{set_name}' +            nft_ip_set_name = f'R_{set_name}' +            nft_ip6_set_name = f'R6_{set_name}'              # Create list file if necessary -            list_file = os.path.join(firewall_config_dir, f"{nft_set_name}.txt") +            list_file = os.path.join(firewall_config_dir, f"{nft_ip_set_name}.txt")              if not os.path.exists(list_file):                  write_file(list_file, '', user="root", group="vyattacfg", mode=0o644) @@ -159,16 +160,32 @@ def update_remote_group(config):              # Read list file              ip_list = [] +            ip6_list = [] +            invalid_list = []              for line in read_file(list_file).splitlines():                  line_first_word = line.strip().partition(' ')[0]                  if is_valid_ipv4_address_or_range(line_first_word):                      ip_list.append(line_first_word) +                elif is_valid_ipv6_address_or_range(line_first_word): +                    ip6_list.append(line_first_word) +                else: +                    if line_first_word[0].isalnum(): +                        invalid_list.append(line_first_word) -            # Load tables +            # Load ip tables              for table in ipv4_tables: -                if (table, nft_set_name) in valid_sets: -                    conf_lines += nft_output(table, nft_set_name, ip_list) +                if (table, nft_ip_set_name) in valid_sets: +                    conf_lines += nft_output(table, nft_ip_set_name, ip_list) + +            # Load ip6 tables +            for table in ipv6_tables: +                if (table, nft_ip6_set_name) in valid_sets: +                    conf_lines += nft_output(table, nft_ip6_set_name, ip6_list) + +            invalid_str = ", ".join(invalid_list) +            if invalid_str: +                logger.info(f'Invalid address for set {set_name}: {invalid_str}')              count += 1 diff --git a/src/validators/bgp-large-community-list b/src/validators/bgp-large-community-list index 9ba5b27eb..75276630c 100755 --- a/src/validators/bgp-large-community-list +++ b/src/validators/bgp-large-community-list @@ -1,6 +1,6 @@  #!/usr/bin/env python3  # -# Copyright (C) 2021-2023 VyOS maintainers and contributors +# Copyright (C) 2021-2025 VyOS maintainers and contributors  #  # This program is free software; you can redistribute it and/or modify  # it under the terms of the GNU General Public License version 2 or later as @@ -17,18 +17,27 @@  import re  import sys -pattern = '(.*):(.*):(.*)' -allowedChars = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '.', '+', '*', '?', '^', '$', '(', ')', '[', ']', '{', '}', '|', '\\', ':', '-' } +allowedChars = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '.', '+', '*', '?', '^', '$', '(', ')', '[', ']', '{', '}', '|', '\\', ':', '-', '_', ' ' }  if __name__ == '__main__':      if len(sys.argv) != 2:          sys.exit(1) -    value = sys.argv[1].split(':') -    if not len(value) == 3: +    value = sys.argv[1] + +    # Require at least one well-formed large-community tuple in the pattern.  +    tmp = value.split(':') +    if len(tmp) < 3: +        sys.exit(1) + +    # Simple guard against invalid community & 1003.2 pattern chars +    if not set(value).issubset(allowedChars):          sys.exit(1) -    if not (re.match(pattern, sys.argv[1]) and set(sys.argv[1]).issubset(allowedChars)): +    # Don't feed FRR badly formed regex +    try: +        re.compile(value) +    except re.error:          sys.exit(1)      sys.exit(0) diff --git a/src/validators/cpu b/src/validators/cpu new file mode 100755 index 000000000..959a49248 --- /dev/null +++ b/src/validators/cpu @@ -0,0 +1,43 @@ +#!/usr/bin/python3 + +import re +import sys + +MAX_CPU = 511 + + +def validate_isolcpus(value): +    pattern = re.compile(r'^(\d{1,3}(-\d{1,3})?)(,(\d{1,3}(-\d{1,3})?))*$') +    if not pattern.fullmatch(value): +        return False + +    flat_list = [] +    for part in value.split(','): +        if '-' in part: +            start, end = map(int, part.split('-')) +            if start > end or start < 0 or end > MAX_CPU: +                return False +            flat_list.extend(range(start, end + 1)) +        else: +            num = int(part) +            if num < 0 or num > MAX_CPU: +                return False +            flat_list.append(num) + +    for i in range(1, len(flat_list)): +        if flat_list[i] <= flat_list[i - 1]: +            return False + +    return True + + +if __name__ == "__main__": +    if len(sys.argv) != 2: +        print("Usage: python3 cpu.py <cpu_list>") +        sys.exit(1) + +    input_value = sys.argv[1] +    if validate_isolcpus(input_value): +        sys.exit(0) +    else: +        sys.exit(1) | 
