# vi: ts=4 expandtab
#
# Copyright (C) 2020 Sentrium S.L.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3, as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
from pathlib import Path
from cloudinit import log as logging
from cloudinit.settings import PER_INSTANCE
try:
from vyos.configtree import ConfigTree
except ImportError as err:
print(f'The module cannot be imported: {err}')
# configure logging: module-level logger, with its level forced to DEBUG
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# run frequency marker, set to cloud-init's PER_INSTANCE setting
# NOTE(review): presumably read by cloud-init to decide how often this
# module runs - nothing in this file reads it, confirm against cloud-init
frequency = PER_INSTANCE
# path to templates directory, required for analyzing nodes
# (searched recursively for 'node.tag' and 'node.def' entries)
TEMPLATES_DIR = '/opt/vyatta/share/vyatta-cfg/templates/'
# VyOS configuration files: the main file is used when it exists,
# otherwise the default file is read and written instead
CFG_FILE_MAIN = '/opt/vyatta/etc/config/config.boot'
CFG_FILE_DEFAULT = '/opt/vyatta/etc/config.boot.default'
# get list of all tag nodes
def get_tag_nodes():
    """Collect the paths of all tag nodes found in the templates tree.

    Every entry named 'node.tag' below TEMPLATES_DIR is located and the
    path of its parent, relative to TEMPLATES_DIR, is recorded as a tuple
    of path components.

    Returns:
        A list of tuples of path components, or None when the search
        raised an exception (the error is logged, not re-raised).
    """
    try:
        logger.debug("Searching for tag nodes in configuration templates")
        # the parent of each 'node.tag' entry is the tag node itself
        found = [
            entry.relative_to(TEMPLATES_DIR).parent.parts
            for entry in Path(TEMPLATES_DIR).rglob('node.tag')
        ]
        logger.debug("Tag nodes: {}".format(found))
        return found
    except Exception as err:
        logger.error("Failed to find tag nodes: {}".format(err))
        return None
# get list of all multi nodes
def get_multi_nodes():
    """Collect the paths of all multi nodes found in the templates tree.

    Every 'node.def' file below TEMPLATES_DIR is read; a file counts as a
    multi node definition when one of its lines starts with 'multi:'. The
    path of the containing directory, relative to TEMPLATES_DIR, is
    recorded as a tuple of path components.

    Returns:
        A list of tuples of path components, or None when the search
        raised an exception (the error is logged, not re-raised).
    """
    try:
        logger.debug("Searching for multi nodes in configuration templates")
        definitions = Path(TEMPLATES_DIR).rglob('node.def')
        # MULTILINE lets '^' match at the start of any line of the file
        multi_marker = re.compile(r'^multi:.*$', re.MULTILINE)
        found = [
            definition.relative_to(TEMPLATES_DIR).parent.parts
            for definition in definitions
            if multi_marker.search(definition.read_text())
        ]
        logger.debug("Multi nodes: {}".format(found))
        return found
    except Exception as err:
        logger.error("Failed to find multi nodes: {}".format(err))
        return None
# check if a node is inside a list of nodes
def inside_nodes_list(node_path, nodes_list):
    """Check whether a node path matches any entry of a list of nodes.

    A list entry matches when it has the same number of elements as
    node_path and every element is either equal to the element at the
    same position in node_path or is the wildcard 'node.tag'.

    Args:
        node_path: sequence of path elements to look up.
        nodes_list: iterable of sequences of path elements to match
            against; elements equal to 'node.tag' match anything.

    Returns:
        True if at least one entry matches, otherwise False. An empty
        node_path never matches, not even an empty entry.
    """
    path_length = len(node_path)
    for candidate in nodes_list:
        # only non-empty paths of exactly the same length can match
        if path_length == 0 or len(candidate) != path_length:
            continue
        # compare element by element, treating 'node.tag' as a wildcard
        if all(pattern == actual or pattern == 'node.tag'
               for actual, pattern in zip(node_path, candidate)):
            return True
    return False
# convert string to command (action + path + value)
def string_to_command(stringcmd):
    """Convert a command string into action, path and value.

    The accepted form is "<set|delete> <path elements> ['<value>']": the
    action, one space, a path that contains no single quote, and an
    optional value enclosed in single quotes.

    Args:
        stringcmd: command string, e.g. "set system host-name 'vyos'".

    Returns:
        A dict with the keys 'cmd_action' ('set' or 'delete'), 'cmd_path'
        (the path split on whitespace into a list) and 'cmd_value' (the
        text between the quotes, or None when no value was given).
        None is returned when the string does not match the accepted form.
    """
    # regex to split string to action + path + value; the group names
    # are the ones read back below
    regex_filter = re.compile(
        r'^(?P<cmd_action>set|delete) (?P<cmd_path>[^\']+)'
        r'( \'(?P<cmd_value>.*)\')*$'
    )
    # search once and reuse the match for all three fields
    parsed = regex_filter.search(stringcmd)
    if parsed is None:
        return None
    return {
        'cmd_action': parsed.group('cmd_action'),
        'cmd_path': parsed.group('cmd_path').split(),
        'cmd_value': parsed.group('cmd_value'),
    }
# helper: mark nodes as tag in config, if this is necessary
def mark_tag(config, node_path, tag_nodes):
    """Mark every tag node along a path as tag in the config.

    The path is walked from its first element onwards; each prefix that
    inside_nodes_list() reports as present in tag_nodes is passed to
    config.set_tag().

    Args:
        config: configuration object providing set_tag(path).
        node_path: iterable of path elements of the configured node.
        tag_nodes: list of tag node paths to match prefixes against.
    """
    prefix = []
    for element in node_path:
        # grow the prefix by one element per step
        prefix.append(element)
        if not inside_nodes_list(prefix, tag_nodes):
            continue
        logger.debug(
            "Marking node as tag: \"{}\"".format(prefix))
        config.set_tag(prefix)
# apply "set" command
def apply_command_set(config, tag_nodes, multi_nodes, command):
    """Apply a "set" command to the config.

    When the command path matches a multi or tag node the value is added
    (config.set is called with replace=False), otherwise it replaces the
    existing one (replace=True). Afterwards the tag nodes along the path
    are marked with mark_tag().

    Args:
        config: configuration object providing set() and set_tag().
        tag_nodes: list of tag node paths.
        multi_nodes: list of multi node paths.
        command: dict with 'cmd_path' and 'cmd_value' keys.
    """
    path = command['cmd_path']
    # multi and tag nodes accumulate values instead of being overwritten
    keep_existing = inside_nodes_list(path, multi_nodes + tag_nodes)
    if keep_existing:
        logger.debug(
            "{} is a multi or tag node, adding value instead overwriting".
            format(path))
    config.set(path, command['cmd_value'], replace=not keep_existing)
    # mark configured nodes as tag, if this is necessary
    mark_tag(config, path, tag_nodes)
# apply "delete" command
def apply_command_delete(config, command):
    """Apply a "delete" command to the config.

    Args:
        config: configuration object providing delete_value(path, value)
            and delete(path).
        command: dict with 'cmd_path' and 'cmd_value' keys; 'cmd_value'
            is None when the command carried no value.
    """
    value = command['cmd_value']
    # compare with None rather than testing truthiness: an explicitly
    # given empty value ('') must remove only that value, not the node
    if value is not None:
        config.delete_value(command['cmd_path'], value)
    # no value given: delete the whole path
    else:
        config.delete(command['cmd_path'])
# apply command
def apply_commands(config, commands_list):
    """Apply a list of "set"/"delete" command strings to the config.

    Each command is handled independently: a command that cannot be
    parsed or applied is logged and skipped, and the remaining commands
    are still processed.

    Args:
        config: configuration object the commands are applied to.
        commands_list: iterable of command strings.
    """
    # get all tag and multi nodes
    tag_nodes = get_tag_nodes()
    multi_nodes = get_multi_nodes()
    # roll through configuration commands
    for command_line in commands_list:
        # convert command to a format applicable to the configuration;
        # a failure here must not abort the remaining commands
        try:
            command = string_to_command(command_line)
        except Exception as err:
            logger.error("Unable to parse command \"{}\": {}".format(
                command_line, err))
            continue
        # report commands that are not understood instead of dropping
        # them silently
        if not command:
            logger.error(
                "Skipping unsupported command: \"{}\"".format(command_line))
            continue
        logger.debug("Configuring command: \"{}\"".format(command_line))
        try:
            if command['cmd_action'] == 'set':
                apply_command_set(config, tag_nodes, multi_nodes, command)
            elif command['cmd_action'] == 'delete':
                apply_command_delete(config, command)
        except Exception as err:
            logger.error("Unable to configure command: {}".format(err))
# main config handler
def handle(name, cfg, cloud, log, _args):
    """Apply the 'vyos_config_commands' cloud-config section to the config.

    The commands are applied to CFG_FILE_MAIN when that file exists,
    otherwise to CFG_FILE_DEFAULT, and the result is written back to the
    same file. Nothing is read or written when the section is missing or
    empty.

    Args:
        name: not used by this function.
        cfg: mapping-like object; 'vyos_config_commands' is read from it
            with .get().
        cloud: not used by this function.
        log: not used by this function (the module-level logger is used).
        _args: not used by this function.

    Raises:
        Errors raised while reading or parsing the configuration file are
        not caught here. Failures while applying commands or saving the
        file are logged and not re-raised.
    """
    # Get commands list to configure
    commands_list = cfg.get('vyos_config_commands', [])
    logger.debug("Commands to configure: {}".format(commands_list))
    if not commands_list:
        return

    # open configuration file
    if Path(CFG_FILE_MAIN).exists():
        config_file_path = CFG_FILE_MAIN
    else:
        config_file_path = CFG_FILE_DEFAULT
    logger.debug("Using configuration file: {}".format(config_file_path))
    with open(config_file_path, 'r') as f:
        config_file = f.read()
    # load a file content into a config object
    config = ConfigTree(config_file)

    # Add configuration from the vyos_config_commands cloud-config section
    try:
        apply_commands(config, commands_list)
    except Exception as err:
        logger.error(
            "Failed to apply configuration commands: {}".format(err))

    # save a new configuration file
    try:
        # serialize before opening: mode 'w' truncates the file at once,
        # so a failure in to_string() must happen before the open() call
        new_config = config.to_string()
        with open(config_file_path, 'w') as f:
            f.write(new_config)
        logger.debug(
            "Configuration file saved: {}".format(config_file_path))
    except Exception as err:
        logger.error("Failed to write config into the file {}: {}".format(
            config_file_path, err))