summaryrefslogtreecommitdiff
path: root/tests/unittests/config/test_schema.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/config/test_schema.py')
-rw-r--r--tests/unittests/config/test_schema.py301
1 files changed, 174 insertions, 127 deletions
diff --git a/tests/unittests/config/test_schema.py b/tests/unittests/config/test_schema.py
index 40803cae..fb5b891d 100644
--- a/tests/unittests/config/test_schema.py
+++ b/tests/unittests/config/test_schema.py
@@ -2,35 +2,36 @@
import importlib
-import sys
import inspect
+import itertools
import logging
+import sys
from copy import copy
-import itertools
-import pytest
from pathlib import Path
from textwrap import dedent
+
+import pytest
from yaml import safe_load
from cloudinit.config.schema import (
CLOUD_CONFIG_HEADER,
+ MetaSchema,
SchemaValidationError,
annotated_cloudconfig_file,
+ get_jsonschema_validator,
get_meta_doc,
get_schema,
- get_jsonschema_validator,
+ main,
validate_cloudconfig_file,
validate_cloudconfig_metaschema,
validate_cloudconfig_schema,
- main,
- MetaSchema,
)
from cloudinit.util import write_file
from tests.unittests.helpers import (
CiTestCase,
+ cloud_init_project_dir,
mock,
skipUnlessJsonSchema,
- cloud_init_project_dir,
)
@@ -78,26 +79,25 @@ def get_module_variable(var_name) -> dict:
class GetSchemaTest(CiTestCase):
-
def test_get_schema_coalesces_known_schema(self):
"""Every cloudconfig module with schema is listed in allOf keyword."""
schema = get_schema()
self.assertCountEqual(
[
- 'cc_apk_configure',
- 'cc_apt_configure',
- 'cc_bootcmd',
- 'cc_locale',
- 'cc_ntp',
- 'cc_resizefs',
- 'cc_runcmd',
- 'cc_snap',
- 'cc_ubuntu_advantage',
- 'cc_ubuntu_drivers',
- 'cc_write_files',
- 'cc_zypper_add_repo',
- 'cc_chef',
- 'cc_install_hotplug',
+ "cc_apk_configure",
+ "cc_apt_configure",
+ "cc_bootcmd",
+ "cc_locale",
+ "cc_ntp",
+ "cc_resizefs",
+ "cc_runcmd",
+ "cc_snap",
+ "cc_ubuntu_advantage",
+ "cc_ubuntu_drivers",
+ "cc_write_files",
+ "cc_zypper_add_repo",
+ "cc_chef",
+ "cc_install_hotplug",
],
[meta["id"] for meta in get_metas().values() if meta is not None],
)
@@ -113,15 +113,18 @@ class SchemaValidationErrorTest(CiTestCase):
def test_schema_validation_error_expects_schema_errors(self):
"""SchemaValidationError is initialized from schema_errors."""
- errors = (('key.path', 'unexpected key "junk"'),
- ('key2.path', '"-123" is not a valid "hostname" format'))
+ errors = (
+ ("key.path", 'unexpected key "junk"'),
+ ("key2.path", '"-123" is not a valid "hostname" format'),
+ )
exception = SchemaValidationError(schema_errors=errors)
self.assertIsInstance(exception, Exception)
self.assertEqual(exception.schema_errors, errors)
self.assertEqual(
'Cloud config schema errors: key.path: unexpected key "junk", '
'key2.path: "-123" is not a valid "hostname" format',
- str(exception))
+ str(exception),
+ )
self.assertTrue(isinstance(exception, ValueError))
@@ -133,18 +136,19 @@ class ValidateCloudConfigSchemaTest(CiTestCase):
@skipUnlessJsonSchema()
def test_validateconfig_schema_non_strict_emits_warnings(self):
"""When strict is False validate_cloudconfig_schema emits warnings."""
- schema = {'properties': {'p1': {'type': 'string'}}}
- validate_cloudconfig_schema({'p1': -1}, schema, strict=False)
+ schema = {"properties": {"p1": {"type": "string"}}}
+ validate_cloudconfig_schema({"p1": -1}, schema, strict=False)
self.assertIn(
"Invalid config:\np1: -1 is not of type 'string'\n",
- self.logs.getvalue())
+ self.logs.getvalue(),
+ )
@skipUnlessJsonSchema()
def test_validateconfig_schema_emits_warning_on_missing_jsonschema(self):
"""Warning from validate_cloudconfig_schema when missing jsonschema."""
- schema = {'properties': {'p1': {'type': 'string'}}}
- with mock.patch.dict('sys.modules', **{'jsonschema': ImportError()}):
- validate_cloudconfig_schema({'p1': -1}, schema, strict=True)
+ schema = {"properties": {"p1": {"type": "string"}}}
+ with mock.patch.dict("sys.modules", **{"jsonschema": ImportError()}):
+ validate_cloudconfig_schema({"p1": -1}, schema, strict=True)
self.assertIn(
"Ignoring schema validation. jsonschema is not present",
self.logs.getvalue(),
@@ -153,28 +157,28 @@ class ValidateCloudConfigSchemaTest(CiTestCase):
@skipUnlessJsonSchema()
def test_validateconfig_schema_strict_raises_errors(self):
"""When strict is True validate_cloudconfig_schema raises errors."""
- schema = {'properties': {'p1': {'type': 'string'}}}
+ schema = {"properties": {"p1": {"type": "string"}}}
with self.assertRaises(SchemaValidationError) as context_mgr:
- validate_cloudconfig_schema({'p1': -1}, schema, strict=True)
+ validate_cloudconfig_schema({"p1": -1}, schema, strict=True)
self.assertEqual(
"Cloud config schema errors: p1: -1 is not of type 'string'",
- str(context_mgr.exception))
+ str(context_mgr.exception),
+ )
@skipUnlessJsonSchema()
def test_validateconfig_schema_honors_formats(self):
"""With strict True, validate_cloudconfig_schema errors on format."""
- schema = {
- 'properties': {'p1': {'type': 'string', 'format': 'email'}}}
+ schema = {"properties": {"p1": {"type": "string", "format": "email"}}}
with self.assertRaises(SchemaValidationError) as context_mgr:
- validate_cloudconfig_schema({'p1': '-1'}, schema, strict=True)
+ validate_cloudconfig_schema({"p1": "-1"}, schema, strict=True)
self.assertEqual(
"Cloud config schema errors: p1: '-1' is not a 'email'",
- str(context_mgr.exception))
+ str(context_mgr.exception),
+ )
@skipUnlessJsonSchema()
def test_validateconfig_schema_honors_formats_strict_metaschema(self):
- """With strict True and strict_metascheam True, ensure errors on format
- """
+ """With strict and strict_metaschema True, ensure errors on format"""
schema = {"properties": {"p1": {"type": "string", "format": "email"}}}
with self.assertRaises(SchemaValidationError) as context_mgr:
validate_cloudconfig_schema(
@@ -229,15 +233,15 @@ class ValidateCloudConfigFileTest(CiTestCase):
def setUp(self):
super(ValidateCloudConfigFileTest, self).setUp()
- self.config_file = self.tmp_path('cloudcfg.yaml')
+ self.config_file = self.tmp_path("cloudcfg.yaml")
def test_validateconfig_file_error_on_absent_file(self):
"""On absent config_path, validate_cloudconfig_file errors."""
with self.assertRaises(RuntimeError) as context_mgr:
- validate_cloudconfig_file('/not/here', {})
+ validate_cloudconfig_file("/not/here", {})
self.assertEqual(
- 'Configfile /not/here does not exist',
- str(context_mgr.exception))
+ "Configfile /not/here does not exist", str(context_mgr.exception)
+ )
def test_validateconfig_file_error_on_invalid_header(self):
"""On invalid header, validate_cloudconfig_file errors.
@@ -245,48 +249,54 @@ class ValidateCloudConfigFileTest(CiTestCase):
A SchemaValidationError is raised when the file doesn't begin with
CLOUD_CONFIG_HEADER.
"""
- write_file(self.config_file, '#junk')
+ write_file(self.config_file, "#junk")
with self.assertRaises(SchemaValidationError) as context_mgr:
validate_cloudconfig_file(self.config_file, {})
self.assertEqual(
- 'Cloud config schema errors: format-l1.c1: File {0} needs to begin'
+ "Cloud config schema errors: format-l1.c1: File {0} needs to begin"
' with "{1}"'.format(
- self.config_file, CLOUD_CONFIG_HEADER.decode()),
- str(context_mgr.exception))
+ self.config_file, CLOUD_CONFIG_HEADER.decode()
+ ),
+ str(context_mgr.exception),
+ )
def test_validateconfig_file_error_on_non_yaml_scanner_error(self):
"""On non-yaml scan issues, validate_cloudconfig_file errors."""
# Generate a scanner error by providing text on a single line with
# improper indent.
- write_file(self.config_file, '#cloud-config\nasdf:\nasdf')
+ write_file(self.config_file, "#cloud-config\nasdf:\nasdf")
with self.assertRaises(SchemaValidationError) as context_mgr:
validate_cloudconfig_file(self.config_file, {})
self.assertIn(
- 'schema errors: format-l3.c1: File {0} is not valid yaml.'.format(
- self.config_file),
- str(context_mgr.exception))
+ "schema errors: format-l3.c1: File {0} is not valid yaml.".format(
+ self.config_file
+ ),
+ str(context_mgr.exception),
+ )
def test_validateconfig_file_error_on_non_yaml_parser_error(self):
"""On non-yaml parser issues, validate_cloudconfig_file errors."""
- write_file(self.config_file, '#cloud-config\n{}}')
+ write_file(self.config_file, "#cloud-config\n{}}")
with self.assertRaises(SchemaValidationError) as context_mgr:
validate_cloudconfig_file(self.config_file, {})
self.assertIn(
- 'schema errors: format-l2.c3: File {0} is not valid yaml.'.format(
- self.config_file),
- str(context_mgr.exception))
+ "schema errors: format-l2.c3: File {0} is not valid yaml.".format(
+ self.config_file
+ ),
+ str(context_mgr.exception),
+ )
@skipUnlessJsonSchema()
def test_validateconfig_file_sctrictly_validates_schema(self):
"""validate_cloudconfig_file raises errors on invalid schema."""
- schema = {
- 'properties': {'p1': {'type': 'string', 'format': 'string'}}}
- write_file(self.config_file, '#cloud-config\np1: -1')
+ schema = {"properties": {"p1": {"type": "string", "format": "string"}}}
+ write_file(self.config_file, "#cloud-config\np1: -1")
with self.assertRaises(SchemaValidationError) as context_mgr:
validate_cloudconfig_file(self.config_file, schema)
self.assertEqual(
"Cloud config schema errors: p1: -1 is not of type 'string'",
- str(context_mgr.exception))
+ str(context_mgr.exception),
+ )
class GetSchemaDocTest(CiTestCase):
@@ -321,13 +331,21 @@ class GetSchemaDocTest(CiTestCase):
"""get_meta_doc returns restructured text for a cloudinit schema."""
full_schema = copy(self.required_schema)
full_schema.update(
- {'properties': {
- 'prop1': {'type': 'array', 'description': 'prop-description',
- 'items': {'type': 'integer'}}}})
+ {
+ "properties": {
+ "prop1": {
+ "type": "array",
+ "description": "prop-description",
+ "items": {"type": "integer"},
+ }
+ }
+ }
+ )
doc = get_meta_doc(self.meta, full_schema)
self.assertEqual(
- dedent("""
+ dedent(
+ """
name
----
**Summary:** title
@@ -349,7 +367,8 @@ class GetSchemaDocTest(CiTestCase):
[don't, expand, "this"]
# --- Example2 ---
ex2: true
- """),
+ """
+ ),
doc,
)
@@ -388,12 +407,23 @@ class GetSchemaDocTest(CiTestCase):
"""get_meta_doc properly indented examples as a list of strings."""
full_schema = copy(self.required_schema)
full_schema.update(
- {'examples': ['ex1:\n [don\'t, expand, "this"]', 'ex2: true'],
- 'properties': {
- 'prop1': {'type': 'array', 'description': 'prop-description',
- 'items': {'type': 'integer'}}}})
+ {
+ "examples": [
+ 'ex1:\n [don\'t, expand, "this"]',
+ "ex2: true",
+ ],
+ "properties": {
+ "prop1": {
+ "type": "array",
+ "description": "prop-description",
+ "items": {"type": "integer"},
+ }
+ },
+ }
+ )
self.assertIn(
- dedent("""
+ dedent(
+ """
**Config schema**:
**prop1:** (array of integer) prop-description
@@ -403,7 +433,8 @@ class GetSchemaDocTest(CiTestCase):
[don't, expand, "this"]
# --- Example2 ---
ex2: true
- """),
+ """
+ ),
get_meta_doc(self.meta, full_schema),
)
@@ -424,13 +455,15 @@ class GetSchemaDocTest(CiTestCase):
- option3
The default value is
- option1""")
+ option1"""
+ ),
}
}
}
self.assertIn(
- dedent("""
+ dedent(
+ """
**Config schema**:
**p1:** (string) This item has the following options:
@@ -440,7 +473,8 @@ class GetSchemaDocTest(CiTestCase):
The default value is option1
- """),
+ """
+ ),
get_meta_doc(self.meta, schema),
)
@@ -475,7 +509,7 @@ class GetSchemaDocTest(CiTestCase):
"type": "string",
},
"prop_array": {
- "label": 'array_label',
+ "label": "array_label",
"type": "array",
"items": {
"type": "object",
@@ -490,7 +524,7 @@ class GetSchemaDocTest(CiTestCase):
"type": "string",
"label": "label2",
}
- }
+ },
}
meta_doc = get_meta_doc(self.meta, schema)
assert "**label1:** (string)" in meta_doc
@@ -507,20 +541,23 @@ class AnnotatedCloudconfigFileTest(CiTestCase):
def test_annotated_cloudconfig_file_no_schema_errors(self):
"""With no schema_errors, print the original content."""
- content = b'ntp:\n pools: [ntp1.pools.com]\n'
+ content = b"ntp:\n pools: [ntp1.pools.com]\n"
self.assertEqual(
- content,
- annotated_cloudconfig_file({}, content, schema_errors=[]))
+ content, annotated_cloudconfig_file({}, content, schema_errors=[])
+ )
def test_annotated_cloudconfig_file_schema_annotates_and_adds_footer(self):
"""With schema_errors, error lines are annotated and a footer added."""
- content = dedent("""\
+ content = dedent(
+ """\
#cloud-config
# comment
ntp:
pools: [-99, 75]
- """).encode()
- expected = dedent("""\
+ """
+ ).encode()
+ expected = dedent(
+ """\
#cloud-config
# comment
ntp: # E1
@@ -531,38 +568,48 @@ class AnnotatedCloudconfigFileTest(CiTestCase):
# E2: -99 is not a string
# E3: 75 is not a string
- """)
+ """
+ )
parsed_config = safe_load(content[13:])
schema_errors = [
- ('ntp', 'Some type error'), ('ntp.pools.0', '-99 is not a string'),
- ('ntp.pools.1', '75 is not a string')]
+ ("ntp", "Some type error"),
+ ("ntp.pools.0", "-99 is not a string"),
+ ("ntp.pools.1", "75 is not a string"),
+ ]
self.assertEqual(
expected,
- annotated_cloudconfig_file(parsed_config, content, schema_errors))
+ annotated_cloudconfig_file(parsed_config, content, schema_errors),
+ )
def test_annotated_cloudconfig_file_annotates_separate_line_items(self):
"""Errors are annotated for lists with items on separate lines."""
- content = dedent("""\
+ content = dedent(
+ """\
#cloud-config
# comment
ntp:
pools:
- -99
- 75
- """).encode()
- expected = dedent("""\
+ """
+ ).encode()
+ expected = dedent(
+ """\
ntp:
pools:
- -99 # E1
- 75 # E2
- """)
+ """
+ )
parsed_config = safe_load(content[13:])
schema_errors = [
- ('ntp.pools.0', '-99 is not a string'),
- ('ntp.pools.1', '75 is not a string')]
+ ("ntp.pools.0", "-99 is not a string"),
+ ("ntp.pools.1", "75 is not a string"),
+ ]
self.assertIn(
expected,
- annotated_cloudconfig_file(parsed_config, content, schema_errors))
+ annotated_cloudconfig_file(parsed_config, content, schema_errors),
+ )
class TestMain:
@@ -575,94 +622,94 @@ class TestMain:
def test_main_exclusive_args(self, params, capsys):
"""Main exits non-zero and error on required exclusive args."""
params = list(itertools.chain(*[a.split() for a in params]))
- with mock.patch('sys.argv', ['mycmd'] + params):
+ with mock.patch("sys.argv", ["mycmd"] + params):
with pytest.raises(SystemExit) as context_manager:
main()
assert 1 == context_manager.value.code
_out, err = capsys.readouterr()
expected = (
- 'Error:\n'
- 'Expected one of --config-file, --system or --docs arguments\n'
+ "Error:\n"
+ "Expected one of --config-file, --system or --docs arguments\n"
)
assert expected == err
def test_main_missing_args(self, capsys):
"""Main exits non-zero and reports an error on missing parameters."""
- with mock.patch('sys.argv', ['mycmd']):
+ with mock.patch("sys.argv", ["mycmd"]):
with pytest.raises(SystemExit) as context_manager:
main()
assert 1 == context_manager.value.code
_out, err = capsys.readouterr()
expected = (
- 'Error:\n'
- 'Expected one of --config-file, --system or --docs arguments\n'
+ "Error:\n"
+ "Expected one of --config-file, --system or --docs arguments\n"
)
assert expected == err
def test_main_absent_config_file(self, capsys):
"""Main exits non-zero when config file is absent."""
- myargs = ['mycmd', '--annotate', '--config-file', 'NOT_A_FILE']
- with mock.patch('sys.argv', myargs):
+ myargs = ["mycmd", "--annotate", "--config-file", "NOT_A_FILE"]
+ with mock.patch("sys.argv", myargs):
with pytest.raises(SystemExit) as context_manager:
main()
assert 1 == context_manager.value.code
_out, err = capsys.readouterr()
- assert 'Error:\nConfigfile NOT_A_FILE does not exist\n' == err
+ assert "Error:\nConfigfile NOT_A_FILE does not exist\n" == err
def test_main_prints_docs(self, capsys):
"""When --docs parameter is provided, main generates documentation."""
- myargs = ['mycmd', '--docs', 'all']
- with mock.patch('sys.argv', myargs):
- assert 0 == main(), 'Expected 0 exit code'
+ myargs = ["mycmd", "--docs", "all"]
+ with mock.patch("sys.argv", myargs):
+ assert 0 == main(), "Expected 0 exit code"
out, _err = capsys.readouterr()
- assert '\nNTP\n---\n' in out
- assert '\nRuncmd\n------\n' in out
+ assert "\nNTP\n---\n" in out
+ assert "\nRuncmd\n------\n" in out
def test_main_validates_config_file(self, tmpdir, capsys):
"""When --config-file parameter is provided, main validates schema."""
- myyaml = tmpdir.join('my.yaml')
- myargs = ['mycmd', '--config-file', myyaml.strpath]
- myyaml.write(b'#cloud-config\nntp:') # shortest ntp schema
- with mock.patch('sys.argv', myargs):
- assert 0 == main(), 'Expected 0 exit code'
+ myyaml = tmpdir.join("my.yaml")
+ myargs = ["mycmd", "--config-file", myyaml.strpath]
+ myyaml.write(b"#cloud-config\nntp:") # shortest ntp schema
+ with mock.patch("sys.argv", myargs):
+ assert 0 == main(), "Expected 0 exit code"
out, _err = capsys.readouterr()
- assert 'Valid cloud-config: {0}\n'.format(myyaml) == out
+ assert "Valid cloud-config: {0}\n".format(myyaml) == out
- @mock.patch('cloudinit.config.schema.read_cfg_paths')
- @mock.patch('cloudinit.config.schema.os.getuid', return_value=0)
+ @mock.patch("cloudinit.config.schema.read_cfg_paths")
+ @mock.patch("cloudinit.config.schema.os.getuid", return_value=0)
def test_main_validates_system_userdata(
self, m_getuid, m_read_cfg_paths, capsys, paths
):
"""When --system is provided, main validates system userdata."""
m_read_cfg_paths.return_value = paths
ud_file = paths.get_ipath_cur("userdata_raw")
- write_file(ud_file, b'#cloud-config\nntp:')
- myargs = ['mycmd', '--system']
- with mock.patch('sys.argv', myargs):
- assert 0 == main(), 'Expected 0 exit code'
+ write_file(ud_file, b"#cloud-config\nntp:")
+ myargs = ["mycmd", "--system"]
+ with mock.patch("sys.argv", myargs):
+ assert 0 == main(), "Expected 0 exit code"
out, _err = capsys.readouterr()
- assert 'Valid cloud-config: system userdata\n' == out
+ assert "Valid cloud-config: system userdata\n" == out
- @mock.patch('cloudinit.config.schema.os.getuid', return_value=1000)
+ @mock.patch("cloudinit.config.schema.os.getuid", return_value=1000)
def test_main_system_userdata_requires_root(self, m_getuid, capsys, paths):
"""Non-root user can't use --system param"""
- myargs = ['mycmd', '--system']
- with mock.patch('sys.argv', myargs):
+ myargs = ["mycmd", "--system"]
+ with mock.patch("sys.argv", myargs):
with pytest.raises(SystemExit) as context_manager:
main()
assert 1 == context_manager.value.code
_out, err = capsys.readouterr()
expected = (
- 'Error:\nUnable to read system userdata as non-root user. '
- 'Try using sudo\n'
+ "Error:\nUnable to read system userdata as non-root user. "
+ "Try using sudo\n"
)
assert expected == err
def _get_meta_doc_examples():
- examples_dir = Path(cloud_init_project_dir('doc/examples'))
+ examples_dir = Path(cloud_init_project_dir("doc/examples"))
assert examples_dir.is_dir()
return (
@@ -712,7 +759,7 @@ class TestStrictMetaschema:
}
with pytest.raises(
SchemaValidationError,
- match=(r"Additional properties are not allowed.*")
+ match=r"Additional properties are not allowed.*",
):
validate_cloudconfig_metaschema(validator, schema)