diff options
author | zsdc <taras@vyos.io> | 2022-03-25 20:58:01 +0200 |
---|---|---|
committer | zsdc <taras@vyos.io> | 2022-03-25 21:42:00 +0200 |
commit | 31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba (patch) | |
tree | 349631a02467dae0158f6f663cc8aa8537974a97 /tests/unittests/cmd | |
parent | 5c4b3943343a85fbe517e5ec1fc670b3a8566b4b (diff) | |
parent | 8537237d80a48c8f0cbf8e66aa4826bbc882b022 (diff) | |
download | vyos-cloud-init-31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba.tar.gz vyos-cloud-init-31448cccedd8f841fb3ac7d0f2e3cdefe08a53ba.zip |
T2117: Cloud-init updated to 22.1
Merged with 22.1 tag from the upstream Cloud-init repository.
Our modules were slightly modified for compatibility with the new
version.
Diffstat (limited to 'tests/unittests/cmd')
-rw-r--r-- | tests/unittests/cmd/__init__.py | 0 | ||||
-rw-r--r-- | tests/unittests/cmd/devel/__init__.py | 0 | ||||
-rw-r--r-- | tests/unittests/cmd/devel/test_hotplug_hook.py | 236 | ||||
-rw-r--r-- | tests/unittests/cmd/devel/test_logs.py | 213 | ||||
-rw-r--r-- | tests/unittests/cmd/devel/test_render.py | 154 | ||||
-rw-r--r-- | tests/unittests/cmd/test_clean.py | 211 | ||||
-rw-r--r-- | tests/unittests/cmd/test_cloud_id.py | 187 | ||||
-rw-r--r-- | tests/unittests/cmd/test_main.py | 241 | ||||
-rw-r--r-- | tests/unittests/cmd/test_query.py | 537 | ||||
-rw-r--r-- | tests/unittests/cmd/test_status.py | 548 |
10 files changed, 2327 insertions, 0 deletions
diff --git a/tests/unittests/cmd/__init__.py b/tests/unittests/cmd/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/cmd/__init__.py diff --git a/tests/unittests/cmd/devel/__init__.py b/tests/unittests/cmd/devel/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/tests/unittests/cmd/devel/__init__.py diff --git a/tests/unittests/cmd/devel/test_hotplug_hook.py b/tests/unittests/cmd/devel/test_hotplug_hook.py new file mode 100644 index 00000000..5ecb5969 --- /dev/null +++ b/tests/unittests/cmd/devel/test_hotplug_hook.py @@ -0,0 +1,236 @@ +from collections import namedtuple +from unittest import mock +from unittest.mock import call + +import pytest + +from cloudinit.cmd.devel.hotplug_hook import handle_hotplug +from cloudinit.distros import Distro +from cloudinit.event import EventType +from cloudinit.net.activators import NetworkActivator +from cloudinit.net.network_state import NetworkState +from cloudinit.sources import DataSource +from cloudinit.stages import Init + +hotplug_args = namedtuple("hotplug_args", "udevaction, subsystem, devpath") +FAKE_MAC = "11:22:33:44:55:66" + + +@pytest.fixture +def mocks(): + m_init = mock.MagicMock(spec=Init) + m_distro = mock.MagicMock(spec=Distro) + m_datasource = mock.MagicMock(spec=DataSource) + m_datasource.distro = m_distro + m_init.datasource = m_datasource + m_init.fetch.return_value = m_datasource + + read_sys_net = mock.patch( + "cloudinit.cmd.devel.hotplug_hook.read_sys_net_safe", + return_value=FAKE_MAC, + ) + + update_event_enabled = mock.patch( + "cloudinit.stages.update_event_enabled", + return_value=True, + ) + + m_network_state = mock.MagicMock(spec=NetworkState) + parse_net = mock.patch( + "cloudinit.cmd.devel.hotplug_hook.parse_net_config_data", + return_value=m_network_state, + ) + + m_activator = mock.MagicMock(spec=NetworkActivator) + select_activator = mock.patch( + "cloudinit.cmd.devel.hotplug_hook.activators.select_activator", + return_value=m_activator, + ) + + sleep = mock.patch("time.sleep") + + read_sys_net.start() + update_event_enabled.start() + parse_net.start() + select_activator.start() + m_sleep = sleep.start() + + yield namedtuple("mocks", "m_init m_network_state m_activator m_sleep")( + m_init=m_init, + m_network_state=m_network_state, + m_activator=m_activator, + m_sleep=m_sleep, + ) + + read_sys_net.stop() + update_event_enabled.stop() + parse_net.stop() + select_activator.stop() + sleep.stop() + + +class TestUnsupportedActions: + def test_unsupported_subsystem(self, mocks): + with pytest.raises( + Exception, match="cannot handle events for subsystem: not_real" + ): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + subsystem="not_real", + udevaction="add", + ) + + def test_unsupported_udevaction(self, mocks): + with pytest.raises(ValueError, match="Unknown action: not_real"): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + udevaction="not_real", + subsystem="net", + ) + + +class TestHotplug: + def test_succcessful_add(self, mocks): + init = mocks.m_init + mocks.m_network_state.iter_interfaces.return_value = [ + { + "mac_address": FAKE_MAC, + } + ] + handle_hotplug( + hotplug_init=init, + devpath="/dev/fake", + udevaction="add", + subsystem="net", + ) + init.datasource.update_metadata_if_supported.assert_called_once_with( + [EventType.HOTPLUG] + ) + mocks.m_activator.bring_up_interface.assert_called_once_with("fake") + mocks.m_activator.bring_down_interface.assert_not_called() + init._write_to_cache.assert_called_once_with() + + def test_successful_remove(self, mocks): + init = mocks.m_init + mocks.m_network_state.iter_interfaces.return_value = [{}] + handle_hotplug( + hotplug_init=init, + devpath="/dev/fake", + udevaction="remove", + subsystem="net", + ) + init.datasource.update_metadata_if_supported.assert_called_once_with( + [EventType.HOTPLUG] + ) + mocks.m_activator.bring_down_interface.assert_called_once_with("fake") + mocks.m_activator.bring_up_interface.assert_not_called() + init._write_to_cache.assert_called_once_with() + + def test_update_event_disabled(self, mocks, caplog): + init = mocks.m_init + with mock.patch( + "cloudinit.stages.update_event_enabled", return_value=False + ): + handle_hotplug( + hotplug_init=init, + devpath="/dev/fake", + udevaction="remove", + subsystem="net", + ) + assert "hotplug not enabled for event of type" in caplog.text + init.datasource.update_metadata_if_supported.assert_not_called() + mocks.m_activator.bring_up_interface.assert_not_called() + mocks.m_activator.bring_down_interface.assert_not_called() + init._write_to_cache.assert_not_called() + + def test_update_metadata_failed(self, mocks): + mocks.m_init.datasource.update_metadata_if_supported.return_value = ( + False + ) + with pytest.raises( + RuntimeError, match="Datasource .* not updated for event hotplug" + ): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + udevaction="remove", + subsystem="net", + ) + + def test_detect_hotplugged_device_not_detected_on_add(self, mocks): + mocks.m_network_state.iter_interfaces.return_value = [{}] + with pytest.raises( + RuntimeError, + match="Failed to detect {} in updated metadata".format(FAKE_MAC), + ): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + udevaction="add", + subsystem="net", + ) + + def test_detect_hotplugged_device_detected_on_remove(self, mocks): + mocks.m_network_state.iter_interfaces.return_value = [ + { + "mac_address": FAKE_MAC, + } + ] + with pytest.raises( + RuntimeError, match="Failed to detect .* in updated metadata" + ): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + udevaction="remove", + subsystem="net", + ) + + def test_apply_failed_on_add(self, mocks): + mocks.m_network_state.iter_interfaces.return_value = [ + { + "mac_address": FAKE_MAC, + } + ] + mocks.m_activator.bring_up_interface.return_value = False + with pytest.raises( + RuntimeError, match="Failed to bring up device: /dev/fake" + ): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + udevaction="add", + subsystem="net", + ) + + def test_apply_failed_on_remove(self, mocks): + mocks.m_network_state.iter_interfaces.return_value = [{}] + mocks.m_activator.bring_down_interface.return_value = False + with pytest.raises( + RuntimeError, match="Failed to bring down device: /dev/fake" + ): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + udevaction="remove", + subsystem="net", + ) + + def test_retry(self, mocks): + with pytest.raises(RuntimeError): + handle_hotplug( + hotplug_init=mocks.m_init, + devpath="/dev/fake", + udevaction="add", + subsystem="net", + ) + assert mocks.m_sleep.call_count == 5 + assert mocks.m_sleep.call_args_list == [ + call(1), + call(3), + call(5), + call(10), + call(30), + ] diff --git a/tests/unittests/cmd/devel/test_logs.py b/tests/unittests/cmd/devel/test_logs.py new file mode 100644 index 00000000..73ed3c65 --- /dev/null +++ b/tests/unittests/cmd/devel/test_logs.py @@ -0,0 +1,213 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import os +from datetime import datetime +from io import StringIO + +from cloudinit.cmd.devel import logs +from cloudinit.sources import INSTANCE_JSON_SENSITIVE_FILE +from cloudinit.subp import subp +from cloudinit.util import ensure_dir, load_file, write_file +from tests.unittests.helpers import ( + FilesystemMockingTestCase, + mock, + wrap_and_call, +) + + +@mock.patch("cloudinit.cmd.devel.logs.os.getuid") +class TestCollectLogs(FilesystemMockingTestCase): + def setUp(self): + super(TestCollectLogs, self).setUp() + self.new_root = self.tmp_dir() + self.run_dir = self.tmp_path("run", self.new_root) + + def test_collect_logs_with_userdata_requires_root_user(self, m_getuid): + """collect-logs errors when non-root user collects userdata .""" + m_getuid.return_value = 100 # non-root + output_tarfile = self.tmp_path("logs.tgz") + with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: + self.assertEqual( + 1, logs.collect_logs(output_tarfile, include_userdata=True) + ) + self.assertEqual( + "To include userdata, root user is required." + " Try sudo cloud-init collect-logs\n", + m_stderr.getvalue(), + ) + + def test_collect_logs_creates_tarfile(self, m_getuid): + """collect-logs creates a tarfile with all related cloud-init info.""" + m_getuid.return_value = 100 + log1 = self.tmp_path("cloud-init.log", self.new_root) + write_file(log1, "cloud-init-log") + log2 = self.tmp_path("cloud-init-output.log", self.new_root) + write_file(log2, "cloud-init-output-log") + ensure_dir(self.run_dir) + write_file(self.tmp_path("results.json", self.run_dir), "results") + write_file( + self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir), + "sensitive", + ) + output_tarfile = self.tmp_path("logs.tgz") + + date = datetime.utcnow().date().strftime("%Y-%m-%d") + date_logdir = "cloud-init-logs-{0}".format(date) + + version_out = "/usr/bin/cloud-init 18.2fake\n" + expected_subp = { + ( + "dpkg-query", + "--show", + "-f=${Version}\n", + "cloud-init", + ): "0.7fake\n", + ("cloud-init", "--version"): version_out, + ("dmesg",): "dmesg-out\n", + ("journalctl", "--boot=0", "-o", "short-precise"): "journal-out\n", + ("tar", "czvf", output_tarfile, date_logdir): "", + } + + def fake_subp(cmd): + cmd_tuple = tuple(cmd) + if cmd_tuple not in expected_subp: + raise AssertionError( + "Unexpected command provided to subp: {0}".format(cmd) + ) + if cmd == ["tar", "czvf", output_tarfile, date_logdir]: + subp(cmd) # Pass through tar cmd so we can check output + return expected_subp[cmd_tuple], "" + + fake_stderr = mock.MagicMock() + + wrap_and_call( + "cloudinit.cmd.devel.logs", + { + "subp": {"side_effect": fake_subp}, + "sys.stderr": {"new": fake_stderr}, + "CLOUDINIT_LOGS": {"new": [log1, log2]}, + "CLOUDINIT_RUN_DIR": {"new": self.run_dir}, + }, + logs.collect_logs, + output_tarfile, + include_userdata=False, + ) + # unpack the tarfile and check file contents + subp(["tar", "zxvf", output_tarfile, "-C", self.new_root]) + out_logdir = self.tmp_path(date_logdir, self.new_root) + self.assertFalse( + os.path.exists( + os.path.join( + out_logdir, + "run", + "cloud-init", + INSTANCE_JSON_SENSITIVE_FILE, + ) + ), + "Unexpected file found: %s" % INSTANCE_JSON_SENSITIVE_FILE, + ) + self.assertEqual( + "0.7fake\n", load_file(os.path.join(out_logdir, "dpkg-version")) + ) + self.assertEqual( + version_out, load_file(os.path.join(out_logdir, "version")) + ) + self.assertEqual( + "cloud-init-log", + load_file(os.path.join(out_logdir, "cloud-init.log")), + ) + self.assertEqual( + "cloud-init-output-log", + load_file(os.path.join(out_logdir, "cloud-init-output.log")), + ) + self.assertEqual( + "dmesg-out\n", load_file(os.path.join(out_logdir, "dmesg.txt")) + ) + self.assertEqual( + "journal-out\n", load_file(os.path.join(out_logdir, "journal.txt")) + ) + self.assertEqual( + "results", + load_file( + os.path.join(out_logdir, "run", "cloud-init", "results.json") + ), + ) + fake_stderr.write.assert_any_call("Wrote %s\n" % output_tarfile) + + def test_collect_logs_includes_optional_userdata(self, m_getuid): + """collect-logs include userdata when --include-userdata is set.""" + m_getuid.return_value = 0 + log1 = self.tmp_path("cloud-init.log", self.new_root) + write_file(log1, "cloud-init-log") + log2 = self.tmp_path("cloud-init-output.log", self.new_root) + write_file(log2, "cloud-init-output-log") + userdata = self.tmp_path("user-data.txt", self.new_root) + write_file(userdata, "user-data") + ensure_dir(self.run_dir) + write_file(self.tmp_path("results.json", self.run_dir), "results") + write_file( + self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir), + "sensitive", + ) + output_tarfile = self.tmp_path("logs.tgz") + + date = datetime.utcnow().date().strftime("%Y-%m-%d") + date_logdir = "cloud-init-logs-{0}".format(date) + + version_out = "/usr/bin/cloud-init 18.2fake\n" + expected_subp = { + ( + "dpkg-query", + "--show", + "-f=${Version}\n", + "cloud-init", + ): "0.7fake", + ("cloud-init", "--version"): version_out, + ("dmesg",): "dmesg-out\n", + ("journalctl", "--boot=0", "-o", "short-precise"): "journal-out\n", + ("tar", "czvf", output_tarfile, date_logdir): "", + } + + def fake_subp(cmd): + cmd_tuple = tuple(cmd) + if cmd_tuple not in expected_subp: + raise AssertionError( + "Unexpected command provided to subp: {0}".format(cmd) + ) + if cmd == ["tar", "czvf", output_tarfile, date_logdir]: + subp(cmd) # Pass through tar cmd so we can check output + return expected_subp[cmd_tuple], "" + + fake_stderr = mock.MagicMock() + + wrap_and_call( + "cloudinit.cmd.devel.logs", + { + "subp": {"side_effect": fake_subp}, + "sys.stderr": {"new": fake_stderr}, + "CLOUDINIT_LOGS": {"new": [log1, log2]}, + "CLOUDINIT_RUN_DIR": {"new": self.run_dir}, + "USER_DATA_FILE": {"new": userdata}, + }, + logs.collect_logs, + output_tarfile, + include_userdata=True, + ) + # unpack the tarfile and check file contents + subp(["tar", "zxvf", output_tarfile, "-C", self.new_root]) + out_logdir = self.tmp_path(date_logdir, self.new_root) + self.assertEqual( + "user-data", load_file(os.path.join(out_logdir, "user-data.txt")) + ) + self.assertEqual( + "sensitive", + load_file( + os.path.join( + out_logdir, + "run", + "cloud-init", + INSTANCE_JSON_SENSITIVE_FILE, + ) + ), + ) + fake_stderr.write.assert_any_call("Wrote %s\n" % output_tarfile) diff --git a/tests/unittests/cmd/devel/test_render.py b/tests/unittests/cmd/devel/test_render.py new file mode 100644 index 00000000..4afc64f0 --- /dev/null +++ b/tests/unittests/cmd/devel/test_render.py @@ -0,0 +1,154 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import os +from collections import namedtuple +from io import StringIO + +from cloudinit.cmd.devel import render +from cloudinit.helpers import Paths +from cloudinit.sources import INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE +from cloudinit.util import ensure_dir, write_file +from tests.unittests.helpers import CiTestCase, mock, skipUnlessJinja + + +class TestRender(CiTestCase): + + with_logs = True + + args = namedtuple("renderargs", "user_data instance_data debug") + + def setUp(self): + super(TestRender, self).setUp() + self.tmp = self.tmp_dir() + + def test_handle_args_error_on_missing_user_data(self): + """When user_data file path does not exist, log an error.""" + absent_file = self.tmp_path("user-data", dir=self.tmp) + instance_data = self.tmp_path("instance-data", dir=self.tmp) + write_file(instance_data, "{}") + args = self.args( + user_data=absent_file, instance_data=instance_data, debug=False + ) + with mock.patch("sys.stderr", new_callable=StringIO): + self.assertEqual(1, render.handle_args("anyname", args)) + self.assertIn( + "Missing user-data file: %s" % absent_file, self.logs.getvalue() + ) + + def test_handle_args_error_on_missing_instance_data(self): + """When instance_data file path does not exist, log an error.""" + user_data = self.tmp_path("user-data", dir=self.tmp) + absent_file = self.tmp_path("instance-data", dir=self.tmp) + args = self.args( + user_data=user_data, instance_data=absent_file, debug=False + ) + with mock.patch("sys.stderr", new_callable=StringIO): + self.assertEqual(1, render.handle_args("anyname", args)) + self.assertIn( + "Missing instance-data.json file: %s" % absent_file, + self.logs.getvalue(), + ) + + def test_handle_args_defaults_instance_data(self): + """When no instance_data argument, default to configured run_dir.""" + user_data = self.tmp_path("user-data", dir=self.tmp) + run_dir = self.tmp_path("run_dir", dir=self.tmp) + ensure_dir(run_dir) + paths = Paths({"run_dir": run_dir}) + self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") + self.m_paths.return_value = paths + args = self.args(user_data=user_data, instance_data=None, debug=False) + with mock.patch("sys.stderr", new_callable=StringIO): + self.assertEqual(1, render.handle_args("anyname", args)) + json_file = os.path.join(run_dir, INSTANCE_JSON_FILE) + self.assertIn( + "Missing instance-data.json file: %s" % json_file, + self.logs.getvalue(), + ) + + def test_handle_args_root_fallback_from_sensitive_instance_data(self): + """When root user defaults to sensitive.json.""" + user_data = self.tmp_path("user-data", dir=self.tmp) + run_dir = self.tmp_path("run_dir", dir=self.tmp) + ensure_dir(run_dir) + paths = Paths({"run_dir": run_dir}) + self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") + self.m_paths.return_value = paths + args = self.args(user_data=user_data, instance_data=None, debug=False) + with mock.patch("sys.stderr", new_callable=StringIO): + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 0 + self.assertEqual(1, render.handle_args("anyname", args)) + json_file = os.path.join(run_dir, INSTANCE_JSON_FILE) + json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE) + self.assertIn( + "WARNING: Missing root-readable %s. Using redacted %s" + % (json_sensitive, json_file), + self.logs.getvalue(), + ) + self.assertIn( + "ERROR: Missing instance-data.json file: %s" % json_file, + self.logs.getvalue(), + ) + + def test_handle_args_root_uses_sensitive_instance_data(self): + """When root user, and no instance-data arg, use sensitive.json.""" + user_data = self.tmp_path("user-data", dir=self.tmp) + write_file(user_data, "##template: jinja\nrendering: {{ my_var }}") + run_dir = self.tmp_path("run_dir", dir=self.tmp) + ensure_dir(run_dir) + json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE) + write_file(json_sensitive, '{"my-var": "jinja worked"}') + paths = Paths({"run_dir": run_dir}) + self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") + self.m_paths.return_value = paths + args = self.args(user_data=user_data, instance_data=None, debug=False) + with mock.patch("sys.stderr", new_callable=StringIO): + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 0 + self.assertEqual(0, render.handle_args("anyname", args)) + self.assertIn("rendering: jinja worked", m_stdout.getvalue()) + + @skipUnlessJinja() + def test_handle_args_renders_instance_data_vars_in_template(self): + """If user_data file is a jinja template render instance-data vars.""" + user_data = self.tmp_path("user-data", dir=self.tmp) + write_file(user_data, "##template: jinja\nrendering: {{ my_var }}") + instance_data = self.tmp_path("instance-data", dir=self.tmp) + write_file(instance_data, '{"my-var": "jinja worked"}') + args = self.args( + user_data=user_data, instance_data=instance_data, debug=True + ) + with mock.patch("sys.stderr", new_callable=StringIO) as m_console_err: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + self.assertEqual(0, render.handle_args("anyname", args)) + self.assertIn( + "DEBUG: Converted jinja variables\n{", self.logs.getvalue() + ) + self.assertIn( + "DEBUG: Converted jinja variables\n{", m_console_err.getvalue() + ) + self.assertEqual("rendering: jinja worked", m_stdout.getvalue()) + + @skipUnlessJinja() + def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation(self): + """If user_data file has invalid jinja operations log warnings.""" + user_data = self.tmp_path("user-data", dir=self.tmp) + write_file(user_data, "##template: jinja\nrendering: {{ my-var }}") + instance_data = self.tmp_path("instance-data", dir=self.tmp) + write_file(instance_data, '{"my-var": "jinja worked"}') + args = self.args( + user_data=user_data, instance_data=instance_data, debug=True + ) + with mock.patch("sys.stderr", new_callable=StringIO): + self.assertEqual(1, render.handle_args("anyname", args)) + self.assertIn( + "WARNING: Ignoring jinja template for %s: Undefined jinja" + ' variable: "my-var". Jinja tried subtraction. Perhaps you meant' + ' "my_var"?' % user_data, + self.logs.getvalue(), + ) + + +# vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_clean.py b/tests/unittests/cmd/test_clean.py new file mode 100644 index 00000000..7d12017e --- /dev/null +++ b/tests/unittests/cmd/test_clean.py @@ -0,0 +1,211 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import os +from collections import namedtuple +from io import StringIO + +from cloudinit.cmd import clean +from cloudinit.util import ensure_dir, sym_link, write_file +from tests.unittests.helpers import CiTestCase, mock, wrap_and_call + +mypaths = namedtuple("MyPaths", "cloud_dir") + + +class TestClean(CiTestCase): + def setUp(self): + super(TestClean, self).setUp() + self.new_root = self.tmp_dir() + self.artifact_dir = self.tmp_path("artifacts", self.new_root) + self.log1 = self.tmp_path("cloud-init.log", self.new_root) + self.log2 = self.tmp_path("cloud-init-output.log", self.new_root) + + class FakeInit(object): + cfg = { + "def_log_file": self.log1, + "output": {"all": "|tee -a {0}".format(self.log2)}, + } + # Ensure cloud_dir has a trailing slash, to match real behaviour + paths = mypaths(cloud_dir="{}/".format(self.artifact_dir)) + + def __init__(self, ds_deps): + pass + + def read_cfg(self): + pass + + self.init_class = FakeInit + + def test_remove_artifacts_removes_logs(self): + """remove_artifacts removes logs when remove_logs is True.""" + write_file(self.log1, "cloud-init-log") + write_file(self.log2, "cloud-init-output-log") + + self.assertFalse( + os.path.exists(self.artifact_dir), "Unexpected artifacts dir" + ) + retcode = wrap_and_call( + "cloudinit.cmd.clean", + {"Init": {"side_effect": self.init_class}}, + clean.remove_artifacts, + remove_logs=True, + ) + self.assertFalse(os.path.exists(self.log1), "Unexpected file") + self.assertFalse(os.path.exists(self.log2), "Unexpected file") + self.assertEqual(0, retcode) + + def test_remove_artifacts_preserves_logs(self): + """remove_artifacts leaves logs when remove_logs is False.""" + write_file(self.log1, "cloud-init-log") + write_file(self.log2, "cloud-init-output-log") + + retcode = wrap_and_call( + "cloudinit.cmd.clean", + {"Init": {"side_effect": self.init_class}}, + clean.remove_artifacts, + remove_logs=False, + ) + self.assertTrue(os.path.exists(self.log1), "Missing expected file") + self.assertTrue(os.path.exists(self.log2), "Missing expected file") + self.assertEqual(0, retcode) + + def test_remove_artifacts_removes_unlinks_symlinks(self): + """remove_artifacts cleans artifacts dir unlinking any symlinks.""" + dir1 = os.path.join(self.artifact_dir, "dir1") + ensure_dir(dir1) + symlink = os.path.join(self.artifact_dir, "mylink") + sym_link(dir1, symlink) + + retcode = wrap_and_call( + "cloudinit.cmd.clean", + {"Init": {"side_effect": self.init_class}}, + clean.remove_artifacts, + remove_logs=False, + ) + self.assertEqual(0, retcode) + for path in (dir1, symlink): + self.assertFalse( + os.path.exists(path), "Unexpected {0} dir".format(path) + ) + + def test_remove_artifacts_removes_artifacts_skipping_seed(self): + """remove_artifacts cleans artifacts dir with exception of seed dir.""" + dirs = [ + self.artifact_dir, + os.path.join(self.artifact_dir, "seed"), + os.path.join(self.artifact_dir, "dir1"), + os.path.join(self.artifact_dir, "dir2"), + ] + for _dir in dirs: + ensure_dir(_dir) + + retcode = wrap_and_call( + "cloudinit.cmd.clean", + {"Init": {"side_effect": self.init_class}}, + clean.remove_artifacts, + remove_logs=False, + ) + self.assertEqual(0, retcode) + for expected_dir in dirs[:2]: + self.assertTrue( + os.path.exists(expected_dir), + "Missing {0} dir".format(expected_dir), + ) + for deleted_dir in dirs[2:]: + self.assertFalse( + os.path.exists(deleted_dir), + "Unexpected {0} dir".format(deleted_dir), + ) + + def test_remove_artifacts_removes_artifacts_removes_seed(self): + """remove_artifacts removes seed dir when remove_seed is True.""" + dirs = [ + self.artifact_dir, + os.path.join(self.artifact_dir, "seed"), + os.path.join(self.artifact_dir, "dir1"), + os.path.join(self.artifact_dir, "dir2"), + ] + for _dir in dirs: + ensure_dir(_dir) + + retcode = wrap_and_call( + "cloudinit.cmd.clean", + {"Init": {"side_effect": self.init_class}}, + clean.remove_artifacts, + remove_logs=False, + remove_seed=True, + ) + self.assertEqual(0, retcode) + self.assertTrue( + os.path.exists(self.artifact_dir), "Missing artifact dir" + ) + for deleted_dir in dirs[1:]: + self.assertFalse( + os.path.exists(deleted_dir), + "Unexpected {0} dir".format(deleted_dir), + ) + + def test_remove_artifacts_returns_one_on_errors(self): + """remove_artifacts returns non-zero on failure and prints an error.""" + ensure_dir(self.artifact_dir) + ensure_dir(os.path.join(self.artifact_dir, "dir1")) + + with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: + retcode = wrap_and_call( + "cloudinit.cmd.clean", + { + "del_dir": {"side_effect": OSError("oops")}, + "Init": {"side_effect": self.init_class}, + }, + clean.remove_artifacts, + remove_logs=False, + ) + self.assertEqual(1, retcode) + self.assertEqual( + "Error:\nCould not remove %s/dir1: oops\n" % self.artifact_dir, + m_stderr.getvalue(), + ) + + def test_handle_clean_args_reboots(self): + """handle_clean_args_reboots when reboot arg is provided.""" + + called_cmds = [] + + def fake_subp(cmd, capture): + called_cmds.append((cmd, capture)) + return "", "" + + myargs = namedtuple("MyArgs", "remove_logs remove_seed reboot") + cmdargs = myargs(remove_logs=False, remove_seed=False, reboot=True) + retcode = wrap_and_call( + "cloudinit.cmd.clean", + { + "subp": {"side_effect": fake_subp}, + "Init": {"side_effect": self.init_class}, + }, + clean.handle_clean_args, + name="does not matter", + args=cmdargs, + ) + self.assertEqual(0, retcode) + self.assertEqual([(["shutdown", "-r", "now"], False)], called_cmds) + + def test_status_main(self): + """clean.main can be run as a standalone script.""" + write_file(self.log1, "cloud-init-log") + with self.assertRaises(SystemExit) as context_manager: + wrap_and_call( + "cloudinit.cmd.clean", + { + "Init": {"side_effect": self.init_class}, + "sys.argv": {"new": ["clean", "--logs"]}, + }, + clean.main, + ) + + self.assertEqual(0, context_manager.exception.code) + self.assertFalse( + os.path.exists(self.log1), "Unexpected log {0}".format(self.log1) + ) + + +# vi: ts=4 expandtab syntax=python diff --git a/tests/unittests/cmd/test_cloud_id.py b/tests/unittests/cmd/test_cloud_id.py new file mode 100644 index 00000000..907297a6 --- /dev/null +++ b/tests/unittests/cmd/test_cloud_id.py @@ -0,0 +1,187 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +"""Tests for cloud-id command line utility.""" + +from collections import namedtuple + +import pytest + +from cloudinit import util +from cloudinit.cmd import cloud_id +from tests.unittests.helpers import mock + +M_PATH = "cloudinit.cmd.cloud_id." + + +class TestCloudId: + + args = namedtuple("cloudidargs", "instance_data json long") + + def test_cloud_id_arg_parser_defaults(self): + """Validate the argument defaults when not provided by the end-user.""" + cmd = ["cloud-id"] + with mock.patch("sys.argv", cmd): + args = cloud_id.get_parser().parse_args() + assert "/run/cloud-init/instance-data.json" == args.instance_data + assert False is args.long + assert False is args.json + + def test_cloud_id_arg_parse_overrides(self, tmpdir): + """Override argument defaults by specifying values for each param.""" + instance_data = tmpdir.join("instance-data.json") + instance_data.write("{}") + cmd = [ + "cloud-id", + "--instance-data", + instance_data.strpath, + "--long", + "--json", + ] + with mock.patch("sys.argv", cmd): + args = cloud_id.get_parser().parse_args() + assert instance_data.strpath == args.instance_data + assert True is args.long + assert True is args.json + + @mock.patch(M_PATH + "get_status_details") + def test_cloud_id_missing_instance_data_json( + self, get_status_details, tmpdir, capsys + ): + """Exit error when the provided instance-data.json does not exist.""" + get_status_details.return_value = cloud_id.UXAppStatus.DONE, "n/a", "" + instance_data = tmpdir.join("instance-data.json") + cmd = ["cloud-id", "--instance-data", instance_data.strpath] + with mock.patch("sys.argv", cmd): + with pytest.raises(SystemExit) as context_manager: + cloud_id.main() + assert 1 == context_manager.value.code + _out, err = capsys.readouterr() + assert "Error:\nFile not found '%s'" % instance_data.strpath in err + + @mock.patch(M_PATH + "get_status_details") + def test_cloud_id_non_json_instance_data( + self, get_status_details, tmpdir, capsys + ): + """Exit error when the provided instance-data.json is not json.""" + get_status_details.return_value = cloud_id.UXAppStatus.DONE, "n/a", "" + instance_data = tmpdir.join("instance-data.json") + cmd = ["cloud-id", "--instance-data", instance_data.strpath] + instance_data.write("{") + with mock.patch("sys.argv", cmd): + with pytest.raises(SystemExit) as context_manager: + cloud_id.main() + assert 1 == context_manager.value.code + _out, err = capsys.readouterr() + assert ( + "Error:\nFile '%s' is not valid json." % instance_data.strpath + in err + ) + + @mock.patch(M_PATH + "get_status_details") + def test_cloud_id_from_cloud_name_in_instance_data( + self, get_status_details, tmpdir, capsys + ): + """Report canonical cloud-id from cloud_name in instance-data.""" + instance_data = tmpdir.join("instance-data.json") + get_status_details.return_value = cloud_id.UXAppStatus.DONE, "n/a", "" + instance_data.write( + '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}', + ) + cmd = ["cloud-id", "--instance-data", instance_data.strpath] + with mock.patch("sys.argv", cmd): + with pytest.raises(SystemExit) as context_manager: + cloud_id.main() + assert 0 == context_manager.value.code + out, _err = capsys.readouterr() + assert "mycloud\n" == out + + @mock.patch(M_PATH + "get_status_details") + def test_cloud_id_long_name_from_instance_data( + self, get_status_details, tmpdir, capsys + ): + """Report long cloud-id format from cloud_name and region.""" + get_status_details.return_value = cloud_id.UXAppStatus.DONE, "n/a", "" + instance_data = tmpdir.join("instance-data.json") + instance_data.write( + '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}', + ) + cmd = ["cloud-id", "--instance-data", instance_data.strpath, "--long"] + with mock.patch("sys.argv", cmd): + with pytest.raises(SystemExit) as context_manager: + cloud_id.main() + out, _err = capsys.readouterr() + assert 0 == context_manager.value.code + assert "mycloud\tsomereg\n" == out + + @mock.patch(M_PATH + "get_status_details") + def test_cloud_id_lookup_from_instance_data_region( + self, get_status_details, tmpdir, capsys + ): + """Report discovered canonical cloud_id when region lookup matches.""" + get_status_details.return_value = cloud_id.UXAppStatus.DONE, "n/a", "" + instance_data = tmpdir.join("instance-data.json") + instance_data.write( + '{"v1": {"cloud_name": "aws", "region": "cn-north-1",' + ' "platform": "ec2"}}', + ) + cmd = ["cloud-id", "--instance-data", instance_data.strpath, "--long"] + with mock.patch("sys.argv", cmd): + with pytest.raises(SystemExit) as context_manager: + cloud_id.main() + assert 0 == context_manager.value.code + out, _err = capsys.readouterr() + assert "aws-china\tcn-north-1\n" == out + + @mock.patch(M_PATH + "get_status_details") + def test_cloud_id_lookup_json_instance_data_adds_cloud_id_to_json( + self, get_status_details, tmpdir, capsys + ): + """Report v1 instance-data content with cloud_id when --json set.""" + get_status_details.return_value = cloud_id.UXAppStatus.DONE, "n/a", "" + instance_data = tmpdir.join("instance-data.json") + instance_data.write( + '{"v1": {"cloud_name": "unknown", "region": "dfw",' + ' "platform": "openstack", "public_ssh_keys": []}}', + ) + expected = util.json_dumps( + { + "cloud_id": "openstack", + "cloud_name": "unknown", + "platform": "openstack", + "public_ssh_keys": [], + "region": "dfw", + } + ) + cmd = ["cloud-id", "--instance-data", instance_data.strpath, "--json"] + with mock.patch("sys.argv", cmd): + with pytest.raises(SystemExit) as context_manager: + cloud_id.main() + out, _err = capsys.readouterr() + assert 0 == context_manager.value.code + assert expected + "\n" == out + + @pytest.mark.parametrize( + "status, exit_code", + ( + (cloud_id.UXAppStatus.DISABLED, 2), + (cloud_id.UXAppStatus.NOT_RUN, 3), + (cloud_id.UXAppStatus.RUNNING, 0), + ), + ) + @mock.patch(M_PATH + "get_status_details") + def test_cloud_id_unique_exit_codes_for_status( + self, get_status_details, status, exit_code, tmpdir, capsys + ): + """cloud-id returns unique exit codes for status.""" + get_status_details.return_value = status, "n/a", "" + instance_data = tmpdir.join("instance-data.json") + if status == cloud_id.UXAppStatus.RUNNING: + instance_data.write("{}") + cmd = ["cloud-id", "--instance-data", instance_data.strpath, "--json"] + with mock.patch("sys.argv", cmd): + with pytest.raises(SystemExit) as context_manager: + cloud_id.main() + assert exit_code == context_manager.value.code + + +# vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_main.py b/tests/unittests/cmd/test_main.py new file mode 100644 index 00000000..3e778b0b --- /dev/null +++ b/tests/unittests/cmd/test_main.py @@ -0,0 +1,241 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import copy +import os +from collections import namedtuple +from io import StringIO +from unittest import mock + +import pytest + +from cloudinit import safeyaml +from cloudinit.cmd import main +from cloudinit.util import ensure_dir, load_file, write_file +from tests.unittests.helpers import FilesystemMockingTestCase, wrap_and_call + +mypaths = namedtuple("MyPaths", "run_dir") +myargs = namedtuple("MyArgs", "debug files force local reporter subcommand") + + +class TestMain(FilesystemMockingTestCase): + with_logs = True + allowed_subp = False + + def setUp(self): + super(TestMain, self).setUp() + self.new_root = self.tmp_dir() + self.cloud_dir = self.tmp_path("var/lib/cloud/", dir=self.new_root) + os.makedirs(self.cloud_dir) + self.replicateTestRoot("simple_ubuntu", self.new_root) + self.cfg = { + "datasource_list": ["None"], + "runcmd": ["ls /etc"], # test ALL_DISTROS + "system_info": { + "paths": { + "cloud_dir": self.cloud_dir, + "run_dir": self.new_root, + } + }, + "write_files": [ + { + "path": "/etc/blah.ini", + "content": "blah", + "permissions": 0o755, + }, + ], + "cloud_init_modules": ["write-files", "runcmd"], + } + cloud_cfg = safeyaml.dumps(self.cfg) + ensure_dir(os.path.join(self.new_root, "etc", "cloud")) + self.cloud_cfg_file = os.path.join( + self.new_root, "etc", "cloud", "cloud.cfg" + ) + write_file(self.cloud_cfg_file, cloud_cfg) + self.patchOS(self.new_root) + self.patchUtils(self.new_root) + self.stderr = StringIO() + self.patchStdoutAndStderr(stderr=self.stderr) + + def test_main_init_run_net_stops_on_file_no_net(self): + """When no-net file is present, main_init does not process modules.""" + stop_file = os.path.join(self.cloud_dir, "data", "no-net") # stop file + write_file(stop_file, "") + cmdargs = myargs( + debug=False, + files=None, + force=False, + local=False, + reporter=None, + subcommand="init", + ) + (_item1, item2) = wrap_and_call( + "cloudinit.cmd.main", + { + "util.close_stdin": True, + "netinfo.debug_info": "my net debug info", + "util.fixup_output": ("outfmt", "errfmt"), + }, + main.main_init, + "init", + cmdargs, + ) + # We should not run write_files module + self.assertFalse( + os.path.exists(os.path.join(self.new_root, "etc/blah.ini")), + "Unexpected run of write_files module produced blah.ini", + ) + self.assertEqual([], item2) + # Instancify is called + instance_id_path = "var/lib/cloud/data/instance-id" + self.assertFalse( + os.path.exists(os.path.join(self.new_root, instance_id_path)), + "Unexpected call to datasource.instancify produced instance-id", + ) + expected_logs = [ + "Exiting. stop file ['{stop_file}'] existed\n".format( + stop_file=stop_file + ), + "my net debug info", # netinfo.debug_info + ] + for log in expected_logs: + self.assertIn(log, self.stderr.getvalue()) + + def test_main_init_run_net_runs_modules(self): + """Modules like write_files are run in 'net' mode.""" + cmdargs = myargs( + debug=False, + files=None, + force=False, + local=False, + reporter=None, + subcommand="init", + ) + (_item1, item2) = wrap_and_call( + "cloudinit.cmd.main", + { + "util.close_stdin": True, + "netinfo.debug_info": "my net debug info", + "util.fixup_output": ("outfmt", "errfmt"), + }, + main.main_init, + "init", + cmdargs, + ) + self.assertEqual([], item2) + # Instancify is called + instance_id_path = "var/lib/cloud/data/instance-id" + self.assertEqual( + "iid-datasource-none\n", + os.path.join( + load_file(os.path.join(self.new_root, instance_id_path)) + ), + ) + # modules are run (including write_files) + self.assertEqual( + "blah", load_file(os.path.join(self.new_root, "etc/blah.ini")) + ) + expected_logs = [ + "network config is disabled by fallback", # apply_network_config + "my net debug info", # netinfo.debug_info + "no previous run detected", + ] + for log in expected_logs: + self.assertIn(log, self.stderr.getvalue()) + + def test_main_init_run_net_calls_set_hostname_when_metadata_present(self): + """When local-hostname metadata is present, call cc_set_hostname.""" + self.cfg["datasource"] = { + "None": {"metadata": {"local-hostname": "md-hostname"}} + } + cloud_cfg = safeyaml.dumps(self.cfg) + write_file(self.cloud_cfg_file, cloud_cfg) + cmdargs = myargs( + debug=False, + files=None, + force=False, + local=False, + reporter=None, + subcommand="init", + ) + + def set_hostname(name, cfg, cloud, log, args): + self.assertEqual("set-hostname", name) + updated_cfg = copy.deepcopy(self.cfg) + updated_cfg.update( + { + "def_log_file": "/var/log/cloud-init.log", + "log_cfgs": [], + "syslog_fix_perms": [ + "syslog:adm", + "root:adm", + "root:wheel", + "root:root", + ], + "vendor_data": {"enabled": True, "prefix": []}, + "vendor_data2": {"enabled": True, "prefix": []}, + } + ) + updated_cfg.pop("system_info") + + self.assertEqual(updated_cfg, cfg) + self.assertEqual(main.LOG, log) + self.assertIsNone(args) + + (_item1, item2) = wrap_and_call( + "cloudinit.cmd.main", + { + "util.close_stdin": True, + "netinfo.debug_info": "my net debug info", + "cc_set_hostname.handle": {"side_effect": set_hostname}, + "util.fixup_output": ("outfmt", "errfmt"), + }, + main.main_init, + "init", + cmdargs, + ) + self.assertEqual([], item2) + # Instancify is called + instance_id_path = "var/lib/cloud/data/instance-id" + self.assertEqual( + "iid-datasource-none\n", + os.path.join( + load_file(os.path.join(self.new_root, instance_id_path)) + ), + ) + # modules are run (including write_files) + self.assertEqual( + "blah", load_file(os.path.join(self.new_root, "etc/blah.ini")) + ) + expected_logs = [ + "network config is disabled by fallback", # apply_network_config + "my net debug info", # netinfo.debug_info + "no previous run detected", + ] + for log in expected_logs: + self.assertIn(log, self.stderr.getvalue()) + + +class TestShouldBringUpInterfaces: + @pytest.mark.parametrize( + "cfg_disable,args_local,expected", + [ + (True, True, False), + (True, False, False), + (False, True, False), + (False, False, True), + ], + ) + def test_should_bring_up_interfaces( + self, cfg_disable, args_local, expected + ): + init = mock.Mock() + init.cfg = {"disable_network_activation": cfg_disable} + + args = mock.Mock() + args.local = args_local + + result = main._should_bring_up_interfaces(init, args) + assert result == expected + + +# vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_query.py b/tests/unittests/cmd/test_query.py new file mode 100644 index 00000000..03a73bb5 --- /dev/null +++ b/tests/unittests/cmd/test_query.py @@ -0,0 +1,537 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import errno +import gzip +import json +import os +from collections import namedtuple +from io import BytesIO +from textwrap import dedent + +import pytest + +from cloudinit.cmd import query +from cloudinit.helpers import Paths +from cloudinit.sources import ( + INSTANCE_JSON_FILE, + INSTANCE_JSON_SENSITIVE_FILE, + REDACT_SENSITIVE_VALUE, +) +from cloudinit.util import b64e, write_file +from tests.unittests.helpers import mock + + +def _gzip_data(data): + with BytesIO() as iobuf: + with gzip.GzipFile(mode="wb", fileobj=iobuf) as gzfp: + gzfp.write(data) + return iobuf.getvalue() + + +@mock.patch("cloudinit.cmd.query.addLogHandlerCLI", lambda *args: "") +class TestQuery: + + args = namedtuple( + "queryargs", + "debug dump_all format instance_data list_keys user_data vendor_data" + " varname", + ) + + def _setup_paths(self, tmpdir, ud_val=None, vd_val=None): + """Write userdata and vendordata into a tmpdir. + + Return: + 4-tuple : (paths, run_dir_path, userdata_path, vendordata_path) + """ + if ud_val: + user_data = tmpdir.join("user-data") + write_file(user_data.strpath, ud_val) + else: + user_data = None + if vd_val: + vendor_data = tmpdir.join("vendor-data") + write_file(vendor_data.strpath, vd_val) + else: + vendor_data = None + run_dir = tmpdir.join("run_dir") + run_dir.ensure_dir() + + cloud_dir = tmpdir.join("cloud_dir") + cloud_dir.ensure_dir() + + return ( + Paths( + {"cloud_dir": cloud_dir.strpath, "run_dir": run_dir.strpath} + ), + run_dir, + user_data, + vendor_data, + ) + + def test_handle_args_error_on_missing_param(self, caplog, capsys): + """Error when missing required parameters and print usage.""" + args = self.args( + debug=False, + dump_all=False, + format=None, + instance_data=None, + list_keys=False, + user_data=None, + vendor_data=None, + varname=None, + ) + with mock.patch( + "cloudinit.cmd.query.addLogHandlerCLI", return_value="" + ) as m_cli_log: + assert 1 == query.handle_args("anyname", args) + expected_error = ( + "Expected one of the options: --all, --format, --list-keys" + " or varname\n" + ) + assert expected_error in caplog.text + out, _err = capsys.readouterr() + assert "usage: query" in out + assert 1 == m_cli_log.call_count + + @pytest.mark.parametrize( + "inst_data,varname,expected_error", + ( + ( + '{"v1": {"key-2": "value-2"}}', + "v1.absent_leaf", + "instance-data 'v1' has no 'absent_leaf'\n", + ), + ( + '{"v1": {"key-2": "value-2"}}', + "absent_key", + "Undefined instance-data key 'absent_key'\n", + ), + ), + ) + def test_handle_args_error_on_invalid_vaname_paths( + self, inst_data, varname, expected_error, caplog, tmpdir + ): + """Error when varname is not a valid instance-data variable path.""" + instance_data = tmpdir.join("instance-data") + instance_data.write(inst_data) + args = self.args( + debug=False, + dump_all=False, + format=None, + instance_data=instance_data.strpath, + list_keys=False, + user_data=None, + vendor_data=None, + varname=varname, + ) + paths, _, _, _ = self._setup_paths(tmpdir) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + m_paths.return_value = paths + with mock.patch( + "cloudinit.cmd.query.addLogHandlerCLI", return_value="" + ): + with mock.patch("cloudinit.cmd.query.load_userdata") as m_lud: + m_lud.return_value = "ud" + assert 1 == query.handle_args("anyname", args) + assert expected_error in caplog.text + + def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir): + """When instance_data file path does not exist, log an error.""" + absent_fn = tmpdir.join("absent") + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=absent_fn.strpath, + list_keys=False, + user_data="ud", + vendor_data="vd", + varname=None, + ) + assert 1 == query.handle_args("anyname", args) + + msg = "Missing instance-data file: %s" % absent_fn + assert msg in caplog.text + + def test_handle_args_error_when_no_read_permission_instance_data( + self, caplog, tmpdir + ): + """When instance_data file is unreadable, log an error.""" + noread_fn = tmpdir.join("unreadable") + noread_fn.write("thou shall not pass") + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=noread_fn.strpath, + list_keys=False, + user_data="ud", + vendor_data="vd", + varname=None, + ) + with mock.patch("cloudinit.cmd.query.util.load_file") as m_load: + m_load.side_effect = OSError(errno.EACCES, "Not allowed") + assert 1 == query.handle_args("anyname", args) + msg = "No read permission on '%s'. Try sudo" % noread_fn + assert msg in caplog.text + + def test_handle_args_defaults_instance_data(self, caplog, tmpdir): + """When no instance_data argument, default to configured run_dir.""" + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=None, + vendor_data=None, + varname=None, + ) + paths, run_dir, _, _ = self._setup_paths(tmpdir) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + m_paths.return_value = paths + assert 1 == query.handle_args("anyname", args) + json_file = run_dir.join(INSTANCE_JSON_FILE) + msg = "Missing instance-data file: %s" % json_file.strpath + assert msg in caplog.text + + def test_handle_args_root_fallsback_to_instance_data(self, caplog, tmpdir): + """When no instance_data argument, root falls back to redacted json.""" + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=None, + vendor_data=None, + varname=None, + ) + paths, run_dir, _, _ = self._setup_paths(tmpdir) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + m_paths.return_value = paths + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 0 + assert 1 == query.handle_args("anyname", args) + json_file = run_dir.join(INSTANCE_JSON_FILE) + sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) + msg = "Missing root-readable %s. Using redacted %s instead." % ( + sensitive_file.strpath, + json_file.strpath, + ) + assert msg in caplog.text + + @pytest.mark.parametrize( + "ud_src,ud_expected,vd_src,vd_expected", + ( + ("hi mom", "hi mom", "hi pops", "hi pops"), + ("ud".encode("utf-8"), "ud", "vd".encode("utf-8"), "vd"), + (_gzip_data(b"ud"), "ud", _gzip_data(b"vd"), "vd"), + (_gzip_data("ud".encode("utf-8")), "ud", _gzip_data(b"vd"), "vd"), + ), + ) + def test_handle_args_root_processes_user_data( + self, ud_src, ud_expected, vd_src, vd_expected, capsys, tmpdir + ): + """Support reading multiple user-data file content types""" + paths, run_dir, user_data, vendor_data = self._setup_paths( + tmpdir, ud_val=ud_src, vd_val=vd_src + ) + sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) + sensitive_file.write('{"my-var": "it worked"}') + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=user_data.strpath, + vendor_data=vendor_data.strpath, + varname=None, + ) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + m_paths.return_value = paths + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 0 + assert 0 == query.handle_args("anyname", args) + out, _err = capsys.readouterr() + cmd_output = json.loads(out) + assert "it worked" == cmd_output["my-var"] + if ud_expected == "ci-b64:": + ud_expected = "ci-b64:{}".format(b64e(ud_src)) + if vd_expected == "ci-b64:": + vd_expected = "ci-b64:{}".format(b64e(vd_src)) + assert ud_expected == cmd_output["userdata"] + assert vd_expected == cmd_output["vendordata"] + + def test_handle_args_user_vendor_data_defaults_to_instance_link( + self, capsys, tmpdir + ): + """When no instance_data argument, root uses sensitive json.""" + paths, run_dir, _, _ = self._setup_paths(tmpdir) + sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) + sensitive_file.write('{"my-var": "it worked"}') + + ud_path = os.path.join(paths.instance_link, "user-data.txt") + write_file(ud_path, "instance_link_ud") + vd_path = os.path.join(paths.instance_link, "vendor-data.txt") + write_file(vd_path, "instance_link_vd") + + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=None, + vendor_data=None, + varname=None, + ) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + m_paths.return_value = paths + with mock.patch("os.getuid", return_value=0): + assert 0 == query.handle_args("anyname", args) + expected = ( + '{\n "my-var": "it worked",\n ' + '"userdata": "instance_link_ud",\n ' + '"vendordata": "instance_link_vd"\n}\n' + ) + out, _ = capsys.readouterr() + assert expected == out + + def test_handle_args_root_uses_instance_sensitive_data( + self, capsys, tmpdir + ): + """When no instance_data argument, root uses sensitive json.""" + paths, run_dir, user_data, vendor_data = self._setup_paths( + tmpdir, ud_val="ud", vd_val="vd" + ) + sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) + sensitive_file.write('{"my-var": "it worked"}') + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=user_data.strpath, + vendor_data=vendor_data.strpath, + varname=None, + ) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + m_paths.return_value = paths + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 0 + assert 0 == query.handle_args("anyname", args) + expected = ( + '{\n "my-var": "it worked",\n ' + '"userdata": "ud",\n "vendordata": "vd"\n}\n' + ) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_dumps_all_instance_data(self, capsys, tmpdir): + """When --all is specified query will dump all instance data vars.""" + instance_data = tmpdir.join("instance-data") + instance_data.write('{"my-var": "it worked"}') + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=instance_data.strpath, + list_keys=False, + user_data="ud", + vendor_data="vd", + varname=None, + ) + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args("anyname", args) + expected = ( + '{\n "my-var": "it worked",\n "userdata": "<%s> file:ud",\n' + ' "vendordata": "<%s> file:vd"\n}\n' + % (REDACT_SENSITIVE_VALUE, REDACT_SENSITIVE_VALUE) + ) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_returns_top_level_varname(self, capsys, tmpdir): + """When the argument varname is passed, report its value.""" + instance_data = tmpdir.join("instance-data") + instance_data.write('{"my-var": "it worked"}') + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=instance_data.strpath, + list_keys=False, + user_data="ud", + vendor_data="vd", + varname="my_var", + ) + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args("anyname", args) + out, _err = capsys.readouterr() + assert "it worked\n" == out + + @pytest.mark.parametrize( + "inst_data,varname,expected", + ( + ( + '{"v1": {"key-2": "value-2"}, "my-var": "it worked"}', + "v1.key_2", + "value-2\n", + ), + # Assert no jinja underscore-delimited aliases are reported on CLI + ( + '{"v1": {"something-hyphenated": {"no.underscores":"x",' + ' "no-alias": "y"}}, "my-var": "it worked"}', + "v1.something_hyphenated", + '{\n "no-alias": "y",\n "no.underscores": "x"\n}\n', + ), + ), + ) + def test_handle_args_returns_nested_varname( + self, inst_data, varname, expected, capsys, tmpdir + ): + """If user_data file is a jinja template render instance-data vars.""" + instance_data = tmpdir.join("instance-data") + instance_data.write(inst_data) + args = self.args( + debug=False, + dump_all=False, + format=None, + instance_data=instance_data.strpath, + user_data="ud", + vendor_data="vd", + list_keys=False, + varname=varname, + ) + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args("anyname", args) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_returns_standardized_vars_to_top_level_aliases( + self, capsys, tmpdir + ): + """Any standardized vars under v# are promoted as top-level aliases.""" + instance_data = tmpdir.join("instance-data") + instance_data.write( + '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},' + ' "top": "gun"}' + ) + expected = dedent( + """\ + { + "top": "gun", + "userdata": "<redacted for non-root user> file:ud", + "v1": { + "v1_1": "val1.1" + }, + "v1_1": "val1.1", + "v2": { + "v2_2": "val2.2" + }, + "v2_2": "val2.2", + "vendordata": "<redacted for non-root user> file:vd" + } + """ + ) + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=instance_data.strpath, + user_data="ud", + vendor_data="vd", + list_keys=False, + varname=None, + ) + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args("anyname", args) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_list_keys_sorts_top_level_keys_when_no_varname( + self, capsys, tmpdir + ): + """Sort all top-level keys when only --list-keys provided.""" + instance_data = tmpdir.join("instance-data") + instance_data.write( + '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},' + ' "top": "gun"}' + ) + expected = "top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n" + args = self.args( + debug=False, + dump_all=False, + format=None, + instance_data=instance_data.strpath, + list_keys=True, + user_data="ud", + vendor_data="vd", + varname=None, + ) + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args("anyname", args) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_list_keys_sorts_nested_keys_when_varname( + self, capsys, tmpdir + ): + """Sort all nested keys of varname object when --list-keys provided.""" + instance_data = tmpdir.join("instance-data") + instance_data.write( + '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2":' + + ' {"v2_2": "val2.2"}, "top": "gun"}' + ) + expected = "v1_1\nv1_2\n" + args = self.args( + debug=False, + dump_all=False, + format=None, + instance_data=instance_data.strpath, + list_keys=True, + user_data="ud", + vendor_data="vd", + varname="v1", + ) + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args("anyname", args) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_list_keys_errors_when_varname_is_not_a_dict( + self, caplog, tmpdir + ): + """Raise an error when --list-keys and varname specify a non-list.""" + instance_data = tmpdir.join("instance-data") + instance_data.write( + '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2": ' + + '{"v2_2": "val2.2"}, "top": "gun"}' + ) + expected_error = "--list-keys provided but 'top' is not a dict" + args = self.args( + debug=False, + dump_all=False, + format=None, + instance_data=instance_data.strpath, + list_keys=True, + user_data="ud", + vendor_data="vd", + varname="top", + ) + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 100 + assert 1 == query.handle_args("anyname", args) + assert expected_error in caplog.text + + +# vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_status.py b/tests/unittests/cmd/test_status.py new file mode 100644 index 00000000..c5f424da --- /dev/null +++ b/tests/unittests/cmd/test_status.py @@ -0,0 +1,548 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import os +from collections import namedtuple +from io import StringIO +from textwrap import dedent + +from cloudinit.atomic_helper import write_json +from cloudinit.cmd import status +from cloudinit.util import ensure_file +from tests.unittests.helpers import CiTestCase, mock, wrap_and_call + +mypaths = namedtuple("MyPaths", "run_dir") +myargs = namedtuple("MyArgs", "long wait") + + +class TestStatus(CiTestCase): + def setUp(self): + super(TestStatus, self).setUp() + self.new_root = self.tmp_dir() + self.status_file = self.tmp_path("status.json", self.new_root) + self.disable_file = self.tmp_path("cloudinit-disable", self.new_root) + self.paths = mypaths(run_dir=self.new_root) + + class FakeInit(object): + paths = self.paths + + def __init__(self, ds_deps): + pass + + def read_cfg(self): + pass + + self.init_class = FakeInit + + def test__is_cloudinit_disabled_false_on_sysvinit(self): + """When not in an environment using systemd, return False.""" + ensure_file(self.disable_file) # Create the ignored disable file + (is_disabled, reason) = wrap_and_call( + "cloudinit.cmd.status", + { + "uses_systemd": False, + "get_cmdline": "root=/dev/my-root not-important", + }, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertFalse( + is_disabled, "expected enabled cloud-init on sysvinit" + ) + self.assertEqual("Cloud-init enabled on sysvinit", reason) + + def test__is_cloudinit_disabled_true_on_disable_file(self): + """When using systemd and disable_file is present return disabled.""" + ensure_file(self.disable_file) # Create observed disable file + (is_disabled, reason) = wrap_and_call( + "cloudinit.cmd.status", + { + "uses_systemd": True, + "get_cmdline": "root=/dev/my-root not-important", + }, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertTrue(is_disabled, "expected disabled cloud-init") + self.assertEqual( + "Cloud-init disabled by {0}".format(self.disable_file), reason + ) + + def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self): + """Not disabled when using systemd and enabled via commandline.""" + ensure_file(self.disable_file) # Create ignored disable file + (is_disabled, reason) = wrap_and_call( + "cloudinit.cmd.status", + { + "uses_systemd": True, + "get_cmdline": "something cloud-init=enabled else", + }, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertFalse(is_disabled, "expected enabled cloud-init") + self.assertEqual( + "Cloud-init enabled by kernel command line cloud-init=enabled", + reason, + ) + + def test__is_cloudinit_disabled_true_on_kernel_cmdline(self): + """When kernel command line disables cloud-init return True.""" + (is_disabled, reason) = wrap_and_call( + "cloudinit.cmd.status", + { + "uses_systemd": True, + "get_cmdline": "something cloud-init=disabled else", + }, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertTrue(is_disabled, "expected disabled cloud-init") + self.assertEqual( + "Cloud-init disabled by kernel parameter cloud-init=disabled", + reason, + ) + + def test__is_cloudinit_disabled_true_when_generator_disables(self): + """When cloud-init-generator writes disabled file return True.""" + disabled_file = os.path.join(self.paths.run_dir, "disabled") + ensure_file(disabled_file) + (is_disabled, reason) = wrap_and_call( + "cloudinit.cmd.status", + {"uses_systemd": True, "get_cmdline": "something"}, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertTrue(is_disabled, "expected disabled cloud-init") + self.assertEqual("Cloud-init disabled by cloud-init-generator", reason) + + def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self): + """Report enabled when systemd generator creates the enabled file.""" + enabled_file = os.path.join(self.paths.run_dir, "enabled") + ensure_file(enabled_file) + (is_disabled, reason) = wrap_and_call( + "cloudinit.cmd.status", + {"uses_systemd": True, "get_cmdline": "something ignored"}, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertFalse(is_disabled, "expected enabled cloud-init") + self.assertEqual( + "Cloud-init enabled by systemd cloud-init-generator", reason + ) + + def test_status_returns_not_run(self): + """When status.json does not exist yet, return 'not run'.""" + self.assertFalse( + os.path.exists(self.status_file), "Unexpected status.json found" + ) + cmdargs = myargs(long=False, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + self.assertEqual("status: not run\n", m_stdout.getvalue()) + + def test_status_returns_disabled_long_on_presence_of_disable_file(self): + """When cloudinit is disabled, return disabled reason.""" + + checked_files = [] + + def fakeexists(filepath): + checked_files.append(filepath) + status_file = os.path.join(self.paths.run_dir, "status.json") + return bool(not filepath == status_file) + + cmdargs = myargs(long=True, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "os.path.exists": {"side_effect": fakeexists}, + "_is_cloudinit_disabled": ( + True, + "disabled for some reason", + ), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + self.assertEqual( + [os.path.join(self.paths.run_dir, "status.json")], checked_files + ) + expected = dedent( + """\ + status: disabled + detail: + disabled for some reason + """ + ) + self.assertEqual(expected, m_stdout.getvalue()) + + def test_status_returns_running_on_no_results_json(self): + """Report running when status.json exists but result.json does not.""" + result_file = self.tmp_path("result.json", self.new_root) + write_json(self.status_file, {}) + self.assertFalse( + os.path.exists(result_file), "Unexpected result.json found" + ) + cmdargs = myargs(long=False, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + self.assertEqual("status: running\n", m_stdout.getvalue()) + + def test_status_returns_running(self): + """Report running when status exists with an unfinished stage.""" + ensure_file(self.tmp_path("result.json", self.new_root)) + write_json( + self.status_file, {"v1": {"init": {"start": 1, "finished": None}}} + ) + cmdargs = myargs(long=False, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + self.assertEqual("status: running\n", m_stdout.getvalue()) + + def test_status_returns_done(self): + """Report done results.json exists no stages are unfinished.""" + ensure_file(self.tmp_path("result.json", self.new_root)) + write_json( + self.status_file, + { + "v1": { + "stage": None, # No current stage running + "datasource": ( + "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "blah": {"finished": 123.456}, + "init": { + "errors": [], + "start": 124.567, + "finished": 125.678, + }, + "init-local": {"start": 123.45, "finished": 123.46}, + } + }, + ) + cmdargs = myargs(long=False, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + self.assertEqual("status: done\n", m_stdout.getvalue()) + + def test_status_returns_done_long(self): + """Long format of done status includes datasource info.""" + ensure_file(self.tmp_path("result.json", self.new_root)) + write_json( + self.status_file, + { + "v1": { + "stage": None, + "datasource": ( + "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "init": {"start": 124.567, "finished": 125.678}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + }, + ) + cmdargs = myargs(long=True, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + expected = dedent( + """\ + status: done + time: Thu, 01 Jan 1970 00:02:05 +0000 + detail: + DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net] + """ + ) + self.assertEqual(expected, m_stdout.getvalue()) + + def test_status_on_errors(self): + """Reports error when any stage has errors.""" + write_json( + self.status_file, + { + "v1": { + "stage": None, + "blah": {"errors": [], "finished": 123.456}, + "init": { + "errors": ["error1"], + "start": 124.567, + "finished": 125.678, + }, + "init-local": {"start": 123.45, "finished": 123.46}, + } + }, + ) + cmdargs = myargs(long=False, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(1, retcode) + self.assertEqual("status: error\n", m_stdout.getvalue()) + + def test_status_on_errors_long(self): + """Long format of error status includes all error messages.""" + write_json( + self.status_file, + { + "v1": { + "stage": None, + "datasource": ( + "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "init": { + "errors": ["error1"], + "start": 124.567, + "finished": 125.678, + }, + "init-local": { + "errors": ["error2", "error3"], + "start": 123.45, + "finished": 123.46, + }, + } + }, + ) + cmdargs = myargs(long=True, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(1, retcode) + expected = dedent( + """\ + status: error + time: Thu, 01 Jan 1970 00:02:05 +0000 + detail: + error1 + error2 + error3 + """ + ) + self.assertEqual(expected, m_stdout.getvalue()) + + def test_status_returns_running_long_format(self): + """Long format reports the stage in which we are running.""" + write_json( + self.status_file, + { + "v1": { + "stage": "init", + "init": {"start": 124.456, "finished": None}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + }, + ) + cmdargs = myargs(long=True, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + expected = dedent( + """\ + status: running + time: Thu, 01 Jan 1970 00:02:04 +0000 + detail: + Running in stage: init + """ + ) + self.assertEqual(expected, m_stdout.getvalue()) + + def test_status_wait_blocks_until_done(self): + """Specifying wait will poll every 1/4 second until done state.""" + running_json = { + "v1": { + "stage": "init", + "init": {"start": 124.456, "finished": None}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + } + done_json = { + "v1": { + "stage": None, + "init": {"start": 124.456, "finished": 125.678}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + } + + self.sleep_calls = 0 + + def fake_sleep(interval): + self.assertEqual(0.25, interval) + self.sleep_calls += 1 + if self.sleep_calls == 2: + write_json(self.status_file, running_json) + elif self.sleep_calls == 3: + write_json(self.status_file, done_json) + result_file = self.tmp_path("result.json", self.new_root) + ensure_file(result_file) + + cmdargs = myargs(long=False, wait=True) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "sleep": {"side_effect": fake_sleep}, + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + self.assertEqual(4, self.sleep_calls) + self.assertEqual("....\nstatus: done\n", m_stdout.getvalue()) + + def test_status_wait_blocks_until_error(self): + """Specifying wait will poll every 1/4 second until error state.""" + running_json = { + "v1": { + "stage": "init", + "init": {"start": 124.456, "finished": None}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + } + error_json = { + "v1": { + "stage": None, + "init": { + "errors": ["error1"], + "start": 124.456, + "finished": 125.678, + }, + "init-local": {"start": 123.45, "finished": 123.46}, + } + } + + self.sleep_calls = 0 + + def fake_sleep(interval): + self.assertEqual(0.25, interval) + self.sleep_calls += 1 + if self.sleep_calls == 2: + write_json(self.status_file, running_json) + elif self.sleep_calls == 3: + write_json(self.status_file, error_json) + + cmdargs = myargs(long=False, wait=True) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "sleep": {"side_effect": fake_sleep}, + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(1, retcode) + self.assertEqual(4, self.sleep_calls) + self.assertEqual("....\nstatus: error\n", m_stdout.getvalue()) + + def test_status_main(self): + """status.main can be run as a standalone script.""" + write_json( + self.status_file, {"v1": {"init": {"start": 1, "finished": None}}} + ) + with self.assertRaises(SystemExit) as context_manager: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + wrap_and_call( + "cloudinit.cmd.status", + { + "sys.argv": {"new": ["status"]}, + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.main, + ) + self.assertEqual(0, context_manager.exception.code) + self.assertEqual("status: running\n", m_stdout.getvalue()) + + +# vi: ts=4 expandtab syntax=python |