summaryrefslogtreecommitdiff
path: root/tests/unittests/config/test_schema.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/config/test_schema.py')
-rw-r--r--tests/unittests/config/test_schema.py339
1 files changed, 250 insertions, 89 deletions
diff --git a/tests/unittests/config/test_schema.py b/tests/unittests/config/test_schema.py
index b01f5eea..f90e0f62 100644
--- a/tests/unittests/config/test_schema.py
+++ b/tests/unittests/config/test_schema.py
@@ -1,13 +1,10 @@
# This file is part of cloud-init. See LICENSE file for license information.
-import cloudinit
-from cloudinit.config.schema import (
- CLOUD_CONFIG_HEADER, SchemaValidationError, annotated_cloudconfig_file,
- get_schema_doc, get_schema, validate_cloudconfig_file,
- validate_cloudconfig_schema, main)
-from cloudinit.util import write_file
-from tests.unittests.helpers import CiTestCase, mock, skipUnlessJsonSchema
+import importlib
+import sys
+import inspect
+import logging
from copy import copy
import itertools
import pytest
@@ -15,6 +12,63 @@ from pathlib import Path
from textwrap import dedent
from yaml import safe_load
+import cloudinit
+from cloudinit.config.schema import (
+ CLOUD_CONFIG_HEADER,
+ SchemaValidationError,
+ annotated_cloudconfig_file,
+ get_meta_doc,
+ get_schema,
+ get_jsonschema_validator,
+ validate_cloudconfig_file,
+ validate_cloudconfig_metaschema,
+ validate_cloudconfig_schema,
+ main,
+ MetaSchema,
+)
+from cloudinit.util import write_file
+from tests.unittests.helpers import CiTestCase, mock, skipUnlessJsonSchema
+
+
+def get_schemas() -> dict:
+ """Return all module schemas
+
+ Assumes that module schemas have the variable name "schema"
+ """
+ return get_module_variable("schema")
+
+
+def get_metas() -> dict:
+ """Return all module metas
+
+ Assumes that module schemas have the variable name "schema"
+ """
+ return get_module_variable("meta")
+
+
+def get_module_variable(var_name) -> dict:
+ """Inspect modules and get variable from module matching var_name"""
+ schemas = {}
+
+ files = list(Path("../../cloudinit/config/").glob("cc_*.py"))
+ modules = [mod.stem for mod in files]
+
+ for module in modules:
+ importlib.import_module("cloudinit.config.{}".format(module))
+
+ for k, v in sys.modules.items():
+ path = Path(k)
+
+ if "cloudinit.config" == path.stem and path.suffix[1:4] == "cc_":
+ module_name = path.suffix[1:]
+ members = inspect.getmembers(v)
+ schemas[module_name] = None
+ for name, value in members:
+ if name == var_name:
+ schemas[module_name] = value
+ break
+ return schemas
+
class GetSchemaTest(CiTestCase):
@@ -34,25 +88,17 @@ class GetSchemaTest(CiTestCase):
'cc_ubuntu_advantage',
'cc_ubuntu_drivers',
'cc_write_files',
- 'cc_write_files_deferred',
'cc_zypper_add_repo',
'cc_chef',
'cc_install_hotplug',
],
- [subschema['id'] for subschema in schema['allOf']])
- self.assertEqual('cloud-config-schema', schema['id'])
+ [meta["id"] for meta in get_metas().values() if meta is not None],
+ )
+ self.assertEqual("cloud-config-schema", schema["id"])
self.assertEqual(
- 'http://json-schema.org/draft-04/schema#',
- schema['$schema'])
- # FULL_SCHEMA is updated by the get_schema call
- from cloudinit.config.schema import FULL_SCHEMA
- self.assertCountEqual(['id', '$schema', 'allOf'], FULL_SCHEMA.keys())
-
- def test_get_schema_returns_global_when_set(self):
- """When FULL_SCHEMA global is already set, get_schema returns it."""
- m_schema_path = 'cloudinit.config.schema.FULL_SCHEMA'
- with mock.patch(m_schema_path, {'here': 'iam'}):
- self.assertEqual({'here': 'iam'}, get_schema())
+ "http://json-schema.org/draft-04/schema#", schema["$schema"]
+ )
+ self.assertCountEqual(["id", "$schema", "allOf"], get_schema().keys())
class SchemaValidationErrorTest(CiTestCase):
@@ -93,8 +139,9 @@ class ValidateCloudConfigSchemaTest(CiTestCase):
with mock.patch.dict('sys.modules', **{'jsonschema': ImportError()}):
validate_cloudconfig_schema({'p1': -1}, schema, strict=True)
self.assertIn(
- 'Ignoring schema validation. python-jsonschema is not present',
- self.logs.getvalue())
+ "Ignoring schema validation. jsonschema is not present",
+ self.logs.getvalue(),
+ )
@skipUnlessJsonSchema()
def test_validateconfig_schema_strict_raises_errors(self):
@@ -117,14 +164,48 @@ class ValidateCloudConfigSchemaTest(CiTestCase):
"Cloud config schema errors: p1: '-1' is not a 'email'",
str(context_mgr.exception))
+ @skipUnlessJsonSchema()
+ def test_validateconfig_schema_honors_formats_strict_metaschema(self):
+ """With strict True and strict_metascheam True, ensure errors on format
+ """
+ schema = {"properties": {"p1": {"type": "string", "format": "email"}}}
+ with self.assertRaises(SchemaValidationError) as context_mgr:
+ validate_cloudconfig_schema(
+ {"p1": "-1"}, schema, strict=True, strict_metaschema=True
+ )
+ self.assertEqual(
+ "Cloud config schema errors: p1: '-1' is not a 'email'",
+ str(context_mgr.exception),
+ )
+
+ @skipUnlessJsonSchema()
+ def test_validateconfig_strict_metaschema_do_not_raise_exception(self):
+ """With strict_metaschema=True, do not raise exceptions.
+
+ This flag is currently unused, but is intended for run-time validation.
+ This should warn, but not raise.
+ """
+ schema = {"properties": {"p1": {"types": "string", "format": "email"}}}
+ validate_cloudconfig_schema(
+ {"p1": "-1"}, schema, strict_metaschema=True
+ )
+ assert (
+ "Meta-schema validation failed, attempting to validate config"
+ in self.logs.getvalue()
+ )
+
class TestCloudConfigExamples:
- schema = get_schema()
+ schema = get_schemas()
+ metas = get_metas()
params = [
- (schema["id"], example)
- for schema in schema["allOf"] for example in schema["examples"]]
+ (meta["id"], example)
+ for meta in metas.values()
+ if meta and meta.get("examples")
+ for example in meta.get("examples")
+ ]
- @pytest.mark.parametrize("schema_id,example", params)
+ @pytest.mark.parametrize("schema_id, example", params)
@skipUnlessJsonSchema()
def test_validateconfig_schema_of_example(self, schema_id, example):
""" For a given example in a config module we test if it is valid
@@ -201,22 +282,42 @@ class ValidateCloudConfigFileTest(CiTestCase):
class GetSchemaDocTest(CiTestCase):
- """Tests for get_schema_doc."""
+ """Tests for get_meta_doc."""
def setUp(self):
super(GetSchemaDocTest, self).setUp()
self.required_schema = {
- 'title': 'title', 'description': 'description', 'id': 'id',
- 'name': 'name', 'frequency': 'frequency',
- 'distros': ['debian', 'rhel']}
+ "title": "title",
+ "description": "description",
+ "id": "id",
+ "name": "name",
+ "frequency": "frequency",
+ "distros": ["debian", "rhel"],
+ }
+ self.meta = MetaSchema(
+ {
+ "title": "title",
+ "description": "description",
+ "id": "id",
+ "name": "name",
+ "frequency": "frequency",
+ "distros": ["debian", "rhel"],
+ "examples": [
+ 'ex1:\n [don\'t, expand, "this"]',
+ "ex2: true",
+ ],
+ }
+ )
- def test_get_schema_doc_returns_restructured_text(self):
- """get_schema_doc returns restructured text for a cloudinit schema."""
+ def test_get_meta_doc_returns_restructured_text(self):
+ """get_meta_doc returns restructured text for a cloudinit schema."""
full_schema = copy(self.required_schema)
full_schema.update(
{'properties': {
'prop1': {'type': 'array', 'description': 'prop-description',
'items': {'type': 'integer'}}}})
+
+ doc = get_meta_doc(self.meta, full_schema)
self.assertEqual(
dedent("""
name
@@ -232,47 +333,51 @@ class GetSchemaDocTest(CiTestCase):
**Supported distros:** debian, rhel
**Config schema**:
- **prop1:** (array of integer) prop-description\n\n"""),
- get_schema_doc(full_schema))
+ **prop1:** (array of integer) prop-description
- def test_get_schema_doc_handles_multiple_types(self):
- """get_schema_doc delimits multiple property types with a '/'."""
- full_schema = copy(self.required_schema)
- full_schema.update(
- {'properties': {
- 'prop1': {'type': ['string', 'integer'],
- 'description': 'prop-description'}}})
+ **Examples**::
+
+ ex1:
+ [don't, expand, "this"]
+ # --- Example2 ---
+ ex2: true
+ """),
+ doc,
+ )
+
+ def test_get_meta_doc_handles_multiple_types(self):
+ """get_meta_doc delimits multiple property types with a '/'."""
+ schema = {"properties": {"prop1": {"type": ["string", "integer"]}}}
self.assertIn(
- '**prop1:** (string/integer) prop-description',
- get_schema_doc(full_schema))
+ "**prop1:** (string/integer)", get_meta_doc(self.meta, schema)
+ )
- def test_get_schema_doc_handles_enum_types(self):
- """get_schema_doc converts enum types to yaml and delimits with '/'."""
- full_schema = copy(self.required_schema)
- full_schema.update(
- {'properties': {
- 'prop1': {'enum': [True, False, 'stuff'],
- 'description': 'prop-description'}}})
+ def test_get_meta_doc_handles_enum_types(self):
+ """get_meta_doc converts enum types to yaml and delimits with '/'."""
+ schema = {"properties": {"prop1": {"enum": [True, False, "stuff"]}}}
self.assertIn(
- '**prop1:** (true/false/stuff) prop-description',
- get_schema_doc(full_schema))
+ "**prop1:** (true/false/stuff)", get_meta_doc(self.meta, schema)
+ )
- def test_get_schema_doc_handles_nested_oneof_property_types(self):
- """get_schema_doc describes array items oneOf declarations in type."""
- full_schema = copy(self.required_schema)
- full_schema.update(
- {'properties': {
- 'prop1': {'type': 'array',
- 'items': {
- 'oneOf': [{'type': 'string'},
- {'type': 'integer'}]},
- 'description': 'prop-description'}}})
+ def test_get_meta_doc_handles_nested_oneof_property_types(self):
+ """get_meta_doc describes array items oneOf declarations in type."""
+ schema = {
+ "properties": {
+ "prop1": {
+ "type": "array",
+ "items": {
+ "oneOf": [{"type": "string"}, {"type": "integer"}]
+ },
+ }
+ }
+ }
self.assertIn(
- '**prop1:** (array of (string)/(integer)) prop-description',
- get_schema_doc(full_schema))
+ "**prop1:** (array of (string)/(integer))",
+ get_meta_doc(self.meta, schema),
+ )
- def test_get_schema_doc_handles_string_examples(self):
- """get_schema_doc properly indented examples as a list of strings."""
+ def test_get_meta_doc_handles_string_examples(self):
+ """get_meta_doc properly indented examples as a list of strings."""
full_schema = copy(self.required_schema)
full_schema.update(
{'examples': ['ex1:\n [don\'t, expand, "this"]', 'ex2: true'],
@@ -291,16 +396,17 @@ class GetSchemaDocTest(CiTestCase):
# --- Example2 ---
ex2: true
"""),
- get_schema_doc(full_schema))
+ get_meta_doc(self.meta, full_schema),
+ )
- def test_get_schema_doc_properly_parse_description(self):
- """get_schema_doc description properly formatted"""
- full_schema = copy(self.required_schema)
- full_schema.update(
- {'properties': {
- 'p1': {
- 'type': 'string',
- 'description': dedent("""\
+ def test_get_meta_doc_properly_parse_description(self):
+ """get_meta_doc description properly formatted"""
+ schema = {
+ "properties": {
+ "p1": {
+ "type": "string",
+ "description": dedent(
+ """\
This item
has the
following options:
@@ -312,8 +418,8 @@ class GetSchemaDocTest(CiTestCase):
The default value is
option1""")
}
- }}
- )
+ }
+ }
self.assertIn(
dedent("""
@@ -325,16 +431,28 @@ class GetSchemaDocTest(CiTestCase):
- option3
The default value is option1
+
"""),
- get_schema_doc(full_schema))
+ get_meta_doc(self.meta, schema),
+ )
- def test_get_schema_doc_raises_key_errors(self):
- """get_schema_doc raises KeyErrors on missing keys."""
- for key in self.required_schema:
- invalid_schema = copy(self.required_schema)
- invalid_schema.pop(key)
+ def test_get_meta_doc_raises_key_errors(self):
+ """get_meta_doc raises KeyErrors on missing keys."""
+ schema = {
+ "properties": {
+ "prop1": {
+ "type": "array",
+ "items": {
+ "oneOf": [{"type": "string"}, {"type": "integer"}]
+ },
+ }
+ }
+ }
+ for key in self.meta:
+ invalid_meta = copy(self.meta)
+ invalid_meta.pop(key)
with self.assertRaises(KeyError) as context_mgr:
- get_schema_doc(invalid_schema)
+ get_meta_doc(invalid_meta, schema)
self.assertIn(key, str(context_mgr.exception))
@@ -418,6 +536,7 @@ class TestMain:
_out, err = capsys.readouterr()
expected = (
+ 'Error:\n'
'Expected one of --config-file, --system or --docs arguments\n'
)
assert expected == err
@@ -431,6 +550,7 @@ class TestMain:
_out, err = capsys.readouterr()
expected = (
+ 'Error:\n'
'Expected one of --config-file, --system or --docs arguments\n'
)
assert expected == err
@@ -443,7 +563,7 @@ class TestMain:
main()
assert 1 == context_manager.value.code
_out, err = capsys.readouterr()
- assert 'Configfile NOT_A_FILE does not exist\n' == err
+ assert 'Error:\nConfigfile NOT_A_FILE does not exist\n' == err
def test_main_prints_docs(self, capsys):
"""When --docs parameter is provided, main generates documentation."""
@@ -489,12 +609,13 @@ class TestMain:
assert 1 == context_manager.value.code
_out, err = capsys.readouterr()
expected = (
- 'Unable to read system userdata as non-root user. Try using sudo\n'
+ 'Error:\nUnable to read system userdata as non-root user. '
+ 'Try using sudo\n'
)
assert expected == err
-def _get_schema_doc_examples():
+def _get_meta_doc_examples():
examples_dir = Path(
cloudinit.__file__).parent.parent / 'doc' / 'examples'
assert examples_dir.is_dir()
@@ -507,9 +628,49 @@ def _get_schema_doc_examples():
class TestSchemaDocExamples:
schema = get_schema()
- @pytest.mark.parametrize("example_path", _get_schema_doc_examples())
+ @pytest.mark.parametrize("example_path", _get_meta_doc_examples())
@skipUnlessJsonSchema()
def test_schema_doc_examples(self, example_path):
validate_cloudconfig_file(str(example_path), self.schema)
+
+class TestStrictMetaschema:
+ """Validate that schemas follow a stricter metaschema definition than
+ the default. This disallows arbitrary key/value pairs.
+ """
+
+ @skipUnlessJsonSchema()
+ def test_modules(self):
+ """Validate all modules with a stricter metaschema"""
+ (validator, _) = get_jsonschema_validator()
+ for (name, value) in get_schemas().items():
+ if value:
+ validate_cloudconfig_metaschema(validator, value)
+ else:
+ logging.warning("module %s has no schema definition", name)
+
+ @skipUnlessJsonSchema()
+ def test_validate_bad_module(self):
+ """Throw exception by default, don't throw if throw=False
+
+ item should be 'items' and is therefore interpreted as an additional
+ property which is invalid with a strict metaschema
+ """
+ (validator, _) = get_jsonschema_validator()
+ schema = {
+ "type": "array",
+ "item": {
+ "type": "object",
+ },
+ }
+ with pytest.raises(
+ SchemaValidationError,
+ match=(r"Additional properties are not allowed.*")
+ ):
+
+ validate_cloudconfig_metaschema(validator, schema)
+
+ validate_cloudconfig_metaschema(validator, schema, throw=False)
+
+
# vi: ts=4 expandtab syntax=python