Diffstat (limited to 'cloudinit/config/schema.py')
-rw-r--r-- | cloudinit/config/schema.py | 658
1 file changed, 460 insertions(+), 198 deletions(-)
diff --git a/cloudinit/config/schema.py b/cloudinit/config/schema.py
index 456bab2c..1f969c97 100644
--- a/cloudinit/config/schema.py
+++ b/cloudinit/config/schema.py
@@ -1,22 +1,28 @@
 # This file is part of cloud-init. See LICENSE file for license information.
 """schema.py: Set of module functions for processing cloud-config schema."""
 
-from cloudinit.cmd.devel import read_cfg_paths
-from cloudinit import importer
-from cloudinit.util import find_modules, load_file
-
 import argparse
-from collections import defaultdict
-from copy import deepcopy
+import json
 import logging
 import os
 import re
 import sys
+from collections import defaultdict
+from copy import deepcopy
+from functools import partial
+
 import yaml
 
-_YAML_MAP = {True: 'true', False: 'false', None: 'null'}
-SCHEMA_UNDEFINED = b'UNDEFINED'
-CLOUD_CONFIG_HEADER = b'#cloud-config'
+from cloudinit import importer
+from cloudinit.cmd.devel import read_cfg_paths
+from cloudinit.importer import MetaSchema
+from cloudinit.util import error, find_modules, load_file
+
+error = partial(error, sys_exit=True)
+LOG = logging.getLogger(__name__)
+
+_YAML_MAP = {True: "true", False: "false", None: "null"}
+CLOUD_CONFIG_HEADER = b"#cloud-config"
 SCHEMA_DOC_TMPL = """
 {name}
 {title_underbar}
@@ -34,11 +40,12 @@ SCHEMA_DOC_TMPL = """
 {property_doc}
 {examples}
 """
-SCHEMA_PROPERTY_TMPL = '{prefix}**{prop_name}:** ({type}) {description}'
+SCHEMA_PROPERTY_TMPL = "{prefix}**{prop_name}:** ({prop_type}) {description}"
 SCHEMA_LIST_ITEM_TMPL = (
-    '{prefix}Each item in **{prop_name}** list supports the following keys:')
-SCHEMA_EXAMPLES_HEADER = '\n**Examples**::\n\n'
-SCHEMA_EXAMPLES_SPACER_TEMPLATE = '\n    # --- Example{0} ---'
+    "{prefix}Each item in **{prop_name}** list supports the following keys:"
+)
+SCHEMA_EXAMPLES_HEADER = "\n**Examples**::\n\n"
+SCHEMA_EXAMPLES_SPACER_TEMPLATE = "\n    # --- Example{0} ---"
 
 
 class SchemaValidationError(ValueError):
@@ -52,10 +59,12 @@
         """
         self.schema_errors = schema_errors
         error_messages = [
-            '{0}: {1}'.format(config_key, message)
-            for config_key, message in schema_errors]
+            "{0}: {1}".format(config_key, message)
+            for config_key, message in schema_errors
+        ]
         message = "Cloud config schema errors: {0}".format(
-            ', '.join(error_messages))
+            ", ".join(error_messages)
+        )
         super(SchemaValidationError, self).__init__(message)
 
 
@@ -68,60 +77,142 @@ def is_schema_byte_string(checker, instance):
         from jsonschema import Draft4Validator
     except ImportError:
         return False
-    return (Draft4Validator.TYPE_CHECKER.is_type(instance, "string") or
-            isinstance(instance, (bytes,)))
+    return Draft4Validator.TYPE_CHECKER.is_type(
+        instance, "string"
+    ) or isinstance(instance, (bytes,))
+
+
+def get_jsonschema_validator():
+    """Get metaschema validator and format checker
+
+    Older versions of jsonschema require some compatibility changes.
+
+    @returns: Tuple: (jsonschema.Validator, FormatChecker)
+    @raises: ImportError when jsonschema is not present
+    """
+    from jsonschema import Draft4Validator, FormatChecker
+    from jsonschema.validators import create
+
+    # Allow for bytes to be presented as an acceptable valid value for string
+    # type jsonschema attributes in cloud-init's schema.
+    # This allows #cloud-config to provide valid yaml "content: !!binary | ..."
+
+    strict_metaschema = deepcopy(Draft4Validator.META_SCHEMA)
+    strict_metaschema["additionalProperties"] = False
+    # This additional label allows us to specify a different name
+    # than the property key when generating docs.
+    # This is especially useful when using a "patternProperties" regex,
+    # otherwise the property label in the generated docs will be a
+    # regular expression.
+    # http://json-schema.org/understanding-json-schema/reference/object.html#pattern-properties
+    strict_metaschema["properties"]["label"] = {"type": "string"}
 
-def validate_cloudconfig_schema(config, schema, strict=False):
+    if hasattr(Draft4Validator, "TYPE_CHECKER"):  # jsonschema 3.0+
+        type_checker = Draft4Validator.TYPE_CHECKER.redefine(
+            "string", is_schema_byte_string
+        )
+        cloudinitValidator = create(
+            meta_schema=strict_metaschema,
+            validators=Draft4Validator.VALIDATORS,
+            version="draft4",
+            type_checker=type_checker,
+        )
+    else:  # jsonschema 2.6 workaround
+        types = Draft4Validator.DEFAULT_TYPES  # pylint: disable=E1101
+        # Allow bytes as well as string (and disable a spurious unsupported
+        # assignment-operation pylint warning which appears because this
+        # code path isn't written against the latest jsonschema).
+        types["string"] = (str, bytes)  # pylint: disable=E1137
+        cloudinitValidator = create(  # pylint: disable=E1123
+            meta_schema=strict_metaschema,
+            validators=Draft4Validator.VALIDATORS,
+            version="draft4",
+            default_types=types,
+        )
+    return (cloudinitValidator, FormatChecker)
+
+
+def validate_cloudconfig_metaschema(validator, schema: dict, throw=True):
+    """Validate provided schema meets the metaschema definition. Return strict
+    Validator and FormatChecker for use in validation
+
+    @param validator: Draft4Validator instance used to validate the schema
+    @param schema: schema to validate
+    @param throw: Sometimes the validator and checker are required, even if
+        the schema is invalid. Toggle for whether to raise
+        SchemaValidationError or log warnings.
+
+    @raises: ImportError when jsonschema is not present
+    @raises: SchemaValidationError when the schema is invalid
+    """
+
+    from jsonschema.exceptions import SchemaError
+
+    try:
+        validator.check_schema(schema)
+    except SchemaError as err:
+        # Raise SchemaValidationError to avoid jsonschema imports at call
+        # sites
+        if throw:
+            raise SchemaValidationError(
+                schema_errors=(
+                    (".".join([str(p) for p in err.path]), err.message),
+                )
+            ) from err
+        LOG.warning(
+            "Meta-schema validation failed, attempting to validate config "
+            "anyway: %s",
+            err,
+        )
+
+
+def validate_cloudconfig_schema(
+    config: dict,
+    schema: dict = None,
+    strict: bool = False,
+    strict_metaschema: bool = False,
+):
     """Validate provided config meets the schema definition.
 
     @param config: Dict of cloud configuration settings validated against
-        schema.
+        schema. Ignored if strict_metaschema=True
     @param schema: jsonschema dict describing the supported schema definition
-        for the cloud config module (config.cc_*).
+        for the cloud config module (config.cc_*). If None, validate against
+        global schema.
     @param strict: Boolean, when True raise SchemaValidationErrors instead of
        logging warnings.
+    @param strict_metaschema: Boolean, when True validates schema using strict
+       metaschema definition at runtime (currently unused)
 
     @raises: SchemaValidationError when provided config does not validate
         against the provided schema.
+    @raises: RuntimeError when provided config sourced from YAML is not a dict.
""" + if schema is None: + schema = get_schema() try: - from jsonschema import Draft4Validator, FormatChecker - from jsonschema.validators import create, extend + (cloudinitValidator, FormatChecker) = get_jsonschema_validator() + if strict_metaschema: + validate_cloudconfig_metaschema( + cloudinitValidator, schema, throw=False + ) except ImportError: - logging.debug( - 'Ignoring schema validation. python-jsonschema is not present') + LOG.debug("Ignoring schema validation. jsonschema is not present") return - # Allow for bytes to be presented as an acceptable valid value for string - # type jsonschema attributes in cloud-init's schema. - # This allows #cloud-config to provide valid yaml "content: !!binary | ..." - if hasattr(Draft4Validator, 'TYPE_CHECKER'): # jsonschema 3.0+ - type_checker = Draft4Validator.TYPE_CHECKER.redefine( - 'string', is_schema_byte_string) - cloudinitValidator = extend(Draft4Validator, type_checker=type_checker) - else: # jsonschema 2.6 workaround - types = Draft4Validator.DEFAULT_TYPES - # Allow bytes as well as string (and disable a spurious - # unsupported-assignment-operation pylint warning which appears because - # this code path isn't written against the latest jsonschema). - types['string'] = (str, bytes) # pylint: disable=E1137 - cloudinitValidator = create( - meta_schema=Draft4Validator.META_SCHEMA, - validators=Draft4Validator.VALIDATORS, - version="draft4", - default_types=types) validator = cloudinitValidator(schema, format_checker=FormatChecker()) errors = () for error in sorted(validator.iter_errors(config), key=lambda e: e.path): - path = '.'.join([str(p) for p in error.path]) + path = ".".join([str(p) for p in error.path]) errors += ((path, error.message),) if errors: if strict: raise SchemaValidationError(errors) else: - messages = ['{0}: {1}'.format(k, msg) for k, msg in errors] - logging.warning('Invalid config:\n%s', '\n'.join(messages)) + messages = ["{0}: {1}".format(k, msg) for k, msg in errors] + LOG.warning( + "Invalid cloud-config provided:\n%s", "\n".join(messages) + ) def annotated_cloudconfig_file(cloudconfig, original_content, schema_errors): @@ -136,14 +227,23 @@ def annotated_cloudconfig_file(cloudconfig, original_content, schema_errors): if not schema_errors: return original_content schemapaths = {} - if cloudconfig: - schemapaths = _schemapath_for_cloudconfig( - cloudconfig, original_content) errors_by_line = defaultdict(list) error_footer = [] + error_header = "# Errors: -------------\n{0}\n\n" annotated_content = [] + lines = original_content.decode().split("\n") + if not isinstance(cloudconfig, dict): + # Return a meaningful message on empty cloud-config + return "\n".join( + lines + + [error_header.format("# E1: Cloud-config is not a YAML dict.")] + ) + if cloudconfig: + schemapaths = _schemapath_for_cloudconfig( + cloudconfig, original_content + ) for path, msg in schema_errors: - match = re.match(r'format-l(?P<line>\d+)\.c(?P<col>\d+).*', path) + match = re.match(r"format-l(?P<line>\d+)\.c(?P<col>\d+).*", path) if match: line, col = match.groups() errors_by_line[int(line)].append(msg) @@ -151,24 +251,24 @@ def annotated_cloudconfig_file(cloudconfig, original_content, schema_errors): col = None errors_by_line[schemapaths[path]].append(msg) if col is not None: - msg = 'Line {line} column {col}: {msg}'.format( - line=line, col=col, msg=msg) - lines = original_content.decode().split('\n') + msg = "Line {line} column {col}: {msg}".format( + line=line, col=col, msg=msg + ) error_index = 1 for line_number, line in enumerate(lines, 
1): errors = errors_by_line[line_number] if errors: error_label = [] for error in errors: - error_label.append('E{0}'.format(error_index)) - error_footer.append('# E{0}: {1}'.format(error_index, error)) + error_label.append("E{0}".format(error_index)) + error_footer.append("# E{0}: {1}".format(error_index, error)) error_index += 1 - annotated_content.append(line + '\t\t# ' + ','.join(error_label)) + annotated_content.append(line + "\t\t# " + ",".join(error_label)) + else: annotated_content.append(line) - annotated_content.append( - '# Errors: -------------\n{0}\n\n'.format('\n'.join(error_footer))) - return '\n'.join(annotated_content) + annotated_content.append(error_header.format("\n".join(error_footer))) + return "\n".join(annotated_content) def validate_cloudconfig_file(config_path, schema, annotate=False): @@ -196,15 +296,18 @@ def validate_cloudconfig_file(config_path, schema, annotate=False): else: if not os.path.exists(config_path): raise RuntimeError( - 'Configfile {0} does not exist'.format( - config_path - ) + "Configfile {0} does not exist".format(config_path) ) content = load_file(config_path, decode=False) if not content.startswith(CLOUD_CONFIG_HEADER): errors = ( - ('format-l1.c1', 'File {0} needs to begin with "{1}"'.format( - config_path, CLOUD_CONFIG_HEADER.decode())),) + ( + "format-l1.c1", + 'File {0} needs to begin with "{1}"'.format( + config_path, CLOUD_CONFIG_HEADER.decode() + ), + ), + ) error = SchemaValidationError(errors) if annotate: print(annotated_cloudconfig_file({}, content, error.schema_errors)) @@ -214,27 +317,36 @@ def validate_cloudconfig_file(config_path, schema, annotate=False): except (yaml.YAMLError) as e: line = column = 1 mark = None - if hasattr(e, 'context_mark') and getattr(e, 'context_mark'): - mark = getattr(e, 'context_mark') - elif hasattr(e, 'problem_mark') and getattr(e, 'problem_mark'): - mark = getattr(e, 'problem_mark') + if hasattr(e, "context_mark") and getattr(e, "context_mark"): + mark = getattr(e, "context_mark") + elif hasattr(e, "problem_mark") and getattr(e, "problem_mark"): + mark = getattr(e, "problem_mark") if mark: line = mark.line + 1 column = mark.column + 1 - errors = (('format-l{line}.c{col}'.format(line=line, col=column), - 'File {0} is not valid yaml. {1}'.format( - config_path, str(e))),) + errors = ( + ( + "format-l{line}.c{col}".format(line=line, col=column), + "File {0} is not valid yaml. {1}".format(config_path, str(e)), + ), + ) error = SchemaValidationError(errors) if annotate: print(annotated_cloudconfig_file({}, content, error.schema_errors)) raise error from e + if not isinstance(cloudconfig, dict): + # Return a meaningful message on empty cloud-config + if not annotate: + raise RuntimeError("Cloud-config is not a YAML dict.") try: - validate_cloudconfig_schema( - cloudconfig, schema, strict=True) + validate_cloudconfig_schema(cloudconfig, schema, strict=True) except SchemaValidationError as e: if annotate: - print(annotated_cloudconfig_file( - cloudconfig, content, e.schema_errors)) + print( + annotated_cloudconfig_file( + cloudconfig, content, e.schema_errors + ) + ) raise @@ -244,29 +356,30 @@ def _schemapath_for_cloudconfig(config, original_content): @param config: The yaml.loaded config dictionary of a cloud-config file. 
     @param original_content: The simple file content of the cloud-config file
     """
-    # FIXME Doesn't handle multi-line lists or multi-line strings
-    content_lines = original_content.decode().split('\n')
+    # TODO( handle multi-line lists or multi-line strings, inline dicts)
+    content_lines = original_content.decode().split("\n")
     schema_line_numbers = {}
     list_index = 0
-    RE_YAML_INDENT = r'^(\s*)'
+    RE_YAML_INDENT = r"^(\s*)"
     scopes = []
+    if not config:
+        return {}  # No YAML config dict, no schemapaths to annotate
     for line_number, line in enumerate(content_lines, 1):
         indent_depth = len(re.match(RE_YAML_INDENT, line).groups()[0])
         line = line.strip()
-        if not line or line.startswith('#'):
+        if not line or line.startswith("#"):
             continue
         if scopes:
             previous_depth, path_prefix = scopes[-1]
         else:
             previous_depth = -1
-            path_prefix = ''
-        if line.startswith('- '):
+            path_prefix = ""
+        if line.startswith("- "):
             # Process list items adding a list_index to the path prefix
-            previous_list_idx = '.%d' % (list_index - 1)
+            previous_list_idx = ".%d" % (list_index - 1)
             if path_prefix and path_prefix.endswith(previous_list_idx):
-                path_prefix = path_prefix[:-len(previous_list_idx)]
+                path_prefix = path_prefix[: -len(previous_list_idx)]
             key = str(list_index)
-            schema_line_numbers[key] = line_number
             item_indent = len(re.match(RE_YAML_INDENT, line[1:]).groups()[0])
             item_indent += 1  # For the leading '-' character
             previous_depth = indent_depth
@@ -276,53 +389,63 @@
         else:
             # Process non-list lines setting value if present
             list_index = 0
-            key, value = line.split(':', 1)
-            if path_prefix:
+            key, value = line.split(":", 1)
+            if path_prefix and indent_depth > previous_depth:
                 # Append any existing path_prefix for a fully-pathed key
-                key = path_prefix + '.' + key
+                key = path_prefix + "." + key
             while indent_depth <= previous_depth:
                 if scopes:
                     previous_depth, path_prefix = scopes.pop()
                     if list_index > 0 and indent_depth == previous_depth:
-                        path_prefix = '.'.join(path_prefix.split('.')[:-1])
+                        path_prefix = ".".join(path_prefix.split(".")[:-1])
                         break
                 else:
                     previous_depth = -1
-                    path_prefix = ''
+                    path_prefix = ""
             scopes.append((indent_depth, key))
             if value:
                 value = value.strip()
-                if value.startswith('['):
-                    scopes.append((indent_depth + 2, key + '.0'))
+                if value.startswith("["):
+                    scopes.append((indent_depth + 2, key + ".0"))
                     for inner_list_index in range(0, len(yaml.safe_load(value))):
-                        list_key = key + '.' + str(inner_list_index)
+                        list_key = key + "." + str(inner_list_index)
                         schema_line_numbers[list_key] = line_number
         schema_line_numbers[key] = line_number
     return schema_line_numbers
 
 
-def _get_property_type(property_dict):
-    """Return a string representing a property type from a given jsonschema."""
-    property_type = property_dict.get('type', SCHEMA_UNDEFINED)
-    if property_type == SCHEMA_UNDEFINED and property_dict.get('enum'):
-        property_type = [
-            str(_YAML_MAP.get(k, k)) for k in property_dict['enum']]
+def _get_property_type(property_dict: dict) -> str:
+    """Return a string representing a property type from a given
+    jsonschema.
+ """ + property_type = property_dict.get("type") + if property_type is None: + if property_dict.get("enum"): + property_type = [ + str(_YAML_MAP.get(k, k)) for k in property_dict["enum"] + ] + elif property_dict.get("oneOf"): + property_type = [ + subschema["type"] + for subschema in property_dict.get("oneOf") + if subschema.get("type") + ] if isinstance(property_type, list): - property_type = '/'.join(property_type) - items = property_dict.get('items', {}) - sub_property_type = items.get('type', '') + property_type = "/".join(property_type) + items = property_dict.get("items", {}) + sub_property_type = items.get("type", "") # Collect each item type - for sub_item in items.get('oneOf', {}): + for sub_item in items.get("oneOf", {}): if sub_property_type: - sub_property_type += '/' - sub_property_type += '(' + _get_property_type(sub_item) + ')' + sub_property_type += "/" + sub_property_type += "(" + _get_property_type(sub_item) + ")" if sub_property_type: - return '{0} of {1}'.format(property_type, sub_property_type) - return property_type + return "{0} of {1}".format(property_type, sub_property_type) + return property_type or "UNDEFINED" -def _parse_description(description, prefix): - """Parse description from the schema in a format that we can better +def _parse_description(description, prefix) -> str: + """Parse description from the meta in a format that we can better display in our docs. This parser does three things: - Guarantee that a paragraph will be in a single line @@ -330,125 +453,269 @@ def _parse_description(description, prefix): the first paragraph - Proper align lists of items - @param description: The original description in the schema. + @param description: The original description in the meta. @param prefix: The number of spaces used to align the current description """ list_paragraph = prefix * 3 description = re.sub(r"(\S)\n(\S)", r"\1 \2", description) + description = re.sub(r"\n\n", r"\n\n{}".format(prefix), description) description = re.sub( - r"\n\n", r"\n\n{}".format(prefix), description) - description = re.sub( - r"\n( +)-", r"\n{}-".format(list_paragraph), description) + r"\n( +)-", r"\n{}-".format(list_paragraph), description + ) return description -def _get_property_doc(schema, prefix=' '): +def _get_property_doc(schema: dict, defs: dict, prefix=" ") -> str: """Return restructured text describing the supported schema properties.""" - new_prefix = prefix + ' ' + new_prefix = prefix + " " properties = [] - for prop_key, prop_config in schema.get('properties', {}).items(): - # Define prop_name and dscription for SCHEMA_PROPERTY_TMPL - description = prop_config.get('description', '') - - properties.append(SCHEMA_PROPERTY_TMPL.format( - prefix=prefix, - prop_name=prop_key, - type=_get_property_type(prop_config), - description=_parse_description(description, prefix))) - items = prop_config.get('items') - if items: - if isinstance(items, list): - for item in items: - properties.append( - _get_property_doc(item, prefix=new_prefix)) - elif isinstance(items, dict) and items.get('properties'): - properties.append(SCHEMA_LIST_ITEM_TMPL.format( - prefix=new_prefix, prop_name=prop_key)) - new_prefix += ' ' - properties.append(_get_property_doc(items, prefix=new_prefix)) - if 'properties' in prop_config: + property_keys = [ + schema.get("properties", {}), + schema.get("patternProperties", {}), + ] + + for props in property_keys: + for prop_key, prop_config in props.items(): + if "$ref" in prop_config: + # Update the defined references in subschema for doc rendering + ref = 
defs[prop_config["$ref"].replace("#/$defs/", "")] + prop_config.update(ref) + # Define prop_name and description for SCHEMA_PROPERTY_TMPL + description = prop_config.get("description", "") + + # Define prop_name and description for SCHEMA_PROPERTY_TMPL + label = prop_config.get("label", prop_key) properties.append( - _get_property_doc(prop_config, prefix=new_prefix)) - return '\n\n'.join(properties) + SCHEMA_PROPERTY_TMPL.format( + prefix=prefix, + prop_name=label, + description=_parse_description(description, prefix), + prop_type=_get_property_type(prop_config), + ) + ) + items = prop_config.get("items") + if items: + if isinstance(items, list): + for item in items: + properties.append( + _get_property_doc( + item, defs=defs, prefix=new_prefix + ) + ) + elif isinstance(items, dict) and ( + items.get("properties") or items.get("patternProperties") + ): + properties.append( + SCHEMA_LIST_ITEM_TMPL.format( + prefix=new_prefix, prop_name=label + ) + ) + new_prefix += " " + properties.append( + _get_property_doc(items, defs=defs, prefix=new_prefix) + ) + if ( + "properties" in prop_config + or "patternProperties" in prop_config + ): + properties.append( + _get_property_doc( + prop_config, defs=defs, prefix=new_prefix + ) + ) + return "\n\n".join(properties) -def _get_schema_examples(schema, prefix=''): - """Return restructured text describing the schema examples if present.""" - examples = schema.get('examples') +def _get_examples(meta: MetaSchema) -> str: + """Return restructured text describing the meta examples if present.""" + examples = meta.get("examples") if not examples: - return '' + return "" rst_content = SCHEMA_EXAMPLES_HEADER for count, example in enumerate(examples): # Python2.6 is missing textwrapper.indent - lines = example.split('\n') - indented_lines = [' {0}'.format(line) for line in lines] + lines = example.split("\n") + indented_lines = [" {0}".format(line) for line in lines] if rst_content != SCHEMA_EXAMPLES_HEADER: indented_lines.insert( - 0, SCHEMA_EXAMPLES_SPACER_TEMPLATE.format(count + 1)) - rst_content += '\n'.join(indented_lines) + 0, SCHEMA_EXAMPLES_SPACER_TEMPLATE.format(count + 1) + ) + rst_content += "\n".join(indented_lines) return rst_content -def get_schema_doc(schema): - """Return reStructured text rendering the provided jsonschema. +def get_meta_doc(meta: MetaSchema, schema: dict = None) -> str: + """Return reStructured text rendering the provided metadata. - @param schema: Dict of jsonschema to render. - @raise KeyError: If schema lacks an expected key. + @param meta: Dict of metadata to render. + @param schema: Optional module schema, if absent, read global schema. + @raise KeyError: If metadata lacks an expected key. 
""" - schema_copy = deepcopy(schema) - schema_copy['property_doc'] = _get_property_doc(schema) - schema_copy['examples'] = _get_schema_examples(schema) - schema_copy['distros'] = ', '.join(schema['distros']) + + if schema is None: + schema = get_schema() + if not meta or not schema: + raise ValueError("Expected non-empty meta and schema") + keys = set(meta.keys()) + expected = set( + { + "id", + "title", + "examples", + "frequency", + "distros", + "description", + "name", + } + ) + error_message = "" + if expected - keys: + error_message = "Missing expected keys in module meta: {}".format( + expected - keys + ) + elif keys - expected: + error_message = ( + "Additional unexpected keys found in module meta: {}".format( + keys - expected + ) + ) + if error_message: + raise KeyError(error_message) + + # cast away type annotation + meta_copy = dict(deepcopy(meta)) + defs = schema.get("$defs", {}) + if defs.get(meta["id"]): + schema = defs.get(meta["id"]) + try: + meta_copy["property_doc"] = _get_property_doc(schema, defs=defs) + except AttributeError: + LOG.warning("Unable to render property_doc due to invalid schema") + meta_copy["property_doc"] = "" + meta_copy["examples"] = _get_examples(meta) + meta_copy["distros"] = ", ".join(meta["distros"]) # Need an underbar of the same length as the name - schema_copy['title_underbar'] = re.sub(r'.', '-', schema['name']) - return SCHEMA_DOC_TMPL.format(**schema_copy) + meta_copy["title_underbar"] = re.sub(r".", "-", meta["name"]) + template = SCHEMA_DOC_TMPL.format(**meta_copy) + return template -FULL_SCHEMA = None +def get_modules() -> dict: + configs_dir = os.path.dirname(os.path.abspath(__file__)) + return find_modules(configs_dir) -def get_schema(): - """Return jsonschema coalesced from all cc_* cloud-config module.""" - global FULL_SCHEMA - if FULL_SCHEMA: - return FULL_SCHEMA - full_schema = { - '$schema': 'http://json-schema.org/draft-04/schema#', - 'id': 'cloud-config-schema', 'allOf': []} +def load_doc(requested_modules: list) -> str: + """Load module docstrings - configs_dir = os.path.dirname(os.path.abspath(__file__)) - potential_handlers = find_modules(configs_dir) - for (_fname, mod_name) in potential_handlers.items(): - mod_locs, _looked_locs = importer.find_module( - mod_name, ['cloudinit.config'], ['schema']) + Docstrings are generated on module load. Reduce, reuse, recycle. + """ + docs = "" + all_modules = list(get_modules().values()) + ["all"] + invalid_docs = set(requested_modules).difference(set(all_modules)) + if invalid_docs: + error( + "Invalid --docs value {}. Must be one of: {}".format( + list(invalid_docs), + ", ".join(all_modules), + ) + ) + for mod_name in all_modules: + if "all" in requested_modules or mod_name in requested_modules: + (mod_locs, _) = importer.find_module( + mod_name, ["cloudinit.config"], ["meta"] + ) + if mod_locs: + mod = importer.import_module(mod_locs[0]) + docs += mod.__doc__ or "" + return docs + + +def get_schema() -> dict: + """Return jsonschema coalesced from all cc_* cloud-config modules.""" + schema_file = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "cloud-init-schema.json" + ) + full_schema = None + try: + full_schema = json.loads(load_file(schema_file)) + except Exception as e: + LOG.warning("Cannot parse JSON schema file %s. %s", schema_file, e) + if not full_schema: + LOG.warning( + "No base JSON schema files found at %s." 
+ " Setting default empty schema", + schema_file, + ) + full_schema = { + "$defs": {}, + "$schema": "http://json-schema.org/draft-04/schema#", + "allOf": [], + } + + # TODO( Drop the get_modules loop when all legacy cc_* schema migrates ) + # Supplement base_schema with any legacy modules which still contain a + # "schema" attribute. Legacy cc_* modules will be migrated to use the + # store module schema in the composite cloud-init-schema-<version>.json + # and will drop "schema" at that point. + for (_, mod_name) in get_modules().items(): + # All cc_* modules need a "meta" attribute to represent schema defs + (mod_locs, _) = importer.find_module( + mod_name, ["cloudinit.config"], ["schema"] + ) if mod_locs: mod = importer.import_module(mod_locs[0]) - full_schema['allOf'].append(mod.schema) - FULL_SCHEMA = full_schema + full_schema["allOf"].append(mod.schema) return full_schema -def error(message): - print(message, file=sys.stderr) - sys.exit(1) +def get_meta() -> dict: + """Return metadata coalesced from all cc_* cloud-config module.""" + full_meta = dict() + for (_, mod_name) in get_modules().items(): + mod_locs, _ = importer.find_module( + mod_name, ["cloudinit.config"], ["meta"] + ) + if mod_locs: + mod = importer.import_module(mod_locs[0]) + full_meta[mod.meta["id"]] = mod.meta + return full_meta def get_parser(parser=None): """Return a parser for supported cmdline arguments.""" if not parser: parser = argparse.ArgumentParser( - prog='cloudconfig-schema', - description='Validate cloud-config files or document schema') - parser.add_argument('-c', '--config-file', - help='Path of the cloud-config yaml file to validate') - parser.add_argument('--system', action='store_true', default=False, - help='Validate the system cloud-config userdata') - parser.add_argument('-d', '--docs', nargs='+', - help=('Print schema module docs. Choices: all or' - ' space-delimited cc_names.')) - parser.add_argument('--annotate', action="store_true", default=False, - help='Annotate existing cloud-config file with errors') + prog="cloudconfig-schema", + description="Validate cloud-config files or document schema", + ) + parser.add_argument( + "-c", + "--config-file", + help="Path of the cloud-config yaml file to validate", + ) + parser.add_argument( + "--system", + action="store_true", + default=False, + help="Validate the system cloud-config userdata", + ) + parser.add_argument( + "-d", + "--docs", + nargs="+", + help=( + "Print schema module docs. Choices: all or" + " space-delimited cc_names." + ), + ) + parser.add_argument( + "--annotate", + action="store_true", + default=False, + help="Annotate existing cloud-config file with errors", + ) return parser @@ -456,12 +723,15 @@ def handle_schema_args(name, args): """Handle provided schema args and perform the appropriate actions.""" exclusive_args = [args.config_file, args.docs, args.system] if len([arg for arg in exclusive_args if arg]) != 1: - error('Expected one of --config-file, --system or --docs arguments') + error("Expected one of --config-file, --system or --docs arguments") + if args.annotate and args.docs: + error("Invalid flag combination. 
Cannot use --annotate with --docs") full_schema = get_schema() if args.config_file or args.system: try: validate_cloudconfig_file( - args.config_file, full_schema, args.annotate) + args.config_file, full_schema, args.annotate + ) except SchemaValidationError as e: if not args.annotate: error(str(e)) @@ -474,25 +744,17 @@ def handle_schema_args(name, args): cfg_name = args.config_file print("Valid cloud-config:", cfg_name) elif args.docs: - schema_ids = [subschema['id'] for subschema in full_schema['allOf']] - schema_ids += ['all'] - invalid_docs = set(args.docs).difference(set(schema_ids)) - if invalid_docs: - error('Invalid --docs value {0}. Must be one of: {1}'.format( - list(invalid_docs), ', '.join(schema_ids))) - for subschema in full_schema['allOf']: - if 'all' in args.docs or subschema['id'] in args.docs: - print(get_schema_doc(subschema)) + print(load_doc(args.docs)) def main(): """Tool to validate schema of a cloud-config file or print schema docs.""" parser = get_parser() - handle_schema_args('cloudconfig-schema', parser.parse_args()) + handle_schema_args("cloudconfig-schema", parser.parse_args()) return 0 -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) # vi: ts=4 expandtab |
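
Reviewer note: a minimal sketch (not part of the commit) of how the refactored entry points compose after this change, assuming python3-jsonschema is installed. The cloud-config dict below is illustrative only.

    # Hypothetical usage of the post-refactor API; the config dict is made up.
    from cloudinit.config.schema import (
        SchemaValidationError,
        get_schema,
        validate_cloudconfig_schema,
    )

    config = {"package_update": True}  # user-data already parsed from YAML
    schema = get_schema()  # cloud-init-schema.json plus legacy cc_* schemas

    try:
        # strict=True raises instead of logging warnings; strict_metaschema=True
        # additionally checks the schema itself (non-fatally, via throw=False).
        validate_cloudconfig_schema(
            config, schema, strict=True, strict_metaschema=True
        )
    except SchemaValidationError as e:
        # schema_errors is a tuple of (dotted config path, message) pairs
        for path, msg in e.schema_errors:
            print("{0}: {1}".format(path, msg))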
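
The strict metaschema path can also be exercised directly: get_jsonschema_validator() returns a validator class built against a Draft 4 metaschema with additionalProperties disabled (plus the extra "label" keyword), so unknown keywords are rejected. A sketch, again assuming jsonschema is importable; example_module_schema is invented to show a failure:

    from cloudinit.config.schema import (
        SchemaValidationError,
        get_jsonschema_validator,
        validate_cloudconfig_metaschema,
    )

    example_module_schema = {
        "type": "object",
        "properties": {"mode": {"type": "string", "label": "mode"}},
        "additionalProperty": False,  # typo: should be "additionalProperties"
    }

    validator, _format_checker = get_jsonschema_validator()
    try:
        # throw=True surfaces SchemaValidationError; throw=False (as used by
        # validate_cloudconfig_schema) logs a warning and continues.
        validate_cloudconfig_metaschema(
            validator, example_module_schema, throw=True
        )
    except SchemaValidationError as e:
        print(e)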
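
Doc rendering likewise moves from get_schema_doc(schema) to get_meta_doc(meta, schema). A sketch with a made-up module meta dict; the seven keys shown are exactly the set get_meta_doc enforces:

    from cloudinit.config.schema import get_meta_doc

    example_meta = {  # hypothetical cc_* module `meta` attribute
        "id": "cc_example",
        "name": "Example",
        "title": "Illustrative module",
        "description": "Does nothing real.",
        "distros": ["all"],
        "frequency": "once-per-instance",
        "examples": ["example_key: example_value"],
    }
    example_schema = {
        "properties": {
            "example_key": {"type": "string", "description": "A string."},
        },
    }
    print(get_meta_doc(example_meta, example_schema))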