summaryrefslogtreecommitdiff
path: root/src/system
diff options
context:
space:
mode:
Diffstat (limited to 'src/system')
-rwxr-xr-xsrc/system/keepalived-fifo.py85
1 files changed, 43 insertions, 42 deletions
diff --git a/src/system/keepalived-fifo.py b/src/system/keepalived-fifo.py
index 3b4330e9b..1fba0d75b 100755
--- a/src/system/keepalived-fifo.py
+++ b/src/system/keepalived-fifo.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright (C) 2020 VyOS maintainers and contributors
+# Copyright (C) 2020-2021 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
@@ -13,7 +13,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#
import os
import time
@@ -22,11 +21,13 @@ import argparse
import threading
import re
import json
-from pathlib import Path
-from queue import Queue
import logging
+
+from queue import Queue
from logging.handlers import SysLogHandler
+from vyos.ifconfig.vrrp import VRRP
+from vyos.configquery import ConfigTreeQuery
from vyos.util import cmd
# configure logging
@@ -44,12 +45,13 @@ mdns_update_command = 'sudo /usr/libexec/vyos/conf_mode/service_mdns-repeater.py
class KeepalivedFifo:
# init - read command arguments
def __init__(self):
- logger.info("Starting FIFO pipe for Keepalived")
+ logger.info('Starting FIFO pipe for Keepalived')
# define program arguments
cmd_args_parser = argparse.ArgumentParser(description='Create FIFO pipe for keepalived and process notify events', add_help=False)
cmd_args_parser.add_argument('PIPE', help='path to the FIFO pipe')
# parse arguments
cmd_args = cmd_args_parser.parse_args()
+
self._config_load()
self.pipe_path = cmd_args.PIPE
@@ -61,33 +63,34 @@ class KeepalivedFifo:
# load configuration
def _config_load(self):
try:
- # read the dictionary file with configuration
- with open('/run/keepalived_config.dict', 'r') as dict_file:
- vrrp_config_dict = json.load(dict_file)
+ base = ['high-availability', 'vrrp']
+ conf = ConfigTreeQuery()
+ if not conf.exists(base):
+ raise ValueError()
+
+ # Read VRRP configuration directly from CLI
+ vrrp_config_dict = conf.get_config_dict(base, key_mangling=('-', '_'),
+ get_first_key=True)
self.vrrp_config = {'vrrp_groups': {}, 'sync_groups': {}}
- # save VRRP instances to the new dictionary
- for vrrp_group in vrrp_config_dict['vrrp_groups']:
- self.vrrp_config['vrrp_groups'][vrrp_group['name']] = {
- 'STOP': vrrp_group.get('stop_script'),
- 'FAULT': vrrp_group.get('fault_script'),
- 'BACKUP': vrrp_group.get('backup_script'),
- 'MASTER': vrrp_group.get('master_script')
- }
- # save VRRP sync groups to the new dictionary
- for sync_group in vrrp_config_dict['sync_groups']:
- self.vrrp_config['sync_groups'][sync_group['name']] = {
- 'STOP': sync_group.get('stop_script'),
- 'FAULT': sync_group.get('fault_script'),
- 'BACKUP': sync_group.get('backup_script'),
- 'MASTER': sync_group.get('master_script')
- }
- logger.debug("Loaded configuration: {}".format(self.vrrp_config))
+ for key in ['group', 'sync_group']:
+ if key not in vrrp_config_dict:
+ continue
+ for group, group_config in vrrp_config_dict[key].items():
+ if 'transition_script' not in group_config:
+ continue
+ self.vrrp_config['vrrp_groups'][group] = {
+ 'STOP': group_config['transition_script'].get('stop'),
+ 'FAULT': group_config['transition_script'].get('fault'),
+ 'BACKUP': group_config['transition_script'].get('backup'),
+ 'MASTER': group_config['transition_script'].get('master'),
+ }
+ logger.info(f'Loaded configuration: {self.vrrp_config}')
except Exception as err:
- logger.error("Unable to load configuration: {}".format(err))
+ logger.error(f'Unable to load configuration: {err}')
# run command
def _run_command(self, command):
- logger.debug("Running the command: {}".format(command))
+ logger.debug(f'Running the command: {command}')
try:
cmd(command)
except OSError as err:
@@ -95,14 +98,14 @@ class KeepalivedFifo:
# create FIFO pipe
def pipe_create(self):
- if Path(self.pipe_path).exists():
- logger.info("PIPE already exist: {}".format(self.pipe_path))
+ if os.path.exists(self.pipe_path):
+ logger.info(f'PIPE already exist: {self.pipe_path}')
else:
os.mkfifo(self.pipe_path)
# process message from pipe
def pipe_process(self):
- logger.debug("Message processing start")
+ logger.debug('Message processing start')
regex_notify = re.compile(r'^(?P<type>\w+) "(?P<name>[\w-]+)" (?P<state>\w+) (?P<priority>\d+)$', re.MULTILINE)
while self.stopme.is_set() is False:
# wait for a new message event from pipe_wait
@@ -113,14 +116,14 @@ class KeepalivedFifo:
# get all messages from queue and try to process them
while self.message_queue.empty() is not True:
message = self.message_queue.get()
- logger.debug("Received message: {}".format(message))
+ logger.debug(f'Received message: {message}')
notify_message = regex_notify.search(message)
# try to process a message if it looks valid
if notify_message:
n_type = notify_message.group('type')
n_name = notify_message.group('name')
n_state = notify_message.group('state')
- logger.info("{} {} changed state to {}".format(n_type, n_name, n_state))
+ logger.info(f'{n_type} {n_name} changed state to {n_state}')
# check and run commands for VRRP instances
if n_type == 'INSTANCE':
if os.path.exists(mdns_running_file):
@@ -135,7 +138,7 @@ class KeepalivedFifo:
if n_type == 'GROUP':
if os.path.exists(mdns_running_file):
cmd(mdns_update_command)
-
+
if n_name in self.vrrp_config['sync_groups'] and n_state in self.vrrp_config['sync_groups'][n_name]:
n_script = self.vrrp_config['sync_groups'][n_name].get(n_state)
if n_script:
@@ -143,16 +146,16 @@ class KeepalivedFifo:
# mark task in queue as done
self.message_queue.task_done()
except Exception as err:
- logger.error("Error processing message: {}".format(err))
- logger.debug("Terminating messages processing thread")
+ logger.error(f'Error processing message: {err}')
+ logger.debug('Terminating messages processing thread')
# wait for messages
def pipe_wait(self):
- logger.debug("Message reading start")
+ logger.debug('Message reading start')
self.pipe_read = os.open(self.pipe_path, os.O_RDONLY | os.O_NONBLOCK)
while self.stopme.is_set() is False:
# sleep a bit to not produce 100% CPU load
- time.sleep(0.1)
+ time.sleep(0.250)
try:
# try to read a message from PIPE
message = os.read(self.pipe_read, 500)
@@ -165,21 +168,19 @@ class KeepalivedFifo:
except Exception as err:
# ignore the "Resource temporarily unavailable" error
if err.errno != 11:
- logger.error("Error receiving message: {}".format(err))
+ logger.error(f'Error receiving message: {err}')
- logger.debug("Closing FIFO pipe")
+ logger.debug('Closing FIFO pipe')
os.close(self.pipe_read)
-
# handle SIGTERM signal to allow finish all messages processing
def sigterm_handle(signum, frame):
- logger.info("Ending processing: Received SIGTERM signal")
+ logger.info('Ending processing: Received SIGTERM signal')
fifo.stopme.set()
thread_wait_message.join()
fifo.message_event.set()
thread_process_message.join()
-
signal.signal(signal.SIGTERM, sigterm_handle)
# init our class