diff options
Diffstat (limited to 'tests/unittests/cmd/devel')
-rw-r--r-- | tests/unittests/cmd/devel/__init__.py | 0 | ||||
-rw-r--r-- | tests/unittests/cmd/devel/test_hotplug_hook.py | 236 | ||||
-rw-r--r-- | tests/unittests/cmd/devel/test_logs.py | 213 | ||||
-rw-r--r-- | tests/unittests/cmd/devel/test_render.py | 154 |
4 files changed, 603 insertions, 0 deletions
diff --git a/tests/unittests/cmd/devel/__init__.py b/tests/unittests/cmd/devel/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/cmd/devel/__init__.py diff --git a/tests/unittests/cmd/devel/test_hotplug_hook.py b/tests/unittests/cmd/devel/test_hotplug_hook.py new file mode 100644 index 00000000..5ecb5969 --- /dev/null +++ b/tests/unittests/cmd/devel/test_hotplug_hook.py @@ -0,0 +1,236 @@ +from collections import namedtuple +from unittest import mock +from unittest.mock import call + +import pytest + +from cloudinit.cmd.devel.hotplug_hook import handle_hotplug +from cloudinit.distros import Distro +from cloudinit.event import EventType +from cloudinit.net.activators import NetworkActivator +from cloudinit.net.network_state import NetworkState +from cloudinit.sources import DataSource +from cloudinit.stages import Init + +hotplug_args = namedtuple("hotplug_args", "udevaction, subsystem, devpath") +FAKE_MAC = "11:22:33:44:55:66" + + +@pytest.fixture +def mocks(): + m_init = mock.MagicMock(spec=Init) + m_distro = mock.MagicMock(spec=Distro) + m_datasource = mock.MagicMock(spec=DataSource) + m_datasource.distro = m_distro + m_init.datasource = m_datasource + m_init.fetch.return_value = m_datasource + + read_sys_net = mock.patch( + "cloudinit.cmd.devel.hotplug_hook.read_sys_net_safe", + return_value=FAKE_MAC, + ) + + update_event_enabled = mock.patch( + "cloudinit.stages.update_event_enabled", + return_value=True, + ) + + m_network_state = mock.MagicMock(spec=NetworkState) + parse_net = mock.patch( + "cloudinit.cmd.devel.hotplug_hook.parse_net_config_data", + return_value=m_network_state, + ) + + m_activator = mock.MagicMock(spec=NetworkActivator) + select_activator = mock.patch( + "cloudinit.cmd.devel.hotplug_hook.activators.select_activator", + return_value=m_activator, + ) + + sleep = mock.patch("time.sleep") + + read_sys_net.start() + update_event_enabled.start() + parse_net.start() + select_activator.start() + m_sleep = sleep.start() + + yield namedtuple("mocks", "m_init m_network_state m_activator m_sleep")( + m_init=m_init, + m_network_state=m_network_state, + m_activator=m_activator, + m_sleep=m_sleep, + ) + + read_sys_net.stop() + update_event_enabled.stop() + parse_net.stop() + select_activator.stop() + sleep.stop() + + +class TestUnsupportedActions: + def test_unsupported_subsystem(self, mocks): + with pytest.raises( + Exception, match="cannot handle events for subsystem: not_real" + ): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + subsystem="not_real", + udevaction="add", + ) + + def test_unsupported_udevaction(self, mocks): + with pytest.raises(ValueError, match="Unknown action: not_real"): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + udevaction="not_real", + subsystem="net", + ) + + +class TestHotplug: + def test_succcessful_add(self, mocks): + init = mocks.m_init + mocks.m_network_state.iter_interfaces.return_value = [ + { + "mac_address": FAKE_MAC, + } + ] + handle_hotplug( + hotplug_init=init, + devpath="/dev/fake", + udevaction="add", + subsystem="net", + ) + init.datasource.update_metadata_if_supported.assert_called_once_with( + [EventType.HOTPLUG] + ) + mocks.m_activator.bring_up_interface.assert_called_once_with("fake") + mocks.m_activator.bring_down_interface.assert_not_called() + init._write_to_cache.assert_called_once_with() + + def test_successful_remove(self, mocks): + init = mocks.m_init + mocks.m_network_state.iter_interfaces.return_value = [{}] + handle_hotplug( + hotplug_init=init, + devpath="/dev/fake", + udevaction="remove", + subsystem="net", + ) + init.datasource.update_metadata_if_supported.assert_called_once_with( + [EventType.HOTPLUG] + ) + mocks.m_activator.bring_down_interface.assert_called_once_with("fake") + mocks.m_activator.bring_up_interface.assert_not_called() + init._write_to_cache.assert_called_once_with() + + def test_update_event_disabled(self, mocks, caplog): + init = mocks.m_init + with mock.patch( + "cloudinit.stages.update_event_enabled", return_value=False + ): + handle_hotplug( + hotplug_init=init, + devpath="/dev/fake", + udevaction="remove", + subsystem="net", + ) + assert "hotplug not enabled for event of type" in caplog.text + init.datasource.update_metadata_if_supported.assert_not_called() + mocks.m_activator.bring_up_interface.assert_not_called() + mocks.m_activator.bring_down_interface.assert_not_called() + init._write_to_cache.assert_not_called() + + def test_update_metadata_failed(self, mocks): + mocks.m_init.datasource.update_metadata_if_supported.return_value = ( + False + ) + with pytest.raises( + RuntimeError, match="Datasource .* not updated for event hotplug" + ): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + udevaction="remove", + subsystem="net", + ) + + def test_detect_hotplugged_device_not_detected_on_add(self, mocks): + mocks.m_network_state.iter_interfaces.return_value = [{}] + with pytest.raises( + RuntimeError, + match="Failed to detect {} in updated metadata".format(FAKE_MAC), + ): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + udevaction="add", + subsystem="net", + ) + + def test_detect_hotplugged_device_detected_on_remove(self, mocks): + mocks.m_network_state.iter_interfaces.return_value = [ + { + "mac_address": FAKE_MAC, + } + ] + with pytest.raises( + RuntimeError, match="Failed to detect .* in updated metadata" + ): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + udevaction="remove", + subsystem="net", + ) + + def test_apply_failed_on_add(self, mocks): + mocks.m_network_state.iter_interfaces.return_value = [ + { + "mac_address": FAKE_MAC, + } + ] + mocks.m_activator.bring_up_interface.return_value = False + with pytest.raises( + RuntimeError, match="Failed to bring up device: /dev/fake" + ): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + udevaction="add", + subsystem="net", + ) + + def test_apply_failed_on_remove(self, mocks): + mocks.m_network_state.iter_interfaces.return_value = [{}] + mocks.m_activator.bring_down_interface.return_value = False + with pytest.raises( + RuntimeError, match="Failed to bring down device: /dev/fake" + ): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + udevaction="remove", + subsystem="net", + ) + + def test_retry(self, mocks): + with pytest.raises(RuntimeError): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + udevaction="add", + subsystem="net", + ) + assert mocks.m_sleep.call_count == 5 + assert mocks.m_sleep.call_args_list == [ + call(1), + call(3), + call(5), + call(10), + call(30), + ] diff --git a/tests/unittests/cmd/devel/test_logs.py b/tests/unittests/cmd/devel/test_logs.py new file mode 100644 index 00000000..73ed3c65 --- /dev/null +++ b/tests/unittests/cmd/devel/test_logs.py @@ -0,0 +1,213 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import os +from datetime import datetime +from io import StringIO + +from cloudinit.cmd.devel import logs +from cloudinit.sources import INSTANCE_JSON_SENSITIVE_FILE +from cloudinit.subp import subp +from cloudinit.util import ensure_dir, load_file, write_file +from tests.unittests.helpers import ( + FilesystemMockingTestCase, + mock, + wrap_and_call, +) + + +@mock.patch("cloudinit.cmd.devel.logs.os.getuid") +class TestCollectLogs(FilesystemMockingTestCase): + def setUp(self): + super(TestCollectLogs, self).setUp() + self.new_root = self.tmp_dir() + self.run_dir = self.tmp_path("run", self.new_root) + + def test_collect_logs_with_userdata_requires_root_user(self, m_getuid): + """collect-logs errors when non-root user collects userdata .""" + m_getuid.return_value = 100 # non-root + output_tarfile = self.tmp_path("logs.tgz") + with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: + self.assertEqual( + 1, logs.collect_logs(output_tarfile, include_userdata=True) + ) + self.assertEqual( + "To include userdata, root user is required." + " Try sudo cloud-init collect-logs\n", + m_stderr.getvalue(), + ) + + def test_collect_logs_creates_tarfile(self, m_getuid): + """collect-logs creates a tarfile with all related cloud-init info.""" + m_getuid.return_value = 100 + log1 = self.tmp_path("cloud-init.log", self.new_root) + write_file(log1, "cloud-init-log") + log2 = self.tmp_path("cloud-init-output.log", self.new_root) + write_file(log2, "cloud-init-output-log") + ensure_dir(self.run_dir) + write_file(self.tmp_path("results.json", self.run_dir), "results") + write_file( + self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir), + "sensitive", + ) + output_tarfile = self.tmp_path("logs.tgz") + + date = datetime.utcnow().date().strftime("%Y-%m-%d") + date_logdir = "cloud-init-logs-{0}".format(date) + + version_out = "/usr/bin/cloud-init 18.2fake\n" + expected_subp = { + ( + "dpkg-query", + "--show", + "-f=${Version}\n", + "cloud-init", + ): "0.7fake\n", + ("cloud-init", "--version"): version_out, + ("dmesg",): "dmesg-out\n", + ("journalctl", "--boot=0", "-o", "short-precise"): "journal-out\n", + ("tar", "czvf", output_tarfile, date_logdir): "", + } + + def fake_subp(cmd): + cmd_tuple = tuple(cmd) + if cmd_tuple not in expected_subp: + raise AssertionError( + "Unexpected command provided to subp: {0}".format(cmd) + ) + if cmd == ["tar", "czvf", output_tarfile, date_logdir]: + subp(cmd) # Pass through tar cmd so we can check output + return expected_subp[cmd_tuple], "" + + fake_stderr = mock.MagicMock() + + wrap_and_call( + "cloudinit.cmd.devel.logs", + { + "subp": {"side_effect": fake_subp}, + "sys.stderr": {"new": fake_stderr}, + "CLOUDINIT_LOGS": {"new": [log1, log2]}, + "CLOUDINIT_RUN_DIR": {"new": self.run_dir}, + }, + logs.collect_logs, + output_tarfile, + include_userdata=False, + ) + # unpack the tarfile and check file contents + subp(["tar", "zxvf", output_tarfile, "-C", self.new_root]) + out_logdir = self.tmp_path(date_logdir, self.new_root) + self.assertFalse( + os.path.exists( + os.path.join( + out_logdir, + "run", + "cloud-init", + INSTANCE_JSON_SENSITIVE_FILE, + ) + ), + "Unexpected file found: %s" % INSTANCE_JSON_SENSITIVE_FILE, + ) + self.assertEqual( + "0.7fake\n", load_file(os.path.join(out_logdir, "dpkg-version")) + ) + self.assertEqual( + version_out, load_file(os.path.join(out_logdir, "version")) + ) + self.assertEqual( + "cloud-init-log", + load_file(os.path.join(out_logdir, "cloud-init.log")), + ) + self.assertEqual( + "cloud-init-output-log", + load_file(os.path.join(out_logdir, "cloud-init-output.log")), + ) + self.assertEqual( + "dmesg-out\n", load_file(os.path.join(out_logdir, "dmesg.txt")) + ) + self.assertEqual( + "journal-out\n", load_file(os.path.join(out_logdir, "journal.txt")) + ) + self.assertEqual( + "results", + load_file( + os.path.join(out_logdir, "run", "cloud-init", "results.json") + ), + ) + fake_stderr.write.assert_any_call("Wrote %s\n" % output_tarfile) + + def test_collect_logs_includes_optional_userdata(self, m_getuid): + """collect-logs include userdata when --include-userdata is set.""" + m_getuid.return_value = 0 + log1 = self.tmp_path("cloud-init.log", self.new_root) + write_file(log1, "cloud-init-log") + log2 = self.tmp_path("cloud-init-output.log", self.new_root) + write_file(log2, "cloud-init-output-log") + userdata = self.tmp_path("user-data.txt", self.new_root) + write_file(userdata, "user-data") + ensure_dir(self.run_dir) + write_file(self.tmp_path("results.json", self.run_dir), "results") + write_file( + self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir), + "sensitive", + ) + output_tarfile = self.tmp_path("logs.tgz") + + date = datetime.utcnow().date().strftime("%Y-%m-%d") + date_logdir = "cloud-init-logs-{0}".format(date) + + version_out = "/usr/bin/cloud-init 18.2fake\n" + expected_subp = { + ( + "dpkg-query", + "--show", + "-f=${Version}\n", + "cloud-init", + ): "0.7fake", + ("cloud-init", "--version"): version_out, + ("dmesg",): "dmesg-out\n", + ("journalctl", "--boot=0", "-o", "short-precise"): "journal-out\n", + ("tar", "czvf", output_tarfile, date_logdir): "", + } + + def fake_subp(cmd): + cmd_tuple = tuple(cmd) + if cmd_tuple not in expected_subp: + raise AssertionError( + "Unexpected command provided to subp: {0}".format(cmd) + ) + if cmd == ["tar", "czvf", output_tarfile, date_logdir]: + subp(cmd) # Pass through tar cmd so we can check output + return expected_subp[cmd_tuple], "" + + fake_stderr = mock.MagicMock() + + wrap_and_call( + "cloudinit.cmd.devel.logs", + { + "subp": {"side_effect": fake_subp}, + "sys.stderr": {"new": fake_stderr}, + "CLOUDINIT_LOGS": {"new": [log1, log2]}, + "CLOUDINIT_RUN_DIR": {"new": self.run_dir}, + "USER_DATA_FILE": {"new": userdata}, + }, + logs.collect_logs, + output_tarfile, + include_userdata=True, + ) + # unpack the tarfile and check file contents + subp(["tar", "zxvf", output_tarfile, "-C", self.new_root]) + out_logdir = self.tmp_path(date_logdir, self.new_root) + self.assertEqual( + "user-data", load_file(os.path.join(out_logdir, "user-data.txt")) + ) + self.assertEqual( + "sensitive", + load_file( + os.path.join( + out_logdir, + "run", + "cloud-init", + INSTANCE_JSON_SENSITIVE_FILE, + ) + ), + ) + fake_stderr.write.assert_any_call("Wrote %s\n" % output_tarfile) diff --git a/tests/unittests/cmd/devel/test_render.py b/tests/unittests/cmd/devel/test_render.py new file mode 100644 index 00000000..4afc64f0 --- /dev/null +++ b/tests/unittests/cmd/devel/test_render.py @@ -0,0 +1,154 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import os +from collections import namedtuple +from io import StringIO + +from cloudinit.cmd.devel import render +from cloudinit.helpers import Paths +from cloudinit.sources import INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE +from cloudinit.util import ensure_dir, write_file +from tests.unittests.helpers import CiTestCase, mock, skipUnlessJinja + + +class TestRender(CiTestCase): + + with_logs = True + + args = namedtuple("renderargs", "user_data instance_data debug") + + def setUp(self): + super(TestRender, self).setUp() + self.tmp = self.tmp_dir() + + def test_handle_args_error_on_missing_user_data(self): + """When user_data file path does not exist, log an error.""" + absent_file = self.tmp_path("user-data", dir=self.tmp) + instance_data = self.tmp_path("instance-data", dir=self.tmp) + write_file(instance_data, "{}") + args = self.args( + user_data=absent_file, instance_data=instance_data, debug=False + ) + with mock.patch("sys.stderr", new_callable=StringIO): + self.assertEqual(1, render.handle_args("anyname", args)) + self.assertIn( + "Missing user-data file: %s" % absent_file, self.logs.getvalue() + ) + + def test_handle_args_error_on_missing_instance_data(self): + """When instance_data file path does not exist, log an error.""" + user_data = self.tmp_path("user-data", dir=self.tmp) + absent_file = self.tmp_path("instance-data", dir=self.tmp) + args = self.args( + user_data=user_data, instance_data=absent_file, debug=False + ) + with mock.patch("sys.stderr", new_callable=StringIO): + self.assertEqual(1, render.handle_args("anyname", args)) + self.assertIn( + "Missing instance-data.json file: %s" % absent_file, + self.logs.getvalue(), + ) + + def test_handle_args_defaults_instance_data(self): + """When no instance_data argument, default to configured run_dir.""" + user_data = self.tmp_path("user-data", dir=self.tmp) + run_dir = self.tmp_path("run_dir", dir=self.tmp) + ensure_dir(run_dir) + paths = Paths({"run_dir": run_dir}) + self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") + self.m_paths.return_value = paths + args = self.args(user_data=user_data, instance_data=None, debug=False) + with mock.patch("sys.stderr", new_callable=StringIO): + self.assertEqual(1, render.handle_args("anyname", args)) + json_file = os.path.join(run_dir, INSTANCE_JSON_FILE) + self.assertIn( + "Missing instance-data.json file: %s" % json_file, + self.logs.getvalue(), + ) + + def test_handle_args_root_fallback_from_sensitive_instance_data(self): + """When root user defaults to sensitive.json.""" + user_data = self.tmp_path("user-data", dir=self.tmp) + run_dir = self.tmp_path("run_dir", dir=self.tmp) + ensure_dir(run_dir) + paths = Paths({"run_dir": run_dir}) + self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") + self.m_paths.return_value = paths + args = self.args(user_data=user_data, instance_data=None, debug=False) + with mock.patch("sys.stderr", new_callable=StringIO): + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 0 + self.assertEqual(1, render.handle_args("anyname", args)) + json_file = os.path.join(run_dir, INSTANCE_JSON_FILE) + json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE) + self.assertIn( + "WARNING: Missing root-readable %s. Using redacted %s" + % (json_sensitive, json_file), + self.logs.getvalue(), + ) + self.assertIn( + "ERROR: Missing instance-data.json file: %s" % json_file, + self.logs.getvalue(), + ) + + def test_handle_args_root_uses_sensitive_instance_data(self): + """When root user, and no instance-data arg, use sensitive.json.""" + user_data = self.tmp_path("user-data", dir=self.tmp) + write_file(user_data, "##template: jinja\nrendering: {{ my_var }}") + run_dir = self.tmp_path("run_dir", dir=self.tmp) + ensure_dir(run_dir) + json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE) + write_file(json_sensitive, '{"my-var": "jinja worked"}') + paths = Paths({"run_dir": run_dir}) + self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") + self.m_paths.return_value = paths + args = self.args(user_data=user_data, instance_data=None, debug=False) + with mock.patch("sys.stderr", new_callable=StringIO): + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 0 + self.assertEqual(0, render.handle_args("anyname", args)) + self.assertIn("rendering: jinja worked", m_stdout.getvalue()) + + @skipUnlessJinja() + def test_handle_args_renders_instance_data_vars_in_template(self): + """If user_data file is a jinja template render instance-data vars.""" + user_data = self.tmp_path("user-data", dir=self.tmp) + write_file(user_data, "##template: jinja\nrendering: {{ my_var }}") + instance_data = self.tmp_path("instance-data", dir=self.tmp) + write_file(instance_data, '{"my-var": "jinja worked"}') + args = self.args( + user_data=user_data, instance_data=instance_data, debug=True + ) + with mock.patch("sys.stderr", new_callable=StringIO) as m_console_err: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + self.assertEqual(0, render.handle_args("anyname", args)) + self.assertIn( + "DEBUG: Converted jinja variables\n{", self.logs.getvalue() + ) + self.assertIn( + "DEBUG: Converted jinja variables\n{", m_console_err.getvalue() + ) + self.assertEqual("rendering: jinja worked", m_stdout.getvalue()) + + @skipUnlessJinja() + def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation(self): + """If user_data file has invalid jinja operations log warnings.""" + user_data = self.tmp_path("user-data", dir=self.tmp) + write_file(user_data, "##template: jinja\nrendering: {{ my-var }}") + instance_data = self.tmp_path("instance-data", dir=self.tmp) + write_file(instance_data, '{"my-var": "jinja worked"}') + args = self.args( + user_data=user_data, instance_data=instance_data, debug=True + ) + with mock.patch("sys.stderr", new_callable=StringIO): + self.assertEqual(1, render.handle_args("anyname", args)) + self.assertIn( + "WARNING: Ignoring jinja template for %s: Undefined jinja" + ' variable: "my-var". Jinja tried subtraction. Perhaps you meant' + ' "my_var"?' % user_data, + self.logs.getvalue(), + ) + + +# vi: ts=4 expandtab |