summaryrefslogtreecommitdiff
path: root/cloudinit/config/schema.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/config/schema.py')
-rw-r--r--cloudinit/config/schema.py658
1 files changed, 460 insertions, 198 deletions
diff --git a/cloudinit/config/schema.py b/cloudinit/config/schema.py
index 456bab2c..1f969c97 100644
--- a/cloudinit/config/schema.py
+++ b/cloudinit/config/schema.py
@@ -1,22 +1,28 @@
# This file is part of cloud-init. See LICENSE file for license information.
"""schema.py: Set of module functions for processing cloud-config schema."""
-from cloudinit.cmd.devel import read_cfg_paths
-from cloudinit import importer
-from cloudinit.util import find_modules, load_file
-
import argparse
-from collections import defaultdict
-from copy import deepcopy
+import json
import logging
import os
import re
import sys
+from collections import defaultdict
+from copy import deepcopy
+from functools import partial
+
import yaml
-_YAML_MAP = {True: 'true', False: 'false', None: 'null'}
-SCHEMA_UNDEFINED = b'UNDEFINED'
-CLOUD_CONFIG_HEADER = b'#cloud-config'
+from cloudinit import importer
+from cloudinit.cmd.devel import read_cfg_paths
+from cloudinit.importer import MetaSchema
+from cloudinit.util import error, find_modules, load_file
+
+error = partial(error, sys_exit=True)
+LOG = logging.getLogger(__name__)
+
+_YAML_MAP = {True: "true", False: "false", None: "null"}
+CLOUD_CONFIG_HEADER = b"#cloud-config"
SCHEMA_DOC_TMPL = """
{name}
{title_underbar}
@@ -34,11 +40,12 @@ SCHEMA_DOC_TMPL = """
{property_doc}
{examples}
"""
-SCHEMA_PROPERTY_TMPL = '{prefix}**{prop_name}:** ({type}) {description}'
+SCHEMA_PROPERTY_TMPL = "{prefix}**{prop_name}:** ({prop_type}) {description}"
SCHEMA_LIST_ITEM_TMPL = (
- '{prefix}Each item in **{prop_name}** list supports the following keys:')
-SCHEMA_EXAMPLES_HEADER = '\n**Examples**::\n\n'
-SCHEMA_EXAMPLES_SPACER_TEMPLATE = '\n # --- Example{0} ---'
+ "{prefix}Each item in **{prop_name}** list supports the following keys:"
+)
+SCHEMA_EXAMPLES_HEADER = "\n**Examples**::\n\n"
+SCHEMA_EXAMPLES_SPACER_TEMPLATE = "\n # --- Example{0} ---"
class SchemaValidationError(ValueError):
@@ -52,10 +59,12 @@ class SchemaValidationError(ValueError):
"""
self.schema_errors = schema_errors
error_messages = [
- '{0}: {1}'.format(config_key, message)
- for config_key, message in schema_errors]
+ "{0}: {1}".format(config_key, message)
+ for config_key, message in schema_errors
+ ]
message = "Cloud config schema errors: {0}".format(
- ', '.join(error_messages))
+ ", ".join(error_messages)
+ )
super(SchemaValidationError, self).__init__(message)
@@ -68,60 +77,142 @@ def is_schema_byte_string(checker, instance):
from jsonschema import Draft4Validator
except ImportError:
return False
- return (Draft4Validator.TYPE_CHECKER.is_type(instance, "string") or
- isinstance(instance, (bytes,)))
+ return Draft4Validator.TYPE_CHECKER.is_type(
+ instance, "string"
+ ) or isinstance(instance, (bytes,))
+
+
+def get_jsonschema_validator():
+ """Get metaschema validator and format checker
+
+ Older versions of jsonschema require some compatibility changes.
+
+ @returns: Tuple: (jsonschema.Validator, FormatChecker)
+ @raises: ImportError when jsonschema is not present
+ """
+ from jsonschema import Draft4Validator, FormatChecker
+ from jsonschema.validators import create
+
+ # Allow for bytes to be presented as an acceptable valid value for string
+ # type jsonschema attributes in cloud-init's schema.
+ # This allows #cloud-config to provide valid yaml "content: !!binary | ..."
+
+ strict_metaschema = deepcopy(Draft4Validator.META_SCHEMA)
+ strict_metaschema["additionalProperties"] = False
+ # This additional label allows us to specify a different name
+ # than the property key when generating docs.
+ # This is especially useful when using a "patternProperties" regex,
+ # otherwise the property label in the generated docs will be a
+ # regular expression.
+ # http://json-schema.org/understanding-json-schema/reference/object.html#pattern-properties
+ strict_metaschema["properties"]["label"] = {"type": "string"}
-def validate_cloudconfig_schema(config, schema, strict=False):
+ if hasattr(Draft4Validator, "TYPE_CHECKER"): # jsonschema 3.0+
+ type_checker = Draft4Validator.TYPE_CHECKER.redefine(
+ "string", is_schema_byte_string
+ )
+ cloudinitValidator = create(
+ meta_schema=strict_metaschema,
+ validators=Draft4Validator.VALIDATORS,
+ version="draft4",
+ type_checker=type_checker,
+ )
+ else: # jsonschema 2.6 workaround
+ types = Draft4Validator.DEFAULT_TYPES # pylint: disable=E1101
+ # Allow bytes as well as string (and disable a spurious unsupported
+ # assignment-operation pylint warning which appears because this
+ # code path isn't written against the latest jsonschema).
+ types["string"] = (str, bytes) # pylint: disable=E1137
+ cloudinitValidator = create( # pylint: disable=E1123
+ meta_schema=strict_metaschema,
+ validators=Draft4Validator.VALIDATORS,
+ version="draft4",
+ default_types=types,
+ )
+ return (cloudinitValidator, FormatChecker)
+
+
+def validate_cloudconfig_metaschema(validator, schema: dict, throw=True):
+ """Validate provided schema meets the metaschema definition. Return strict
+ Validator and FormatChecker for use in validation
+ @param validator: Draft4Validator instance used to validate the schema
+ @param schema: schema to validate
+ @param throw: Sometimes the validator and checker are required, even if
+ the schema is invalid. Toggle for whether to raise
+ SchemaValidationError or log warnings.
+
+ @raises: ImportError when jsonschema is not present
+ @raises: SchemaValidationError when the schema is invalid
+ """
+
+ from jsonschema.exceptions import SchemaError
+
+ try:
+ validator.check_schema(schema)
+ except SchemaError as err:
+ # Raise SchemaValidationError to avoid jsonschema imports at call
+ # sites
+ if throw:
+ raise SchemaValidationError(
+ schema_errors=(
+ (".".join([str(p) for p in err.path]), err.message),
+ )
+ ) from err
+ LOG.warning(
+ "Meta-schema validation failed, attempting to validate config "
+ "anyway: %s",
+ err,
+ )
+
+
+def validate_cloudconfig_schema(
+ config: dict,
+ schema: dict = None,
+ strict: bool = False,
+ strict_metaschema: bool = False,
+):
"""Validate provided config meets the schema definition.
@param config: Dict of cloud configuration settings validated against
- schema.
+ schema. Ignored if strict_metaschema=True
@param schema: jsonschema dict describing the supported schema definition
- for the cloud config module (config.cc_*).
+ for the cloud config module (config.cc_*). If None, validate against
+ global schema.
@param strict: Boolean, when True raise SchemaValidationErrors instead of
logging warnings.
+ @param strict_metaschema: Boolean, when True validates schema using strict
+ metaschema definition at runtime (currently unused)
@raises: SchemaValidationError when provided config does not validate
against the provided schema.
+ @raises: RuntimeError when provided config sourced from YAML is not a dict.
"""
+ if schema is None:
+ schema = get_schema()
try:
- from jsonschema import Draft4Validator, FormatChecker
- from jsonschema.validators import create, extend
+ (cloudinitValidator, FormatChecker) = get_jsonschema_validator()
+ if strict_metaschema:
+ validate_cloudconfig_metaschema(
+ cloudinitValidator, schema, throw=False
+ )
except ImportError:
- logging.debug(
- 'Ignoring schema validation. python-jsonschema is not present')
+ LOG.debug("Ignoring schema validation. jsonschema is not present")
return
- # Allow for bytes to be presented as an acceptable valid value for string
- # type jsonschema attributes in cloud-init's schema.
- # This allows #cloud-config to provide valid yaml "content: !!binary | ..."
- if hasattr(Draft4Validator, 'TYPE_CHECKER'): # jsonschema 3.0+
- type_checker = Draft4Validator.TYPE_CHECKER.redefine(
- 'string', is_schema_byte_string)
- cloudinitValidator = extend(Draft4Validator, type_checker=type_checker)
- else: # jsonschema 2.6 workaround
- types = Draft4Validator.DEFAULT_TYPES
- # Allow bytes as well as string (and disable a spurious
- # unsupported-assignment-operation pylint warning which appears because
- # this code path isn't written against the latest jsonschema).
- types['string'] = (str, bytes) # pylint: disable=E1137
- cloudinitValidator = create(
- meta_schema=Draft4Validator.META_SCHEMA,
- validators=Draft4Validator.VALIDATORS,
- version="draft4",
- default_types=types)
validator = cloudinitValidator(schema, format_checker=FormatChecker())
errors = ()
for error in sorted(validator.iter_errors(config), key=lambda e: e.path):
- path = '.'.join([str(p) for p in error.path])
+ path = ".".join([str(p) for p in error.path])
errors += ((path, error.message),)
if errors:
if strict:
raise SchemaValidationError(errors)
else:
- messages = ['{0}: {1}'.format(k, msg) for k, msg in errors]
- logging.warning('Invalid config:\n%s', '\n'.join(messages))
+ messages = ["{0}: {1}".format(k, msg) for k, msg in errors]
+ LOG.warning(
+ "Invalid cloud-config provided:\n%s", "\n".join(messages)
+ )
def annotated_cloudconfig_file(cloudconfig, original_content, schema_errors):
@@ -136,14 +227,23 @@ def annotated_cloudconfig_file(cloudconfig, original_content, schema_errors):
if not schema_errors:
return original_content
schemapaths = {}
- if cloudconfig:
- schemapaths = _schemapath_for_cloudconfig(
- cloudconfig, original_content)
errors_by_line = defaultdict(list)
error_footer = []
+ error_header = "# Errors: -------------\n{0}\n\n"
annotated_content = []
+ lines = original_content.decode().split("\n")
+ if not isinstance(cloudconfig, dict):
+ # Return a meaningful message on empty cloud-config
+ return "\n".join(
+ lines
+ + [error_header.format("# E1: Cloud-config is not a YAML dict.")]
+ )
+ if cloudconfig:
+ schemapaths = _schemapath_for_cloudconfig(
+ cloudconfig, original_content
+ )
for path, msg in schema_errors:
- match = re.match(r'format-l(?P<line>\d+)\.c(?P<col>\d+).*', path)
+ match = re.match(r"format-l(?P<line>\d+)\.c(?P<col>\d+).*", path)
if match:
line, col = match.groups()
errors_by_line[int(line)].append(msg)
@@ -151,24 +251,24 @@ def annotated_cloudconfig_file(cloudconfig, original_content, schema_errors):
col = None
errors_by_line[schemapaths[path]].append(msg)
if col is not None:
- msg = 'Line {line} column {col}: {msg}'.format(
- line=line, col=col, msg=msg)
- lines = original_content.decode().split('\n')
+ msg = "Line {line} column {col}: {msg}".format(
+ line=line, col=col, msg=msg
+ )
error_index = 1
for line_number, line in enumerate(lines, 1):
errors = errors_by_line[line_number]
if errors:
error_label = []
for error in errors:
- error_label.append('E{0}'.format(error_index))
- error_footer.append('# E{0}: {1}'.format(error_index, error))
+ error_label.append("E{0}".format(error_index))
+ error_footer.append("# E{0}: {1}".format(error_index, error))
error_index += 1
- annotated_content.append(line + '\t\t# ' + ','.join(error_label))
+ annotated_content.append(line + "\t\t# " + ",".join(error_label))
+
else:
annotated_content.append(line)
- annotated_content.append(
- '# Errors: -------------\n{0}\n\n'.format('\n'.join(error_footer)))
- return '\n'.join(annotated_content)
+ annotated_content.append(error_header.format("\n".join(error_footer)))
+ return "\n".join(annotated_content)
def validate_cloudconfig_file(config_path, schema, annotate=False):
@@ -196,15 +296,18 @@ def validate_cloudconfig_file(config_path, schema, annotate=False):
else:
if not os.path.exists(config_path):
raise RuntimeError(
- 'Configfile {0} does not exist'.format(
- config_path
- )
+ "Configfile {0} does not exist".format(config_path)
)
content = load_file(config_path, decode=False)
if not content.startswith(CLOUD_CONFIG_HEADER):
errors = (
- ('format-l1.c1', 'File {0} needs to begin with "{1}"'.format(
- config_path, CLOUD_CONFIG_HEADER.decode())),)
+ (
+ "format-l1.c1",
+ 'File {0} needs to begin with "{1}"'.format(
+ config_path, CLOUD_CONFIG_HEADER.decode()
+ ),
+ ),
+ )
error = SchemaValidationError(errors)
if annotate:
print(annotated_cloudconfig_file({}, content, error.schema_errors))
@@ -214,27 +317,36 @@ def validate_cloudconfig_file(config_path, schema, annotate=False):
except (yaml.YAMLError) as e:
line = column = 1
mark = None
- if hasattr(e, 'context_mark') and getattr(e, 'context_mark'):
- mark = getattr(e, 'context_mark')
- elif hasattr(e, 'problem_mark') and getattr(e, 'problem_mark'):
- mark = getattr(e, 'problem_mark')
+ if hasattr(e, "context_mark") and getattr(e, "context_mark"):
+ mark = getattr(e, "context_mark")
+ elif hasattr(e, "problem_mark") and getattr(e, "problem_mark"):
+ mark = getattr(e, "problem_mark")
if mark:
line = mark.line + 1
column = mark.column + 1
- errors = (('format-l{line}.c{col}'.format(line=line, col=column),
- 'File {0} is not valid yaml. {1}'.format(
- config_path, str(e))),)
+ errors = (
+ (
+ "format-l{line}.c{col}".format(line=line, col=column),
+ "File {0} is not valid yaml. {1}".format(config_path, str(e)),
+ ),
+ )
error = SchemaValidationError(errors)
if annotate:
print(annotated_cloudconfig_file({}, content, error.schema_errors))
raise error from e
+ if not isinstance(cloudconfig, dict):
+ # Return a meaningful message on empty cloud-config
+ if not annotate:
+ raise RuntimeError("Cloud-config is not a YAML dict.")
try:
- validate_cloudconfig_schema(
- cloudconfig, schema, strict=True)
+ validate_cloudconfig_schema(cloudconfig, schema, strict=True)
except SchemaValidationError as e:
if annotate:
- print(annotated_cloudconfig_file(
- cloudconfig, content, e.schema_errors))
+ print(
+ annotated_cloudconfig_file(
+ cloudconfig, content, e.schema_errors
+ )
+ )
raise
@@ -244,29 +356,30 @@ def _schemapath_for_cloudconfig(config, original_content):
@param config: The yaml.loaded config dictionary of a cloud-config file.
@param original_content: The simple file content of the cloud-config file
"""
- # FIXME Doesn't handle multi-line lists or multi-line strings
- content_lines = original_content.decode().split('\n')
+ # TODO( handle multi-line lists or multi-line strings, inline dicts)
+ content_lines = original_content.decode().split("\n")
schema_line_numbers = {}
list_index = 0
- RE_YAML_INDENT = r'^(\s*)'
+ RE_YAML_INDENT = r"^(\s*)"
scopes = []
+ if not config:
+ return {} # No YAML config dict, no schemapaths to annotate
for line_number, line in enumerate(content_lines, 1):
indent_depth = len(re.match(RE_YAML_INDENT, line).groups()[0])
line = line.strip()
- if not line or line.startswith('#'):
+ if not line or line.startswith("#"):
continue
if scopes:
previous_depth, path_prefix = scopes[-1]
else:
previous_depth = -1
- path_prefix = ''
- if line.startswith('- '):
+ path_prefix = ""
+ if line.startswith("- "):
# Process list items adding a list_index to the path prefix
- previous_list_idx = '.%d' % (list_index - 1)
+ previous_list_idx = ".%d" % (list_index - 1)
if path_prefix and path_prefix.endswith(previous_list_idx):
- path_prefix = path_prefix[:-len(previous_list_idx)]
+ path_prefix = path_prefix[: -len(previous_list_idx)]
key = str(list_index)
- schema_line_numbers[key] = line_number
item_indent = len(re.match(RE_YAML_INDENT, line[1:]).groups()[0])
item_indent += 1 # For the leading '-' character
previous_depth = indent_depth
@@ -276,53 +389,63 @@ def _schemapath_for_cloudconfig(config, original_content):
else:
# Process non-list lines setting value if present
list_index = 0
- key, value = line.split(':', 1)
- if path_prefix:
+ key, value = line.split(":", 1)
+ if path_prefix and indent_depth > previous_depth:
# Append any existing path_prefix for a fully-pathed key
- key = path_prefix + '.' + key
+ key = path_prefix + "." + key
while indent_depth <= previous_depth:
if scopes:
previous_depth, path_prefix = scopes.pop()
if list_index > 0 and indent_depth == previous_depth:
- path_prefix = '.'.join(path_prefix.split('.')[:-1])
+ path_prefix = ".".join(path_prefix.split(".")[:-1])
break
else:
previous_depth = -1
- path_prefix = ''
+ path_prefix = ""
scopes.append((indent_depth, key))
if value:
value = value.strip()
- if value.startswith('['):
- scopes.append((indent_depth + 2, key + '.0'))
+ if value.startswith("["):
+ scopes.append((indent_depth + 2, key + ".0"))
for inner_list_index in range(0, len(yaml.safe_load(value))):
- list_key = key + '.' + str(inner_list_index)
+ list_key = key + "." + str(inner_list_index)
schema_line_numbers[list_key] = line_number
schema_line_numbers[key] = line_number
return schema_line_numbers
-def _get_property_type(property_dict):
- """Return a string representing a property type from a given jsonschema."""
- property_type = property_dict.get('type', SCHEMA_UNDEFINED)
- if property_type == SCHEMA_UNDEFINED and property_dict.get('enum'):
- property_type = [
- str(_YAML_MAP.get(k, k)) for k in property_dict['enum']]
+def _get_property_type(property_dict: dict) -> str:
+ """Return a string representing a property type from a given
+ jsonschema.
+ """
+ property_type = property_dict.get("type")
+ if property_type is None:
+ if property_dict.get("enum"):
+ property_type = [
+ str(_YAML_MAP.get(k, k)) for k in property_dict["enum"]
+ ]
+ elif property_dict.get("oneOf"):
+ property_type = [
+ subschema["type"]
+ for subschema in property_dict.get("oneOf")
+ if subschema.get("type")
+ ]
if isinstance(property_type, list):
- property_type = '/'.join(property_type)
- items = property_dict.get('items', {})
- sub_property_type = items.get('type', '')
+ property_type = "/".join(property_type)
+ items = property_dict.get("items", {})
+ sub_property_type = items.get("type", "")
# Collect each item type
- for sub_item in items.get('oneOf', {}):
+ for sub_item in items.get("oneOf", {}):
if sub_property_type:
- sub_property_type += '/'
- sub_property_type += '(' + _get_property_type(sub_item) + ')'
+ sub_property_type += "/"
+ sub_property_type += "(" + _get_property_type(sub_item) + ")"
if sub_property_type:
- return '{0} of {1}'.format(property_type, sub_property_type)
- return property_type
+ return "{0} of {1}".format(property_type, sub_property_type)
+ return property_type or "UNDEFINED"
-def _parse_description(description, prefix):
- """Parse description from the schema in a format that we can better
+def _parse_description(description, prefix) -> str:
+ """Parse description from the meta in a format that we can better
display in our docs. This parser does three things:
- Guarantee that a paragraph will be in a single line
@@ -330,125 +453,269 @@ def _parse_description(description, prefix):
the first paragraph
- Proper align lists of items
- @param description: The original description in the schema.
+ @param description: The original description in the meta.
@param prefix: The number of spaces used to align the current description
"""
list_paragraph = prefix * 3
description = re.sub(r"(\S)\n(\S)", r"\1 \2", description)
+ description = re.sub(r"\n\n", r"\n\n{}".format(prefix), description)
description = re.sub(
- r"\n\n", r"\n\n{}".format(prefix), description)
- description = re.sub(
- r"\n( +)-", r"\n{}-".format(list_paragraph), description)
+ r"\n( +)-", r"\n{}-".format(list_paragraph), description
+ )
return description
-def _get_property_doc(schema, prefix=' '):
+def _get_property_doc(schema: dict, defs: dict, prefix=" ") -> str:
"""Return restructured text describing the supported schema properties."""
- new_prefix = prefix + ' '
+ new_prefix = prefix + " "
properties = []
- for prop_key, prop_config in schema.get('properties', {}).items():
- # Define prop_name and dscription for SCHEMA_PROPERTY_TMPL
- description = prop_config.get('description', '')
-
- properties.append(SCHEMA_PROPERTY_TMPL.format(
- prefix=prefix,
- prop_name=prop_key,
- type=_get_property_type(prop_config),
- description=_parse_description(description, prefix)))
- items = prop_config.get('items')
- if items:
- if isinstance(items, list):
- for item in items:
- properties.append(
- _get_property_doc(item, prefix=new_prefix))
- elif isinstance(items, dict) and items.get('properties'):
- properties.append(SCHEMA_LIST_ITEM_TMPL.format(
- prefix=new_prefix, prop_name=prop_key))
- new_prefix += ' '
- properties.append(_get_property_doc(items, prefix=new_prefix))
- if 'properties' in prop_config:
+ property_keys = [
+ schema.get("properties", {}),
+ schema.get("patternProperties", {}),
+ ]
+
+ for props in property_keys:
+ for prop_key, prop_config in props.items():
+ if "$ref" in prop_config:
+ # Update the defined references in subschema for doc rendering
+ ref = defs[prop_config["$ref"].replace("#/$defs/", "")]
+ prop_config.update(ref)
+ # Define prop_name and description for SCHEMA_PROPERTY_TMPL
+ description = prop_config.get("description", "")
+
+ # Define prop_name and description for SCHEMA_PROPERTY_TMPL
+ label = prop_config.get("label", prop_key)
properties.append(
- _get_property_doc(prop_config, prefix=new_prefix))
- return '\n\n'.join(properties)
+ SCHEMA_PROPERTY_TMPL.format(
+ prefix=prefix,
+ prop_name=label,
+ description=_parse_description(description, prefix),
+ prop_type=_get_property_type(prop_config),
+ )
+ )
+ items = prop_config.get("items")
+ if items:
+ if isinstance(items, list):
+ for item in items:
+ properties.append(
+ _get_property_doc(
+ item, defs=defs, prefix=new_prefix
+ )
+ )
+ elif isinstance(items, dict) and (
+ items.get("properties") or items.get("patternProperties")
+ ):
+ properties.append(
+ SCHEMA_LIST_ITEM_TMPL.format(
+ prefix=new_prefix, prop_name=label
+ )
+ )
+ new_prefix += " "
+ properties.append(
+ _get_property_doc(items, defs=defs, prefix=new_prefix)
+ )
+ if (
+ "properties" in prop_config
+ or "patternProperties" in prop_config
+ ):
+ properties.append(
+ _get_property_doc(
+ prop_config, defs=defs, prefix=new_prefix
+ )
+ )
+ return "\n\n".join(properties)
-def _get_schema_examples(schema, prefix=''):
- """Return restructured text describing the schema examples if present."""
- examples = schema.get('examples')
+def _get_examples(meta: MetaSchema) -> str:
+ """Return restructured text describing the meta examples if present."""
+ examples = meta.get("examples")
if not examples:
- return ''
+ return ""
rst_content = SCHEMA_EXAMPLES_HEADER
for count, example in enumerate(examples):
# Python2.6 is missing textwrapper.indent
- lines = example.split('\n')
- indented_lines = [' {0}'.format(line) for line in lines]
+ lines = example.split("\n")
+ indented_lines = [" {0}".format(line) for line in lines]
if rst_content != SCHEMA_EXAMPLES_HEADER:
indented_lines.insert(
- 0, SCHEMA_EXAMPLES_SPACER_TEMPLATE.format(count + 1))
- rst_content += '\n'.join(indented_lines)
+ 0, SCHEMA_EXAMPLES_SPACER_TEMPLATE.format(count + 1)
+ )
+ rst_content += "\n".join(indented_lines)
return rst_content
-def get_schema_doc(schema):
- """Return reStructured text rendering the provided jsonschema.
+def get_meta_doc(meta: MetaSchema, schema: dict = None) -> str:
+ """Return reStructured text rendering the provided metadata.
- @param schema: Dict of jsonschema to render.
- @raise KeyError: If schema lacks an expected key.
+ @param meta: Dict of metadata to render.
+ @param schema: Optional module schema, if absent, read global schema.
+ @raise KeyError: If metadata lacks an expected key.
"""
- schema_copy = deepcopy(schema)
- schema_copy['property_doc'] = _get_property_doc(schema)
- schema_copy['examples'] = _get_schema_examples(schema)
- schema_copy['distros'] = ', '.join(schema['distros'])
+
+ if schema is None:
+ schema = get_schema()
+ if not meta or not schema:
+ raise ValueError("Expected non-empty meta and schema")
+ keys = set(meta.keys())
+ expected = set(
+ {
+ "id",
+ "title",
+ "examples",
+ "frequency",
+ "distros",
+ "description",
+ "name",
+ }
+ )
+ error_message = ""
+ if expected - keys:
+ error_message = "Missing expected keys in module meta: {}".format(
+ expected - keys
+ )
+ elif keys - expected:
+ error_message = (
+ "Additional unexpected keys found in module meta: {}".format(
+ keys - expected
+ )
+ )
+ if error_message:
+ raise KeyError(error_message)
+
+ # cast away type annotation
+ meta_copy = dict(deepcopy(meta))
+ defs = schema.get("$defs", {})
+ if defs.get(meta["id"]):
+ schema = defs.get(meta["id"])
+ try:
+ meta_copy["property_doc"] = _get_property_doc(schema, defs=defs)
+ except AttributeError:
+ LOG.warning("Unable to render property_doc due to invalid schema")
+ meta_copy["property_doc"] = ""
+ meta_copy["examples"] = _get_examples(meta)
+ meta_copy["distros"] = ", ".join(meta["distros"])
# Need an underbar of the same length as the name
- schema_copy['title_underbar'] = re.sub(r'.', '-', schema['name'])
- return SCHEMA_DOC_TMPL.format(**schema_copy)
+ meta_copy["title_underbar"] = re.sub(r".", "-", meta["name"])
+ template = SCHEMA_DOC_TMPL.format(**meta_copy)
+ return template
-FULL_SCHEMA = None
+def get_modules() -> dict:
+ configs_dir = os.path.dirname(os.path.abspath(__file__))
+ return find_modules(configs_dir)
-def get_schema():
- """Return jsonschema coalesced from all cc_* cloud-config module."""
- global FULL_SCHEMA
- if FULL_SCHEMA:
- return FULL_SCHEMA
- full_schema = {
- '$schema': 'http://json-schema.org/draft-04/schema#',
- 'id': 'cloud-config-schema', 'allOf': []}
+def load_doc(requested_modules: list) -> str:
+ """Load module docstrings
- configs_dir = os.path.dirname(os.path.abspath(__file__))
- potential_handlers = find_modules(configs_dir)
- for (_fname, mod_name) in potential_handlers.items():
- mod_locs, _looked_locs = importer.find_module(
- mod_name, ['cloudinit.config'], ['schema'])
+ Docstrings are generated on module load. Reduce, reuse, recycle.
+ """
+ docs = ""
+ all_modules = list(get_modules().values()) + ["all"]
+ invalid_docs = set(requested_modules).difference(set(all_modules))
+ if invalid_docs:
+ error(
+ "Invalid --docs value {}. Must be one of: {}".format(
+ list(invalid_docs),
+ ", ".join(all_modules),
+ )
+ )
+ for mod_name in all_modules:
+ if "all" in requested_modules or mod_name in requested_modules:
+ (mod_locs, _) = importer.find_module(
+ mod_name, ["cloudinit.config"], ["meta"]
+ )
+ if mod_locs:
+ mod = importer.import_module(mod_locs[0])
+ docs += mod.__doc__ or ""
+ return docs
+
+
+def get_schema() -> dict:
+ """Return jsonschema coalesced from all cc_* cloud-config modules."""
+ schema_file = os.path.join(
+ os.path.dirname(os.path.abspath(__file__)), "cloud-init-schema.json"
+ )
+ full_schema = None
+ try:
+ full_schema = json.loads(load_file(schema_file))
+ except Exception as e:
+ LOG.warning("Cannot parse JSON schema file %s. %s", schema_file, e)
+ if not full_schema:
+ LOG.warning(
+ "No base JSON schema files found at %s."
+ " Setting default empty schema",
+ schema_file,
+ )
+ full_schema = {
+ "$defs": {},
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "allOf": [],
+ }
+
+ # TODO( Drop the get_modules loop when all legacy cc_* schema migrates )
+ # Supplement base_schema with any legacy modules which still contain a
+ # "schema" attribute. Legacy cc_* modules will be migrated to use the
+ # store module schema in the composite cloud-init-schema-<version>.json
+ # and will drop "schema" at that point.
+ for (_, mod_name) in get_modules().items():
+ # All cc_* modules need a "meta" attribute to represent schema defs
+ (mod_locs, _) = importer.find_module(
+ mod_name, ["cloudinit.config"], ["schema"]
+ )
if mod_locs:
mod = importer.import_module(mod_locs[0])
- full_schema['allOf'].append(mod.schema)
- FULL_SCHEMA = full_schema
+ full_schema["allOf"].append(mod.schema)
return full_schema
-def error(message):
- print(message, file=sys.stderr)
- sys.exit(1)
+def get_meta() -> dict:
+ """Return metadata coalesced from all cc_* cloud-config module."""
+ full_meta = dict()
+ for (_, mod_name) in get_modules().items():
+ mod_locs, _ = importer.find_module(
+ mod_name, ["cloudinit.config"], ["meta"]
+ )
+ if mod_locs:
+ mod = importer.import_module(mod_locs[0])
+ full_meta[mod.meta["id"]] = mod.meta
+ return full_meta
def get_parser(parser=None):
"""Return a parser for supported cmdline arguments."""
if not parser:
parser = argparse.ArgumentParser(
- prog='cloudconfig-schema',
- description='Validate cloud-config files or document schema')
- parser.add_argument('-c', '--config-file',
- help='Path of the cloud-config yaml file to validate')
- parser.add_argument('--system', action='store_true', default=False,
- help='Validate the system cloud-config userdata')
- parser.add_argument('-d', '--docs', nargs='+',
- help=('Print schema module docs. Choices: all or'
- ' space-delimited cc_names.'))
- parser.add_argument('--annotate', action="store_true", default=False,
- help='Annotate existing cloud-config file with errors')
+ prog="cloudconfig-schema",
+ description="Validate cloud-config files or document schema",
+ )
+ parser.add_argument(
+ "-c",
+ "--config-file",
+ help="Path of the cloud-config yaml file to validate",
+ )
+ parser.add_argument(
+ "--system",
+ action="store_true",
+ default=False,
+ help="Validate the system cloud-config userdata",
+ )
+ parser.add_argument(
+ "-d",
+ "--docs",
+ nargs="+",
+ help=(
+ "Print schema module docs. Choices: all or"
+ " space-delimited cc_names."
+ ),
+ )
+ parser.add_argument(
+ "--annotate",
+ action="store_true",
+ default=False,
+ help="Annotate existing cloud-config file with errors",
+ )
return parser
@@ -456,12 +723,15 @@ def handle_schema_args(name, args):
"""Handle provided schema args and perform the appropriate actions."""
exclusive_args = [args.config_file, args.docs, args.system]
if len([arg for arg in exclusive_args if arg]) != 1:
- error('Expected one of --config-file, --system or --docs arguments')
+ error("Expected one of --config-file, --system or --docs arguments")
+ if args.annotate and args.docs:
+ error("Invalid flag combination. Cannot use --annotate with --docs")
full_schema = get_schema()
if args.config_file or args.system:
try:
validate_cloudconfig_file(
- args.config_file, full_schema, args.annotate)
+ args.config_file, full_schema, args.annotate
+ )
except SchemaValidationError as e:
if not args.annotate:
error(str(e))
@@ -474,25 +744,17 @@ def handle_schema_args(name, args):
cfg_name = args.config_file
print("Valid cloud-config:", cfg_name)
elif args.docs:
- schema_ids = [subschema['id'] for subschema in full_schema['allOf']]
- schema_ids += ['all']
- invalid_docs = set(args.docs).difference(set(schema_ids))
- if invalid_docs:
- error('Invalid --docs value {0}. Must be one of: {1}'.format(
- list(invalid_docs), ', '.join(schema_ids)))
- for subschema in full_schema['allOf']:
- if 'all' in args.docs or subschema['id'] in args.docs:
- print(get_schema_doc(subschema))
+ print(load_doc(args.docs))
def main():
"""Tool to validate schema of a cloud-config file or print schema docs."""
parser = get_parser()
- handle_schema_args('cloudconfig-schema', parser.parse_args())
+ handle_schema_args("cloudconfig-schema", parser.parse_args())
return 0
-if __name__ == '__main__':
+if __name__ == "__main__":
sys.exit(main())
# vi: ts=4 expandtab