diff options
Diffstat (limited to 'tests/unittests/config/test_schema.py')
-rw-r--r-- | tests/unittests/config/test_schema.py | 917 |
1 files changed, 917 insertions, 0 deletions
diff --git a/tests/unittests/config/test_schema.py b/tests/unittests/config/test_schema.py new file mode 100644 index 00000000..3a39f343 --- /dev/null +++ b/tests/unittests/config/test_schema.py @@ -0,0 +1,917 @@ +# This file is part of cloud-init. See LICENSE file for license information. + + +import importlib +import inspect +import itertools +import logging +import sys +from copy import copy +from pathlib import Path +from textwrap import dedent + +import pytest +import yaml +from yaml import safe_load + +from cloudinit.config.schema import ( + CLOUD_CONFIG_HEADER, + MetaSchema, + SchemaValidationError, + _schemapath_for_cloudconfig, + annotated_cloudconfig_file, + get_jsonschema_validator, + get_meta_doc, + get_schema, + load_doc, + main, + validate_cloudconfig_file, + validate_cloudconfig_metaschema, + validate_cloudconfig_schema, +) +from cloudinit.util import write_file +from tests.unittests.helpers import ( + CiTestCase, + cloud_init_project_dir, + mock, + skipUnlessJsonSchema, +) + + +def get_schemas() -> dict: + """Return all legacy module schemas + + Assumes that module schemas have the variable name "schema" + """ + return get_module_variable("schema") + + +def get_metas() -> dict: + """Return all module metas + + Assumes that module schemas have the variable name "schema" + """ + return get_module_variable("meta") + + +def get_module_variable(var_name) -> dict: + """Inspect modules and get variable from module matching var_name""" + schemas = {} + + files = list( + Path(cloud_init_project_dir("cloudinit/config/")).glob("cc_*.py") + ) + + modules = [mod.stem for mod in files] + + for module in modules: + importlib.import_module("cloudinit.config.{}".format(module)) + + for k, v in sys.modules.items(): + path = Path(k) + + if "cloudinit.config" == path.stem and path.suffix[1:4] == "cc_": + module_name = path.suffix[1:] + members = inspect.getmembers(v) + schemas[module_name] = None + for name, value in members: + if name == var_name: + schemas[module_name] = value + break + return schemas + + +class TestGetSchema: + def test_get_schema_coalesces_known_schema(self): + """Every cloudconfig module with schema is listed in allOf keyword.""" + schema = get_schema() + assert sorted( + [ + "cc_apk_configure", + "cc_apt_configure", + "cc_apt_pipelining", + "cc_bootcmd", + "cc_byobu", + "cc_ca_certs", + "cc_chef", + "cc_debug", + "cc_disable_ec2_metadata", + "cc_disk_setup", + "cc_install_hotplug", + "cc_keyboard", + "cc_locale", + "cc_ntp", + "cc_resizefs", + "cc_resizefs_vyos", + "cc_runcmd", + "cc_snap", + "cc_ubuntu_advantage", + "cc_ubuntu_drivers", + "cc_write_files", + "cc_zypper_add_repo", + ] + ) == sorted( + [meta["id"] for meta in get_metas().values() if meta is not None] + ) + assert "http://json-schema.org/draft-04/schema#" == schema["$schema"] + assert ["$defs", "$schema", "allOf"] == sorted(list(schema.keys())) + # New style schema should be defined in static schema file in $defs + expected_subschema_defs = [ + {"$ref": "#/$defs/cc_apk_configure"}, + {"$ref": "#/$defs/cc_apt_configure"}, + {"$ref": "#/$defs/cc_apt_pipelining"}, + {"$ref": "#/$defs/cc_bootcmd"}, + {"$ref": "#/$defs/cc_byobu"}, + {"$ref": "#/$defs/cc_ca_certs"}, + {"$ref": "#/$defs/cc_chef"}, + {"$ref": "#/$defs/cc_debug"}, + {"$ref": "#/$defs/cc_disable_ec2_metadata"}, + {"$ref": "#/$defs/cc_disk_setup"}, + ] + found_subschema_defs = [] + legacy_schema_keys = [] + for subschema in schema["allOf"]: + if "$ref" in subschema: + found_subschema_defs.append(subschema) + else: # Legacy subschema sourced from cc_* module 'schema' attr + legacy_schema_keys.extend(subschema["properties"].keys()) + + assert expected_subschema_defs == found_subschema_defs + # This list will dwindle as we move legacy schema to new $defs + assert [ + "drivers", + "keyboard", + "locale", + "locale_configfile", + "ntp", + "resize_rootfs", + "resizefs_enabled", + "resizefs_list", + "runcmd", + "snap", + "ubuntu_advantage", + "updates", + "write_files", + "write_files", + "zypper", + ] == sorted(legacy_schema_keys) + + +class TestLoadDoc: + + docs = get_module_variable("__doc__") + + # TODO( Drop legacy test when all sub-schemas in cloud-init-schema.json ) + @pytest.mark.parametrize( + "module_name", + ( + "cc_apt_pipelining", # new style composite schema file + "cc_zypper_add_repo", # legacy sub-schema defined in module + ), + ) + def test_report_docs_for_legacy_and_consolidated_schema(self, module_name): + doc = load_doc([module_name]) + assert doc, "Unexpected empty docs for {}".format(module_name) + assert self.docs[module_name] == doc + + +class Test_SchemapathForCloudconfig: + """Coverage tests for supported YAML formats.""" + + @pytest.mark.parametrize( + "source_content, expected", + ( + (b"{}", {}), # assert empty config handled + # Multiple keys account for comments and whitespace lines + (b"#\na: va\n \nb: vb\n#\nc: vc", {"a": 2, "b": 4, "c": 6}), + # List items represented on correct line number + (b"a:\n - a1\n\n - a2\n", {"a": 1, "a.0": 2, "a.1": 4}), + # Nested dicts represented on correct line number + (b"a:\n a1:\n\n aa1: aa1v\n", {"a": 1, "a.a1": 2, "a.a1.aa1": 4}), + ), + ) + def test_schemapaths_representatative_of_source_yaml( + self, source_content, expected + ): + """Validate schemapaths dict accurately represents source YAML line.""" + cfg = yaml.safe_load(source_content) + assert expected == _schemapath_for_cloudconfig( + config=cfg, original_content=source_content + ) + + +class SchemaValidationErrorTest(CiTestCase): + """Test validate_cloudconfig_schema""" + + def test_schema_validation_error_expects_schema_errors(self): + """SchemaValidationError is initialized from schema_errors.""" + errors = ( + ("key.path", 'unexpected key "junk"'), + ("key2.path", '"-123" is not a valid "hostname" format'), + ) + exception = SchemaValidationError(schema_errors=errors) + self.assertIsInstance(exception, Exception) + self.assertEqual(exception.schema_errors, errors) + self.assertEqual( + 'Cloud config schema errors: key.path: unexpected key "junk", ' + 'key2.path: "-123" is not a valid "hostname" format', + str(exception), + ) + self.assertTrue(isinstance(exception, ValueError)) + + +class TestValidateCloudConfigSchema: + """Tests for validate_cloudconfig_schema.""" + + with_logs = True + + @pytest.mark.parametrize( + "schema, call_count", + ((None, 1), ({"properties": {"p1": {"type": "string"}}}, 0)), + ) + @skipUnlessJsonSchema() + @mock.patch("cloudinit.config.schema.get_schema") + def test_validateconfig_schema_use_full_schema_when_no_schema_param( + self, get_schema, schema, call_count + ): + """Use full schema when schema param is absent.""" + get_schema.return_value = {"properties": {"p1": {"type": "string"}}} + kwargs = {"config": {"p1": "valid"}} + if schema: + kwargs["schema"] = schema + validate_cloudconfig_schema(**kwargs) + assert call_count == get_schema.call_count + + @skipUnlessJsonSchema() + def test_validateconfig_schema_non_strict_emits_warnings(self, caplog): + """When strict is False validate_cloudconfig_schema emits warnings.""" + schema = {"properties": {"p1": {"type": "string"}}} + validate_cloudconfig_schema({"p1": -1}, schema, strict=False) + [(module, log_level, log_msg)] = caplog.record_tuples + assert "cloudinit.config.schema" == module + assert logging.WARNING == log_level + assert ( + "Invalid cloud-config provided:\np1: -1 is not of type 'string'" + == log_msg + ) + + @skipUnlessJsonSchema() + def test_validateconfig_schema_emits_warning_on_missing_jsonschema( + self, caplog + ): + """Warning from validate_cloudconfig_schema when missing jsonschema.""" + schema = {"properties": {"p1": {"type": "string"}}} + with mock.patch.dict("sys.modules", **{"jsonschema": ImportError()}): + validate_cloudconfig_schema({"p1": -1}, schema, strict=True) + assert "Ignoring schema validation. jsonschema is not present" in ( + caplog.text + ) + + @skipUnlessJsonSchema() + def test_validateconfig_schema_strict_raises_errors(self): + """When strict is True validate_cloudconfig_schema raises errors.""" + schema = {"properties": {"p1": {"type": "string"}}} + with pytest.raises(SchemaValidationError) as context_mgr: + validate_cloudconfig_schema({"p1": -1}, schema, strict=True) + assert ( + "Cloud config schema errors: p1: -1 is not of type 'string'" + == (str(context_mgr.value)) + ) + + @skipUnlessJsonSchema() + def test_validateconfig_schema_honors_formats(self): + """With strict True, validate_cloudconfig_schema errors on format.""" + schema = {"properties": {"p1": {"type": "string", "format": "email"}}} + with pytest.raises(SchemaValidationError) as context_mgr: + validate_cloudconfig_schema({"p1": "-1"}, schema, strict=True) + assert "Cloud config schema errors: p1: '-1' is not a 'email'" == ( + str(context_mgr.value) + ) + + @skipUnlessJsonSchema() + def test_validateconfig_schema_honors_formats_strict_metaschema(self): + """With strict and strict_metaschema True, ensure errors on format""" + schema = {"properties": {"p1": {"type": "string", "format": "email"}}} + with pytest.raises(SchemaValidationError) as context_mgr: + validate_cloudconfig_schema( + {"p1": "-1"}, schema, strict=True, strict_metaschema=True + ) + assert "Cloud config schema errors: p1: '-1' is not a 'email'" == str( + context_mgr.value + ) + + @skipUnlessJsonSchema() + def test_validateconfig_strict_metaschema_do_not_raise_exception( + self, caplog + ): + """With strict_metaschema=True, do not raise exceptions. + + This flag is currently unused, but is intended for run-time validation. + This should warn, but not raise. + """ + schema = {"properties": {"p1": {"types": "string", "format": "email"}}} + validate_cloudconfig_schema( + {"p1": "-1"}, schema, strict_metaschema=True + ) + assert ( + "Meta-schema validation failed, attempting to validate config" + in caplog.text + ) + + +class TestCloudConfigExamples: + metas = get_metas() + params = [ + (meta["id"], example) + for meta in metas.values() + if meta and meta.get("examples") + for example in meta.get("examples") + ] + + @pytest.mark.parametrize("schema_id, example", params) + @skipUnlessJsonSchema() + def test_validateconfig_schema_of_example(self, schema_id, example): + """For a given example in a config module we test if it is valid + according to the unified schema of all config modules + """ + schema = get_schema() + config_load = safe_load(example) + validate_cloudconfig_schema(config_load, schema, strict=True) + + +class ValidateCloudConfigFileTest(CiTestCase): + """Tests for validate_cloudconfig_file.""" + + def setUp(self): + super(ValidateCloudConfigFileTest, self).setUp() + self.config_file = self.tmp_path("cloudcfg.yaml") + + def test_validateconfig_file_error_on_absent_file(self): + """On absent config_path, validate_cloudconfig_file errors.""" + with self.assertRaises(RuntimeError) as context_mgr: + validate_cloudconfig_file("/not/here", {}) + self.assertEqual( + "Configfile /not/here does not exist", str(context_mgr.exception) + ) + + def test_validateconfig_file_error_on_invalid_header(self): + """On invalid header, validate_cloudconfig_file errors. + + A SchemaValidationError is raised when the file doesn't begin with + CLOUD_CONFIG_HEADER. + """ + write_file(self.config_file, "#junk") + with self.assertRaises(SchemaValidationError) as context_mgr: + validate_cloudconfig_file(self.config_file, {}) + self.assertEqual( + "Cloud config schema errors: format-l1.c1: File {0} needs to begin" + ' with "{1}"'.format( + self.config_file, CLOUD_CONFIG_HEADER.decode() + ), + str(context_mgr.exception), + ) + + def test_validateconfig_file_error_on_non_yaml_scanner_error(self): + """On non-yaml scan issues, validate_cloudconfig_file errors.""" + # Generate a scanner error by providing text on a single line with + # improper indent. + write_file(self.config_file, "#cloud-config\nasdf:\nasdf") + with self.assertRaises(SchemaValidationError) as context_mgr: + validate_cloudconfig_file(self.config_file, {}) + self.assertIn( + "schema errors: format-l3.c1: File {0} is not valid yaml.".format( + self.config_file + ), + str(context_mgr.exception), + ) + + def test_validateconfig_file_error_on_non_yaml_parser_error(self): + """On non-yaml parser issues, validate_cloudconfig_file errors.""" + write_file(self.config_file, "#cloud-config\n{}}") + with self.assertRaises(SchemaValidationError) as context_mgr: + validate_cloudconfig_file(self.config_file, {}) + self.assertIn( + "schema errors: format-l2.c3: File {0} is not valid yaml.".format( + self.config_file + ), + str(context_mgr.exception), + ) + + @skipUnlessJsonSchema() + def test_validateconfig_file_sctrictly_validates_schema(self): + """validate_cloudconfig_file raises errors on invalid schema.""" + schema = {"properties": {"p1": {"type": "string", "format": "string"}}} + write_file(self.config_file, "#cloud-config\np1: -1") + with self.assertRaises(SchemaValidationError) as context_mgr: + validate_cloudconfig_file(self.config_file, schema) + self.assertEqual( + "Cloud config schema errors: p1: -1 is not of type 'string'", + str(context_mgr.exception), + ) + + +class GetSchemaDocTest(CiTestCase): + """Tests for get_meta_doc.""" + + def setUp(self): + super(GetSchemaDocTest, self).setUp() + self.required_schema = { + "title": "title", + "description": "description", + "id": "id", + "name": "name", + "frequency": "frequency", + "distros": ["debian", "rhel"], + } + self.meta: MetaSchema = { + "title": "title", + "description": "description", + "id": "id", + "name": "name", + "frequency": "frequency", + "distros": ["debian", "rhel"], + "examples": [ + 'ex1:\n [don\'t, expand, "this"]', + "ex2: true", + ], + } + + def test_get_meta_doc_returns_restructured_text(self): + """get_meta_doc returns restructured text for a cloudinit schema.""" + full_schema = copy(self.required_schema) + full_schema.update( + { + "properties": { + "prop1": { + "type": "array", + "description": "prop-description", + "items": {"type": "integer"}, + } + } + } + ) + + doc = get_meta_doc(self.meta, full_schema) + self.assertEqual( + dedent( + """ + name + ---- + **Summary:** title + + description + + **Internal name:** ``id`` + + **Module frequency:** frequency + + **Supported distros:** debian, rhel + + **Config schema**: + **prop1:** (array of integer) prop-description + + **Examples**:: + + ex1: + [don't, expand, "this"] + # --- Example2 --- + ex2: true + """ + ), + doc, + ) + + def test_get_meta_doc_handles_multiple_types(self): + """get_meta_doc delimits multiple property types with a '/'.""" + schema = {"properties": {"prop1": {"type": ["string", "integer"]}}} + self.assertIn( + "**prop1:** (string/integer)", get_meta_doc(self.meta, schema) + ) + + def test_get_meta_doc_handles_enum_types(self): + """get_meta_doc converts enum types to yaml and delimits with '/'.""" + schema = {"properties": {"prop1": {"enum": [True, False, "stuff"]}}} + self.assertIn( + "**prop1:** (true/false/stuff)", get_meta_doc(self.meta, schema) + ) + + def test_get_meta_doc_handles_nested_oneof_property_types(self): + """get_meta_doc describes array items oneOf declarations in type.""" + schema = { + "properties": { + "prop1": { + "type": "array", + "items": { + "oneOf": [{"type": "string"}, {"type": "integer"}] + }, + } + } + } + self.assertIn( + "**prop1:** (array of (string)/(integer))", + get_meta_doc(self.meta, schema), + ) + + def test_get_meta_doc_handles_string_examples(self): + """get_meta_doc properly indented examples as a list of strings.""" + full_schema = copy(self.required_schema) + full_schema.update( + { + "examples": [ + 'ex1:\n [don\'t, expand, "this"]', + "ex2: true", + ], + "properties": { + "prop1": { + "type": "array", + "description": "prop-description", + "items": {"type": "integer"}, + } + }, + } + ) + self.assertIn( + dedent( + """ + **Config schema**: + **prop1:** (array of integer) prop-description + + **Examples**:: + + ex1: + [don't, expand, "this"] + # --- Example2 --- + ex2: true + """ + ), + get_meta_doc(self.meta, full_schema), + ) + + def test_get_meta_doc_properly_parse_description(self): + """get_meta_doc description properly formatted""" + schema = { + "properties": { + "p1": { + "type": "string", + "description": dedent( + """\ + This item + has the + following options: + + - option1 + - option2 + - option3 + + The default value is + option1""" + ), + } + } + } + + self.assertIn( + dedent( + """ + **Config schema**: + **p1:** (string) This item has the following options: + + - option1 + - option2 + - option3 + + The default value is option1 + + """ + ), + get_meta_doc(self.meta, schema), + ) + + def test_get_meta_doc_raises_key_errors(self): + """get_meta_doc raises KeyErrors on missing keys.""" + schema = { + "properties": { + "prop1": { + "type": "array", + "items": { + "oneOf": [{"type": "string"}, {"type": "integer"}] + }, + } + } + } + for key in self.meta: + invalid_meta = copy(self.meta) + invalid_meta.pop(key) + with self.assertRaises(KeyError) as context_mgr: + get_meta_doc(invalid_meta, schema) + self.assertIn(key, str(context_mgr.exception)) + + def test_label_overrides_property_name(self): + """get_meta_doc overrides property name with label.""" + schema = { + "properties": { + "prop1": { + "type": "string", + "label": "label1", + }, + "prop_no_label": { + "type": "string", + }, + "prop_array": { + "label": "array_label", + "type": "array", + "items": { + "type": "object", + "properties": { + "some_prop": {"type": "number"}, + }, + }, + }, + }, + "patternProperties": { + "^.*$": { + "type": "string", + "label": "label2", + } + }, + } + meta_doc = get_meta_doc(self.meta, schema) + assert "**label1:** (string)" in meta_doc + assert "**label2:** (string" in meta_doc + assert "**prop_no_label:** (string)" in meta_doc + assert "Each item in **array_label** list" in meta_doc + + assert "prop1" not in meta_doc + assert ".*" not in meta_doc + + +class AnnotatedCloudconfigFileTest(CiTestCase): + maxDiff = None + + def test_annotated_cloudconfig_file_no_schema_errors(self): + """With no schema_errors, print the original content.""" + content = b"ntp:\n pools: [ntp1.pools.com]\n" + self.assertEqual( + content, annotated_cloudconfig_file({}, content, schema_errors=[]) + ) + + def test_annotated_cloudconfig_file_with_non_dict_cloud_config(self): + """Error when empty non-dict cloud-config is provided. + + OurJSON validation when user-data is None type generates a bunch + schema validation errors of the format: + ('', "None is not of type 'object'"). Ignore those symptoms and + report the general problem instead. + """ + content = b"\n\n\n" + expected = "\n".join( + [ + content.decode(), + "# Errors: -------------", + "# E1: Cloud-config is not a YAML dict.\n\n", + ] + ) + self.assertEqual( + expected, + annotated_cloudconfig_file( + None, + content, + schema_errors=[("", "None is not of type 'object'")], + ), + ) + + def test_annotated_cloudconfig_file_schema_annotates_and_adds_footer(self): + """With schema_errors, error lines are annotated and a footer added.""" + content = dedent( + """\ + #cloud-config + # comment + ntp: + pools: [-99, 75] + """ + ).encode() + expected = dedent( + """\ + #cloud-config + # comment + ntp: # E1 + pools: [-99, 75] # E2,E3 + + # Errors: ------------- + # E1: Some type error + # E2: -99 is not a string + # E3: 75 is not a string + + """ + ) + parsed_config = safe_load(content[13:]) + schema_errors = [ + ("ntp", "Some type error"), + ("ntp.pools.0", "-99 is not a string"), + ("ntp.pools.1", "75 is not a string"), + ] + self.assertEqual( + expected, + annotated_cloudconfig_file(parsed_config, content, schema_errors), + ) + + def test_annotated_cloudconfig_file_annotates_separate_line_items(self): + """Errors are annotated for lists with items on separate lines.""" + content = dedent( + """\ + #cloud-config + # comment + ntp: + pools: + - -99 + - 75 + """ + ).encode() + expected = dedent( + """\ + ntp: + pools: + - -99 # E1 + - 75 # E2 + """ + ) + parsed_config = safe_load(content[13:]) + schema_errors = [ + ("ntp.pools.0", "-99 is not a string"), + ("ntp.pools.1", "75 is not a string"), + ] + self.assertIn( + expected, + annotated_cloudconfig_file(parsed_config, content, schema_errors), + ) + + +class TestMain: + + exclusive_combinations = itertools.combinations( + ["--system", "--docs all", "--config-file something"], 2 + ) + + @pytest.mark.parametrize("params", exclusive_combinations) + def test_main_exclusive_args(self, params, capsys): + """Main exits non-zero and error on required exclusive args.""" + params = list(itertools.chain(*[a.split() for a in params])) + with mock.patch("sys.argv", ["mycmd"] + params): + with pytest.raises(SystemExit) as context_manager: + main() + assert 1 == context_manager.value.code + + _out, err = capsys.readouterr() + expected = ( + "Error:\n" + "Expected one of --config-file, --system or --docs arguments\n" + ) + assert expected == err + + def test_main_missing_args(self, capsys): + """Main exits non-zero and reports an error on missing parameters.""" + with mock.patch("sys.argv", ["mycmd"]): + with pytest.raises(SystemExit) as context_manager: + main() + assert 1 == context_manager.value.code + + _out, err = capsys.readouterr() + expected = ( + "Error:\n" + "Expected one of --config-file, --system or --docs arguments\n" + ) + assert expected == err + + def test_main_absent_config_file(self, capsys): + """Main exits non-zero when config file is absent.""" + myargs = ["mycmd", "--annotate", "--config-file", "NOT_A_FILE"] + with mock.patch("sys.argv", myargs): + with pytest.raises(SystemExit) as context_manager: + main() + assert 1 == context_manager.value.code + _out, err = capsys.readouterr() + assert "Error:\nConfigfile NOT_A_FILE does not exist\n" == err + + def test_main_invalid_flag_combo(self, capsys): + """Main exits non-zero when invalid flag combo used.""" + myargs = ["mycmd", "--annotate", "--docs", "DOES_NOT_MATTER"] + with mock.patch("sys.argv", myargs): + with pytest.raises(SystemExit) as context_manager: + main() + assert 1 == context_manager.value.code + _, err = capsys.readouterr() + assert ( + "Error:\nInvalid flag combination. " + "Cannot use --annotate with --docs\n" == err + ) + + def test_main_prints_docs(self, capsys): + """When --docs parameter is provided, main generates documentation.""" + myargs = ["mycmd", "--docs", "all"] + with mock.patch("sys.argv", myargs): + assert 0 == main(), "Expected 0 exit code" + out, _err = capsys.readouterr() + assert "\nNTP\n---\n" in out + assert "\nRuncmd\n------\n" in out + + def test_main_validates_config_file(self, tmpdir, capsys): + """When --config-file parameter is provided, main validates schema.""" + myyaml = tmpdir.join("my.yaml") + myargs = ["mycmd", "--config-file", myyaml.strpath] + myyaml.write(b"#cloud-config\nntp:") # shortest ntp schema + with mock.patch("sys.argv", myargs): + assert 0 == main(), "Expected 0 exit code" + out, _err = capsys.readouterr() + assert "Valid cloud-config: {0}\n".format(myyaml) == out + + @mock.patch("cloudinit.config.schema.read_cfg_paths") + @mock.patch("cloudinit.config.schema.os.getuid", return_value=0) + def test_main_validates_system_userdata( + self, m_getuid, m_read_cfg_paths, capsys, paths + ): + """When --system is provided, main validates system userdata.""" + m_read_cfg_paths.return_value = paths + ud_file = paths.get_ipath_cur("userdata_raw") + write_file(ud_file, b"#cloud-config\nntp:") + myargs = ["mycmd", "--system"] + with mock.patch("sys.argv", myargs): + assert 0 == main(), "Expected 0 exit code" + out, _err = capsys.readouterr() + assert "Valid cloud-config: system userdata\n" == out + + @mock.patch("cloudinit.config.schema.os.getuid", return_value=1000) + def test_main_system_userdata_requires_root(self, m_getuid, capsys, paths): + """Non-root user can't use --system param""" + myargs = ["mycmd", "--system"] + with mock.patch("sys.argv", myargs): + with pytest.raises(SystemExit) as context_manager: + main() + assert 1 == context_manager.value.code + _out, err = capsys.readouterr() + expected = ( + "Error:\nUnable to read system userdata as non-root user. " + "Try using sudo\n" + ) + assert expected == err + + +def _get_meta_doc_examples(): + examples_dir = Path(cloud_init_project_dir("doc/examples")) + assert examples_dir.is_dir() + + return ( + str(f) + for f in examples_dir.glob("cloud-config*.txt") + if not f.name.startswith("cloud-config-archive") + ) + + +class TestSchemaDocExamples: + schema = get_schema() + + @pytest.mark.parametrize("example_path", _get_meta_doc_examples()) + @skipUnlessJsonSchema() + def test_schema_doc_examples(self, example_path): + validate_cloudconfig_file(example_path, self.schema) + + +class TestStrictMetaschema: + """Validate that schemas follow a stricter metaschema definition than + the default. This disallows arbitrary key/value pairs. + """ + + @skipUnlessJsonSchema() + def test_modules(self): + """Validate all modules with a stricter metaschema""" + (validator, _) = get_jsonschema_validator() + for (name, value) in get_schemas().items(): + if value: + validate_cloudconfig_metaschema(validator, value) + else: + logging.warning("module %s has no schema definition", name) + + @skipUnlessJsonSchema() + def test_validate_bad_module(self): + """Throw exception by default, don't throw if throw=False + + item should be 'items' and is therefore interpreted as an additional + property which is invalid with a strict metaschema + """ + (validator, _) = get_jsonschema_validator() + schema = { + "type": "array", + "item": { + "type": "object", + }, + } + with pytest.raises( + SchemaValidationError, + match=r"Additional properties are not allowed.*", + ): + + validate_cloudconfig_metaschema(validator, schema) + + validate_cloudconfig_metaschema(validator, schema, throw=False) + + +# vi: ts=4 expandtab syntax=python |