Diffstat (limited to 'cloudinit/config/schema.py')
-rw-r--r--  cloudinit/config/schema.py  239
1 file changed, 141 insertions(+), 98 deletions(-)
diff --git a/cloudinit/config/schema.py b/cloudinit/config/schema.py
index d772b4f9..8ec4ab6a 100644
--- a/cloudinit/config/schema.py
+++ b/cloudinit/config/schema.py
@@ -1,26 +1,27 @@
# This file is part of cloud-init. See LICENSE file for license information.
"""schema.py: Set of module functions for processing cloud-config schema."""
-from cloudinit.cmd.devel import read_cfg_paths
-from cloudinit import importer
-from cloudinit.importer import MetaSchema
-from cloudinit.util import find_modules, load_file, error
-
import argparse
-from collections import defaultdict
-from copy import deepcopy
-from functools import partial
import logging
import os
import re
import sys
+from collections import defaultdict
+from copy import deepcopy
+from functools import partial
+
import yaml
+from cloudinit import importer
+from cloudinit.cmd.devel import read_cfg_paths
+from cloudinit.importer import MetaSchema
+from cloudinit.util import error, find_modules, load_file
+
error = partial(error, sys_exit=True)
LOG = logging.getLogger(__name__)
-_YAML_MAP = {True: 'true', False: 'false', None: 'null'}
-CLOUD_CONFIG_HEADER = b'#cloud-config'
+_YAML_MAP = {True: "true", False: "false", None: "null"}
+CLOUD_CONFIG_HEADER = b"#cloud-config"
SCHEMA_DOC_TMPL = """
{name}
{title_underbar}
@@ -40,9 +41,10 @@ SCHEMA_DOC_TMPL = """
"""
SCHEMA_PROPERTY_TMPL = "{prefix}**{prop_name}:** ({prop_type}) {description}"
SCHEMA_LIST_ITEM_TMPL = (
- '{prefix}Each item in **{prop_name}** list supports the following keys:')
-SCHEMA_EXAMPLES_HEADER = '\n**Examples**::\n\n'
-SCHEMA_EXAMPLES_SPACER_TEMPLATE = '\n # --- Example{0} ---'
+ "{prefix}Each item in **{prop_name}** list supports the following keys:"
+)
+SCHEMA_EXAMPLES_HEADER = "\n**Examples**::\n\n"
+SCHEMA_EXAMPLES_SPACER_TEMPLATE = "\n # --- Example{0} ---"
class SchemaValidationError(ValueError):
@@ -56,10 +58,12 @@ class SchemaValidationError(ValueError):
"""
self.schema_errors = schema_errors
error_messages = [
- '{0}: {1}'.format(config_key, message)
- for config_key, message in schema_errors]
+ "{0}: {1}".format(config_key, message)
+ for config_key, message in schema_errors
+ ]
message = "Cloud config schema errors: {0}".format(
- ', '.join(error_messages))
+ ", ".join(error_messages)
+ )
super(SchemaValidationError, self).__init__(message)
@@ -72,8 +76,9 @@ def is_schema_byte_string(checker, instance):
from jsonschema import Draft4Validator
except ImportError:
return False
- return (Draft4Validator.TYPE_CHECKER.is_type(instance, "string") or
- isinstance(instance, (bytes,)))
+ return Draft4Validator.TYPE_CHECKER.is_type(
+ instance, "string"
+ ) or isinstance(instance, (bytes,))
def get_jsonschema_validator():
@@ -102,25 +107,28 @@ def get_jsonschema_validator():
# http://json-schema.org/understanding-json-schema/reference/object.html#pattern-properties
strict_metaschema["properties"]["label"] = {"type": "string"}
- if hasattr(Draft4Validator, 'TYPE_CHECKER'): # jsonschema 3.0+
+ if hasattr(Draft4Validator, "TYPE_CHECKER"): # jsonschema 3.0+
type_checker = Draft4Validator.TYPE_CHECKER.redefine(
- 'string', is_schema_byte_string)
+ "string", is_schema_byte_string
+ )
cloudinitValidator = create(
meta_schema=strict_metaschema,
validators=Draft4Validator.VALIDATORS,
version="draft4",
- type_checker=type_checker)
+ type_checker=type_checker,
+ )
else: # jsonschema 2.6 workaround
types = Draft4Validator.DEFAULT_TYPES
# Allow bytes as well as string (and disable a spurious unsupported
# assignment-operation pylint warning which appears because this
# code path isn't written against the latest jsonschema).
- types['string'] = (str, bytes) # pylint: disable=E1137
+ types["string"] = (str, bytes) # pylint: disable=E1137
cloudinitValidator = create(
meta_schema=strict_metaschema,
validators=Draft4Validator.VALIDATORS,
version="draft4",
- default_types=types)
+ default_types=types,
+ )
return (cloudinitValidator, FormatChecker)
@@ -147,12 +155,14 @@ def validate_cloudconfig_metaschema(validator, schema: dict, throw=True):
if throw:
raise SchemaValidationError(
schema_errors=(
- ('.'.join([str(p) for p in err.path]), err.message),
+ (".".join([str(p) for p in err.path]), err.message),
)
) from err
LOG.warning(
"Meta-schema validation failed, attempting to validate config "
- "anyway: %s", err)
+ "anyway: %s",
+ err,
+ )
def validate_cloudconfig_schema(
@@ -176,7 +186,8 @@ def validate_cloudconfig_schema(
(cloudinitValidator, FormatChecker) = get_jsonschema_validator()
if strict_metaschema:
validate_cloudconfig_metaschema(
- cloudinitValidator, schema, throw=False)
+ cloudinitValidator, schema, throw=False
+ )
except ImportError:
LOG.debug("Ignoring schema validation. jsonschema is not present")
return
@@ -184,7 +195,7 @@ def validate_cloudconfig_schema(
validator = cloudinitValidator(schema, format_checker=FormatChecker())
errors = ()
for error in sorted(validator.iter_errors(config), key=lambda e: e.path):
- path = '.'.join([str(p) for p in error.path])
+ path = ".".join([str(p) for p in error.path])
errors += ((path, error.message),)
if errors:
if strict:
@@ -208,12 +219,13 @@ def annotated_cloudconfig_file(cloudconfig, original_content, schema_errors):
schemapaths = {}
if cloudconfig:
schemapaths = _schemapath_for_cloudconfig(
- cloudconfig, original_content)
+ cloudconfig, original_content
+ )
errors_by_line = defaultdict(list)
error_footer = []
annotated_content = []
for path, msg in schema_errors:
- match = re.match(r'format-l(?P<line>\d+)\.c(?P<col>\d+).*', path)
+ match = re.match(r"format-l(?P<line>\d+)\.c(?P<col>\d+).*", path)
if match:
line, col = match.groups()
errors_by_line[int(line)].append(msg)
@@ -221,24 +233,26 @@ def annotated_cloudconfig_file(cloudconfig, original_content, schema_errors):
col = None
errors_by_line[schemapaths[path]].append(msg)
if col is not None:
- msg = 'Line {line} column {col}: {msg}'.format(
- line=line, col=col, msg=msg)
- lines = original_content.decode().split('\n')
+ msg = "Line {line} column {col}: {msg}".format(
+ line=line, col=col, msg=msg
+ )
+ lines = original_content.decode().split("\n")
error_index = 1
for line_number, line in enumerate(lines, 1):
errors = errors_by_line[line_number]
if errors:
error_label = []
for error in errors:
- error_label.append('E{0}'.format(error_index))
- error_footer.append('# E{0}: {1}'.format(error_index, error))
+ error_label.append("E{0}".format(error_index))
+ error_footer.append("# E{0}: {1}".format(error_index, error))
error_index += 1
- annotated_content.append(line + '\t\t# ' + ','.join(error_label))
+ annotated_content.append(line + "\t\t# " + ",".join(error_label))
else:
annotated_content.append(line)
annotated_content.append(
- '# Errors: -------------\n{0}\n\n'.format('\n'.join(error_footer)))
- return '\n'.join(annotated_content)
+ "# Errors: -------------\n{0}\n\n".format("\n".join(error_footer))
+ )
+ return "\n".join(annotated_content)
def validate_cloudconfig_file(config_path, schema, annotate=False):
@@ -266,15 +280,18 @@ def validate_cloudconfig_file(config_path, schema, annotate=False):
else:
if not os.path.exists(config_path):
raise RuntimeError(
- 'Configfile {0} does not exist'.format(
- config_path
- )
+ "Configfile {0} does not exist".format(config_path)
)
content = load_file(config_path, decode=False)
if not content.startswith(CLOUD_CONFIG_HEADER):
errors = (
- ('format-l1.c1', 'File {0} needs to begin with "{1}"'.format(
- config_path, CLOUD_CONFIG_HEADER.decode())),)
+ (
+ "format-l1.c1",
+ 'File {0} needs to begin with "{1}"'.format(
+ config_path, CLOUD_CONFIG_HEADER.decode()
+ ),
+ ),
+ )
error = SchemaValidationError(errors)
if annotate:
print(annotated_cloudconfig_file({}, content, error.schema_errors))
@@ -284,27 +301,32 @@ def validate_cloudconfig_file(config_path, schema, annotate=False):
except (yaml.YAMLError) as e:
line = column = 1
mark = None
- if hasattr(e, 'context_mark') and getattr(e, 'context_mark'):
- mark = getattr(e, 'context_mark')
- elif hasattr(e, 'problem_mark') and getattr(e, 'problem_mark'):
- mark = getattr(e, 'problem_mark')
+ if hasattr(e, "context_mark") and getattr(e, "context_mark"):
+ mark = getattr(e, "context_mark")
+ elif hasattr(e, "problem_mark") and getattr(e, "problem_mark"):
+ mark = getattr(e, "problem_mark")
if mark:
line = mark.line + 1
column = mark.column + 1
- errors = (('format-l{line}.c{col}'.format(line=line, col=column),
- 'File {0} is not valid yaml. {1}'.format(
- config_path, str(e))),)
+ errors = (
+ (
+ "format-l{line}.c{col}".format(line=line, col=column),
+ "File {0} is not valid yaml. {1}".format(config_path, str(e)),
+ ),
+ )
error = SchemaValidationError(errors)
if annotate:
print(annotated_cloudconfig_file({}, content, error.schema_errors))
raise error from e
try:
- validate_cloudconfig_schema(
- cloudconfig, schema, strict=True)
+ validate_cloudconfig_schema(cloudconfig, schema, strict=True)
except SchemaValidationError as e:
if annotate:
- print(annotated_cloudconfig_file(
- cloudconfig, content, e.schema_errors))
+ print(
+ annotated_cloudconfig_file(
+ cloudconfig, content, e.schema_errors
+ )
+ )
raise
@@ -315,26 +337,26 @@ def _schemapath_for_cloudconfig(config, original_content):
@param original_content: The simple file content of the cloud-config file
"""
# FIXME Doesn't handle multi-line lists or multi-line strings
- content_lines = original_content.decode().split('\n')
+ content_lines = original_content.decode().split("\n")
schema_line_numbers = {}
list_index = 0
- RE_YAML_INDENT = r'^(\s*)'
+ RE_YAML_INDENT = r"^(\s*)"
scopes = []
for line_number, line in enumerate(content_lines, 1):
indent_depth = len(re.match(RE_YAML_INDENT, line).groups()[0])
line = line.strip()
- if not line or line.startswith('#'):
+ if not line or line.startswith("#"):
continue
if scopes:
previous_depth, path_prefix = scopes[-1]
else:
previous_depth = -1
- path_prefix = ''
- if line.startswith('- '):
+ path_prefix = ""
+ if line.startswith("- "):
# Process list items adding a list_index to the path prefix
- previous_list_idx = '.%d' % (list_index - 1)
+ previous_list_idx = ".%d" % (list_index - 1)
if path_prefix and path_prefix.endswith(previous_list_idx):
- path_prefix = path_prefix[:-len(previous_list_idx)]
+ path_prefix = path_prefix[: -len(previous_list_idx)]
key = str(list_index)
schema_line_numbers[key] = line_number
item_indent = len(re.match(RE_YAML_INDENT, line[1:]).groups()[0])
@@ -346,26 +368,26 @@ def _schemapath_for_cloudconfig(config, original_content):
else:
# Process non-list lines setting value if present
list_index = 0
- key, value = line.split(':', 1)
+ key, value = line.split(":", 1)
if path_prefix:
# Append any existing path_prefix for a fully-pathed key
- key = path_prefix + '.' + key
+ key = path_prefix + "." + key
while indent_depth <= previous_depth:
if scopes:
previous_depth, path_prefix = scopes.pop()
if list_index > 0 and indent_depth == previous_depth:
- path_prefix = '.'.join(path_prefix.split('.')[:-1])
+ path_prefix = ".".join(path_prefix.split(".")[:-1])
break
else:
previous_depth = -1
- path_prefix = ''
+ path_prefix = ""
scopes.append((indent_depth, key))
if value:
value = value.strip()
- if value.startswith('['):
- scopes.append((indent_depth + 2, key + '.0'))
+ if value.startswith("["):
+ scopes.append((indent_depth + 2, key + ".0"))
for inner_list_index in range(0, len(yaml.safe_load(value))):
- list_key = key + '.' + str(inner_list_index)
+ list_key = key + "." + str(inner_list_index)
schema_line_numbers[list_key] = line_number
schema_line_numbers[key] = line_number
return schema_line_numbers
@@ -381,14 +403,14 @@ def _get_property_type(property_dict: dict) -> str:
str(_YAML_MAP.get(k, k)) for k in property_dict["enum"]
]
if isinstance(property_type, list):
- property_type = '/'.join(property_type)
- items = property_dict.get('items', {})
- sub_property_type = items.get('type', '')
+ property_type = "/".join(property_type)
+ items = property_dict.get("items", {})
+ sub_property_type = items.get("type", "")
# Collect each item type
- for sub_item in items.get('oneOf', {}):
+ for sub_item in items.get("oneOf", {}):
if sub_property_type:
- sub_property_type += '/'
- sub_property_type += '(' + _get_property_type(sub_item) + ')'
+ sub_property_type += "/"
+ sub_property_type += "(" + _get_property_type(sub_item) + ")"
if sub_property_type:
return "{0} of {1}".format(property_type, sub_property_type)
return property_type or "UNDEFINED"
@@ -408,17 +430,17 @@ def _parse_description(description, prefix) -> str:
"""
list_paragraph = prefix * 3
description = re.sub(r"(\S)\n(\S)", r"\1 \2", description)
+ description = re.sub(r"\n\n", r"\n\n{}".format(prefix), description)
description = re.sub(
- r"\n\n", r"\n\n{}".format(prefix), description)
- description = re.sub(
- r"\n( +)-", r"\n{}-".format(list_paragraph), description)
+ r"\n( +)-", r"\n{}-".format(list_paragraph), description
+ )
return description
def _get_property_doc(schema: dict, prefix=" ") -> str:
"""Return restructured text describing the supported schema properties."""
- new_prefix = prefix + ' '
+ new_prefix = prefix + " "
properties = []
property_keys = [
schema.get("properties", {}),
@@ -473,16 +495,17 @@ def _get_examples(meta: MetaSchema) -> str:
"""Return restructured text describing the meta examples if present."""
examples = meta.get("examples")
if not examples:
- return ''
+ return ""
rst_content = SCHEMA_EXAMPLES_HEADER
for count, example in enumerate(examples):
# Python2.6 is missing textwrapper.indent
- lines = example.split('\n')
- indented_lines = [' {0}'.format(line) for line in lines]
+ lines = example.split("\n")
+ indented_lines = [" {0}".format(line) for line in lines]
if rst_content != SCHEMA_EXAMPLES_HEADER:
indented_lines.insert(
- 0, SCHEMA_EXAMPLES_SPACER_TEMPLATE.format(count + 1))
- rst_content += '\n'.join(indented_lines)
+ 0, SCHEMA_EXAMPLES_SPACER_TEMPLATE.format(count + 1)
+ )
+ rst_content += "\n".join(indented_lines)
return rst_content
@@ -552,7 +575,8 @@ def load_doc(requested_modules: list) -> str:
if invalid_docs:
error(
"Invalid --docs value {}. Must be one of: {}".format(
- list(invalid_docs), ", ".join(all_modules),
+ list(invalid_docs),
+ ", ".join(all_modules),
)
)
for mod_name in all_modules:
@@ -601,17 +625,35 @@ def get_parser(parser=None):
"""Return a parser for supported cmdline arguments."""
if not parser:
parser = argparse.ArgumentParser(
- prog='cloudconfig-schema',
- description='Validate cloud-config files or document schema')
- parser.add_argument('-c', '--config-file',
- help='Path of the cloud-config yaml file to validate')
- parser.add_argument('--system', action='store_true', default=False,
- help='Validate the system cloud-config userdata')
- parser.add_argument('-d', '--docs', nargs='+',
- help=('Print schema module docs. Choices: all or'
- ' space-delimited cc_names.'))
- parser.add_argument('--annotate', action="store_true", default=False,
- help='Annotate existing cloud-config file with errors')
+ prog="cloudconfig-schema",
+ description="Validate cloud-config files or document schema",
+ )
+ parser.add_argument(
+ "-c",
+ "--config-file",
+ help="Path of the cloud-config yaml file to validate",
+ )
+ parser.add_argument(
+ "--system",
+ action="store_true",
+ default=False,
+ help="Validate the system cloud-config userdata",
+ )
+ parser.add_argument(
+ "-d",
+ "--docs",
+ nargs="+",
+ help=(
+ "Print schema module docs. Choices: all or"
+ " space-delimited cc_names."
+ ),
+ )
+ parser.add_argument(
+ "--annotate",
+ action="store_true",
+ default=False,
+ help="Annotate existing cloud-config file with errors",
+ )
return parser
@@ -619,12 +661,13 @@ def handle_schema_args(name, args):
"""Handle provided schema args and perform the appropriate actions."""
exclusive_args = [args.config_file, args.docs, args.system]
if len([arg for arg in exclusive_args if arg]) != 1:
- error('Expected one of --config-file, --system or --docs arguments')
+ error("Expected one of --config-file, --system or --docs arguments")
full_schema = get_schema()
if args.config_file or args.system:
try:
validate_cloudconfig_file(
- args.config_file, full_schema, args.annotate)
+ args.config_file, full_schema, args.annotate
+ )
except SchemaValidationError as e:
if not args.annotate:
error(str(e))
@@ -643,11 +686,11 @@ def handle_schema_args(name, args):
def main():
"""Tool to validate schema of a cloud-config file or print schema docs."""
parser = get_parser()
- handle_schema_args('cloudconfig-schema', parser.parse_args())
+ handle_schema_args("cloudconfig-schema", parser.parse_args())
return 0
-if __name__ == '__main__':
+if __name__ == "__main__":
sys.exit(main())
# vi: ts=4 expandtab
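
For reference, a minimal usage sketch of the validation entry points touched by this diff (validate_cloudconfig_file, get_schema, SchemaValidationError), called directly from Python rather than via the CLI parser; the config path below is hypothetical, the function names and signatures are the ones defined in schema.py above:

# Sketch only: "/tmp/example-user-data.yaml" is a placeholder path.
from cloudinit.config.schema import (
    SchemaValidationError,
    get_schema,
    validate_cloudconfig_file,
)

try:
    # Checks that the file begins with "#cloud-config", parses as YAML,
    # and validates against the full schema returned by get_schema().
    validate_cloudconfig_file(
        "/tmp/example-user-data.yaml", get_schema(), annotate=False
    )
    print("Valid cloud-config")
except SchemaValidationError as e:
    # schema_errors is a tuple of (dotted-key-path, message) pairs,
    # as built in SchemaValidationError.__init__ above.
    for path, msg in e.schema_errors:
        print("{0}: {1}".format(path, msg))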