summaryrefslogtreecommitdiff
path: root/tests/unittests/test_handler
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_handler')
-rw-r--r--tests/unittests/test_handler/test_handler_lxd.py75
-rw-r--r--tests/unittests/test_handler/test_handler_power_state.py47
-rw-r--r--tests/unittests/test_handler/test_handler_seed_random.py13
-rw-r--r--tests/unittests/test_handler/test_handler_snappy.py3
-rw-r--r--tests/unittests/test_handler/test_handler_write_files.py112
5 files changed, 239 insertions, 11 deletions
diff --git a/tests/unittests/test_handler/test_handler_lxd.py b/tests/unittests/test_handler/test_handler_lxd.py
new file mode 100644
index 00000000..7ffa2a53
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_lxd.py
@@ -0,0 +1,75 @@
+from cloudinit.config import cc_lxd
+from cloudinit import (distros, helpers, cloud)
+from cloudinit.sources import DataSourceNoCloud
+from .. import helpers as t_help
+
+import logging
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+LOG = logging.getLogger(__name__)
+
+
+class TestLxd(t_help.TestCase):
+ lxd_cfg = {
+ 'lxd': {
+ 'init': {
+ 'network_address': '0.0.0.0',
+ 'storage_backend': 'zfs',
+ 'storage_pool': 'poolname',
+ }
+ }
+ }
+
+ def setUp(self):
+ super(TestLxd, self).setUp()
+
+ def _get_cloud(self, distro):
+ cls = distros.fetch(distro)
+ paths = helpers.Paths({})
+ d = cls(distro, {}, paths)
+ ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths)
+ cc = cloud.Cloud(ds, paths, {}, d, None)
+ return cc
+
+ @mock.patch("cloudinit.config.cc_lxd.util")
+ def test_lxd_init(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ mock_util.which.return_value = True
+ cc_lxd.handle('cc_lxd', self.lxd_cfg, cc, LOG, [])
+ self.assertTrue(mock_util.which.called)
+ init_call = mock_util.subp.call_args_list[0][0][0]
+ self.assertEquals(init_call,
+ ['lxd', 'init', '--auto',
+ '--network-address=0.0.0.0',
+ '--storage-backend=zfs',
+ '--storage-pool=poolname'])
+
+ @mock.patch("cloudinit.config.cc_lxd.util")
+ def test_lxd_install(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ mock_util.which.return_value = None
+ cc_lxd.handle('cc_lxd', self.lxd_cfg, cc, LOG, [])
+ self.assertTrue(cc.distro.install_packages.called)
+ install_pkg = cc.distro.install_packages.call_args_list[0][0][0]
+ self.assertEquals(sorted(install_pkg), ['lxd', 'zfs'])
+
+ @mock.patch("cloudinit.config.cc_lxd.util")
+ def test_no_init_does_nothing(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc_lxd.handle('cc_lxd', {'lxd': {}}, cc, LOG, [])
+ self.assertFalse(cc.distro.install_packages.called)
+ self.assertFalse(mock_util.subp.called)
+
+ @mock.patch("cloudinit.config.cc_lxd.util")
+ def test_no_lxd_does_nothing(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc_lxd.handle('cc_lxd', {'package_update': True}, cc, LOG, [])
+ self.assertFalse(cc.distro.install_packages.called)
+ self.assertFalse(mock_util.subp.called)
diff --git a/tests/unittests/test_handler/test_handler_power_state.py b/tests/unittests/test_handler/test_handler_power_state.py
index 2f86b8f8..04ce5687 100644
--- a/tests/unittests/test_handler/test_handler_power_state.py
+++ b/tests/unittests/test_handler/test_handler_power_state.py
@@ -1,6 +1,9 @@
+import sys
+
from cloudinit.config import cc_power_state_change as psc
from .. import helpers as t_help
+from ..helpers import mock
class TestLoadPowerState(t_help.TestCase):
@@ -9,12 +12,12 @@ class TestLoadPowerState(t_help.TestCase):
def test_no_config(self):
# completely empty config should mean do nothing
- (cmd, _timeout) = psc.load_power_state({})
+ (cmd, _timeout, _condition) = psc.load_power_state({})
self.assertEqual(cmd, None)
def test_irrelevant_config(self):
# no power_state field in config should return None for cmd
- (cmd, _timeout) = psc.load_power_state({'foo': 'bar'})
+ (cmd, _timeout, _condition) = psc.load_power_state({'foo': 'bar'})
self.assertEqual(cmd, None)
def test_invalid_mode(self):
@@ -53,23 +56,59 @@ class TestLoadPowerState(t_help.TestCase):
def test_no_message(self):
# if message is not present, then no argument should be passed for it
cfg = {'power_state': {'mode': 'poweroff'}}
- (cmd, _timeout) = psc.load_power_state(cfg)
+ (cmd, _timeout, _condition) = psc.load_power_state(cfg)
self.assertNotIn("", cmd)
check_lps_ret(psc.load_power_state(cfg))
self.assertTrue(len(cmd) == 3)
+ def test_condition_null_raises(self):
+ cfg = {'power_state': {'mode': 'poweroff', 'condition': None}}
+ self.assertRaises(TypeError, psc.load_power_state, cfg)
+
+ def test_condition_default_is_true(self):
+ cfg = {'power_state': {'mode': 'poweroff'}}
+ _cmd, _timeout, cond = psc.load_power_state(cfg)
+ self.assertEqual(cond, True)
+
+
+class TestCheckCondition(t_help.TestCase):
+ def cmd_with_exit(self, rc):
+ return([sys.executable, '-c', 'import sys; sys.exit(%s)' % rc])
+
+ def test_true_is_true(self):
+ self.assertEqual(psc.check_condition(True), True)
+
+ def test_false_is_false(self):
+ self.assertEqual(psc.check_condition(False), False)
+
+ def test_cmd_exit_zero_true(self):
+ self.assertEqual(psc.check_condition(self.cmd_with_exit(0)), True)
+
+ def test_cmd_exit_one_false(self):
+ self.assertEqual(psc.check_condition(self.cmd_with_exit(1)), False)
+
+ def test_cmd_exit_nonzero_warns(self):
+ mocklog = mock.Mock()
+ self.assertEqual(
+ psc.check_condition(self.cmd_with_exit(2), mocklog), False)
+ self.assertEqual(mocklog.warn.call_count, 1)
+
def check_lps_ret(psc_return, mode=None):
- if len(psc_return) != 2:
+ if len(psc_return) != 3:
raise TypeError("length returned = %d" % len(psc_return))
errs = []
cmd = psc_return[0]
timeout = psc_return[1]
+ condition = psc_return[2]
if 'shutdown' not in psc_return[0][0]:
errs.append("string 'shutdown' not in cmd")
+ if condition is None:
+ errs.append("condition was not returned")
+
if mode is not None:
opt = {'halt': '-H', 'poweroff': '-P', 'reboot': '-r'}[mode]
if opt not in psc_return[0]:
diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py
index 0bcdcb31..98bc9b81 100644
--- a/tests/unittests/test_handler/test_handler_seed_random.py
+++ b/tests/unittests/test_handler/test_handler_seed_random.py
@@ -170,27 +170,30 @@ class TestRandomSeed(t_help.TestCase):
contents = util.load_file(self._seed_file)
self.assertEquals('tiny-tim-was-here-so-was-josh', contents)
- def test_seed_command_not_provided_pollinate_available(self):
+ def test_seed_command_provided_and_available(self):
c = self._get_cloud('ubuntu', {})
self.whichdata = {'pollinate': '/usr/bin/pollinate'}
- cc_seed_random.handle('test', {}, c, LOG, [])
+ cfg = {'random_seed': {'command': ['pollinate', '-q']}}
+ cc_seed_random.handle('test', cfg, c, LOG, [])
subp_args = [f['args'] for f in self.subp_called]
self.assertIn(['pollinate', '-q'], subp_args)
- def test_seed_command_not_provided_pollinate_not_available(self):
+ def test_seed_command_not_provided(self):
c = self._get_cloud('ubuntu', {})
self.whichdata = {}
cc_seed_random.handle('test', {}, c, LOG, [])
# subp should not have been called as which would say not available
- self.assertEquals(self.subp_called, list())
+ self.assertFalse(self.subp_called)
def test_unavailable_seed_command_and_required_raises_error(self):
c = self._get_cloud('ubuntu', {})
self.whichdata = {}
+ cfg = {'random_seed': {'command': ['THIS_NO_COMMAND'],
+ 'command_required': True}}
self.assertRaises(ValueError, cc_seed_random.handle,
- 'test', {'random_seed': {'command_required': True}}, c, LOG, [])
+ 'test', cfg, c, LOG, [])
def test_seed_command_and_required(self):
c = self._get_cloud('ubuntu', {})
diff --git a/tests/unittests/test_handler/test_handler_snappy.py b/tests/unittests/test_handler/test_handler_snappy.py
index eceb14d9..8aeff53c 100644
--- a/tests/unittests/test_handler/test_handler_snappy.py
+++ b/tests/unittests/test_handler/test_handler_snappy.py
@@ -125,8 +125,7 @@ class TestInstallPackages(t_help.TestCase):
"pkg1.smoser.config": "pkg1.smoser.config-data",
"pkg1.config": "pkg1.config-data",
"pkg2.smoser_0.0_amd64.snap": "pkg2-snapdata",
- "pkg2.smoser_0.0_amd64.config": "pkg2.config",
- })
+ "pkg2.smoser_0.0_amd64.config": "pkg2.config"})
ret = get_package_ops(
packages=[], configs={}, installed=[], fspath=self.tmp)
diff --git a/tests/unittests/test_handler/test_handler_write_files.py b/tests/unittests/test_handler/test_handler_write_files.py
new file mode 100644
index 00000000..f1c7f7b4
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_write_files.py
@@ -0,0 +1,112 @@
+from cloudinit import util
+from cloudinit import log as logging
+from cloudinit.config.cc_write_files import write_files
+
+from ..helpers import FilesystemMockingTestCase
+
+import base64
+import gzip
+import shutil
+import six
+import tempfile
+
+LOG = logging.getLogger(__name__)
+
+YAML_TEXT = """
+write_files:
+ - encoding: gzip
+ content: !!binary |
+ H4sIAIDb/U8C/1NW1E/KzNMvzuBKTc7IV8hIzcnJVyjPL8pJ4QIA6N+MVxsAAAA=
+ path: /usr/bin/hello
+ permissions: '0755'
+ - content: !!binary |
+ Zm9vYmFyCg==
+ path: /wark
+ permissions: '0755'
+ - content: |
+ hi mom line 1
+ hi mom line 2
+ path: /tmp/message
+"""
+
+YAML_CONTENT_EXPECTED = {
+ '/usr/bin/hello': "#!/bin/sh\necho hello world\n",
+ '/wark': "foobar\n",
+ '/tmp/message': "hi mom line 1\nhi mom line 2\n",
+}
+
+
+class TestWriteFiles(FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestWriteFiles, self).setUp()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+
+ def test_simple(self):
+ self.patchUtils(self.tmp)
+ expected = "hello world\n"
+ filename = "/tmp/my.file"
+ write_files(
+ "test_simple", [{"content": expected, "path": filename}], LOG)
+ self.assertEqual(util.load_file(filename), expected)
+
+ def test_yaml_binary(self):
+ self.patchUtils(self.tmp)
+ data = util.load_yaml(YAML_TEXT)
+ write_files("testname", data['write_files'], LOG)
+ for path, content in YAML_CONTENT_EXPECTED.items():
+ self.assertEqual(util.load_file(path), content)
+
+ def test_all_decodings(self):
+ self.patchUtils(self.tmp)
+
+ # build a 'files' array that has a dictionary of encodings
+ # for 'gz', 'gzip', 'gz+base64' ...
+ data = b"foobzr"
+ utf8_valid = b"foobzr"
+ utf8_invalid = b'ab\xaadef'
+ files = []
+ expected = []
+
+ gz_aliases = ('gz', 'gzip')
+ gz_b64_aliases = ('gz+base64', 'gzip+base64', 'gz+b64', 'gzip+b64')
+ b64_aliases = ('base64', 'b64')
+
+ datum = (("utf8", utf8_valid), ("no-utf8", utf8_invalid))
+ for name, data in datum:
+ gz = (_gzip_bytes(data), gz_aliases)
+ gz_b64 = (base64.b64encode(_gzip_bytes(data)), gz_b64_aliases)
+ b64 = (base64.b64encode(data), b64_aliases)
+ for content, aliases in (gz, gz_b64, b64):
+ for enc in aliases:
+ cur = {'content': content,
+ 'path': '/tmp/file-%s-%s' % (name, enc),
+ 'encoding': enc}
+ files.append(cur)
+ expected.append((cur['path'], data))
+
+ write_files("test_decoding", files, LOG)
+
+ for path, content in expected:
+ self.assertEqual(util.load_file(path, decode=False), content)
+
+ # make sure we actually wrote *some* files.
+ flen_expected = (
+ len(gz_aliases + gz_b64_aliases + b64_aliases) * len(datum))
+ self.assertEqual(len(expected), flen_expected)
+
+
+def _gzip_bytes(data):
+ buf = six.BytesIO()
+ fp = None
+ try:
+ fp = gzip.GzipFile(fileobj=buf, mode="wb")
+ fp.write(data)
+ fp.close()
+ return buf.getvalue()
+ finally:
+ if fp:
+ fp.close()
+
+
+# vi: ts=4 expandtab