#!/usr/bin/env python3
#
# Copyright (C) 2020 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
import os
import time
import signal
import argparse
import threading
import subprocess
import re
import json
from pathlib import Path
from queue import Queue
import logging
from logging.handlers import SysLogHandler
# configure logging
logger = logging.getLogger(__name__)
logs_format = logging.Formatter('%(filename)s: %(message)s')
logs_handler_syslog = SysLogHandler('/dev/log')
logs_handler_syslog.setFormatter(logs_format)
logger.addHandler(logs_handler_syslog)
logger.setLevel(logging.DEBUG)
# class for all operations
class KeepalivedFifo:
# init - read command arguments
def __init__(self):
logger.info("Starting FIFO pipe for Keepalived")
# define program arguments
cmd_args_parser = argparse.ArgumentParser(description='Create FIFO pipe for keepalived and process notify events', add_help=False)
cmd_args_parser.add_argument('PIPE', help='path to the FIFO pipe')
# parse arguments
cmd_args = cmd_args_parser.parse_args()
self._config_load()
self.pipe_path = cmd_args.PIPE
# create queue for messages and events for syncronization
self.message_queue = Queue(maxsize=100)
self.stopme = threading.Event()
self.message_event = threading.Event()
# load configuration
def _config_load(self):
try:
# read the dictionary file with configuration
with open('/run/keepalived_config.dict', 'r') as dict_file:
vrrp_config_dict = json.load(dict_file)
self.vrrp_config = {'vrrp_groups': {}, 'sync_groups': {}}
# save VRRP instances to the new dictionary
for vrrp_group in vrrp_config_dict['vrrp_groups']:
self.vrrp_config['vrrp_groups'][vrrp_group['name']] = {
'STOP': vrrp_group.get('stop_script'),
'FAULT': vrrp_group.get('fault_script'),
'BACKUP': vrrp_group.get('backup_script'),
'MASTER': vrrp_group.get('master_script')
}
# save VRRP sync groups to the new dictionary
for sync_group in vrrp_config_dict['sync_groups']:
self.vrrp_config['sync_groups'][sync_group['name']] = {
'STOP': sync_group.get('stop_script'),
'FAULT': sync_group.get('fault_script'),
'BACKUP': sync_group.get('backup_script'),
'MASTER': sync_group.get('master_script')
}
logger.debug("Loaded configuration: {}".format(self.vrrp_config))
except Exception as err:
logger.error("Unable to load configuration: {}".format(err))
# run command
def _run_command(self, command):
try:
logger.debug("Running the command: {}".format(command))
process = subprocess.Popen(command.split(' '), stdout=subprocess.PIPE, stdin=subprocess.PIPE, universal_newlines=True)
stdout, stderr = process.communicate()
if process.returncode != 0:
raise Exception("The command \"{}\" returned status {}. Error: {}".format(command, process.returncode, stderr))
except Exception as err:
logger.error("Unable to execute command \"{}\": {}".format(command, err))
# create FIFO pipe
def pipe_create(self):
if Path(self.pipe_path).exists():
logger.info("PIPE already exist: {}".format(self.pipe_path))
else:
os.mkfifo(self.pipe_path)
# process message from pipe
def pipe_process(self):
logger.debug("Message processing start")
regex_notify = re.compile(r'^(?P\w+) "(?P[\w-]+)" (?P\w+) (?P\d+)$', re.MULTILINE)
while self.stopme.is_set() is False:
# wait for a new message event from pipe_wait
self.message_event.wait()
try:
# clear mesage event flag
self.message_event.clear()
# get all messages from queue and try to process them
while self.message_queue.empty() is not True:
message = self.message_queue.get()
logger.debug("Received message: {}".format(message))
notify_message = regex_notify.search(message)
# try to process a message if it looks valid
if notify_message:
n_type = notify_message.group('type')
n_name = notify_message.group('name')
n_state = notify_message.group('state')
logger.info("{} {} changed state to {}".format(n_type, n_name, n_state))
# check and run commands for VRRP instances
if n_type == 'INSTANCE':
if n_name in self.vrrp_config['vrrp_groups'] and n_state in self.vrrp_config['vrrp_groups'][n_name]:
n_script = self.vrrp_config['vrrp_groups'][n_name].get(n_state)
if n_script:
self._run_command(n_script)
# check and run commands for VRRP sync groups
# currently, this is not available in VyOS CLI
if n_type == 'GROUP':
if n_name in self.vrrp_config['sync_groups'] and n_state in self.vrrp_config['sync_groups'][n_name]:
n_script = self.vrrp_config['sync_groups'][n_name].get(n_state)
if n_script:
self._run_command(n_script)
# mark task in queue as done
self.message_queue.task_done()
except Exception as err:
logger.error("Error processing message: {}".format(err))
logger.debug("Terminating messages processing thread")
# wait for messages
def pipe_wait(self):
logger.debug("Message reading start")
self.pipe_read = os.open(self.pipe_path, os.O_RDONLY | os.O_NONBLOCK)
while self.stopme.is_set() is False:
# sleep a bit to not produce 100% CPU load
time.sleep(0.1)
try:
# try to read a message from PIPE
message = os.read(self.pipe_read, 500)
if message:
# split PIPE content by lines and put them into queue
for line in message.decode().strip().splitlines():
self.message_queue.put(line)
# set new message flag to start processing
self.message_event.set()
except Exception as err:
# ignore the "Resource temporarily unavailable" error
if err.errno != 11:
logger.error("Error receiving message: {}".format(err))
logger.debug("Closing FIFO pipe")
os.close(self.pipe_read)
# handle SIGTERM signal to allow finish all messages processing
def sigterm_handle(signum, frame):
logger.info("Ending processing: Received SIGTERM signal")
fifo.stopme.set()
thread_wait_message.join()
fifo.message_event.set()
thread_process_message.join()
signal.signal(signal.SIGTERM, sigterm_handle)
# init our class
fifo = KeepalivedFifo()
# try to create PIPE if it is not exist yet
# It looks like keepalived do it before the script will be running, but if we
# will decide to run this not from keepalived config, then we may get in
# trouble. So it is betteer to leave this here.
fifo.pipe_create()
# create and run dedicated threads for reading and processing messages
thread_wait_message = threading.Thread(target=fifo.pipe_wait)
thread_process_message = threading.Thread(target=fifo.pipe_process)
thread_wait_message.start()
thread_process_message.start()