summaryrefslogtreecommitdiff
path: root/tests/unittests/cmd/test_status.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/cmd/test_status.py')
-rw-r--r--tests/unittests/cmd/test_status.py548
1 files changed, 548 insertions, 0 deletions
diff --git a/tests/unittests/cmd/test_status.py b/tests/unittests/cmd/test_status.py
new file mode 100644
index 00000000..c5f424da
--- /dev/null
+++ b/tests/unittests/cmd/test_status.py
@@ -0,0 +1,548 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import os
+from collections import namedtuple
+from io import StringIO
+from textwrap import dedent
+
+from cloudinit.atomic_helper import write_json
+from cloudinit.cmd import status
+from cloudinit.util import ensure_file
+from tests.unittests.helpers import CiTestCase, mock, wrap_and_call
+
+mypaths = namedtuple("MyPaths", "run_dir")
+myargs = namedtuple("MyArgs", "long wait")
+
+
+class TestStatus(CiTestCase):
+ def setUp(self):
+ super(TestStatus, self).setUp()
+ self.new_root = self.tmp_dir()
+ self.status_file = self.tmp_path("status.json", self.new_root)
+ self.disable_file = self.tmp_path("cloudinit-disable", self.new_root)
+ self.paths = mypaths(run_dir=self.new_root)
+
+ class FakeInit(object):
+ paths = self.paths
+
+ def __init__(self, ds_deps):
+ pass
+
+ def read_cfg(self):
+ pass
+
+ self.init_class = FakeInit
+
+ def test__is_cloudinit_disabled_false_on_sysvinit(self):
+ """When not in an environment using systemd, return False."""
+ ensure_file(self.disable_file) # Create the ignored disable file
+ (is_disabled, reason) = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "uses_systemd": False,
+ "get_cmdline": "root=/dev/my-root not-important",
+ },
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertFalse(
+ is_disabled, "expected enabled cloud-init on sysvinit"
+ )
+ self.assertEqual("Cloud-init enabled on sysvinit", reason)
+
+ def test__is_cloudinit_disabled_true_on_disable_file(self):
+ """When using systemd and disable_file is present return disabled."""
+ ensure_file(self.disable_file) # Create observed disable file
+ (is_disabled, reason) = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "uses_systemd": True,
+ "get_cmdline": "root=/dev/my-root not-important",
+ },
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertTrue(is_disabled, "expected disabled cloud-init")
+ self.assertEqual(
+ "Cloud-init disabled by {0}".format(self.disable_file), reason
+ )
+
+ def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self):
+ """Not disabled when using systemd and enabled via commandline."""
+ ensure_file(self.disable_file) # Create ignored disable file
+ (is_disabled, reason) = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "uses_systemd": True,
+ "get_cmdline": "something cloud-init=enabled else",
+ },
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertFalse(is_disabled, "expected enabled cloud-init")
+ self.assertEqual(
+ "Cloud-init enabled by kernel command line cloud-init=enabled",
+ reason,
+ )
+
+ def test__is_cloudinit_disabled_true_on_kernel_cmdline(self):
+ """When kernel command line disables cloud-init return True."""
+ (is_disabled, reason) = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "uses_systemd": True,
+ "get_cmdline": "something cloud-init=disabled else",
+ },
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertTrue(is_disabled, "expected disabled cloud-init")
+ self.assertEqual(
+ "Cloud-init disabled by kernel parameter cloud-init=disabled",
+ reason,
+ )
+
+ def test__is_cloudinit_disabled_true_when_generator_disables(self):
+ """When cloud-init-generator writes disabled file return True."""
+ disabled_file = os.path.join(self.paths.run_dir, "disabled")
+ ensure_file(disabled_file)
+ (is_disabled, reason) = wrap_and_call(
+ "cloudinit.cmd.status",
+ {"uses_systemd": True, "get_cmdline": "something"},
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertTrue(is_disabled, "expected disabled cloud-init")
+ self.assertEqual("Cloud-init disabled by cloud-init-generator", reason)
+
+ def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self):
+ """Report enabled when systemd generator creates the enabled file."""
+ enabled_file = os.path.join(self.paths.run_dir, "enabled")
+ ensure_file(enabled_file)
+ (is_disabled, reason) = wrap_and_call(
+ "cloudinit.cmd.status",
+ {"uses_systemd": True, "get_cmdline": "something ignored"},
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertFalse(is_disabled, "expected enabled cloud-init")
+ self.assertEqual(
+ "Cloud-init enabled by systemd cloud-init-generator", reason
+ )
+
+ def test_status_returns_not_run(self):
+ """When status.json does not exist yet, return 'not run'."""
+ self.assertFalse(
+ os.path.exists(self.status_file), "Unexpected status.json found"
+ )
+ cmdargs = myargs(long=False, wait=False)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(0, retcode)
+ self.assertEqual("status: not run\n", m_stdout.getvalue())
+
+ def test_status_returns_disabled_long_on_presence_of_disable_file(self):
+ """When cloudinit is disabled, return disabled reason."""
+
+ checked_files = []
+
+ def fakeexists(filepath):
+ checked_files.append(filepath)
+ status_file = os.path.join(self.paths.run_dir, "status.json")
+ return bool(not filepath == status_file)
+
+ cmdargs = myargs(long=True, wait=False)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "os.path.exists": {"side_effect": fakeexists},
+ "_is_cloudinit_disabled": (
+ True,
+ "disabled for some reason",
+ ),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(0, retcode)
+ self.assertEqual(
+ [os.path.join(self.paths.run_dir, "status.json")], checked_files
+ )
+ expected = dedent(
+ """\
+ status: disabled
+ detail:
+ disabled for some reason
+ """
+ )
+ self.assertEqual(expected, m_stdout.getvalue())
+
+ def test_status_returns_running_on_no_results_json(self):
+ """Report running when status.json exists but result.json does not."""
+ result_file = self.tmp_path("result.json", self.new_root)
+ write_json(self.status_file, {})
+ self.assertFalse(
+ os.path.exists(result_file), "Unexpected result.json found"
+ )
+ cmdargs = myargs(long=False, wait=False)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(0, retcode)
+ self.assertEqual("status: running\n", m_stdout.getvalue())
+
+ def test_status_returns_running(self):
+ """Report running when status exists with an unfinished stage."""
+ ensure_file(self.tmp_path("result.json", self.new_root))
+ write_json(
+ self.status_file, {"v1": {"init": {"start": 1, "finished": None}}}
+ )
+ cmdargs = myargs(long=False, wait=False)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(0, retcode)
+ self.assertEqual("status: running\n", m_stdout.getvalue())
+
+ def test_status_returns_done(self):
+ """Report done results.json exists no stages are unfinished."""
+ ensure_file(self.tmp_path("result.json", self.new_root))
+ write_json(
+ self.status_file,
+ {
+ "v1": {
+ "stage": None, # No current stage running
+ "datasource": (
+ "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]"
+ "[dsmode=net]"
+ ),
+ "blah": {"finished": 123.456},
+ "init": {
+ "errors": [],
+ "start": 124.567,
+ "finished": 125.678,
+ },
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ },
+ )
+ cmdargs = myargs(long=False, wait=False)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(0, retcode)
+ self.assertEqual("status: done\n", m_stdout.getvalue())
+
+ def test_status_returns_done_long(self):
+ """Long format of done status includes datasource info."""
+ ensure_file(self.tmp_path("result.json", self.new_root))
+ write_json(
+ self.status_file,
+ {
+ "v1": {
+ "stage": None,
+ "datasource": (
+ "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]"
+ "[dsmode=net]"
+ ),
+ "init": {"start": 124.567, "finished": 125.678},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ },
+ )
+ cmdargs = myargs(long=True, wait=False)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(0, retcode)
+ expected = dedent(
+ """\
+ status: done
+ time: Thu, 01 Jan 1970 00:02:05 +0000
+ detail:
+ DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net]
+ """
+ )
+ self.assertEqual(expected, m_stdout.getvalue())
+
+ def test_status_on_errors(self):
+ """Reports error when any stage has errors."""
+ write_json(
+ self.status_file,
+ {
+ "v1": {
+ "stage": None,
+ "blah": {"errors": [], "finished": 123.456},
+ "init": {
+ "errors": ["error1"],
+ "start": 124.567,
+ "finished": 125.678,
+ },
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ },
+ )
+ cmdargs = myargs(long=False, wait=False)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(1, retcode)
+ self.assertEqual("status: error\n", m_stdout.getvalue())
+
+ def test_status_on_errors_long(self):
+ """Long format of error status includes all error messages."""
+ write_json(
+ self.status_file,
+ {
+ "v1": {
+ "stage": None,
+ "datasource": (
+ "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]"
+ "[dsmode=net]"
+ ),
+ "init": {
+ "errors": ["error1"],
+ "start": 124.567,
+ "finished": 125.678,
+ },
+ "init-local": {
+ "errors": ["error2", "error3"],
+ "start": 123.45,
+ "finished": 123.46,
+ },
+ }
+ },
+ )
+ cmdargs = myargs(long=True, wait=False)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(1, retcode)
+ expected = dedent(
+ """\
+ status: error
+ time: Thu, 01 Jan 1970 00:02:05 +0000
+ detail:
+ error1
+ error2
+ error3
+ """
+ )
+ self.assertEqual(expected, m_stdout.getvalue())
+
+ def test_status_returns_running_long_format(self):
+ """Long format reports the stage in which we are running."""
+ write_json(
+ self.status_file,
+ {
+ "v1": {
+ "stage": "init",
+ "init": {"start": 124.456, "finished": None},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ },
+ )
+ cmdargs = myargs(long=True, wait=False)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(0, retcode)
+ expected = dedent(
+ """\
+ status: running
+ time: Thu, 01 Jan 1970 00:02:04 +0000
+ detail:
+ Running in stage: init
+ """
+ )
+ self.assertEqual(expected, m_stdout.getvalue())
+
+ def test_status_wait_blocks_until_done(self):
+ """Specifying wait will poll every 1/4 second until done state."""
+ running_json = {
+ "v1": {
+ "stage": "init",
+ "init": {"start": 124.456, "finished": None},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ }
+ done_json = {
+ "v1": {
+ "stage": None,
+ "init": {"start": 124.456, "finished": 125.678},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ }
+
+ self.sleep_calls = 0
+
+ def fake_sleep(interval):
+ self.assertEqual(0.25, interval)
+ self.sleep_calls += 1
+ if self.sleep_calls == 2:
+ write_json(self.status_file, running_json)
+ elif self.sleep_calls == 3:
+ write_json(self.status_file, done_json)
+ result_file = self.tmp_path("result.json", self.new_root)
+ ensure_file(result_file)
+
+ cmdargs = myargs(long=False, wait=True)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "sleep": {"side_effect": fake_sleep},
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(0, retcode)
+ self.assertEqual(4, self.sleep_calls)
+ self.assertEqual("....\nstatus: done\n", m_stdout.getvalue())
+
+ def test_status_wait_blocks_until_error(self):
+ """Specifying wait will poll every 1/4 second until error state."""
+ running_json = {
+ "v1": {
+ "stage": "init",
+ "init": {"start": 124.456, "finished": None},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ }
+ error_json = {
+ "v1": {
+ "stage": None,
+ "init": {
+ "errors": ["error1"],
+ "start": 124.456,
+ "finished": 125.678,
+ },
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ }
+
+ self.sleep_calls = 0
+
+ def fake_sleep(interval):
+ self.assertEqual(0.25, interval)
+ self.sleep_calls += 1
+ if self.sleep_calls == 2:
+ write_json(self.status_file, running_json)
+ elif self.sleep_calls == 3:
+ write_json(self.status_file, error_json)
+
+ cmdargs = myargs(long=False, wait=True)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "sleep": {"side_effect": fake_sleep},
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(1, retcode)
+ self.assertEqual(4, self.sleep_calls)
+ self.assertEqual("....\nstatus: error\n", m_stdout.getvalue())
+
+ def test_status_main(self):
+ """status.main can be run as a standalone script."""
+ write_json(
+ self.status_file, {"v1": {"init": {"start": 1, "finished": None}}}
+ )
+ with self.assertRaises(SystemExit) as context_manager:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "sys.argv": {"new": ["status"]},
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.main,
+ )
+ self.assertEqual(0, context_manager.exception.code)
+ self.assertEqual("status: running\n", m_stdout.getvalue())
+
+
+# vi: ts=4 expandtab syntax=python