diff options
author | James Falcon <james.falcon@canonical.com> | 2021-12-15 20:16:38 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-15 19:16:38 -0700 |
commit | bae9b11da9ed7dd0b16fe5adeaf4774b7cc628cf (patch) | |
tree | 1fbb3269fc87e39832e3286ef42eefd2b23fcd44 /cloudinit/config/schema.py | |
parent | 2bcf4fa972fde686c2e3141c58e640640b44dd00 (diff) | |
download | vyos-cloud-init-bae9b11da9ed7dd0b16fe5adeaf4774b7cc628cf.tar.gz vyos-cloud-init-bae9b11da9ed7dd0b16fe5adeaf4774b7cc628cf.zip |
Adopt Black and isort (SC-700) (#1157)
Applied Black and isort, fixed any linting issues, updated tox.ini
and CI.
Diffstat (limited to 'cloudinit/config/schema.py')
-rw-r--r-- | cloudinit/config/schema.py | 239 |
1 files changed, 141 insertions, 98 deletions
diff --git a/cloudinit/config/schema.py b/cloudinit/config/schema.py index d772b4f9..8ec4ab6a 100644 --- a/cloudinit/config/schema.py +++ b/cloudinit/config/schema.py @@ -1,26 +1,27 @@ # This file is part of cloud-init. See LICENSE file for license information. """schema.py: Set of module functions for processing cloud-config schema.""" -from cloudinit.cmd.devel import read_cfg_paths -from cloudinit import importer -from cloudinit.importer import MetaSchema -from cloudinit.util import find_modules, load_file, error - import argparse -from collections import defaultdict -from copy import deepcopy -from functools import partial import logging import os import re import sys +from collections import defaultdict +from copy import deepcopy +from functools import partial + import yaml +from cloudinit import importer +from cloudinit.cmd.devel import read_cfg_paths +from cloudinit.importer import MetaSchema +from cloudinit.util import error, find_modules, load_file + error = partial(error, sys_exit=True) LOG = logging.getLogger(__name__) -_YAML_MAP = {True: 'true', False: 'false', None: 'null'} -CLOUD_CONFIG_HEADER = b'#cloud-config' +_YAML_MAP = {True: "true", False: "false", None: "null"} +CLOUD_CONFIG_HEADER = b"#cloud-config" SCHEMA_DOC_TMPL = """ {name} {title_underbar} @@ -40,9 +41,10 @@ SCHEMA_DOC_TMPL = """ """ SCHEMA_PROPERTY_TMPL = "{prefix}**{prop_name}:** ({prop_type}) {description}" SCHEMA_LIST_ITEM_TMPL = ( - '{prefix}Each item in **{prop_name}** list supports the following keys:') -SCHEMA_EXAMPLES_HEADER = '\n**Examples**::\n\n' -SCHEMA_EXAMPLES_SPACER_TEMPLATE = '\n # --- Example{0} ---' + "{prefix}Each item in **{prop_name}** list supports the following keys:" +) +SCHEMA_EXAMPLES_HEADER = "\n**Examples**::\n\n" +SCHEMA_EXAMPLES_SPACER_TEMPLATE = "\n # --- Example{0} ---" class SchemaValidationError(ValueError): @@ -56,10 +58,12 @@ class SchemaValidationError(ValueError): """ self.schema_errors = schema_errors error_messages = [ - '{0}: {1}'.format(config_key, message) - for config_key, message in schema_errors] + "{0}: {1}".format(config_key, message) + for config_key, message in schema_errors + ] message = "Cloud config schema errors: {0}".format( - ', '.join(error_messages)) + ", ".join(error_messages) + ) super(SchemaValidationError, self).__init__(message) @@ -72,8 +76,9 @@ def is_schema_byte_string(checker, instance): from jsonschema import Draft4Validator except ImportError: return False - return (Draft4Validator.TYPE_CHECKER.is_type(instance, "string") or - isinstance(instance, (bytes,))) + return Draft4Validator.TYPE_CHECKER.is_type( + instance, "string" + ) or isinstance(instance, (bytes,)) def get_jsonschema_validator(): @@ -102,25 +107,28 @@ def get_jsonschema_validator(): # http://json-schema.org/understanding-json-schema/reference/object.html#pattern-properties strict_metaschema["properties"]["label"] = {"type": "string"} - if hasattr(Draft4Validator, 'TYPE_CHECKER'): # jsonschema 3.0+ + if hasattr(Draft4Validator, "TYPE_CHECKER"): # jsonschema 3.0+ type_checker = Draft4Validator.TYPE_CHECKER.redefine( - 'string', is_schema_byte_string) + "string", is_schema_byte_string + ) cloudinitValidator = create( meta_schema=strict_metaschema, validators=Draft4Validator.VALIDATORS, version="draft4", - type_checker=type_checker) + type_checker=type_checker, + ) else: # jsonschema 2.6 workaround types = Draft4Validator.DEFAULT_TYPES # Allow bytes as well as string (and disable a spurious unsupported # assignment-operation pylint warning which appears because this # code path isn't written against the latest jsonschema). - types['string'] = (str, bytes) # pylint: disable=E1137 + types["string"] = (str, bytes) # pylint: disable=E1137 cloudinitValidator = create( meta_schema=strict_metaschema, validators=Draft4Validator.VALIDATORS, version="draft4", - default_types=types) + default_types=types, + ) return (cloudinitValidator, FormatChecker) @@ -147,12 +155,14 @@ def validate_cloudconfig_metaschema(validator, schema: dict, throw=True): if throw: raise SchemaValidationError( schema_errors=( - ('.'.join([str(p) for p in err.path]), err.message), + (".".join([str(p) for p in err.path]), err.message), ) ) from err LOG.warning( "Meta-schema validation failed, attempting to validate config " - "anyway: %s", err) + "anyway: %s", + err, + ) def validate_cloudconfig_schema( @@ -176,7 +186,8 @@ def validate_cloudconfig_schema( (cloudinitValidator, FormatChecker) = get_jsonschema_validator() if strict_metaschema: validate_cloudconfig_metaschema( - cloudinitValidator, schema, throw=False) + cloudinitValidator, schema, throw=False + ) except ImportError: LOG.debug("Ignoring schema validation. jsonschema is not present") return @@ -184,7 +195,7 @@ def validate_cloudconfig_schema( validator = cloudinitValidator(schema, format_checker=FormatChecker()) errors = () for error in sorted(validator.iter_errors(config), key=lambda e: e.path): - path = '.'.join([str(p) for p in error.path]) + path = ".".join([str(p) for p in error.path]) errors += ((path, error.message),) if errors: if strict: @@ -208,12 +219,13 @@ def annotated_cloudconfig_file(cloudconfig, original_content, schema_errors): schemapaths = {} if cloudconfig: schemapaths = _schemapath_for_cloudconfig( - cloudconfig, original_content) + cloudconfig, original_content + ) errors_by_line = defaultdict(list) error_footer = [] annotated_content = [] for path, msg in schema_errors: - match = re.match(r'format-l(?P<line>\d+)\.c(?P<col>\d+).*', path) + match = re.match(r"format-l(?P<line>\d+)\.c(?P<col>\d+).*", path) if match: line, col = match.groups() errors_by_line[int(line)].append(msg) @@ -221,24 +233,26 @@ def annotated_cloudconfig_file(cloudconfig, original_content, schema_errors): col = None errors_by_line[schemapaths[path]].append(msg) if col is not None: - msg = 'Line {line} column {col}: {msg}'.format( - line=line, col=col, msg=msg) - lines = original_content.decode().split('\n') + msg = "Line {line} column {col}: {msg}".format( + line=line, col=col, msg=msg + ) + lines = original_content.decode().split("\n") error_index = 1 for line_number, line in enumerate(lines, 1): errors = errors_by_line[line_number] if errors: error_label = [] for error in errors: - error_label.append('E{0}'.format(error_index)) - error_footer.append('# E{0}: {1}'.format(error_index, error)) + error_label.append("E{0}".format(error_index)) + error_footer.append("# E{0}: {1}".format(error_index, error)) error_index += 1 - annotated_content.append(line + '\t\t# ' + ','.join(error_label)) + annotated_content.append(line + "\t\t# " + ",".join(error_label)) else: annotated_content.append(line) annotated_content.append( - '# Errors: -------------\n{0}\n\n'.format('\n'.join(error_footer))) - return '\n'.join(annotated_content) + "# Errors: -------------\n{0}\n\n".format("\n".join(error_footer)) + ) + return "\n".join(annotated_content) def validate_cloudconfig_file(config_path, schema, annotate=False): @@ -266,15 +280,18 @@ def validate_cloudconfig_file(config_path, schema, annotate=False): else: if not os.path.exists(config_path): raise RuntimeError( - 'Configfile {0} does not exist'.format( - config_path - ) + "Configfile {0} does not exist".format(config_path) ) content = load_file(config_path, decode=False) if not content.startswith(CLOUD_CONFIG_HEADER): errors = ( - ('format-l1.c1', 'File {0} needs to begin with "{1}"'.format( - config_path, CLOUD_CONFIG_HEADER.decode())),) + ( + "format-l1.c1", + 'File {0} needs to begin with "{1}"'.format( + config_path, CLOUD_CONFIG_HEADER.decode() + ), + ), + ) error = SchemaValidationError(errors) if annotate: print(annotated_cloudconfig_file({}, content, error.schema_errors)) @@ -284,27 +301,32 @@ def validate_cloudconfig_file(config_path, schema, annotate=False): except (yaml.YAMLError) as e: line = column = 1 mark = None - if hasattr(e, 'context_mark') and getattr(e, 'context_mark'): - mark = getattr(e, 'context_mark') - elif hasattr(e, 'problem_mark') and getattr(e, 'problem_mark'): - mark = getattr(e, 'problem_mark') + if hasattr(e, "context_mark") and getattr(e, "context_mark"): + mark = getattr(e, "context_mark") + elif hasattr(e, "problem_mark") and getattr(e, "problem_mark"): + mark = getattr(e, "problem_mark") if mark: line = mark.line + 1 column = mark.column + 1 - errors = (('format-l{line}.c{col}'.format(line=line, col=column), - 'File {0} is not valid yaml. {1}'.format( - config_path, str(e))),) + errors = ( + ( + "format-l{line}.c{col}".format(line=line, col=column), + "File {0} is not valid yaml. {1}".format(config_path, str(e)), + ), + ) error = SchemaValidationError(errors) if annotate: print(annotated_cloudconfig_file({}, content, error.schema_errors)) raise error from e try: - validate_cloudconfig_schema( - cloudconfig, schema, strict=True) + validate_cloudconfig_schema(cloudconfig, schema, strict=True) except SchemaValidationError as e: if annotate: - print(annotated_cloudconfig_file( - cloudconfig, content, e.schema_errors)) + print( + annotated_cloudconfig_file( + cloudconfig, content, e.schema_errors + ) + ) raise @@ -315,26 +337,26 @@ def _schemapath_for_cloudconfig(config, original_content): @param original_content: The simple file content of the cloud-config file """ # FIXME Doesn't handle multi-line lists or multi-line strings - content_lines = original_content.decode().split('\n') + content_lines = original_content.decode().split("\n") schema_line_numbers = {} list_index = 0 - RE_YAML_INDENT = r'^(\s*)' + RE_YAML_INDENT = r"^(\s*)" scopes = [] for line_number, line in enumerate(content_lines, 1): indent_depth = len(re.match(RE_YAML_INDENT, line).groups()[0]) line = line.strip() - if not line or line.startswith('#'): + if not line or line.startswith("#"): continue if scopes: previous_depth, path_prefix = scopes[-1] else: previous_depth = -1 - path_prefix = '' - if line.startswith('- '): + path_prefix = "" + if line.startswith("- "): # Process list items adding a list_index to the path prefix - previous_list_idx = '.%d' % (list_index - 1) + previous_list_idx = ".%d" % (list_index - 1) if path_prefix and path_prefix.endswith(previous_list_idx): - path_prefix = path_prefix[:-len(previous_list_idx)] + path_prefix = path_prefix[: -len(previous_list_idx)] key = str(list_index) schema_line_numbers[key] = line_number item_indent = len(re.match(RE_YAML_INDENT, line[1:]).groups()[0]) @@ -346,26 +368,26 @@ def _schemapath_for_cloudconfig(config, original_content): else: # Process non-list lines setting value if present list_index = 0 - key, value = line.split(':', 1) + key, value = line.split(":", 1) if path_prefix: # Append any existing path_prefix for a fully-pathed key - key = path_prefix + '.' + key + key = path_prefix + "." + key while indent_depth <= previous_depth: if scopes: previous_depth, path_prefix = scopes.pop() if list_index > 0 and indent_depth == previous_depth: - path_prefix = '.'.join(path_prefix.split('.')[:-1]) + path_prefix = ".".join(path_prefix.split(".")[:-1]) break else: previous_depth = -1 - path_prefix = '' + path_prefix = "" scopes.append((indent_depth, key)) if value: value = value.strip() - if value.startswith('['): - scopes.append((indent_depth + 2, key + '.0')) + if value.startswith("["): + scopes.append((indent_depth + 2, key + ".0")) for inner_list_index in range(0, len(yaml.safe_load(value))): - list_key = key + '.' + str(inner_list_index) + list_key = key + "." + str(inner_list_index) schema_line_numbers[list_key] = line_number schema_line_numbers[key] = line_number return schema_line_numbers @@ -381,14 +403,14 @@ def _get_property_type(property_dict: dict) -> str: str(_YAML_MAP.get(k, k)) for k in property_dict["enum"] ] if isinstance(property_type, list): - property_type = '/'.join(property_type) - items = property_dict.get('items', {}) - sub_property_type = items.get('type', '') + property_type = "/".join(property_type) + items = property_dict.get("items", {}) + sub_property_type = items.get("type", "") # Collect each item type - for sub_item in items.get('oneOf', {}): + for sub_item in items.get("oneOf", {}): if sub_property_type: - sub_property_type += '/' - sub_property_type += '(' + _get_property_type(sub_item) + ')' + sub_property_type += "/" + sub_property_type += "(" + _get_property_type(sub_item) + ")" if sub_property_type: return "{0} of {1}".format(property_type, sub_property_type) return property_type or "UNDEFINED" @@ -408,17 +430,17 @@ def _parse_description(description, prefix) -> str: """ list_paragraph = prefix * 3 description = re.sub(r"(\S)\n(\S)", r"\1 \2", description) + description = re.sub(r"\n\n", r"\n\n{}".format(prefix), description) description = re.sub( - r"\n\n", r"\n\n{}".format(prefix), description) - description = re.sub( - r"\n( +)-", r"\n{}-".format(list_paragraph), description) + r"\n( +)-", r"\n{}-".format(list_paragraph), description + ) return description def _get_property_doc(schema: dict, prefix=" ") -> str: """Return restructured text describing the supported schema properties.""" - new_prefix = prefix + ' ' + new_prefix = prefix + " " properties = [] property_keys = [ schema.get("properties", {}), @@ -473,16 +495,17 @@ def _get_examples(meta: MetaSchema) -> str: """Return restructured text describing the meta examples if present.""" examples = meta.get("examples") if not examples: - return '' + return "" rst_content = SCHEMA_EXAMPLES_HEADER for count, example in enumerate(examples): # Python2.6 is missing textwrapper.indent - lines = example.split('\n') - indented_lines = [' {0}'.format(line) for line in lines] + lines = example.split("\n") + indented_lines = [" {0}".format(line) for line in lines] if rst_content != SCHEMA_EXAMPLES_HEADER: indented_lines.insert( - 0, SCHEMA_EXAMPLES_SPACER_TEMPLATE.format(count + 1)) - rst_content += '\n'.join(indented_lines) + 0, SCHEMA_EXAMPLES_SPACER_TEMPLATE.format(count + 1) + ) + rst_content += "\n".join(indented_lines) return rst_content @@ -552,7 +575,8 @@ def load_doc(requested_modules: list) -> str: if invalid_docs: error( "Invalid --docs value {}. Must be one of: {}".format( - list(invalid_docs), ", ".join(all_modules), + list(invalid_docs), + ", ".join(all_modules), ) ) for mod_name in all_modules: @@ -601,17 +625,35 @@ def get_parser(parser=None): """Return a parser for supported cmdline arguments.""" if not parser: parser = argparse.ArgumentParser( - prog='cloudconfig-schema', - description='Validate cloud-config files or document schema') - parser.add_argument('-c', '--config-file', - help='Path of the cloud-config yaml file to validate') - parser.add_argument('--system', action='store_true', default=False, - help='Validate the system cloud-config userdata') - parser.add_argument('-d', '--docs', nargs='+', - help=('Print schema module docs. Choices: all or' - ' space-delimited cc_names.')) - parser.add_argument('--annotate', action="store_true", default=False, - help='Annotate existing cloud-config file with errors') + prog="cloudconfig-schema", + description="Validate cloud-config files or document schema", + ) + parser.add_argument( + "-c", + "--config-file", + help="Path of the cloud-config yaml file to validate", + ) + parser.add_argument( + "--system", + action="store_true", + default=False, + help="Validate the system cloud-config userdata", + ) + parser.add_argument( + "-d", + "--docs", + nargs="+", + help=( + "Print schema module docs. Choices: all or" + " space-delimited cc_names." + ), + ) + parser.add_argument( + "--annotate", + action="store_true", + default=False, + help="Annotate existing cloud-config file with errors", + ) return parser @@ -619,12 +661,13 @@ def handle_schema_args(name, args): """Handle provided schema args and perform the appropriate actions.""" exclusive_args = [args.config_file, args.docs, args.system] if len([arg for arg in exclusive_args if arg]) != 1: - error('Expected one of --config-file, --system or --docs arguments') + error("Expected one of --config-file, --system or --docs arguments") full_schema = get_schema() if args.config_file or args.system: try: validate_cloudconfig_file( - args.config_file, full_schema, args.annotate) + args.config_file, full_schema, args.annotate + ) except SchemaValidationError as e: if not args.annotate: error(str(e)) @@ -643,11 +686,11 @@ def handle_schema_args(name, args): def main(): """Tool to validate schema of a cloud-config file or print schema docs.""" parser = get_parser() - handle_schema_args('cloudconfig-schema', parser.parse_args()) + handle_schema_args("cloudconfig-schema", parser.parse_args()) return 0 -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) # vi: ts=4 expandtab |