Diffstat (limited to 'tests/unittests/cmd')
-rw-r--r--  tests/unittests/cmd/devel/test_hotplug_hook.py  | 162
-rw-r--r--  tests/unittests/cmd/devel/test_logs.py          | 232
-rw-r--r--  tests/unittests/cmd/devel/test_render.py        | 152
-rw-r--r--  tests/unittests/cmd/test_clean.py               | 179
-rw-r--r--  tests/unittests/cmd/test_cloud_id.py            |  99
-rw-r--r--  tests/unittests/cmd/test_main.py                | 223
-rw-r--r--  tests/unittests/cmd/test_query.py               | 403
-rw-r--r--  tests/unittests/cmd/test_status.py              | 561
8 files changed, 1221 insertions(+), 790 deletions(-)
diff --git a/tests/unittests/cmd/devel/test_hotplug_hook.py b/tests/unittests/cmd/devel/test_hotplug_hook.py
index e1c64e2f..842e8dfd 100644
--- a/tests/unittests/cmd/devel/test_hotplug_hook.py
+++ b/tests/unittests/cmd/devel/test_hotplug_hook.py
@@ -1,8 +1,9 @@
-import pytest
from collections import namedtuple
from unittest import mock
from unittest.mock import call
+import pytest
+
from cloudinit.cmd.devel.hotplug_hook import handle_hotplug
from cloudinit.distros import Distro
from cloudinit.event import EventType
@@ -11,9 +12,8 @@ from cloudinit.net.network_state import NetworkState
from cloudinit.sources import DataSource
from cloudinit.stages import Init
-
-hotplug_args = namedtuple('hotplug_args', 'udevaction, subsystem, devpath')
-FAKE_MAC = '11:22:33:44:55:66'
+hotplug_args = namedtuple("hotplug_args", "udevaction, subsystem, devpath")
+FAKE_MAC = "11:22:33:44:55:66"
@pytest.yield_fixture
@@ -26,28 +26,28 @@ def mocks():
m_init.fetch.return_value = m_datasource
read_sys_net = mock.patch(
- 'cloudinit.cmd.devel.hotplug_hook.read_sys_net_safe',
- return_value=FAKE_MAC
+ "cloudinit.cmd.devel.hotplug_hook.read_sys_net_safe",
+ return_value=FAKE_MAC,
)
update_event_enabled = mock.patch(
- 'cloudinit.stages.update_event_enabled',
+ "cloudinit.stages.update_event_enabled",
return_value=True,
)
m_network_state = mock.MagicMock(spec=NetworkState)
parse_net = mock.patch(
- 'cloudinit.cmd.devel.hotplug_hook.parse_net_config_data',
- return_value=m_network_state
+ "cloudinit.cmd.devel.hotplug_hook.parse_net_config_data",
+ return_value=m_network_state,
)
m_activator = mock.MagicMock(spec=NetworkActivator)
select_activator = mock.patch(
- 'cloudinit.cmd.devel.hotplug_hook.activators.select_activator',
- return_value=m_activator
+ "cloudinit.cmd.devel.hotplug_hook.activators.select_activator",
+ return_value=m_activator,
)
- sleep = mock.patch('time.sleep')
+ sleep = mock.patch("time.sleep")
read_sys_net.start()
update_event_enabled.start()
@@ -55,7 +55,7 @@ def mocks():
select_activator.start()
m_sleep = sleep.start()
- yield namedtuple('mocks', 'm_init m_network_state m_activator m_sleep')(
+ yield namedtuple("mocks", "m_init m_network_state m_activator m_sleep")(
m_init=m_init,
m_network_state=m_network_state,
m_activator=m_activator,
@@ -72,42 +72,43 @@ def mocks():
class TestUnsupportedActions:
def test_unsupported_subsystem(self, mocks):
with pytest.raises(
- Exception,
- match='cannot handle events for subsystem: not_real'
+ Exception, match="cannot handle events for subsystem: not_real"
):
handle_hotplug(
hotplug_init=mocks.m_init,
- devpath='/dev/fake',
- subsystem='not_real',
- udevaction='add'
+ devpath="/dev/fake",
+ subsystem="not_real",
+ udevaction="add",
)
def test_unsupported_udevaction(self, mocks):
- with pytest.raises(ValueError, match='Unknown action: not_real'):
+ with pytest.raises(ValueError, match="Unknown action: not_real"):
handle_hotplug(
hotplug_init=mocks.m_init,
- devpath='/dev/fake',
- udevaction='not_real',
- subsystem='net'
+ devpath="/dev/fake",
+ udevaction="not_real",
+ subsystem="net",
)
class TestHotplug:
def test_succcessful_add(self, mocks):
init = mocks.m_init
- mocks.m_network_state.iter_interfaces.return_value = [{
- 'mac_address': FAKE_MAC,
- }]
+ mocks.m_network_state.iter_interfaces.return_value = [
+ {
+ "mac_address": FAKE_MAC,
+ }
+ ]
handle_hotplug(
hotplug_init=init,
- devpath='/dev/fake',
- udevaction='add',
- subsystem='net'
+ devpath="/dev/fake",
+ udevaction="add",
+ subsystem="net",
+ )
+ init.datasource.update_metadata_if_supported.assert_called_once_with(
+ [EventType.HOTPLUG]
)
- init.datasource.update_metadata_if_supported.assert_called_once_with([
- EventType.HOTPLUG
- ])
- mocks.m_activator.bring_up_interface.assert_called_once_with('fake')
+ mocks.m_activator.bring_up_interface.assert_called_once_with("fake")
mocks.m_activator.bring_down_interface.assert_not_called()
init._write_to_cache.assert_called_once_with()
@@ -116,113 +117,120 @@ class TestHotplug:
mocks.m_network_state.iter_interfaces.return_value = [{}]
handle_hotplug(
hotplug_init=init,
- devpath='/dev/fake',
- udevaction='remove',
- subsystem='net'
+ devpath="/dev/fake",
+ udevaction="remove",
+ subsystem="net",
)
- init.datasource.update_metadata_if_supported.assert_called_once_with([
- EventType.HOTPLUG
- ])
- mocks.m_activator.bring_down_interface.assert_called_once_with('fake')
+ init.datasource.update_metadata_if_supported.assert_called_once_with(
+ [EventType.HOTPLUG]
+ )
+ mocks.m_activator.bring_down_interface.assert_called_once_with("fake")
mocks.m_activator.bring_up_interface.assert_not_called()
init._write_to_cache.assert_called_once_with()
def test_update_event_disabled(self, mocks, caplog):
init = mocks.m_init
with mock.patch(
- 'cloudinit.stages.update_event_enabled',
- return_value=False
+ "cloudinit.stages.update_event_enabled", return_value=False
):
handle_hotplug(
hotplug_init=init,
- devpath='/dev/fake',
- udevaction='remove',
- subsystem='net'
+ devpath="/dev/fake",
+ udevaction="remove",
+ subsystem="net",
)
- assert 'hotplug not enabled for event of type' in caplog.text
+ assert "hotplug not enabled for event of type" in caplog.text
init.datasource.update_metadata_if_supported.assert_not_called()
mocks.m_activator.bring_up_interface.assert_not_called()
mocks.m_activator.bring_down_interface.assert_not_called()
init._write_to_cache.assert_not_called()
def test_update_metadata_failed(self, mocks):
- mocks.m_init.datasource.update_metadata_if_supported.return_value = \
+ mocks.m_init.datasource.update_metadata_if_supported.return_value = (
False
+ )
with pytest.raises(
- RuntimeError, match='Datasource .* not updated for event hotplug'
+ RuntimeError, match="Datasource .* not updated for event hotplug"
):
handle_hotplug(
hotplug_init=mocks.m_init,
- devpath='/dev/fake',
- udevaction='remove',
- subsystem='net'
+ devpath="/dev/fake",
+ udevaction="remove",
+ subsystem="net",
)
def test_detect_hotplugged_device_not_detected_on_add(self, mocks):
mocks.m_network_state.iter_interfaces.return_value = [{}]
with pytest.raises(
RuntimeError,
- match='Failed to detect {} in updated metadata'.format(FAKE_MAC)
+ match="Failed to detect {} in updated metadata".format(FAKE_MAC),
):
handle_hotplug(
hotplug_init=mocks.m_init,
- devpath='/dev/fake',
- udevaction='add',
- subsystem='net'
+ devpath="/dev/fake",
+ udevaction="add",
+ subsystem="net",
)
def test_detect_hotplugged_device_detected_on_remove(self, mocks):
- mocks.m_network_state.iter_interfaces.return_value = [{
- 'mac_address': FAKE_MAC,
- }]
+ mocks.m_network_state.iter_interfaces.return_value = [
+ {
+ "mac_address": FAKE_MAC,
+ }
+ ]
with pytest.raises(
- RuntimeError,
- match='Failed to detect .* in updated metadata'
+ RuntimeError, match="Failed to detect .* in updated metadata"
):
handle_hotplug(
hotplug_init=mocks.m_init,
- devpath='/dev/fake',
- udevaction='remove',
- subsystem='net'
+ devpath="/dev/fake",
+ udevaction="remove",
+ subsystem="net",
)
def test_apply_failed_on_add(self, mocks):
- mocks.m_network_state.iter_interfaces.return_value = [{
- 'mac_address': FAKE_MAC,
- }]
+ mocks.m_network_state.iter_interfaces.return_value = [
+ {
+ "mac_address": FAKE_MAC,
+ }
+ ]
mocks.m_activator.bring_up_interface.return_value = False
with pytest.raises(
- RuntimeError, match='Failed to bring up device: /dev/fake'
+ RuntimeError, match="Failed to bring up device: /dev/fake"
):
handle_hotplug(
hotplug_init=mocks.m_init,
- devpath='/dev/fake',
- udevaction='add',
- subsystem='net'
+ devpath="/dev/fake",
+ udevaction="add",
+ subsystem="net",
)
def test_apply_failed_on_remove(self, mocks):
mocks.m_network_state.iter_interfaces.return_value = [{}]
mocks.m_activator.bring_down_interface.return_value = False
with pytest.raises(
- RuntimeError, match='Failed to bring down device: /dev/fake'
+ RuntimeError, match="Failed to bring down device: /dev/fake"
):
handle_hotplug(
hotplug_init=mocks.m_init,
- devpath='/dev/fake',
- udevaction='remove',
- subsystem='net'
+ devpath="/dev/fake",
+ udevaction="remove",
+ subsystem="net",
)
def test_retry(self, mocks):
with pytest.raises(RuntimeError):
handle_hotplug(
hotplug_init=mocks.m_init,
- devpath='/dev/fake',
- udevaction='add',
- subsystem='net'
+ devpath="/dev/fake",
+ udevaction="add",
+ subsystem="net",
)
assert mocks.m_sleep.call_count == 5
assert mocks.m_sleep.call_args_list == [
- call(1), call(3), call(5), call(10), call(30)
+ call(1),
+ call(3),
+ call(5),
+ call(10),
+ call(30),
]
diff --git a/tests/unittests/cmd/devel/test_logs.py b/tests/unittests/cmd/devel/test_logs.py
index 18bdcdda..73ed3c65 100644
--- a/tests/unittests/cmd/devel/test_logs.py
+++ b/tests/unittests/cmd/devel/test_logs.py
@@ -1,167 +1,213 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from datetime import datetime
import os
+from datetime import datetime
from io import StringIO
from cloudinit.cmd.devel import logs
from cloudinit.sources import INSTANCE_JSON_SENSITIVE_FILE
-from tests.unittests.helpers import (
- FilesystemMockingTestCase, mock, wrap_and_call)
from cloudinit.subp import subp
from cloudinit.util import ensure_dir, load_file, write_file
+from tests.unittests.helpers import (
+ FilesystemMockingTestCase,
+ mock,
+ wrap_and_call,
+)
-@mock.patch('cloudinit.cmd.devel.logs.os.getuid')
+@mock.patch("cloudinit.cmd.devel.logs.os.getuid")
class TestCollectLogs(FilesystemMockingTestCase):
-
def setUp(self):
super(TestCollectLogs, self).setUp()
self.new_root = self.tmp_dir()
- self.run_dir = self.tmp_path('run', self.new_root)
+ self.run_dir = self.tmp_path("run", self.new_root)
def test_collect_logs_with_userdata_requires_root_user(self, m_getuid):
"""collect-logs errors when non-root user collects userdata ."""
m_getuid.return_value = 100 # non-root
- output_tarfile = self.tmp_path('logs.tgz')
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
+ output_tarfile = self.tmp_path("logs.tgz")
+ with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr:
self.assertEqual(
- 1, logs.collect_logs(output_tarfile, include_userdata=True))
+ 1, logs.collect_logs(output_tarfile, include_userdata=True)
+ )
self.assertEqual(
- 'To include userdata, root user is required.'
- ' Try sudo cloud-init collect-logs\n',
- m_stderr.getvalue())
+ "To include userdata, root user is required."
+ " Try sudo cloud-init collect-logs\n",
+ m_stderr.getvalue(),
+ )
def test_collect_logs_creates_tarfile(self, m_getuid):
"""collect-logs creates a tarfile with all related cloud-init info."""
m_getuid.return_value = 100
- log1 = self.tmp_path('cloud-init.log', self.new_root)
- write_file(log1, 'cloud-init-log')
- log2 = self.tmp_path('cloud-init-output.log', self.new_root)
- write_file(log2, 'cloud-init-output-log')
+ log1 = self.tmp_path("cloud-init.log", self.new_root)
+ write_file(log1, "cloud-init-log")
+ log2 = self.tmp_path("cloud-init-output.log", self.new_root)
+ write_file(log2, "cloud-init-output-log")
ensure_dir(self.run_dir)
- write_file(self.tmp_path('results.json', self.run_dir), 'results')
- write_file(self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir),
- 'sensitive')
- output_tarfile = self.tmp_path('logs.tgz')
+ write_file(self.tmp_path("results.json", self.run_dir), "results")
+ write_file(
+ self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir),
+ "sensitive",
+ )
+ output_tarfile = self.tmp_path("logs.tgz")
- date = datetime.utcnow().date().strftime('%Y-%m-%d')
- date_logdir = 'cloud-init-logs-{0}'.format(date)
+ date = datetime.utcnow().date().strftime("%Y-%m-%d")
+ date_logdir = "cloud-init-logs-{0}".format(date)
- version_out = '/usr/bin/cloud-init 18.2fake\n'
+ version_out = "/usr/bin/cloud-init 18.2fake\n"
expected_subp = {
- ('dpkg-query', '--show', "-f=${Version}\n", 'cloud-init'):
- '0.7fake\n',
- ('cloud-init', '--version'): version_out,
- ('dmesg',): 'dmesg-out\n',
- ('journalctl', '--boot=0', '-o', 'short-precise'): 'journal-out\n',
- ('tar', 'czvf', output_tarfile, date_logdir): ''
+ (
+ "dpkg-query",
+ "--show",
+ "-f=${Version}\n",
+ "cloud-init",
+ ): "0.7fake\n",
+ ("cloud-init", "--version"): version_out,
+ ("dmesg",): "dmesg-out\n",
+ ("journalctl", "--boot=0", "-o", "short-precise"): "journal-out\n",
+ ("tar", "czvf", output_tarfile, date_logdir): "",
}
def fake_subp(cmd):
cmd_tuple = tuple(cmd)
if cmd_tuple not in expected_subp:
raise AssertionError(
- 'Unexpected command provided to subp: {0}'.format(cmd))
- if cmd == ['tar', 'czvf', output_tarfile, date_logdir]:
+ "Unexpected command provided to subp: {0}".format(cmd)
+ )
+ if cmd == ["tar", "czvf", output_tarfile, date_logdir]:
subp(cmd) # Pass through tar cmd so we can check output
- return expected_subp[cmd_tuple], ''
+ return expected_subp[cmd_tuple], ""
fake_stderr = mock.MagicMock()
wrap_and_call(
- 'cloudinit.cmd.devel.logs',
- {'subp': {'side_effect': fake_subp},
- 'sys.stderr': {'new': fake_stderr},
- 'CLOUDINIT_LOGS': {'new': [log1, log2]},
- 'CLOUDINIT_RUN_DIR': {'new': self.run_dir}},
- logs.collect_logs, output_tarfile, include_userdata=False)
+ "cloudinit.cmd.devel.logs",
+ {
+ "subp": {"side_effect": fake_subp},
+ "sys.stderr": {"new": fake_stderr},
+ "CLOUDINIT_LOGS": {"new": [log1, log2]},
+ "CLOUDINIT_RUN_DIR": {"new": self.run_dir},
+ },
+ logs.collect_logs,
+ output_tarfile,
+ include_userdata=False,
+ )
# unpack the tarfile and check file contents
- subp(['tar', 'zxvf', output_tarfile, '-C', self.new_root])
+ subp(["tar", "zxvf", output_tarfile, "-C", self.new_root])
out_logdir = self.tmp_path(date_logdir, self.new_root)
self.assertFalse(
os.path.exists(
- os.path.join(out_logdir, 'run', 'cloud-init',
- INSTANCE_JSON_SENSITIVE_FILE)),
- 'Unexpected file found: %s' % INSTANCE_JSON_SENSITIVE_FILE)
+ os.path.join(
+ out_logdir,
+ "run",
+ "cloud-init",
+ INSTANCE_JSON_SENSITIVE_FILE,
+ )
+ ),
+ "Unexpected file found: %s" % INSTANCE_JSON_SENSITIVE_FILE,
+ )
+ self.assertEqual(
+ "0.7fake\n", load_file(os.path.join(out_logdir, "dpkg-version"))
+ )
self.assertEqual(
- '0.7fake\n',
- load_file(os.path.join(out_logdir, 'dpkg-version')))
- self.assertEqual(version_out,
- load_file(os.path.join(out_logdir, 'version')))
+ version_out, load_file(os.path.join(out_logdir, "version"))
+ )
self.assertEqual(
- 'cloud-init-log',
- load_file(os.path.join(out_logdir, 'cloud-init.log')))
+ "cloud-init-log",
+ load_file(os.path.join(out_logdir, "cloud-init.log")),
+ )
self.assertEqual(
- 'cloud-init-output-log',
- load_file(os.path.join(out_logdir, 'cloud-init-output.log')))
+ "cloud-init-output-log",
+ load_file(os.path.join(out_logdir, "cloud-init-output.log")),
+ )
self.assertEqual(
- 'dmesg-out\n',
- load_file(os.path.join(out_logdir, 'dmesg.txt')))
+ "dmesg-out\n", load_file(os.path.join(out_logdir, "dmesg.txt"))
+ )
self.assertEqual(
- 'journal-out\n',
- load_file(os.path.join(out_logdir, 'journal.txt')))
+ "journal-out\n", load_file(os.path.join(out_logdir, "journal.txt"))
+ )
self.assertEqual(
- 'results',
+ "results",
load_file(
- os.path.join(out_logdir, 'run', 'cloud-init', 'results.json')))
- fake_stderr.write.assert_any_call('Wrote %s\n' % output_tarfile)
+ os.path.join(out_logdir, "run", "cloud-init", "results.json")
+ ),
+ )
+ fake_stderr.write.assert_any_call("Wrote %s\n" % output_tarfile)
def test_collect_logs_includes_optional_userdata(self, m_getuid):
"""collect-logs include userdata when --include-userdata is set."""
m_getuid.return_value = 0
- log1 = self.tmp_path('cloud-init.log', self.new_root)
- write_file(log1, 'cloud-init-log')
- log2 = self.tmp_path('cloud-init-output.log', self.new_root)
- write_file(log2, 'cloud-init-output-log')
- userdata = self.tmp_path('user-data.txt', self.new_root)
- write_file(userdata, 'user-data')
+ log1 = self.tmp_path("cloud-init.log", self.new_root)
+ write_file(log1, "cloud-init-log")
+ log2 = self.tmp_path("cloud-init-output.log", self.new_root)
+ write_file(log2, "cloud-init-output-log")
+ userdata = self.tmp_path("user-data.txt", self.new_root)
+ write_file(userdata, "user-data")
ensure_dir(self.run_dir)
- write_file(self.tmp_path('results.json', self.run_dir), 'results')
- write_file(self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir),
- 'sensitive')
- output_tarfile = self.tmp_path('logs.tgz')
+ write_file(self.tmp_path("results.json", self.run_dir), "results")
+ write_file(
+ self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir),
+ "sensitive",
+ )
+ output_tarfile = self.tmp_path("logs.tgz")
- date = datetime.utcnow().date().strftime('%Y-%m-%d')
- date_logdir = 'cloud-init-logs-{0}'.format(date)
+ date = datetime.utcnow().date().strftime("%Y-%m-%d")
+ date_logdir = "cloud-init-logs-{0}".format(date)
- version_out = '/usr/bin/cloud-init 18.2fake\n'
+ version_out = "/usr/bin/cloud-init 18.2fake\n"
expected_subp = {
- ('dpkg-query', '--show', "-f=${Version}\n", 'cloud-init'):
- '0.7fake',
- ('cloud-init', '--version'): version_out,
- ('dmesg',): 'dmesg-out\n',
- ('journalctl', '--boot=0', '-o', 'short-precise'): 'journal-out\n',
- ('tar', 'czvf', output_tarfile, date_logdir): ''
+ (
+ "dpkg-query",
+ "--show",
+ "-f=${Version}\n",
+ "cloud-init",
+ ): "0.7fake",
+ ("cloud-init", "--version"): version_out,
+ ("dmesg",): "dmesg-out\n",
+ ("journalctl", "--boot=0", "-o", "short-precise"): "journal-out\n",
+ ("tar", "czvf", output_tarfile, date_logdir): "",
}
def fake_subp(cmd):
cmd_tuple = tuple(cmd)
if cmd_tuple not in expected_subp:
raise AssertionError(
- 'Unexpected command provided to subp: {0}'.format(cmd))
- if cmd == ['tar', 'czvf', output_tarfile, date_logdir]:
+ "Unexpected command provided to subp: {0}".format(cmd)
+ )
+ if cmd == ["tar", "czvf", output_tarfile, date_logdir]:
subp(cmd) # Pass through tar cmd so we can check output
- return expected_subp[cmd_tuple], ''
+ return expected_subp[cmd_tuple], ""
fake_stderr = mock.MagicMock()
wrap_and_call(
- 'cloudinit.cmd.devel.logs',
- {'subp': {'side_effect': fake_subp},
- 'sys.stderr': {'new': fake_stderr},
- 'CLOUDINIT_LOGS': {'new': [log1, log2]},
- 'CLOUDINIT_RUN_DIR': {'new': self.run_dir},
- 'USER_DATA_FILE': {'new': userdata}},
- logs.collect_logs, output_tarfile, include_userdata=True)
+ "cloudinit.cmd.devel.logs",
+ {
+ "subp": {"side_effect": fake_subp},
+ "sys.stderr": {"new": fake_stderr},
+ "CLOUDINIT_LOGS": {"new": [log1, log2]},
+ "CLOUDINIT_RUN_DIR": {"new": self.run_dir},
+ "USER_DATA_FILE": {"new": userdata},
+ },
+ logs.collect_logs,
+ output_tarfile,
+ include_userdata=True,
+ )
# unpack the tarfile and check file contents
- subp(['tar', 'zxvf', output_tarfile, '-C', self.new_root])
+ subp(["tar", "zxvf", output_tarfile, "-C", self.new_root])
out_logdir = self.tmp_path(date_logdir, self.new_root)
self.assertEqual(
- 'user-data',
- load_file(os.path.join(out_logdir, 'user-data.txt')))
+ "user-data", load_file(os.path.join(out_logdir, "user-data.txt"))
+ )
self.assertEqual(
- 'sensitive',
- load_file(os.path.join(out_logdir, 'run', 'cloud-init',
- INSTANCE_JSON_SENSITIVE_FILE)))
- fake_stderr.write.assert_any_call('Wrote %s\n' % output_tarfile)
+ "sensitive",
+ load_file(
+ os.path.join(
+ out_logdir,
+ "run",
+ "cloud-init",
+ INSTANCE_JSON_SENSITIVE_FILE,
+ )
+ ),
+ )
+ fake_stderr.write.assert_any_call("Wrote %s\n" % output_tarfile)
diff --git a/tests/unittests/cmd/devel/test_render.py b/tests/unittests/cmd/devel/test_render.py
index c7ddca3d..4afc64f0 100644
--- a/tests/unittests/cmd/devel/test_render.py
+++ b/tests/unittests/cmd/devel/test_render.py
@@ -1,21 +1,21 @@
# This file is part of cloud-init. See LICENSE file for license information.
import os
+from collections import namedtuple
from io import StringIO
-from collections import namedtuple
from cloudinit.cmd.devel import render
from cloudinit.helpers import Paths
from cloudinit.sources import INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE
-from tests.unittests.helpers import CiTestCase, mock, skipUnlessJinja
from cloudinit.util import ensure_dir, write_file
+from tests.unittests.helpers import CiTestCase, mock, skipUnlessJinja
class TestRender(CiTestCase):
with_logs = True
- args = namedtuple('renderargs', 'user_data instance_data debug')
+ args = namedtuple("renderargs", "user_data instance_data debug")
def setUp(self):
super(TestRender, self).setUp()
@@ -23,122 +23,132 @@ class TestRender(CiTestCase):
def test_handle_args_error_on_missing_user_data(self):
"""When user_data file path does not exist, log an error."""
- absent_file = self.tmp_path('user-data', dir=self.tmp)
- instance_data = self.tmp_path('instance-data', dir=self.tmp)
- write_file(instance_data, '{}')
+ absent_file = self.tmp_path("user-data", dir=self.tmp)
+ instance_data = self.tmp_path("instance-data", dir=self.tmp)
+ write_file(instance_data, "{}")
args = self.args(
- user_data=absent_file, instance_data=instance_data, debug=False)
- with mock.patch('sys.stderr', new_callable=StringIO):
- self.assertEqual(1, render.handle_args('anyname', args))
+ user_data=absent_file, instance_data=instance_data, debug=False
+ )
+ with mock.patch("sys.stderr", new_callable=StringIO):
+ self.assertEqual(1, render.handle_args("anyname", args))
self.assertIn(
- 'Missing user-data file: %s' % absent_file,
- self.logs.getvalue())
+ "Missing user-data file: %s" % absent_file, self.logs.getvalue()
+ )
def test_handle_args_error_on_missing_instance_data(self):
"""When instance_data file path does not exist, log an error."""
- user_data = self.tmp_path('user-data', dir=self.tmp)
- absent_file = self.tmp_path('instance-data', dir=self.tmp)
+ user_data = self.tmp_path("user-data", dir=self.tmp)
+ absent_file = self.tmp_path("instance-data", dir=self.tmp)
args = self.args(
- user_data=user_data, instance_data=absent_file, debug=False)
- with mock.patch('sys.stderr', new_callable=StringIO):
- self.assertEqual(1, render.handle_args('anyname', args))
+ user_data=user_data, instance_data=absent_file, debug=False
+ )
+ with mock.patch("sys.stderr", new_callable=StringIO):
+ self.assertEqual(1, render.handle_args("anyname", args))
self.assertIn(
- 'Missing instance-data.json file: %s' % absent_file,
- self.logs.getvalue())
+ "Missing instance-data.json file: %s" % absent_file,
+ self.logs.getvalue(),
+ )
def test_handle_args_defaults_instance_data(self):
"""When no instance_data argument, default to configured run_dir."""
- user_data = self.tmp_path('user-data', dir=self.tmp)
- run_dir = self.tmp_path('run_dir', dir=self.tmp)
+ user_data = self.tmp_path("user-data", dir=self.tmp)
+ run_dir = self.tmp_path("run_dir", dir=self.tmp)
ensure_dir(run_dir)
- paths = Paths({'run_dir': run_dir})
- self.add_patch('cloudinit.cmd.devel.render.read_cfg_paths', 'm_paths')
+ paths = Paths({"run_dir": run_dir})
+ self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths")
self.m_paths.return_value = paths
- args = self.args(
- user_data=user_data, instance_data=None, debug=False)
- with mock.patch('sys.stderr', new_callable=StringIO):
- self.assertEqual(1, render.handle_args('anyname', args))
+ args = self.args(user_data=user_data, instance_data=None, debug=False)
+ with mock.patch("sys.stderr", new_callable=StringIO):
+ self.assertEqual(1, render.handle_args("anyname", args))
json_file = os.path.join(run_dir, INSTANCE_JSON_FILE)
self.assertIn(
- 'Missing instance-data.json file: %s' % json_file,
- self.logs.getvalue())
+ "Missing instance-data.json file: %s" % json_file,
+ self.logs.getvalue(),
+ )
def test_handle_args_root_fallback_from_sensitive_instance_data(self):
"""When root user defaults to sensitive.json."""
- user_data = self.tmp_path('user-data', dir=self.tmp)
- run_dir = self.tmp_path('run_dir', dir=self.tmp)
+ user_data = self.tmp_path("user-data", dir=self.tmp)
+ run_dir = self.tmp_path("run_dir", dir=self.tmp)
ensure_dir(run_dir)
- paths = Paths({'run_dir': run_dir})
- self.add_patch('cloudinit.cmd.devel.render.read_cfg_paths', 'm_paths')
+ paths = Paths({"run_dir": run_dir})
+ self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths")
self.m_paths.return_value = paths
- args = self.args(
- user_data=user_data, instance_data=None, debug=False)
- with mock.patch('sys.stderr', new_callable=StringIO):
- with mock.patch('os.getuid') as m_getuid:
+ args = self.args(user_data=user_data, instance_data=None, debug=False)
+ with mock.patch("sys.stderr", new_callable=StringIO):
+ with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 0
- self.assertEqual(1, render.handle_args('anyname', args))
+ self.assertEqual(1, render.handle_args("anyname", args))
json_file = os.path.join(run_dir, INSTANCE_JSON_FILE)
json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE)
self.assertIn(
- 'WARNING: Missing root-readable %s. Using redacted %s' % (
- json_sensitive, json_file), self.logs.getvalue())
+ "WARNING: Missing root-readable %s. Using redacted %s"
+ % (json_sensitive, json_file),
+ self.logs.getvalue(),
+ )
self.assertIn(
- 'ERROR: Missing instance-data.json file: %s' % json_file,
- self.logs.getvalue())
+ "ERROR: Missing instance-data.json file: %s" % json_file,
+ self.logs.getvalue(),
+ )
def test_handle_args_root_uses_sensitive_instance_data(self):
"""When root user, and no instance-data arg, use sensitive.json."""
- user_data = self.tmp_path('user-data', dir=self.tmp)
- write_file(user_data, '##template: jinja\nrendering: {{ my_var }}')
- run_dir = self.tmp_path('run_dir', dir=self.tmp)
+ user_data = self.tmp_path("user-data", dir=self.tmp)
+ write_file(user_data, "##template: jinja\nrendering: {{ my_var }}")
+ run_dir = self.tmp_path("run_dir", dir=self.tmp)
ensure_dir(run_dir)
json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE)
write_file(json_sensitive, '{"my-var": "jinja worked"}')
- paths = Paths({'run_dir': run_dir})
- self.add_patch('cloudinit.cmd.devel.render.read_cfg_paths', 'm_paths')
+ paths = Paths({"run_dir": run_dir})
+ self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths")
self.m_paths.return_value = paths
- args = self.args(
- user_data=user_data, instance_data=None, debug=False)
- with mock.patch('sys.stderr', new_callable=StringIO):
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- with mock.patch('os.getuid') as m_getuid:
+ args = self.args(user_data=user_data, instance_data=None, debug=False)
+ with mock.patch("sys.stderr", new_callable=StringIO):
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 0
- self.assertEqual(0, render.handle_args('anyname', args))
- self.assertIn('rendering: jinja worked', m_stdout.getvalue())
+ self.assertEqual(0, render.handle_args("anyname", args))
+ self.assertIn("rendering: jinja worked", m_stdout.getvalue())
@skipUnlessJinja()
def test_handle_args_renders_instance_data_vars_in_template(self):
"""If user_data file is a jinja template render instance-data vars."""
- user_data = self.tmp_path('user-data', dir=self.tmp)
- write_file(user_data, '##template: jinja\nrendering: {{ my_var }}')
- instance_data = self.tmp_path('instance-data', dir=self.tmp)
+ user_data = self.tmp_path("user-data", dir=self.tmp)
+ write_file(user_data, "##template: jinja\nrendering: {{ my_var }}")
+ instance_data = self.tmp_path("instance-data", dir=self.tmp)
write_file(instance_data, '{"my-var": "jinja worked"}')
args = self.args(
- user_data=user_data, instance_data=instance_data, debug=True)
- with mock.patch('sys.stderr', new_callable=StringIO) as m_console_err:
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- self.assertEqual(0, render.handle_args('anyname', args))
+ user_data=user_data, instance_data=instance_data, debug=True
+ )
+ with mock.patch("sys.stderr", new_callable=StringIO) as m_console_err:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
+ self.assertEqual(0, render.handle_args("anyname", args))
self.assertIn(
- 'DEBUG: Converted jinja variables\n{', self.logs.getvalue())
+ "DEBUG: Converted jinja variables\n{", self.logs.getvalue()
+ )
self.assertIn(
- 'DEBUG: Converted jinja variables\n{', m_console_err.getvalue())
- self.assertEqual('rendering: jinja worked', m_stdout.getvalue())
+ "DEBUG: Converted jinja variables\n{", m_console_err.getvalue()
+ )
+ self.assertEqual("rendering: jinja worked", m_stdout.getvalue())
@skipUnlessJinja()
def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation(self):
"""If user_data file has invalid jinja operations log warnings."""
- user_data = self.tmp_path('user-data', dir=self.tmp)
- write_file(user_data, '##template: jinja\nrendering: {{ my-var }}')
- instance_data = self.tmp_path('instance-data', dir=self.tmp)
+ user_data = self.tmp_path("user-data", dir=self.tmp)
+ write_file(user_data, "##template: jinja\nrendering: {{ my-var }}")
+ instance_data = self.tmp_path("instance-data", dir=self.tmp)
write_file(instance_data, '{"my-var": "jinja worked"}')
args = self.args(
- user_data=user_data, instance_data=instance_data, debug=True)
- with mock.patch('sys.stderr', new_callable=StringIO):
- self.assertEqual(1, render.handle_args('anyname', args))
+ user_data=user_data, instance_data=instance_data, debug=True
+ )
+ with mock.patch("sys.stderr", new_callable=StringIO):
+ self.assertEqual(1, render.handle_args("anyname", args))
self.assertIn(
- 'WARNING: Ignoring jinja template for %s: Undefined jinja'
+ "WARNING: Ignoring jinja template for %s: Undefined jinja"
' variable: "my-var". Jinja tried subtraction. Perhaps you meant'
' "my_var"?' % user_data,
- self.logs.getvalue())
+ self.logs.getvalue(),
+ )
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/cmd/test_clean.py b/tests/unittests/cmd/test_clean.py
index 3bb0ee9b..7d12017e 100644
--- a/tests/unittests/cmd/test_clean.py
+++ b/tests/unittests/cmd/test_clean.py
@@ -1,29 +1,31 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from cloudinit.cmd import clean
-from cloudinit.util import ensure_dir, sym_link, write_file
-from tests.unittests.helpers import CiTestCase, wrap_and_call, mock
-from collections import namedtuple
import os
+from collections import namedtuple
from io import StringIO
-mypaths = namedtuple('MyPaths', 'cloud_dir')
+from cloudinit.cmd import clean
+from cloudinit.util import ensure_dir, sym_link, write_file
+from tests.unittests.helpers import CiTestCase, mock, wrap_and_call
+mypaths = namedtuple("MyPaths", "cloud_dir")
-class TestClean(CiTestCase):
+class TestClean(CiTestCase):
def setUp(self):
super(TestClean, self).setUp()
self.new_root = self.tmp_dir()
- self.artifact_dir = self.tmp_path('artifacts', self.new_root)
- self.log1 = self.tmp_path('cloud-init.log', self.new_root)
- self.log2 = self.tmp_path('cloud-init-output.log', self.new_root)
+ self.artifact_dir = self.tmp_path("artifacts", self.new_root)
+ self.log1 = self.tmp_path("cloud-init.log", self.new_root)
+ self.log2 = self.tmp_path("cloud-init-output.log", self.new_root)
class FakeInit(object):
- cfg = {'def_log_file': self.log1,
- 'output': {'all': '|tee -a {0}'.format(self.log2)}}
+ cfg = {
+ "def_log_file": self.log1,
+ "output": {"all": "|tee -a {0}".format(self.log2)},
+ }
# Ensure cloud_dir has a trailing slash, to match real behaviour
- paths = mypaths(cloud_dir='{}/'.format(self.artifact_dir))
+ paths = mypaths(cloud_dir="{}/".format(self.artifact_dir))
def __init__(self, ds_deps):
pass
@@ -35,110 +37,133 @@ class TestClean(CiTestCase):
def test_remove_artifacts_removes_logs(self):
"""remove_artifacts removes logs when remove_logs is True."""
- write_file(self.log1, 'cloud-init-log')
- write_file(self.log2, 'cloud-init-output-log')
+ write_file(self.log1, "cloud-init-log")
+ write_file(self.log2, "cloud-init-output-log")
self.assertFalse(
- os.path.exists(self.artifact_dir), 'Unexpected artifacts dir')
+ os.path.exists(self.artifact_dir), "Unexpected artifacts dir"
+ )
retcode = wrap_and_call(
- 'cloudinit.cmd.clean',
- {'Init': {'side_effect': self.init_class}},
- clean.remove_artifacts, remove_logs=True)
- self.assertFalse(os.path.exists(self.log1), 'Unexpected file')
- self.assertFalse(os.path.exists(self.log2), 'Unexpected file')
+ "cloudinit.cmd.clean",
+ {"Init": {"side_effect": self.init_class}},
+ clean.remove_artifacts,
+ remove_logs=True,
+ )
+ self.assertFalse(os.path.exists(self.log1), "Unexpected file")
+ self.assertFalse(os.path.exists(self.log2), "Unexpected file")
self.assertEqual(0, retcode)
def test_remove_artifacts_preserves_logs(self):
"""remove_artifacts leaves logs when remove_logs is False."""
- write_file(self.log1, 'cloud-init-log')
- write_file(self.log2, 'cloud-init-output-log')
+ write_file(self.log1, "cloud-init-log")
+ write_file(self.log2, "cloud-init-output-log")
retcode = wrap_and_call(
- 'cloudinit.cmd.clean',
- {'Init': {'side_effect': self.init_class}},
- clean.remove_artifacts, remove_logs=False)
- self.assertTrue(os.path.exists(self.log1), 'Missing expected file')
- self.assertTrue(os.path.exists(self.log2), 'Missing expected file')
+ "cloudinit.cmd.clean",
+ {"Init": {"side_effect": self.init_class}},
+ clean.remove_artifacts,
+ remove_logs=False,
+ )
+ self.assertTrue(os.path.exists(self.log1), "Missing expected file")
+ self.assertTrue(os.path.exists(self.log2), "Missing expected file")
self.assertEqual(0, retcode)
def test_remove_artifacts_removes_unlinks_symlinks(self):
"""remove_artifacts cleans artifacts dir unlinking any symlinks."""
- dir1 = os.path.join(self.artifact_dir, 'dir1')
+ dir1 = os.path.join(self.artifact_dir, "dir1")
ensure_dir(dir1)
- symlink = os.path.join(self.artifact_dir, 'mylink')
+ symlink = os.path.join(self.artifact_dir, "mylink")
sym_link(dir1, symlink)
retcode = wrap_and_call(
- 'cloudinit.cmd.clean',
- {'Init': {'side_effect': self.init_class}},
- clean.remove_artifacts, remove_logs=False)
+ "cloudinit.cmd.clean",
+ {"Init": {"side_effect": self.init_class}},
+ clean.remove_artifacts,
+ remove_logs=False,
+ )
self.assertEqual(0, retcode)
for path in (dir1, symlink):
self.assertFalse(
- os.path.exists(path),
- 'Unexpected {0} dir'.format(path))
+ os.path.exists(path), "Unexpected {0} dir".format(path)
+ )
def test_remove_artifacts_removes_artifacts_skipping_seed(self):
"""remove_artifacts cleans artifacts dir with exception of seed dir."""
dirs = [
self.artifact_dir,
- os.path.join(self.artifact_dir, 'seed'),
- os.path.join(self.artifact_dir, 'dir1'),
- os.path.join(self.artifact_dir, 'dir2')]
+ os.path.join(self.artifact_dir, "seed"),
+ os.path.join(self.artifact_dir, "dir1"),
+ os.path.join(self.artifact_dir, "dir2"),
+ ]
for _dir in dirs:
ensure_dir(_dir)
retcode = wrap_and_call(
- 'cloudinit.cmd.clean',
- {'Init': {'side_effect': self.init_class}},
- clean.remove_artifacts, remove_logs=False)
+ "cloudinit.cmd.clean",
+ {"Init": {"side_effect": self.init_class}},
+ clean.remove_artifacts,
+ remove_logs=False,
+ )
self.assertEqual(0, retcode)
for expected_dir in dirs[:2]:
self.assertTrue(
os.path.exists(expected_dir),
- 'Missing {0} dir'.format(expected_dir))
+ "Missing {0} dir".format(expected_dir),
+ )
for deleted_dir in dirs[2:]:
self.assertFalse(
os.path.exists(deleted_dir),
- 'Unexpected {0} dir'.format(deleted_dir))
+ "Unexpected {0} dir".format(deleted_dir),
+ )
def test_remove_artifacts_removes_artifacts_removes_seed(self):
"""remove_artifacts removes seed dir when remove_seed is True."""
dirs = [
self.artifact_dir,
- os.path.join(self.artifact_dir, 'seed'),
- os.path.join(self.artifact_dir, 'dir1'),
- os.path.join(self.artifact_dir, 'dir2')]
+ os.path.join(self.artifact_dir, "seed"),
+ os.path.join(self.artifact_dir, "dir1"),
+ os.path.join(self.artifact_dir, "dir2"),
+ ]
for _dir in dirs:
ensure_dir(_dir)
retcode = wrap_and_call(
- 'cloudinit.cmd.clean',
- {'Init': {'side_effect': self.init_class}},
- clean.remove_artifacts, remove_logs=False, remove_seed=True)
+ "cloudinit.cmd.clean",
+ {"Init": {"side_effect": self.init_class}},
+ clean.remove_artifacts,
+ remove_logs=False,
+ remove_seed=True,
+ )
self.assertEqual(0, retcode)
self.assertTrue(
- os.path.exists(self.artifact_dir), 'Missing artifact dir')
+ os.path.exists(self.artifact_dir), "Missing artifact dir"
+ )
for deleted_dir in dirs[1:]:
self.assertFalse(
os.path.exists(deleted_dir),
- 'Unexpected {0} dir'.format(deleted_dir))
+ "Unexpected {0} dir".format(deleted_dir),
+ )
def test_remove_artifacts_returns_one_on_errors(self):
"""remove_artifacts returns non-zero on failure and prints an error."""
ensure_dir(self.artifact_dir)
- ensure_dir(os.path.join(self.artifact_dir, 'dir1'))
+ ensure_dir(os.path.join(self.artifact_dir, "dir1"))
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
+ with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr:
retcode = wrap_and_call(
- 'cloudinit.cmd.clean',
- {'del_dir': {'side_effect': OSError('oops')},
- 'Init': {'side_effect': self.init_class}},
- clean.remove_artifacts, remove_logs=False)
+ "cloudinit.cmd.clean",
+ {
+ "del_dir": {"side_effect": OSError("oops")},
+ "Init": {"side_effect": self.init_class},
+ },
+ clean.remove_artifacts,
+ remove_logs=False,
+ )
self.assertEqual(1, retcode)
self.assertEqual(
- 'Error:\nCould not remove %s/dir1: oops\n' % self.artifact_dir,
- m_stderr.getvalue())
+ "Error:\nCould not remove %s/dir1: oops\n" % self.artifact_dir,
+ m_stderr.getvalue(),
+ )
def test_handle_clean_args_reboots(self):
"""handle_clean_args_reboots when reboot arg is provided."""
@@ -147,32 +172,40 @@ class TestClean(CiTestCase):
def fake_subp(cmd, capture):
called_cmds.append((cmd, capture))
- return '', ''
+ return "", ""
- myargs = namedtuple('MyArgs', 'remove_logs remove_seed reboot')
+ myargs = namedtuple("MyArgs", "remove_logs remove_seed reboot")
cmdargs = myargs(remove_logs=False, remove_seed=False, reboot=True)
retcode = wrap_and_call(
- 'cloudinit.cmd.clean',
- {'subp': {'side_effect': fake_subp},
- 'Init': {'side_effect': self.init_class}},
- clean.handle_clean_args, name='does not matter', args=cmdargs)
+ "cloudinit.cmd.clean",
+ {
+ "subp": {"side_effect": fake_subp},
+ "Init": {"side_effect": self.init_class},
+ },
+ clean.handle_clean_args,
+ name="does not matter",
+ args=cmdargs,
+ )
self.assertEqual(0, retcode)
- self.assertEqual(
- [(['shutdown', '-r', 'now'], False)], called_cmds)
+ self.assertEqual([(["shutdown", "-r", "now"], False)], called_cmds)
def test_status_main(self):
- '''clean.main can be run as a standalone script.'''
- write_file(self.log1, 'cloud-init-log')
+ """clean.main can be run as a standalone script."""
+ write_file(self.log1, "cloud-init-log")
with self.assertRaises(SystemExit) as context_manager:
wrap_and_call(
- 'cloudinit.cmd.clean',
- {'Init': {'side_effect': self.init_class},
- 'sys.argv': {'new': ['clean', '--logs']}},
- clean.main)
+ "cloudinit.cmd.clean",
+ {
+ "Init": {"side_effect": self.init_class},
+ "sys.argv": {"new": ["clean", "--logs"]},
+ },
+ clean.main,
+ )
self.assertEqual(0, context_manager.exception.code)
self.assertFalse(
- os.path.exists(self.log1), 'Unexpected log {0}'.format(self.log1))
+ os.path.exists(self.log1), "Unexpected log {0}".format(self.log1)
+ )
# vi: ts=4 expandtab syntax=python
diff --git a/tests/unittests/cmd/test_cloud_id.py b/tests/unittests/cmd/test_cloud_id.py
index 9a010402..42941d4f 100644
--- a/tests/unittests/cmd/test_cloud_id.py
+++ b/tests/unittests/cmd/test_cloud_id.py
@@ -2,41 +2,45 @@
"""Tests for cloud-id command line utility."""
-from cloudinit import util
from collections import namedtuple
from io import StringIO
+from cloudinit import util
from cloudinit.cmd import cloud_id
-
from tests.unittests.helpers import CiTestCase, mock
class TestCloudId(CiTestCase):
- args = namedtuple('cloudidargs', ('instance_data json long'))
+ args = namedtuple("cloudidargs", "instance_data json long")
def setUp(self):
super(TestCloudId, self).setUp()
self.tmp = self.tmp_dir()
- self.instance_data = self.tmp_path('instance-data.json', dir=self.tmp)
+ self.instance_data = self.tmp_path("instance-data.json", dir=self.tmp)
def test_cloud_id_arg_parser_defaults(self):
"""Validate the argument defaults when not provided by the end-user."""
- cmd = ['cloud-id']
- with mock.patch('sys.argv', cmd):
+ cmd = ["cloud-id"]
+ with mock.patch("sys.argv", cmd):
args = cloud_id.get_parser().parse_args()
self.assertEqual(
- '/run/cloud-init/instance-data.json',
- args.instance_data)
+ "/run/cloud-init/instance-data.json", args.instance_data
+ )
self.assertEqual(False, args.long)
self.assertEqual(False, args.json)
def test_cloud_id_arg_parse_overrides(self):
"""Override argument defaults by specifying values for each param."""
- util.write_file(self.instance_data, '{}')
- cmd = ['cloud-id', '--instance-data', self.instance_data, '--long',
- '--json']
- with mock.patch('sys.argv', cmd):
+ util.write_file(self.instance_data, "{}")
+ cmd = [
+ "cloud-id",
+ "--instance-data",
+ self.instance_data,
+ "--long",
+ "--json",
+ ]
+ with mock.patch("sys.argv", cmd):
args = cloud_id.get_parser().parse_args()
self.assertEqual(self.instance_data, args.instance_data)
self.assertEqual(True, args.long)
@@ -44,37 +48,40 @@ class TestCloudId(CiTestCase):
def test_cloud_id_missing_instance_data_json(self):
"""Exit error when the provided instance-data.json does not exist."""
- cmd = ['cloud-id', '--instance-data', self.instance_data]
- with mock.patch('sys.argv', cmd):
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
+ cmd = ["cloud-id", "--instance-data", self.instance_data]
+ with mock.patch("sys.argv", cmd):
+ with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr:
with self.assertRaises(SystemExit) as context_manager:
cloud_id.main()
self.assertEqual(1, context_manager.exception.code)
self.assertIn(
"Error:\nFile not found '%s'" % self.instance_data,
- m_stderr.getvalue())
+ m_stderr.getvalue(),
+ )
def test_cloud_id_non_json_instance_data(self):
"""Exit error when the provided instance-data.json is not json."""
- cmd = ['cloud-id', '--instance-data', self.instance_data]
- util.write_file(self.instance_data, '{')
- with mock.patch('sys.argv', cmd):
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
+ cmd = ["cloud-id", "--instance-data", self.instance_data]
+ util.write_file(self.instance_data, "{")
+ with mock.patch("sys.argv", cmd):
+ with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr:
with self.assertRaises(SystemExit) as context_manager:
cloud_id.main()
self.assertEqual(1, context_manager.exception.code)
self.assertIn(
"Error:\nFile '%s' is not valid json." % self.instance_data,
- m_stderr.getvalue())
+ m_stderr.getvalue(),
+ )
def test_cloud_id_from_cloud_name_in_instance_data(self):
"""Report canonical cloud-id from cloud_name in instance-data."""
util.write_file(
self.instance_data,
- '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}')
- cmd = ['cloud-id', '--instance-data', self.instance_data]
- with mock.patch('sys.argv', cmd):
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}',
+ )
+ cmd = ["cloud-id", "--instance-data", self.instance_data]
+ with mock.patch("sys.argv", cmd):
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
with self.assertRaises(SystemExit) as context_manager:
cloud_id.main()
self.assertEqual(0, context_manager.exception.code)
@@ -84,10 +91,11 @@ class TestCloudId(CiTestCase):
"""Report long cloud-id format from cloud_name and region."""
util.write_file(
self.instance_data,
- '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}')
- cmd = ['cloud-id', '--instance-data', self.instance_data, '--long']
- with mock.patch('sys.argv', cmd):
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}',
+ )
+ cmd = ["cloud-id", "--instance-data", self.instance_data, "--long"]
+ with mock.patch("sys.argv", cmd):
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
with self.assertRaises(SystemExit) as context_manager:
cloud_id.main()
self.assertEqual(0, context_manager.exception.code)
@@ -98,10 +106,11 @@ class TestCloudId(CiTestCase):
util.write_file(
self.instance_data,
'{"v1": {"cloud_name": "aws", "region": "cn-north-1",'
- ' "platform": "ec2"}}')
- cmd = ['cloud-id', '--instance-data', self.instance_data, '--long']
- with mock.patch('sys.argv', cmd):
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ ' "platform": "ec2"}}',
+ )
+ cmd = ["cloud-id", "--instance-data", self.instance_data, "--long"]
+ with mock.patch("sys.argv", cmd):
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
with self.assertRaises(SystemExit) as context_manager:
cloud_id.main()
self.assertEqual(0, context_manager.exception.code)
@@ -112,16 +121,24 @@ class TestCloudId(CiTestCase):
util.write_file(
self.instance_data,
'{"v1": {"cloud_name": "unknown", "region": "dfw",'
- ' "platform": "openstack", "public_ssh_keys": []}}')
- expected = util.json_dumps({
- 'cloud_id': 'openstack', 'cloud_name': 'unknown',
- 'platform': 'openstack', 'public_ssh_keys': [], 'region': 'dfw'})
- cmd = ['cloud-id', '--instance-data', self.instance_data, '--json']
- with mock.patch('sys.argv', cmd):
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ ' "platform": "openstack", "public_ssh_keys": []}}',
+ )
+ expected = util.json_dumps(
+ {
+ "cloud_id": "openstack",
+ "cloud_name": "unknown",
+ "platform": "openstack",
+ "public_ssh_keys": [],
+ "region": "dfw",
+ }
+ )
+ cmd = ["cloud-id", "--instance-data", self.instance_data, "--json"]
+ with mock.patch("sys.argv", cmd):
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
with self.assertRaises(SystemExit) as context_manager:
cloud_id.main()
self.assertEqual(0, context_manager.exception.code)
- self.assertEqual(expected + '\n', m_stdout.getvalue())
+ self.assertEqual(expected + "\n", m_stdout.getvalue())
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/cmd/test_main.py b/tests/unittests/cmd/test_main.py
index e1ce682b..3e778b0b 100644
--- a/tests/unittests/cmd/test_main.py
+++ b/tests/unittests/cmd/test_main.py
@@ -1,22 +1,20 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from collections import namedtuple
import copy
import os
+from collections import namedtuple
from io import StringIO
from unittest import mock
import pytest
-from cloudinit.cmd import main
from cloudinit import safeyaml
-from cloudinit.util import (
- ensure_dir, load_file, write_file)
-from tests.unittests.helpers import (
- FilesystemMockingTestCase, wrap_and_call)
+from cloudinit.cmd import main
+from cloudinit.util import ensure_dir, load_file, write_file
+from tests.unittests.helpers import FilesystemMockingTestCase, wrap_and_call
-mypaths = namedtuple('MyPaths', 'run_dir')
-myargs = namedtuple('MyArgs', 'debug files force local reporter subcommand')
+mypaths = namedtuple("MyPaths", "run_dir")
+myargs = namedtuple("MyArgs", "debug files force local reporter subcommand")
class TestMain(FilesystemMockingTestCase):
@@ -26,27 +24,32 @@ class TestMain(FilesystemMockingTestCase):
def setUp(self):
super(TestMain, self).setUp()
self.new_root = self.tmp_dir()
- self.cloud_dir = self.tmp_path('var/lib/cloud/', dir=self.new_root)
+ self.cloud_dir = self.tmp_path("var/lib/cloud/", dir=self.new_root)
os.makedirs(self.cloud_dir)
- self.replicateTestRoot('simple_ubuntu', self.new_root)
+ self.replicateTestRoot("simple_ubuntu", self.new_root)
self.cfg = {
- 'datasource_list': ['None'],
- 'runcmd': ['ls /etc'], # test ALL_DISTROS
- 'system_info': {'paths': {'cloud_dir': self.cloud_dir,
- 'run_dir': self.new_root}},
- 'write_files': [
+ "datasource_list": ["None"],
+ "runcmd": ["ls /etc"], # test ALL_DISTROS
+ "system_info": {
+ "paths": {
+ "cloud_dir": self.cloud_dir,
+ "run_dir": self.new_root,
+ }
+ },
+ "write_files": [
{
- 'path': '/etc/blah.ini',
- 'content': 'blah',
- 'permissions': 0o755,
+ "path": "/etc/blah.ini",
+ "content": "blah",
+ "permissions": 0o755,
},
],
- 'cloud_init_modules': ['write-files', 'runcmd'],
+ "cloud_init_modules": ["write-files", "runcmd"],
}
cloud_cfg = safeyaml.dumps(self.cfg)
- ensure_dir(os.path.join(self.new_root, 'etc', 'cloud'))
+ ensure_dir(os.path.join(self.new_root, "etc", "cloud"))
self.cloud_cfg_file = os.path.join(
- self.new_root, 'etc', 'cloud', 'cloud.cfg')
+ self.new_root, "etc", "cloud", "cloud.cfg"
+ )
write_file(self.cloud_cfg_file, cloud_cfg)
self.patchOS(self.new_root)
self.patchUtils(self.new_root)
@@ -55,31 +58,44 @@ class TestMain(FilesystemMockingTestCase):
def test_main_init_run_net_stops_on_file_no_net(self):
"""When no-net file is present, main_init does not process modules."""
- stop_file = os.path.join(self.cloud_dir, 'data', 'no-net') # stop file
- write_file(stop_file, '')
+ stop_file = os.path.join(self.cloud_dir, "data", "no-net") # stop file
+ write_file(stop_file, "")
cmdargs = myargs(
- debug=False, files=None, force=False, local=False, reporter=None,
- subcommand='init')
+ debug=False,
+ files=None,
+ force=False,
+ local=False,
+ reporter=None,
+ subcommand="init",
+ )
(_item1, item2) = wrap_and_call(
- 'cloudinit.cmd.main',
- {'util.close_stdin': True,
- 'netinfo.debug_info': 'my net debug info',
- 'util.fixup_output': ('outfmt', 'errfmt')},
- main.main_init, 'init', cmdargs)
+ "cloudinit.cmd.main",
+ {
+ "util.close_stdin": True,
+ "netinfo.debug_info": "my net debug info",
+ "util.fixup_output": ("outfmt", "errfmt"),
+ },
+ main.main_init,
+ "init",
+ cmdargs,
+ )
# We should not run write_files module
self.assertFalse(
- os.path.exists(os.path.join(self.new_root, 'etc/blah.ini')),
- 'Unexpected run of write_files module produced blah.ini')
+ os.path.exists(os.path.join(self.new_root, "etc/blah.ini")),
+ "Unexpected run of write_files module produced blah.ini",
+ )
self.assertEqual([], item2)
# Instancify is called
- instance_id_path = 'var/lib/cloud/data/instance-id'
+ instance_id_path = "var/lib/cloud/data/instance-id"
self.assertFalse(
os.path.exists(os.path.join(self.new_root, instance_id_path)),
- 'Unexpected call to datasource.instancify produced instance-id')
+ "Unexpected call to datasource.instancify produced instance-id",
+ )
expected_logs = [
"Exiting. stop file ['{stop_file}'] existed\n".format(
- stop_file=stop_file),
- 'my net debug info' # netinfo.debug_info
+ stop_file=stop_file
+ ),
+ "my net debug info", # netinfo.debug_info
]
for log in expected_logs:
self.assertIn(log, self.stderr.getvalue())
@@ -87,97 +103,133 @@ class TestMain(FilesystemMockingTestCase):
def test_main_init_run_net_runs_modules(self):
"""Modules like write_files are run in 'net' mode."""
cmdargs = myargs(
- debug=False, files=None, force=False, local=False, reporter=None,
- subcommand='init')
+ debug=False,
+ files=None,
+ force=False,
+ local=False,
+ reporter=None,
+ subcommand="init",
+ )
(_item1, item2) = wrap_and_call(
- 'cloudinit.cmd.main',
- {'util.close_stdin': True,
- 'netinfo.debug_info': 'my net debug info',
- 'util.fixup_output': ('outfmt', 'errfmt')},
- main.main_init, 'init', cmdargs)
+ "cloudinit.cmd.main",
+ {
+ "util.close_stdin": True,
+ "netinfo.debug_info": "my net debug info",
+ "util.fixup_output": ("outfmt", "errfmt"),
+ },
+ main.main_init,
+ "init",
+ cmdargs,
+ )
self.assertEqual([], item2)
# Instancify is called
- instance_id_path = 'var/lib/cloud/data/instance-id'
+ instance_id_path = "var/lib/cloud/data/instance-id"
self.assertEqual(
- 'iid-datasource-none\n',
- os.path.join(load_file(
- os.path.join(self.new_root, instance_id_path))))
+ "iid-datasource-none\n",
+ os.path.join(
+ load_file(os.path.join(self.new_root, instance_id_path))
+ ),
+ )
# modules are run (including write_files)
self.assertEqual(
- 'blah', load_file(os.path.join(self.new_root, 'etc/blah.ini')))
+ "blah", load_file(os.path.join(self.new_root, "etc/blah.ini"))
+ )
expected_logs = [
- 'network config is disabled by fallback', # apply_network_config
- 'my net debug info', # netinfo.debug_info
- 'no previous run detected'
+ "network config is disabled by fallback", # apply_network_config
+ "my net debug info", # netinfo.debug_info
+ "no previous run detected",
]
for log in expected_logs:
self.assertIn(log, self.stderr.getvalue())
def test_main_init_run_net_calls_set_hostname_when_metadata_present(self):
"""When local-hostname metadata is present, call cc_set_hostname."""
- self.cfg['datasource'] = {
- 'None': {'metadata': {'local-hostname': 'md-hostname'}}}
+ self.cfg["datasource"] = {
+ "None": {"metadata": {"local-hostname": "md-hostname"}}
+ }
cloud_cfg = safeyaml.dumps(self.cfg)
write_file(self.cloud_cfg_file, cloud_cfg)
cmdargs = myargs(
- debug=False, files=None, force=False, local=False, reporter=None,
- subcommand='init')
+ debug=False,
+ files=None,
+ force=False,
+ local=False,
+ reporter=None,
+ subcommand="init",
+ )
def set_hostname(name, cfg, cloud, log, args):
- self.assertEqual('set-hostname', name)
+ self.assertEqual("set-hostname", name)
updated_cfg = copy.deepcopy(self.cfg)
updated_cfg.update(
- {'def_log_file': '/var/log/cloud-init.log',
- 'log_cfgs': [],
- 'syslog_fix_perms': [
- 'syslog:adm', 'root:adm', 'root:wheel', 'root:root'
- ],
- 'vendor_data': {'enabled': True, 'prefix': []},
- 'vendor_data2': {'enabled': True, 'prefix': []}})
- updated_cfg.pop('system_info')
+ {
+ "def_log_file": "/var/log/cloud-init.log",
+ "log_cfgs": [],
+ "syslog_fix_perms": [
+ "syslog:adm",
+ "root:adm",
+ "root:wheel",
+ "root:root",
+ ],
+ "vendor_data": {"enabled": True, "prefix": []},
+ "vendor_data2": {"enabled": True, "prefix": []},
+ }
+ )
+ updated_cfg.pop("system_info")
self.assertEqual(updated_cfg, cfg)
self.assertEqual(main.LOG, log)
self.assertIsNone(args)
(_item1, item2) = wrap_and_call(
- 'cloudinit.cmd.main',
- {'util.close_stdin': True,
- 'netinfo.debug_info': 'my net debug info',
- 'cc_set_hostname.handle': {'side_effect': set_hostname},
- 'util.fixup_output': ('outfmt', 'errfmt')},
- main.main_init, 'init', cmdargs)
+ "cloudinit.cmd.main",
+ {
+ "util.close_stdin": True,
+ "netinfo.debug_info": "my net debug info",
+ "cc_set_hostname.handle": {"side_effect": set_hostname},
+ "util.fixup_output": ("outfmt", "errfmt"),
+ },
+ main.main_init,
+ "init",
+ cmdargs,
+ )
self.assertEqual([], item2)
# Instancify is called
- instance_id_path = 'var/lib/cloud/data/instance-id'
+ instance_id_path = "var/lib/cloud/data/instance-id"
self.assertEqual(
- 'iid-datasource-none\n',
- os.path.join(load_file(
- os.path.join(self.new_root, instance_id_path))))
+ "iid-datasource-none\n",
+ os.path.join(
+ load_file(os.path.join(self.new_root, instance_id_path))
+ ),
+ )
# modules are run (including write_files)
self.assertEqual(
- 'blah', load_file(os.path.join(self.new_root, 'etc/blah.ini')))
+ "blah", load_file(os.path.join(self.new_root, "etc/blah.ini"))
+ )
expected_logs = [
- 'network config is disabled by fallback', # apply_network_config
- 'my net debug info', # netinfo.debug_info
- 'no previous run detected'
+ "network config is disabled by fallback", # apply_network_config
+ "my net debug info", # netinfo.debug_info
+ "no previous run detected",
]
for log in expected_logs:
self.assertIn(log, self.stderr.getvalue())
class TestShouldBringUpInterfaces:
- @pytest.mark.parametrize('cfg_disable,args_local,expected', [
- (True, True, False),
- (True, False, False),
- (False, True, False),
- (False, False, True),
- ])
+ @pytest.mark.parametrize(
+ "cfg_disable,args_local,expected",
+ [
+ (True, True, False),
+ (True, False, False),
+ (False, True, False),
+ (False, False, True),
+ ],
+ )
def test_should_bring_up_interfaces(
self, cfg_disable, args_local, expected
):
init = mock.Mock()
- init.cfg = {'disable_network_activation': cfg_disable}
+ init.cfg = {"disable_network_activation": cfg_disable}
args = mock.Mock()
args.local = args_local
@@ -185,4 +237,5 @@ class TestShouldBringUpInterfaces:
result = main._should_bring_up_interfaces(init, args)
assert result == expected
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/cmd/test_query.py b/tests/unittests/cmd/test_query.py
index b7d02d13..03a73bb5 100644
--- a/tests/unittests/cmd/test_query.py
+++ b/tests/unittests/cmd/test_query.py
@@ -4,19 +4,21 @@ import errno
import gzip
import json
import os
+from collections import namedtuple
from io import BytesIO
from textwrap import dedent
import pytest
-from collections import namedtuple
from cloudinit.cmd import query
from cloudinit.helpers import Paths
from cloudinit.sources import (
- REDACT_SENSITIVE_VALUE, INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE)
-from tests.unittests.helpers import mock
-
+ INSTANCE_JSON_FILE,
+ INSTANCE_JSON_SENSITIVE_FILE,
+ REDACT_SENSITIVE_VALUE,
+)
from cloudinit.util import b64e, write_file
+from tests.unittests.helpers import mock
def _gzip_data(data):
@@ -30,9 +32,10 @@ def _gzip_data(data):
class TestQuery:
args = namedtuple(
- 'queryargs',
- ('debug dump_all format instance_data list_keys user_data vendor_data'
- ' varname'))
+ "queryargs",
+ "debug dump_all format instance_data list_keys user_data vendor_data"
+ " varname",
+ )
def _setup_paths(self, tmpdir, ud_val=None, vd_val=None):
"""Write userdata and vendordata into a tmpdir.
@@ -41,153 +44,191 @@ class TestQuery:
4-tuple : (paths, run_dir_path, userdata_path, vendordata_path)
"""
if ud_val:
- user_data = tmpdir.join('user-data')
+ user_data = tmpdir.join("user-data")
write_file(user_data.strpath, ud_val)
else:
user_data = None
if vd_val:
- vendor_data = tmpdir.join('vendor-data')
+ vendor_data = tmpdir.join("vendor-data")
write_file(vendor_data.strpath, vd_val)
else:
vendor_data = None
- run_dir = tmpdir.join('run_dir')
+ run_dir = tmpdir.join("run_dir")
run_dir.ensure_dir()
- cloud_dir = tmpdir.join('cloud_dir')
+ cloud_dir = tmpdir.join("cloud_dir")
cloud_dir.ensure_dir()
return (
Paths(
- {'cloud_dir': cloud_dir.strpath, 'run_dir': run_dir.strpath}
+ {"cloud_dir": cloud_dir.strpath, "run_dir": run_dir.strpath}
),
run_dir,
user_data,
- vendor_data
+ vendor_data,
)
def test_handle_args_error_on_missing_param(self, caplog, capsys):
"""Error when missing required parameters and print usage."""
args = self.args(
- debug=False, dump_all=False, format=None, instance_data=None,
- list_keys=False, user_data=None, vendor_data=None, varname=None)
+ debug=False,
+ dump_all=False,
+ format=None,
+ instance_data=None,
+ list_keys=False,
+ user_data=None,
+ vendor_data=None,
+ varname=None,
+ )
with mock.patch(
"cloudinit.cmd.query.addLogHandlerCLI", return_value=""
) as m_cli_log:
- assert 1 == query.handle_args('anyname', args)
+ assert 1 == query.handle_args("anyname", args)
expected_error = (
- 'Expected one of the options: --all, --format, --list-keys'
- ' or varname\n')
+ "Expected one of the options: --all, --format, --list-keys"
+ " or varname\n"
+ )
assert expected_error in caplog.text
out, _err = capsys.readouterr()
- assert 'usage: query' in out
+ assert "usage: query" in out
assert 1 == m_cli_log.call_count
@pytest.mark.parametrize(
- "inst_data,varname,expected_error", (
+ "inst_data,varname,expected_error",
+ (
(
'{"v1": {"key-2": "value-2"}}',
- 'v1.absent_leaf',
- "instance-data 'v1' has no 'absent_leaf'\n"
+ "v1.absent_leaf",
+ "instance-data 'v1' has no 'absent_leaf'\n",
),
(
'{"v1": {"key-2": "value-2"}}',
- 'absent_key',
- "Undefined instance-data key 'absent_key'\n"
+ "absent_key",
+ "Undefined instance-data key 'absent_key'\n",
),
- )
+ ),
)
def test_handle_args_error_on_invalid_vaname_paths(
self, inst_data, varname, expected_error, caplog, tmpdir
):
"""Error when varname is not a valid instance-data variable path."""
- instance_data = tmpdir.join('instance-data')
+ instance_data = tmpdir.join("instance-data")
instance_data.write(inst_data)
args = self.args(
- debug=False, dump_all=False, format=None,
+ debug=False,
+ dump_all=False,
+ format=None,
instance_data=instance_data.strpath,
- list_keys=False, user_data=None, vendor_data=None, varname=varname
+ list_keys=False,
+ user_data=None,
+ vendor_data=None,
+ varname=varname,
)
paths, _, _, _ = self._setup_paths(tmpdir)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
+ with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
m_paths.return_value = paths
with mock.patch(
"cloudinit.cmd.query.addLogHandlerCLI", return_value=""
):
- with mock.patch('cloudinit.cmd.query.load_userdata') as m_lud:
+ with mock.patch("cloudinit.cmd.query.load_userdata") as m_lud:
m_lud.return_value = "ud"
- assert 1 == query.handle_args('anyname', args)
+ assert 1 == query.handle_args("anyname", args)
assert expected_error in caplog.text
def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir):
"""When instance_data file path does not exist, log an error."""
- absent_fn = tmpdir.join('absent')
+ absent_fn = tmpdir.join("absent")
args = self.args(
- debug=False, dump_all=True, format=None,
+ debug=False,
+ dump_all=True,
+ format=None,
instance_data=absent_fn.strpath,
- list_keys=False, user_data='ud', vendor_data='vd', varname=None)
- assert 1 == query.handle_args('anyname', args)
+ list_keys=False,
+ user_data="ud",
+ vendor_data="vd",
+ varname=None,
+ )
+ assert 1 == query.handle_args("anyname", args)
- msg = 'Missing instance-data file: %s' % absent_fn
+ msg = "Missing instance-data file: %s" % absent_fn
assert msg in caplog.text
def test_handle_args_error_when_no_read_permission_instance_data(
self, caplog, tmpdir
):
"""When instance_data file is unreadable, log an error."""
- noread_fn = tmpdir.join('unreadable')
- noread_fn.write('thou shall not pass')
+ noread_fn = tmpdir.join("unreadable")
+ noread_fn.write("thou shall not pass")
args = self.args(
- debug=False, dump_all=True, format=None,
+ debug=False,
+ dump_all=True,
+ format=None,
instance_data=noread_fn.strpath,
- list_keys=False, user_data='ud', vendor_data='vd', varname=None)
- with mock.patch('cloudinit.cmd.query.util.load_file') as m_load:
- m_load.side_effect = OSError(errno.EACCES, 'Not allowed')
- assert 1 == query.handle_args('anyname', args)
+ list_keys=False,
+ user_data="ud",
+ vendor_data="vd",
+ varname=None,
+ )
+ with mock.patch("cloudinit.cmd.query.util.load_file") as m_load:
+ m_load.side_effect = OSError(errno.EACCES, "Not allowed")
+ assert 1 == query.handle_args("anyname", args)
msg = "No read permission on '%s'. Try sudo" % noread_fn
assert msg in caplog.text
def test_handle_args_defaults_instance_data(self, caplog, tmpdir):
"""When no instance_data argument, default to configured run_dir."""
args = self.args(
- debug=False, dump_all=True, format=None, instance_data=None,
- list_keys=False, user_data=None, vendor_data=None, varname=None)
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=None,
+ list_keys=False,
+ user_data=None,
+ vendor_data=None,
+ varname=None,
+ )
paths, run_dir, _, _ = self._setup_paths(tmpdir)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
+ with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
m_paths.return_value = paths
- assert 1 == query.handle_args('anyname', args)
+ assert 1 == query.handle_args("anyname", args)
json_file = run_dir.join(INSTANCE_JSON_FILE)
- msg = 'Missing instance-data file: %s' % json_file.strpath
+ msg = "Missing instance-data file: %s" % json_file.strpath
assert msg in caplog.text
def test_handle_args_root_fallsback_to_instance_data(self, caplog, tmpdir):
"""When no instance_data argument, root falls back to redacted json."""
args = self.args(
- debug=False, dump_all=True, format=None, instance_data=None,
- list_keys=False, user_data=None, vendor_data=None, varname=None)
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=None,
+ list_keys=False,
+ user_data=None,
+ vendor_data=None,
+ varname=None,
+ )
paths, run_dir, _, _ = self._setup_paths(tmpdir)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
+ with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
m_paths.return_value = paths
- with mock.patch('os.getuid') as m_getuid:
+ with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 0
- assert 1 == query.handle_args('anyname', args)
+ assert 1 == query.handle_args("anyname", args)
json_file = run_dir.join(INSTANCE_JSON_FILE)
sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
- msg = (
- 'Missing root-readable %s. Using redacted %s instead.' %
- (
- sensitive_file.strpath, json_file.strpath
- )
+ msg = "Missing root-readable %s. Using redacted %s instead." % (
+ sensitive_file.strpath,
+ json_file.strpath,
)
assert msg in caplog.text
@pytest.mark.parametrize(
- 'ud_src,ud_expected,vd_src,vd_expected',
+ "ud_src,ud_expected,vd_src,vd_expected",
(
- ('hi mom', 'hi mom', 'hi pops', 'hi pops'),
- ('ud'.encode('utf-8'), 'ud', 'vd'.encode('utf-8'), 'vd'),
- (_gzip_data(b'ud'), 'ud', _gzip_data(b'vd'), 'vd'),
- (_gzip_data('ud'.encode('utf-8')), 'ud', _gzip_data(b'vd'), 'vd'),
- )
+ ("hi mom", "hi mom", "hi pops", "hi pops"),
+ ("ud".encode("utf-8"), "ud", "vd".encode("utf-8"), "vd"),
+ (_gzip_data(b"ud"), "ud", _gzip_data(b"vd"), "vd"),
+ (_gzip_data("ud".encode("utf-8")), "ud", _gzip_data(b"vd"), "vd"),
+ ),
)
def test_handle_args_root_processes_user_data(
self, ud_src, ud_expected, vd_src, vd_expected, capsys, tmpdir
@@ -199,23 +240,29 @@ class TestQuery:
sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
sensitive_file.write('{"my-var": "it worked"}')
args = self.args(
- debug=False, dump_all=True, format=None, instance_data=None,
- list_keys=False, user_data=user_data.strpath,
- vendor_data=vendor_data.strpath, varname=None)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=None,
+ list_keys=False,
+ user_data=user_data.strpath,
+ vendor_data=vendor_data.strpath,
+ varname=None,
+ )
+ with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
m_paths.return_value = paths
- with mock.patch('os.getuid') as m_getuid:
+ with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 0
- assert 0 == query.handle_args('anyname', args)
+ assert 0 == query.handle_args("anyname", args)
out, _err = capsys.readouterr()
cmd_output = json.loads(out)
- assert "it worked" == cmd_output['my-var']
+ assert "it worked" == cmd_output["my-var"]
if ud_expected == "ci-b64:":
ud_expected = "ci-b64:{}".format(b64e(ud_src))
if vd_expected == "ci-b64:":
vd_expected = "ci-b64:{}".format(b64e(vd_src))
- assert ud_expected == cmd_output['userdata']
- assert vd_expected == cmd_output['vendordata']
+ assert ud_expected == cmd_output["userdata"]
+ assert vd_expected == cmd_output["vendordata"]
def test_handle_args_user_vendor_data_defaults_to_instance_link(
self, capsys, tmpdir
@@ -231,13 +278,19 @@ class TestQuery:
write_file(vd_path, "instance_link_vd")
args = self.args(
- debug=False, dump_all=True, format=None, instance_data=None,
- list_keys=False, user_data=None,
- vendor_data=None, varname=None)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=None,
+ list_keys=False,
+ user_data=None,
+ vendor_data=None,
+ varname=None,
+ )
+ with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
m_paths.return_value = paths
- with mock.patch('os.getuid', return_value=0):
- assert 0 == query.handle_args('anyname', args)
+ with mock.patch("os.getuid", return_value=0):
+ assert 0 == query.handle_args("anyname", args)
expected = (
'{\n "my-var": "it worked",\n '
'"userdata": "instance_link_ud",\n '
@@ -251,19 +304,25 @@ class TestQuery:
):
"""When no instance_data argument, root uses sensitive json."""
paths, run_dir, user_data, vendor_data = self._setup_paths(
- tmpdir, ud_val='ud', vd_val='vd'
+ tmpdir, ud_val="ud", vd_val="vd"
)
sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
sensitive_file.write('{"my-var": "it worked"}')
args = self.args(
- debug=False, dump_all=True, format=None, instance_data=None,
- list_keys=False, user_data=user_data.strpath,
- vendor_data=vendor_data.strpath, varname=None)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=None,
+ list_keys=False,
+ user_data=user_data.strpath,
+ vendor_data=vendor_data.strpath,
+ varname=None,
+ )
+ with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths:
m_paths.return_value = paths
- with mock.patch('os.getuid') as m_getuid:
+ with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 0
- assert 0 == query.handle_args('anyname', args)
+ assert 0 == query.handle_args("anyname", args)
expected = (
'{\n "my-var": "it worked",\n '
'"userdata": "ud",\n "vendordata": "vd"\n}\n'
@@ -273,68 +332,85 @@ class TestQuery:
def test_handle_args_dumps_all_instance_data(self, capsys, tmpdir):
"""When --all is specified query will dump all instance data vars."""
- instance_data = tmpdir.join('instance-data')
+ instance_data = tmpdir.join("instance-data")
instance_data.write('{"my-var": "it worked"}')
args = self.args(
- debug=False, dump_all=True, format=None,
- instance_data=instance_data.strpath, list_keys=False,
- user_data='ud', vendor_data='vd', varname=None)
- with mock.patch('os.getuid') as m_getuid:
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=instance_data.strpath,
+ list_keys=False,
+ user_data="ud",
+ vendor_data="vd",
+ varname=None,
+ )
+ with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
+ assert 0 == query.handle_args("anyname", args)
expected = (
'{\n "my-var": "it worked",\n "userdata": "<%s> file:ud",\n'
- ' "vendordata": "<%s> file:vd"\n}\n' % (
- REDACT_SENSITIVE_VALUE, REDACT_SENSITIVE_VALUE
- )
+ ' "vendordata": "<%s> file:vd"\n}\n'
+ % (REDACT_SENSITIVE_VALUE, REDACT_SENSITIVE_VALUE)
)
out, _err = capsys.readouterr()
assert expected == out
def test_handle_args_returns_top_level_varname(self, capsys, tmpdir):
"""When the argument varname is passed, report its value."""
- instance_data = tmpdir.join('instance-data')
+ instance_data = tmpdir.join("instance-data")
instance_data.write('{"my-var": "it worked"}')
args = self.args(
- debug=False, dump_all=True, format=None,
- instance_data=instance_data.strpath, list_keys=False,
- user_data='ud', vendor_data='vd', varname='my_var')
- with mock.patch('os.getuid') as m_getuid:
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=instance_data.strpath,
+ list_keys=False,
+ user_data="ud",
+ vendor_data="vd",
+ varname="my_var",
+ )
+ with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
+ assert 0 == query.handle_args("anyname", args)
out, _err = capsys.readouterr()
- assert 'it worked\n' == out
+ assert "it worked\n" == out
@pytest.mark.parametrize(
- 'inst_data,varname,expected',
+ "inst_data,varname,expected",
(
(
'{"v1": {"key-2": "value-2"}, "my-var": "it worked"}',
- 'v1.key_2',
- 'value-2\n'
+ "v1.key_2",
+ "value-2\n",
),
# Assert no jinja underscore-delimited aliases are reported on CLI
(
'{"v1": {"something-hyphenated": {"no.underscores":"x",'
' "no-alias": "y"}}, "my-var": "it worked"}',
- 'v1.something_hyphenated',
- '{\n "no-alias": "y",\n "no.underscores": "x"\n}\n'
+ "v1.something_hyphenated",
+ '{\n "no-alias": "y",\n "no.underscores": "x"\n}\n',
),
- )
+ ),
)
def test_handle_args_returns_nested_varname(
self, inst_data, varname, expected, capsys, tmpdir
):
"""If user_data file is a jinja template render instance-data vars."""
- instance_data = tmpdir.join('instance-data')
+ instance_data = tmpdir.join("instance-data")
instance_data.write(inst_data)
args = self.args(
- debug=False, dump_all=False, format=None,
- instance_data=instance_data.strpath, user_data='ud',
- vendor_data='vd', list_keys=False, varname=varname)
- with mock.patch('os.getuid') as m_getuid:
+ debug=False,
+ dump_all=False,
+ format=None,
+ instance_data=instance_data.strpath,
+ user_data="ud",
+ vendor_data="vd",
+ list_keys=False,
+ varname=varname,
+ )
+ with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
+ assert 0 == query.handle_args("anyname", args)
out, _err = capsys.readouterr()
assert expected == out
@@ -342,11 +418,13 @@ class TestQuery:
self, capsys, tmpdir
):
"""Any standardized vars under v# are promoted as top-level aliases."""
- instance_data = tmpdir.join('instance-data')
+ instance_data = tmpdir.join("instance-data")
instance_data.write(
'{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},'
- ' "top": "gun"}')
- expected = dedent("""\
+ ' "top": "gun"}'
+ )
+ expected = dedent(
+ """\
{
"top": "gun",
"userdata": "<redacted for non-root user> file:ud",
@@ -360,14 +438,21 @@ class TestQuery:
"v2_2": "val2.2",
"vendordata": "<redacted for non-root user> file:vd"
}
- """)
+ """
+ )
args = self.args(
- debug=False, dump_all=True, format=None,
- instance_data=instance_data.strpath, user_data='ud',
- vendor_data='vd', list_keys=False, varname=None)
- with mock.patch('os.getuid') as m_getuid:
+ debug=False,
+ dump_all=True,
+ format=None,
+ instance_data=instance_data.strpath,
+ user_data="ud",
+ vendor_data="vd",
+ list_keys=False,
+ varname=None,
+ )
+ with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
+ assert 0 == query.handle_args("anyname", args)
out, _err = capsys.readouterr()
assert expected == out
@@ -375,18 +460,25 @@ class TestQuery:
self, capsys, tmpdir
):
"""Sort all top-level keys when only --list-keys provided."""
- instance_data = tmpdir.join('instance-data')
+ instance_data = tmpdir.join("instance-data")
instance_data.write(
'{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},'
- ' "top": "gun"}')
- expected = 'top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n'
+ ' "top": "gun"}'
+ )
+ expected = "top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n"
args = self.args(
- debug=False, dump_all=False, format=None,
- instance_data=instance_data.strpath, list_keys=True,
- user_data='ud', vendor_data='vd', varname=None)
- with mock.patch('os.getuid') as m_getuid:
+ debug=False,
+ dump_all=False,
+ format=None,
+ instance_data=instance_data.strpath,
+ list_keys=True,
+ user_data="ud",
+ vendor_data="vd",
+ varname=None,
+ )
+ with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
+ assert 0 == query.handle_args("anyname", args)
out, _err = capsys.readouterr()
assert expected == out
@@ -394,18 +486,25 @@ class TestQuery:
self, capsys, tmpdir
):
"""Sort all nested keys of varname object when --list-keys provided."""
- instance_data = tmpdir.join('instance-data')
+ instance_data = tmpdir.join("instance-data")
instance_data.write(
- '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2":' +
- ' {"v2_2": "val2.2"}, "top": "gun"}')
- expected = 'v1_1\nv1_2\n'
+ '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2":'
+ + ' {"v2_2": "val2.2"}, "top": "gun"}'
+ )
+ expected = "v1_1\nv1_2\n"
args = self.args(
- debug=False, dump_all=False, format=None,
- instance_data=instance_data.strpath, list_keys=True,
- user_data='ud', vendor_data='vd', varname='v1')
- with mock.patch('os.getuid') as m_getuid:
+ debug=False,
+ dump_all=False,
+ format=None,
+ instance_data=instance_data.strpath,
+ list_keys=True,
+ user_data="ud",
+ vendor_data="vd",
+ varname="v1",
+ )
+ with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
+ assert 0 == query.handle_args("anyname", args)
out, _err = capsys.readouterr()
assert expected == out
@@ -413,18 +512,26 @@ class TestQuery:
self, caplog, tmpdir
):
"""Raise an error when --list-keys and varname specify a non-list."""
- instance_data = tmpdir.join('instance-data')
+ instance_data = tmpdir.join("instance-data")
instance_data.write(
- '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2": ' +
- '{"v2_2": "val2.2"}, "top": "gun"}')
+ '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2": '
+ + '{"v2_2": "val2.2"}, "top": "gun"}'
+ )
expected_error = "--list-keys provided but 'top' is not a dict"
args = self.args(
- debug=False, dump_all=False, format=None,
- instance_data=instance_data.strpath, list_keys=True,
- user_data='ud', vendor_data='vd', varname='top')
- with mock.patch('os.getuid') as m_getuid:
+ debug=False,
+ dump_all=False,
+ format=None,
+ instance_data=instance_data.strpath,
+ list_keys=True,
+ user_data="ud",
+ vendor_data="vd",
+ varname="top",
+ )
+ with mock.patch("os.getuid") as m_getuid:
m_getuid.return_value = 100
- assert 1 == query.handle_args('anyname', args)
+ assert 1 == query.handle_args("anyname", args)
assert expected_error in caplog.text
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/cmd/test_status.py b/tests/unittests/cmd/test_status.py
index 49eae043..acd1fea5 100644
--- a/tests/unittests/cmd/test_status.py
+++ b/tests/unittests/cmd/test_status.py
@@ -1,26 +1,25 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from collections import namedtuple
import os
+from collections import namedtuple
from io import StringIO
from textwrap import dedent
from cloudinit.atomic_helper import write_json
from cloudinit.cmd import status
from cloudinit.util import ensure_file
-from tests.unittests.helpers import CiTestCase, wrap_and_call, mock
+from tests.unittests.helpers import CiTestCase, mock, wrap_and_call
-mypaths = namedtuple('MyPaths', 'run_dir')
-myargs = namedtuple('MyArgs', 'long wait')
+mypaths = namedtuple("MyPaths", "run_dir")
+myargs = namedtuple("MyArgs", "long wait")
class TestStatus(CiTestCase):
-
def setUp(self):
super(TestStatus, self).setUp()
self.new_root = self.tmp_dir()
- self.status_file = self.tmp_path('status.json', self.new_root)
- self.disable_file = self.tmp_path('cloudinit-disable', self.new_root)
+ self.status_file = self.tmp_path("status.json", self.new_root)
+ self.disable_file = self.tmp_path("cloudinit-disable", self.new_root)
self.paths = mypaths(run_dir=self.new_root)
class FakeInit(object):
@@ -35,285 +34,419 @@ class TestStatus(CiTestCase):
self.init_class = FakeInit
def test__is_cloudinit_disabled_false_on_sysvinit(self):
- '''When not in an environment using systemd, return False.'''
+ """When not in an environment using systemd, return False."""
ensure_file(self.disable_file) # Create the ignored disable file
(is_disabled, reason) = wrap_and_call(
- 'cloudinit.cmd.status',
- {'uses_systemd': False,
- 'get_cmdline': "root=/dev/my-root not-important"},
- status._is_cloudinit_disabled, self.disable_file, self.paths)
+ "cloudinit.cmd.status",
+ {
+ "uses_systemd": False,
+ "get_cmdline": "root=/dev/my-root not-important",
+ },
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
self.assertFalse(
- is_disabled, 'expected enabled cloud-init on sysvinit')
- self.assertEqual('Cloud-init enabled on sysvinit', reason)
+ is_disabled, "expected enabled cloud-init on sysvinit"
+ )
+ self.assertEqual("Cloud-init enabled on sysvinit", reason)
def test__is_cloudinit_disabled_true_on_disable_file(self):
- '''When using systemd and disable_file is present return disabled.'''
+ """When using systemd and disable_file is present return disabled."""
ensure_file(self.disable_file) # Create observed disable file
(is_disabled, reason) = wrap_and_call(
- 'cloudinit.cmd.status',
- {'uses_systemd': True,
- 'get_cmdline': "root=/dev/my-root not-important"},
- status._is_cloudinit_disabled, self.disable_file, self.paths)
- self.assertTrue(is_disabled, 'expected disabled cloud-init')
+ "cloudinit.cmd.status",
+ {
+ "uses_systemd": True,
+ "get_cmdline": "root=/dev/my-root not-important",
+ },
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertTrue(is_disabled, "expected disabled cloud-init")
self.assertEqual(
- 'Cloud-init disabled by {0}'.format(self.disable_file), reason)
+ "Cloud-init disabled by {0}".format(self.disable_file), reason
+ )
def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self):
- '''Not disabled when using systemd and enabled via commandline.'''
+ """Not disabled when using systemd and enabled via commandline."""
ensure_file(self.disable_file) # Create ignored disable file
(is_disabled, reason) = wrap_and_call(
- 'cloudinit.cmd.status',
- {'uses_systemd': True,
- 'get_cmdline': 'something cloud-init=enabled else'},
- status._is_cloudinit_disabled, self.disable_file, self.paths)
- self.assertFalse(is_disabled, 'expected enabled cloud-init')
+ "cloudinit.cmd.status",
+ {
+ "uses_systemd": True,
+ "get_cmdline": "something cloud-init=enabled else",
+ },
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertFalse(is_disabled, "expected enabled cloud-init")
self.assertEqual(
- 'Cloud-init enabled by kernel command line cloud-init=enabled',
- reason)
+ "Cloud-init enabled by kernel command line cloud-init=enabled",
+ reason,
+ )
def test__is_cloudinit_disabled_true_on_kernel_cmdline(self):
- '''When using systemd and disable_file is present return disabled.'''
+        """When kernel cmdline has cloud-init=disabled, return disabled."""
(is_disabled, reason) = wrap_and_call(
- 'cloudinit.cmd.status',
- {'uses_systemd': True,
- 'get_cmdline': 'something cloud-init=disabled else'},
- status._is_cloudinit_disabled, self.disable_file, self.paths)
- self.assertTrue(is_disabled, 'expected disabled cloud-init')
+ "cloudinit.cmd.status",
+ {
+ "uses_systemd": True,
+ "get_cmdline": "something cloud-init=disabled else",
+ },
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertTrue(is_disabled, "expected disabled cloud-init")
self.assertEqual(
- 'Cloud-init disabled by kernel parameter cloud-init=disabled',
- reason)
+ "Cloud-init disabled by kernel parameter cloud-init=disabled",
+ reason,
+ )
def test__is_cloudinit_disabled_true_when_generator_disables(self):
- '''When cloud-init-generator doesn't write enabled file return True.'''
- enabled_file = os.path.join(self.paths.run_dir, 'enabled')
+        """When cloud-init-generator doesn't write the enabled file, return True."""
+ enabled_file = os.path.join(self.paths.run_dir, "enabled")
self.assertFalse(os.path.exists(enabled_file))
(is_disabled, reason) = wrap_and_call(
- 'cloudinit.cmd.status',
- {'uses_systemd': True,
- 'get_cmdline': 'something'},
- status._is_cloudinit_disabled, self.disable_file, self.paths)
- self.assertTrue(is_disabled, 'expected disabled cloud-init')
- self.assertEqual('Cloud-init disabled by cloud-init-generator', reason)
+ "cloudinit.cmd.status",
+ {"uses_systemd": True, "get_cmdline": "something"},
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertTrue(is_disabled, "expected disabled cloud-init")
+ self.assertEqual("Cloud-init disabled by cloud-init-generator", reason)
def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self):
- '''Report enabled when systemd generator creates the enabled file.'''
- enabled_file = os.path.join(self.paths.run_dir, 'enabled')
+ """Report enabled when systemd generator creates the enabled file."""
+ enabled_file = os.path.join(self.paths.run_dir, "enabled")
ensure_file(enabled_file)
(is_disabled, reason) = wrap_and_call(
- 'cloudinit.cmd.status',
- {'uses_systemd': True,
- 'get_cmdline': 'something ignored'},
- status._is_cloudinit_disabled, self.disable_file, self.paths)
- self.assertFalse(is_disabled, 'expected enabled cloud-init')
+ "cloudinit.cmd.status",
+ {"uses_systemd": True, "get_cmdline": "something ignored"},
+ status._is_cloudinit_disabled,
+ self.disable_file,
+ self.paths,
+ )
+ self.assertFalse(is_disabled, "expected enabled cloud-init")
self.assertEqual(
- 'Cloud-init enabled by systemd cloud-init-generator', reason)
+ "Cloud-init enabled by systemd cloud-init-generator", reason
+ )
def test_status_returns_not_run(self):
- '''When status.json does not exist yet, return 'not run'.'''
+ """When status.json does not exist yet, return 'not run'."""
self.assertFalse(
- os.path.exists(self.status_file), 'Unexpected status.json found')
+ os.path.exists(self.status_file), "Unexpected status.json found"
+ )
cmdargs = myargs(long=False, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
self.assertEqual(0, retcode)
- self.assertEqual('status: not run\n', m_stdout.getvalue())
+ self.assertEqual("status: not run\n", m_stdout.getvalue())
def test_status_returns_disabled_long_on_presence_of_disable_file(self):
- '''When cloudinit is disabled, return disabled reason.'''
+ """When cloudinit is disabled, return disabled reason."""
checked_files = []
def fakeexists(filepath):
checked_files.append(filepath)
- status_file = os.path.join(self.paths.run_dir, 'status.json')
+ status_file = os.path.join(self.paths.run_dir, "status.json")
return bool(not filepath == status_file)
cmdargs = myargs(long=True, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'os.path.exists': {'side_effect': fakeexists},
- '_is_cloudinit_disabled': (True, 'disabled for some reason'),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
+ "cloudinit.cmd.status",
+ {
+ "os.path.exists": {"side_effect": fakeexists},
+ "_is_cloudinit_disabled": (
+ True,
+ "disabled for some reason",
+ ),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
self.assertEqual(0, retcode)
self.assertEqual(
- [os.path.join(self.paths.run_dir, 'status.json')],
- checked_files)
- expected = dedent('''\
+ [os.path.join(self.paths.run_dir, "status.json")], checked_files
+ )
+ expected = dedent(
+ """\
status: disabled
detail:
disabled for some reason
- ''')
+ """
+ )
self.assertEqual(expected, m_stdout.getvalue())
def test_status_returns_running_on_no_results_json(self):
- '''Report running when status.json exists but result.json does not.'''
- result_file = self.tmp_path('result.json', self.new_root)
+ """Report running when status.json exists but result.json does not."""
+ result_file = self.tmp_path("result.json", self.new_root)
write_json(self.status_file, {})
self.assertFalse(
- os.path.exists(result_file), 'Unexpected result.json found')
+ os.path.exists(result_file), "Unexpected result.json found"
+ )
cmdargs = myargs(long=False, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
self.assertEqual(0, retcode)
- self.assertEqual('status: running\n', m_stdout.getvalue())
+ self.assertEqual("status: running\n", m_stdout.getvalue())
def test_status_returns_running(self):
- '''Report running when status exists with an unfinished stage.'''
- ensure_file(self.tmp_path('result.json', self.new_root))
- write_json(self.status_file,
- {'v1': {'init': {'start': 1, 'finished': None}}})
+ """Report running when status exists with an unfinished stage."""
+ ensure_file(self.tmp_path("result.json", self.new_root))
+ write_json(
+ self.status_file, {"v1": {"init": {"start": 1, "finished": None}}}
+ )
cmdargs = myargs(long=False, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
self.assertEqual(0, retcode)
- self.assertEqual('status: running\n', m_stdout.getvalue())
+ self.assertEqual("status: running\n", m_stdout.getvalue())
def test_status_returns_done(self):
- '''Report done results.json exists no stages are unfinished.'''
- ensure_file(self.tmp_path('result.json', self.new_root))
+        """Report done when result.json exists and no stage is unfinished."""
+ ensure_file(self.tmp_path("result.json", self.new_root))
write_json(
self.status_file,
- {'v1': {'stage': None, # No current stage running
- 'datasource': (
- 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
- '[dsmode=net]'),
- 'blah': {'finished': 123.456},
- 'init': {'errors': [], 'start': 124.567,
- 'finished': 125.678},
- 'init-local': {'start': 123.45, 'finished': 123.46}}})
+ {
+ "v1": {
+ "stage": None, # No current stage running
+ "datasource": (
+ "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]"
+ "[dsmode=net]"
+ ),
+ "blah": {"finished": 123.456},
+ "init": {
+ "errors": [],
+ "start": 124.567,
+ "finished": 125.678,
+ },
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ },
+ )
cmdargs = myargs(long=False, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
self.assertEqual(0, retcode)
- self.assertEqual('status: done\n', m_stdout.getvalue())
+ self.assertEqual("status: done\n", m_stdout.getvalue())
def test_status_returns_done_long(self):
- '''Long format of done status includes datasource info.'''
- ensure_file(self.tmp_path('result.json', self.new_root))
+ """Long format of done status includes datasource info."""
+ ensure_file(self.tmp_path("result.json", self.new_root))
write_json(
self.status_file,
- {'v1': {'stage': None,
- 'datasource': (
- 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
- '[dsmode=net]'),
- 'init': {'start': 124.567, 'finished': 125.678},
- 'init-local': {'start': 123.45, 'finished': 123.46}}})
+ {
+ "v1": {
+ "stage": None,
+ "datasource": (
+ "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]"
+ "[dsmode=net]"
+ ),
+ "init": {"start": 124.567, "finished": 125.678},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ },
+ )
cmdargs = myargs(long=True, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
self.assertEqual(0, retcode)
- expected = dedent('''\
+ expected = dedent(
+ """\
status: done
time: Thu, 01 Jan 1970 00:02:05 +0000
detail:
DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net]
- ''')
+ """
+ )
self.assertEqual(expected, m_stdout.getvalue())
def test_status_on_errors(self):
- '''Reports error when any stage has errors.'''
+ """Reports error when any stage has errors."""
write_json(
self.status_file,
- {'v1': {'stage': None,
- 'blah': {'errors': [], 'finished': 123.456},
- 'init': {'errors': ['error1'], 'start': 124.567,
- 'finished': 125.678},
- 'init-local': {'start': 123.45, 'finished': 123.46}}})
+ {
+ "v1": {
+ "stage": None,
+ "blah": {"errors": [], "finished": 123.456},
+ "init": {
+ "errors": ["error1"],
+ "start": 124.567,
+ "finished": 125.678,
+ },
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ },
+ )
cmdargs = myargs(long=False, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
self.assertEqual(1, retcode)
- self.assertEqual('status: error\n', m_stdout.getvalue())
+ self.assertEqual("status: error\n", m_stdout.getvalue())
def test_status_on_errors_long(self):
- '''Long format of error status includes all error messages.'''
+ """Long format of error status includes all error messages."""
write_json(
self.status_file,
- {'v1': {'stage': None,
- 'datasource': (
- 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
- '[dsmode=net]'),
- 'init': {'errors': ['error1'], 'start': 124.567,
- 'finished': 125.678},
- 'init-local': {'errors': ['error2', 'error3'],
- 'start': 123.45, 'finished': 123.46}}})
+ {
+ "v1": {
+ "stage": None,
+ "datasource": (
+ "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]"
+ "[dsmode=net]"
+ ),
+ "init": {
+ "errors": ["error1"],
+ "start": 124.567,
+ "finished": 125.678,
+ },
+ "init-local": {
+ "errors": ["error2", "error3"],
+ "start": 123.45,
+ "finished": 123.46,
+ },
+ }
+ },
+ )
cmdargs = myargs(long=True, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
self.assertEqual(1, retcode)
- expected = dedent('''\
+ expected = dedent(
+ """\
status: error
time: Thu, 01 Jan 1970 00:02:05 +0000
detail:
error1
error2
error3
- ''')
+ """
+ )
self.assertEqual(expected, m_stdout.getvalue())
def test_status_returns_running_long_format(self):
- '''Long format reports the stage in which we are running.'''
+ """Long format reports the stage in which we are running."""
write_json(
self.status_file,
- {'v1': {'stage': 'init',
- 'init': {'start': 124.456, 'finished': None},
- 'init-local': {'start': 123.45, 'finished': 123.46}}})
+ {
+ "v1": {
+ "stage": "init",
+ "init": {"start": 124.456, "finished": None},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ },
+ )
cmdargs = myargs(long=True, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
+ "cloudinit.cmd.status",
+ {
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
self.assertEqual(0, retcode)
- expected = dedent('''\
+ expected = dedent(
+ """\
status: running
time: Thu, 01 Jan 1970 00:02:04 +0000
detail:
Running in stage: init
- ''')
+ """
+ )
self.assertEqual(expected, m_stdout.getvalue())
def test_status_wait_blocks_until_done(self):
- '''Specifying wait will poll every 1/4 second until done state.'''
+ """Specifying wait will poll every 1/4 second until done state."""
running_json = {
- 'v1': {'stage': 'init',
- 'init': {'start': 124.456, 'finished': None},
- 'init-local': {'start': 123.45, 'finished': 123.46}}}
+ "v1": {
+ "stage": "init",
+ "init": {"start": 124.456, "finished": None},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ }
done_json = {
- 'v1': {'stage': None,
- 'init': {'start': 124.456, 'finished': 125.678},
- 'init-local': {'start': 123.45, 'finished': 123.46}}}
+ "v1": {
+ "stage": None,
+ "init": {"start": 124.456, "finished": 125.678},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ }
self.sleep_calls = 0
@@ -324,32 +457,46 @@ class TestStatus(CiTestCase):
write_json(self.status_file, running_json)
elif self.sleep_calls == 3:
write_json(self.status_file, done_json)
- result_file = self.tmp_path('result.json', self.new_root)
+ result_file = self.tmp_path("result.json", self.new_root)
ensure_file(result_file)
cmdargs = myargs(long=False, wait=True)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'sleep': {'side_effect': fake_sleep},
- '_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
+ "cloudinit.cmd.status",
+ {
+ "sleep": {"side_effect": fake_sleep},
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
self.assertEqual(0, retcode)
self.assertEqual(4, self.sleep_calls)
- self.assertEqual('....\nstatus: done\n', m_stdout.getvalue())
+ self.assertEqual("....\nstatus: done\n", m_stdout.getvalue())
def test_status_wait_blocks_until_error(self):
- '''Specifying wait will poll every 1/4 second until error state.'''
+ """Specifying wait will poll every 1/4 second until error state."""
running_json = {
- 'v1': {'stage': 'init',
- 'init': {'start': 124.456, 'finished': None},
- 'init-local': {'start': 123.45, 'finished': 123.46}}}
+ "v1": {
+ "stage": "init",
+ "init": {"start": 124.456, "finished": None},
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ }
error_json = {
- 'v1': {'stage': None,
- 'init': {'errors': ['error1'], 'start': 124.456,
- 'finished': 125.678},
- 'init-local': {'start': 123.45, 'finished': 123.46}}}
+ "v1": {
+ "stage": None,
+ "init": {
+ "errors": ["error1"],
+ "start": 124.456,
+ "finished": 125.678,
+ },
+ "init-local": {"start": 123.45, "finished": 123.46},
+ }
+ }
self.sleep_calls = 0
@@ -362,30 +509,40 @@ class TestStatus(CiTestCase):
write_json(self.status_file, error_json)
cmdargs = myargs(long=False, wait=True)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'sleep': {'side_effect': fake_sleep},
- '_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
+ "cloudinit.cmd.status",
+ {
+ "sleep": {"side_effect": fake_sleep},
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.handle_status_args,
+ "ignored",
+ cmdargs,
+ )
self.assertEqual(1, retcode)
self.assertEqual(4, self.sleep_calls)
- self.assertEqual('....\nstatus: error\n', m_stdout.getvalue())
+ self.assertEqual("....\nstatus: error\n", m_stdout.getvalue())
def test_status_main(self):
- '''status.main can be run as a standalone script.'''
- write_json(self.status_file,
- {'v1': {'init': {'start': 1, 'finished': None}}})
+ """status.main can be run as a standalone script."""
+ write_json(
+ self.status_file, {"v1": {"init": {"start": 1, "finished": None}}}
+ )
with self.assertRaises(SystemExit) as context_manager:
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout:
wrap_and_call(
- 'cloudinit.cmd.status',
- {'sys.argv': {'new': ['status']},
- '_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.main)
+ "cloudinit.cmd.status",
+ {
+ "sys.argv": {"new": ["status"]},
+ "_is_cloudinit_disabled": (False, ""),
+ "Init": {"side_effect": self.init_class},
+ },
+ status.main,
+ )
self.assertEqual(0, context_manager.exception.code)
- self.assertEqual('status: running\n', m_stdout.getvalue())
+ self.assertEqual("status: running\n", m_stdout.getvalue())
+
# vi: ts=4 expandtab syntax=python