diff options
author | Brett Holman <bholman.devel@gmail.com> | 2021-12-06 15:27:12 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-06 15:27:12 -0700 |
commit | bedac77e9348e7a54c0ec364fb61df90cd893972 (patch) | |
tree | 73a0ddaada5ceb256e22c053fec50db82671d14c /tests/unittests/config/test_schema.py | |
parent | f428ed1611bdb685598832dd42495f0bcda40ec4 (diff) | |
download | vyos-cloud-init-bedac77e9348e7a54c0ec364fb61df90cd893972.tar.gz vyos-cloud-init-bedac77e9348e7a54c0ec364fb61df90cd893972.zip |
Add Strict Metaschema Validation (#1101)
Improve schema validation.
This adds strict validation of config module definitions at testing
time, with plumbing included for future runtime validation. This
eliminates a class of bugs resulting from schemas that have definitions
that are incorrect, but get interpreted by jsonschema as
"additionalProperties" that are therefore ignored.
- Add strict meta-schema for jsonschema unit test validation
- Separate schema from module metadata structure
- Improve type annotations for various functions and data types
Cleanup:
- Remove unused jsonschema "required" elements
- Eliminate manual memoization in schema.py:get_schema(),
reference module.__doc__ directly
Diffstat (limited to 'tests/unittests/config/test_schema.py')
-rw-r--r-- | tests/unittests/config/test_schema.py | 339 |
1 files changed, 250 insertions, 89 deletions
diff --git a/tests/unittests/config/test_schema.py b/tests/unittests/config/test_schema.py index b01f5eea..f90e0f62 100644 --- a/tests/unittests/config/test_schema.py +++ b/tests/unittests/config/test_schema.py @@ -1,13 +1,10 @@ # This file is part of cloud-init. See LICENSE file for license information. -import cloudinit -from cloudinit.config.schema import ( - CLOUD_CONFIG_HEADER, SchemaValidationError, annotated_cloudconfig_file, - get_schema_doc, get_schema, validate_cloudconfig_file, - validate_cloudconfig_schema, main) -from cloudinit.util import write_file -from tests.unittests.helpers import CiTestCase, mock, skipUnlessJsonSchema +import importlib +import sys +import inspect +import logging from copy import copy import itertools import pytest @@ -15,6 +12,63 @@ from pathlib import Path from textwrap import dedent from yaml import safe_load +import cloudinit +from cloudinit.config.schema import ( + CLOUD_CONFIG_HEADER, + SchemaValidationError, + annotated_cloudconfig_file, + get_meta_doc, + get_schema, + get_jsonschema_validator, + validate_cloudconfig_file, + validate_cloudconfig_metaschema, + validate_cloudconfig_schema, + main, + MetaSchema, +) +from cloudinit.util import write_file +from tests.unittests.helpers import CiTestCase, mock, skipUnlessJsonSchema + + +def get_schemas() -> dict: + """Return all module schemas + + Assumes that module schemas have the variable name "schema" + """ + return get_module_variable("schema") + + +def get_metas() -> dict: + """Return all module metas + + Assumes that module schemas have the variable name "schema" + """ + return get_module_variable("meta") + + +def get_module_variable(var_name) -> dict: + """Inspect modules and get variable from module matching var_name""" + schemas = {} + + files = list(Path("../../cloudinit/config/").glob("cc_*.py")) + modules = [mod.stem for mod in files] + + for module in modules: + importlib.import_module("cloudinit.config.{}".format(module)) + + for k, v in sys.modules.items(): + path = Path(k) + + if "cloudinit.config" == path.stem and path.suffix[1:4] == "cc_": + module_name = path.suffix[1:] + members = inspect.getmembers(v) + schemas[module_name] = None + for name, value in members: + if name == var_name: + schemas[module_name] = value + break + return schemas + class GetSchemaTest(CiTestCase): @@ -34,25 +88,17 @@ class GetSchemaTest(CiTestCase): 'cc_ubuntu_advantage', 'cc_ubuntu_drivers', 'cc_write_files', - 'cc_write_files_deferred', 'cc_zypper_add_repo', 'cc_chef', 'cc_install_hotplug', ], - [subschema['id'] for subschema in schema['allOf']]) - self.assertEqual('cloud-config-schema', schema['id']) + [meta["id"] for meta in get_metas().values() if meta is not None], + ) + self.assertEqual("cloud-config-schema", schema["id"]) self.assertEqual( - 'http://json-schema.org/draft-04/schema#', - schema['$schema']) - # FULL_SCHEMA is updated by the get_schema call - from cloudinit.config.schema import FULL_SCHEMA - self.assertCountEqual(['id', '$schema', 'allOf'], FULL_SCHEMA.keys()) - - def test_get_schema_returns_global_when_set(self): - """When FULL_SCHEMA global is already set, get_schema returns it.""" - m_schema_path = 'cloudinit.config.schema.FULL_SCHEMA' - with mock.patch(m_schema_path, {'here': 'iam'}): - self.assertEqual({'here': 'iam'}, get_schema()) + "http://json-schema.org/draft-04/schema#", schema["$schema"] + ) + self.assertCountEqual(["id", "$schema", "allOf"], get_schema().keys()) class SchemaValidationErrorTest(CiTestCase): @@ -93,8 +139,9 @@ class ValidateCloudConfigSchemaTest(CiTestCase): with mock.patch.dict('sys.modules', **{'jsonschema': ImportError()}): validate_cloudconfig_schema({'p1': -1}, schema, strict=True) self.assertIn( - 'Ignoring schema validation. python-jsonschema is not present', - self.logs.getvalue()) + "Ignoring schema validation. jsonschema is not present", + self.logs.getvalue(), + ) @skipUnlessJsonSchema() def test_validateconfig_schema_strict_raises_errors(self): @@ -117,14 +164,48 @@ class ValidateCloudConfigSchemaTest(CiTestCase): "Cloud config schema errors: p1: '-1' is not a 'email'", str(context_mgr.exception)) + @skipUnlessJsonSchema() + def test_validateconfig_schema_honors_formats_strict_metaschema(self): + """With strict True and strict_metascheam True, ensure errors on format + """ + schema = {"properties": {"p1": {"type": "string", "format": "email"}}} + with self.assertRaises(SchemaValidationError) as context_mgr: + validate_cloudconfig_schema( + {"p1": "-1"}, schema, strict=True, strict_metaschema=True + ) + self.assertEqual( + "Cloud config schema errors: p1: '-1' is not a 'email'", + str(context_mgr.exception), + ) + + @skipUnlessJsonSchema() + def test_validateconfig_strict_metaschema_do_not_raise_exception(self): + """With strict_metaschema=True, do not raise exceptions. + + This flag is currently unused, but is intended for run-time validation. + This should warn, but not raise. + """ + schema = {"properties": {"p1": {"types": "string", "format": "email"}}} + validate_cloudconfig_schema( + {"p1": "-1"}, schema, strict_metaschema=True + ) + assert ( + "Meta-schema validation failed, attempting to validate config" + in self.logs.getvalue() + ) + class TestCloudConfigExamples: - schema = get_schema() + schema = get_schemas() + metas = get_metas() params = [ - (schema["id"], example) - for schema in schema["allOf"] for example in schema["examples"]] + (meta["id"], example) + for meta in metas.values() + if meta and meta.get("examples") + for example in meta.get("examples") + ] - @pytest.mark.parametrize("schema_id,example", params) + @pytest.mark.parametrize("schema_id, example", params) @skipUnlessJsonSchema() def test_validateconfig_schema_of_example(self, schema_id, example): """ For a given example in a config module we test if it is valid @@ -201,22 +282,42 @@ class ValidateCloudConfigFileTest(CiTestCase): class GetSchemaDocTest(CiTestCase): - """Tests for get_schema_doc.""" + """Tests for get_meta_doc.""" def setUp(self): super(GetSchemaDocTest, self).setUp() self.required_schema = { - 'title': 'title', 'description': 'description', 'id': 'id', - 'name': 'name', 'frequency': 'frequency', - 'distros': ['debian', 'rhel']} + "title": "title", + "description": "description", + "id": "id", + "name": "name", + "frequency": "frequency", + "distros": ["debian", "rhel"], + } + self.meta = MetaSchema( + { + "title": "title", + "description": "description", + "id": "id", + "name": "name", + "frequency": "frequency", + "distros": ["debian", "rhel"], + "examples": [ + 'ex1:\n [don\'t, expand, "this"]', + "ex2: true", + ], + } + ) - def test_get_schema_doc_returns_restructured_text(self): - """get_schema_doc returns restructured text for a cloudinit schema.""" + def test_get_meta_doc_returns_restructured_text(self): + """get_meta_doc returns restructured text for a cloudinit schema.""" full_schema = copy(self.required_schema) full_schema.update( {'properties': { 'prop1': {'type': 'array', 'description': 'prop-description', 'items': {'type': 'integer'}}}}) + + doc = get_meta_doc(self.meta, full_schema) self.assertEqual( dedent(""" name @@ -232,47 +333,51 @@ class GetSchemaDocTest(CiTestCase): **Supported distros:** debian, rhel **Config schema**: - **prop1:** (array of integer) prop-description\n\n"""), - get_schema_doc(full_schema)) + **prop1:** (array of integer) prop-description - def test_get_schema_doc_handles_multiple_types(self): - """get_schema_doc delimits multiple property types with a '/'.""" - full_schema = copy(self.required_schema) - full_schema.update( - {'properties': { - 'prop1': {'type': ['string', 'integer'], - 'description': 'prop-description'}}}) + **Examples**:: + + ex1: + [don't, expand, "this"] + # --- Example2 --- + ex2: true + """), + doc, + ) + + def test_get_meta_doc_handles_multiple_types(self): + """get_meta_doc delimits multiple property types with a '/'.""" + schema = {"properties": {"prop1": {"type": ["string", "integer"]}}} self.assertIn( - '**prop1:** (string/integer) prop-description', - get_schema_doc(full_schema)) + "**prop1:** (string/integer)", get_meta_doc(self.meta, schema) + ) - def test_get_schema_doc_handles_enum_types(self): - """get_schema_doc converts enum types to yaml and delimits with '/'.""" - full_schema = copy(self.required_schema) - full_schema.update( - {'properties': { - 'prop1': {'enum': [True, False, 'stuff'], - 'description': 'prop-description'}}}) + def test_get_meta_doc_handles_enum_types(self): + """get_meta_doc converts enum types to yaml and delimits with '/'.""" + schema = {"properties": {"prop1": {"enum": [True, False, "stuff"]}}} self.assertIn( - '**prop1:** (true/false/stuff) prop-description', - get_schema_doc(full_schema)) + "**prop1:** (true/false/stuff)", get_meta_doc(self.meta, schema) + ) - def test_get_schema_doc_handles_nested_oneof_property_types(self): - """get_schema_doc describes array items oneOf declarations in type.""" - full_schema = copy(self.required_schema) - full_schema.update( - {'properties': { - 'prop1': {'type': 'array', - 'items': { - 'oneOf': [{'type': 'string'}, - {'type': 'integer'}]}, - 'description': 'prop-description'}}}) + def test_get_meta_doc_handles_nested_oneof_property_types(self): + """get_meta_doc describes array items oneOf declarations in type.""" + schema = { + "properties": { + "prop1": { + "type": "array", + "items": { + "oneOf": [{"type": "string"}, {"type": "integer"}] + }, + } + } + } self.assertIn( - '**prop1:** (array of (string)/(integer)) prop-description', - get_schema_doc(full_schema)) + "**prop1:** (array of (string)/(integer))", + get_meta_doc(self.meta, schema), + ) - def test_get_schema_doc_handles_string_examples(self): - """get_schema_doc properly indented examples as a list of strings.""" + def test_get_meta_doc_handles_string_examples(self): + """get_meta_doc properly indented examples as a list of strings.""" full_schema = copy(self.required_schema) full_schema.update( {'examples': ['ex1:\n [don\'t, expand, "this"]', 'ex2: true'], @@ -291,16 +396,17 @@ class GetSchemaDocTest(CiTestCase): # --- Example2 --- ex2: true """), - get_schema_doc(full_schema)) + get_meta_doc(self.meta, full_schema), + ) - def test_get_schema_doc_properly_parse_description(self): - """get_schema_doc description properly formatted""" - full_schema = copy(self.required_schema) - full_schema.update( - {'properties': { - 'p1': { - 'type': 'string', - 'description': dedent("""\ + def test_get_meta_doc_properly_parse_description(self): + """get_meta_doc description properly formatted""" + schema = { + "properties": { + "p1": { + "type": "string", + "description": dedent( + """\ This item has the following options: @@ -312,8 +418,8 @@ class GetSchemaDocTest(CiTestCase): The default value is option1""") } - }} - ) + } + } self.assertIn( dedent(""" @@ -325,16 +431,28 @@ class GetSchemaDocTest(CiTestCase): - option3 The default value is option1 + """), - get_schema_doc(full_schema)) + get_meta_doc(self.meta, schema), + ) - def test_get_schema_doc_raises_key_errors(self): - """get_schema_doc raises KeyErrors on missing keys.""" - for key in self.required_schema: - invalid_schema = copy(self.required_schema) - invalid_schema.pop(key) + def test_get_meta_doc_raises_key_errors(self): + """get_meta_doc raises KeyErrors on missing keys.""" + schema = { + "properties": { + "prop1": { + "type": "array", + "items": { + "oneOf": [{"type": "string"}, {"type": "integer"}] + }, + } + } + } + for key in self.meta: + invalid_meta = copy(self.meta) + invalid_meta.pop(key) with self.assertRaises(KeyError) as context_mgr: - get_schema_doc(invalid_schema) + get_meta_doc(invalid_meta, schema) self.assertIn(key, str(context_mgr.exception)) @@ -418,6 +536,7 @@ class TestMain: _out, err = capsys.readouterr() expected = ( + 'Error:\n' 'Expected one of --config-file, --system or --docs arguments\n' ) assert expected == err @@ -431,6 +550,7 @@ class TestMain: _out, err = capsys.readouterr() expected = ( + 'Error:\n' 'Expected one of --config-file, --system or --docs arguments\n' ) assert expected == err @@ -443,7 +563,7 @@ class TestMain: main() assert 1 == context_manager.value.code _out, err = capsys.readouterr() - assert 'Configfile NOT_A_FILE does not exist\n' == err + assert 'Error:\nConfigfile NOT_A_FILE does not exist\n' == err def test_main_prints_docs(self, capsys): """When --docs parameter is provided, main generates documentation.""" @@ -489,12 +609,13 @@ class TestMain: assert 1 == context_manager.value.code _out, err = capsys.readouterr() expected = ( - 'Unable to read system userdata as non-root user. Try using sudo\n' + 'Error:\nUnable to read system userdata as non-root user. ' + 'Try using sudo\n' ) assert expected == err -def _get_schema_doc_examples(): +def _get_meta_doc_examples(): examples_dir = Path( cloudinit.__file__).parent.parent / 'doc' / 'examples' assert examples_dir.is_dir() @@ -507,9 +628,49 @@ def _get_schema_doc_examples(): class TestSchemaDocExamples: schema = get_schema() - @pytest.mark.parametrize("example_path", _get_schema_doc_examples()) + @pytest.mark.parametrize("example_path", _get_meta_doc_examples()) @skipUnlessJsonSchema() def test_schema_doc_examples(self, example_path): validate_cloudconfig_file(str(example_path), self.schema) + +class TestStrictMetaschema: + """Validate that schemas follow a stricter metaschema definition than + the default. This disallows arbitrary key/value pairs. + """ + + @skipUnlessJsonSchema() + def test_modules(self): + """Validate all modules with a stricter metaschema""" + (validator, _) = get_jsonschema_validator() + for (name, value) in get_schemas().items(): + if value: + validate_cloudconfig_metaschema(validator, value) + else: + logging.warning("module %s has no schema definition", name) + + @skipUnlessJsonSchema() + def test_validate_bad_module(self): + """Throw exception by default, don't throw if throw=False + + item should be 'items' and is therefore interpreted as an additional + property which is invalid with a strict metaschema + """ + (validator, _) = get_jsonschema_validator() + schema = { + "type": "array", + "item": { + "type": "object", + }, + } + with pytest.raises( + SchemaValidationError, + match=(r"Additional properties are not allowed.*") + ): + + validate_cloudconfig_metaschema(validator, schema) + + validate_cloudconfig_metaschema(validator, schema, throw=False) + + # vi: ts=4 expandtab syntax=python |