summaryrefslogtreecommitdiff
path: root/tests/unittests/test_handler
diff options
context:
space:
mode:
authorChad Smith <chad.smith@canonical.com>2017-10-06 13:22:54 -0600
committerChad Smith <chad.smith@canonical.com>2017-10-06 13:22:54 -0600
commit9fd022780ae516df3499b17b2d69b72fc502917c (patch)
treebc33ac6296f374414ccb15dce233a4293b8633d3 /tests/unittests/test_handler
parent89630a6658c099d59f2766493a35c2ad266a8f42 (diff)
parent45d361cb0b7f5e4e7d79522bd285871898358623 (diff)
downloadvyos-cloud-init-9fd022780ae516df3499b17b2d69b72fc502917c.tar.gz
vyos-cloud-init-9fd022780ae516df3499b17b2d69b72fc502917c.zip
merge from master at 17.1-17-g45d361cb
Diffstat (limited to 'tests/unittests/test_handler')
-rw-r--r--tests/unittests/test_handler/test_handler_apt_conf_v1.py2
-rw-r--r--tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py2
-rw-r--r--tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py2
-rw-r--r--tests/unittests/test_handler/test_handler_apt_source_v1.py2
-rw-r--r--tests/unittests/test_handler/test_handler_apt_source_v3.py2
-rw-r--r--tests/unittests/test_handler/test_handler_bootcmd.py146
-rw-r--r--tests/unittests/test_handler/test_handler_ca_certs.py2
-rw-r--r--tests/unittests/test_handler/test_handler_chef.py88
-rw-r--r--tests/unittests/test_handler/test_handler_debug.py11
-rw-r--r--tests/unittests/test_handler/test_handler_disk_setup.py2
-rw-r--r--tests/unittests/test_handler/test_handler_growpart.py2
-rw-r--r--tests/unittests/test_handler/test_handler_landscape.py130
-rw-r--r--tests/unittests/test_handler/test_handler_locale.py60
-rw-r--r--tests/unittests/test_handler/test_handler_lxd.py2
-rw-r--r--tests/unittests/test_handler/test_handler_mcollective.py2
-rw-r--r--tests/unittests/test_handler/test_handler_mounts.py2
-rw-r--r--tests/unittests/test_handler/test_handler_ntp.py107
-rw-r--r--tests/unittests/test_handler/test_handler_power_state.py4
-rw-r--r--tests/unittests/test_handler/test_handler_puppet.py142
-rw-r--r--tests/unittests/test_handler/test_handler_resizefs.py229
-rw-r--r--tests/unittests/test_handler/test_handler_rsyslog.py2
-rw-r--r--tests/unittests/test_handler/test_handler_runcmd.py108
-rw-r--r--tests/unittests/test_handler/test_handler_seed_random.py2
-rw-r--r--tests/unittests/test_handler/test_handler_set_hostname.py7
-rw-r--r--tests/unittests/test_handler/test_handler_snappy.py4
-rw-r--r--tests/unittests/test_handler/test_handler_spacewalk.py2
-rw-r--r--tests/unittests/test_handler/test_handler_timezone.py2
-rw-r--r--tests/unittests/test_handler/test_handler_write_files.py2
-rw-r--r--tests/unittests/test_handler/test_handler_yum_add_repo.py2
-rw-r--r--tests/unittests/test_handler/test_handler_zypper_add_repo.py237
-rw-r--r--tests/unittests/test_handler/test_schema.py167
31 files changed, 1403 insertions, 71 deletions
diff --git a/tests/unittests/test_handler/test_handler_apt_conf_v1.py b/tests/unittests/test_handler/test_handler_apt_conf_v1.py
index 554277ff..83f962a9 100644
--- a/tests/unittests/test_handler/test_handler_apt_conf_v1.py
+++ b/tests/unittests/test_handler/test_handler_apt_conf_v1.py
@@ -3,7 +3,7 @@
from cloudinit.config import cc_apt_configure
from cloudinit import util
-from ..helpers import TestCase
+from cloudinit.tests.helpers import TestCase
import copy
import os
diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py
index f53ddbb2..d2b96f0b 100644
--- a/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py
+++ b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v1.py
@@ -24,7 +24,7 @@ from cloudinit.sources import DataSourceNone
from cloudinit.distros.debian import Distro
-from .. import helpers as t_help
+from cloudinit.tests import helpers as t_help
LOG = logging.getLogger(__name__)
diff --git a/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py
index 1ca915b4..f7608c28 100644
--- a/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py
+++ b/tests/unittests/test_handler/test_handler_apt_configure_sources_list_v3.py
@@ -24,7 +24,7 @@ from cloudinit.sources import DataSourceNone
from cloudinit.distros.debian import Distro
-from .. import helpers as t_help
+from cloudinit.tests import helpers as t_help
LOG = logging.getLogger(__name__)
diff --git a/tests/unittests/test_handler/test_handler_apt_source_v1.py b/tests/unittests/test_handler/test_handler_apt_source_v1.py
index 12502d05..3a3f95ca 100644
--- a/tests/unittests/test_handler/test_handler_apt_source_v1.py
+++ b/tests/unittests/test_handler/test_handler_apt_source_v1.py
@@ -20,7 +20,7 @@ from cloudinit.config import cc_apt_configure
from cloudinit import gpg
from cloudinit import util
-from ..helpers import TestCase
+from cloudinit.tests.helpers import TestCase
EXPECTEDKEY = """-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1
diff --git a/tests/unittests/test_handler/test_handler_apt_source_v3.py b/tests/unittests/test_handler/test_handler_apt_source_v3.py
index 292d3f59..7bb1b7c4 100644
--- a/tests/unittests/test_handler/test_handler_apt_source_v3.py
+++ b/tests/unittests/test_handler/test_handler_apt_source_v3.py
@@ -28,7 +28,7 @@ from cloudinit import util
from cloudinit.config import cc_apt_configure
from cloudinit.sources import DataSourceNone
-from .. import helpers as t_help
+from cloudinit.tests import helpers as t_help
EXPECTEDKEY = u"""-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1
diff --git a/tests/unittests/test_handler/test_handler_bootcmd.py b/tests/unittests/test_handler/test_handler_bootcmd.py
new file mode 100644
index 00000000..dbf43e0d
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_bootcmd.py
@@ -0,0 +1,146 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.config import cc_bootcmd
+from cloudinit.sources import DataSourceNone
+from cloudinit import (distros, helpers, cloud, util)
+from cloudinit.tests.helpers import CiTestCase, mock, skipIf
+
+import logging
+import tempfile
+
+try:
+ import jsonschema
+ assert jsonschema # avoid pyflakes error F401: import unused
+ _missing_jsonschema_dep = False
+except ImportError:
+ _missing_jsonschema_dep = True
+
+LOG = logging.getLogger(__name__)
+
+
+class FakeExtendedTempFile(object):
+ def __init__(self, suffix):
+ self.suffix = suffix
+ self.handle = tempfile.NamedTemporaryFile(
+ prefix="ci-%s." % self.__class__.__name__, delete=False)
+
+ def __enter__(self):
+ return self.handle
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.handle.close()
+ util.del_file(self.handle.name)
+
+
+class TestBootcmd(CiTestCase):
+
+ with_logs = True
+
+ _etmpfile_path = ('cloudinit.config.cc_bootcmd.temp_utils.'
+ 'ExtendedTemporaryFile')
+
+ def setUp(self):
+ super(TestBootcmd, self).setUp()
+ self.subp = util.subp
+ self.new_root = self.tmp_dir()
+
+ def _get_cloud(self, distro):
+ paths = helpers.Paths({})
+ cls = distros.fetch(distro)
+ mydist = cls(distro, {}, paths)
+ myds = DataSourceNone.DataSourceNone({}, mydist, paths)
+ paths.datasource = myds
+ return cloud.Cloud(myds, paths, {}, mydist, None)
+
+ def test_handler_skip_if_no_bootcmd(self):
+ """When the provided config doesn't contain bootcmd, skip it."""
+ cfg = {}
+ mycloud = self._get_cloud('ubuntu')
+ cc_bootcmd.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertIn(
+ "Skipping module named notimportant, no 'bootcmd' key",
+ self.logs.getvalue())
+
+ def test_handler_invalid_command_set(self):
+ """Commands which can't be converted to shell will raise errors."""
+ invalid_config = {'bootcmd': 1}
+ cc = self._get_cloud('ubuntu')
+ with self.assertRaises(TypeError) as context_manager:
+ cc_bootcmd.handle('cc_bootcmd', invalid_config, cc, LOG, [])
+ self.assertIn('Failed to shellify bootcmd', self.logs.getvalue())
+ self.assertEqual(
+ "'int' object is not iterable",
+ str(context_manager.exception))
+
+ @skipIf(_missing_jsonschema_dep, "No python-jsonschema dependency")
+ def test_handler_schema_validation_warns_non_array_type(self):
+ """Schema validation warns of non-array type for bootcmd key.
+
+ Schema validation is not strict, so bootcmd attempts to shellify the
+ invalid content.
+ """
+ invalid_config = {'bootcmd': 1}
+ cc = self._get_cloud('ubuntu')
+ with self.assertRaises(TypeError):
+ cc_bootcmd.handle('cc_bootcmd', invalid_config, cc, LOG, [])
+ self.assertIn(
+ 'Invalid config:\nbootcmd: 1 is not of type \'array\'',
+ self.logs.getvalue())
+ self.assertIn('Failed to shellify', self.logs.getvalue())
+
+ @skipIf(_missing_jsonschema_dep, 'No python-jsonschema dependency')
+ def test_handler_schema_validation_warns_non_array_item_type(self):
+ """Schema validation warns of non-array or string bootcmd items.
+
+ Schema validation is not strict, so bootcmd attempts to shellify the
+ invalid content.
+ """
+ invalid_config = {
+ 'bootcmd': ['ls /', 20, ['wget', 'http://stuff/blah'], {'a': 'n'}]}
+ cc = self._get_cloud('ubuntu')
+ with self.assertRaises(RuntimeError) as context_manager:
+ cc_bootcmd.handle('cc_bootcmd', invalid_config, cc, LOG, [])
+ expected_warnings = [
+ 'bootcmd.1: 20 is not valid under any of the given schemas',
+ 'bootcmd.3: {\'a\': \'n\'} is not valid under any of the given'
+ ' schema'
+ ]
+ logs = self.logs.getvalue()
+ for warning in expected_warnings:
+ self.assertIn(warning, logs)
+ self.assertIn('Failed to shellify', logs)
+ self.assertEqual(
+ 'Unable to shellify type int which is not a list or string',
+ str(context_manager.exception))
+
+ def test_handler_creates_and_runs_bootcmd_script_with_instance_id(self):
+ """Valid schema runs a bootcmd script with INSTANCE_ID in the env."""
+ cc = self._get_cloud('ubuntu')
+ out_file = self.tmp_path('bootcmd.out', self.new_root)
+ my_id = "b6ea0f59-e27d-49c6-9f87-79f19765a425"
+ valid_config = {'bootcmd': [
+ 'echo {0} $INSTANCE_ID > {1}'.format(my_id, out_file)]}
+
+ with mock.patch(self._etmpfile_path, FakeExtendedTempFile):
+ cc_bootcmd.handle('cc_bootcmd', valid_config, cc, LOG, [])
+ self.assertEqual(my_id + ' iid-datasource-none\n',
+ util.load_file(out_file))
+
+ def test_handler_runs_bootcmd_script_with_error(self):
+ """When a valid script generates an error, that error is raised."""
+ cc = self._get_cloud('ubuntu')
+ valid_config = {'bootcmd': ['exit 1']} # Script with error
+
+ with mock.patch(self._etmpfile_path, FakeExtendedTempFile):
+ with self.assertRaises(util.ProcessExecutionError) as ctxt_manager:
+ cc_bootcmd.handle('does-not-matter', valid_config, cc, LOG, [])
+ self.assertIn(
+ 'Unexpected error while running command.\n'
+ "Command: ['/bin/sh',",
+ str(ctxt_manager.exception))
+ self.assertIn(
+ 'Failed to run bootcmd module does-not-matter',
+ self.logs.getvalue())
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py
index 7cee2c3f..06e14db0 100644
--- a/tests/unittests/test_handler/test_handler_ca_certs.py
+++ b/tests/unittests/test_handler/test_handler_ca_certs.py
@@ -5,7 +5,7 @@ from cloudinit.config import cc_ca_certs
from cloudinit import helpers
from cloudinit import util
-from ..helpers import TestCase
+from cloudinit.tests.helpers import TestCase
import logging
import shutil
diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py
index 6a152ea2..0136a93d 100644
--- a/tests/unittests/test_handler/test_handler_chef.py
+++ b/tests/unittests/test_handler/test_handler_chef.py
@@ -1,11 +1,10 @@
# This file is part of cloud-init. See LICENSE file for license information.
+import httpretty
import json
import logging
import os
-import shutil
import six
-import tempfile
from cloudinit import cloud
from cloudinit.config import cc_chef
@@ -14,18 +13,83 @@ from cloudinit import helpers
from cloudinit.sources import DataSourceNone
from cloudinit import util
-from .. import helpers as t_help
+from cloudinit.tests.helpers import (
+ CiTestCase, FilesystemMockingTestCase, mock, skipIf)
LOG = logging.getLogger(__name__)
CLIENT_TEMPL = os.path.sep.join(["templates", "chef_client.rb.tmpl"])
-class TestChef(t_help.FilesystemMockingTestCase):
+class TestInstallChefOmnibus(CiTestCase):
+
+ def setUp(self):
+ self.new_root = self.tmp_dir()
+
+ @httpretty.activate
+ def test_install_chef_from_omnibus_runs_chef_url_content(self):
+ """install_chef_from_omnibus runs downloaded OMNIBUS_URL as script."""
+ chef_outfile = self.tmp_path('chef.out', self.new_root)
+ response = '#!/bin/bash\necho "Hi Mom" > {0}'.format(chef_outfile)
+ httpretty.register_uri(
+ httpretty.GET, cc_chef.OMNIBUS_URL, body=response, status=200)
+ cc_chef.install_chef_from_omnibus()
+ self.assertEqual('Hi Mom\n', util.load_file(chef_outfile))
+
+ @mock.patch('cloudinit.config.cc_chef.url_helper.readurl')
+ @mock.patch('cloudinit.config.cc_chef.util.subp_blob_in_tempfile')
+ def test_install_chef_from_omnibus_retries_url(self, m_subp_blob, m_rdurl):
+ """install_chef_from_omnibus retries OMNIBUS_URL upon failure."""
+
+ class FakeURLResponse(object):
+ contents = '#!/bin/bash\necho "Hi Mom" > {0}/chef.out'.format(
+ self.new_root)
+
+ m_rdurl.return_value = FakeURLResponse()
+
+ cc_chef.install_chef_from_omnibus()
+ expected_kwargs = {'retries': cc_chef.OMNIBUS_URL_RETRIES,
+ 'url': cc_chef.OMNIBUS_URL}
+ self.assertItemsEqual(expected_kwargs, m_rdurl.call_args_list[0][1])
+ cc_chef.install_chef_from_omnibus(retries=10)
+ expected_kwargs = {'retries': 10,
+ 'url': cc_chef.OMNIBUS_URL}
+ self.assertItemsEqual(expected_kwargs, m_rdurl.call_args_list[1][1])
+ expected_subp_kwargs = {
+ 'args': ['-v', '2.0'],
+ 'basename': 'chef-omnibus-install',
+ 'blob': m_rdurl.return_value.contents,
+ 'capture': False
+ }
+ self.assertItemsEqual(
+ expected_subp_kwargs,
+ m_subp_blob.call_args_list[0][1])
+
+ @httpretty.activate
+ @mock.patch('cloudinit.config.cc_chef.util.subp_blob_in_tempfile')
+ def test_install_chef_from_omnibus_has_omnibus_version(self, m_subp_blob):
+ """install_chef_from_omnibus provides version arg to OMNIBUS_URL."""
+ chef_outfile = self.tmp_path('chef.out', self.new_root)
+ response = '#!/bin/bash\necho "Hi Mom" > {0}'.format(chef_outfile)
+ httpretty.register_uri(
+ httpretty.GET, cc_chef.OMNIBUS_URL, body=response)
+ cc_chef.install_chef_from_omnibus(omnibus_version='2.0')
+
+ called_kwargs = m_subp_blob.call_args_list[0][1]
+ expected_kwargs = {
+ 'args': ['-v', '2.0'],
+ 'basename': 'chef-omnibus-install',
+ 'blob': response,
+ 'capture': False
+ }
+ self.assertItemsEqual(expected_kwargs, called_kwargs)
+
+
+class TestChef(FilesystemMockingTestCase):
+
def setUp(self):
super(TestChef, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
+ self.tmp = self.tmp_dir()
def fetch_cloud(self, distro_kind):
cls = distros.fetch(distro_kind)
@@ -43,8 +107,8 @@ class TestChef(t_help.FilesystemMockingTestCase):
for d in cc_chef.CHEF_DIRS:
self.assertFalse(os.path.isdir(d))
- @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL),
- CLIENT_TEMPL + " is not available")
+ @skipIf(not os.path.isfile(CLIENT_TEMPL),
+ CLIENT_TEMPL + " is not available")
def test_basic_config(self):
"""
test basic config looks sane
@@ -122,8 +186,8 @@ class TestChef(t_help.FilesystemMockingTestCase):
'c': 'd',
}, json.loads(c))
- @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL),
- CLIENT_TEMPL + " is not available")
+ @skipIf(not os.path.isfile(CLIENT_TEMPL),
+ CLIENT_TEMPL + " is not available")
def test_template_deletes(self):
tpl_file = util.load_file('templates/chef_client.rb.tmpl')
self.patchUtils(self.tmp)
@@ -143,8 +207,8 @@ class TestChef(t_help.FilesystemMockingTestCase):
self.assertNotIn('json_attribs', c)
self.assertNotIn('Formatter.show_time', c)
- @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL),
- CLIENT_TEMPL + " is not available")
+ @skipIf(not os.path.isfile(CLIENT_TEMPL),
+ CLIENT_TEMPL + " is not available")
def test_validation_cert_and_validation_key(self):
# test validation_cert content is written to validation_key path
tpl_file = util.load_file('templates/chef_client.rb.tmpl')
diff --git a/tests/unittests/test_handler/test_handler_debug.py b/tests/unittests/test_handler/test_handler_debug.py
index 929f786e..787ba350 100644
--- a/tests/unittests/test_handler/test_handler_debug.py
+++ b/tests/unittests/test_handler/test_handler_debug.py
@@ -11,7 +11,7 @@ from cloudinit import util
from cloudinit.sources import DataSourceNone
-from .. import helpers as t_help
+from cloudinit.tests.helpers import (FilesystemMockingTestCase, mock)
import logging
import shutil
@@ -20,7 +20,8 @@ import tempfile
LOG = logging.getLogger(__name__)
-class TestDebug(t_help.FilesystemMockingTestCase):
+@mock.patch('cloudinit.distros.debian.read_system_locale')
+class TestDebug(FilesystemMockingTestCase):
def setUp(self):
super(TestDebug, self).setUp()
self.new_root = tempfile.mkdtemp()
@@ -36,7 +37,8 @@ class TestDebug(t_help.FilesystemMockingTestCase):
ds.metadata.update(metadata)
return cloud.Cloud(ds, paths, {}, d, None)
- def test_debug_write(self):
+ def test_debug_write(self, m_locale):
+ m_locale.return_value = 'en_US.UTF-8'
cfg = {
'abc': '123',
'c': u'\u20a0',
@@ -54,7 +56,8 @@ class TestDebug(t_help.FilesystemMockingTestCase):
for k in cfg.keys():
self.assertIn(k, contents)
- def test_debug_no_write(self):
+ def test_debug_no_write(self, m_locale):
+ m_locale.return_value = 'en_US.UTF-8'
cfg = {
'abc': '123',
'debug': {
diff --git a/tests/unittests/test_handler/test_handler_disk_setup.py b/tests/unittests/test_handler/test_handler_disk_setup.py
index 8a6d49ed..5afcacaf 100644
--- a/tests/unittests/test_handler/test_handler_disk_setup.py
+++ b/tests/unittests/test_handler/test_handler_disk_setup.py
@@ -3,7 +3,7 @@
import random
from cloudinit.config import cc_disk_setup
-from ..helpers import CiTestCase, ExitStack, mock, TestCase
+from cloudinit.tests.helpers import CiTestCase, ExitStack, mock, TestCase
class TestIsDiskUsed(TestCase):
diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py
index c5fc8c9b..a3e46351 100644
--- a/tests/unittests/test_handler/test_handler_growpart.py
+++ b/tests/unittests/test_handler/test_handler_growpart.py
@@ -4,7 +4,7 @@ from cloudinit import cloud
from cloudinit.config import cc_growpart
from cloudinit import util
-from ..helpers import TestCase
+from cloudinit.tests.helpers import TestCase
import errno
import logging
diff --git a/tests/unittests/test_handler/test_handler_landscape.py b/tests/unittests/test_handler/test_handler_landscape.py
new file mode 100644
index 00000000..db92a7e2
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_landscape.py
@@ -0,0 +1,130 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.config import cc_landscape
+from cloudinit import (distros, helpers, cloud, util)
+from cloudinit.sources import DataSourceNone
+from cloudinit.tests.helpers import (FilesystemMockingTestCase, mock,
+ wrap_and_call)
+
+from configobj import ConfigObj
+import logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class TestLandscape(FilesystemMockingTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestLandscape, self).setUp()
+ self.new_root = self.tmp_dir()
+ self.conf = self.tmp_path('client.conf', self.new_root)
+ self.default_file = self.tmp_path('default_landscape', self.new_root)
+
+ def _get_cloud(self, distro):
+ self.patchUtils(self.new_root)
+ paths = helpers.Paths({'templates_dir': self.new_root})
+ cls = distros.fetch(distro)
+ mydist = cls(distro, {}, paths)
+ myds = DataSourceNone.DataSourceNone({}, mydist, paths)
+ return cloud.Cloud(myds, paths, {}, mydist, None)
+
+ def test_handler_skips_empty_landscape_cloudconfig(self):
+ """Empty landscape cloud-config section does no work."""
+ mycloud = self._get_cloud('ubuntu')
+ mycloud.distro = mock.MagicMock()
+ cfg = {'landscape': {}}
+ cc_landscape.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertFalse(mycloud.distro.install_packages.called)
+
+ def test_handler_error_on_invalid_landscape_type(self):
+ """Raise an error when landscape configuraiton option is invalid."""
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {'landscape': 'wrongtype'}
+ with self.assertRaises(RuntimeError) as context_manager:
+ cc_landscape.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertIn(
+ "'landscape' key existed in config, but not a dict",
+ str(context_manager.exception))
+
+ @mock.patch('cloudinit.config.cc_landscape.util')
+ def test_handler_restarts_landscape_client(self, m_util):
+ """handler restarts lansdscape-client after install."""
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {'landscape': {'client': {}}}
+ wrap_and_call(
+ 'cloudinit.config.cc_landscape',
+ {'LSC_CLIENT_CFG_FILE': {'new': self.conf}},
+ cc_landscape.handle, 'notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(
+ [mock.call(['service', 'landscape-client', 'restart'])],
+ m_util.subp.call_args_list)
+
+ def test_handler_installs_client_and_creates_config_file(self):
+ """Write landscape client.conf and install landscape-client."""
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {'landscape': {'client': {}}}
+ expected = {'client': {
+ 'log_level': 'info',
+ 'url': 'https://landscape.canonical.com/message-system',
+ 'ping_url': 'http://landscape.canonical.com/ping',
+ 'data_path': '/var/lib/landscape/client'}}
+ mycloud.distro = mock.MagicMock()
+ wrap_and_call(
+ 'cloudinit.config.cc_landscape',
+ {'LSC_CLIENT_CFG_FILE': {'new': self.conf},
+ 'LS_DEFAULT_FILE': {'new': self.default_file}},
+ cc_landscape.handle, 'notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(
+ [mock.call('landscape-client')],
+ mycloud.distro.install_packages.call_args)
+ self.assertEqual(expected, dict(ConfigObj(self.conf)))
+ self.assertIn(
+ 'Wrote landscape config file to {0}'.format(self.conf),
+ self.logs.getvalue())
+ default_content = util.load_file(self.default_file)
+ self.assertEqual('RUN=1\n', default_content)
+
+ def test_handler_writes_merged_client_config_file_with_defaults(self):
+ """Merge and write options from LSC_CLIENT_CFG_FILE with defaults."""
+ # Write existing sparse client.conf file
+ util.write_file(self.conf, '[client]\ncomputer_title = My PC\n')
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {'landscape': {'client': {}}}
+ expected = {'client': {
+ 'log_level': 'info',
+ 'url': 'https://landscape.canonical.com/message-system',
+ 'ping_url': 'http://landscape.canonical.com/ping',
+ 'data_path': '/var/lib/landscape/client',
+ 'computer_title': 'My PC'}}
+ wrap_and_call(
+ 'cloudinit.config.cc_landscape',
+ {'LSC_CLIENT_CFG_FILE': {'new': self.conf}},
+ cc_landscape.handle, 'notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(expected, dict(ConfigObj(self.conf)))
+ self.assertIn(
+ 'Wrote landscape config file to {0}'.format(self.conf),
+ self.logs.getvalue())
+
+ def test_handler_writes_merged_provided_cloudconfig_with_defaults(self):
+ """Merge and write options from cloud-config options with defaults."""
+ # Write empty sparse client.conf file
+ util.write_file(self.conf, '')
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {'landscape': {'client': {'computer_title': 'My PC'}}}
+ expected = {'client': {
+ 'log_level': 'info',
+ 'url': 'https://landscape.canonical.com/message-system',
+ 'ping_url': 'http://landscape.canonical.com/ping',
+ 'data_path': '/var/lib/landscape/client',
+ 'computer_title': 'My PC'}}
+ wrap_and_call(
+ 'cloudinit.config.cc_landscape',
+ {'LSC_CLIENT_CFG_FILE': {'new': self.conf}},
+ cc_landscape.handle, 'notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(expected, dict(ConfigObj(self.conf)))
+ self.assertIn(
+ 'Wrote landscape config file to {0}'.format(self.conf),
+ self.logs.getvalue())
diff --git a/tests/unittests/test_handler/test_handler_locale.py b/tests/unittests/test_handler/test_handler_locale.py
index e9a810c5..e29a06f9 100644
--- a/tests/unittests/test_handler/test_handler_locale.py
+++ b/tests/unittests/test_handler/test_handler_locale.py
@@ -13,13 +13,15 @@ from cloudinit import util
from cloudinit.sources import DataSourceNoCloud
-from .. import helpers as t_help
+from cloudinit.tests import helpers as t_help
from configobj import ConfigObj
from six import BytesIO
import logging
+import mock
+import os
import shutil
import tempfile
@@ -27,6 +29,9 @@ LOG = logging.getLogger(__name__)
class TestLocale(t_help.FilesystemMockingTestCase):
+
+ with_logs = True
+
def setUp(self):
super(TestLocale, self).setUp()
self.new_root = tempfile.mkdtemp()
@@ -49,9 +54,58 @@ class TestLocale(t_help.FilesystemMockingTestCase):
}
cc = self._get_cloud('sles')
cc_locale.handle('cc_locale', cfg, cc, LOG, [])
+ if cc.distro.uses_systemd():
+ locale_conf = cc.distro.systemd_locale_conf_fn
+ else:
+ locale_conf = cc.distro.locale_conf_fn
+ contents = util.load_file(locale_conf, decode=False)
+ n_cfg = ConfigObj(BytesIO(contents))
+ if cc.distro.uses_systemd():
+ self.assertEqual({'LANG': cfg['locale']}, dict(n_cfg))
+ else:
+ self.assertEqual({'RC_LANG': cfg['locale']}, dict(n_cfg))
+
+ def test_set_locale_sles_default(self):
+ cfg = {}
+ cc = self._get_cloud('sles')
+ cc_locale.handle('cc_locale', cfg, cc, LOG, [])
- contents = util.load_file('/etc/sysconfig/language', decode=False)
+ if cc.distro.uses_systemd():
+ locale_conf = cc.distro.systemd_locale_conf_fn
+ keyname = 'LANG'
+ else:
+ locale_conf = cc.distro.locale_conf_fn
+ keyname = 'RC_LANG'
+
+ contents = util.load_file(locale_conf, decode=False)
n_cfg = ConfigObj(BytesIO(contents))
- self.assertEqual({'RC_LANG': cfg['locale']}, dict(n_cfg))
+ self.assertEqual({keyname: 'en_US.UTF-8'}, dict(n_cfg))
+
+ def test_locale_update_config_if_different_than_default(self):
+ """Test cc_locale writes updates conf if different than default"""
+ locale_conf = os.path.join(self.new_root, "etc/default/locale")
+ util.write_file(locale_conf, 'LANG="en_US.UTF-8"\n')
+ cfg = {'locale': 'C.UTF-8'}
+ cc = self._get_cloud('ubuntu')
+ with mock.patch('cloudinit.distros.debian.util.subp') as m_subp:
+ with mock.patch('cloudinit.distros.debian.LOCALE_CONF_FN',
+ locale_conf):
+ cc_locale.handle('cc_locale', cfg, cc, LOG, [])
+ m_subp.assert_called_with(['update-locale',
+ '--locale-file=%s' % locale_conf,
+ 'LANG=C.UTF-8'], capture=False)
+
+ def test_locale_rhel_defaults_en_us_utf8(self):
+ """Test cc_locale gets en_US.UTF-8 from distro get_locale fallback"""
+ cfg = {}
+ cc = self._get_cloud('rhel')
+ update_sysconfig = 'cloudinit.distros.rhel_util.update_sysconfig_file'
+ with mock.patch.object(cc.distro, 'uses_systemd') as m_use_sd:
+ m_use_sd.return_value = True
+ with mock.patch(update_sysconfig) as m_update_syscfg:
+ cc_locale.handle('cc_locale', cfg, cc, LOG, [])
+ m_update_syscfg.assert_called_with('/etc/locale.conf',
+ {'LANG': 'en_US.UTF-8'})
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_lxd.py b/tests/unittests/test_handler/test_handler_lxd.py
index 351226bf..f132a778 100644
--- a/tests/unittests/test_handler/test_handler_lxd.py
+++ b/tests/unittests/test_handler/test_handler_lxd.py
@@ -3,7 +3,7 @@
from cloudinit.config import cc_lxd
from cloudinit.sources import DataSourceNoCloud
from cloudinit import (distros, helpers, cloud)
-from .. import helpers as t_help
+from cloudinit.tests import helpers as t_help
import logging
diff --git a/tests/unittests/test_handler/test_handler_mcollective.py b/tests/unittests/test_handler/test_handler_mcollective.py
index 2a9f3823..7eec7352 100644
--- a/tests/unittests/test_handler/test_handler_mcollective.py
+++ b/tests/unittests/test_handler/test_handler_mcollective.py
@@ -4,7 +4,7 @@ from cloudinit import (cloud, distros, helpers, util)
from cloudinit.config import cc_mcollective
from cloudinit.sources import DataSourceNoCloud
-from .. import helpers as t_help
+from cloudinit.tests import helpers as t_help
import configobj
import logging
diff --git a/tests/unittests/test_handler/test_handler_mounts.py b/tests/unittests/test_handler/test_handler_mounts.py
index 650ca0ec..fe492d4b 100644
--- a/tests/unittests/test_handler/test_handler_mounts.py
+++ b/tests/unittests/test_handler/test_handler_mounts.py
@@ -6,7 +6,7 @@ import tempfile
from cloudinit.config import cc_mounts
-from .. import helpers as test_helpers
+from cloudinit.tests import helpers as test_helpers
try:
from unittest import mock
diff --git a/tests/unittests/test_handler/test_handler_ntp.py b/tests/unittests/test_handler/test_handler_ntp.py
index 7f278646..4f291248 100644
--- a/tests/unittests/test_handler/test_handler_ntp.py
+++ b/tests/unittests/test_handler/test_handler_ntp.py
@@ -3,7 +3,7 @@
from cloudinit.config import cc_ntp
from cloudinit.sources import DataSourceNone
from cloudinit import (distros, helpers, cloud, util)
-from ..helpers import FilesystemMockingTestCase, mock, skipIf
+from cloudinit.tests.helpers import FilesystemMockingTestCase, mock, skipIf
import os
@@ -16,6 +16,14 @@ servers {{servers}}
pools {{pools}}
"""
+TIMESYNCD_TEMPLATE = b"""\
+## template:jinja
+[Time]
+{% if servers or pools -%}
+NTP={% for host in servers|list + pools|list %}{{ host }} {% endfor -%}
+{% endif -%}
+"""
+
try:
import jsonschema
assert jsonschema # avoid pyflakes error F401: import unused
@@ -59,6 +67,14 @@ class TestNtp(FilesystemMockingTestCase):
cc_ntp.install_ntp(install_func, packages=['ntp'], check_exe='ntpd')
install_func.assert_not_called()
+ @mock.patch("cloudinit.config.cc_ntp.util")
+ def test_ntp_install_no_op_with_empty_pkg_list(self, mock_util):
+ """ntp_install calls install_func with empty list"""
+ mock_util.which.return_value = None # check_exe not found
+ install_func = mock.MagicMock()
+ cc_ntp.install_ntp(install_func, packages=[], check_exe='timesyncd')
+ install_func.assert_called_once_with([])
+
def test_ntp_rename_ntp_conf(self):
"""When NTP_CONF exists, rename_ntp moves it."""
ntpconf = self.tmp_path("ntp.conf", self.new_root)
@@ -68,6 +84,30 @@ class TestNtp(FilesystemMockingTestCase):
self.assertFalse(os.path.exists(ntpconf))
self.assertTrue(os.path.exists("{0}.dist".format(ntpconf)))
+ @mock.patch("cloudinit.config.cc_ntp.util")
+ def test_reload_ntp_defaults(self, mock_util):
+ """Test service is restarted/reloaded (defaults)"""
+ service = 'ntp'
+ cmd = ['service', service, 'restart']
+ cc_ntp.reload_ntp(service)
+ mock_util.subp.assert_called_with(cmd, capture=True)
+
+ @mock.patch("cloudinit.config.cc_ntp.util")
+ def test_reload_ntp_systemd(self, mock_util):
+ """Test service is restarted/reloaded (systemd)"""
+ service = 'ntp'
+ cmd = ['systemctl', 'reload-or-restart', service]
+ cc_ntp.reload_ntp(service, systemd=True)
+ mock_util.subp.assert_called_with(cmd, capture=True)
+
+ @mock.patch("cloudinit.config.cc_ntp.util")
+ def test_reload_ntp_systemd_timesycnd(self, mock_util):
+ """Test service is restarted/reloaded (systemd/timesyncd)"""
+ service = 'systemd-timesycnd'
+ cmd = ['systemctl', 'reload-or-restart', service]
+ cc_ntp.reload_ntp(service, systemd=True)
+ mock_util.subp.assert_called_with(cmd, capture=True)
+
def test_ntp_rename_ntp_conf_skip_missing(self):
"""When NTP_CONF doesn't exist rename_ntp doesn't create a file."""
ntpconf = self.tmp_path("ntp.conf", self.new_root)
@@ -94,7 +134,7 @@ class TestNtp(FilesystemMockingTestCase):
with open('{0}.tmpl'.format(ntp_conf), 'wb') as stream:
stream.write(NTP_TEMPLATE)
with mock.patch('cloudinit.config.cc_ntp.NTP_CONF', ntp_conf):
- cc_ntp.write_ntp_config_template(cfg, mycloud)
+ cc_ntp.write_ntp_config_template(cfg, mycloud, ntp_conf)
content = util.read_file_or_url('file://' + ntp_conf).contents
self.assertEqual(
"servers ['192.168.2.1', '192.168.2.2']\npools []\n",
@@ -120,7 +160,7 @@ class TestNtp(FilesystemMockingTestCase):
with open('{0}.{1}.tmpl'.format(ntp_conf, distro), 'wb') as stream:
stream.write(NTP_TEMPLATE)
with mock.patch('cloudinit.config.cc_ntp.NTP_CONF', ntp_conf):
- cc_ntp.write_ntp_config_template(cfg, mycloud)
+ cc_ntp.write_ntp_config_template(cfg, mycloud, ntp_conf)
content = util.read_file_or_url('file://' + ntp_conf).contents
self.assertEqual(
"servers []\npools ['10.0.0.1', '10.0.0.2']\n",
@@ -139,7 +179,7 @@ class TestNtp(FilesystemMockingTestCase):
with open('{0}.tmpl'.format(ntp_conf), 'wb') as stream:
stream.write(NTP_TEMPLATE)
with mock.patch('cloudinit.config.cc_ntp.NTP_CONF', ntp_conf):
- cc_ntp.write_ntp_config_template({}, mycloud)
+ cc_ntp.write_ntp_config_template({}, mycloud, ntp_conf)
content = util.read_file_or_url('file://' + ntp_conf).contents
default_pools = [
"{0}.{1}.pool.ntp.org".format(x, distro)
@@ -152,7 +192,8 @@ class TestNtp(FilesystemMockingTestCase):
",".join(default_pools)),
self.logs.getvalue())
- def test_ntp_handler_mocked_template(self):
+ @mock.patch("cloudinit.config.cc_ntp.ntp_installable")
+ def test_ntp_handler_mocked_template(self, m_ntp_install):
"""Test ntp handler renders ubuntu ntp.conf template."""
pools = ['0.mycompany.pool.ntp.org', '3.mycompany.pool.ntp.org']
servers = ['192.168.23.3', '192.168.23.4']
@@ -164,6 +205,8 @@ class TestNtp(FilesystemMockingTestCase):
}
mycloud = self._get_cloud('ubuntu')
ntp_conf = self.tmp_path('ntp.conf', self.new_root) # Doesn't exist
+ m_ntp_install.return_value = True
+
# Create ntp.conf.tmpl
with open('{0}.tmpl'.format(ntp_conf), 'wb') as stream:
stream.write(NTP_TEMPLATE)
@@ -176,6 +219,34 @@ class TestNtp(FilesystemMockingTestCase):
'servers {0}\npools {1}\n'.format(servers, pools),
content.decode())
+ @mock.patch("cloudinit.config.cc_ntp.util")
+ def test_ntp_handler_mocked_template_snappy(self, m_util):
+ """Test ntp handler renders timesycnd.conf template on snappy."""
+ pools = ['0.mycompany.pool.ntp.org', '3.mycompany.pool.ntp.org']
+ servers = ['192.168.23.3', '192.168.23.4']
+ cfg = {
+ 'ntp': {
+ 'pools': pools,
+ 'servers': servers
+ }
+ }
+ mycloud = self._get_cloud('ubuntu')
+ m_util.system_is_snappy.return_value = True
+
+ # Create timesyncd.conf.tmpl
+ tsyncd_conf = self.tmp_path("timesyncd.conf", self.new_root)
+ template = '{0}.tmpl'.format(tsyncd_conf)
+ with open(template, 'wb') as stream:
+ stream.write(TIMESYNCD_TEMPLATE)
+
+ with mock.patch('cloudinit.config.cc_ntp.TIMESYNCD_CONF', tsyncd_conf):
+ cc_ntp.handle('notimportant', cfg, mycloud, None, None)
+
+ content = util.read_file_or_url('file://' + tsyncd_conf).contents
+ self.assertEqual(
+ "[Time]\nNTP=%s %s \n" % (" ".join(servers), " ".join(pools)),
+ content.decode())
+
def test_ntp_handler_real_distro_templates(self):
"""Test ntp handler renders the shipped distro ntp.conf templates."""
pools = ['0.mycompany.pool.ntp.org', '3.mycompany.pool.ntp.org']
@@ -333,4 +404,30 @@ class TestNtp(FilesystemMockingTestCase):
"pools ['0.mypool.org', '0.mypool.org']\n",
content)
+ @mock.patch("cloudinit.config.cc_ntp.ntp_installable")
+ def test_ntp_handler_timesyncd(self, m_ntp_install):
+ """Test ntp handler configures timesyncd"""
+ m_ntp_install.return_value = False
+ distro = 'ubuntu'
+ cfg = {
+ 'servers': ['192.168.2.1', '192.168.2.2'],
+ 'pools': ['0.mypool.org'],
+ }
+ mycloud = self._get_cloud(distro)
+ tsyncd_conf = self.tmp_path("timesyncd.conf", self.new_root)
+ # Create timesyncd.conf.tmpl
+ template = '{0}.tmpl'.format(tsyncd_conf)
+ print(template)
+ with open(template, 'wb') as stream:
+ stream.write(TIMESYNCD_TEMPLATE)
+ with mock.patch('cloudinit.config.cc_ntp.TIMESYNCD_CONF', tsyncd_conf):
+ cc_ntp.write_ntp_config_template(cfg, mycloud, tsyncd_conf,
+ template='timesyncd.conf')
+
+ content = util.read_file_or_url('file://' + tsyncd_conf).contents
+ self.assertEqual(
+ "[Time]\nNTP=192.168.2.1 192.168.2.2 0.mypool.org \n",
+ content.decode())
+
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_power_state.py b/tests/unittests/test_handler/test_handler_power_state.py
index e382210d..85a0fe0a 100644
--- a/tests/unittests/test_handler/test_handler_power_state.py
+++ b/tests/unittests/test_handler/test_handler_power_state.py
@@ -4,8 +4,8 @@ import sys
from cloudinit.config import cc_power_state_change as psc
-from .. import helpers as t_help
-from ..helpers import mock
+from cloudinit.tests import helpers as t_help
+from cloudinit.tests.helpers import mock
class TestLoadPowerState(t_help.TestCase):
diff --git a/tests/unittests/test_handler/test_handler_puppet.py b/tests/unittests/test_handler/test_handler_puppet.py
new file mode 100644
index 00000000..0b6e3b58
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_puppet.py
@@ -0,0 +1,142 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.config import cc_puppet
+from cloudinit.sources import DataSourceNone
+from cloudinit import (distros, helpers, cloud, util)
+from cloudinit.tests.helpers import CiTestCase, mock
+
+import logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+@mock.patch('cloudinit.config.cc_puppet.util')
+@mock.patch('cloudinit.config.cc_puppet.os')
+class TestAutostartPuppet(CiTestCase):
+
+ with_logs = True
+
+ def test_wb_autostart_puppet_updates_puppet_default(self, m_os, m_util):
+ """Update /etc/default/puppet to autostart if it exists."""
+
+ def _fake_exists(path):
+ return path == '/etc/default/puppet'
+
+ m_os.path.exists.side_effect = _fake_exists
+ cc_puppet._autostart_puppet(LOG)
+ self.assertEqual(
+ [mock.call(['sed', '-i', '-e', 's/^START=.*/START=yes/',
+ '/etc/default/puppet'], capture=False)],
+ m_util.subp.call_args_list)
+
+ def test_wb_autostart_pupppet_enables_puppet_systemctl(self, m_os, m_util):
+ """If systemctl is present, enable puppet via systemctl."""
+
+ def _fake_exists(path):
+ return path == '/bin/systemctl'
+
+ m_os.path.exists.side_effect = _fake_exists
+ cc_puppet._autostart_puppet(LOG)
+ expected_calls = [mock.call(
+ ['/bin/systemctl', 'enable', 'puppet.service'], capture=False)]
+ self.assertEqual(expected_calls, m_util.subp.call_args_list)
+
+ def test_wb_autostart_pupppet_enables_puppet_chkconfig(self, m_os, m_util):
+ """If chkconfig is present, enable puppet via checkcfg."""
+
+ def _fake_exists(path):
+ return path == '/sbin/chkconfig'
+
+ m_os.path.exists.side_effect = _fake_exists
+ cc_puppet._autostart_puppet(LOG)
+ expected_calls = [mock.call(
+ ['/sbin/chkconfig', 'puppet', 'on'], capture=False)]
+ self.assertEqual(expected_calls, m_util.subp.call_args_list)
+
+
+@mock.patch('cloudinit.config.cc_puppet._autostart_puppet')
+class TestPuppetHandle(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestPuppetHandle, self).setUp()
+ self.new_root = self.tmp_dir()
+ self.conf = self.tmp_path('puppet.conf')
+
+ def _get_cloud(self, distro):
+ paths = helpers.Paths({'templates_dir': self.new_root})
+ cls = distros.fetch(distro)
+ mydist = cls(distro, {}, paths)
+ myds = DataSourceNone.DataSourceNone({}, mydist, paths)
+ return cloud.Cloud(myds, paths, {}, mydist, None)
+
+ def test_handler_skips_missing_puppet_key_in_cloudconfig(self, m_auto):
+ """Cloud-config containing no 'puppet' key is skipped."""
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {}
+ cc_puppet.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertIn(
+ "no 'puppet' configuration found", self.logs.getvalue())
+ self.assertEqual(0, m_auto.call_count)
+
+ @mock.patch('cloudinit.config.cc_puppet.util.subp')
+ def test_handler_puppet_config_starts_puppet_service(self, m_subp, m_auto):
+ """Cloud-config 'puppet' configuration starts puppet."""
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {'puppet': {'install': False}}
+ cc_puppet.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(1, m_auto.call_count)
+ self.assertEqual(
+ [mock.call(['service', 'puppet', 'start'], capture=False)],
+ m_subp.call_args_list)
+
+ @mock.patch('cloudinit.config.cc_puppet.util.subp')
+ def test_handler_empty_puppet_config_installs_puppet(self, m_subp, m_auto):
+ """Cloud-config empty 'puppet' configuration installs latest puppet."""
+ mycloud = self._get_cloud('ubuntu')
+ mycloud.distro = mock.MagicMock()
+ cfg = {'puppet': {}}
+ cc_puppet.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(
+ [mock.call(('puppet', None))],
+ mycloud.distro.install_packages.call_args_list)
+
+ @mock.patch('cloudinit.config.cc_puppet.util.subp')
+ def test_handler_puppet_config_installs_puppet_on_true(self, m_subp, _):
+ """Cloud-config with 'puppet' key installs when 'install' is True."""
+ mycloud = self._get_cloud('ubuntu')
+ mycloud.distro = mock.MagicMock()
+ cfg = {'puppet': {'install': True}}
+ cc_puppet.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(
+ [mock.call(('puppet', None))],
+ mycloud.distro.install_packages.call_args_list)
+
+ @mock.patch('cloudinit.config.cc_puppet.util.subp')
+ def test_handler_puppet_config_installs_puppet_version(self, m_subp, _):
+ """Cloud-config 'puppet' configuration can specify a version."""
+ mycloud = self._get_cloud('ubuntu')
+ mycloud.distro = mock.MagicMock()
+ cfg = {'puppet': {'version': '3.8'}}
+ cc_puppet.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertEqual(
+ [mock.call(('puppet', '3.8'))],
+ mycloud.distro.install_packages.call_args_list)
+
+ @mock.patch('cloudinit.config.cc_puppet.util.subp')
+ def test_handler_puppet_config_updates_puppet_conf(self, m_subp, m_auto):
+ """When 'conf' is provided update values in PUPPET_CONF_PATH."""
+ mycloud = self._get_cloud('ubuntu')
+ cfg = {
+ 'puppet': {
+ 'conf': {'agent': {'server': 'puppetmaster.example.org'}}}}
+ util.write_file(self.conf, '[agent]\nserver = origpuppet\nother = 3')
+ puppet_conf_path = 'cloudinit.config.cc_puppet.PUPPET_CONF_PATH'
+ mycloud.distro = mock.MagicMock()
+ with mock.patch(puppet_conf_path, self.conf):
+ cc_puppet.handle('notimportant', cfg, mycloud, LOG, None)
+ content = util.load_file(self.conf)
+ expected = '[agent]\nserver = puppetmaster.example.org\nother = 3\n\n'
+ self.assertEqual(expected, content)
diff --git a/tests/unittests/test_handler/test_handler_resizefs.py b/tests/unittests/test_handler/test_handler_resizefs.py
index 52591b8b..3e5d436c 100644
--- a/tests/unittests/test_handler/test_handler_resizefs.py
+++ b/tests/unittests/test_handler/test_handler_resizefs.py
@@ -1,17 +1,30 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from cloudinit.config import cc_resizefs
+from cloudinit.config.cc_resizefs import (
+ can_skip_resize, handle, is_device_path_writable_block,
+ rootdev_from_cmdline)
+import logging
import textwrap
-import unittest
+
+from cloudinit.tests.helpers import (CiTestCase, mock, skipIf, util,
+ wrap_and_call)
+
+
+LOG = logging.getLogger(__name__)
+
try:
- from unittest import mock
+ import jsonschema
+ assert jsonschema # avoid pyflakes error F401: import unused
+ _missing_jsonschema_dep = False
except ImportError:
- import mock
+ _missing_jsonschema_dep = True
+
+class TestResizefs(CiTestCase):
+ with_logs = True
-class TestResizefs(unittest.TestCase):
def setUp(self):
super(TestResizefs, self).setUp()
self.name = "resizefs"
@@ -34,7 +47,7 @@ class TestResizefs(unittest.TestCase):
58720296 3145728 3 freebsd-swap (1.5G)
61866024 1048496 - free - (512M)
""")
- res = cc_resizefs.can_skip_resize(fs_type, resize_what, devpth)
+ res = can_skip_resize(fs_type, resize_what, devpth)
self.assertTrue(res)
@mock.patch('cloudinit.config.cc_resizefs._get_dumpfs_output')
@@ -52,8 +65,210 @@ class TestResizefs(unittest.TestCase):
=> 34 297086 da0 GPT (145M)
34 297086 1 freebsd-ufs (145M)
""")
- res = cc_resizefs.can_skip_resize(fs_type, resize_what, devpth)
+ res = can_skip_resize(fs_type, resize_what, devpth)
self.assertTrue(res)
+ def test_handle_noops_on_disabled(self):
+ """The handle function logs when the configuration disables resize."""
+ cfg = {'resize_rootfs': False}
+ handle('cc_resizefs', cfg, _cloud=None, log=LOG, args=[])
+ self.assertIn(
+ 'DEBUG: Skipping module named cc_resizefs, resizing disabled\n',
+ self.logs.getvalue())
+
+ @skipIf(_missing_jsonschema_dep, "No python-jsonschema dependency")
+ def test_handle_schema_validation_logs_invalid_resize_rootfs_value(self):
+ """The handle reports json schema violations as a warning.
+
+ Invalid values for resize_rootfs result in disabling the module.
+ """
+ cfg = {'resize_rootfs': 'junk'}
+ handle('cc_resizefs', cfg, _cloud=None, log=LOG, args=[])
+ logs = self.logs.getvalue()
+ self.assertIn(
+ "WARNING: Invalid config:\nresize_rootfs: 'junk' is not one of"
+ " [True, False, 'noblock']",
+ logs)
+ self.assertIn(
+ 'DEBUG: Skipping module named cc_resizefs, resizing disabled\n',
+ logs)
+
+ @mock.patch('cloudinit.config.cc_resizefs.util.get_mount_info')
+ def test_handle_warns_on_unknown_mount_info(self, m_get_mount_info):
+ """handle warns when get_mount_info sees unknown filesystem for /."""
+ m_get_mount_info.return_value = None
+ cfg = {'resize_rootfs': True}
+ handle('cc_resizefs', cfg, _cloud=None, log=LOG, args=[])
+ logs = self.logs.getvalue()
+ self.assertNotIn("WARNING: Invalid config:\nresize_rootfs:", logs)
+ self.assertIn(
+ 'WARNING: Could not determine filesystem type of /\n',
+ logs)
+ self.assertEqual(
+ [mock.call('/', LOG)],
+ m_get_mount_info.call_args_list)
+
+ def test_handle_warns_on_undiscoverable_root_path_in_commandline(self):
+ """handle noops when the root path is not found on the commandline."""
+ cfg = {'resize_rootfs': True}
+ exists_mock_path = 'cloudinit.config.cc_resizefs.os.path.exists'
+
+ def fake_mount_info(path, log):
+ self.assertEqual('/', path)
+ self.assertEqual(LOG, log)
+ return ('/dev/root', 'ext4', '/')
+
+ with mock.patch(exists_mock_path) as m_exists:
+ m_exists.return_value = False
+ wrap_and_call(
+ 'cloudinit.config.cc_resizefs.util',
+ {'is_container': {'return_value': False},
+ 'get_mount_info': {'side_effect': fake_mount_info},
+ 'get_cmdline': {'return_value': 'BOOT_IMAGE=/vmlinuz.efi'}},
+ handle, 'cc_resizefs', cfg, _cloud=None, log=LOG,
+ args=[])
+ logs = self.logs.getvalue()
+ self.assertIn("WARNING: Unable to find device '/dev/root'", logs)
+
+
+class TestRootDevFromCmdline(CiTestCase):
+
+ def test_rootdev_from_cmdline_with_no_root(self):
+ """Return None from rootdev_from_cmdline when root is not present."""
+ invalid_cases = [
+ 'BOOT_IMAGE=/adsf asdfa werasef root adf', 'BOOT_IMAGE=/adsf', '']
+ for case in invalid_cases:
+ self.assertIsNone(rootdev_from_cmdline(case))
+
+ def test_rootdev_from_cmdline_with_root_startswith_dev(self):
+ """Return the cmdline root when the path starts with /dev."""
+ self.assertEqual(
+ '/dev/this', rootdev_from_cmdline('asdf root=/dev/this'))
+
+ def test_rootdev_from_cmdline_with_root_without_dev_prefix(self):
+ """Add /dev prefix to cmdline root when the path lacks the prefix."""
+ self.assertEqual('/dev/this', rootdev_from_cmdline('asdf root=this'))
+
+ def test_rootdev_from_cmdline_with_root_with_label(self):
+ """When cmdline root contains a LABEL, our root is disk/by-label."""
+ self.assertEqual(
+ '/dev/disk/by-label/unique',
+ rootdev_from_cmdline('asdf root=LABEL=unique'))
+
+ def test_rootdev_from_cmdline_with_root_with_uuid(self):
+ """When cmdline root contains a UUID, our root is disk/by-uuid."""
+ self.assertEqual(
+ '/dev/disk/by-uuid/adsfdsaf-adsf',
+ rootdev_from_cmdline('asdf root=UUID=adsfdsaf-adsf'))
+
+
+class TestIsDevicePathWritableBlock(CiTestCase):
+
+ with_logs = True
+
+ def test_is_device_path_writable_block_false_on_overlayroot(self):
+ """When devpath is overlayroot (on MAAS), is_dev_writable is False."""
+ info = 'does not matter'
+ is_writable = wrap_and_call(
+ 'cloudinit.config.cc_resizefs.util',
+ {'is_container': {'return_value': False}},
+ is_device_path_writable_block, 'overlayroot', info, LOG)
+ self.assertFalse(is_writable)
+ self.assertIn(
+ "Not attempting to resize devpath 'overlayroot'",
+ self.logs.getvalue())
+
+ def test_is_device_path_writable_block_warns_missing_cmdline_root(self):
+ """When root does not exist isn't in the cmdline, log warning."""
+ info = 'does not matter'
+
+ def fake_mount_info(path, log):
+ self.assertEqual('/', path)
+ self.assertEqual(LOG, log)
+ return ('/dev/root', 'ext4', '/')
+
+ exists_mock_path = 'cloudinit.config.cc_resizefs.os.path.exists'
+ with mock.patch(exists_mock_path) as m_exists:
+ m_exists.return_value = False
+ is_writable = wrap_and_call(
+ 'cloudinit.config.cc_resizefs.util',
+ {'is_container': {'return_value': False},
+ 'get_mount_info': {'side_effect': fake_mount_info},
+ 'get_cmdline': {'return_value': 'BOOT_IMAGE=/vmlinuz.efi'}},
+ is_device_path_writable_block, '/dev/root', info, LOG)
+ self.assertFalse(is_writable)
+ logs = self.logs.getvalue()
+ self.assertIn("WARNING: Unable to find device '/dev/root'", logs)
+
+ def test_is_device_path_writable_block_does_not_exist(self):
+ """When devpath does not exist, a warning is logged."""
+ info = 'dev=/I/dont/exist mnt_point=/ path=/dev/none'
+ is_writable = wrap_and_call(
+ 'cloudinit.config.cc_resizefs.util',
+ {'is_container': {'return_value': False}},
+ is_device_path_writable_block, '/I/dont/exist', info, LOG)
+ self.assertFalse(is_writable)
+ self.assertIn(
+ "WARNING: Device '/I/dont/exist' did not exist."
+ ' cannot resize: %s' % info,
+ self.logs.getvalue())
+
+ def test_is_device_path_writable_block_does_not_exist_in_container(self):
+ """When devpath does not exist in a container, log a debug message."""
+ info = 'dev=/I/dont/exist mnt_point=/ path=/dev/none'
+ is_writable = wrap_and_call(
+ 'cloudinit.config.cc_resizefs.util',
+ {'is_container': {'return_value': True}},
+ is_device_path_writable_block, '/I/dont/exist', info, LOG)
+ self.assertFalse(is_writable)
+ self.assertIn(
+ "DEBUG: Device '/I/dont/exist' did not exist in container."
+ ' cannot resize: %s' % info,
+ self.logs.getvalue())
+
+ def test_is_device_path_writable_block_raises_oserror(self):
+ """When unexpected OSError is raises by os.stat it is reraised."""
+ info = 'dev=/I/dont/exist mnt_point=/ path=/dev/none'
+ with self.assertRaises(OSError) as context_manager:
+ wrap_and_call(
+ 'cloudinit.config.cc_resizefs',
+ {'util.is_container': {'return_value': True},
+ 'os.stat': {'side_effect': OSError('Something unexpected')}},
+ is_device_path_writable_block, '/I/dont/exist', info, LOG)
+ self.assertEqual(
+ 'Something unexpected', str(context_manager.exception))
+
+ def test_is_device_path_writable_block_non_block(self):
+ """When device is not a block device, emit warning return False."""
+ fake_devpath = self.tmp_path('dev/readwrite')
+ util.write_file(fake_devpath, '', mode=0o600) # read-write
+ info = 'dev=/dev/root mnt_point=/ path={0}'.format(fake_devpath)
+
+ is_writable = wrap_and_call(
+ 'cloudinit.config.cc_resizefs.util',
+ {'is_container': {'return_value': False}},
+ is_device_path_writable_block, fake_devpath, info, LOG)
+ self.assertFalse(is_writable)
+ self.assertIn(
+ "WARNING: device '{0}' not a block device. cannot resize".format(
+ fake_devpath),
+ self.logs.getvalue())
+
+ def test_is_device_path_writable_block_non_block_on_container(self):
+ """When device is non-block device in container, emit debug log."""
+ fake_devpath = self.tmp_path('dev/readwrite')
+ util.write_file(fake_devpath, '', mode=0o600) # read-write
+ info = 'dev=/dev/root mnt_point=/ path={0}'.format(fake_devpath)
+
+ is_writable = wrap_and_call(
+ 'cloudinit.config.cc_resizefs.util',
+ {'is_container': {'return_value': True}},
+ is_device_path_writable_block, fake_devpath, info, LOG)
+ self.assertFalse(is_writable)
+ self.assertIn(
+ "DEBUG: device '{0}' not a block device in container."
+ ' cannot resize'.format(fake_devpath),
+ self.logs.getvalue())
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_rsyslog.py b/tests/unittests/test_handler/test_handler_rsyslog.py
index cca06678..8c8e2838 100644
--- a/tests/unittests/test_handler/test_handler_rsyslog.py
+++ b/tests/unittests/test_handler/test_handler_rsyslog.py
@@ -9,7 +9,7 @@ from cloudinit.config.cc_rsyslog import (
parse_remotes_line, remotes_to_rsyslog_cfg)
from cloudinit import util
-from .. import helpers as t_help
+from cloudinit.tests import helpers as t_help
class TestLoadConfig(t_help.TestCase):
diff --git a/tests/unittests/test_handler/test_handler_runcmd.py b/tests/unittests/test_handler/test_handler_runcmd.py
new file mode 100644
index 00000000..374c1d31
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_runcmd.py
@@ -0,0 +1,108 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.config import cc_runcmd
+from cloudinit.sources import DataSourceNone
+from cloudinit import (distros, helpers, cloud, util)
+from cloudinit.tests.helpers import FilesystemMockingTestCase, skipIf
+
+import logging
+import os
+import stat
+
+try:
+ import jsonschema
+ assert jsonschema # avoid pyflakes error F401: import unused
+ _missing_jsonschema_dep = False
+except ImportError:
+ _missing_jsonschema_dep = True
+
+LOG = logging.getLogger(__name__)
+
+
+class TestRuncmd(FilesystemMockingTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestRuncmd, self).setUp()
+ self.subp = util.subp
+ self.new_root = self.tmp_dir()
+
+ def _get_cloud(self, distro):
+ self.patchUtils(self.new_root)
+ paths = helpers.Paths({'scripts': self.new_root})
+ cls = distros.fetch(distro)
+ mydist = cls(distro, {}, paths)
+ myds = DataSourceNone.DataSourceNone({}, mydist, paths)
+ paths.datasource = myds
+ return cloud.Cloud(myds, paths, {}, mydist, None)
+
+ def test_handler_skip_if_no_runcmd(self):
+ """When the provided config doesn't contain runcmd, skip it."""
+ cfg = {}
+ mycloud = self._get_cloud('ubuntu')
+ cc_runcmd.handle('notimportant', cfg, mycloud, LOG, None)
+ self.assertIn(
+ "Skipping module named notimportant, no 'runcmd' key",
+ self.logs.getvalue())
+
+ def test_handler_invalid_command_set(self):
+ """Commands which can't be converted to shell will raise errors."""
+ invalid_config = {'runcmd': 1}
+ cc = self._get_cloud('ubuntu')
+ cc_runcmd.handle('cc_runcmd', invalid_config, cc, LOG, [])
+ self.assertIn(
+ 'Failed to shellify 1 into file'
+ ' /var/lib/cloud/instances/iid-datasource-none/scripts/runcmd',
+ self.logs.getvalue())
+
+ @skipIf(_missing_jsonschema_dep, "No python-jsonschema dependency")
+ def test_handler_schema_validation_warns_non_array_type(self):
+ """Schema validation warns of non-array type for runcmd key.
+
+ Schema validation is not strict, so runcmd attempts to shellify the
+ invalid content.
+ """
+ invalid_config = {'runcmd': 1}
+ cc = self._get_cloud('ubuntu')
+ cc_runcmd.handle('cc_runcmd', invalid_config, cc, LOG, [])
+ self.assertIn(
+ 'Invalid config:\nruncmd: 1 is not of type \'array\'',
+ self.logs.getvalue())
+ self.assertIn('Failed to shellify', self.logs.getvalue())
+
+ @skipIf(_missing_jsonschema_dep, 'No python-jsonschema dependency')
+ def test_handler_schema_validation_warns_non_array_item_type(self):
+ """Schema validation warns of non-array or string runcmd items.
+
+ Schema validation is not strict, so runcmd attempts to shellify the
+ invalid content.
+ """
+ invalid_config = {
+ 'runcmd': ['ls /', 20, ['wget', 'http://stuff/blah'], {'a': 'n'}]}
+ cc = self._get_cloud('ubuntu')
+ cc_runcmd.handle('cc_runcmd', invalid_config, cc, LOG, [])
+ expected_warnings = [
+ 'runcmd.1: 20 is not valid under any of the given schemas',
+ 'runcmd.3: {\'a\': \'n\'} is not valid under any of the given'
+ ' schema'
+ ]
+ logs = self.logs.getvalue()
+ for warning in expected_warnings:
+ self.assertIn(warning, logs)
+ self.assertIn('Failed to shellify', logs)
+
+ def test_handler_write_valid_runcmd_schema_to_file(self):
+ """Valid runcmd schema is written to a runcmd shell script."""
+ valid_config = {'runcmd': [['ls', '/']]}
+ cc = self._get_cloud('ubuntu')
+ cc_runcmd.handle('cc_runcmd', valid_config, cc, LOG, [])
+ runcmd_file = os.path.join(
+ self.new_root,
+ 'var/lib/cloud/instances/iid-datasource-none/scripts/runcmd')
+ self.assertEqual("#!/bin/sh\n'ls' '/'\n", util.load_file(runcmd_file))
+ file_stat = os.stat(runcmd_file)
+ self.assertEqual(0o700, stat.S_IMODE(file_stat.st_mode))
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py
index e5e607fb..f60dedc2 100644
--- a/tests/unittests/test_handler/test_handler_seed_random.py
+++ b/tests/unittests/test_handler/test_handler_seed_random.py
@@ -22,7 +22,7 @@ from cloudinit import util
from cloudinit.sources import DataSourceNone
-from .. import helpers as t_help
+from cloudinit.tests import helpers as t_help
import logging
diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py
index 4b18de75..abdc17e7 100644
--- a/tests/unittests/test_handler/test_handler_set_hostname.py
+++ b/tests/unittests/test_handler/test_handler_set_hostname.py
@@ -7,7 +7,7 @@ from cloudinit import distros
from cloudinit import helpers
from cloudinit import util
-from .. import helpers as t_help
+from cloudinit.tests import helpers as t_help
from configobj import ConfigObj
import logging
@@ -70,7 +70,8 @@ class TestHostname(t_help.FilesystemMockingTestCase):
cc = cloud.Cloud(ds, paths, {}, distro, None)
self.patchUtils(self.tmp)
cc_set_hostname.handle('cc_set_hostname', cfg, cc, LOG, [])
- contents = util.load_file("/etc/HOSTNAME")
- self.assertEqual('blah', contents.strip())
+ if not distro.uses_systemd():
+ contents = util.load_file(distro.hostname_conf_fn)
+ self.assertEqual('blah', contents.strip())
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_handler/test_handler_snappy.py b/tests/unittests/test_handler/test_handler_snappy.py
index e4d07622..76b79c29 100644
--- a/tests/unittests/test_handler/test_handler_snappy.py
+++ b/tests/unittests/test_handler/test_handler_snappy.py
@@ -7,9 +7,9 @@ from cloudinit.config.cc_snap_config import (
from cloudinit import (distros, helpers, cloud, util)
from cloudinit.config.cc_snap_config import handle as snap_handle
from cloudinit.sources import DataSourceNone
-from ..helpers import FilesystemMockingTestCase, mock
+from cloudinit.tests.helpers import FilesystemMockingTestCase, mock
-from .. import helpers as t_help
+from cloudinit.tests import helpers as t_help
import logging
import os
diff --git a/tests/unittests/test_handler/test_handler_spacewalk.py b/tests/unittests/test_handler/test_handler_spacewalk.py
index 28b5892a..ddbf4a79 100644
--- a/tests/unittests/test_handler/test_handler_spacewalk.py
+++ b/tests/unittests/test_handler/test_handler_spacewalk.py
@@ -3,7 +3,7 @@
from cloudinit.config import cc_spacewalk
from cloudinit import util
-from .. import helpers
+from cloudinit.tests import helpers
import logging
diff --git a/tests/unittests/test_handler/test_handler_timezone.py b/tests/unittests/test_handler/test_handler_timezone.py
index c30fbdfe..27eedded 100644
--- a/tests/unittests/test_handler/test_handler_timezone.py
+++ b/tests/unittests/test_handler/test_handler_timezone.py
@@ -13,7 +13,7 @@ from cloudinit import util
from cloudinit.sources import DataSourceNoCloud
-from .. import helpers as t_help
+from cloudinit.tests import helpers as t_help
from configobj import ConfigObj
import logging
diff --git a/tests/unittests/test_handler/test_handler_write_files.py b/tests/unittests/test_handler/test_handler_write_files.py
index 1129e77d..7fa8fd21 100644
--- a/tests/unittests/test_handler/test_handler_write_files.py
+++ b/tests/unittests/test_handler/test_handler_write_files.py
@@ -4,7 +4,7 @@ from cloudinit.config.cc_write_files import write_files, decode_perms
from cloudinit import log as logging
from cloudinit import util
-from ..helpers import CiTestCase, FilesystemMockingTestCase
+from cloudinit.tests.helpers import CiTestCase, FilesystemMockingTestCase
import base64
import gzip
diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py
index c4396df5..b7adbe50 100644
--- a/tests/unittests/test_handler/test_handler_yum_add_repo.py
+++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py
@@ -3,7 +3,7 @@
from cloudinit.config import cc_yum_add_repo
from cloudinit import util
-from .. import helpers
+from cloudinit.tests import helpers
try:
from configparser import ConfigParser
diff --git a/tests/unittests/test_handler/test_handler_zypper_add_repo.py b/tests/unittests/test_handler/test_handler_zypper_add_repo.py
new file mode 100644
index 00000000..315c2a5e
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_zypper_add_repo.py
@@ -0,0 +1,237 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import glob
+import os
+
+from cloudinit.config import cc_zypper_add_repo
+from cloudinit import util
+
+from cloudinit.tests import helpers
+from cloudinit.tests.helpers import mock
+
+try:
+ from configparser import ConfigParser
+except ImportError:
+ from ConfigParser import ConfigParser
+import logging
+from six import StringIO
+
+LOG = logging.getLogger(__name__)
+
+
+class TestConfig(helpers.FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestConfig, self).setUp()
+ self.tmp = self.tmp_dir()
+ self.zypp_conf = 'etc/zypp/zypp.conf'
+
+ def test_bad_repo_config(self):
+ """Config has no baseurl, no file should be written"""
+ cfg = {
+ 'repos': [
+ {
+ 'id': 'foo',
+ 'name': 'suse-test',
+ 'enabled': '1'
+ },
+ ]
+ }
+ self.patchUtils(self.tmp)
+ cc_zypper_add_repo._write_repos(cfg['repos'], '/etc/zypp/repos.d')
+ self.assertRaises(IOError, util.load_file,
+ "/etc/zypp/repos.d/foo.repo")
+
+ def test_write_repos(self):
+ """Verify valid repos get written"""
+ cfg = self._get_base_config_repos()
+ root_d = self.tmp_dir()
+ cc_zypper_add_repo._write_repos(cfg['zypper']['repos'], root_d)
+ repos = glob.glob('%s/*.repo' % root_d)
+ expected_repos = ['testing-foo.repo', 'testing-bar.repo']
+ if len(repos) != 2:
+ assert 'Number of repos written is "%d" expected 2' % len(repos)
+ for repo in repos:
+ repo_name = os.path.basename(repo)
+ if repo_name not in expected_repos:
+ assert 'Found repo with name "%s"; unexpected' % repo_name
+ # Validation that the content gets properly written is in another test
+
+ def test_write_repo(self):
+ """Verify the content of a repo file"""
+ cfg = {
+ 'repos': [
+ {
+ 'baseurl': 'http://foo',
+ 'name': 'test-foo',
+ 'id': 'testing-foo'
+ },
+ ]
+ }
+ root_d = self.tmp_dir()
+ cc_zypper_add_repo._write_repos(cfg['repos'], root_d)
+ contents = util.load_file("%s/testing-foo.repo" % root_d)
+ parser = ConfigParser()
+ parser.readfp(StringIO(contents))
+ expected = {
+ 'testing-foo': {
+ 'name': 'test-foo',
+ 'baseurl': 'http://foo',
+ 'enabled': '1',
+ 'autorefresh': '1'
+ }
+ }
+ for section in expected:
+ self.assertTrue(parser.has_section(section),
+ "Contains section {0}".format(section))
+ for k, v in expected[section].items():
+ self.assertEqual(parser.get(section, k), v)
+
+ def test_config_write(self):
+ """Write valid configuration data"""
+ cfg = {
+ 'config': {
+ 'download.deltarpm': 'False',
+ 'reposdir': 'foo'
+ }
+ }
+ root_d = self.tmp_dir()
+ helpers.populate_dir(root_d, {self.zypp_conf: '# Zypp config\n'})
+ self.reRoot(root_d)
+ cc_zypper_add_repo._write_zypp_config(cfg['config'])
+ cfg_out = os.path.join(root_d, self.zypp_conf)
+ contents = util.load_file(cfg_out)
+ expected = [
+ '# Zypp config',
+ '# Added via cloud.cfg',
+ 'download.deltarpm=False',
+ 'reposdir=foo'
+ ]
+ for item in contents.split('\n'):
+ if item not in expected:
+ self.assertIsNone(item)
+
+ @mock.patch('cloudinit.log.logging')
+ def test_config_write_skip_configdir(self, mock_logging):
+ """Write configuration but skip writing 'configdir' setting"""
+ cfg = {
+ 'config': {
+ 'download.deltarpm': 'False',
+ 'reposdir': 'foo',
+ 'configdir': 'bar'
+ }
+ }
+ root_d = self.tmp_dir()
+ helpers.populate_dir(root_d, {self.zypp_conf: '# Zypp config\n'})
+ self.reRoot(root_d)
+ cc_zypper_add_repo._write_zypp_config(cfg['config'])
+ cfg_out = os.path.join(root_d, self.zypp_conf)
+ contents = util.load_file(cfg_out)
+ expected = [
+ '# Zypp config',
+ '# Added via cloud.cfg',
+ 'download.deltarpm=False',
+ 'reposdir=foo'
+ ]
+ for item in contents.split('\n'):
+ if item not in expected:
+ self.assertIsNone(item)
+ # Not finding teh right path for mocking :(
+ # assert mock_logging.warning.called
+
+ def test_empty_config_section_no_new_data(self):
+ """When the config section is empty no new data should be written to
+ zypp.conf"""
+ cfg = self._get_base_config_repos()
+ cfg['zypper']['config'] = None
+ root_d = self.tmp_dir()
+ helpers.populate_dir(root_d, {self.zypp_conf: '# No data'})
+ self.reRoot(root_d)
+ cc_zypper_add_repo._write_zypp_config(cfg.get('config', {}))
+ cfg_out = os.path.join(root_d, self.zypp_conf)
+ contents = util.load_file(cfg_out)
+ self.assertEqual(contents, '# No data')
+
+ def test_empty_config_value_no_new_data(self):
+ """When the config section is not empty but there are no values
+ no new data should be written to zypp.conf"""
+ cfg = self._get_base_config_repos()
+ cfg['zypper']['config'] = {
+ 'download.deltarpm': None
+ }
+ root_d = self.tmp_dir()
+ helpers.populate_dir(root_d, {self.zypp_conf: '# No data'})
+ self.reRoot(root_d)
+ cc_zypper_add_repo._write_zypp_config(cfg.get('config', {}))
+ cfg_out = os.path.join(root_d, self.zypp_conf)
+ contents = util.load_file(cfg_out)
+ self.assertEqual(contents, '# No data')
+
+ def test_handler_full_setup(self):
+ """Test that the handler ends up calling the renderers"""
+ cfg = self._get_base_config_repos()
+ cfg['zypper']['config'] = {
+ 'download.deltarpm': 'False',
+ }
+ root_d = self.tmp_dir()
+ os.makedirs('%s/etc/zypp/repos.d' % root_d)
+ helpers.populate_dir(root_d, {self.zypp_conf: '# Zypp config\n'})
+ self.reRoot(root_d)
+ cc_zypper_add_repo.handle('zypper_add_repo', cfg, None, LOG, [])
+ cfg_out = os.path.join(root_d, self.zypp_conf)
+ contents = util.load_file(cfg_out)
+ expected = [
+ '# Zypp config',
+ '# Added via cloud.cfg',
+ 'download.deltarpm=False',
+ ]
+ for item in contents.split('\n'):
+ if item not in expected:
+ self.assertIsNone(item)
+ repos = glob.glob('%s/etc/zypp/repos.d/*.repo' % root_d)
+ expected_repos = ['testing-foo.repo', 'testing-bar.repo']
+ if len(repos) != 2:
+ assert 'Number of repos written is "%d" expected 2' % len(repos)
+ for repo in repos:
+ repo_name = os.path.basename(repo)
+ if repo_name not in expected_repos:
+ assert 'Found repo with name "%s"; unexpected' % repo_name
+
+ def test_no_config_section_no_new_data(self):
+ """When there is no config section no new data should be written to
+ zypp.conf"""
+ cfg = self._get_base_config_repos()
+ root_d = self.tmp_dir()
+ helpers.populate_dir(root_d, {self.zypp_conf: '# No data'})
+ self.reRoot(root_d)
+ cc_zypper_add_repo._write_zypp_config(cfg.get('config', {}))
+ cfg_out = os.path.join(root_d, self.zypp_conf)
+ contents = util.load_file(cfg_out)
+ self.assertEqual(contents, '# No data')
+
+ def test_no_repo_data(self):
+ """When there is no repo data nothing should happen"""
+ root_d = self.tmp_dir()
+ self.reRoot(root_d)
+ cc_zypper_add_repo._write_repos(None, root_d)
+ content = glob.glob('%s/*' % root_d)
+ self.assertEqual(len(content), 0)
+
+ def _get_base_config_repos(self):
+ """Basic valid repo configuration"""
+ cfg = {
+ 'zypper': {
+ 'repos': [
+ {
+ 'baseurl': 'http://foo',
+ 'name': 'test-foo',
+ 'id': 'testing-foo'
+ },
+ {
+ 'baseurl': 'http://bar',
+ 'name': 'test-bar',
+ 'id': 'testing-bar'
+ }
+ ]
+ }
+ }
+ return cfg
diff --git a/tests/unittests/test_handler/test_schema.py b/tests/unittests/test_handler/test_schema.py
index eda4802a..b8fc8930 100644
--- a/tests/unittests/test_handler/test_schema.py
+++ b/tests/unittests/test_handler/test_schema.py
@@ -1,16 +1,17 @@
# This file is part of cloud-init. See LICENSE file for license information.
from cloudinit.config.schema import (
- CLOUD_CONFIG_HEADER, SchemaValidationError, get_schema_doc,
- validate_cloudconfig_file, validate_cloudconfig_schema,
- main)
+ CLOUD_CONFIG_HEADER, SchemaValidationError, annotated_cloudconfig_file,
+ get_schema_doc, get_schema, validate_cloudconfig_file,
+ validate_cloudconfig_schema, main)
from cloudinit.util import write_file
-from ..helpers import CiTestCase, mock, skipIf
+from cloudinit.tests.helpers import CiTestCase, mock, skipIf
from copy import copy
from six import StringIO
from textwrap import dedent
+from yaml import safe_load
try:
import jsonschema
@@ -20,6 +21,35 @@ except ImportError:
_missing_jsonschema_dep = True
+class GetSchemaTest(CiTestCase):
+
+ def test_get_schema_coalesces_known_schema(self):
+ """Every cloudconfig module with schema is listed in allOf keyword."""
+ schema = get_schema()
+ self.assertItemsEqual(
+ [
+ 'cc_bootcmd',
+ 'cc_ntp',
+ 'cc_resizefs',
+ 'cc_runcmd',
+ 'cc_zypper_add_repo'
+ ],
+ [subschema['id'] for subschema in schema['allOf']])
+ self.assertEqual('cloud-config-schema', schema['id'])
+ self.assertEqual(
+ 'http://json-schema.org/draft-04/schema#',
+ schema['$schema'])
+ # FULL_SCHEMA is updated by the get_schema call
+ from cloudinit.config.schema import FULL_SCHEMA
+ self.assertItemsEqual(['id', '$schema', 'allOf'], FULL_SCHEMA.keys())
+
+ def test_get_schema_returns_global_when_set(self):
+ """When FULL_SCHEMA global is already set, get_schema returns it."""
+ m_schema_path = 'cloudinit.config.schema.FULL_SCHEMA'
+ with mock.patch(m_schema_path, {'here': 'iam'}):
+ self.assertEqual({'here': 'iam'}, get_schema())
+
+
class SchemaValidationErrorTest(CiTestCase):
"""Test validate_cloudconfig_schema"""
@@ -151,11 +181,11 @@ class GetSchemaDocTest(CiTestCase):
full_schema.update(
{'properties': {
'prop1': {'type': 'array', 'description': 'prop-description',
- 'items': {'type': 'int'}}}})
+ 'items': {'type': 'integer'}}}})
self.assertEqual(
dedent("""
name
- ---
+ ----
**Summary:** title
description
@@ -167,25 +197,65 @@ class GetSchemaDocTest(CiTestCase):
**Supported distros:** debian, rhel
**Config schema**:
- **prop1:** (array of int) prop-description\n\n"""),
+ **prop1:** (array of integer) prop-description\n\n"""),
+ get_schema_doc(full_schema))
+
+ def test_get_schema_doc_handles_multiple_types(self):
+ """get_schema_doc delimits multiple property types with a '/'."""
+ full_schema = copy(self.required_schema)
+ full_schema.update(
+ {'properties': {
+ 'prop1': {'type': ['string', 'integer'],
+ 'description': 'prop-description'}}})
+ self.assertIn(
+ '**prop1:** (string/integer) prop-description',
+ get_schema_doc(full_schema))
+
+ def test_get_schema_doc_handles_enum_types(self):
+ """get_schema_doc converts enum types to yaml and delimits with '/'."""
+ full_schema = copy(self.required_schema)
+ full_schema.update(
+ {'properties': {
+ 'prop1': {'enum': [True, False, 'stuff'],
+ 'description': 'prop-description'}}})
+ self.assertIn(
+ '**prop1:** (true/false/stuff) prop-description',
+ get_schema_doc(full_schema))
+
+ def test_get_schema_doc_handles_nested_oneof_property_types(self):
+ """get_schema_doc describes array items oneOf declarations in type."""
+ full_schema = copy(self.required_schema)
+ full_schema.update(
+ {'properties': {
+ 'prop1': {'type': 'array',
+ 'items': {
+ 'oneOf': [{'type': 'string'},
+ {'type': 'integer'}]},
+ 'description': 'prop-description'}}})
+ self.assertIn(
+ '**prop1:** (array of (string)/(integer)) prop-description',
get_schema_doc(full_schema))
- def test_get_schema_doc_returns_restructured_text_with_examples(self):
- """get_schema_doc returns indented examples when present in schema."""
+ def test_get_schema_doc_handles_string_examples(self):
+ """get_schema_doc properly indented examples as a list of strings."""
full_schema = copy(self.required_schema)
full_schema.update(
- {'examples': {'ex1': [1, 2, 3]},
+ {'examples': ['ex1:\n [don\'t, expand, "this"]', 'ex2: true'],
'properties': {
'prop1': {'type': 'array', 'description': 'prop-description',
- 'items': {'type': 'int'}}}})
+ 'items': {'type': 'integer'}}}})
self.assertIn(
dedent("""
**Config schema**:
- **prop1:** (array of int) prop-description
+ **prop1:** (array of integer) prop-description
**Examples**::
- ex1"""),
+ ex1:
+ [don't, expand, "this"]
+ # --- Example2 ---
+ ex2: true
+ """),
get_schema_doc(full_schema))
def test_get_schema_doc_raises_key_errors(self):
@@ -198,13 +268,78 @@ class GetSchemaDocTest(CiTestCase):
self.assertIn(key, str(context_mgr.exception))
+class AnnotatedCloudconfigFileTest(CiTestCase):
+ maxDiff = None
+
+ def test_annotated_cloudconfig_file_no_schema_errors(self):
+ """With no schema_errors, print the original content."""
+ content = b'ntp:\n pools: [ntp1.pools.com]\n'
+ self.assertEqual(
+ content,
+ annotated_cloudconfig_file({}, content, schema_errors=[]))
+
+ def test_annotated_cloudconfig_file_schema_annotates_and_adds_footer(self):
+ """With schema_errors, error lines are annotated and a footer added."""
+ content = dedent("""\
+ #cloud-config
+ # comment
+ ntp:
+ pools: [-99, 75]
+ """).encode()
+ expected = dedent("""\
+ #cloud-config
+ # comment
+ ntp: # E1
+ pools: [-99, 75] # E2,E3
+
+ # Errors: -------------
+ # E1: Some type error
+ # E2: -99 is not a string
+ # E3: 75 is not a string
+
+ """)
+ parsed_config = safe_load(content[13:])
+ schema_errors = [
+ ('ntp', 'Some type error'), ('ntp.pools.0', '-99 is not a string'),
+ ('ntp.pools.1', '75 is not a string')]
+ self.assertEqual(
+ expected,
+ annotated_cloudconfig_file(parsed_config, content, schema_errors))
+
+ def test_annotated_cloudconfig_file_annotates_separate_line_items(self):
+ """Errors are annotated for lists with items on separate lines."""
+ content = dedent("""\
+ #cloud-config
+ # comment
+ ntp:
+ pools:
+ - -99
+ - 75
+ """).encode()
+ expected = dedent("""\
+ ntp:
+ pools:
+ - -99 # E1
+ - 75 # E2
+ """)
+ parsed_config = safe_load(content[13:])
+ schema_errors = [
+ ('ntp.pools.0', '-99 is not a string'),
+ ('ntp.pools.1', '75 is not a string')]
+ self.assertIn(
+ expected,
+ annotated_cloudconfig_file(parsed_config, content, schema_errors))
+
+
class MainTest(CiTestCase):
def test_main_missing_args(self):
"""Main exits non-zero and reports an error on missing parameters."""
with mock.patch('sys.argv', ['mycmd']):
with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
- self.assertEqual(1, main(), 'Expected non-zero exit code')
+ with self.assertRaises(SystemExit) as context_manager:
+ main()
+ self.assertEqual('1', str(context_manager.exception))
self.assertEqual(
'Expected either --config-file argument or --doc\n',
m_stderr.getvalue())
@@ -216,13 +351,13 @@ class MainTest(CiTestCase):
with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
self.assertEqual(0, main(), 'Expected 0 exit code')
self.assertIn('\nNTP\n---\n', m_stdout.getvalue())
+ self.assertIn('\nRuncmd\n------\n', m_stdout.getvalue())
def test_main_validates_config_file(self):
"""When --config-file parameter is provided, main validates schema."""
myyaml = self.tmp_path('my.yaml')
myargs = ['mycmd', '--config-file', myyaml]
- with open(myyaml, 'wb') as stream:
- stream.write(b'#cloud-config\nntp:') # shortest ntp schema
+ write_file(myyaml, b'#cloud-config\nntp:') # shortest ntp schema
with mock.patch('sys.argv', myargs):
with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
self.assertEqual(0, main(), 'Expected 0 exit code')