diff options
Diffstat (limited to 'tests/unittests/config/test_schema.py')
-rw-r--r-- | tests/unittests/config/test_schema.py | 515 |
1 files changed, 515 insertions, 0 deletions
diff --git a/tests/unittests/config/test_schema.py b/tests/unittests/config/test_schema.py new file mode 100644 index 00000000..b01f5eea --- /dev/null +++ b/tests/unittests/config/test_schema.py @@ -0,0 +1,515 @@ +# This file is part of cloud-init. See LICENSE file for license information. +import cloudinit +from cloudinit.config.schema import ( + CLOUD_CONFIG_HEADER, SchemaValidationError, annotated_cloudconfig_file, + get_schema_doc, get_schema, validate_cloudconfig_file, + validate_cloudconfig_schema, main) +from cloudinit.util import write_file + +from tests.unittests.helpers import CiTestCase, mock, skipUnlessJsonSchema + +from copy import copy +import itertools +import pytest +from pathlib import Path +from textwrap import dedent +from yaml import safe_load + + +class GetSchemaTest(CiTestCase): + + def test_get_schema_coalesces_known_schema(self): + """Every cloudconfig module with schema is listed in allOf keyword.""" + schema = get_schema() + self.assertCountEqual( + [ + 'cc_apk_configure', + 'cc_apt_configure', + 'cc_bootcmd', + 'cc_locale', + 'cc_ntp', + 'cc_resizefs', + 'cc_runcmd', + 'cc_snap', + 'cc_ubuntu_advantage', + 'cc_ubuntu_drivers', + 'cc_write_files', + 'cc_write_files_deferred', + 'cc_zypper_add_repo', + 'cc_chef', + 'cc_install_hotplug', + ], + [subschema['id'] for subschema in schema['allOf']]) + self.assertEqual('cloud-config-schema', schema['id']) + self.assertEqual( + 'http://json-schema.org/draft-04/schema#', + schema['$schema']) + # FULL_SCHEMA is updated by the get_schema call + from cloudinit.config.schema import FULL_SCHEMA + self.assertCountEqual(['id', '$schema', 'allOf'], FULL_SCHEMA.keys()) + + def test_get_schema_returns_global_when_set(self): + """When FULL_SCHEMA global is already set, get_schema returns it.""" + m_schema_path = 'cloudinit.config.schema.FULL_SCHEMA' + with mock.patch(m_schema_path, {'here': 'iam'}): + self.assertEqual({'here': 'iam'}, get_schema()) + + +class SchemaValidationErrorTest(CiTestCase): + """Test validate_cloudconfig_schema""" + + def test_schema_validation_error_expects_schema_errors(self): + """SchemaValidationError is initialized from schema_errors.""" + errors = (('key.path', 'unexpected key "junk"'), + ('key2.path', '"-123" is not a valid "hostname" format')) + exception = SchemaValidationError(schema_errors=errors) + self.assertIsInstance(exception, Exception) + self.assertEqual(exception.schema_errors, errors) + self.assertEqual( + 'Cloud config schema errors: key.path: unexpected key "junk", ' + 'key2.path: "-123" is not a valid "hostname" format', + str(exception)) + self.assertTrue(isinstance(exception, ValueError)) + + +class ValidateCloudConfigSchemaTest(CiTestCase): + """Tests for validate_cloudconfig_schema.""" + + with_logs = True + + @skipUnlessJsonSchema() + def test_validateconfig_schema_non_strict_emits_warnings(self): + """When strict is False validate_cloudconfig_schema emits warnings.""" + schema = {'properties': {'p1': {'type': 'string'}}} + validate_cloudconfig_schema({'p1': -1}, schema, strict=False) + self.assertIn( + "Invalid config:\np1: -1 is not of type 'string'\n", + self.logs.getvalue()) + + @skipUnlessJsonSchema() + def test_validateconfig_schema_emits_warning_on_missing_jsonschema(self): + """Warning from validate_cloudconfig_schema when missing jsonschema.""" + schema = {'properties': {'p1': {'type': 'string'}}} + with mock.patch.dict('sys.modules', **{'jsonschema': ImportError()}): + validate_cloudconfig_schema({'p1': -1}, schema, strict=True) + self.assertIn( + 'Ignoring schema validation. python-jsonschema is not present', + self.logs.getvalue()) + + @skipUnlessJsonSchema() + def test_validateconfig_schema_strict_raises_errors(self): + """When strict is True validate_cloudconfig_schema raises errors.""" + schema = {'properties': {'p1': {'type': 'string'}}} + with self.assertRaises(SchemaValidationError) as context_mgr: + validate_cloudconfig_schema({'p1': -1}, schema, strict=True) + self.assertEqual( + "Cloud config schema errors: p1: -1 is not of type 'string'", + str(context_mgr.exception)) + + @skipUnlessJsonSchema() + def test_validateconfig_schema_honors_formats(self): + """With strict True, validate_cloudconfig_schema errors on format.""" + schema = { + 'properties': {'p1': {'type': 'string', 'format': 'email'}}} + with self.assertRaises(SchemaValidationError) as context_mgr: + validate_cloudconfig_schema({'p1': '-1'}, schema, strict=True) + self.assertEqual( + "Cloud config schema errors: p1: '-1' is not a 'email'", + str(context_mgr.exception)) + + +class TestCloudConfigExamples: + schema = get_schema() + params = [ + (schema["id"], example) + for schema in schema["allOf"] for example in schema["examples"]] + + @pytest.mark.parametrize("schema_id,example", params) + @skipUnlessJsonSchema() + def test_validateconfig_schema_of_example(self, schema_id, example): + """ For a given example in a config module we test if it is valid + according to the unified schema of all config modules + """ + config_load = safe_load(example) + validate_cloudconfig_schema( + config_load, self.schema, strict=True) + + +class ValidateCloudConfigFileTest(CiTestCase): + """Tests for validate_cloudconfig_file.""" + + def setUp(self): + super(ValidateCloudConfigFileTest, self).setUp() + self.config_file = self.tmp_path('cloudcfg.yaml') + + def test_validateconfig_file_error_on_absent_file(self): + """On absent config_path, validate_cloudconfig_file errors.""" + with self.assertRaises(RuntimeError) as context_mgr: + validate_cloudconfig_file('/not/here', {}) + self.assertEqual( + 'Configfile /not/here does not exist', + str(context_mgr.exception)) + + def test_validateconfig_file_error_on_invalid_header(self): + """On invalid header, validate_cloudconfig_file errors. + + A SchemaValidationError is raised when the file doesn't begin with + CLOUD_CONFIG_HEADER. + """ + write_file(self.config_file, '#junk') + with self.assertRaises(SchemaValidationError) as context_mgr: + validate_cloudconfig_file(self.config_file, {}) + self.assertEqual( + 'Cloud config schema errors: format-l1.c1: File {0} needs to begin' + ' with "{1}"'.format( + self.config_file, CLOUD_CONFIG_HEADER.decode()), + str(context_mgr.exception)) + + def test_validateconfig_file_error_on_non_yaml_scanner_error(self): + """On non-yaml scan issues, validate_cloudconfig_file errors.""" + # Generate a scanner error by providing text on a single line with + # improper indent. + write_file(self.config_file, '#cloud-config\nasdf:\nasdf') + with self.assertRaises(SchemaValidationError) as context_mgr: + validate_cloudconfig_file(self.config_file, {}) + self.assertIn( + 'schema errors: format-l3.c1: File {0} is not valid yaml.'.format( + self.config_file), + str(context_mgr.exception)) + + def test_validateconfig_file_error_on_non_yaml_parser_error(self): + """On non-yaml parser issues, validate_cloudconfig_file errors.""" + write_file(self.config_file, '#cloud-config\n{}}') + with self.assertRaises(SchemaValidationError) as context_mgr: + validate_cloudconfig_file(self.config_file, {}) + self.assertIn( + 'schema errors: format-l2.c3: File {0} is not valid yaml.'.format( + self.config_file), + str(context_mgr.exception)) + + @skipUnlessJsonSchema() + def test_validateconfig_file_sctrictly_validates_schema(self): + """validate_cloudconfig_file raises errors on invalid schema.""" + schema = { + 'properties': {'p1': {'type': 'string', 'format': 'string'}}} + write_file(self.config_file, '#cloud-config\np1: -1') + with self.assertRaises(SchemaValidationError) as context_mgr: + validate_cloudconfig_file(self.config_file, schema) + self.assertEqual( + "Cloud config schema errors: p1: -1 is not of type 'string'", + str(context_mgr.exception)) + + +class GetSchemaDocTest(CiTestCase): + """Tests for get_schema_doc.""" + + def setUp(self): + super(GetSchemaDocTest, self).setUp() + self.required_schema = { + 'title': 'title', 'description': 'description', 'id': 'id', + 'name': 'name', 'frequency': 'frequency', + 'distros': ['debian', 'rhel']} + + def test_get_schema_doc_returns_restructured_text(self): + """get_schema_doc returns restructured text for a cloudinit schema.""" + full_schema = copy(self.required_schema) + full_schema.update( + {'properties': { + 'prop1': {'type': 'array', 'description': 'prop-description', + 'items': {'type': 'integer'}}}}) + self.assertEqual( + dedent(""" + name + ---- + **Summary:** title + + description + + **Internal name:** ``id`` + + **Module frequency:** frequency + + **Supported distros:** debian, rhel + + **Config schema**: + **prop1:** (array of integer) prop-description\n\n"""), + get_schema_doc(full_schema)) + + def test_get_schema_doc_handles_multiple_types(self): + """get_schema_doc delimits multiple property types with a '/'.""" + full_schema = copy(self.required_schema) + full_schema.update( + {'properties': { + 'prop1': {'type': ['string', 'integer'], + 'description': 'prop-description'}}}) + self.assertIn( + '**prop1:** (string/integer) prop-description', + get_schema_doc(full_schema)) + + def test_get_schema_doc_handles_enum_types(self): + """get_schema_doc converts enum types to yaml and delimits with '/'.""" + full_schema = copy(self.required_schema) + full_schema.update( + {'properties': { + 'prop1': {'enum': [True, False, 'stuff'], + 'description': 'prop-description'}}}) + self.assertIn( + '**prop1:** (true/false/stuff) prop-description', + get_schema_doc(full_schema)) + + def test_get_schema_doc_handles_nested_oneof_property_types(self): + """get_schema_doc describes array items oneOf declarations in type.""" + full_schema = copy(self.required_schema) + full_schema.update( + {'properties': { + 'prop1': {'type': 'array', + 'items': { + 'oneOf': [{'type': 'string'}, + {'type': 'integer'}]}, + 'description': 'prop-description'}}}) + self.assertIn( + '**prop1:** (array of (string)/(integer)) prop-description', + get_schema_doc(full_schema)) + + def test_get_schema_doc_handles_string_examples(self): + """get_schema_doc properly indented examples as a list of strings.""" + full_schema = copy(self.required_schema) + full_schema.update( + {'examples': ['ex1:\n [don\'t, expand, "this"]', 'ex2: true'], + 'properties': { + 'prop1': {'type': 'array', 'description': 'prop-description', + 'items': {'type': 'integer'}}}}) + self.assertIn( + dedent(""" + **Config schema**: + **prop1:** (array of integer) prop-description + + **Examples**:: + + ex1: + [don't, expand, "this"] + # --- Example2 --- + ex2: true + """), + get_schema_doc(full_schema)) + + def test_get_schema_doc_properly_parse_description(self): + """get_schema_doc description properly formatted""" + full_schema = copy(self.required_schema) + full_schema.update( + {'properties': { + 'p1': { + 'type': 'string', + 'description': dedent("""\ + This item + has the + following options: + + - option1 + - option2 + - option3 + + The default value is + option1""") + } + }} + ) + + self.assertIn( + dedent(""" + **Config schema**: + **p1:** (string) This item has the following options: + + - option1 + - option2 + - option3 + + The default value is option1 + """), + get_schema_doc(full_schema)) + + def test_get_schema_doc_raises_key_errors(self): + """get_schema_doc raises KeyErrors on missing keys.""" + for key in self.required_schema: + invalid_schema = copy(self.required_schema) + invalid_schema.pop(key) + with self.assertRaises(KeyError) as context_mgr: + get_schema_doc(invalid_schema) + self.assertIn(key, str(context_mgr.exception)) + + +class AnnotatedCloudconfigFileTest(CiTestCase): + maxDiff = None + + def test_annotated_cloudconfig_file_no_schema_errors(self): + """With no schema_errors, print the original content.""" + content = b'ntp:\n pools: [ntp1.pools.com]\n' + self.assertEqual( + content, + annotated_cloudconfig_file({}, content, schema_errors=[])) + + def test_annotated_cloudconfig_file_schema_annotates_and_adds_footer(self): + """With schema_errors, error lines are annotated and a footer added.""" + content = dedent("""\ + #cloud-config + # comment + ntp: + pools: [-99, 75] + """).encode() + expected = dedent("""\ + #cloud-config + # comment + ntp: # E1 + pools: [-99, 75] # E2,E3 + + # Errors: ------------- + # E1: Some type error + # E2: -99 is not a string + # E3: 75 is not a string + + """) + parsed_config = safe_load(content[13:]) + schema_errors = [ + ('ntp', 'Some type error'), ('ntp.pools.0', '-99 is not a string'), + ('ntp.pools.1', '75 is not a string')] + self.assertEqual( + expected, + annotated_cloudconfig_file(parsed_config, content, schema_errors)) + + def test_annotated_cloudconfig_file_annotates_separate_line_items(self): + """Errors are annotated for lists with items on separate lines.""" + content = dedent("""\ + #cloud-config + # comment + ntp: + pools: + - -99 + - 75 + """).encode() + expected = dedent("""\ + ntp: + pools: + - -99 # E1 + - 75 # E2 + """) + parsed_config = safe_load(content[13:]) + schema_errors = [ + ('ntp.pools.0', '-99 is not a string'), + ('ntp.pools.1', '75 is not a string')] + self.assertIn( + expected, + annotated_cloudconfig_file(parsed_config, content, schema_errors)) + + +class TestMain: + + exclusive_combinations = itertools.combinations( + ["--system", "--docs all", "--config-file something"], 2 + ) + + @pytest.mark.parametrize("params", exclusive_combinations) + def test_main_exclusive_args(self, params, capsys): + """Main exits non-zero and error on required exclusive args.""" + params = list(itertools.chain(*[a.split() for a in params])) + with mock.patch('sys.argv', ['mycmd'] + params): + with pytest.raises(SystemExit) as context_manager: + main() + assert 1 == context_manager.value.code + + _out, err = capsys.readouterr() + expected = ( + 'Expected one of --config-file, --system or --docs arguments\n' + ) + assert expected == err + + def test_main_missing_args(self, capsys): + """Main exits non-zero and reports an error on missing parameters.""" + with mock.patch('sys.argv', ['mycmd']): + with pytest.raises(SystemExit) as context_manager: + main() + assert 1 == context_manager.value.code + + _out, err = capsys.readouterr() + expected = ( + 'Expected one of --config-file, --system or --docs arguments\n' + ) + assert expected == err + + def test_main_absent_config_file(self, capsys): + """Main exits non-zero when config file is absent.""" + myargs = ['mycmd', '--annotate', '--config-file', 'NOT_A_FILE'] + with mock.patch('sys.argv', myargs): + with pytest.raises(SystemExit) as context_manager: + main() + assert 1 == context_manager.value.code + _out, err = capsys.readouterr() + assert 'Configfile NOT_A_FILE does not exist\n' == err + + def test_main_prints_docs(self, capsys): + """When --docs parameter is provided, main generates documentation.""" + myargs = ['mycmd', '--docs', 'all'] + with mock.patch('sys.argv', myargs): + assert 0 == main(), 'Expected 0 exit code' + out, _err = capsys.readouterr() + assert '\nNTP\n---\n' in out + assert '\nRuncmd\n------\n' in out + + def test_main_validates_config_file(self, tmpdir, capsys): + """When --config-file parameter is provided, main validates schema.""" + myyaml = tmpdir.join('my.yaml') + myargs = ['mycmd', '--config-file', myyaml.strpath] + myyaml.write(b'#cloud-config\nntp:') # shortest ntp schema + with mock.patch('sys.argv', myargs): + assert 0 == main(), 'Expected 0 exit code' + out, _err = capsys.readouterr() + assert 'Valid cloud-config: {0}\n'.format(myyaml) == out + + @mock.patch('cloudinit.config.schema.read_cfg_paths') + @mock.patch('cloudinit.config.schema.os.getuid', return_value=0) + def test_main_validates_system_userdata( + self, m_getuid, m_read_cfg_paths, capsys, paths + ): + """When --system is provided, main validates system userdata.""" + m_read_cfg_paths.return_value = paths + ud_file = paths.get_ipath_cur("userdata_raw") + write_file(ud_file, b'#cloud-config\nntp:') + myargs = ['mycmd', '--system'] + with mock.patch('sys.argv', myargs): + assert 0 == main(), 'Expected 0 exit code' + out, _err = capsys.readouterr() + assert 'Valid cloud-config: system userdata\n' == out + + @mock.patch('cloudinit.config.schema.os.getuid', return_value=1000) + def test_main_system_userdata_requires_root(self, m_getuid, capsys, paths): + """Non-root user can't use --system param""" + myargs = ['mycmd', '--system'] + with mock.patch('sys.argv', myargs): + with pytest.raises(SystemExit) as context_manager: + main() + assert 1 == context_manager.value.code + _out, err = capsys.readouterr() + expected = ( + 'Unable to read system userdata as non-root user. Try using sudo\n' + ) + assert expected == err + + +def _get_schema_doc_examples(): + examples_dir = Path( + cloudinit.__file__).parent.parent / 'doc' / 'examples' + assert examples_dir.is_dir() + + all_text_files = (f for f in examples_dir.glob('cloud-config*.txt') + if not f.name.startswith('cloud-config-archive')) + return all_text_files + + +class TestSchemaDocExamples: + schema = get_schema() + + @pytest.mark.parametrize("example_path", _get_schema_doc_examples()) + @skipUnlessJsonSchema() + def test_schema_doc_examples(self, example_path): + validate_cloudconfig_file(str(example_path), self.schema) + +# vi: ts=4 expandtab syntax=python |