summaryrefslogtreecommitdiff
path: root/tests/unittests/test_handler
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_handler')
-rw-r--r--tests/unittests/test_handler/test_handler_lxd.py2
-rw-r--r--tests/unittests/test_handler/test_handler_power_state.py58
-rw-r--r--tests/unittests/test_handler/test_handler_refresh_rmc_and_interface.py109
-rw-r--r--tests/unittests/test_handler/test_handler_resizefs.py55
-rw-r--r--tests/unittests/test_handler/test_schema.py109
5 files changed, 258 insertions, 75 deletions
diff --git a/tests/unittests/test_handler/test_handler_lxd.py b/tests/unittests/test_handler/test_handler_lxd.py
index 21011204..b2181992 100644
--- a/tests/unittests/test_handler/test_handler_lxd.py
+++ b/tests/unittests/test_handler/test_handler_lxd.py
@@ -214,7 +214,7 @@ class TestLxdMaybeCleanupDefault(t_help.CiTestCase):
"""deletion of network should occur if create is True."""
cc_lxd.maybe_cleanup_default(
net_name=self.defnet, did_init=True, create=True, attach=False)
- m_lxc.assert_called_once_with(["network", "delete", self.defnet])
+ m_lxc.assert_called_with(["network", "delete", self.defnet])
@mock.patch("cloudinit.config.cc_lxd._lxc")
def test_device_removed_if_attach_true(self, m_lxc):
diff --git a/tests/unittests/test_handler/test_handler_power_state.py b/tests/unittests/test_handler/test_handler_power_state.py
index 93b24fdc..4ac49424 100644
--- a/tests/unittests/test_handler/test_handler_power_state.py
+++ b/tests/unittests/test_handler/test_handler_power_state.py
@@ -4,72 +4,102 @@ import sys
from cloudinit.config import cc_power_state_change as psc
+from cloudinit import distros
+from cloudinit import helpers
+
from cloudinit.tests import helpers as t_help
from cloudinit.tests.helpers import mock
class TestLoadPowerState(t_help.TestCase):
+ def setUp(self):
+ super(TestLoadPowerState, self).setUp()
+ cls = distros.fetch('ubuntu')
+ paths = helpers.Paths({})
+ self.dist = cls('ubuntu', {}, paths)
+
def test_no_config(self):
# completely empty config should mean do nothing
- (cmd, _timeout, _condition) = psc.load_power_state({}, 'ubuntu')
+ (cmd, _timeout, _condition) = psc.load_power_state({}, self.dist)
self.assertIsNone(cmd)
def test_irrelevant_config(self):
# no power_state field in config should return None for cmd
(cmd, _timeout, _condition) = psc.load_power_state({'foo': 'bar'},
- 'ubuntu')
+ self.dist)
self.assertIsNone(cmd)
def test_invalid_mode(self):
+
cfg = {'power_state': {'mode': 'gibberish'}}
- self.assertRaises(TypeError, psc.load_power_state, cfg, 'ubuntu')
+ self.assertRaises(TypeError, psc.load_power_state, cfg, self.dist)
cfg = {'power_state': {'mode': ''}}
- self.assertRaises(TypeError, psc.load_power_state, cfg, 'ubuntu')
+ self.assertRaises(TypeError, psc.load_power_state, cfg, self.dist)
def test_empty_mode(self):
cfg = {'power_state': {'message': 'goodbye'}}
- self.assertRaises(TypeError, psc.load_power_state, cfg, 'ubuntu')
+ self.assertRaises(TypeError, psc.load_power_state, cfg, self.dist)
def test_valid_modes(self):
cfg = {'power_state': {}}
for mode in ('halt', 'poweroff', 'reboot'):
cfg['power_state']['mode'] = mode
- check_lps_ret(psc.load_power_state(cfg, 'ubuntu'), mode=mode)
+ check_lps_ret(psc.load_power_state(cfg, self.dist), mode=mode)
def test_invalid_delay(self):
cfg = {'power_state': {'mode': 'poweroff', 'delay': 'goodbye'}}
- self.assertRaises(TypeError, psc.load_power_state, cfg, 'ubuntu')
+ self.assertRaises(TypeError, psc.load_power_state, cfg, self.dist)
def test_valid_delay(self):
cfg = {'power_state': {'mode': 'poweroff', 'delay': ''}}
for delay in ("now", "+1", "+30"):
cfg['power_state']['delay'] = delay
- check_lps_ret(psc.load_power_state(cfg, 'ubuntu'))
+ check_lps_ret(psc.load_power_state(cfg, self.dist))
def test_message_present(self):
cfg = {'power_state': {'mode': 'poweroff', 'message': 'GOODBYE'}}
- ret = psc.load_power_state(cfg, 'ubuntu')
- check_lps_ret(psc.load_power_state(cfg, 'ubuntu'))
+ ret = psc.load_power_state(cfg, self.dist)
+ check_lps_ret(psc.load_power_state(cfg, self.dist))
self.assertIn(cfg['power_state']['message'], ret[0])
def test_no_message(self):
# if message is not present, then no argument should be passed for it
cfg = {'power_state': {'mode': 'poweroff'}}
- (cmd, _timeout, _condition) = psc.load_power_state(cfg, 'ubuntu')
+ (cmd, _timeout, _condition) = psc.load_power_state(cfg, self.dist)
self.assertNotIn("", cmd)
- check_lps_ret(psc.load_power_state(cfg, 'ubuntu'))
+ check_lps_ret(psc.load_power_state(cfg, self.dist))
self.assertTrue(len(cmd) == 3)
def test_condition_null_raises(self):
cfg = {'power_state': {'mode': 'poweroff', 'condition': None}}
- self.assertRaises(TypeError, psc.load_power_state, cfg, 'ubuntu')
+ self.assertRaises(TypeError, psc.load_power_state, cfg, self.dist)
def test_condition_default_is_true(self):
cfg = {'power_state': {'mode': 'poweroff'}}
- _cmd, _timeout, cond = psc.load_power_state(cfg, 'ubuntu')
+ _cmd, _timeout, cond = psc.load_power_state(cfg, self.dist)
self.assertEqual(cond, True)
+ def test_freebsd_poweroff_uses_lowercase_p(self):
+ cls = distros.fetch('freebsd')
+ paths = helpers.Paths({})
+ freebsd = cls('freebsd', {}, paths)
+ cfg = {'power_state': {'mode': 'poweroff'}}
+ ret = psc.load_power_state(cfg, freebsd)
+ self.assertIn('-p', ret[0])
+
+ def test_alpine_delay(self):
+ # alpine takes delay in seconds.
+ cls = distros.fetch('alpine')
+ paths = helpers.Paths({})
+ alpine = cls('alpine', {}, paths)
+ cfg = {'power_state': {'mode': 'poweroff', 'delay': ''}}
+ for delay, value in (('now', 0), ("+1", 60), ("+30", 1800)):
+ cfg['power_state']['delay'] = delay
+ ret = psc.load_power_state(cfg, alpine)
+ self.assertEqual('-d', ret[0][1])
+ self.assertEqual(str(value), ret[0][2])
+
class TestCheckCondition(t_help.TestCase):
def cmd_with_exit(self, rc):
diff --git a/tests/unittests/test_handler/test_handler_refresh_rmc_and_interface.py b/tests/unittests/test_handler/test_handler_refresh_rmc_and_interface.py
new file mode 100644
index 00000000..e13b7793
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_refresh_rmc_and_interface.py
@@ -0,0 +1,109 @@
+from cloudinit.config import cc_refresh_rmc_and_interface as ccrmci
+
+from cloudinit import util
+
+from cloudinit.tests import helpers as t_help
+from cloudinit.tests.helpers import mock
+
+from textwrap import dedent
+import logging
+
+LOG = logging.getLogger(__name__)
+MPATH = "cloudinit.config.cc_refresh_rmc_and_interface"
+NET_INFO = {
+ 'lo': {'ipv4': [{'ip': '127.0.0.1',
+ 'bcast': '', 'mask': '255.0.0.0',
+ 'scope': 'host'}],
+ 'ipv6': [{'ip': '::1/128',
+ 'scope6': 'host'}], 'hwaddr': '',
+ 'up': 'True'},
+ 'env2': {'ipv4': [{'ip': '8.0.0.19',
+ 'bcast': '8.0.0.255', 'mask': '255.255.255.0',
+ 'scope': 'global'}],
+ 'ipv6': [{'ip': 'fe80::f896:c2ff:fe81:8220/64',
+ 'scope6': 'link'}], 'hwaddr': 'fa:96:c2:81:82:20',
+ 'up': 'True'},
+ 'env3': {'ipv4': [{'ip': '90.0.0.14',
+ 'bcast': '90.0.0.255', 'mask': '255.255.255.0',
+ 'scope': 'global'}],
+ 'ipv6': [{'ip': 'fe80::f896:c2ff:fe81:8221/64',
+ 'scope6': 'link'}], 'hwaddr': 'fa:96:c2:81:82:21',
+ 'up': 'True'},
+ 'env4': {'ipv4': [{'ip': '9.114.23.7',
+ 'bcast': '9.114.23.255', 'mask': '255.255.255.0',
+ 'scope': 'global'}],
+ 'ipv6': [{'ip': 'fe80::f896:c2ff:fe81:8222/64',
+ 'scope6': 'link'}], 'hwaddr': 'fa:96:c2:81:82:22',
+ 'up': 'True'},
+ 'env5': {'ipv4': [],
+ 'ipv6': [{'ip': 'fe80::9c26:c3ff:fea4:62c8/64',
+ 'scope6': 'link'}], 'hwaddr': '42:20:86:df:fa:4c',
+ 'up': 'True'}}
+
+
+class TestRsctNodeFile(t_help.CiTestCase):
+ def test_disable_ipv6_interface(self):
+ """test parsing of iface files."""
+ fname = self.tmp_path("iface-eth5")
+ util.write_file(fname, dedent("""\
+ BOOTPROTO=static
+ DEVICE=eth5
+ HWADDR=42:20:86:df:fa:4c
+ IPV6INIT=yes
+ IPADDR6=fe80::9c26:c3ff:fea4:62c8/64
+ IPV6ADDR=fe80::9c26:c3ff:fea4:62c8/64
+ NM_CONTROLLED=yes
+ ONBOOT=yes
+ STARTMODE=auto
+ TYPE=Ethernet
+ USERCTL=no
+ """))
+
+ ccrmci.disable_ipv6(fname)
+ self.assertEqual(dedent("""\
+ BOOTPROTO=static
+ DEVICE=eth5
+ HWADDR=42:20:86:df:fa:4c
+ ONBOOT=yes
+ STARTMODE=auto
+ TYPE=Ethernet
+ USERCTL=no
+ NM_CONTROLLED=no
+ """), util.load_file(fname))
+
+ @mock.patch(MPATH + '.refresh_rmc')
+ @mock.patch(MPATH + '.restart_network_manager')
+ @mock.patch(MPATH + '.disable_ipv6')
+ @mock.patch(MPATH + '.refresh_ipv6')
+ @mock.patch(MPATH + '.netinfo.netdev_info')
+ @mock.patch(MPATH + '.subp.which')
+ def test_handle(self, m_refresh_rmc,
+ m_netdev_info, m_refresh_ipv6, m_disable_ipv6,
+ m_restart_nm, m_which):
+ """Basic test of handle."""
+ m_netdev_info.return_value = NET_INFO
+ m_which.return_value = '/opt/rsct/bin/rmcctrl'
+ ccrmci.handle(
+ "refresh_rmc_and_interface", None, None, None, None)
+ self.assertEqual(1, m_netdev_info.call_count)
+ m_refresh_ipv6.assert_called_with('env5')
+ m_disable_ipv6.assert_called_with(
+ '/etc/sysconfig/network-scripts/ifcfg-env5')
+ self.assertEqual(1, m_restart_nm.call_count)
+ self.assertEqual(1, m_refresh_rmc.call_count)
+
+ @mock.patch(MPATH + '.netinfo.netdev_info')
+ def test_find_ipv6(self, m_netdev_info):
+ """find_ipv6_ifaces parses netdev_info returning those with ipv6"""
+ m_netdev_info.return_value = NET_INFO
+ found = ccrmci.find_ipv6_ifaces()
+ self.assertEqual(['env5'], found)
+
+ @mock.patch(MPATH + '.subp.subp')
+ def test_refresh_ipv6(self, m_subp):
+ """refresh_ipv6 should ip down and up the interface."""
+ iface = "myeth0"
+ ccrmci.refresh_ipv6(iface)
+ m_subp.assert_has_calls([
+ mock.call(['ip', 'link', 'set', iface, 'down']),
+ mock.call(['ip', 'link', 'set', iface, 'up'])])
diff --git a/tests/unittests/test_handler/test_handler_resizefs.py b/tests/unittests/test_handler/test_handler_resizefs.py
index db9a0414..28d55072 100644
--- a/tests/unittests/test_handler/test_handler_resizefs.py
+++ b/tests/unittests/test_handler/test_handler_resizefs.py
@@ -6,8 +6,8 @@ from cloudinit.config.cc_resizefs import (
from collections import namedtuple
import logging
-import textwrap
+from cloudinit.subp import ProcessExecutionError
from cloudinit.tests.helpers import (
CiTestCase, mock, skipUnlessJsonSchema, util, wrap_and_call)
@@ -22,44 +22,41 @@ class TestResizefs(CiTestCase):
super(TestResizefs, self).setUp()
self.name = "resizefs"
- @mock.patch('cloudinit.config.cc_resizefs._get_dumpfs_output')
- @mock.patch('cloudinit.config.cc_resizefs._get_gpart_output')
- def test_skip_ufs_resize(self, gpart_out, dumpfs_out):
+ @mock.patch('cloudinit.subp.subp')
+ def test_skip_ufs_resize(self, m_subp):
fs_type = "ufs"
resize_what = "/"
devpth = "/dev/da0p2"
- dumpfs_out.return_value = (
- "# newfs command for / (/dev/label/rootfs)\n"
- "newfs -O 2 -U -a 4 -b 32768 -d 32768 -e 4096 "
- "-f 4096 -g 16384 -h 64 -i 8192 -j -k 6408 -m 8 "
- "-o time -s 58719232 /dev/label/rootfs\n")
- gpart_out.return_value = textwrap.dedent("""\
- => 40 62914480 da0 GPT (30G)
- 40 1024 1 freebsd-boot (512K)
- 1064 58719232 2 freebsd-ufs (28G)
- 58720296 3145728 3 freebsd-swap (1.5G)
- 61866024 1048496 - free - (512M)
- """)
+ err = ("growfs: requested size 2.0GB is not larger than the "
+ "current filesystem size 2.0GB\n")
+ exception = ProcessExecutionError(stderr=err, exit_code=1)
+ m_subp.side_effect = exception
res = can_skip_resize(fs_type, resize_what, devpth)
self.assertTrue(res)
- @mock.patch('cloudinit.config.cc_resizefs._get_dumpfs_output')
- @mock.patch('cloudinit.config.cc_resizefs._get_gpart_output')
- def test_skip_ufs_resize_roundup(self, gpart_out, dumpfs_out):
+ @mock.patch('cloudinit.subp.subp')
+ def test_cannot_skip_ufs_resize(self, m_subp):
fs_type = "ufs"
resize_what = "/"
devpth = "/dev/da0p2"
- dumpfs_out.return_value = (
- "# newfs command for / (/dev/label/rootfs)\n"
- "newfs -O 2 -U -a 4 -b 32768 -d 32768 -e 4096 "
- "-f 4096 -g 16384 -h 64 -i 8192 -j -k 368 -m 8 "
- "-o time -s 297080 /dev/label/rootfs\n")
- gpart_out.return_value = textwrap.dedent("""\
- => 34 297086 da0 GPT (145M)
- 34 297086 1 freebsd-ufs (145M)
- """)
+ m_subp.return_value = (
+ ("stdout: super-block backups (for fsck_ffs -b #) at:\n\n"),
+ ("growfs: no room to allocate last cylinder group; "
+ "leaving 364KB unused\n")
+ )
res = can_skip_resize(fs_type, resize_what, devpth)
- self.assertTrue(res)
+ self.assertFalse(res)
+
+ @mock.patch('cloudinit.subp.subp')
+ def test_cannot_skip_ufs_growfs_exception(self, m_subp):
+ fs_type = "ufs"
+ resize_what = "/"
+ devpth = "/dev/da0p2"
+ err = "growfs: /dev/da0p2 is not clean - run fsck.\n"
+ exception = ProcessExecutionError(stderr=err, exit_code=1)
+ m_subp.side_effect = exception
+ with self.assertRaises(ProcessExecutionError):
+ can_skip_resize(fs_type, resize_what, devpth)
def test_can_skip_resize_ext(self):
self.assertFalse(can_skip_resize('ext', '/', '/dev/sda1'))
diff --git a/tests/unittests/test_handler/test_schema.py b/tests/unittests/test_handler/test_schema.py
index 44292571..15aa77bb 100644
--- a/tests/unittests/test_handler/test_schema.py
+++ b/tests/unittests/test_handler/test_schema.py
@@ -9,9 +9,9 @@ from cloudinit.util import write_file
from cloudinit.tests.helpers import CiTestCase, mock, skipUnlessJsonSchema
from copy import copy
+import itertools
import os
import pytest
-from io import StringIO
from pathlib import Path
from textwrap import dedent
from yaml import safe_load
@@ -400,50 +400,97 @@ class AnnotatedCloudconfigFileTest(CiTestCase):
annotated_cloudconfig_file(parsed_config, content, schema_errors))
-class MainTest(CiTestCase):
+class TestMain:
- def test_main_missing_args(self):
+ exclusive_combinations = itertools.combinations(
+ ["--system", "--docs all", "--config-file something"], 2
+ )
+
+ @pytest.mark.parametrize("params", exclusive_combinations)
+ def test_main_exclusive_args(self, params, capsys):
+ """Main exits non-zero and error on required exclusive args."""
+ params = list(itertools.chain(*[a.split() for a in params]))
+ with mock.patch('sys.argv', ['mycmd'] + params):
+ with pytest.raises(SystemExit) as context_manager:
+ main()
+ assert 1 == context_manager.value.code
+
+ _out, err = capsys.readouterr()
+ expected = (
+ 'Expected one of --config-file, --system or --docs arguments\n'
+ )
+ assert expected == err
+
+ def test_main_missing_args(self, capsys):
"""Main exits non-zero and reports an error on missing parameters."""
with mock.patch('sys.argv', ['mycmd']):
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
- with self.assertRaises(SystemExit) as context_manager:
- main()
- self.assertEqual(1, context_manager.exception.code)
- self.assertEqual(
- 'Expected either --config-file argument or --docs\n',
- m_stderr.getvalue())
+ with pytest.raises(SystemExit) as context_manager:
+ main()
+ assert 1 == context_manager.value.code
+
+ _out, err = capsys.readouterr()
+ expected = (
+ 'Expected one of --config-file, --system or --docs arguments\n'
+ )
+ assert expected == err
- def test_main_absent_config_file(self):
+ def test_main_absent_config_file(self, capsys):
"""Main exits non-zero when config file is absent."""
myargs = ['mycmd', '--annotate', '--config-file', 'NOT_A_FILE']
with mock.patch('sys.argv', myargs):
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
- with self.assertRaises(SystemExit) as context_manager:
- main()
- self.assertEqual(1, context_manager.exception.code)
- self.assertEqual(
- 'Configfile NOT_A_FILE does not exist\n',
- m_stderr.getvalue())
+ with pytest.raises(SystemExit) as context_manager:
+ main()
+ assert 1 == context_manager.value.code
+ _out, err = capsys.readouterr()
+ assert 'Configfile NOT_A_FILE does not exist\n' == err
- def test_main_prints_docs(self):
+ def test_main_prints_docs(self, capsys):
"""When --docs parameter is provided, main generates documentation."""
myargs = ['mycmd', '--docs', 'all']
with mock.patch('sys.argv', myargs):
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- self.assertEqual(0, main(), 'Expected 0 exit code')
- self.assertIn('\nNTP\n---\n', m_stdout.getvalue())
- self.assertIn('\nRuncmd\n------\n', m_stdout.getvalue())
+ assert 0 == main(), 'Expected 0 exit code'
+ out, _err = capsys.readouterr()
+ assert '\nNTP\n---\n' in out
+ assert '\nRuncmd\n------\n' in out
- def test_main_validates_config_file(self):
+ def test_main_validates_config_file(self, tmpdir, capsys):
"""When --config-file parameter is provided, main validates schema."""
- myyaml = self.tmp_path('my.yaml')
- myargs = ['mycmd', '--config-file', myyaml]
- write_file(myyaml, b'#cloud-config\nntp:') # shortest ntp schema
+ myyaml = tmpdir.join('my.yaml')
+ myargs = ['mycmd', '--config-file', myyaml.strpath]
+ myyaml.write(b'#cloud-config\nntp:') # shortest ntp schema
with mock.patch('sys.argv', myargs):
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- self.assertEqual(0, main(), 'Expected 0 exit code')
- self.assertIn(
- 'Valid cloud-config file {0}'.format(myyaml), m_stdout.getvalue())
+ assert 0 == main(), 'Expected 0 exit code'
+ out, _err = capsys.readouterr()
+ assert 'Valid cloud-config: {0}\n'.format(myyaml) == out
+
+ @mock.patch('cloudinit.config.schema.read_cfg_paths')
+ @mock.patch('cloudinit.config.schema.os.getuid', return_value=0)
+ def test_main_validates_system_userdata(
+ self, m_getuid, m_read_cfg_paths, capsys, paths
+ ):
+ """When --system is provided, main validates system userdata."""
+ m_read_cfg_paths.return_value = paths
+ ud_file = paths.get_ipath_cur("userdata_raw")
+ write_file(ud_file, b'#cloud-config\nntp:')
+ myargs = ['mycmd', '--system']
+ with mock.patch('sys.argv', myargs):
+ assert 0 == main(), 'Expected 0 exit code'
+ out, _err = capsys.readouterr()
+ assert 'Valid cloud-config: system userdata\n' == out
+
+ @mock.patch('cloudinit.config.schema.os.getuid', return_value=1000)
+ def test_main_system_userdata_requires_root(self, m_getuid, capsys, paths):
+ """Non-root user can't use --system param"""
+ myargs = ['mycmd', '--system']
+ with mock.patch('sys.argv', myargs):
+ with pytest.raises(SystemExit) as context_manager:
+ main()
+ assert 1 == context_manager.value.code
+ _out, err = capsys.readouterr()
+ expected = (
+ 'Unable to read system userdata as non-root user. Try using sudo\n'
+ )
+ assert expected == err
class CloudTestsIntegrationTest(CiTestCase):