Diffstat (limited to 'cloudinit/config/schema.py')
-rw-r--r--  cloudinit/config/schema.py  288
1 file changed, 205 insertions(+), 83 deletions(-)
diff --git a/cloudinit/config/schema.py b/cloudinit/config/schema.py
index 456bab2c..d32b7c01 100644
--- a/cloudinit/config/schema.py
+++ b/cloudinit/config/schema.py
@@ -3,19 +3,22 @@
from cloudinit.cmd.devel import read_cfg_paths
from cloudinit import importer
-from cloudinit.util import find_modules, load_file
+from cloudinit.importer import MetaSchema
+from cloudinit.util import find_modules, load_file, error
import argparse
from collections import defaultdict
from copy import deepcopy
+from functools import partial
import logging
import os
import re
import sys
import yaml
+error = partial(error, sys_exit=True)
+
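A minimal sketch of the partial() pattern above, with a stand-in for cloudinit.util.error since only its sys_exit keyword is visible in this diff; the module-level name is rebound so every call prints to stderr and then exits, matching the local error() helper removed further down::

    import sys
    from functools import partial

    def _util_error(msg, sys_exit=False):  # hypothetical stand-in for cloudinit.util.error
        print(msg, file=sys.stderr)
        if sys_exit:
            sys.exit(1)

    error = partial(_util_error, sys_exit=True)
    # error("Invalid --docs value ...")  -> message on stderr, then exit 1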
_YAML_MAP = {True: 'true', False: 'false', None: 'null'}
-SCHEMA_UNDEFINED = b'UNDEFINED'
CLOUD_CONFIG_HEADER = b'#cloud-config'
SCHEMA_DOC_TMPL = """
{name}
@@ -34,7 +37,7 @@ SCHEMA_DOC_TMPL = """
{property_doc}
{examples}
"""
-SCHEMA_PROPERTY_TMPL = '{prefix}**{prop_name}:** ({type}) {description}'
+SCHEMA_PROPERTY_TMPL = "{prefix}**{prop_name}:** ({prop_type}) {description}"
SCHEMA_LIST_ITEM_TMPL = (
'{prefix}Each item in **{prop_name}** list supports the following keys:')
SCHEMA_EXAMPLES_HEADER = '\n**Examples**::\n\n'
@@ -72,45 +75,102 @@ def is_schema_byte_string(checker, instance):
isinstance(instance, (bytes,)))
-def validate_cloudconfig_schema(config, schema, strict=False):
- """Validate provided config meets the schema definition.
+def get_jsonschema_validator():
+ """Get metaschema validator and format checker
- @param config: Dict of cloud configuration settings validated against
- schema.
- @param schema: jsonschema dict describing the supported schema definition
- for the cloud config module (config.cc_*).
- @param strict: Boolean, when True raise SchemaValidationErrors instead of
- logging warnings.
+ Older versions of jsonschema require some compatibility changes.
- @raises: SchemaValidationError when provided config does not validate
- against the provided schema.
+ @returns: Tuple: (jsonschema.Validator, FormatChecker)
+ @raises: ImportError when jsonschema is not present
"""
- try:
- from jsonschema import Draft4Validator, FormatChecker
- from jsonschema.validators import create, extend
- except ImportError:
- logging.debug(
- 'Ignoring schema validation. python-jsonschema is not present')
- return
+ from jsonschema import Draft4Validator, FormatChecker
+ from jsonschema.validators import create
# Allow for bytes to be presented as an acceptable valid value for string
# type jsonschema attributes in cloud-init's schema.
# This allows #cloud-config to provide valid yaml "content: !!binary | ..."
+
+ strict_metaschema = deepcopy(Draft4Validator.META_SCHEMA)
+ strict_metaschema['additionalProperties'] = False
if hasattr(Draft4Validator, 'TYPE_CHECKER'): # jsonschema 3.0+
type_checker = Draft4Validator.TYPE_CHECKER.redefine(
'string', is_schema_byte_string)
- cloudinitValidator = extend(Draft4Validator, type_checker=type_checker)
+ cloudinitValidator = create(
+ meta_schema=strict_metaschema,
+ validators=Draft4Validator.VALIDATORS,
+ version="draft4",
+ type_checker=type_checker)
else: # jsonschema 2.6 workaround
types = Draft4Validator.DEFAULT_TYPES
- # Allow bytes as well as string (and disable a spurious
- # unsupported-assignment-operation pylint warning which appears because
- # this code path isn't written against the latest jsonschema).
+ # Allow bytes as well as string (and disable a spurious unsupported
+ # assignment-operation pylint warning which appears because this
+ # code path isn't written against the latest jsonschema).
types['string'] = (str, bytes) # pylint: disable=E1137
cloudinitValidator = create(
- meta_schema=Draft4Validator.META_SCHEMA,
+ meta_schema=strict_metaschema,
validators=Draft4Validator.VALIDATORS,
version="draft4",
default_types=types)
+ return (cloudinitValidator, FormatChecker)
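A hedged sketch of what the strict metaschema above buys (requires python-jsonschema; the misspelled keyword is illustrative): with additionalProperties disallowed on the draft-04 META_SCHEMA, check_schema() rejects schema typos instead of silently ignoring them::

    from copy import deepcopy

    from jsonschema import Draft4Validator
    from jsonschema.exceptions import SchemaError
    from jsonschema.validators import create

    strict_metaschema = deepcopy(Draft4Validator.META_SCHEMA)
    strict_metaschema["additionalProperties"] = False
    StrictValidator = create(
        meta_schema=strict_metaschema,
        validators=Draft4Validator.VALIDATORS,
        version="draft4",
    )

    try:
        # "requird" is not a draft-04 keyword, so the strict metaschema flags it
        StrictValidator.check_schema({"type": "object", "requird": ["name"]})
    except SchemaError as err:
        print("schema rejected:", err.message)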
+
+
+def validate_cloudconfig_metaschema(validator, schema: dict, throw=True):
+ """Validate provided schema meets the metaschema definition. Return strict
+ Validator and FormatChecker for use in validation
+ @param validator: Draft4Validator instance used to validate the schema
+ @param schema: schema to validate
+ @param throw: Sometimes the validator and checker are required, even if
+ the schema is invalid. Toggle for whether to raise
+ SchemaValidationError or log warnings.
+
+ @raises: ImportError when jsonschema is not present
+ @raises: SchemaValidationError when the schema is invalid
+ """
+
+ from jsonschema.exceptions import SchemaError
+
+ try:
+ validator.check_schema(schema)
+ except SchemaError as err:
+ # Raise SchemaValidationError to avoid jsonschema imports at call
+ # sites
+ if throw:
+ raise SchemaValidationError(
+ schema_errors=(
+ ('.'.join([str(p) for p in err.path]), err.message),
+ )
+ ) from err
+ logging.warning(
+ "Meta-schema validation failed, attempting to validate config "
+ "anyway: %s", err)
+
+
+def validate_cloudconfig_schema(
+ config: dict, schema: dict, strict=False, strict_metaschema=False
+):
+ """Validate provided config meets the schema definition.
+
+ @param config: Dict of cloud configuration settings validated against
+ schema. Ignored if strict_metaschema=True
+ @param schema: jsonschema dict describing the supported schema definition
+ for the cloud config module (config.cc_*).
+ @param strict: Boolean, when True raise SchemaValidationErrors instead of
+ logging warnings.
+ @param strict_metaschema: Boolean, when True validates schema using strict
+ metaschema definition at runtime (currently unused)
+
+ @raises: SchemaValidationError when provided config does not validate
+ against the provided schema.
+ """
+ try:
+ (cloudinitValidator, FormatChecker) = get_jsonschema_validator()
+ if strict_metaschema:
+ validate_cloudconfig_metaschema(
+ cloudinitValidator, schema, throw=False)
+ except ImportError:
+ logging.debug("Ignoring schema validation. jsonschema is not present")
+ return
+
validator = cloudinitValidator(schema, format_checker=FormatChecker())
errors = ()
for error in sorted(validator.iter_errors(config), key=lambda e: e.path):
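A hedged usage sketch of the new keyword, with a toy module schema and config (names illustrative, not from a real cc_* module): the module schema is first checked against the strict metaschema without raising, then the user config is validated as before::

    from cloudinit.config.schema import validate_cloudconfig_schema

    schema = {"type": "object", "properties": {"example_key": {"type": "string"}}}
    config = {"example_key": "example value"}

    # strict=True raises SchemaValidationError on bad config; a module schema
    # that fails the metaschema check is only warned about (throw=False above).
    validate_cloudconfig_schema(config, schema, strict=True, strict_metaschema=True)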
@@ -301,12 +361,15 @@ def _schemapath_for_cloudconfig(config, original_content):
return schema_line_numbers
-def _get_property_type(property_dict):
- """Return a string representing a property type from a given jsonschema."""
- property_type = property_dict.get('type', SCHEMA_UNDEFINED)
- if property_type == SCHEMA_UNDEFINED and property_dict.get('enum'):
+def _get_property_type(property_dict: dict) -> str:
+ """Return a string representing a property type from a given
+ jsonschema.
+ """
+ property_type = property_dict.get("type")
+ if property_type is None and property_dict.get("enum"):
property_type = [
- str(_YAML_MAP.get(k, k)) for k in property_dict['enum']]
+ str(_YAML_MAP.get(k, k)) for k in property_dict["enum"]
+ ]
if isinstance(property_type, list):
property_type = '/'.join(property_type)
items = property_dict.get('items', {})
@@ -317,12 +380,12 @@ def _get_property_type(property_dict):
sub_property_type += '/'
sub_property_type += '(' + _get_property_type(sub_item) + ')'
if sub_property_type:
- return '{0} of {1}'.format(property_type, sub_property_type)
- return property_type
+ return "{0} of {1}".format(property_type, sub_property_type)
+ return property_type or "UNDEFINED"
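Hedged examples of the strings this now returns (the array case relies on the items/oneOf handling in the surrounding context)::

    _get_property_type({"type": "string"})                               # "string"
    _get_property_type({"enum": [True, "manual"]})                       # "true/manual"
    _get_property_type({"type": "array", "items": {"type": "string"}})   # "array of string"
    _get_property_type({})                                               # "UNDEFINED"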
-def _parse_description(description, prefix):
- """Parse description from the schema in a format that we can better
+def _parse_description(description, prefix) -> str:
+ """Parse description from the meta in a format that we can better
display in our docs. This parser does three things:
- Guarantee that a paragraph will be in a single line
@@ -330,7 +393,7 @@ def _parse_description(description, prefix):
the first paragraph
- Proper align lists of items
- @param description: The original description in the schema.
+ @param description: The original description in the meta.
@param prefix: The number of spaces used to align the current description
"""
list_paragraph = prefix * 3
@@ -343,20 +406,24 @@ def _parse_description(description, prefix):
return description
-def _get_property_doc(schema, prefix=' '):
+def _get_property_doc(schema: dict, prefix=" ") -> str:
"""Return restructured text describing the supported schema properties."""
new_prefix = prefix + ' '
properties = []
for prop_key, prop_config in schema.get('properties', {}).items():
- # Define prop_name and dscription for SCHEMA_PROPERTY_TMPL
+ # Define prop_name and description for SCHEMA_PROPERTY_TMPL
description = prop_config.get('description', '')
- properties.append(SCHEMA_PROPERTY_TMPL.format(
- prefix=prefix,
- prop_name=prop_key,
- type=_get_property_type(prop_config),
- description=_parse_description(description, prefix)))
- items = prop_config.get('items')
+ # Define prop_name and description for SCHEMA_PROPERTY_TMPL
+ properties.append(
+ SCHEMA_PROPERTY_TMPL.format(
+ prefix=prefix,
+ prop_name=prop_key,
+ description=_parse_description(description, prefix),
+ prop_type=_get_property_type(prop_config),
+ )
+ )
+ items = prop_config.get("items")
if items:
if isinstance(items, list):
for item in items:
@@ -373,9 +440,9 @@ def _get_property_doc(schema, prefix=' '):
return '\n\n'.join(properties)
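For reference, a sketch of the line the renamed prop_type placeholder renders for a single property (values illustrative)::

    SCHEMA_PROPERTY_TMPL.format(
        prefix="    ",
        prop_name="example_key",
        prop_type="string",
        description="An illustrative property.",
    )
    # -> "    **example_key:** (string) An illustrative property."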
-def _get_schema_examples(schema, prefix=''):
- """Return restructured text describing the schema examples if present."""
- examples = schema.get('examples')
+def _get_examples(meta: MetaSchema) -> str:
+ """Return restructured text describing the meta examples if present."""
+ examples = meta.get("examples")
if not examples:
return ''
rst_content = SCHEMA_EXAMPLES_HEADER
@@ -390,48 +457,111 @@ def _get_schema_examples(schema, prefix=''):
return rst_content
-def get_schema_doc(schema):
- """Return reStructured text rendering the provided jsonschema.
+def get_meta_doc(meta: MetaSchema, schema: dict) -> str:
+ """Return reStructured text rendering the provided metadata.
- @param schema: Dict of jsonschema to render.
- @raise KeyError: If schema lacks an expected key.
+ @param meta: Dict of metadata to render.
+ @raise KeyError: If metadata lacks an expected key.
"""
- schema_copy = deepcopy(schema)
- schema_copy['property_doc'] = _get_property_doc(schema)
- schema_copy['examples'] = _get_schema_examples(schema)
- schema_copy['distros'] = ', '.join(schema['distros'])
+
+ if not meta or not schema:
+ raise ValueError("Expected meta and schema")
+ keys = set(meta.keys())
+ expected = set(
+ {
+ "id",
+ "title",
+ "examples",
+ "frequency",
+ "distros",
+ "description",
+ "name",
+ }
+ )
+ error_message = ""
+ if expected - keys:
+ error_message = "Missing expected keys in module meta: {}".format(
+ expected - keys
+ )
+ elif keys - expected:
+ error_message = (
+ "Additional unexpected keys found in module meta: {}".format(
+ keys - expected
+ )
+ )
+ if error_message:
+ raise KeyError(error_message)
+
+ # cast away type annotation
+ meta_copy = dict(deepcopy(meta))
+ meta_copy["property_doc"] = _get_property_doc(schema)
+ meta_copy["examples"] = _get_examples(meta)
+ meta_copy["distros"] = ", ".join(meta["distros"])
# Need an underbar of the same length as the name
- schema_copy['title_underbar'] = re.sub(r'.', '-', schema['name'])
- return SCHEMA_DOC_TMPL.format(**schema_copy)
+ meta_copy["title_underbar"] = re.sub(r".", "-", meta["name"])
+ template = SCHEMA_DOC_TMPL.format(**meta_copy)
+ return template
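A minimal sketch of a meta dict that satisfies the key check above, paired with a toy schema (all values illustrative, not taken from a real cc_* module)::

    meta = {
        "id": "cc_example",
        "name": "Example",
        "title": "Example module",
        "description": "Illustrative module used only for this sketch.",
        "distros": ["ubuntu", "debian"],
        "frequency": "per instance",
        "examples": ["example_key: example value"],
    }
    schema = {"properties": {"example_key": {"type": "string"}}}
    rst = get_meta_doc(meta, schema)  # KeyError if a meta key is missing or unexpected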
+
+
+def get_modules() -> dict:
+ configs_dir = os.path.dirname(os.path.abspath(__file__))
+ return find_modules(configs_dir)
+
+def load_doc(requested_modules: list) -> str:
+ """Load module docstrings
-FULL_SCHEMA = None
+ Docstrings are generated on module load. Reduce, reuse, recycle.
+ """
+ docs = ""
+ all_modules = list(get_modules().values()) + ["all"]
+ invalid_docs = set(requested_modules).difference(set(all_modules))
+ if invalid_docs:
+ error(
+ "Invalid --docs value {}. Must be one of: {}".format(
+ list(invalid_docs), ", ".join(all_modules),
+ )
+ )
+ for mod_name in all_modules:
+ if "all" in requested_modules or mod_name in requested_modules:
+ (mod_locs, _) = importer.find_module(
+ mod_name, ["cloudinit.config"], ["schema"]
+ )
+ if mod_locs:
+ mod = importer.import_module(mod_locs[0])
+ docs += mod.__doc__ or ""
+ return docs
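Usage sketch ("cc_runcmd" stands in for any existing cc_* module; "all" concatenates every module docstring)::

    print(load_doc(["cc_runcmd"]))   # docs for a single module
    print(load_doc(["all"]))         # docs for every cc_* module
    # An unknown name such as ["cc_typo"] reports the invalid value and exits.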
-def get_schema():
+def get_schema() -> dict:
"""Return jsonschema coalesced from all cc_* cloud-config module."""
- global FULL_SCHEMA
- if FULL_SCHEMA:
- return FULL_SCHEMA
full_schema = {
- '$schema': 'http://json-schema.org/draft-04/schema#',
- 'id': 'cloud-config-schema', 'allOf': []}
-
- configs_dir = os.path.dirname(os.path.abspath(__file__))
- potential_handlers = find_modules(configs_dir)
- for (_fname, mod_name) in potential_handlers.items():
- mod_locs, _looked_locs = importer.find_module(
- mod_name, ['cloudinit.config'], ['schema'])
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "id": "cloud-config-schema",
+ "allOf": [],
+ }
+
+ for (_, mod_name) in get_modules().items():
+ (mod_locs, _) = importer.find_module(
+ mod_name, ["cloudinit.config"], ["schema"]
+ )
if mod_locs:
mod = importer.import_module(mod_locs[0])
- full_schema['allOf'].append(mod.schema)
- FULL_SCHEMA = full_schema
+ full_schema["allOf"].append(mod.schema)
return full_schema
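A small sketch of the coalesced result::

    full_schema = get_schema()
    print(full_schema["id"])          # "cloud-config-schema"
    print(len(full_schema["allOf"]))  # one entry per cc_* module defining a schema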
-def error(message):
- print(message, file=sys.stderr)
- sys.exit(1)
+def get_meta() -> dict:
+ """Return metadata coalesced from all cc_* cloud-config module."""
+ full_meta = dict()
+ for (_, mod_name) in get_modules().items():
+ mod_locs, _ = importer.find_module(
+ mod_name, ["cloudinit.config"], ["meta"]
+ )
+ if mod_locs:
+ mod = importer.import_module(mod_locs[0])
+ full_meta[mod.meta["id"]] = mod.meta
+ return full_meta
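Sketch of the mapping returned above: per-module meta dicts keyed by their "id"::

    full_meta = get_meta()
    for mod_id, mod_meta in full_meta.items():
        print(mod_id, "-", mod_meta["title"])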
def get_parser(parser=None):
@@ -474,15 +604,7 @@ def handle_schema_args(name, args):
cfg_name = args.config_file
print("Valid cloud-config:", cfg_name)
elif args.docs:
- schema_ids = [subschema['id'] for subschema in full_schema['allOf']]
- schema_ids += ['all']
- invalid_docs = set(args.docs).difference(set(schema_ids))
- if invalid_docs:
- error('Invalid --docs value {0}. Must be one of: {1}'.format(
- list(invalid_docs), ', '.join(schema_ids)))
- for subschema in full_schema['allOf']:
- if 'all' in args.docs or subschema['id'] in args.docs:
- print(get_schema_doc(subschema))
+ print(load_doc(args.docs))
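Roughly the wiring behind "cloud-init devel schema --docs all" (the CLI entry point is assumed from the surrounding argparse plumbing; get_parser and handle_schema_args are the functions shown here)::

    import argparse

    from cloudinit.config.schema import get_parser, handle_schema_args

    parser = get_parser(argparse.ArgumentParser(prog="schema"))
    handle_schema_args("schema", parser.parse_args(["--docs", "all"]))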
def main():