summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/unittests/helpers.py6
-rw-r--r--tests/unittests/test_handler/test_handler_ntp.py109
-rw-r--r--tests/unittests/test_handler/test_schema.py220
3 files changed, 330 insertions, 5 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 9ff15993..e78abce2 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -19,10 +19,6 @@ try:
from contextlib import ExitStack
except ImportError:
from contextlib2 import ExitStack
-try:
- from cStringIO import StringIO
-except ImportError:
- from io import StringIO
from cloudinit import helpers as ch
from cloudinit import util
@@ -102,7 +98,7 @@ class CiTestCase(TestCase):
if self.with_logs:
# Create a log handler so unit tests can search expected logs.
logger = logging.getLogger()
- self.logs = StringIO()
+ self.logs = six.StringIO()
handler = logging.StreamHandler(self.logs)
self.old_handlers = logger.handlers
logger.handlers = [handler]
diff --git a/tests/unittests/test_handler/test_handler_ntp.py b/tests/unittests/test_handler/test_handler_ntp.py
index bc4277b7..6cafa63d 100644
--- a/tests/unittests/test_handler/test_handler_ntp.py
+++ b/tests/unittests/test_handler/test_handler_ntp.py
@@ -212,4 +212,113 @@ class TestNtp(FilesystemMockingTestCase):
'Skipping module named cc_ntp, not present or disabled by cfg\n',
self.logs.getvalue())
+ def test_ntp_handler_schema_validation_allows_empty_ntp_config(self):
+ """Ntp schema validation allows for an empty ntp: configuration."""
+ invalid_config = {'ntp': {}}
+ distro = 'ubuntu'
+ cc = self._get_cloud(distro)
+ ntp_conf = os.path.join(self.new_root, 'ntp.conf')
+ with open('{0}.tmpl'.format(ntp_conf), 'wb') as stream:
+ stream.write(NTP_TEMPLATE)
+ with mock.patch('cloudinit.config.cc_ntp.NTP_CONF', ntp_conf):
+ cc_ntp.handle('cc_ntp', invalid_config, cc, None, [])
+ self.assertNotIn('Invalid config:', self.logs.getvalue())
+ with open(ntp_conf) as stream:
+ content = stream.read()
+ default_pools = [
+ "{0}.{1}.pool.ntp.org".format(x, distro)
+ for x in range(0, cc_ntp.NR_POOL_SERVERS)]
+ self.assertEqual(
+ "servers []\npools {0}\n".format(default_pools),
+ content)
+
+ def test_ntp_handler_schema_validation_warns_non_string_item_type(self):
+ """Ntp schema validation warns of non-strings in pools or servers.
+
+ Schema validation is not strict, so ntp config is still be rendered.
+ """
+ invalid_config = {'ntp': {'pools': [123], 'servers': ['valid', None]}}
+ cc = self._get_cloud('ubuntu')
+ ntp_conf = os.path.join(self.new_root, 'ntp.conf')
+ with open('{0}.tmpl'.format(ntp_conf), 'wb') as stream:
+ stream.write(NTP_TEMPLATE)
+ with mock.patch('cloudinit.config.cc_ntp.NTP_CONF', ntp_conf):
+ cc_ntp.handle('cc_ntp', invalid_config, cc, None, [])
+ self.assertIn(
+ "Invalid config:\nntp.pools.0: 123 is not of type 'string'\n"
+ "ntp.servers.1: None is not of type 'string'",
+ self.logs.getvalue())
+ with open(ntp_conf) as stream:
+ content = stream.read()
+ self.assertEqual("servers ['valid', None]\npools [123]\n", content)
+
+ def test_ntp_handler_schema_validation_warns_of_non_array_type(self):
+ """Ntp schema validation warns of non-array pools or servers types.
+
+ Schema validation is not strict, so ntp config is still be rendered.
+ """
+ invalid_config = {'ntp': {'pools': 123, 'servers': 'non-array'}}
+ cc = self._get_cloud('ubuntu')
+ ntp_conf = os.path.join(self.new_root, 'ntp.conf')
+ with open('{0}.tmpl'.format(ntp_conf), 'wb') as stream:
+ stream.write(NTP_TEMPLATE)
+ with mock.patch('cloudinit.config.cc_ntp.NTP_CONF', ntp_conf):
+ cc_ntp.handle('cc_ntp', invalid_config, cc, None, [])
+ self.assertIn(
+ "Invalid config:\nntp.pools: 123 is not of type 'array'\n"
+ "ntp.servers: 'non-array' is not of type 'array'",
+ self.logs.getvalue())
+ with open(ntp_conf) as stream:
+ content = stream.read()
+ self.assertEqual("servers non-array\npools 123\n", content)
+
+ def test_ntp_handler_schema_validation_warns_invalid_key_present(self):
+ """Ntp schema validation warns of invalid keys present in ntp config.
+
+ Schema validation is not strict, so ntp config is still be rendered.
+ """
+ invalid_config = {
+ 'ntp': {'invalidkey': 1, 'pools': ['0.mycompany.pool.ntp.org']}}
+ cc = self._get_cloud('ubuntu')
+ ntp_conf = os.path.join(self.new_root, 'ntp.conf')
+ with open('{0}.tmpl'.format(ntp_conf), 'wb') as stream:
+ stream.write(NTP_TEMPLATE)
+ with mock.patch('cloudinit.config.cc_ntp.NTP_CONF', ntp_conf):
+ cc_ntp.handle('cc_ntp', invalid_config, cc, None, [])
+ self.assertIn(
+ "Invalid config:\nntp: Additional properties are not allowed "
+ "('invalidkey' was unexpected)",
+ self.logs.getvalue())
+ with open(ntp_conf) as stream:
+ content = stream.read()
+ self.assertEqual(
+ "servers []\npools ['0.mycompany.pool.ntp.org']\n",
+ content)
+
+ def test_ntp_handler_schema_validation_warns_of_duplicates(self):
+ """Ntp schema validation warns of duplicates in servers or pools.
+
+ Schema validation is not strict, so ntp config is still be rendered.
+ """
+ invalid_config = {
+ 'ntp': {'pools': ['0.mypool.org', '0.mypool.org'],
+ 'servers': ['10.0.0.1', '10.0.0.1']}}
+ cc = self._get_cloud('ubuntu')
+ ntp_conf = os.path.join(self.new_root, 'ntp.conf')
+ with open('{0}.tmpl'.format(ntp_conf), 'wb') as stream:
+ stream.write(NTP_TEMPLATE)
+ with mock.patch('cloudinit.config.cc_ntp.NTP_CONF', ntp_conf):
+ cc_ntp.handle('cc_ntp', invalid_config, cc, None, [])
+ self.assertIn(
+ "Invalid config:\nntp.pools: ['0.mypool.org', '0.mypool.org'] has "
+ "non-unique elements\nntp.servers: ['10.0.0.1', '10.0.0.1'] has "
+ "non-unique elements",
+ self.logs.getvalue())
+ with open(ntp_conf) as stream:
+ content = stream.read()
+ self.assertEqual(
+ "servers ['10.0.0.1', '10.0.0.1']\n"
+ "pools ['0.mypool.org', '0.mypool.org']\n",
+ content)
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_schema.py b/tests/unittests/test_handler/test_schema.py
new file mode 100644
index 00000000..3239e326
--- /dev/null
+++ b/tests/unittests/test_handler/test_schema.py
@@ -0,0 +1,220 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.config.schema import (
+ CLOUD_CONFIG_HEADER, SchemaValidationError, get_schema_doc,
+ validate_cloudconfig_file, validate_cloudconfig_schema,
+ main)
+from cloudinit.util import write_file
+
+from ..helpers import CiTestCase, mock
+
+from copy import copy
+from six import StringIO
+from textwrap import dedent
+
+
+class SchemaValidationErrorTest(CiTestCase):
+ """Test validate_cloudconfig_schema"""
+
+ def test_schema_validation_error_expects_schema_errors(self):
+ """SchemaValidationError is initialized from schema_errors."""
+ errors = (('key.path', 'unexpected key "junk"'),
+ ('key2.path', '"-123" is not a valid "hostname" format'))
+ exception = SchemaValidationError(schema_errors=errors)
+ self.assertIsInstance(exception, Exception)
+ self.assertEqual(exception.schema_errors, errors)
+ self.assertEqual(
+ 'Cloud config schema errors: key.path: unexpected key "junk", '
+ 'key2.path: "-123" is not a valid "hostname" format',
+ str(exception))
+ self.assertTrue(isinstance(exception, ValueError))
+
+
+class ValidateCloudConfigSchemaTest(CiTestCase):
+ """Tests for validate_cloudconfig_schema."""
+
+ with_logs = True
+
+ def test_validateconfig_schema_non_strict_emits_warnings(self):
+ """When strict is False validate_cloudconfig_schema emits warnings."""
+ schema = {'properties': {'p1': {'type': 'string'}}}
+ validate_cloudconfig_schema({'p1': -1}, schema, strict=False)
+ self.assertIn(
+ "Invalid config:\np1: -1 is not of type 'string'\n",
+ self.logs.getvalue())
+
+ def test_validateconfig_schema_emits_warning_on_missing_jsonschema(self):
+ """Warning from validate_cloudconfig_schema when missing jsonschema."""
+ schema = {'properties': {'p1': {'type': 'string'}}}
+ with mock.patch.dict('sys.modules', **{'jsonschema': ImportError()}):
+ validate_cloudconfig_schema({'p1': -1}, schema, strict=True)
+ self.assertIn(
+ 'Ignoring schema validation. python-jsonschema is not present',
+ self.logs.getvalue())
+
+ def test_validateconfig_schema_strict_raises_errors(self):
+ """When strict is True validate_cloudconfig_schema raises errors."""
+ schema = {'properties': {'p1': {'type': 'string'}}}
+ with self.assertRaises(SchemaValidationError) as context_mgr:
+ validate_cloudconfig_schema({'p1': -1}, schema, strict=True)
+ self.assertEqual(
+ "Cloud config schema errors: p1: -1 is not of type 'string'",
+ str(context_mgr.exception))
+
+ def test_validateconfig_schema_honors_formats(self):
+ """When strict is True validate_cloudconfig_schema raises errors."""
+ schema = {
+ 'properties': {'p1': {'type': 'string', 'format': 'hostname'}}}
+ with self.assertRaises(SchemaValidationError) as context_mgr:
+ validate_cloudconfig_schema({'p1': '-1'}, schema, strict=True)
+ self.assertEqual(
+ "Cloud config schema errors: p1: '-1' is not a 'hostname'",
+ str(context_mgr.exception))
+
+
+class ValidateCloudConfigFileTest(CiTestCase):
+ """Tests for validate_cloudconfig_file."""
+
+ def setUp(self):
+ super(ValidateCloudConfigFileTest, self).setUp()
+ self.config_file = self.tmp_path('cloudcfg.yaml')
+
+ def test_validateconfig_file_error_on_absent_file(self):
+ """On absent config_path, validate_cloudconfig_file errors."""
+ with self.assertRaises(RuntimeError) as context_mgr:
+ validate_cloudconfig_file('/not/here', {})
+ self.assertEqual(
+ 'Configfile /not/here does not exist',
+ str(context_mgr.exception))
+
+ def test_validateconfig_file_error_on_invalid_header(self):
+ """On invalid header, validate_cloudconfig_file errors.
+
+ A SchemaValidationError is raised when the file doesn't begin with
+ CLOUD_CONFIG_HEADER.
+ """
+ write_file(self.config_file, '#junk')
+ with self.assertRaises(SchemaValidationError) as context_mgr:
+ validate_cloudconfig_file(self.config_file, {})
+ self.assertEqual(
+ 'Cloud config schema errors: header: File {0} needs to begin with '
+ '"{1}"'.format(self.config_file, CLOUD_CONFIG_HEADER.decode()),
+ str(context_mgr.exception))
+
+ def test_validateconfig_file_error_on_non_yaml_format(self):
+ """On non-yaml format, validate_cloudconfig_file errors."""
+ write_file(self.config_file, '#cloud-config\n{}}')
+ with self.assertRaises(SchemaValidationError) as context_mgr:
+ validate_cloudconfig_file(self.config_file, {})
+ self.assertIn(
+ 'schema errors: format: File {0} is not valid yaml.'.format(
+ self.config_file),
+ str(context_mgr.exception))
+
+ def test_validateconfig_file_sctricty_validates_schema(self):
+ """validate_cloudconfig_file raises errors on invalid schema."""
+ schema = {
+ 'properties': {'p1': {'type': 'string', 'format': 'hostname'}}}
+ write_file(self.config_file, '#cloud-config\np1: "-1"')
+ with self.assertRaises(SchemaValidationError) as context_mgr:
+ validate_cloudconfig_file(self.config_file, schema)
+ self.assertEqual(
+ "Cloud config schema errors: p1: '-1' is not a 'hostname'",
+ str(context_mgr.exception))
+
+
+class GetSchemaDocTest(CiTestCase):
+ """Tests for get_schema_doc."""
+
+ def setUp(self):
+ super(GetSchemaDocTest, self).setUp()
+ self.required_schema = {
+ 'title': 'title', 'description': 'description', 'id': 'id',
+ 'name': 'name', 'frequency': 'frequency',
+ 'distros': ['debian', 'rhel']}
+
+ def test_get_schema_doc_returns_restructured_text(self):
+ """get_schema_doc returns restructured text for a cloudinit schema."""
+ full_schema = copy(self.required_schema)
+ full_schema.update(
+ {'properties': {
+ 'prop1': {'type': 'array', 'description': 'prop-description',
+ 'items': {'type': 'int'}}}})
+ self.assertEqual(
+ dedent("""
+ name
+ ---
+ **Summary:** title
+
+ description
+
+ **Internal name:** ``id``
+
+ **Module frequency:** frequency
+
+ **Supported distros:** debian, rhel
+
+ **Config schema**:
+ **prop1:** (array of int) prop-description\n\n"""),
+ get_schema_doc(full_schema))
+
+ def test_get_schema_doc_returns_restructured_text_with_examples(self):
+ """get_schema_doc returns indented examples when present in schema."""
+ full_schema = copy(self.required_schema)
+ full_schema.update(
+ {'examples': {'ex1': [1, 2, 3]},
+ 'properties': {
+ 'prop1': {'type': 'array', 'description': 'prop-description',
+ 'items': {'type': 'int'}}}})
+ self.assertIn(
+ dedent("""
+ **Config schema**:
+ **prop1:** (array of int) prop-description
+
+ **Examples**::
+
+ ex1"""),
+ get_schema_doc(full_schema))
+
+ def test_get_schema_doc_raises_key_errors(self):
+ """get_schema_doc raises KeyErrors on missing keys."""
+ for key in self.required_schema:
+ invalid_schema = copy(self.required_schema)
+ invalid_schema.pop(key)
+ with self.assertRaises(KeyError) as context_mgr:
+ get_schema_doc(invalid_schema)
+ self.assertEqual("'{0}'".format(key), str(context_mgr.exception))
+
+
+class MainTest(CiTestCase):
+
+ def test_main_missing_args(self):
+ """Main exits non-zero and reports an error on missing parameters."""
+ with mock.patch('sys.argv', ['mycmd']):
+ with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
+ self.assertEqual(1, main(), 'Expected non-zero exit code')
+ self.assertEqual(
+ 'Expected either --config-file argument or --doc\n',
+ m_stderr.getvalue())
+
+ def test_main_prints_docs(self):
+ """When --doc parameter is provided, main generates documentation."""
+ myargs = ['mycmd', '--doc']
+ with mock.patch('sys.argv', myargs):
+ with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ self.assertEqual(0, main(), 'Expected 0 exit code')
+ self.assertIn('\nNTP\n---\n', m_stdout.getvalue())
+
+ def test_main_validates_config_file(self):
+ """When --config-file parameter is provided, main validates schema."""
+ myyaml = self.tmp_path('my.yaml')
+ myargs = ['mycmd', '--config-file', myyaml]
+ with open(myyaml, 'wb') as stream:
+ stream.write(b'#cloud-config\nntp:') # shortest ntp schema
+ with mock.patch('sys.argv', myargs):
+ with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ self.assertEqual(0, main(), 'Expected 0 exit code')
+ self.assertIn(
+ 'Valid cloud-config file {0}'.format(myyaml), m_stdout.getvalue())
+
+# vi: ts=4 expandtab syntax=python