diff options
Diffstat (limited to 'tests/unittests/cmd/test_status.py')
-rw-r--r-- | tests/unittests/cmd/test_status.py | 548 |
1 files changed, 548 insertions, 0 deletions
diff --git a/tests/unittests/cmd/test_status.py b/tests/unittests/cmd/test_status.py new file mode 100644 index 00000000..c5f424da --- /dev/null +++ b/tests/unittests/cmd/test_status.py @@ -0,0 +1,548 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import os +from collections import namedtuple +from io import StringIO +from textwrap import dedent + +from cloudinit.atomic_helper import write_json +from cloudinit.cmd import status +from cloudinit.util import ensure_file +from tests.unittests.helpers import CiTestCase, mock, wrap_and_call + +mypaths = namedtuple("MyPaths", "run_dir") +myargs = namedtuple("MyArgs", "long wait") + + +class TestStatus(CiTestCase): + def setUp(self): + super(TestStatus, self).setUp() + self.new_root = self.tmp_dir() + self.status_file = self.tmp_path("status.json", self.new_root) + self.disable_file = self.tmp_path("cloudinit-disable", self.new_root) + self.paths = mypaths(run_dir=self.new_root) + + class FakeInit(object): + paths = self.paths + + def __init__(self, ds_deps): + pass + + def read_cfg(self): + pass + + self.init_class = FakeInit + + def test__is_cloudinit_disabled_false_on_sysvinit(self): + """When not in an environment using systemd, return False.""" + ensure_file(self.disable_file) # Create the ignored disable file + (is_disabled, reason) = wrap_and_call( + "cloudinit.cmd.status", + { + "uses_systemd": False, + "get_cmdline": "root=/dev/my-root not-important", + }, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertFalse( + is_disabled, "expected enabled cloud-init on sysvinit" + ) + self.assertEqual("Cloud-init enabled on sysvinit", reason) + + def test__is_cloudinit_disabled_true_on_disable_file(self): + """When using systemd and disable_file is present return disabled.""" + ensure_file(self.disable_file) # Create observed disable file + (is_disabled, reason) = wrap_and_call( + "cloudinit.cmd.status", + { + "uses_systemd": True, + "get_cmdline": "root=/dev/my-root not-important", + }, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertTrue(is_disabled, "expected disabled cloud-init") + self.assertEqual( + "Cloud-init disabled by {0}".format(self.disable_file), reason + ) + + def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self): + """Not disabled when using systemd and enabled via commandline.""" + ensure_file(self.disable_file) # Create ignored disable file + (is_disabled, reason) = wrap_and_call( + "cloudinit.cmd.status", + { + "uses_systemd": True, + "get_cmdline": "something cloud-init=enabled else", + }, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertFalse(is_disabled, "expected enabled cloud-init") + self.assertEqual( + "Cloud-init enabled by kernel command line cloud-init=enabled", + reason, + ) + + def test__is_cloudinit_disabled_true_on_kernel_cmdline(self): + """When kernel command line disables cloud-init return True.""" + (is_disabled, reason) = wrap_and_call( + "cloudinit.cmd.status", + { + "uses_systemd": True, + "get_cmdline": "something cloud-init=disabled else", + }, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertTrue(is_disabled, "expected disabled cloud-init") + self.assertEqual( + "Cloud-init disabled by kernel parameter cloud-init=disabled", + reason, + ) + + def test__is_cloudinit_disabled_true_when_generator_disables(self): + """When cloud-init-generator writes disabled file return True.""" + disabled_file = os.path.join(self.paths.run_dir, "disabled") + ensure_file(disabled_file) + (is_disabled, reason) = wrap_and_call( + "cloudinit.cmd.status", + {"uses_systemd": True, "get_cmdline": "something"}, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertTrue(is_disabled, "expected disabled cloud-init") + self.assertEqual("Cloud-init disabled by cloud-init-generator", reason) + + def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self): + """Report enabled when systemd generator creates the enabled file.""" + enabled_file = os.path.join(self.paths.run_dir, "enabled") + ensure_file(enabled_file) + (is_disabled, reason) = wrap_and_call( + "cloudinit.cmd.status", + {"uses_systemd": True, "get_cmdline": "something ignored"}, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertFalse(is_disabled, "expected enabled cloud-init") + self.assertEqual( + "Cloud-init enabled by systemd cloud-init-generator", reason + ) + + def test_status_returns_not_run(self): + """When status.json does not exist yet, return 'not run'.""" + self.assertFalse( + os.path.exists(self.status_file), "Unexpected status.json found" + ) + cmdargs = myargs(long=False, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + self.assertEqual("status: not run\n", m_stdout.getvalue()) + + def test_status_returns_disabled_long_on_presence_of_disable_file(self): + """When cloudinit is disabled, return disabled reason.""" + + checked_files = [] + + def fakeexists(filepath): + checked_files.append(filepath) + status_file = os.path.join(self.paths.run_dir, "status.json") + return bool(not filepath == status_file) + + cmdargs = myargs(long=True, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "os.path.exists": {"side_effect": fakeexists}, + "_is_cloudinit_disabled": ( + True, + "disabled for some reason", + ), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + self.assertEqual( + [os.path.join(self.paths.run_dir, "status.json")], checked_files + ) + expected = dedent( + """\ + status: disabled + detail: + disabled for some reason + """ + ) + self.assertEqual(expected, m_stdout.getvalue()) + + def test_status_returns_running_on_no_results_json(self): + """Report running when status.json exists but result.json does not.""" + result_file = self.tmp_path("result.json", self.new_root) + write_json(self.status_file, {}) + self.assertFalse( + os.path.exists(result_file), "Unexpected result.json found" + ) + cmdargs = myargs(long=False, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + self.assertEqual("status: running\n", m_stdout.getvalue()) + + def test_status_returns_running(self): + """Report running when status exists with an unfinished stage.""" + ensure_file(self.tmp_path("result.json", self.new_root)) + write_json( + self.status_file, {"v1": {"init": {"start": 1, "finished": None}}} + ) + cmdargs = myargs(long=False, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + self.assertEqual("status: running\n", m_stdout.getvalue()) + + def test_status_returns_done(self): + """Report done results.json exists no stages are unfinished.""" + ensure_file(self.tmp_path("result.json", self.new_root)) + write_json( + self.status_file, + { + "v1": { + "stage": None, # No current stage running + "datasource": ( + "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "blah": {"finished": 123.456}, + "init": { + "errors": [], + "start": 124.567, + "finished": 125.678, + }, + "init-local": {"start": 123.45, "finished": 123.46}, + } + }, + ) + cmdargs = myargs(long=False, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + self.assertEqual("status: done\n", m_stdout.getvalue()) + + def test_status_returns_done_long(self): + """Long format of done status includes datasource info.""" + ensure_file(self.tmp_path("result.json", self.new_root)) + write_json( + self.status_file, + { + "v1": { + "stage": None, + "datasource": ( + "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "init": {"start": 124.567, "finished": 125.678}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + }, + ) + cmdargs = myargs(long=True, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + expected = dedent( + """\ + status: done + time: Thu, 01 Jan 1970 00:02:05 +0000 + detail: + DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net] + """ + ) + self.assertEqual(expected, m_stdout.getvalue()) + + def test_status_on_errors(self): + """Reports error when any stage has errors.""" + write_json( + self.status_file, + { + "v1": { + "stage": None, + "blah": {"errors": [], "finished": 123.456}, + "init": { + "errors": ["error1"], + "start": 124.567, + "finished": 125.678, + }, + "init-local": {"start": 123.45, "finished": 123.46}, + } + }, + ) + cmdargs = myargs(long=False, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(1, retcode) + self.assertEqual("status: error\n", m_stdout.getvalue()) + + def test_status_on_errors_long(self): + """Long format of error status includes all error messages.""" + write_json( + self.status_file, + { + "v1": { + "stage": None, + "datasource": ( + "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "init": { + "errors": ["error1"], + "start": 124.567, + "finished": 125.678, + }, + "init-local": { + "errors": ["error2", "error3"], + "start": 123.45, + "finished": 123.46, + }, + } + }, + ) + cmdargs = myargs(long=True, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(1, retcode) + expected = dedent( + """\ + status: error + time: Thu, 01 Jan 1970 00:02:05 +0000 + detail: + error1 + error2 + error3 + """ + ) + self.assertEqual(expected, m_stdout.getvalue()) + + def test_status_returns_running_long_format(self): + """Long format reports the stage in which we are running.""" + write_json( + self.status_file, + { + "v1": { + "stage": "init", + "init": {"start": 124.456, "finished": None}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + }, + ) + cmdargs = myargs(long=True, wait=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + expected = dedent( + """\ + status: running + time: Thu, 01 Jan 1970 00:02:04 +0000 + detail: + Running in stage: init + """ + ) + self.assertEqual(expected, m_stdout.getvalue()) + + def test_status_wait_blocks_until_done(self): + """Specifying wait will poll every 1/4 second until done state.""" + running_json = { + "v1": { + "stage": "init", + "init": {"start": 124.456, "finished": None}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + } + done_json = { + "v1": { + "stage": None, + "init": {"start": 124.456, "finished": 125.678}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + } + + self.sleep_calls = 0 + + def fake_sleep(interval): + self.assertEqual(0.25, interval) + self.sleep_calls += 1 + if self.sleep_calls == 2: + write_json(self.status_file, running_json) + elif self.sleep_calls == 3: + write_json(self.status_file, done_json) + result_file = self.tmp_path("result.json", self.new_root) + ensure_file(result_file) + + cmdargs = myargs(long=False, wait=True) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "sleep": {"side_effect": fake_sleep}, + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(0, retcode) + self.assertEqual(4, self.sleep_calls) + self.assertEqual("....\nstatus: done\n", m_stdout.getvalue()) + + def test_status_wait_blocks_until_error(self): + """Specifying wait will poll every 1/4 second until error state.""" + running_json = { + "v1": { + "stage": "init", + "init": {"start": 124.456, "finished": None}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + } + error_json = { + "v1": { + "stage": None, + "init": { + "errors": ["error1"], + "start": 124.456, + "finished": 125.678, + }, + "init-local": {"start": 123.45, "finished": 123.46}, + } + } + + self.sleep_calls = 0 + + def fake_sleep(interval): + self.assertEqual(0.25, interval) + self.sleep_calls += 1 + if self.sleep_calls == 2: + write_json(self.status_file, running_json) + elif self.sleep_calls == 3: + write_json(self.status_file, error_json) + + cmdargs = myargs(long=False, wait=True) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + retcode = wrap_and_call( + "cloudinit.cmd.status", + { + "sleep": {"side_effect": fake_sleep}, + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + self.assertEqual(1, retcode) + self.assertEqual(4, self.sleep_calls) + self.assertEqual("....\nstatus: error\n", m_stdout.getvalue()) + + def test_status_main(self): + """status.main can be run as a standalone script.""" + write_json( + self.status_file, {"v1": {"init": {"start": 1, "finished": None}}} + ) + with self.assertRaises(SystemExit) as context_manager: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + wrap_and_call( + "cloudinit.cmd.status", + { + "sys.argv": {"new": ["status"]}, + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.main, + ) + self.assertEqual(0, context_manager.exception.code) + self.assertEqual("status: running\n", m_stdout.getvalue()) + + +# vi: ts=4 expandtab syntax=python |