From c95ef34d6fa09aead77fe6dd17b6ca3d5ce61eaa Mon Sep 17 00:00:00 2001 From: zsdc Date: Wed, 1 Apr 2020 17:57:11 +0300 Subject: keepalived: T1350: Replaced transition scripts logic In old implementation, all notify scripts are configured directly inside a keepalived.conf. This way is not recommended even by keepalived as scripts execution is not tracked and they may be running not in order and killed before the actual work will be finished. We have observed such situations in very rare cases during tests. New implementation use FIFO pipe, to which keepalived send any state changes. And these notifications are read by a daemon, saved in a queue and processed one by one, which eliminates the situation when the scripts creating inconsistent config or environment. --- src/conf_mode/vrrp.py | 47 +++++----- src/system/keepalived-fifo.py | 190 ++++++++++++++++++++++++++++++++++++++ src/system/vrrp-script-wrapper.py | 64 ------------- 3 files changed, 215 insertions(+), 86 deletions(-) create mode 100755 src/system/keepalived-fifo.py delete mode 100755 src/system/vrrp-script-wrapper.py diff --git a/src/conf_mode/vrrp.py b/src/conf_mode/vrrp.py index a09e55a2f..b17f1ce82 100755 --- a/src/conf_mode/vrrp.py +++ b/src/conf_mode/vrrp.py @@ -19,16 +19,18 @@ import os import sys import subprocess import ipaddress - +import json import jinja2 import vyos.config import vyos.keepalived from vyos import ConfigError +from pathlib import Path daemon_file = "/etc/default/keepalived" config_file = "/etc/keepalived/keepalived.conf" +config_dict_path = "/run/keepalived_config.dict" config_tmpl = """ # Autogenerated by VyOS @@ -38,16 +40,18 @@ config_tmpl = """ global_defs { dynamic_interfaces script_user root + notify_fifo /run/keepalived_notify_fifo + notify_fifo_script /usr/libexec/vyos/system/keepalived-fifo.py } {% for group in groups -%} {% if group.health_check_script -%} vrrp_script healthcheck_{{ group.name }} { - script "{{ group.health_check_script }}" - interval {{ group.health_check_interval }} - fall {{ group.health_check_count }} - rise 1 + script "{{ group.health_check_script }}" + interval {{ group.health_check_interval }} + fall {{ group.health_check_count }} + rise 1 } {% endif %} @@ -106,22 +110,6 @@ vrrp_instance {{ group.name }} { healthcheck_{{ group.name }} } {% endif -%} - - {% if group.master_script -%} - notify_master "/usr/libexec/vyos/system/vrrp-script-wrapper.py --state master --group {{ group.name }} --interface {{ group.interface }} {{ group.master_script }}" - {% endif -%} - - {% if group.backup_script -%} - notify_backup "/usr/libexec/vyos/system/vrrp-script-wrapper.py --state backup --group {{ group.name }} --interface {{ group.interface }} {{ group.backup_script }}" - {% endif -%} - - {% if group.fault_script -%} - notify_fault "/usr/libexec/vyos/system/vrrp-script-wrapper.py --state fault --group {{ group.name }} --interface {{ group.interface }} {{ group.fault_script }}" - {% endif -%} - - {% if group.stop_script -%} - notify_stop "/usr/libexec/vyos/system/vrrp-script-wrapper.py --state stop --group {{ group.name }} --interface {{ group.interface }} {{ group.stop_script }}" - {% endif -%} } {% endfor -%} @@ -153,6 +141,7 @@ daemon_tmpl = """ DAEMON_ARGS="--snmp" """ + def get_config(): vrrp_groups = [] sync_groups = [] @@ -240,8 +229,13 @@ def get_config(): sync_groups.append(sync_group) + # create a file with dict with proposed configuration + with open("{}.temp".format(config_dict_path), 'w') as dict_file: + dict_file.write(json.dumps({'vrrp_groups': vrrp_groups, 'sync_groups': sync_groups})) + return (vrrp_groups, sync_groups) + def verify(data): vrrp_groups, sync_groups = data @@ -308,6 +302,7 @@ def verify(data): if not (m in vrrp_group_names): raise ConfigError("VRRP sync-group {0} refers to VRRP group {1}, but group {1} does not exist".format(sync_group["name"], m)) + def generate(data): vrrp_groups, sync_groups = data @@ -318,7 +313,7 @@ def generate(data): if g["disable"]: print("Warning: ignoring disabled VRRP group {0} in sync-group {1}".format(g["name"], sync_group["name"])) # Filter out disabled groups - vrrp_groups = list(filter(lambda x: x["disable"] != True, vrrp_groups)) + vrrp_groups = list(filter(lambda x: x["disable"] is not True, vrrp_groups)) tmpl = jinja2.Template(config_tmpl) config_text = tmpl.render({"groups": vrrp_groups, "sync_groups": sync_groups}) @@ -330,9 +325,17 @@ def generate(data): return None + def apply(data): vrrp_groups, sync_groups = data if vrrp_groups: + # safely rename a temporary file with configuration dict + try: + dict_file = Path("{}.temp".format(config_dict_path)) + dict_file.rename(Path(config_dict_path)) + except Exception as err: + print("Unable to rename the file with keepalived config for FIFO pipe: {}".format(err)) + if not vyos.keepalived.vrrp_running(): print("Starting the VRRP process") ret = subprocess.call("sudo systemctl restart keepalived.service", shell=True) diff --git a/src/system/keepalived-fifo.py b/src/system/keepalived-fifo.py new file mode 100755 index 000000000..5e85da4a4 --- /dev/null +++ b/src/system/keepalived-fifo.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2020 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import os +import time +import signal +import argparse +import threading +import subprocess +import re +import json +from pathlib import Path +from queue import Queue +import logging +from logging.handlers import SysLogHandler + +# configure logging +logger = logging.getLogger(__name__) +logs_format = logging.Formatter('%(filename)s: %(message)s') +logs_handler_syslog = SysLogHandler('/dev/log') +logs_handler_syslog.setFormatter(logs_format) +logger.addHandler(logs_handler_syslog) +logger.setLevel(logging.DEBUG) + + +# class for all operations +class KeepalivedFifo: + # init - read command arguments + def __init__(self): + logger.info("Starting FIFO pipe for Keepalived") + # define program arguments + cmd_args_parser = argparse.ArgumentParser(description='Create FIFO pipe for keepalived and process notify events', add_help=False) + cmd_args_parser.add_argument('PIPE', help='path to the FIFO pipe') + # parse arguments + cmd_args = cmd_args_parser.parse_args() + self._config_load() + self.pipe_path = cmd_args.PIPE + + # create queue for messages and events for syncronization + self.message_queue = Queue(maxsize=100) + self.stopme = threading.Event() + self.message_event = threading.Event() + + # load configuration + def _config_load(self): + try: + # read the dictionary file with configuration + with open('/run/keepalived_config.dict', 'r') as dict_file: + vrrp_config_dict = json.load(dict_file) + self.vrrp_config = {'vrrp_groups': {}, 'sync_groups': {}} + # save VRRP instances to the new dictionary + for vrrp_group in vrrp_config_dict['vrrp_groups']: + self.vrrp_config['vrrp_groups'][vrrp_group['name']] = { + 'STOP': vrrp_group.get('stop_script'), + 'FAULT': vrrp_group.get('fault_script'), + 'BACKUP': vrrp_group.get('backup_script'), + 'MASTER': vrrp_group.get('master_script') + } + # save VRRP sync groups to the new dictionary + for sync_group in vrrp_config_dict['sync_groups']: + self.vrrp_config['sync_groups'][sync_group['name']] = { + 'STOP': sync_group.get('stop_script'), + 'FAULT': sync_group.get('fault_script'), + 'BACKUP': sync_group.get('backup_script'), + 'MASTER': sync_group.get('master_script') + } + logger.debug("Loaded configuration: {}".format(self.vrrp_config)) + except Exception as err: + logger.error("Unable to load configuration: {}".format(err)) + + # run command + def _run_command(self, command): + try: + logger.debug("Running the command: {}".format(command)) + process = subprocess.Popen(command.split(' '), stdout=subprocess.PIPE, stdin=subprocess.PIPE, universal_newlines=True) + stdout, stderr = process.communicate() + if process.returncode != 0: + raise Exception("The command \"{}\" returned status {}. Error: {}".format(command, process.returncode, stderr)) + except Exception as err: + logger.error("Unable to execute command \"{}\": {}".format(command, err)) + + # create FIFO pipe + def pipe_create(self): + if Path(self.pipe_path).exists(): + logger.info("PIPE already exist: {}".format(self.pipe_path)) + else: + os.mkfifo(self.pipe_path) + + # process message from pipe + def pipe_process(self): + logger.debug("Message processing start") + regex_notify = re.compile(r'^(?P\w+) "(?P[\w-]+)" (?P\w+) (?P\d+)$', re.MULTILINE) + while self.stopme.is_set() is False: + # wait for a new message event from pipe_wait + self.message_event.wait() + try: + # clear mesage event flag + self.message_event.clear() + # get all messages from queue and try to process them + while self.message_queue.empty() is not True: + message = self.message_queue.get() + logger.debug("Received message: {}".format(message)) + notify_message = regex_notify.search(message) + # try to process a message if it looks valid + if notify_message: + n_type = notify_message.group('type') + n_name = notify_message.group('name') + n_state = notify_message.group('state') + logger.info("{} {} changed state to {}".format(n_type, n_name, n_state)) + # check and run commands for VRRP instances + if n_type == 'INSTANCE': + if n_name in self.vrrp_config['vrrp_groups'] and n_state in self.vrrp_config['vrrp_groups'][n_name]: + n_script = self.vrrp_config['vrrp_groups'][n_name].get(n_state) + if n_script: + self._run_command(n_script) + # check and run commands for VRRP sync groups + # currently, this is not available in VyOS CLI + if n_type == 'GROUP': + if n_name in self.vrrp_config['sync_groups'] and n_state in self.vrrp_config['sync_groups'][n_name]: + n_script = self.vrrp_config['sync_groups'][n_name].get(n_state) + if n_script: + self._run_command(n_script) + # mark task in queue as done + self.message_queue.task_done() + except Exception as err: + logger.error("Error processing message: {}".format(err)) + logger.debug("Terminating messages processing thread") + + # wait for messages + def pipe_wait(self): + logger.debug("Message reading start") + self.pipe_read = os.open(self.pipe_path, os.O_RDONLY | os.O_NONBLOCK) + while self.stopme.is_set() is False: + # sleep a bit to not produce 100% CPU load + time.sleep(0.1) + try: + # try to read a message from PIPE + message = os.read(self.pipe_read, 500) + if message: + # split PIPE content by lines and put them into queue + for line in message.decode().strip().splitlines(): + self.message_queue.put(line) + # set new message flag to start processing + self.message_event.set() + except Exception as err: + # ignore the "Resource temporarily unavailable" error + if err.errno != 11: + logger.error("Error receiving message: {}".format(err)) + + logger.debug("Closing FIFO pipe") + os.close(self.pipe_read) + + +# handle SIGTERM signal to allow finish all messages processing +def sigterm_handle(signum, frame): + logger.info("Ending processing: Received SIGTERM signal") + fifo.stopme.set() + thread_wait_message.join() + fifo.message_event.set() + thread_process_message.join() + + +signal.signal(signal.SIGTERM, sigterm_handle) + +# init our class +fifo = KeepalivedFifo() +# try to create PIPE if it is not exist yet +# It looks like keepalived do it before the script will be running, but if we +# will decide to run this not from keepalived config, then we may get in +# trouble. So it is betteer to leave this here. +fifo.pipe_create() +# create and run dedicated threads for reading and processing messages +thread_wait_message = threading.Thread(target=fifo.pipe_wait) +thread_process_message = threading.Thread(target=fifo.pipe_process) +thread_wait_message.start() +thread_process_message.start() diff --git a/src/system/vrrp-script-wrapper.py b/src/system/vrrp-script-wrapper.py deleted file mode 100755 index c28ecba55..000000000 --- a/src/system/vrrp-script-wrapper.py +++ /dev/null @@ -1,64 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2018 VyOS maintainers and contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 2 or later as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# -# - -import os -import sys -import subprocess -import argparse -import syslog - -import vyos.util - - -parser = argparse.ArgumentParser() -parser.add_argument("-t", "--state", type=str, help="VRRP state") -parser.add_argument("-g", "--group", type=str, help="VRRP group") -parser.add_argument("-i", "--interface", type=str, help="Network interface") -parser.add_argument("script", nargs='+') - -syslog.openlog('vyos-vrrp-wrapper') - -args = parser.parse_args() -if not args.script or not args.state or not args.group \ - or not args.interface: - parser.print_usage() - sys.exit(1) - -# Fixup: the reason we take multiple "script" arguments is that people may want -# to pass arguments to the script -args.script = " ".join(args.script) - -exitcode = 0 -# Change the process GID to the config owners group to avoid screwing up -# running config permissions -os.setgid(vyos.util.get_cfg_group_id()) -syslog.syslog(syslog.LOG_NOTICE, 'Running transition script {0} for VRRP group {1}'.format(args.script, args.group)) -try: - ret = subprocess.call("%s %s %s %s" % ( args.script, args.state, args.interface, args.group), shell=True) - if ret != 0: - syslog.syslog(syslog.LOG_ERR, "Transition script {0} failed, exit status: {1}".format(args.script, ret)) - exitcode = ret -except Exception as e: - syslog.syslog(syslog.LOG_ERR, "Failed to execute transition script {0}: {1}".format(args.script, e)) - exitcode = 1 - -if exitcode == 0: - syslog.syslog(syslog.LOG_NOTICE, "Transition script {0} executed successfully".format(args.script)) - -syslog.closelog() -sys.exit(exitcode) -- cgit v1.2.3