summaryrefslogtreecommitdiff
path: root/tests/unittests/cmd
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/cmd')
-rw-r--r--tests/unittests/cmd/__init__.py0
-rw-r--r--tests/unittests/cmd/devel/__init__.py0
-rw-r--r--tests/unittests/cmd/devel/test_hotplug_hook.py236
-rw-r--r--tests/unittests/cmd/devel/test_logs.py213
-rw-r--r--tests/unittests/cmd/devel/test_render.py154
-rw-r--r--tests/unittests/cmd/test_clean.py211
-rw-r--r--tests/unittests/cmd/test_cloud_id.py187
-rw-r--r--tests/unittests/cmd/test_main.py241
-rw-r--r--tests/unittests/cmd/test_query.py537
-rw-r--r--tests/unittests/cmd/test_status.py548
10 files changed, 2327 insertions, 0 deletions
diff --git a/tests/unittests/cmd/__init__.py b/tests/unittests/cmd/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/cmd/__init__.py
diff --git a/tests/unittests/cmd/devel/__init__.py b/tests/unittests/cmd/devel/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/cmd/devel/__init__.py
diff --git a/tests/unittests/cmd/devel/test_hotplug_hook.py b/tests/unittests/cmd/devel/test_hotplug_hook.py
new file mode 100644
index 00000000..5ecb5969
--- /dev/null
+++ b/tests/unittests/cmd/devel/test_hotplug_hook.py
@@ -0,0 +1,236 @@
+from collections import namedtuple
+from unittest import mock
+from unittest.mock import call
+
+import pytest
+
+from cloudinit.cmd.devel.hotplug_hook import handle_hotplug
+from cloudinit.distros import Distro
+from cloudinit.event import EventType
+from cloudinit.net.activators import NetworkActivator
+from cloudinit.net.network_state import NetworkState
+from cloudinit.sources import DataSource
+from cloudinit.stages import Init
+
+hotplug_args = namedtuple("hotplug_args", "udevaction, subsystem, devpath")
+FAKE_MAC = "11:22:33:44:55:66"
+
+
+@pytest.fixture
+def mocks():
+ m_init = mock.MagicMock(spec=Init)
+ m_distro = mock.MagicMock(spec=Distro)
+ m_datasource = mock.MagicMock(spec=DataSource)
+ m_datasource.distro = m_distro
+ m_init.datasource = m_datasource
+ m_init.fetch.return_value = m_datasource
+
+ read_sys_net = mock.patch(
+ "cloudinit.cmd.devel.hotplug_hook.read_sys_net_safe",
+ return_value=FAKE_MAC,
+ )
+
+ update_event_enabled = mock.patch(
+ "cloudinit.stages.update_event_enabled",
+ return_value=True,
+ )
+
+ m_network_state = mock.MagicMock(spec=NetworkState)
+ parse_net = mock.patch(
+ "cloudinit.cmd.devel.hotplug_hook.parse_net_config_data",
+ return_value=m_network_state,
+ )
+
+ m_activator = mock.MagicMock(spec=NetworkActivator)
+ select_activator = mock.patch(
+ "cloudinit.cmd.devel.hotplug_hook.activators.select_activator",
+ return_value=m_activator,
+ )
+
+ sleep = mock.patch("time.sleep")
+
+ read_sys_net.start()
+ update_event_enabled.start()
+ parse_net.start()
+ select_activator.start()
+ m_sleep = sleep.start()
+
+ yield namedtuple("mocks", "m_init m_network_state m_activator m_sleep")(
+ m_init=m_init,
+ m_network_state=m_network_state,
+ m_activator=m_activator,
+ m_sleep=m_sleep,
+ )
+
+ read_sys_net.stop()
+ update_event_enabled.stop()
+ parse_net.stop()
+ select_activator.stop()
+ sleep.stop()
+
+
+class TestUnsupportedActions:
+ def test_unsupported_subsystem(self, mocks):
+ with pytest.raises(
+ Exception, match="cannot handle events for subsystem: not_real"
+ ):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath="/dev/fake",
+ subsystem="not_real",
+ udevaction="add",
+ )
+
+ def test_unsupported_udevaction(self, mocks):
+ with pytest.raises(ValueError, match="Unknown action: not_real"):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath="/dev/fake",
+ udevaction="not_real",
+ subsystem="net",
+ )
+
+
+class TestHotplug:
+ def test_succcessful_add(self, mocks):
+ init = mocks.m_init
+ mocks.m_network_state.iter_interfaces.return_value = [
+ {
+ "mac_address": FAKE_MAC,
+ }
+ ]
+ handle_hotplug(
+ hotplug_init=init,
+ devpath="/dev/fake",
+ udevaction="add",
+ subsystem="net",
+ )
+ init.datasource.update_metadata_if_supported.assert_called_once_with(
+ [EventType.HOTPLUG]
+ )
+ mocks.m_activator.bring_up_interface.assert_called_once_with("fake")
+ mocks.m_activator.bring_down_interface.assert_not_called()
+ init._write_to_cache.assert_called_once_with()
+
+ def test_successful_remove(self, mocks):
+ init = mocks.m_init
+ mocks.m_network_state.iter_interfaces.return_value = [{}]
+ handle_hotplug(
+ hotplug_init=init,
+ devpath="/dev/fake",
+ udevaction="remove",
+ subsystem="net",
+ )
+ init.datasource.update_metadata_if_supported.assert_called_once_with(
+ [EventType.HOTPLUG]
+ )
+ mocks.m_activator.bring_down_interface.assert_called_once_with("fake")
+ mocks.m_activator.bring_up_interface.assert_not_called()
+ init._write_to_cache.assert_called_once_with()
+
+ def test_update_event_disabled(self, mocks, caplog):
+ init = mocks.m_init
+ with mock.patch(
+ "cloudinit.stages.update_event_enabled", return_value=False
+ ):
+ handle_hotplug(
+ hotplug_init=init,
+ devpath="/dev/fake",
+ udevaction="remove",
+ subsystem="net",
+ )
+ assert "hotplug not enabled for event of type" in caplog.text
+ init.datasource.update_metadata_if_supported.assert_not_called()
+ mocks.m_activator.bring_up_interface.assert_not_called()
+ mocks.m_activator.bring_down_interface.assert_not_called()
+ init._write_to_cache.assert_not_called()
+
+ def test_update_metadata_failed(self, mocks):
+ mocks.m_init.datasource.update_metadata_if_supported.return_value = (
+ False
+ )
+ with pytest.raises(
+ RuntimeError, match="Datasource .* not updated for event hotplug"
+ ):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath="/dev/fake",
+ udevaction="remove",
+ subsystem="net",
+ )
+
+ def test_detect_hotplugged_device_not_detected_on_add(self, mocks):
+ mocks.m_network_state.iter_interfaces.return_value = [{}]
+ with pytest.raises(
+ RuntimeError,
+ match="Failed to detect {} in updated metadata".format(FAKE_MAC),
+ ):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath="/dev/fake",
+ udevaction="add",
+ subsystem="net",
+ )
+
+ def test_detect_hotplugged_device_detected_on_remove(self, mocks):
+ mocks.m_network_state.iter_interfaces.return_value = [
+ {
+ "mac_address": FAKE_MAC,
+ }
+ ]
+ with pytest.raises(
+ RuntimeError, match="Failed to detect .* in updated metadata"
+ ):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath="/dev/fake",
+ udevaction="remove",
+ subsystem="net",
+ )
+
+ def test_apply_failed_on_add(self, mocks):
+ mocks.m_network_state.iter_interfaces.return_value = [
+ {
+ "mac_address": FAKE_MAC,
+ }
+ ]
+ mocks.m_activator.bring_up_interface.return_value = False
+ with pytest.raises(
+ RuntimeError, match="Failed to bring up device: /dev/fake"
+ ):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath="/dev/fake",
+ udevaction="add",
+ subsystem="net",
+ )
+
+ def test_apply_failed_on_remove(self, mocks):
+ mocks.m_network_state.iter_interfaces.return_value = [{}]
+ mocks.m_activator.bring_down_interface.return_value = False
+ with pytest.raises(
+ RuntimeError, match="Failed to bring down device: /dev/fake"
+ ):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath="/dev/fake",
+ udevaction="remove",
+ subsystem="net",
+ )
+
+ def test_retry(self, mocks):
+ with pytest.raises(RuntimeError):
+ handle_hotplug(
+ hotplug_init=mocks.m_init,
+ devpath="/dev/fake",
+ udevaction="add",
+ subsystem="net",
+ )
+ assert mocks.m_sleep.call_count == 5
+ assert mocks.m_sleep.call_args_list == [
+ call(1),
+ call(3),
+ call(5),
+ call(10),
+ call(30),
+ ]
diff --git a/tests/unittests/cmd/devel/test_logs.py b/tests/unittests/cmd/devel/test_logs.py
new file mode 100644
index 00000000..73ed3c65
--- /dev/null
+++ b/tests/unittests/cmd/devel/test_logs.py
@@ -0,0 +1,213 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import os
+from datetime import datetime
+from io import StringIO
+
+from cloudinit.cmd.devel import logs
+from cloudinit.sources import INSTANCE_JSON_SENSITIVE_FILE
+from cloudinit.subp import subp
+from cloudinit.util import ensure_dir, load_file, write_file
+from tests.unittests.helpers import (
+ FilesystemMockingTestCase,
+ mock,
+ wrap_and_call,
+)
+
+
+@mock.patch("cloudinit.cmd.devel.logs.os.getuid")
+class TestCollectLogs(FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestCollectLogs, self).setUp()
+ self.new_root = self.tmp_dir()
+ self.run_dir = self.tmp_path("run", self.new_root)
+
+ def test_collect_logs_with_userdata_requires_root_user(self, m_getuid):
+        """collect-logs errors when non-root user collects userdata."""
+ m_getuid.return_value = 100 # non-root
+ output_tarfile = self.tmp_path("logs.tgz")
+ with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr:
+ self.assertEqual(
+ 1, logs.collect_logs(output_tarfile, include_userdata=True)
+ )
+ self.assertEqual(
+ "To include userdata, root user is required."
+ " Try sudo cloud-init collect-logs\n",
+ m_stderr.getvalue(),
+ )
+
+ def test_collect_logs_creates_tarfile(self, m_getuid):
+ """collect-logs creates a tarfile with all related cloud-init info."""
+ m_getuid.return_value = 100
+ log1 = self.tmp_path("cloud-init.log", self.new_root)
+ write_file(log1, "cloud-init-log")
+ log2 = self.tmp_path("cloud-init-output.log", self.new_root)
+ write_file(log2, "cloud-init-output-log")
+ ensure_dir(self.run_dir)
+ write_file(self.tmp_path("results.json", self.run_dir), "results")
+ write_file(
+ self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir),
+ "sensitive",
+ )
+ output_tarfile = self.tmp_path("logs.tgz")
+
+ date = datetime.utcnow().date().strftime("%Y-%m-%d")
+ date_logdir = "cloud-init-logs-{0}".format(date)
+
+ version_out = "/usr/bin/cloud-init 18.2fake\n"
+ expected_subp = {
+ (
+ "dpkg-query",
+ "--show",
+ "-f=${Version}\n",
+ "cloud-init",
+ ): "0.7fake\n",
+ ("cloud-init", "--version"): version_out,
+ ("dmesg",): "dmesg-out\n",
+ ("journalctl", "--boot=0", "-o", "short-precise"): "journal-out\n",
+ ("tar", "czvf", output_tarfile, date_logdir): "",
+ }
+
+ def fake_subp(cmd):
+ cmd_tuple = tuple(cmd)
+ if cmd_tuple not in expected_subp:
+ raise AssertionError(
+ "Unexpected command provided to subp: {0}".format(cmd)
+ )
+ if cmd == ["tar", "czvf", output_tarfile, date_logdir]:
+ subp(cmd) # Pass through tar cmd so we can check output
+ return expected_subp[cmd_tuple], ""
+
+ fake_stderr = mock.MagicMock()
+
+ wrap_and_call(
+ "cloudinit.cmd.devel.logs",
+ {
+ "subp": {"side_effect": fake_subp},
+ "sys.stderr": {"new": fake_stderr},
+ "CLOUDINIT_LOGS": {"new": [log1, log2]},
+ "CLOUDINIT_RUN_DIR": {"new": self.run_dir},
+ },
+ logs.collect_logs,
+ output_tarfile,
+ include_userdata=False,
+ )
+ # unpack the tarfile and check file contents
+ subp(["tar", "zxvf", output_tarfile, "-C", self.new_root])
+ out_logdir = self.tmp_path(date_logdir, self.new_root)
+ self.assertFalse(
+ os.path.exists(
+ os.path.join(
+ out_logdir,
+ "run",
+ "cloud-init",
+ INSTANCE_JSON_SENSITIVE_FILE,
+ )
+ ),
+ "Unexpected file found: %s" % INSTANCE_JSON_SENSITIVE_FILE,
+ )
+ self.assertEqual(
+ "0.7fake\n", load_file(os.path.join(out_logdir, "dpkg-version"))
+ )
+ self.assertEqual(
+ version_out, load_file(os.path.join(out_logdir, "version"))
+ )
+ self.assertEqual(
+ "cloud-init-log",
+ load_file(os.path.join(out_logdir, "cloud-init.log")),
+ )
+ self.assertEqual(
+ "cloud-init-output-log",
+ load_file(os.path.join(out_logdir, "cloud-init-output.log")),
+ )
+ self.assertEqual(
+ "dmesg-out\n", load_file(os.path.join(out_logdir, "dmesg.txt"))
+ )
+ self.assertEqual(
+ "journal-out\n", load_file(os.path.join(out_logdir, "journal.txt"))
+ )
+ self.assertEqual(
+ "results",
+ load_file(
+ os.path.join(out_logdir, "run", "cloud-init", "results.json")
+ ),
+ )
+ fake_stderr.write.assert_any_call("Wrote %s\n" % output_tarfile)
+
+ def test_collect_logs_includes_optional_userdata(self, m_getuid):
+ """collect-logs include userdata when --include-userdata is set."""
+ m_getuid.return_value = 0
+ log1 = self.tmp_path("cloud-init.log", self.new_root)
+ write_file(log1, "cloud-init-log")
+ log2 = self.tmp_path("cloud-init-output.log", self.new_root)
+ write_file(log2, "cloud-init-output-log")
+ userdata = self.tmp_path("user-data.txt", self.new_root)
+ write_file(userdata, "user-data")
+ ensure_dir(self.run_dir)
+ write_file(self.tmp_path("results.json", self.run_dir), "results")
+ write_file(
+ self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir),
+ "sensitive",
+ )
+ output_tarfile = self.tmp_path("logs.tgz")
+
+ date = datetime.utcnow().date().strftime("%Y-%m-%d")
+ date_logdir = "cloud-init-logs-{0}".format(date)
+
+ version_out = "/usr/bin/cloud-init 18.2fake\n"
+ expected_subp = {
+ (
+ "dpkg-query",
+ "--show",
+ "-f=${Version}\n",
+ "cloud-init",
+ ): "0.7fake",
+ ("cloud-init", "--version"): version_out,
+ ("dmesg",): "dmesg-out\n",
+ ("journalctl", "--boot=0", "-o", "short-precise"): "journal-out\n",
+ ("tar", "czvf", output_tarfile, date_logdir): "",
+ }
+
+ def fake_subp(cmd):
+ cmd_tuple = tuple(cmd)
+ if cmd_tuple not in expected_subp:
+ raise AssertionError(
+ "Unexpected command provided to subp: {0}".format(cmd)
+ )
+ if cmd == ["tar", "czvf", output_tarfile, date_logdir]:
+ subp(cmd) # Pass through tar cmd so we can check output
+ return expected_subp[cmd_tuple], ""
+
+ fake_stderr = mock.MagicMock()
+
+ wrap_and_call(
+ "cloudinit.cmd.devel.logs",
+ {
+ "subp": {"side_effect": fake_subp},
+ "sys.stderr": {"new": fake_stderr},
+ "CLOUDINIT_LOGS": {"new": [log1, log2]},
+ "CLOUDINIT_RUN_DIR": {"new": self.run_dir},
+ "USER_DATA_FILE": {"new": userdata},
+ },
+ logs.collect_logs,
+ output_tarfile,
+ include_userdata=True,
+ )
+ # unpack the tarfile and check file contents
+ subp(["tar", "zxvf", output_tarfile, "-C", self.new_root])
+ out_logdir = self.tmp_path(date_logdir, self.new_root)
+ self.assertEqual(
+ "user-data", load_file(os.path.join(out_logdir, "user-data.txt"))
+ )
+ self.assertEqual(
+ "sensitive",
+ load_file(
+ os.path.join(
+ out_logdir,
+ "run",
+ "cloud-init",
+ INSTANCE_JSON_SENSITIVE_FILE,
+ )
+ ),
+ )
+ fake_stderr.write.assert_any_call("Wrote %s\n" % output_tarfile)
diff --git a/tests/unittests/cmd/devel/test_render.py b/tests/unittests/cmd/devel/test_render.py
new file mode 100644
index 00000000..4afc64f0
--- /dev/null
+++ b/tests/unittests/cmd/devel/test_render.py
@@ -0,0 +1,154 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import os
+from collections import namedtuple
+from io import StringIO
+
+from cloudinit.cmd.devel import render
+from cloudinit.helpers import Paths
+from cloudinit.sources import INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE
+from cloudinit.util import ensure_dir, write_file
+from tests.unittests.helpers import CiTestCase, mock, skipUnlessJinja
+
+
+class TestRender(CiTestCase):
+
+ with_logs = True
+
+ args = namedtuple("renderargs", "user_data instance_data debug")
+
+ def setUp(self):
+ super(TestRender, self).setUp()
+ self.tmp = self.tmp_dir()
+
+ def test_handle_args_error_on_missing_user_data(self):
+ """When user_data file path does not exist, log an error."""
+ absent_file = self.tmp_path("user-data", dir=self.tmp)
+ instance_data = self.tmp_path("instance-data", dir=self.tmp)
+ write_file(instance_data, "{}")
+ args = self.args(
+ user_data=absent_file, instance_data=instance_data, debug=False
+ )
+ with mock.patch("sys.stderr", new_callable=StringIO):
+ self.assertEqual(1, render.handle_args("anyname", args))
+ self.assertIn(
+ "Missing user-data file: %s" % absent_file, self.logs.getvalue()
+ )
+
+ def test_handle_args_error_on_missing_instance_data(self):
+ """When instance_data file path does not exist, log an error."""
+ user_data = self.tmp_path("user-data", dir=self.tmp)
+ absent_file = self.tmp_path("instance-data", dir=self.tmp)
+ args = self.args(
+ user_data=user_data, instance_data=absent_file, debug=False
+ )
+ with mock.patch("sys.stderr", new_callable=StringIO):
+ self.assertEqual(1, render.handle_args("anyname", args))
+ self.assertIn(
+ "Missing instance-data.json file: %s" % absent_file,
+ self.logs.getvalue(),
+ )
+
+ def test_handle_args_defaults_instance_data(self):
+ """When no instance_data argument, default to configured run_dir."""
+ user_data = self.tmp_path("user-data", dir=self.tmp)
+ run_dir = self.tmp_path("run_dir", dir=self.tmp)
+ ensure_dir(run_dir)
+ paths = Paths({"run_dir": run_dir})
+ self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths")
+ self.m_paths.return_value = paths
+ args = self.args(user_data=user_data, instance_data=None, debug=False)
+ with mock.patch("sys.stderr", new_callable=StringIO):
+ self.assertEqual(1, render.handle_args("anyname", args))
+ json_file = os.path.join(run_dir, INSTANCE_JSON_FILE)
+ self.assertIn(
+ "Missing instance-data.json file: %s" % json_file,
+ self.logs.getvalue(),
+ )
+
+ def test_handle_args_root_fallback_from_sensitive_instance_data(self):
+ """When root user defaults to sensitive.json."""
+ user_data = self.tmp_path("user-data", dir=self.tmp)
+ run_dir = self.tmp_path("run_dir", dir=self.tmp)
+ ensure_dir(run_dir)
+ paths = Paths({"run_dir": run_dir})
+ self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths")
+ self.m_paths.return_value = paths
+ args = self.args(user_data=user_data, instance_data=None, debug=False)
+ with mock.patch("sys.stderr", new_callable=StringIO):
+ with mock.patch("os.getuid") as m_getuid:
+ m_getuid.return_value = 0
+ self.assertEqual(1, render.handle_args("anyname", args))
+ json_file = os.path.join(run_dir, INSTANCE_JSON_FILE)
+ json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE)
+ self.assertIn(
+ "WARNING: Missing root-readable %s. Using redacted %s"
+ % (json_sensitive, json_file),
+ self.logs.getvalue(),
+ )
+ self.assertIn(
+ "ERROR: Missing instance-data.json file: %s" % json_file,
+ self.logs.getvalue(),
+ )
+
+ def test_handle_args_root_uses_sensitive_instance_data(self):
+ """When root user, and no instance-data arg, use sensitive.json."""
+ user_data = self.tmp_path("user-data", dir=self.tmp)
+ write_file(user_data, "##template: jinja\nrendering: {{ my_var }}")
+ run_dir = self.tmp_path("run_dir", dir=self.tmp)
+ ensure_dir(run_dir)
+ json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE)
+ write_file(json_sensitive, '{"my-var": "jinja worked"}')
+ paths = Paths({"run_dir": run_dir})
+ self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths")
+ self.m_paths.return_value = paths
+ args = self.args(user_data=user_data, instance_data=None, debug=False)
+ with mock.patch("sys.stderr", new_callable=StringIO):
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ with mock.patch("os.getuid") as m_getuid:
+ m_getuid.return_value = 0
+ self.assertEqual(0, render.handle_args("anyname", args))
+ self.assertIn("rendering: jinja worked", m_stdout.getvalue())
+
+ @skipUnlessJinja()
+ def test_handle_args_renders_instance_data_vars_in_template(self):
+ """If user_data file is a jinja template render instance-data vars."""
+ user_data = self.tmp_path("user-data", dir=self.tmp)
+ write_file(user_data, "##template: jinja\nrendering: {{ my_var }}")
+ instance_data = self.tmp_path("instance-data", dir=self.tmp)
+ write_file(instance_data, '{"my-var": "jinja worked"}')
+ args = self.args(
+ user_data=user_data, instance_data=instance_data, debug=True
+ )
+ with mock.patch("sys.stderr", new_callable=StringIO) as m_console_err:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ self.assertEqual(0, render.handle_args("anyname", args))
+ self.assertIn(
+ "DEBUG: Converted jinja variables\n{", self.logs.getvalue()
+ )
+ self.assertIn(
+ "DEBUG: Converted jinja variables\n{", m_console_err.getvalue()
+ )
+ self.assertEqual("rendering: jinja worked", m_stdout.getvalue())
+
+ @skipUnlessJinja()
+ def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation(self):
+ """If user_data file has invalid jinja operations log warnings."""
+ user_data = self.tmp_path("user-data", dir=self.tmp)
+ write_file(user_data, "##template: jinja\nrendering: {{ my-var }}")
+ instance_data = self.tmp_path("instance-data", dir=self.tmp)
+ write_file(instance_data, '{"my-var": "jinja worked"}')
+ args = self.args(
+ user_data=user_data, instance_data=instance_data, debug=True
+ )
+ with mock.patch("sys.stderr", new_callable=StringIO):
+ self.assertEqual(1, render.handle_args("anyname", args))
+ self.assertIn(
+ "WARNING: Ignoring jinja template for %s: Undefined jinja"
+ ' variable: "my-var". Jinja tried subtraction. Perhaps you meant'
+ ' "my_var"?' % user_data,
+ self.logs.getvalue(),
+ )
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/cmd/test_clean.py b/tests/unittests/cmd/test_clean.py
new file mode 100644
index 00000000..7d12017e
--- /dev/null
+++ b/tests/unittests/cmd/test_clean.py
@@ -0,0 +1,211 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import os
+from collections import namedtuple
+from io import StringIO
+
+from cloudinit.cmd import clean
+from cloudinit.util import ensure_dir, sym_link, write_file
+from tests.unittests.helpers import CiTestCase, mock, wrap_and_call
+
+mypaths = namedtuple("MyPaths", "cloud_dir")
+
+
+class TestClean(CiTestCase):
+ def setUp(self):
+ super(TestClean, self).setUp()
+ self.new_root = self.tmp_dir()
+ self.artifact_dir = self.tmp_path("artifacts", self.new_root)
+ self.log1 = self.tmp_path("cloud-init.log", self.new_root)
+ self.log2 = self.tmp_path("cloud-init-output.log", self.new_root)
+
+ class FakeInit(object):
+ cfg = {
+ "def_log_file": self.log1,
+ "output": {"all": "|tee -a {0}".format(self.log2)},
+ }
+ # Ensure cloud_dir has a trailing slash, to match real behaviour
+ paths = mypaths(cloud_dir="{}/".format(self.artifact_dir))
+
+ def __init__(self, ds_deps):
+ pass
+
+ def read_cfg(self):
+ pass
+
+ self.init_class = FakeInit
+
+ def test_remove_artifacts_removes_logs(self):
+ """remove_artifacts removes logs when remove_logs is True."""
+ write_file(self.log1, "cloud-init-log")
+ write_file(self.log2, "cloud-init-output-log")
+
+ self.assertFalse(
+ os.path.exists(self.artifact_dir), "Unexpected artifacts dir"
+ )
+ retcode = wrap_and_call(
+ "cloudinit.cmd.clean",
+ {"Init": {"side_effect": self.init_class}},
+ clean.remove_artifacts,
+ remove_logs=True,
+ )
+ self.assertFalse(os.path.exists(self.log1), "Unexpected file")
+ self.assertFalse(os.path.exists(self.log2), "Unexpected file")
+ self.assertEqual(0, retcode)
+
+ def test_remove_artifacts_preserves_logs(self):
+ """remove_artifacts leaves logs when remove_logs is False."""
+ write_file(self.log1, "cloud-init-log")
+ write_file(self.log2, "cloud-init-output-log")
+
+ retcode = wrap_and_call(
+ "cloudinit.cmd.clean",
+ {"Init": {"side_effect": self.init_class}},
+ clean.remove_artifacts,
+ remove_logs=False,
+ )
+ self.assertTrue(os.path.exists(self.log1), "Missing expected file")
+ self.assertTrue(os.path.exists(self.log2), "Missing expected file")
+ self.assertEqual(0, retcode)
+
+ def test_remove_artifacts_removes_unlinks_symlinks(self):
+ """remove_artifacts cleans artifacts dir unlinking any symlinks."""
+ dir1 = os.path.join(self.artifact_dir, "dir1")
+ ensure_dir(dir1)
+ symlink = os.path.join(self.artifact_dir, "mylink")
+ sym_link(dir1, symlink)
+
+ retcode = wrap_and_call(
+ "cloudinit.cmd.clean",
+ {"Init": {"side_effect": self.init_class}},
+ clean.remove_artifacts,
+ remove_logs=False,
+ )
+ self.assertEqual(0, retcode)
+ for path in (dir1, symlink):
+ self.assertFalse(
+ os.path.exists(path), "Unexpected {0} dir".format(path)
+ )
+
+ def test_remove_artifacts_removes_artifacts_skipping_seed(self):
+ """remove_artifacts cleans artifacts dir with exception of seed dir."""
+ dirs = [
+ self.artifact_dir,
+ os.path.join(self.artifact_dir, "seed"),
+ os.path.join(self.artifact_dir, "dir1"),
+ os.path.join(self.artifact_dir, "dir2"),
+ ]
+ for _dir in dirs:
+ ensure_dir(_dir)
+
+ retcode = wrap_and_call(
+ "cloudinit.cmd.clean",
+ {"Init": {"side_effect": self.init_class}},
+ clean.remove_artifacts,
+ remove_logs=False,
+ )
+ self.assertEqual(0, retcode)
+ for expected_dir in dirs[:2]:
+ self.assertTrue(
+ os.path.exists(expected_dir),
+ "Missing {0} dir".format(expected_dir),
+ )
+ for deleted_dir in dirs[2:]:
+ self.assertFalse(
+ os.path.exists(deleted_dir),
+ "Unexpected {0} dir".format(deleted_dir),
+ )
+
+ def test_remove_artifacts_removes_artifacts_removes_seed(self):
+ """remove_artifacts removes seed dir when remove_seed is True."""
+ dirs = [
+ self.artifact_dir,
+ os.path.join(self.artifact_dir, "seed"),
+ os.path.join(self.artifact_dir, "dir1"),
+ os.path.join(self.artifact_dir, "dir2"),
+ ]
+ for _dir in dirs:
+ ensure_dir(_dir)
+
+ retcode = wrap_and_call(
+ "cloudinit.cmd.clean",
+ {"Init": {"side_effect": self.init_class}},
+ clean.remove_artifacts,
+ remove_logs=False,
+ remove_seed=True,
+ )
+ self.assertEqual(0, retcode)
+ self.assertTrue(
+ os.path.exists(self.artifact_dir), "Missing artifact dir"
+ )
+ for deleted_dir in dirs[1:]:
+ self.assertFalse(
+ os.path.exists(deleted_dir),
+ "Unexpected {0} dir".format(deleted_dir),
+ )
+
+ def test_remove_artifacts_returns_one_on_errors(self):
+ """remove_artifacts returns non-zero on failure and prints an error."""
+ ensure_dir(self.artifact_dir)
+ ensure_dir(os.path.join(self.artifact_dir, "dir1"))
+
+ with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.clean",
+ {
+ "del_dir": {"side_effect": OSError("oops")},
+ "Init": {"side_effect": self.init_class},
+ },
+ clean.remove_artifacts,
+ remove_logs=False,
+ )
+ self.assertEqual(1, retcode)
+ self.assertEqual(
+ "Error:\nCould not remove %s/dir1: oops\n" % self.artifact_dir,
+ m_stderr.getvalue(),
+ )
+
+ def test_handle_clean_args_reboots(self):
+        """handle_clean_args reboots when the reboot arg is provided."""
+
+ called_cmds = []
+
+ def fake_subp(cmd, capture):
+ called_cmds.append((cmd, capture))
+ return "", ""
+
+ myargs = namedtuple("MyArgs", "remove_logs remove_seed reboot")
+ cmdargs = myargs(remove_logs=False, remove_seed=False, reboot=True)
+ retcode = wrap_and_call(
+ "cloudinit.cmd.clean",
+ {
+ "subp": {"side_effect": fake_subp},
+ "Init": {"side_effect": self.init_class},
+ },
+ clean.handle_clean_args,
+ name="does not matter",
+ args=cmdargs,
+ )
+ self.assertEqual(0, retcode)
+ self.assertEqual([(["shutdown", "-r", "now"], False)], called_cmds)
+
+ def test_status_main(self):
+ """clean.main can be run as a standalone script."""
+ write_file(self.log1, "cloud-init-log")
+ with self.assertRaises(SystemExit) as context_manager:
+ wrap_and_call(
+ "cloudinit.cmd.clean",
+ {
+ "Init": {"side_effect": self.init_class},
+ "sys.argv": {"new": ["clean", "--logs"]},
+ },
+ clean.main,
+ )
+
+ self.assertEqual(0, context_manager.exception.code)
+ self.assertFalse(
+ os.path.exists(self.log1), "Unexpected log {0}".format(self.log1)
+ )
+
+
+# vi: ts=4 expandtab syntax=python
diff --git a/tests/unittests/cmd/test_cloud_id.py b/tests/unittests/cmd/test_cloud_id.py
new file mode 100644
index 00000000..907297a6
--- /dev/null
+++ b/tests/unittests/cmd/test_cloud_id.py
@@ -0,0 +1,187 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""Tests for cloud-id command line utility."""
+
+from collections import namedtuple
+
+import pytest
+
+from cloudinit import util
+from cloudinit.cmd import cloud_id
+from tests.unittests.helpers import mock
+
+M_PATH = "cloudinit.cmd.cloud_id."
+
+
+class TestCloudId:
+
+ args = namedtuple("cloudidargs", "instance_data json long")
+
+ def test_cloud_id_arg_parser_defaults(self):
+ """Validate the argument defaults when not provided by the end-user."""
+ cmd = ["cloud-id"]
+ with mock.patch("sys.argv", cmd):
+ args = cloud_id.get_parser().parse_args()
+ assert "/run/cloud-init/instance-data.json" == args.instance_data
+ assert False is args.long
+ assert False is args.json
+
+ def test_cloud_id_arg_parse_overrides(self, tmpdir):
+ """Override argument defaults by specifying values for each param."""
+ instance_data = tmpdir.join("instance-data.json")
+ instance_data.write("{}")
+ cmd = [
+ "cloud-id",
+ "--instance-data",
+ instance_data.strpath,
+ "--long",
+ "--json",
+ ]
+ with mock.patch("sys.argv", cmd):
+ args = cloud_id.get_parser().parse_args()
+ assert instance_data.strpath == args.instance_data
+ assert True is args.long
+ assert True is args.json
+
+ @mock.patch(M_PATH + "get_status_details")
+ def test_cloud_id_missing_instance_data_json(
+ self, get_status_details, tmpdir, capsys
+ ):
+ """Exit error when the provided instance-data.json does not exist."""
+ get_status_details.return_value = cloud_id.UXAppStatus.DONE, "n/a", ""
+ instance_data = tmpdir.join("instance-data.json")
+ cmd = ["cloud-id", "--instance-data", instance_data.strpath]
+ with mock.patch("sys.argv", cmd):
+ with pytest.raises(SystemExit) as context_manager:
+ cloud_id.main()
+ assert 1 == context_manager.value.code
+ _out, err = capsys.readouterr()
+ assert "Error:\nFile not found '%s'" % instance_data.strpath in err
+
+ @mock.patch(M_PATH + "get_status_details")
+ def test_cloud_id_non_json_instance_data(
+ self, get_status_details, tmpdir, capsys
+ ):
+ """Exit error when the provided instance-data.json is not json."""
+ get_status_details.return_value = cloud_id.UXAppStatus.DONE, "n/a", ""
+ instance_data = tmpdir.join("instance-data.json")
+ cmd = ["cloud-id", "--instance-data", instance_data.strpath]
+ instance_data.write("{")
+ with mock.patch("sys.argv", cmd):
+ with pytest.raises(SystemExit) as context_manager:
+ cloud_id.main()
+ assert 1 == context_manager.value.code
+ _out, err = capsys.readouterr()
+ assert (
+ "Error:\nFile '%s' is not valid json." % instance_data.strpath
+ in err
+ )
+
+ @mock.patch(M_PATH + "get_status_details")
+ def test_cloud_id_from_cloud_name_in_instance_data(
+ self, get_status_details, tmpdir, capsys
+ ):
+ """Report canonical cloud-id from cloud_name in instance-data."""
+ instance_data = tmpdir.join("instance-data.json")
+ get_status_details.return_value = cloud_id.UXAppStatus.DONE, "n/a", ""
+ instance_data.write(
+ '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}',
+ )
+ cmd = ["cloud-id", "--instance-data", instance_data.strpath]
+ with mock.patch("sys.argv", cmd):
+ with pytest.raises(SystemExit) as context_manager:
+ cloud_id.main()
+ assert 0 == context_manager.value.code
+ out, _err = capsys.readouterr()
+ assert "mycloud\n" == out
+
+ @mock.patch(M_PATH + "get_status_details")
+ def test_cloud_id_long_name_from_instance_data(
+ self, get_status_details, tmpdir, capsys
+ ):
+ """Report long cloud-id format from cloud_name and region."""
+ get_status_details.return_value = cloud_id.UXAppStatus.DONE, "n/a", ""
+ instance_data = tmpdir.join("instance-data.json")
+ instance_data.write(
+ '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}',
+ )
+ cmd = ["cloud-id", "--instance-data", instance_data.strpath, "--long"]
+ with mock.patch("sys.argv", cmd):
+ with pytest.raises(SystemExit) as context_manager:
+ cloud_id.main()
+ out, _err = capsys.readouterr()
+ assert 0 == context_manager.value.code
+ assert "mycloud\tsomereg\n" == out
+
+ @mock.patch(M_PATH + "get_status_details")
+ def test_cloud_id_lookup_from_instance_data_region(
+ self, get_status_details, tmpdir, capsys
+ ):
+ """Report discovered canonical cloud_id when region lookup matches."""
+ get_status_details.return_value = cloud_id.UXAppStatus.DONE, "n/a", ""
+ instance_data = tmpdir.join("instance-data.json")
+ instance_data.write(
+ '{"v1": {"cloud_name": "aws", "region": "cn-north-1",'
+ ' "platform": "ec2"}}',
+ )
+ cmd = ["cloud-id", "--instance-data", instance_data.strpath, "--long"]
+ with mock.patch("sys.argv", cmd):
+ with pytest.raises(SystemExit) as context_manager:
+ cloud_id.main()
+ assert 0 == context_manager.value.code
+ out, _err = capsys.readouterr()
+ assert "aws-china\tcn-north-1\n" == out
+
+ @mock.patch(M_PATH + "get_status_details")
+ def test_cloud_id_lookup_json_instance_data_adds_cloud_id_to_json(
+ self, get_status_details, tmpdir, capsys
+ ):
+ """Report v1 instance-data content with cloud_id when --json set."""
+ get_status_details.return_value = cloud_id.UXAppStatus.DONE, "n/a", ""
+ instance_data = tmpdir.join("instance-data.json")
+ instance_data.write(
+ '{"v1": {"cloud_name": "unknown", "region": "dfw",'
+ ' "platform": "openstack", "public_ssh_keys": []}}',
+ )
+ expected = util.json_dumps(
+ {
+ "cloud_id": "openstack",
+ "cloud_name": "unknown",
+ "platform": "openstack",
+ "public_ssh_keys": [],
+ "region": "dfw",
+ }
+ )
+ cmd = ["cloud-id", "--instance-data", instance_data.strpath, "--json"]
+ with mock.patch("sys.argv", cmd):
+ with pytest.raises(SystemExit) as context_manager:
+ cloud_id.main()
+ out, _err = capsys.readouterr()
+ assert 0 == context_manager.value.code
+ assert expected + "\n" == out
+
+ @pytest.mark.parametrize(
+ "status, exit_code",
+ (
+ (cloud_id.UXAppStatus.DISABLED, 2),
+ (cloud_id.UXAppStatus.NOT_RUN, 3),
+ (cloud_id.UXAppStatus.RUNNING, 0),
+ ),
+ )
+ @mock.patch(M_PATH + "get_status_details")
+ def test_cloud_id_unique_exit_codes_for_status(
+ self, get_status_details, status, exit_code, tmpdir, capsys
+ ):
+ """cloud-id returns unique exit codes for status."""
+ get_status_details.return_value = status, "n/a", ""
+ instance_data = tmpdir.join("instance-data.json")
+ if status == cloud_id.UXAppStatus.RUNNING:
+ instance_data.write("{}")
+ cmd = ["cloud-id", "--instance-data", instance_data.strpath, "--json"]
+ with mock.patch("sys.argv", cmd):
+ with pytest.raises(SystemExit) as context_manager:
+ cloud_id.main()
+ assert exit_code == context_manager.value.code
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/cmd/test_main.py b/tests/unittests/cmd/test_main.py
new file mode 100644
index 00000000..3e778b0b
--- /dev/null
+++ b/tests/unittests/cmd/test_main.py
@@ -0,0 +1,241 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import copy
+import os
+from collections import namedtuple
+from io import StringIO
+from unittest import mock
+
+import pytest
+
+from cloudinit import safeyaml
+from cloudinit.cmd import main
+from cloudinit.util import ensure_dir, load_file, write_file
+from tests.unittests.helpers import FilesystemMockingTestCase, wrap_and_call
+
# Lightweight stand-ins for cloudinit's Paths object and the argparse
# Namespace consumed by main.main_init; only the fields main reads exist.
mypaths = namedtuple("MyPaths", "run_dir")
myargs = namedtuple("MyArgs", "debug files force local reporter subcommand")
+
+
class TestMain(FilesystemMockingTestCase):
    """Exercise cloudinit.cmd.main.main_init against a fake filesystem root.

    setUp builds a minimal /etc/cloud/cloud.cfg under a tmp root and patches
    os/util so main_init runs entirely inside that sandbox.
    """

    with_logs = True
    allowed_subp = False

    def setUp(self):
        super().setUp()
        self.new_root = self.tmp_dir()
        self.cloud_dir = self.tmp_path("var/lib/cloud/", dir=self.new_root)
        os.makedirs(self.cloud_dir)
        self.replicateTestRoot("simple_ubuntu", self.new_root)
        self.cfg = {
            "datasource_list": ["None"],
            "runcmd": ["ls /etc"],  # test ALL_DISTROS
            "system_info": {
                "paths": {
                    "cloud_dir": self.cloud_dir,
                    "run_dir": self.new_root,
                }
            },
            "write_files": [
                {
                    "path": "/etc/blah.ini",
                    "content": "blah",
                    "permissions": 0o755,
                },
            ],
            "cloud_init_modules": ["write-files", "runcmd"],
        }
        cloud_cfg = safeyaml.dumps(self.cfg)
        ensure_dir(os.path.join(self.new_root, "etc", "cloud"))
        self.cloud_cfg_file = os.path.join(
            self.new_root, "etc", "cloud", "cloud.cfg"
        )
        write_file(self.cloud_cfg_file, cloud_cfg)
        self.patchOS(self.new_root)
        self.patchUtils(self.new_root)
        self.stderr = StringIO()
        self.patchStdoutAndStderr(stderr=self.stderr)

    def test_main_init_run_net_stops_on_file_no_net(self):
        """When no-net file is present, main_init does not process modules."""
        stop_file = os.path.join(self.cloud_dir, "data", "no-net")  # stop file
        write_file(stop_file, "")
        cmdargs = myargs(
            debug=False,
            files=None,
            force=False,
            local=False,
            reporter=None,
            subcommand="init",
        )
        (_item1, item2) = wrap_and_call(
            "cloudinit.cmd.main",
            {
                "util.close_stdin": True,
                "netinfo.debug_info": "my net debug info",
                "util.fixup_output": ("outfmt", "errfmt"),
            },
            main.main_init,
            "init",
            cmdargs,
        )
        # We should not run write_files module
        self.assertFalse(
            os.path.exists(os.path.join(self.new_root, "etc/blah.ini")),
            "Unexpected run of write_files module produced blah.ini",
        )
        self.assertEqual([], item2)
        # Instancify should NOT have been called: no instance-id produced.
        instance_id_path = "var/lib/cloud/data/instance-id"
        self.assertFalse(
            os.path.exists(os.path.join(self.new_root, instance_id_path)),
            "Unexpected call to datasource.instancify produced instance-id",
        )
        expected_logs = [
            "Exiting. stop file ['{stop_file}'] existed\n".format(
                stop_file=stop_file
            ),
            "my net debug info",  # netinfo.debug_info
        ]
        for log in expected_logs:
            self.assertIn(log, self.stderr.getvalue())

    def test_main_init_run_net_runs_modules(self):
        """Modules like write_files are run in 'net' mode."""
        cmdargs = myargs(
            debug=False,
            files=None,
            force=False,
            local=False,
            reporter=None,
            subcommand="init",
        )
        (_item1, item2) = wrap_and_call(
            "cloudinit.cmd.main",
            {
                "util.close_stdin": True,
                "netinfo.debug_info": "my net debug info",
                "util.fixup_output": ("outfmt", "errfmt"),
            },
            main.main_init,
            "init",
            cmdargs,
        )
        self.assertEqual([], item2)
        # Instancify is called
        instance_id_path = "var/lib/cloud/data/instance-id"
        # NOTE: previously wrapped in a single-argument os.path.join(), a
        # no-op that obscured the comparison; compare load_file directly.
        self.assertEqual(
            "iid-datasource-none\n",
            load_file(os.path.join(self.new_root, instance_id_path)),
        )
        # modules are run (including write_files)
        self.assertEqual(
            "blah", load_file(os.path.join(self.new_root, "etc/blah.ini"))
        )
        expected_logs = [
            "network config is disabled by fallback",  # apply_network_config
            "my net debug info",  # netinfo.debug_info
            "no previous run detected",
        ]
        for log in expected_logs:
            self.assertIn(log, self.stderr.getvalue())

    def test_main_init_run_net_calls_set_hostname_when_metadata_present(self):
        """When local-hostname metadata is present, call cc_set_hostname."""
        self.cfg["datasource"] = {
            "None": {"metadata": {"local-hostname": "md-hostname"}}
        }
        cloud_cfg = safeyaml.dumps(self.cfg)
        write_file(self.cloud_cfg_file, cloud_cfg)
        cmdargs = myargs(
            debug=False,
            files=None,
            force=False,
            local=False,
            reporter=None,
            subcommand="init",
        )

        def set_hostname(name, cfg, cloud, log, args):
            # Asserts on the merged config cc_set_hostname receives: the
            # on-disk cfg plus main's built-in defaults, minus system_info.
            self.assertEqual("set-hostname", name)
            updated_cfg = copy.deepcopy(self.cfg)
            updated_cfg.update(
                {
                    "def_log_file": "/var/log/cloud-init.log",
                    "log_cfgs": [],
                    "syslog_fix_perms": [
                        "syslog:adm",
                        "root:adm",
                        "root:wheel",
                        "root:root",
                    ],
                    "vendor_data": {"enabled": True, "prefix": []},
                    "vendor_data2": {"enabled": True, "prefix": []},
                }
            )
            updated_cfg.pop("system_info")

            self.assertEqual(updated_cfg, cfg)
            self.assertEqual(main.LOG, log)
            self.assertIsNone(args)

        (_item1, item2) = wrap_and_call(
            "cloudinit.cmd.main",
            {
                "util.close_stdin": True,
                "netinfo.debug_info": "my net debug info",
                "cc_set_hostname.handle": {"side_effect": set_hostname},
                "util.fixup_output": ("outfmt", "errfmt"),
            },
            main.main_init,
            "init",
            cmdargs,
        )
        self.assertEqual([], item2)
        # Instancify is called
        instance_id_path = "var/lib/cloud/data/instance-id"
        # Compare load_file directly (dropped redundant os.path.join wrapper).
        self.assertEqual(
            "iid-datasource-none\n",
            load_file(os.path.join(self.new_root, instance_id_path)),
        )
        # modules are run (including write_files)
        self.assertEqual(
            "blah", load_file(os.path.join(self.new_root, "etc/blah.ini"))
        )
        expected_logs = [
            "network config is disabled by fallback",  # apply_network_config
            "my net debug info",  # netinfo.debug_info
            "no previous run detected",
        ]
        for log in expected_logs:
            self.assertIn(log, self.stderr.getvalue())
+
+
class TestShouldBringUpInterfaces:
    """main._should_bring_up_interfaces truth table.

    Interfaces come up only when network activation is not disabled in
    config AND we are not in the local (pre-network) boot stage.
    """

    @pytest.mark.parametrize(
        "cfg_disable,args_local,expected",
        [
            (True, True, False),
            (True, False, False),
            (False, True, False),
            (False, False, True),
        ],
    )
    def test_should_bring_up_interfaces(
        self, cfg_disable, args_local, expected
    ):
        fake_init = mock.Mock(
            cfg={"disable_network_activation": cfg_disable}
        )
        fake_args = mock.Mock(local=args_local)
        assert expected == main._should_bring_up_interfaces(
            fake_init, fake_args
        )
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/cmd/test_query.py b/tests/unittests/cmd/test_query.py
new file mode 100644
index 00000000..03a73bb5
--- /dev/null
+++ b/tests/unittests/cmd/test_query.py
@@ -0,0 +1,537 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import errno
+import gzip
+import json
+import os
+from collections import namedtuple
+from io import BytesIO
+from textwrap import dedent
+
+import pytest
+
+from cloudinit.cmd import query
+from cloudinit.helpers import Paths
+from cloudinit.sources import (
+ INSTANCE_JSON_FILE,
+ INSTANCE_JSON_SENSITIVE_FILE,
+ REDACT_SENSITIVE_VALUE,
+)
+from cloudinit.util import b64e, write_file
+from tests.unittests.helpers import mock
+
+
def _gzip_data(data):
    """Return *data* (bytes) compressed into a gzip byte stream."""
    buffer = BytesIO()
    # GzipFile must be closed before reading so the gzip trailer is written.
    with gzip.GzipFile(mode="wb", fileobj=buffer) as compressor:
        compressor.write(data)
    return buffer.getvalue()
+
+
+@mock.patch("cloudinit.cmd.query.addLogHandlerCLI", lambda *args: "")
+class TestQuery:
+
+ args = namedtuple(
+ "queryargs",
+ "debug dump_all format instance_data list_keys user_data vendor_data"
+ " varname",
+ )
+
+ def _setup_paths(self, tmpdir, ud_val=None, vd_val=None):
+ """Write userdata and vendordata into a tmpdir.
+
+ Return:
+ 4-tuple : (paths, run_dir_path, userdata_path, vendordata_path)
+ """
+ if ud_val:
+ user_data = tmpdir.join("user-data")
+ write_file(user_data.strpath, ud_val)
+ else:
+ user_data = None
+ if vd_val:
+ vendor_data = tmpdir.join("vendor-data")
+ write_file(vendor_data.strpath, vd_val)
+ else:
+ vendor_data = None
+ run_dir = tmpdir.join("run_dir")
+ run_dir.ensure_dir()
+
+ cloud_dir = tmpdir.join("cloud_dir")
+ cloud_dir.ensure_dir()
+
+ return (
+ Paths(
+ {"cloud_dir": cloud_dir.strpath, "run_dir": run_dir.strpath}
+ ),
+ run_dir,
+ user_data,
+ vendor_data,
+ )
+
+ def test_handle_args_error_on_missing_param(self, caplog, capsys):
+ """Error when missing required parameters and print usage."""
+ args = self.args(
+ debug=False,
+ dump_all=False,
+ format=None,
+ instance_data=None,
+ list_keys=False,
+ user_data=None,
+ vendor_data=None,
+ varname=None,
+ )
+ with mock.patch(
+ "cloudinit.cmd.query.addLogHandlerCLI", return_value=""
+ ) as m_cli_log:
+ assert 1 == query.handle_args("anyname", args)
+ expected_error = (
+ "Expected one of the options: --all, --format, --list-keys"
+ " or varname\n"
+ )
+ assert expected_error in caplog.text
+ out, _err = capsys.readouterr()
+ assert "usage: query" in out
+ assert 1 == m_cli_log.call_count
+
+ @pytest.mark.parametrize(
+ "inst_data,varname,expected_error",
+ (
+ (
+ '{"v1": {"key-2": "value-2"}}',
+ "v1.absent_leaf",
+ "instance-data 'v1' has no 'absent_leaf'\n",
+ ),
+ (
+ '{"v1": {"key-2": "value-2"}}',
+ "absent_key",
+ "Undefined instance-data key 'absent_key'\n",
+ ),
+ ),
+ )
+ def test_handle_args_error_on_invalid_vaname_paths(
+ self, inst_data, varname, expected_error, caplog, tmpdir
+ ):
+ """Error when varname is not a valid instance-data variable path."""
+ instance_data = tmpdir.join("instance-data")
+ instance_data.write(inst_data)
+ args = self.args(
+ debug=False,
+ dump_all=False,
+ format=None,
+ instance_data=instance_data.strpath,
+ list_keys=False,
+ user_data=None,
+ vendor_data=None,
+ varname=varname,
+ )
+ paths, _, _, _ = self._setup_paths(tmpdir)
+ with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
+ m_paths.return_value = paths
+ with mock.patch(
+ "cloudinit.cmd.query.addLogHandlerCLI", return_value=""
+ ):
+ with mock.patch("cloudinit.cmd.query.load_userdata") as m_lud:
+ m_lud.return_value = "ud"
+ assert 1 == query.handle_args("anyname", args)
+ assert expected_error in caplog.text
+
+ def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir):
+ """When instance_data file path does not exist, log an error."""
+ absent_fn = tmpdir.join("absent")
+ args = self.args(
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=absent_fn.strpath,
+ list_keys=False,
+ user_data="ud",
+ vendor_data="vd",
+ varname=None,
+ )
+ assert 1 == query.handle_args("anyname", args)
+
+ msg = "Missing instance-data file: %s" % absent_fn
+ assert msg in caplog.text
+
+ def test_handle_args_error_when_no_read_permission_instance_data(
+ self, caplog, tmpdir
+ ):
+ """When instance_data file is unreadable, log an error."""
+ noread_fn = tmpdir.join("unreadable")
+ noread_fn.write("thou shall not pass")
+ args = self.args(
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=noread_fn.strpath,
+ list_keys=False,
+ user_data="ud",
+ vendor_data="vd",
+ varname=None,
+ )
+ with mock.patch("cloudinit.cmd.query.util.load_file") as m_load:
+ m_load.side_effect = OSError(errno.EACCES, "Not allowed")
+ assert 1 == query.handle_args("anyname", args)
+ msg = "No read permission on '%s'. Try sudo" % noread_fn
+ assert msg in caplog.text
+
+ def test_handle_args_defaults_instance_data(self, caplog, tmpdir):
+ """When no instance_data argument, default to configured run_dir."""
+ args = self.args(
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=None,
+ list_keys=False,
+ user_data=None,
+ vendor_data=None,
+ varname=None,
+ )
+ paths, run_dir, _, _ = self._setup_paths(tmpdir)
+ with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
+ m_paths.return_value = paths
+ assert 1 == query.handle_args("anyname", args)
+ json_file = run_dir.join(INSTANCE_JSON_FILE)
+ msg = "Missing instance-data file: %s" % json_file.strpath
+ assert msg in caplog.text
+
+ def test_handle_args_root_fallsback_to_instance_data(self, caplog, tmpdir):
+ """When no instance_data argument, root falls back to redacted json."""
+ args = self.args(
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=None,
+ list_keys=False,
+ user_data=None,
+ vendor_data=None,
+ varname=None,
+ )
+ paths, run_dir, _, _ = self._setup_paths(tmpdir)
+ with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
+ m_paths.return_value = paths
+ with mock.patch("os.getuid") as m_getuid:
+ m_getuid.return_value = 0
+ assert 1 == query.handle_args("anyname", args)
+ json_file = run_dir.join(INSTANCE_JSON_FILE)
+ sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
+ msg = "Missing root-readable %s. Using redacted %s instead." % (
+ sensitive_file.strpath,
+ json_file.strpath,
+ )
+ assert msg in caplog.text
+
+ @pytest.mark.parametrize(
+ "ud_src,ud_expected,vd_src,vd_expected",
+ (
+ ("hi mom", "hi mom", "hi pops", "hi pops"),
+ ("ud".encode("utf-8"), "ud", "vd".encode("utf-8"), "vd"),
+ (_gzip_data(b"ud"), "ud", _gzip_data(b"vd"), "vd"),
+ (_gzip_data("ud".encode("utf-8")), "ud", _gzip_data(b"vd"), "vd"),
+ ),
+ )
+ def test_handle_args_root_processes_user_data(
+ self, ud_src, ud_expected, vd_src, vd_expected, capsys, tmpdir
+ ):
+ """Support reading multiple user-data file content types"""
+ paths, run_dir, user_data, vendor_data = self._setup_paths(
+ tmpdir, ud_val=ud_src, vd_val=vd_src
+ )
+ sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
+ sensitive_file.write('{"my-var": "it worked"}')
+ args = self.args(
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=None,
+ list_keys=False,
+ user_data=user_data.strpath,
+ vendor_data=vendor_data.strpath,
+ varname=None,
+ )
+ with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
+ m_paths.return_value = paths
+ with mock.patch("os.getuid") as m_getuid:
+ m_getuid.return_value = 0
+ assert 0 == query.handle_args("anyname", args)
+ out, _err = capsys.readouterr()
+ cmd_output = json.loads(out)
+ assert "it worked" == cmd_output["my-var"]
+ if ud_expected == "ci-b64:":
+ ud_expected = "ci-b64:{}".format(b64e(ud_src))
+ if vd_expected == "ci-b64:":
+ vd_expected = "ci-b64:{}".format(b64e(vd_src))
+ assert ud_expected == cmd_output["userdata"]
+ assert vd_expected == cmd_output["vendordata"]
+
+ def test_handle_args_user_vendor_data_defaults_to_instance_link(
+ self, capsys, tmpdir
+ ):
+ """When no instance_data argument, root uses sensitive json."""
+ paths, run_dir, _, _ = self._setup_paths(tmpdir)
+ sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
+ sensitive_file.write('{"my-var": "it worked"}')
+
+ ud_path = os.path.join(paths.instance_link, "user-data.txt")
+ write_file(ud_path, "instance_link_ud")
+ vd_path = os.path.join(paths.instance_link, "vendor-data.txt")
+ write_file(vd_path, "instance_link_vd")
+
+ args = self.args(
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=None,
+ list_keys=False,
+ user_data=None,
+ vendor_data=None,
+ varname=None,
+ )
+ with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
+ m_paths.return_value = paths
+ with mock.patch("os.getuid", return_value=0):
+ assert 0 == query.handle_args("anyname", args)
+ expected = (
+ '{\n "my-var": "it worked",\n '
+ '"userdata": "instance_link_ud",\n '
+ '"vendordata": "instance_link_vd"\n}\n'
+ )
+ out, _ = capsys.readouterr()
+ assert expected == out
+
+ def test_handle_args_root_uses_instance_sensitive_data(
+ self, capsys, tmpdir
+ ):
+ """When no instance_data argument, root uses sensitive json."""
+ paths, run_dir, user_data, vendor_data = self._setup_paths(
+ tmpdir, ud_val="ud", vd_val="vd"
+ )
+ sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
+ sensitive_file.write('{"my-var": "it worked"}')
+ args = self.args(
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=None,
+ list_keys=False,
+ user_data=user_data.strpath,
+ vendor_data=vendor_data.strpath,
+ varname=None,
+ )
+ with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
+ m_paths.return_value = paths
+ with mock.patch("os.getuid") as m_getuid:
+ m_getuid.return_value = 0
+ assert 0 == query.handle_args("anyname", args)
+ expected = (
+ '{\n "my-var": "it worked",\n '
+ '"userdata": "ud",\n "vendordata": "vd"\n}\n'
+ )
+ out, _err = capsys.readouterr()
+ assert expected == out
+
+ def test_handle_args_dumps_all_instance_data(self, capsys, tmpdir):
+ """When --all is specified query will dump all instance data vars."""
+ instance_data = tmpdir.join("instance-data")
+ instance_data.write('{"my-var": "it worked"}')
+ args = self.args(
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=instance_data.strpath,
+ list_keys=False,
+ user_data="ud",
+ vendor_data="vd",
+ varname=None,
+ )
+ with mock.patch("os.getuid") as m_getuid:
+ m_getuid.return_value = 100
+ assert 0 == query.handle_args("anyname", args)
+ expected = (
+ '{\n "my-var": "it worked",\n "userdata": "<%s> file:ud",\n'
+ ' "vendordata": "<%s> file:vd"\n}\n'
+ % (REDACT_SENSITIVE_VALUE, REDACT_SENSITIVE_VALUE)
+ )
+ out, _err = capsys.readouterr()
+ assert expected == out
+
+ def test_handle_args_returns_top_level_varname(self, capsys, tmpdir):
+ """When the argument varname is passed, report its value."""
+ instance_data = tmpdir.join("instance-data")
+ instance_data.write('{"my-var": "it worked"}')
+ args = self.args(
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=instance_data.strpath,
+ list_keys=False,
+ user_data="ud",
+ vendor_data="vd",
+ varname="my_var",
+ )
+ with mock.patch("os.getuid") as m_getuid:
+ m_getuid.return_value = 100
+ assert 0 == query.handle_args("anyname", args)
+ out, _err = capsys.readouterr()
+ assert "it worked\n" == out
+
+ @pytest.mark.parametrize(
+ "inst_data,varname,expected",
+ (
+ (
+ '{"v1": {"key-2": "value-2"}, "my-var": "it worked"}',
+ "v1.key_2",
+ "value-2\n",
+ ),
+ # Assert no jinja underscore-delimited aliases are reported on CLI
+ (
+ '{"v1": {"something-hyphenated": {"no.underscores":"x",'
+ ' "no-alias": "y"}}, "my-var": "it worked"}',
+ "v1.something_hyphenated",
+ '{\n "no-alias": "y",\n "no.underscores": "x"\n}\n',
+ ),
+ ),
+ )
+ def test_handle_args_returns_nested_varname(
+ self, inst_data, varname, expected, capsys, tmpdir
+ ):
+ """If user_data file is a jinja template render instance-data vars."""
+ instance_data = tmpdir.join("instance-data")
+ instance_data.write(inst_data)
+ args = self.args(
+ debug=False,
+ dump_all=False,
+ format=None,
+ instance_data=instance_data.strpath,
+ user_data="ud",
+ vendor_data="vd",
+ list_keys=False,
+ varname=varname,
+ )
+ with mock.patch("os.getuid") as m_getuid:
+ m_getuid.return_value = 100
+ assert 0 == query.handle_args("anyname", args)
+ out, _err = capsys.readouterr()
+ assert expected == out
+
+ def test_handle_args_returns_standardized_vars_to_top_level_aliases(
+ self, capsys, tmpdir
+ ):
+ """Any standardized vars under v# are promoted as top-level aliases."""
+ instance_data = tmpdir.join("instance-data")
+ instance_data.write(
+ '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},'
+ ' "top": "gun"}'
+ )
+ expected = dedent(
+ """\
+ {
+ "top": "gun",
+ "userdata": "<redacted for non-root user> file:ud",
+ "v1": {
+ "v1_1": "val1.1"
+ },
+ "v1_1": "val1.1",
+ "v2": {
+ "v2_2": "val2.2"
+ },
+ "v2_2": "val2.2",
+ "vendordata": "<redacted for non-root user> file:vd"
+ }
+ """
+ )
+ args = self.args(
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=instance_data.strpath,
+ user_data="ud",
+ vendor_data="vd",
+ list_keys=False,
+ varname=None,
+ )
+ with mock.patch("os.getuid") as m_getuid:
+ m_getuid.return_value = 100
+ assert 0 == query.handle_args("anyname", args)
+ out, _err = capsys.readouterr()
+ assert expected == out
+
+ def test_handle_args_list_keys_sorts_top_level_keys_when_no_varname(
+ self, capsys, tmpdir
+ ):
+ """Sort all top-level keys when only --list-keys provided."""
+ instance_data = tmpdir.join("instance-data")
+ instance_data.write(
+ '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},'
+ ' "top": "gun"}'
+ )
+ expected = "top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n"
+ args = self.args(
+ debug=False,
+ dump_all=False,
+ format=None,
+ instance_data=instance_data.strpath,
+ list_keys=True,
+ user_data="ud",
+ vendor_data="vd",
+ varname=None,
+ )
+ with mock.patch("os.getuid") as m_getuid:
+ m_getuid.return_value = 100
+ assert 0 == query.handle_args("anyname", args)
+ out, _err = capsys.readouterr()
+ assert expected == out
+
+ def test_handle_args_list_keys_sorts_nested_keys_when_varname(
+ self, capsys, tmpdir
+ ):
+ """Sort all nested keys of varname object when --list-keys provided."""
+ instance_data = tmpdir.join("instance-data")
+ instance_data.write(
+ '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2":'
+ + ' {"v2_2": "val2.2"}, "top": "gun"}'
+ )
+ expected = "v1_1\nv1_2\n"
+ args = self.args(
+ debug=False,
+ dump_all=False,
+ format=None,
+ instance_data=instance_data.strpath,
+ list_keys=True,
+ user_data="ud",
+ vendor_data="vd",
+ varname="v1",
+ )
+ with mock.patch("os.getuid") as m_getuid:
+ m_getuid.return_value = 100
+ assert 0 == query.handle_args("anyname", args)
+ out, _err = capsys.readouterr()
+ assert expected == out
+
+ def test_handle_args_list_keys_errors_when_varname_is_not_a_dict(
+ self, caplog, tmpdir
+ ):
+ """Raise an error when --list-keys and varname specify a non-list."""
+ instance_data = tmpdir.join("instance-data")
+ instance_data.write(
+ '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2": '
+ + '{"v2_2": "val2.2"}, "top": "gun"}'
+ )
+ expected_error = "--list-keys provided but 'top' is not a dict"
+ args = self.args(
+ debug=False,
+ dump_all=False,
+ format=None,
+ instance_data=instance_data.strpath,
+ list_keys=True,
+ user_data="ud",
+ vendor_data="vd",
+ varname="top",
+ )
+ with mock.patch("os.getuid") as m_getuid:
+ m_getuid.return_value = 100
+ assert 1 == query.handle_args("anyname", args)
+ assert expected_error in caplog.text
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/cmd/test_status.py b/tests/unittests/cmd/test_status.py
new file mode 100644
index 00000000..c5f424da
--- /dev/null
+++ b/tests/unittests/cmd/test_status.py
@@ -0,0 +1,548 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import os
+from collections import namedtuple
+from io import StringIO
+from textwrap import dedent
+
+from cloudinit.atomic_helper import write_json
+from cloudinit.cmd import status
+from cloudinit.util import ensure_file
+from tests.unittests.helpers import CiTestCase, mock, wrap_and_call
+
# Minimal stand-ins for cloudinit's Paths object and the `cloud-init status`
# argparse Namespace; only the attributes status.py reads are present.
mypaths = namedtuple("MyPaths", "run_dir")
myargs = namedtuple("MyArgs", "long wait")
+
+
+class TestStatus(CiTestCase):
    def setUp(self):
        """Create per-test run_dir files and a stub Init class."""
        super(TestStatus, self).setUp()
        self.new_root = self.tmp_dir()
        # status.json and the disable file live in the fake run_dir.
        self.status_file = self.tmp_path("status.json", self.new_root)
        self.disable_file = self.tmp_path("cloudinit-disable", self.new_root)
        self.paths = mypaths(run_dir=self.new_root)

        class FakeInit(object):
            # Substitute for stages.Init: exposes .paths (closed over from
            # this setUp) and no-op read_cfg so handle_status_args never
            # touches real system configuration.
            paths = self.paths

            def __init__(self, ds_deps):
                pass

            def read_cfg(self):
                pass

        self.init_class = FakeInit
+
+ def test__is_cloudinit_disabled_false_on_sysvinit(self):
+ """When not in an environment using systemd, return False."""
+ ensure_file(self.disable_file) # Create the ignored disable file
+ (is_disabled, reason) = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "uses_systemd": False,
+ "get_cmdline": "root=/dev/my-root not-important",
+ },
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertFalse(
+ is_disabled, "expected enabled cloud-init on sysvinit"
+ )
+ self.assertEqual("Cloud-init enabled on sysvinit", reason)
+
+ def test__is_cloudinit_disabled_true_on_disable_file(self):
+ """When using systemd and disable_file is present return disabled."""
+ ensure_file(self.disable_file) # Create observed disable file
+ (is_disabled, reason) = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "uses_systemd": True,
+ "get_cmdline": "root=/dev/my-root not-important",
+ },
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertTrue(is_disabled, "expected disabled cloud-init")
+ self.assertEqual(
+ "Cloud-init disabled by {0}".format(self.disable_file), reason
+ )
+
+ def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self):
+ """Not disabled when using systemd and enabled via commandline."""
+ ensure_file(self.disable_file) # Create ignored disable file
+ (is_disabled, reason) = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "uses_systemd": True,
+ "get_cmdline": "something cloud-init=enabled else",
+ },
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertFalse(is_disabled, "expected enabled cloud-init")
+ self.assertEqual(
+ "Cloud-init enabled by kernel command line cloud-init=enabled",
+ reason,
+ )
+
+ def test__is_cloudinit_disabled_true_on_kernel_cmdline(self):
+ """When kernel command line disables cloud-init return True."""
+ (is_disabled, reason) = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "uses_systemd": True,
+ "get_cmdline": "something cloud-init=disabled else",
+ },
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertTrue(is_disabled, "expected disabled cloud-init")
+ self.assertEqual(
+ "Cloud-init disabled by kernel parameter cloud-init=disabled",
+ reason,
+ )
+
+ def test__is_cloudinit_disabled_true_when_generator_disables(self):
+ """When cloud-init-generator writes disabled file return True."""
+ disabled_file = os.path.join(self.paths.run_dir, "disabled")
+ ensure_file(disabled_file)
+ (is_disabled, reason) = wrap_and_call(
+ "cloudinit.cmd.status",
+ {"uses_systemd": True, "get_cmdline": "something"},
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertTrue(is_disabled, "expected disabled cloud-init")
+ self.assertEqual("Cloud-init disabled by cloud-init-generator", reason)
+
+ def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self):
+ """Report enabled when systemd generator creates the enabled file."""
+ enabled_file = os.path.join(self.paths.run_dir, "enabled")
+ ensure_file(enabled_file)
+ (is_disabled, reason) = wrap_and_call(
+ "cloudinit.cmd.status",
+ {"uses_systemd": True, "get_cmdline": "something ignored"},
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertFalse(is_disabled, "expected enabled cloud-init")
+ self.assertEqual(
+ "Cloud-init enabled by systemd cloud-init-generator", reason
+ )
+
    def test_status_returns_not_run(self):
        """When status.json does not exist yet, return 'not run'."""
        self.assertFalse(
            os.path.exists(self.status_file), "Unexpected status.json found"
        )
        cmdargs = myargs(long=False, wait=False)
        with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
            retcode = wrap_and_call(
                "cloudinit.cmd.status",
                {
                    "_is_cloudinit_disabled": (False, ""),
                    "Init": {"side_effect": self.init_class},
                },
                status.handle_status_args,
                "ignored",
                cmdargs,
            )
        self.assertEqual(0, retcode)
        self.assertEqual("status: not run\n", m_stdout.getvalue())
+
    def test_status_returns_disabled_long_on_presence_of_disable_file(self):
        """When cloudinit is disabled, return disabled reason."""

        checked_files = []

        def fakeexists(filepath):
            # Record every path the status command probes; report only
            # status.json as absent.
            checked_files.append(filepath)
            status_file = os.path.join(self.paths.run_dir, "status.json")
            return bool(not filepath == status_file)

        cmdargs = myargs(long=True, wait=False)
        with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
            retcode = wrap_and_call(
                "cloudinit.cmd.status",
                {
                    "os.path.exists": {"side_effect": fakeexists},
                    "_is_cloudinit_disabled": (
                        True,
                        "disabled for some reason",
                    ),
                    "Init": {"side_effect": self.init_class},
                },
                status.handle_status_args,
                "ignored",
                cmdargs,
            )
        self.assertEqual(0, retcode)
        # Only status.json should have been probed on the disabled path.
        self.assertEqual(
            [os.path.join(self.paths.run_dir, "status.json")], checked_files
        )
        expected = dedent(
            """\
            status: disabled
            detail:
            disabled for some reason
            """
        )
        self.assertEqual(expected, m_stdout.getvalue())
+
    def test_status_returns_running_on_no_results_json(self):
        """Report running when status.json exists but result.json does not."""
        result_file = self.tmp_path("result.json", self.new_root)
        write_json(self.status_file, {})
        self.assertFalse(
            os.path.exists(result_file), "Unexpected result.json found"
        )
        cmdargs = myargs(long=False, wait=False)
        with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
            retcode = wrap_and_call(
                "cloudinit.cmd.status",
                {
                    "_is_cloudinit_disabled": (False, ""),
                    "Init": {"side_effect": self.init_class},
                },
                status.handle_status_args,
                "ignored",
                cmdargs,
            )
        self.assertEqual(0, retcode)
        self.assertEqual("status: running\n", m_stdout.getvalue())
+
    def test_status_returns_running(self):
        """Report running when status exists with an unfinished stage."""
        ensure_file(self.tmp_path("result.json", self.new_root))
        # "finished": None marks the init stage as still in progress.
        write_json(
            self.status_file, {"v1": {"init": {"start": 1, "finished": None}}}
        )
        cmdargs = myargs(long=False, wait=False)
        with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
            retcode = wrap_and_call(
                "cloudinit.cmd.status",
                {
                    "_is_cloudinit_disabled": (False, ""),
                    "Init": {"side_effect": self.init_class},
                },
                status.handle_status_args,
                "ignored",
                cmdargs,
            )
        self.assertEqual(0, retcode)
        self.assertEqual("status: running\n", m_stdout.getvalue())
+
    def test_status_returns_done(self):
        """Report done when result.json exists and no stage is unfinished."""
        ensure_file(self.tmp_path("result.json", self.new_root))
        write_json(
            self.status_file,
            {
                "v1": {
                    "stage": None,  # No current stage running
                    "datasource": (
                        "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]"
                        "[dsmode=net]"
                    ),
                    "blah": {"finished": 123.456},
                    "init": {
                        "errors": [],
                        "start": 124.567,
                        "finished": 125.678,
                    },
                    "init-local": {"start": 123.45, "finished": 123.46},
                }
            },
        )
        cmdargs = myargs(long=False, wait=False)
        with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
            retcode = wrap_and_call(
                "cloudinit.cmd.status",
                {
                    "_is_cloudinit_disabled": (False, ""),
                    "Init": {"side_effect": self.init_class},
                },
                status.handle_status_args,
                "ignored",
                cmdargs,
            )
        self.assertEqual(0, retcode)
        self.assertEqual("status: done\n", m_stdout.getvalue())
+
    def test_status_returns_done_long(self):
        """Long format of done status includes datasource info."""
        ensure_file(self.tmp_path("result.json", self.new_root))
        write_json(
            self.status_file,
            {
                "v1": {
                    "stage": None,
                    "datasource": (
                        "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]"
                        "[dsmode=net]"
                    ),
                    "init": {"start": 124.567, "finished": 125.678},
                    "init-local": {"start": 123.45, "finished": 123.46},
                }
            },
        )
        cmdargs = myargs(long=True, wait=False)
        with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
            retcode = wrap_and_call(
                "cloudinit.cmd.status",
                {
                    "_is_cloudinit_disabled": (False, ""),
                    "Init": {"side_effect": self.init_class},
                },
                status.handle_status_args,
                "ignored",
                cmdargs,
            )
        self.assertEqual(0, retcode)
        # time is the epoch-formatted max "finished" value (125.678 -> 2:05).
        expected = dedent(
            """\
            status: done
            time: Thu, 01 Jan 1970 00:02:05 +0000
            detail:
            DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net]
            """
        )
        self.assertEqual(expected, m_stdout.getvalue())
+
    def test_status_on_errors(self):
        """Reports error when any stage has errors."""
        write_json(
            self.status_file,
            {
                "v1": {
                    "stage": None,
                    "blah": {"errors": [], "finished": 123.456},
                    "init": {
                        "errors": ["error1"],
                        "start": 124.567,
                        "finished": 125.678,
                    },
                    "init-local": {"start": 123.45, "finished": 123.46},
                }
            },
        )
        cmdargs = myargs(long=False, wait=False)
        with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
            retcode = wrap_and_call(
                "cloudinit.cmd.status",
                {
                    "_is_cloudinit_disabled": (False, ""),
                    "Init": {"side_effect": self.init_class},
                },
                status.handle_status_args,
                "ignored",
                cmdargs,
            )
        # Non-zero exit signals error state to callers/scripts.
        self.assertEqual(1, retcode)
        self.assertEqual("status: error\n", m_stdout.getvalue())
+
+ def test_status_on_errors_long(self):
+ """Long format of error status includes all error messages."""
+ write_json(
+ self.status_file,
+ {
+ "v1": {
+ "stage": None,
+ "datasource": (
+ "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]"
+ "[dsmode=net]"
+ ),
+ "init": {
+ "errors": ["error1"],
+ "start": 124.567,
+ "finished": 125.678,
+ },
+ "init-local": {
+ "errors": ["error2", "error3"],
+ "start": 123.45,
+ "finished": 123.46,
+ },
+ }
+ },
+ )
+ cmdargs = myargs(long=True, wait=False)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(1, retcode)
+ expected = dedent(
+ """\
+ status: error
+ time: Thu, 01 Jan 1970 00:02:05 +0000
+ detail:
+ error1
+ error2
+ error3
+ """
+ )
+ self.assertEqual(expected, m_stdout.getvalue())
+
+ def test_status_returns_running_long_format(self):
+ """Long format reports the stage in which we are running."""
+ write_json(
+ self.status_file,
+ {
+ "v1": {
+ "stage": "init",
+ "init": {"start": 124.456, "finished": None},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ },
+ )
+ cmdargs = myargs(long=True, wait=False)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(0, retcode)
+ expected = dedent(
+ """\
+ status: running
+ time: Thu, 01 Jan 1970 00:02:04 +0000
+ detail:
+ Running in stage: init
+ """
+ )
+ self.assertEqual(expected, m_stdout.getvalue())
+
+ def test_status_wait_blocks_until_done(self):
+ """Specifying wait will poll every 1/4 second until done state."""
+ running_json = {
+ "v1": {
+ "stage": "init",
+ "init": {"start": 124.456, "finished": None},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ }
+ done_json = {
+ "v1": {
+ "stage": None,
+ "init": {"start": 124.456, "finished": 125.678},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ }
+
+ self.sleep_calls = 0
+
+ def fake_sleep(interval):
+ self.assertEqual(0.25, interval)
+ self.sleep_calls += 1
+ if self.sleep_calls == 2:
+ write_json(self.status_file, running_json)
+ elif self.sleep_calls == 3:
+ write_json(self.status_file, done_json)
+ result_file = self.tmp_path("result.json", self.new_root)
+ ensure_file(result_file)
+
+ cmdargs = myargs(long=False, wait=True)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "sleep": {"side_effect": fake_sleep},
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(0, retcode)
+ self.assertEqual(4, self.sleep_calls)
+ self.assertEqual("....\nstatus: done\n", m_stdout.getvalue())
+
+ def test_status_wait_blocks_until_error(self):
+ """Specifying wait will poll every 1/4 second until error state."""
+ running_json = {
+ "v1": {
+ "stage": "init",
+ "init": {"start": 124.456, "finished": None},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ }
+ error_json = {
+ "v1": {
+ "stage": None,
+ "init": {
+ "errors": ["error1"],
+ "start": 124.456,
+ "finished": 125.678,
+ },
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ }
+
+ self.sleep_calls = 0
+
+ def fake_sleep(interval):
+ self.assertEqual(0.25, interval)
+ self.sleep_calls += 1
+ if self.sleep_calls == 2:
+ write_json(self.status_file, running_json)
+ elif self.sleep_calls == 3:
+ write_json(self.status_file, error_json)
+
+ cmdargs = myargs(long=False, wait=True)
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ retcode = wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "sleep": {"side_effect": fake_sleep},
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
+ self.assertEqual(1, retcode)
+ self.assertEqual(4, self.sleep_calls)
+ self.assertEqual("....\nstatus: error\n", m_stdout.getvalue())
+
+ def test_status_main(self):
+ """status.main can be run as a standalone script."""
+ write_json(
+ self.status_file, {"v1": {"init": {"start": 1, "finished": None}}}
+ )
+ with self.assertRaises(SystemExit) as context_manager:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ wrap_and_call(
+ "cloudinit.cmd.status",
+ {
+ "sys.argv": {"new": ["status"]},
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.main,
+ )
+ self.assertEqual(0, context_manager.exception.code)
+ self.assertEqual("status: running\n", m_stdout.getvalue())
+
+
+# vi: ts=4 expandtab syntax=python