diff options
Diffstat (limited to 'tests/unittests/cmd')
-rw-r--r-- | tests/unittests/cmd/devel/test_hotplug_hook.py | 162 | ||||
-rw-r--r-- | tests/unittests/cmd/devel/test_logs.py | 232 | ||||
-rw-r--r-- | tests/unittests/cmd/devel/test_render.py | 152 | ||||
-rw-r--r-- | tests/unittests/cmd/test_clean.py | 179 | ||||
-rw-r--r-- | tests/unittests/cmd/test_cloud_id.py | 99 | ||||
-rw-r--r-- | tests/unittests/cmd/test_main.py | 223 | ||||
-rw-r--r-- | tests/unittests/cmd/test_query.py | 403 | ||||
-rw-r--r-- | tests/unittests/cmd/test_status.py | 561 |
8 files changed, 1221 insertions, 790 deletions
diff --git a/tests/unittests/cmd/devel/test_hotplug_hook.py b/tests/unittests/cmd/devel/test_hotplug_hook.py index e1c64e2f..842e8dfd 100644 --- a/tests/unittests/cmd/devel/test_hotplug_hook.py +++ b/tests/unittests/cmd/devel/test_hotplug_hook.py @@ -1,8 +1,9 @@ -import pytest from collections import namedtuple from unittest import mock from unittest.mock import call +import pytest + from cloudinit.cmd.devel.hotplug_hook import handle_hotplug from cloudinit.distros import Distro from cloudinit.event import EventType @@ -11,9 +12,8 @@ from cloudinit.net.network_state import NetworkState from cloudinit.sources import DataSource from cloudinit.stages import Init - -hotplug_args = namedtuple('hotplug_args', 'udevaction, subsystem, devpath') -FAKE_MAC = '11:22:33:44:55:66' +hotplug_args = namedtuple("hotplug_args", "udevaction, subsystem, devpath") +FAKE_MAC = "11:22:33:44:55:66" @pytest.yield_fixture @@ -26,28 +26,28 @@ def mocks(): m_init.fetch.return_value = m_datasource read_sys_net = mock.patch( - 'cloudinit.cmd.devel.hotplug_hook.read_sys_net_safe', - return_value=FAKE_MAC + "cloudinit.cmd.devel.hotplug_hook.read_sys_net_safe", + return_value=FAKE_MAC, ) update_event_enabled = mock.patch( - 'cloudinit.stages.update_event_enabled', + "cloudinit.stages.update_event_enabled", return_value=True, ) m_network_state = mock.MagicMock(spec=NetworkState) parse_net = mock.patch( - 'cloudinit.cmd.devel.hotplug_hook.parse_net_config_data', - return_value=m_network_state + "cloudinit.cmd.devel.hotplug_hook.parse_net_config_data", + return_value=m_network_state, ) m_activator = mock.MagicMock(spec=NetworkActivator) select_activator = mock.patch( - 'cloudinit.cmd.devel.hotplug_hook.activators.select_activator', - return_value=m_activator + "cloudinit.cmd.devel.hotplug_hook.activators.select_activator", + return_value=m_activator, ) - sleep = mock.patch('time.sleep') + sleep = mock.patch("time.sleep") read_sys_net.start() update_event_enabled.start() @@ -55,7 +55,7 @@ def mocks(): select_activator.start() m_sleep = sleep.start() - yield namedtuple('mocks', 'm_init m_network_state m_activator m_sleep')( + yield namedtuple("mocks", "m_init m_network_state m_activator m_sleep")( m_init=m_init, m_network_state=m_network_state, m_activator=m_activator, @@ -72,42 +72,43 @@ def mocks(): class TestUnsupportedActions: def test_unsupported_subsystem(self, mocks): with pytest.raises( - Exception, - match='cannot handle events for subsystem: not_real' + Exception, match="cannot handle events for subsystem: not_real" ): handle_hotplug( hotplug_init=mocks.m_init, - devpath='/dev/fake', - subsystem='not_real', - udevaction='add' + devpath="/dev/fake", + subsystem="not_real", + udevaction="add", ) def test_unsupported_udevaction(self, mocks): - with pytest.raises(ValueError, match='Unknown action: not_real'): + with pytest.raises(ValueError, match="Unknown action: not_real"): handle_hotplug( hotplug_init=mocks.m_init, - devpath='/dev/fake', - udevaction='not_real', - subsystem='net' + devpath="/dev/fake", + udevaction="not_real", + subsystem="net", ) class TestHotplug: def test_succcessful_add(self, mocks): init = mocks.m_init - mocks.m_network_state.iter_interfaces.return_value = [{ - 'mac_address': FAKE_MAC, - }] + mocks.m_network_state.iter_interfaces.return_value = [ + { + "mac_address": FAKE_MAC, + } + ] handle_hotplug( hotplug_init=init, - devpath='/dev/fake', - udevaction='add', - subsystem='net' + devpath="/dev/fake", + udevaction="add", + subsystem="net", + ) + init.datasource.update_metadata_if_supported.assert_called_once_with( + [EventType.HOTPLUG] ) - init.datasource.update_metadata_if_supported.assert_called_once_with([ - EventType.HOTPLUG - ]) - mocks.m_activator.bring_up_interface.assert_called_once_with('fake') + mocks.m_activator.bring_up_interface.assert_called_once_with("fake") mocks.m_activator.bring_down_interface.assert_not_called() init._write_to_cache.assert_called_once_with() @@ -116,113 +117,120 @@ class TestHotplug: mocks.m_network_state.iter_interfaces.return_value = [{}] handle_hotplug( hotplug_init=init, - devpath='/dev/fake', - udevaction='remove', - subsystem='net' + devpath="/dev/fake", + udevaction="remove", + subsystem="net", ) - init.datasource.update_metadata_if_supported.assert_called_once_with([ - EventType.HOTPLUG - ]) - mocks.m_activator.bring_down_interface.assert_called_once_with('fake') + init.datasource.update_metadata_if_supported.assert_called_once_with( + [EventType.HOTPLUG] + ) + mocks.m_activator.bring_down_interface.assert_called_once_with("fake") mocks.m_activator.bring_up_interface.assert_not_called() init._write_to_cache.assert_called_once_with() def test_update_event_disabled(self, mocks, caplog): init = mocks.m_init with mock.patch( - 'cloudinit.stages.update_event_enabled', - return_value=False + "cloudinit.stages.update_event_enabled", return_value=False ): handle_hotplug( hotplug_init=init, - devpath='/dev/fake', - udevaction='remove', - subsystem='net' + devpath="/dev/fake", + udevaction="remove", + subsystem="net", ) - assert 'hotplug not enabled for event of type' in caplog.text + assert "hotplug not enabled for event of type" in caplog.text init.datasource.update_metadata_if_supported.assert_not_called() mocks.m_activator.bring_up_interface.assert_not_called() mocks.m_activator.bring_down_interface.assert_not_called() init._write_to_cache.assert_not_called() def test_update_metadata_failed(self, mocks): - mocks.m_init.datasource.update_metadata_if_supported.return_value = \ + mocks.m_init.datasource.update_metadata_if_supported.return_value = ( False + ) with pytest.raises( - RuntimeError, match='Datasource .* not updated for event hotplug' + RuntimeError, match="Datasource .* not updated for event hotplug" ): handle_hotplug( hotplug_init=mocks.m_init, - devpath='/dev/fake', - udevaction='remove', - subsystem='net' + devpath="/dev/fake", + udevaction="remove", + subsystem="net", ) def test_detect_hotplugged_device_not_detected_on_add(self, mocks): mocks.m_network_state.iter_interfaces.return_value = [{}] with pytest.raises( RuntimeError, - match='Failed to detect {} in updated metadata'.format(FAKE_MAC) + match="Failed to detect {} in updated metadata".format(FAKE_MAC), ): handle_hotplug( hotplug_init=mocks.m_init, - devpath='/dev/fake', - udevaction='add', - subsystem='net' + devpath="/dev/fake", + udevaction="add", + subsystem="net", ) def test_detect_hotplugged_device_detected_on_remove(self, mocks): - mocks.m_network_state.iter_interfaces.return_value = [{ - 'mac_address': FAKE_MAC, - }] + mocks.m_network_state.iter_interfaces.return_value = [ + { + "mac_address": FAKE_MAC, + } + ] with pytest.raises( - RuntimeError, - match='Failed to detect .* in updated metadata' + RuntimeError, match="Failed to detect .* in updated metadata" ): handle_hotplug( hotplug_init=mocks.m_init, - devpath='/dev/fake', - udevaction='remove', - subsystem='net' + devpath="/dev/fake", + udevaction="remove", + subsystem="net", ) def test_apply_failed_on_add(self, mocks): - mocks.m_network_state.iter_interfaces.return_value = [{ - 'mac_address': FAKE_MAC, - }] + mocks.m_network_state.iter_interfaces.return_value = [ + { + "mac_address": FAKE_MAC, + } + ] mocks.m_activator.bring_up_interface.return_value = False with pytest.raises( - RuntimeError, match='Failed to bring up device: /dev/fake' + RuntimeError, match="Failed to bring up device: /dev/fake" ): handle_hotplug( hotplug_init=mocks.m_init, - devpath='/dev/fake', - udevaction='add', - subsystem='net' + devpath="/dev/fake", + udevaction="add", + subsystem="net", ) def test_apply_failed_on_remove(self, mocks): mocks.m_network_state.iter_interfaces.return_value = [{}] mocks.m_activator.bring_down_interface.return_value = False with pytest.raises( - RuntimeError, match='Failed to bring down device: /dev/fake' + RuntimeError, match="Failed to bring down device: /dev/fake" ): handle_hotplug( hotplug_init=mocks.m_init, - devpath='/dev/fake', - udevaction='remove', - subsystem='net' + devpath="/dev/fake", + udevaction="remove", + subsystem="net", ) def test_retry(self, mocks): with pytest.raises(RuntimeError): handle_hotplug( hotplug_init=mocks.m_init, - devpath='/dev/fake', - udevaction='add', - subsystem='net' + devpath="/dev/fake", + udevaction="add", + subsystem="net", ) assert mocks.m_sleep.call_count == 5 assert mocks.m_sleep.call_args_list == [ - call(1), call(3), call(5), call(10), call(30) + call(1), + call(3), + call(5), + call(10), + call(30), ] diff --git a/tests/unittests/cmd/devel/test_logs.py b/tests/unittests/cmd/devel/test_logs.py index 18bdcdda..73ed3c65 100644 --- a/tests/unittests/cmd/devel/test_logs.py +++ b/tests/unittests/cmd/devel/test_logs.py @@ -1,167 +1,213 @@ # This file is part of cloud-init. See LICENSE file for license information. -from datetime import datetime import os +from datetime import datetime from io import StringIO from cloudinit.cmd.devel import logs from cloudinit.sources import INSTANCE_JSON_SENSITIVE_FILE -from tests.unittests.helpers import ( - FilesystemMockingTestCase, mock, wrap_and_call) from cloudinit.subp import subp from cloudinit.util import ensure_dir, load_file, write_file +from tests.unittests.helpers import ( + FilesystemMockingTestCase, + mock, + wrap_and_call, +) -@mock.patch('cloudinit.cmd.devel.logs.os.getuid') +@mock.patch("cloudinit.cmd.devel.logs.os.getuid") class TestCollectLogs(FilesystemMockingTestCase): - def setUp(self): super(TestCollectLogs, self).setUp() self.new_root = self.tmp_dir() - self.run_dir = self.tmp_path('run', self.new_root) + self.run_dir = self.tmp_path("run", self.new_root) def test_collect_logs_with_userdata_requires_root_user(self, m_getuid): """collect-logs errors when non-root user collects userdata .""" m_getuid.return_value = 100 # non-root - output_tarfile = self.tmp_path('logs.tgz') - with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr: + output_tarfile = self.tmp_path("logs.tgz") + with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: self.assertEqual( - 1, logs.collect_logs(output_tarfile, include_userdata=True)) + 1, logs.collect_logs(output_tarfile, include_userdata=True) + ) self.assertEqual( - 'To include userdata, root user is required.' - ' Try sudo cloud-init collect-logs\n', - m_stderr.getvalue()) + "To include userdata, root user is required." + " Try sudo cloud-init collect-logs\n", + m_stderr.getvalue(), + ) def test_collect_logs_creates_tarfile(self, m_getuid): """collect-logs creates a tarfile with all related cloud-init info.""" m_getuid.return_value = 100 - log1 = self.tmp_path('cloud-init.log', self.new_root) - write_file(log1, 'cloud-init-log') - log2 = self.tmp_path('cloud-init-output.log', self.new_root) - write_file(log2, 'cloud-init-output-log') + log1 = self.tmp_path("cloud-init.log", self.new_root) + write_file(log1, "cloud-init-log") + log2 = self.tmp_path("cloud-init-output.log", self.new_root) + write_file(log2, "cloud-init-output-log") ensure_dir(self.run_dir) - write_file(self.tmp_path('results.json', self.run_dir), 'results') - write_file(self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir), - 'sensitive') - output_tarfile = self.tmp_path('logs.tgz') + write_file(self.tmp_path("results.json", self.run_dir), "results") + write_file( + self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir), + "sensitive", + ) + output_tarfile = self.tmp_path("logs.tgz") - date = datetime.utcnow().date().strftime('%Y-%m-%d') - date_logdir = 'cloud-init-logs-{0}'.format(date) + date = datetime.utcnow().date().strftime("%Y-%m-%d") + date_logdir = "cloud-init-logs-{0}".format(date) - version_out = '/usr/bin/cloud-init 18.2fake\n' + version_out = "/usr/bin/cloud-init 18.2fake\n" expected_subp = { - ('dpkg-query', '--show', "-f=${Version}\n", 'cloud-init'): - '0.7fake\n', - ('cloud-init', '--version'): version_out, - ('dmesg',): 'dmesg-out\n', - ('journalctl', '--boot=0', '-o', 'short-precise'): 'journal-out\n', - ('tar', 'czvf', output_tarfile, date_logdir): '' + ( + "dpkg-query", + "--show", + "-f=${Version}\n", + "cloud-init", + ): "0.7fake\n", + ("cloud-init", "--version"): version_out, + ("dmesg",): "dmesg-out\n", + ("journalctl", "--boot=0", "-o", "short-precise"): "journal-out\n", + ("tar", "czvf", output_tarfile, date_logdir): "", } def fake_subp(cmd): cmd_tuple = tuple(cmd) if cmd_tuple not in expected_subp: raise AssertionError( - 'Unexpected command provided to subp: {0}'.format(cmd)) - if cmd == ['tar', 'czvf', output_tarfile, date_logdir]: + "Unexpected command provided to subp: {0}".format(cmd) + ) + if cmd == ["tar", "czvf", output_tarfile, date_logdir]: subp(cmd) # Pass through tar cmd so we can check output - return expected_subp[cmd_tuple], '' + return expected_subp[cmd_tuple], "" fake_stderr = mock.MagicMock() wrap_and_call( - 'cloudinit.cmd.devel.logs', - {'subp': {'side_effect': fake_subp}, - 'sys.stderr': {'new': fake_stderr}, - 'CLOUDINIT_LOGS': {'new': [log1, log2]}, - 'CLOUDINIT_RUN_DIR': {'new': self.run_dir}}, - logs.collect_logs, output_tarfile, include_userdata=False) + "cloudinit.cmd.devel.logs", + { + "subp": {"side_effect": fake_subp}, + "sys.stderr": {"new": fake_stderr}, + "CLOUDINIT_LOGS": {"new": [log1, log2]}, + "CLOUDINIT_RUN_DIR": {"new": self.run_dir}, + }, + logs.collect_logs, + output_tarfile, + include_userdata=False, + ) # unpack the tarfile and check file contents - subp(['tar', 'zxvf', output_tarfile, '-C', self.new_root]) + subp(["tar", "zxvf", output_tarfile, "-C", self.new_root]) out_logdir = self.tmp_path(date_logdir, self.new_root) self.assertFalse( os.path.exists( - os.path.join(out_logdir, 'run', 'cloud-init', - INSTANCE_JSON_SENSITIVE_FILE)), - 'Unexpected file found: %s' % INSTANCE_JSON_SENSITIVE_FILE) + os.path.join( + out_logdir, + "run", + "cloud-init", + INSTANCE_JSON_SENSITIVE_FILE, + ) + ), + "Unexpected file found: %s" % INSTANCE_JSON_SENSITIVE_FILE, + ) + self.assertEqual( + "0.7fake\n", load_file(os.path.join(out_logdir, "dpkg-version")) + ) self.assertEqual( - '0.7fake\n', - load_file(os.path.join(out_logdir, 'dpkg-version'))) - self.assertEqual(version_out, - load_file(os.path.join(out_logdir, 'version'))) + version_out, load_file(os.path.join(out_logdir, "version")) + ) self.assertEqual( - 'cloud-init-log', - load_file(os.path.join(out_logdir, 'cloud-init.log'))) + "cloud-init-log", + load_file(os.path.join(out_logdir, "cloud-init.log")), + ) self.assertEqual( - 'cloud-init-output-log', - load_file(os.path.join(out_logdir, 'cloud-init-output.log'))) + "cloud-init-output-log", + load_file(os.path.join(out_logdir, "cloud-init-output.log")), + ) self.assertEqual( - 'dmesg-out\n', - load_file(os.path.join(out_logdir, 'dmesg.txt'))) + "dmesg-out\n", load_file(os.path.join(out_logdir, "dmesg.txt")) + ) self.assertEqual( - 'journal-out\n', - load_file(os.path.join(out_logdir, 'journal.txt'))) + "journal-out\n", load_file(os.path.join(out_logdir, "journal.txt")) + ) self.assertEqual( - 'results', + "results", load_file( - os.path.join(out_logdir, 'run', 'cloud-init', 'results.json'))) - fake_stderr.write.assert_any_call('Wrote %s\n' % output_tarfile) + os.path.join(out_logdir, "run", "cloud-init", "results.json") + ), + ) + fake_stderr.write.assert_any_call("Wrote %s\n" % output_tarfile) def test_collect_logs_includes_optional_userdata(self, m_getuid): """collect-logs include userdata when --include-userdata is set.""" m_getuid.return_value = 0 - log1 = self.tmp_path('cloud-init.log', self.new_root) - write_file(log1, 'cloud-init-log') - log2 = self.tmp_path('cloud-init-output.log', self.new_root) - write_file(log2, 'cloud-init-output-log') - userdata = self.tmp_path('user-data.txt', self.new_root) - write_file(userdata, 'user-data') + log1 = self.tmp_path("cloud-init.log", self.new_root) + write_file(log1, "cloud-init-log") + log2 = self.tmp_path("cloud-init-output.log", self.new_root) + write_file(log2, "cloud-init-output-log") + userdata = self.tmp_path("user-data.txt", self.new_root) + write_file(userdata, "user-data") ensure_dir(self.run_dir) - write_file(self.tmp_path('results.json', self.run_dir), 'results') - write_file(self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir), - 'sensitive') - output_tarfile = self.tmp_path('logs.tgz') + write_file(self.tmp_path("results.json", self.run_dir), "results") + write_file( + self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, self.run_dir), + "sensitive", + ) + output_tarfile = self.tmp_path("logs.tgz") - date = datetime.utcnow().date().strftime('%Y-%m-%d') - date_logdir = 'cloud-init-logs-{0}'.format(date) + date = datetime.utcnow().date().strftime("%Y-%m-%d") + date_logdir = "cloud-init-logs-{0}".format(date) - version_out = '/usr/bin/cloud-init 18.2fake\n' + version_out = "/usr/bin/cloud-init 18.2fake\n" expected_subp = { - ('dpkg-query', '--show', "-f=${Version}\n", 'cloud-init'): - '0.7fake', - ('cloud-init', '--version'): version_out, - ('dmesg',): 'dmesg-out\n', - ('journalctl', '--boot=0', '-o', 'short-precise'): 'journal-out\n', - ('tar', 'czvf', output_tarfile, date_logdir): '' + ( + "dpkg-query", + "--show", + "-f=${Version}\n", + "cloud-init", + ): "0.7fake", + ("cloud-init", "--version"): version_out, + ("dmesg",): "dmesg-out\n", + ("journalctl", "--boot=0", "-o", "short-precise"): "journal-out\n", + ("tar", "czvf", output_tarfile, date_logdir): "", } def fake_subp(cmd): cmd_tuple = tuple(cmd) if cmd_tuple not in expected_subp: raise AssertionError( - 'Unexpected command provided to subp: {0}'.format(cmd)) - if cmd == ['tar', 'czvf', output_tarfile, date_logdir]: + "Unexpected command provided to subp: {0}".format(cmd) + ) + if cmd == ["tar", "czvf", output_tarfile, date_logdir]: subp(cmd) # Pass through tar cmd so we can check output - return expected_subp[cmd_tuple], '' + return expected_subp[cmd_tuple], "" fake_stderr = mock.MagicMock() wrap_and_call( - 'cloudinit.cmd.devel.logs', - {'subp': {'side_effect': fake_subp}, - 'sys.stderr': {'new': fake_stderr}, - 'CLOUDINIT_LOGS': {'new': [log1, log2]}, - 'CLOUDINIT_RUN_DIR': {'new': self.run_dir}, - 'USER_DATA_FILE': {'new': userdata}}, - logs.collect_logs, output_tarfile, include_userdata=True) + "cloudinit.cmd.devel.logs", + { + "subp": {"side_effect": fake_subp}, + "sys.stderr": {"new": fake_stderr}, + "CLOUDINIT_LOGS": {"new": [log1, log2]}, + "CLOUDINIT_RUN_DIR": {"new": self.run_dir}, + "USER_DATA_FILE": {"new": userdata}, + }, + logs.collect_logs, + output_tarfile, + include_userdata=True, + ) # unpack the tarfile and check file contents - subp(['tar', 'zxvf', output_tarfile, '-C', self.new_root]) + subp(["tar", "zxvf", output_tarfile, "-C", self.new_root]) out_logdir = self.tmp_path(date_logdir, self.new_root) self.assertEqual( - 'user-data', - load_file(os.path.join(out_logdir, 'user-data.txt'))) + "user-data", load_file(os.path.join(out_logdir, "user-data.txt")) + ) self.assertEqual( - 'sensitive', - load_file(os.path.join(out_logdir, 'run', 'cloud-init', - INSTANCE_JSON_SENSITIVE_FILE))) - fake_stderr.write.assert_any_call('Wrote %s\n' % output_tarfile) + "sensitive", + load_file( + os.path.join( + out_logdir, + "run", + "cloud-init", + INSTANCE_JSON_SENSITIVE_FILE, + ) + ), + ) + fake_stderr.write.assert_any_call("Wrote %s\n" % output_tarfile) diff --git a/tests/unittests/cmd/devel/test_render.py b/tests/unittests/cmd/devel/test_render.py index c7ddca3d..4afc64f0 100644 --- a/tests/unittests/cmd/devel/test_render.py +++ b/tests/unittests/cmd/devel/test_render.py @@ -1,21 +1,21 @@ # This file is part of cloud-init. See LICENSE file for license information. import os +from collections import namedtuple from io import StringIO -from collections import namedtuple from cloudinit.cmd.devel import render from cloudinit.helpers import Paths from cloudinit.sources import INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE -from tests.unittests.helpers import CiTestCase, mock, skipUnlessJinja from cloudinit.util import ensure_dir, write_file +from tests.unittests.helpers import CiTestCase, mock, skipUnlessJinja class TestRender(CiTestCase): with_logs = True - args = namedtuple('renderargs', 'user_data instance_data debug') + args = namedtuple("renderargs", "user_data instance_data debug") def setUp(self): super(TestRender, self).setUp() @@ -23,122 +23,132 @@ class TestRender(CiTestCase): def test_handle_args_error_on_missing_user_data(self): """When user_data file path does not exist, log an error.""" - absent_file = self.tmp_path('user-data', dir=self.tmp) - instance_data = self.tmp_path('instance-data', dir=self.tmp) - write_file(instance_data, '{}') + absent_file = self.tmp_path("user-data", dir=self.tmp) + instance_data = self.tmp_path("instance-data", dir=self.tmp) + write_file(instance_data, "{}") args = self.args( - user_data=absent_file, instance_data=instance_data, debug=False) - with mock.patch('sys.stderr', new_callable=StringIO): - self.assertEqual(1, render.handle_args('anyname', args)) + user_data=absent_file, instance_data=instance_data, debug=False + ) + with mock.patch("sys.stderr", new_callable=StringIO): + self.assertEqual(1, render.handle_args("anyname", args)) self.assertIn( - 'Missing user-data file: %s' % absent_file, - self.logs.getvalue()) + "Missing user-data file: %s" % absent_file, self.logs.getvalue() + ) def test_handle_args_error_on_missing_instance_data(self): """When instance_data file path does not exist, log an error.""" - user_data = self.tmp_path('user-data', dir=self.tmp) - absent_file = self.tmp_path('instance-data', dir=self.tmp) + user_data = self.tmp_path("user-data", dir=self.tmp) + absent_file = self.tmp_path("instance-data", dir=self.tmp) args = self.args( - user_data=user_data, instance_data=absent_file, debug=False) - with mock.patch('sys.stderr', new_callable=StringIO): - self.assertEqual(1, render.handle_args('anyname', args)) + user_data=user_data, instance_data=absent_file, debug=False + ) + with mock.patch("sys.stderr", new_callable=StringIO): + self.assertEqual(1, render.handle_args("anyname", args)) self.assertIn( - 'Missing instance-data.json file: %s' % absent_file, - self.logs.getvalue()) + "Missing instance-data.json file: %s" % absent_file, + self.logs.getvalue(), + ) def test_handle_args_defaults_instance_data(self): """When no instance_data argument, default to configured run_dir.""" - user_data = self.tmp_path('user-data', dir=self.tmp) - run_dir = self.tmp_path('run_dir', dir=self.tmp) + user_data = self.tmp_path("user-data", dir=self.tmp) + run_dir = self.tmp_path("run_dir", dir=self.tmp) ensure_dir(run_dir) - paths = Paths({'run_dir': run_dir}) - self.add_patch('cloudinit.cmd.devel.render.read_cfg_paths', 'm_paths') + paths = Paths({"run_dir": run_dir}) + self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") self.m_paths.return_value = paths - args = self.args( - user_data=user_data, instance_data=None, debug=False) - with mock.patch('sys.stderr', new_callable=StringIO): - self.assertEqual(1, render.handle_args('anyname', args)) + args = self.args(user_data=user_data, instance_data=None, debug=False) + with mock.patch("sys.stderr", new_callable=StringIO): + self.assertEqual(1, render.handle_args("anyname", args)) json_file = os.path.join(run_dir, INSTANCE_JSON_FILE) self.assertIn( - 'Missing instance-data.json file: %s' % json_file, - self.logs.getvalue()) + "Missing instance-data.json file: %s" % json_file, + self.logs.getvalue(), + ) def test_handle_args_root_fallback_from_sensitive_instance_data(self): """When root user defaults to sensitive.json.""" - user_data = self.tmp_path('user-data', dir=self.tmp) - run_dir = self.tmp_path('run_dir', dir=self.tmp) + user_data = self.tmp_path("user-data", dir=self.tmp) + run_dir = self.tmp_path("run_dir", dir=self.tmp) ensure_dir(run_dir) - paths = Paths({'run_dir': run_dir}) - self.add_patch('cloudinit.cmd.devel.render.read_cfg_paths', 'm_paths') + paths = Paths({"run_dir": run_dir}) + self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") self.m_paths.return_value = paths - args = self.args( - user_data=user_data, instance_data=None, debug=False) - with mock.patch('sys.stderr', new_callable=StringIO): - with mock.patch('os.getuid') as m_getuid: + args = self.args(user_data=user_data, instance_data=None, debug=False) + with mock.patch("sys.stderr", new_callable=StringIO): + with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 0 - self.assertEqual(1, render.handle_args('anyname', args)) + self.assertEqual(1, render.handle_args("anyname", args)) json_file = os.path.join(run_dir, INSTANCE_JSON_FILE) json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE) self.assertIn( - 'WARNING: Missing root-readable %s. Using redacted %s' % ( - json_sensitive, json_file), self.logs.getvalue()) + "WARNING: Missing root-readable %s. Using redacted %s" + % (json_sensitive, json_file), + self.logs.getvalue(), + ) self.assertIn( - 'ERROR: Missing instance-data.json file: %s' % json_file, - self.logs.getvalue()) + "ERROR: Missing instance-data.json file: %s" % json_file, + self.logs.getvalue(), + ) def test_handle_args_root_uses_sensitive_instance_data(self): """When root user, and no instance-data arg, use sensitive.json.""" - user_data = self.tmp_path('user-data', dir=self.tmp) - write_file(user_data, '##template: jinja\nrendering: {{ my_var }}') - run_dir = self.tmp_path('run_dir', dir=self.tmp) + user_data = self.tmp_path("user-data", dir=self.tmp) + write_file(user_data, "##template: jinja\nrendering: {{ my_var }}") + run_dir = self.tmp_path("run_dir", dir=self.tmp) ensure_dir(run_dir) json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE) write_file(json_sensitive, '{"my-var": "jinja worked"}') - paths = Paths({'run_dir': run_dir}) - self.add_patch('cloudinit.cmd.devel.render.read_cfg_paths', 'm_paths') + paths = Paths({"run_dir": run_dir}) + self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") self.m_paths.return_value = paths - args = self.args( - user_data=user_data, instance_data=None, debug=False) - with mock.patch('sys.stderr', new_callable=StringIO): - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - with mock.patch('os.getuid') as m_getuid: + args = self.args(user_data=user_data, instance_data=None, debug=False) + with mock.patch("sys.stderr", new_callable=StringIO): + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 0 - self.assertEqual(0, render.handle_args('anyname', args)) - self.assertIn('rendering: jinja worked', m_stdout.getvalue()) + self.assertEqual(0, render.handle_args("anyname", args)) + self.assertIn("rendering: jinja worked", m_stdout.getvalue()) @skipUnlessJinja() def test_handle_args_renders_instance_data_vars_in_template(self): """If user_data file is a jinja template render instance-data vars.""" - user_data = self.tmp_path('user-data', dir=self.tmp) - write_file(user_data, '##template: jinja\nrendering: {{ my_var }}') - instance_data = self.tmp_path('instance-data', dir=self.tmp) + user_data = self.tmp_path("user-data", dir=self.tmp) + write_file(user_data, "##template: jinja\nrendering: {{ my_var }}") + instance_data = self.tmp_path("instance-data", dir=self.tmp) write_file(instance_data, '{"my-var": "jinja worked"}') args = self.args( - user_data=user_data, instance_data=instance_data, debug=True) - with mock.patch('sys.stderr', new_callable=StringIO) as m_console_err: - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - self.assertEqual(0, render.handle_args('anyname', args)) + user_data=user_data, instance_data=instance_data, debug=True + ) + with mock.patch("sys.stderr", new_callable=StringIO) as m_console_err: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + self.assertEqual(0, render.handle_args("anyname", args)) self.assertIn( - 'DEBUG: Converted jinja variables\n{', self.logs.getvalue()) + "DEBUG: Converted jinja variables\n{", self.logs.getvalue() + ) self.assertIn( - 'DEBUG: Converted jinja variables\n{', m_console_err.getvalue()) - self.assertEqual('rendering: jinja worked', m_stdout.getvalue()) + "DEBUG: Converted jinja variables\n{", m_console_err.getvalue() + ) + self.assertEqual("rendering: jinja worked", m_stdout.getvalue()) @skipUnlessJinja() def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation(self): """If user_data file has invalid jinja operations log warnings.""" - user_data = self.tmp_path('user-data', dir=self.tmp) - write_file(user_data, '##template: jinja\nrendering: {{ my-var }}') - instance_data = self.tmp_path('instance-data', dir=self.tmp) + user_data = self.tmp_path("user-data", dir=self.tmp) + write_file(user_data, "##template: jinja\nrendering: {{ my-var }}") + instance_data = self.tmp_path("instance-data", dir=self.tmp) write_file(instance_data, '{"my-var": "jinja worked"}') args = self.args( - user_data=user_data, instance_data=instance_data, debug=True) - with mock.patch('sys.stderr', new_callable=StringIO): - self.assertEqual(1, render.handle_args('anyname', args)) + user_data=user_data, instance_data=instance_data, debug=True + ) + with mock.patch("sys.stderr", new_callable=StringIO): + self.assertEqual(1, render.handle_args("anyname", args)) self.assertIn( - 'WARNING: Ignoring jinja template for %s: Undefined jinja' + "WARNING: Ignoring jinja template for %s: Undefined jinja" ' variable: "my-var". Jinja tried subtraction. Perhaps you meant' ' "my_var"?' % user_data, - self.logs.getvalue()) + self.logs.getvalue(), + ) + # vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_clean.py b/tests/unittests/cmd/test_clean.py index 3bb0ee9b..7d12017e 100644 --- a/tests/unittests/cmd/test_clean.py +++ b/tests/unittests/cmd/test_clean.py @@ -1,29 +1,31 @@ # This file is part of cloud-init. See LICENSE file for license information. -from cloudinit.cmd import clean -from cloudinit.util import ensure_dir, sym_link, write_file -from tests.unittests.helpers import CiTestCase, wrap_and_call, mock -from collections import namedtuple import os +from collections import namedtuple from io import StringIO -mypaths = namedtuple('MyPaths', 'cloud_dir') +from cloudinit.cmd import clean +from cloudinit.util import ensure_dir, sym_link, write_file +from tests.unittests.helpers import CiTestCase, mock, wrap_and_call +mypaths = namedtuple("MyPaths", "cloud_dir") -class TestClean(CiTestCase): +class TestClean(CiTestCase): def setUp(self): super(TestClean, self).setUp() self.new_root = self.tmp_dir() - self.artifact_dir = self.tmp_path('artifacts', self.new_root) - self.log1 = self.tmp_path('cloud-init.log', self.new_root) - self.log2 = self.tmp_path('cloud-init-output.log', self.new_root) + self.artifact_dir = self.tmp_path("artifacts", self.new_root) + self.log1 = self.tmp_path("cloud-init.log", self.new_root) + self.log2 = self.tmp_path("cloud-init-output.log", self.new_root) class FakeInit(object): - cfg = {'def_log_file': self.log1, - 'output': {'all': '|tee -a {0}'.format(self.log2)}} + cfg = { + "def_log_file": self.log1, + "output": {"all": "|tee -a {0}".format(self.log2)}, + } # Ensure cloud_dir has a trailing slash, to match real behaviour - paths = mypaths(cloud_dir='{}/'.format(self.artifact_dir)) + paths = mypaths(cloud_dir="{}/".format(self.artifact_dir)) def __init__(self, ds_deps): pass @@ -35,110 +37,133 @@ class TestClean(CiTestCase): def test_remove_artifacts_removes_logs(self): """remove_artifacts removes logs when remove_logs is True.""" - write_file(self.log1, 'cloud-init-log') - write_file(self.log2, 'cloud-init-output-log') + write_file(self.log1, "cloud-init-log") + write_file(self.log2, "cloud-init-output-log") self.assertFalse( - os.path.exists(self.artifact_dir), 'Unexpected artifacts dir') + os.path.exists(self.artifact_dir), "Unexpected artifacts dir" + ) retcode = wrap_and_call( - 'cloudinit.cmd.clean', - {'Init': {'side_effect': self.init_class}}, - clean.remove_artifacts, remove_logs=True) - self.assertFalse(os.path.exists(self.log1), 'Unexpected file') - self.assertFalse(os.path.exists(self.log2), 'Unexpected file') + "cloudinit.cmd.clean", + {"Init": {"side_effect": self.init_class}}, + clean.remove_artifacts, + remove_logs=True, + ) + self.assertFalse(os.path.exists(self.log1), "Unexpected file") + self.assertFalse(os.path.exists(self.log2), "Unexpected file") self.assertEqual(0, retcode) def test_remove_artifacts_preserves_logs(self): """remove_artifacts leaves logs when remove_logs is False.""" - write_file(self.log1, 'cloud-init-log') - write_file(self.log2, 'cloud-init-output-log') + write_file(self.log1, "cloud-init-log") + write_file(self.log2, "cloud-init-output-log") retcode = wrap_and_call( - 'cloudinit.cmd.clean', - {'Init': {'side_effect': self.init_class}}, - clean.remove_artifacts, remove_logs=False) - self.assertTrue(os.path.exists(self.log1), 'Missing expected file') - self.assertTrue(os.path.exists(self.log2), 'Missing expected file') + "cloudinit.cmd.clean", + {"Init": {"side_effect": self.init_class}}, + clean.remove_artifacts, + remove_logs=False, + ) + self.assertTrue(os.path.exists(self.log1), "Missing expected file") + self.assertTrue(os.path.exists(self.log2), "Missing expected file") self.assertEqual(0, retcode) def test_remove_artifacts_removes_unlinks_symlinks(self): """remove_artifacts cleans artifacts dir unlinking any symlinks.""" - dir1 = os.path.join(self.artifact_dir, 'dir1') + dir1 = os.path.join(self.artifact_dir, "dir1") ensure_dir(dir1) - symlink = os.path.join(self.artifact_dir, 'mylink') + symlink = os.path.join(self.artifact_dir, "mylink") sym_link(dir1, symlink) retcode = wrap_and_call( - 'cloudinit.cmd.clean', - {'Init': {'side_effect': self.init_class}}, - clean.remove_artifacts, remove_logs=False) + "cloudinit.cmd.clean", + {"Init": {"side_effect": self.init_class}}, + clean.remove_artifacts, + remove_logs=False, + ) self.assertEqual(0, retcode) for path in (dir1, symlink): self.assertFalse( - os.path.exists(path), - 'Unexpected {0} dir'.format(path)) + os.path.exists(path), "Unexpected {0} dir".format(path) + ) def test_remove_artifacts_removes_artifacts_skipping_seed(self): """remove_artifacts cleans artifacts dir with exception of seed dir.""" dirs = [ self.artifact_dir, - os.path.join(self.artifact_dir, 'seed'), - os.path.join(self.artifact_dir, 'dir1'), - os.path.join(self.artifact_dir, 'dir2')] + os.path.join(self.artifact_dir, "seed"), + os.path.join(self.artifact_dir, "dir1"), + os.path.join(self.artifact_dir, "dir2"), + ] for _dir in dirs: ensure_dir(_dir) retcode = wrap_and_call( - 'cloudinit.cmd.clean', - {'Init': {'side_effect': self.init_class}}, - clean.remove_artifacts, remove_logs=False) + "cloudinit.cmd.clean", + {"Init": {"side_effect": self.init_class}}, + clean.remove_artifacts, + remove_logs=False, + ) self.assertEqual(0, retcode) for expected_dir in dirs[:2]: self.assertTrue( os.path.exists(expected_dir), - 'Missing {0} dir'.format(expected_dir)) + "Missing {0} dir".format(expected_dir), + ) for deleted_dir in dirs[2:]: self.assertFalse( os.path.exists(deleted_dir), - 'Unexpected {0} dir'.format(deleted_dir)) + "Unexpected {0} dir".format(deleted_dir), + ) def test_remove_artifacts_removes_artifacts_removes_seed(self): """remove_artifacts removes seed dir when remove_seed is True.""" dirs = [ self.artifact_dir, - os.path.join(self.artifact_dir, 'seed'), - os.path.join(self.artifact_dir, 'dir1'), - os.path.join(self.artifact_dir, 'dir2')] + os.path.join(self.artifact_dir, "seed"), + os.path.join(self.artifact_dir, "dir1"), + os.path.join(self.artifact_dir, "dir2"), + ] for _dir in dirs: ensure_dir(_dir) retcode = wrap_and_call( - 'cloudinit.cmd.clean', - {'Init': {'side_effect': self.init_class}}, - clean.remove_artifacts, remove_logs=False, remove_seed=True) + "cloudinit.cmd.clean", + {"Init": {"side_effect": self.init_class}}, + clean.remove_artifacts, + remove_logs=False, + remove_seed=True, + ) self.assertEqual(0, retcode) self.assertTrue( - os.path.exists(self.artifact_dir), 'Missing artifact dir') + os.path.exists(self.artifact_dir), "Missing artifact dir" + ) for deleted_dir in dirs[1:]: self.assertFalse( os.path.exists(deleted_dir), - 'Unexpected {0} dir'.format(deleted_dir)) + "Unexpected {0} dir".format(deleted_dir), + ) def test_remove_artifacts_returns_one_on_errors(self): """remove_artifacts returns non-zero on failure and prints an error.""" ensure_dir(self.artifact_dir) - ensure_dir(os.path.join(self.artifact_dir, 'dir1')) + ensure_dir(os.path.join(self.artifact_dir, "dir1")) - with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr: + with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: retcode = wrap_and_call( - 'cloudinit.cmd.clean', - {'del_dir': {'side_effect': OSError('oops')}, - 'Init': {'side_effect': self.init_class}}, - clean.remove_artifacts, remove_logs=False) + "cloudinit.cmd.clean", + { + "del_dir": {"side_effect": OSError("oops")}, + "Init": {"side_effect": self.init_class}, + }, + clean.remove_artifacts, + remove_logs=False, + ) self.assertEqual(1, retcode) self.assertEqual( - 'Error:\nCould not remove %s/dir1: oops\n' % self.artifact_dir, - m_stderr.getvalue()) + "Error:\nCould not remove %s/dir1: oops\n" % self.artifact_dir, + m_stderr.getvalue(), + ) def test_handle_clean_args_reboots(self): """handle_clean_args_reboots when reboot arg is provided.""" @@ -147,32 +172,40 @@ class TestClean(CiTestCase): def fake_subp(cmd, capture): called_cmds.append((cmd, capture)) - return '', '' + return "", "" - myargs = namedtuple('MyArgs', 'remove_logs remove_seed reboot') + myargs = namedtuple("MyArgs", "remove_logs remove_seed reboot") cmdargs = myargs(remove_logs=False, remove_seed=False, reboot=True) retcode = wrap_and_call( - 'cloudinit.cmd.clean', - {'subp': {'side_effect': fake_subp}, - 'Init': {'side_effect': self.init_class}}, - clean.handle_clean_args, name='does not matter', args=cmdargs) + "cloudinit.cmd.clean", + { + "subp": {"side_effect": fake_subp}, + "Init": {"side_effect": self.init_class}, + }, + clean.handle_clean_args, + name="does not matter", + args=cmdargs, + ) self.assertEqual(0, retcode) - self.assertEqual( - [(['shutdown', '-r', 'now'], False)], called_cmds) + self.assertEqual([(["shutdown", "-r", "now"], False)], called_cmds) def test_status_main(self): - '''clean.main can be run as a standalone script.''' - write_file(self.log1, 'cloud-init-log') + """clean.main can be run as a standalone script.""" + write_file(self.log1, "cloud-init-log") with self.assertRaises(SystemExit) as context_manager: wrap_and_call( - 'cloudinit.cmd.clean', - {'Init': {'side_effect': self.init_class}, - 'sys.argv': {'new': ['clean', '--logs']}}, - clean.main) + "cloudinit.cmd.clean", + { + "Init": {"side_effect": self.init_class}, + "sys.argv": {"new": ["clean", "--logs"]}, + }, + clean.main, + ) self.assertEqual(0, context_manager.exception.code) self.assertFalse( - os.path.exists(self.log1), 'Unexpected log {0}'.format(self.log1)) + os.path.exists(self.log1), "Unexpected log {0}".format(self.log1) + ) # vi: ts=4 expandtab syntax=python diff --git a/tests/unittests/cmd/test_cloud_id.py b/tests/unittests/cmd/test_cloud_id.py index 9a010402..42941d4f 100644 --- a/tests/unittests/cmd/test_cloud_id.py +++ b/tests/unittests/cmd/test_cloud_id.py @@ -2,41 +2,45 @@ """Tests for cloud-id command line utility.""" -from cloudinit import util from collections import namedtuple from io import StringIO +from cloudinit import util from cloudinit.cmd import cloud_id - from tests.unittests.helpers import CiTestCase, mock class TestCloudId(CiTestCase): - args = namedtuple('cloudidargs', ('instance_data json long')) + args = namedtuple("cloudidargs", "instance_data json long") def setUp(self): super(TestCloudId, self).setUp() self.tmp = self.tmp_dir() - self.instance_data = self.tmp_path('instance-data.json', dir=self.tmp) + self.instance_data = self.tmp_path("instance-data.json", dir=self.tmp) def test_cloud_id_arg_parser_defaults(self): """Validate the argument defaults when not provided by the end-user.""" - cmd = ['cloud-id'] - with mock.patch('sys.argv', cmd): + cmd = ["cloud-id"] + with mock.patch("sys.argv", cmd): args = cloud_id.get_parser().parse_args() self.assertEqual( - '/run/cloud-init/instance-data.json', - args.instance_data) + "/run/cloud-init/instance-data.json", args.instance_data + ) self.assertEqual(False, args.long) self.assertEqual(False, args.json) def test_cloud_id_arg_parse_overrides(self): """Override argument defaults by specifying values for each param.""" - util.write_file(self.instance_data, '{}') - cmd = ['cloud-id', '--instance-data', self.instance_data, '--long', - '--json'] - with mock.patch('sys.argv', cmd): + util.write_file(self.instance_data, "{}") + cmd = [ + "cloud-id", + "--instance-data", + self.instance_data, + "--long", + "--json", + ] + with mock.patch("sys.argv", cmd): args = cloud_id.get_parser().parse_args() self.assertEqual(self.instance_data, args.instance_data) self.assertEqual(True, args.long) @@ -44,37 +48,40 @@ class TestCloudId(CiTestCase): def test_cloud_id_missing_instance_data_json(self): """Exit error when the provided instance-data.json does not exist.""" - cmd = ['cloud-id', '--instance-data', self.instance_data] - with mock.patch('sys.argv', cmd): - with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr: + cmd = ["cloud-id", "--instance-data", self.instance_data] + with mock.patch("sys.argv", cmd): + with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: with self.assertRaises(SystemExit) as context_manager: cloud_id.main() self.assertEqual(1, context_manager.exception.code) self.assertIn( "Error:\nFile not found '%s'" % self.instance_data, - m_stderr.getvalue()) + m_stderr.getvalue(), + ) def test_cloud_id_non_json_instance_data(self): """Exit error when the provided instance-data.json is not json.""" - cmd = ['cloud-id', '--instance-data', self.instance_data] - util.write_file(self.instance_data, '{') - with mock.patch('sys.argv', cmd): - with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr: + cmd = ["cloud-id", "--instance-data", self.instance_data] + util.write_file(self.instance_data, "{") + with mock.patch("sys.argv", cmd): + with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: with self.assertRaises(SystemExit) as context_manager: cloud_id.main() self.assertEqual(1, context_manager.exception.code) self.assertIn( "Error:\nFile '%s' is not valid json." % self.instance_data, - m_stderr.getvalue()) + m_stderr.getvalue(), + ) def test_cloud_id_from_cloud_name_in_instance_data(self): """Report canonical cloud-id from cloud_name in instance-data.""" util.write_file( self.instance_data, - '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}') - cmd = ['cloud-id', '--instance-data', self.instance_data] - with mock.patch('sys.argv', cmd): - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}', + ) + cmd = ["cloud-id", "--instance-data", self.instance_data] + with mock.patch("sys.argv", cmd): + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: with self.assertRaises(SystemExit) as context_manager: cloud_id.main() self.assertEqual(0, context_manager.exception.code) @@ -84,10 +91,11 @@ class TestCloudId(CiTestCase): """Report long cloud-id format from cloud_name and region.""" util.write_file( self.instance_data, - '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}') - cmd = ['cloud-id', '--instance-data', self.instance_data, '--long'] - with mock.patch('sys.argv', cmd): - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}', + ) + cmd = ["cloud-id", "--instance-data", self.instance_data, "--long"] + with mock.patch("sys.argv", cmd): + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: with self.assertRaises(SystemExit) as context_manager: cloud_id.main() self.assertEqual(0, context_manager.exception.code) @@ -98,10 +106,11 @@ class TestCloudId(CiTestCase): util.write_file( self.instance_data, '{"v1": {"cloud_name": "aws", "region": "cn-north-1",' - ' "platform": "ec2"}}') - cmd = ['cloud-id', '--instance-data', self.instance_data, '--long'] - with mock.patch('sys.argv', cmd): - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + ' "platform": "ec2"}}', + ) + cmd = ["cloud-id", "--instance-data", self.instance_data, "--long"] + with mock.patch("sys.argv", cmd): + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: with self.assertRaises(SystemExit) as context_manager: cloud_id.main() self.assertEqual(0, context_manager.exception.code) @@ -112,16 +121,24 @@ class TestCloudId(CiTestCase): util.write_file( self.instance_data, '{"v1": {"cloud_name": "unknown", "region": "dfw",' - ' "platform": "openstack", "public_ssh_keys": []}}') - expected = util.json_dumps({ - 'cloud_id': 'openstack', 'cloud_name': 'unknown', - 'platform': 'openstack', 'public_ssh_keys': [], 'region': 'dfw'}) - cmd = ['cloud-id', '--instance-data', self.instance_data, '--json'] - with mock.patch('sys.argv', cmd): - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + ' "platform": "openstack", "public_ssh_keys": []}}', + ) + expected = util.json_dumps( + { + "cloud_id": "openstack", + "cloud_name": "unknown", + "platform": "openstack", + "public_ssh_keys": [], + "region": "dfw", + } + ) + cmd = ["cloud-id", "--instance-data", self.instance_data, "--json"] + with mock.patch("sys.argv", cmd): + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: with self.assertRaises(SystemExit) as context_manager: cloud_id.main() self.assertEqual(0, context_manager.exception.code) - self.assertEqual(expected + '\n', m_stdout.getvalue()) + self.assertEqual(expected + "\n", m_stdout.getvalue()) + # vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_main.py b/tests/unittests/cmd/test_main.py index e1ce682b..3e778b0b 100644 --- a/tests/unittests/cmd/test_main.py +++ b/tests/unittests/cmd/test_main.py @@ -1,22 +1,20 @@ # This file is part of cloud-init. See LICENSE file for license information. -from collections import namedtuple import copy import os +from collections import namedtuple from io import StringIO from unittest import mock import pytest -from cloudinit.cmd import main from cloudinit import safeyaml -from cloudinit.util import ( - ensure_dir, load_file, write_file) -from tests.unittests.helpers import ( - FilesystemMockingTestCase, wrap_and_call) +from cloudinit.cmd import main +from cloudinit.util import ensure_dir, load_file, write_file +from tests.unittests.helpers import FilesystemMockingTestCase, wrap_and_call -mypaths = namedtuple('MyPaths', 'run_dir') -myargs = namedtuple('MyArgs', 'debug files force local reporter subcommand') +mypaths = namedtuple("MyPaths", "run_dir") +myargs = namedtuple("MyArgs", "debug files force local reporter subcommand") class TestMain(FilesystemMockingTestCase): @@ -26,27 +24,32 @@ class TestMain(FilesystemMockingTestCase): def setUp(self): super(TestMain, self).setUp() self.new_root = self.tmp_dir() - self.cloud_dir = self.tmp_path('var/lib/cloud/', dir=self.new_root) + self.cloud_dir = self.tmp_path("var/lib/cloud/", dir=self.new_root) os.makedirs(self.cloud_dir) - self.replicateTestRoot('simple_ubuntu', self.new_root) + self.replicateTestRoot("simple_ubuntu", self.new_root) self.cfg = { - 'datasource_list': ['None'], - 'runcmd': ['ls /etc'], # test ALL_DISTROS - 'system_info': {'paths': {'cloud_dir': self.cloud_dir, - 'run_dir': self.new_root}}, - 'write_files': [ + "datasource_list": ["None"], + "runcmd": ["ls /etc"], # test ALL_DISTROS + "system_info": { + "paths": { + "cloud_dir": self.cloud_dir, + "run_dir": self.new_root, + } + }, + "write_files": [ { - 'path': '/etc/blah.ini', - 'content': 'blah', - 'permissions': 0o755, + "path": "/etc/blah.ini", + "content": "blah", + "permissions": 0o755, }, ], - 'cloud_init_modules': ['write-files', 'runcmd'], + "cloud_init_modules": ["write-files", "runcmd"], } cloud_cfg = safeyaml.dumps(self.cfg) - ensure_dir(os.path.join(self.new_root, 'etc', 'cloud')) + ensure_dir(os.path.join(self.new_root, "etc", "cloud")) self.cloud_cfg_file = os.path.join( - self.new_root, 'etc', 'cloud', 'cloud.cfg') + self.new_root, "etc", "cloud", "cloud.cfg" + ) write_file(self.cloud_cfg_file, cloud_cfg) self.patchOS(self.new_root) self.patchUtils(self.new_root) @@ -55,31 +58,44 @@ class TestMain(FilesystemMockingTestCase): def test_main_init_run_net_stops_on_file_no_net(self): """When no-net file is present, main_init does not process modules.""" - stop_file = os.path.join(self.cloud_dir, 'data', 'no-net') # stop file - write_file(stop_file, '') + stop_file = os.path.join(self.cloud_dir, "data", "no-net") # stop file + write_file(stop_file, "") cmdargs = myargs( - debug=False, files=None, force=False, local=False, reporter=None, - subcommand='init') + debug=False, + files=None, + force=False, + local=False, + reporter=None, + subcommand="init", + ) (_item1, item2) = wrap_and_call( - 'cloudinit.cmd.main', - {'util.close_stdin': True, - 'netinfo.debug_info': 'my net debug info', - 'util.fixup_output': ('outfmt', 'errfmt')}, - main.main_init, 'init', cmdargs) + "cloudinit.cmd.main", + { + "util.close_stdin": True, + "netinfo.debug_info": "my net debug info", + "util.fixup_output": ("outfmt", "errfmt"), + }, + main.main_init, + "init", + cmdargs, + ) # We should not run write_files module self.assertFalse( - os.path.exists(os.path.join(self.new_root, 'etc/blah.ini')), - 'Unexpected run of write_files module produced blah.ini') + os.path.exists(os.path.join(self.new_root, "etc/blah.ini")), + "Unexpected run of write_files module produced blah.ini", + ) self.assertEqual([], item2) # Instancify is called - instance_id_path = 'var/lib/cloud/data/instance-id' + instance_id_path = "var/lib/cloud/data/instance-id" self.assertFalse( os.path.exists(os.path.join(self.new_root, instance_id_path)), - 'Unexpected call to datasource.instancify produced instance-id') + "Unexpected call to datasource.instancify produced instance-id", + ) expected_logs = [ "Exiting. stop file ['{stop_file}'] existed\n".format( - stop_file=stop_file), - 'my net debug info' # netinfo.debug_info + stop_file=stop_file + ), + "my net debug info", # netinfo.debug_info ] for log in expected_logs: self.assertIn(log, self.stderr.getvalue()) @@ -87,97 +103,133 @@ class TestMain(FilesystemMockingTestCase): def test_main_init_run_net_runs_modules(self): """Modules like write_files are run in 'net' mode.""" cmdargs = myargs( - debug=False, files=None, force=False, local=False, reporter=None, - subcommand='init') + debug=False, + files=None, + force=False, + local=False, + reporter=None, + subcommand="init", + ) (_item1, item2) = wrap_and_call( - 'cloudinit.cmd.main', - {'util.close_stdin': True, - 'netinfo.debug_info': 'my net debug info', - 'util.fixup_output': ('outfmt', 'errfmt')}, - main.main_init, 'init', cmdargs) + "cloudinit.cmd.main", + { + "util.close_stdin": True, + "netinfo.debug_info": "my net debug info", + "util.fixup_output": ("outfmt", "errfmt"), + }, + main.main_init, + "init", + cmdargs, + ) self.assertEqual([], item2) # Instancify is called - instance_id_path = 'var/lib/cloud/data/instance-id' + instance_id_path = "var/lib/cloud/data/instance-id" self.assertEqual( - 'iid-datasource-none\n', - os.path.join(load_file( - os.path.join(self.new_root, instance_id_path)))) + "iid-datasource-none\n", + os.path.join( + load_file(os.path.join(self.new_root, instance_id_path)) + ), + ) # modules are run (including write_files) self.assertEqual( - 'blah', load_file(os.path.join(self.new_root, 'etc/blah.ini'))) + "blah", load_file(os.path.join(self.new_root, "etc/blah.ini")) + ) expected_logs = [ - 'network config is disabled by fallback', # apply_network_config - 'my net debug info', # netinfo.debug_info - 'no previous run detected' + "network config is disabled by fallback", # apply_network_config + "my net debug info", # netinfo.debug_info + "no previous run detected", ] for log in expected_logs: self.assertIn(log, self.stderr.getvalue()) def test_main_init_run_net_calls_set_hostname_when_metadata_present(self): """When local-hostname metadata is present, call cc_set_hostname.""" - self.cfg['datasource'] = { - 'None': {'metadata': {'local-hostname': 'md-hostname'}}} + self.cfg["datasource"] = { + "None": {"metadata": {"local-hostname": "md-hostname"}} + } cloud_cfg = safeyaml.dumps(self.cfg) write_file(self.cloud_cfg_file, cloud_cfg) cmdargs = myargs( - debug=False, files=None, force=False, local=False, reporter=None, - subcommand='init') + debug=False, + files=None, + force=False, + local=False, + reporter=None, + subcommand="init", + ) def set_hostname(name, cfg, cloud, log, args): - self.assertEqual('set-hostname', name) + self.assertEqual("set-hostname", name) updated_cfg = copy.deepcopy(self.cfg) updated_cfg.update( - {'def_log_file': '/var/log/cloud-init.log', - 'log_cfgs': [], - 'syslog_fix_perms': [ - 'syslog:adm', 'root:adm', 'root:wheel', 'root:root' - ], - 'vendor_data': {'enabled': True, 'prefix': []}, - 'vendor_data2': {'enabled': True, 'prefix': []}}) - updated_cfg.pop('system_info') + { + "def_log_file": "/var/log/cloud-init.log", + "log_cfgs": [], + "syslog_fix_perms": [ + "syslog:adm", + "root:adm", + "root:wheel", + "root:root", + ], + "vendor_data": {"enabled": True, "prefix": []}, + "vendor_data2": {"enabled": True, "prefix": []}, + } + ) + updated_cfg.pop("system_info") self.assertEqual(updated_cfg, cfg) self.assertEqual(main.LOG, log) self.assertIsNone(args) (_item1, item2) = wrap_and_call( - 'cloudinit.cmd.main', - {'util.close_stdin': True, - 'netinfo.debug_info': 'my net debug info', - 'cc_set_hostname.handle': {'side_effect': set_hostname}, - 'util.fixup_output': ('outfmt', 'errfmt')}, - main.main_init, 'init', cmdargs) + "cloudinit.cmd.main", + { + "util.close_stdin": True, + "netinfo.debug_info": "my net debug info", + "cc_set_hostname.handle": {"side_effect": set_hostname}, + "util.fixup_output": ("outfmt", "errfmt"), + }, + main.main_init, + "init", + cmdargs, + ) self.assertEqual([], item2) # Instancify is called - instance_id_path = 'var/lib/cloud/data/instance-id' + instance_id_path = "var/lib/cloud/data/instance-id" self.assertEqual( - 'iid-datasource-none\n', - os.path.join(load_file( - os.path.join(self.new_root, instance_id_path)))) + "iid-datasource-none\n", + os.path.join( + load_file(os.path.join(self.new_root, instance_id_path)) + ), + ) # modules are run (including write_files) self.assertEqual( - 'blah', load_file(os.path.join(self.new_root, 'etc/blah.ini'))) + "blah", load_file(os.path.join(self.new_root, "etc/blah.ini")) + ) expected_logs = [ - 'network config is disabled by fallback', # apply_network_config - 'my net debug info', # netinfo.debug_info - 'no previous run detected' + "network config is disabled by fallback", # apply_network_config + "my net debug info", # netinfo.debug_info + "no previous run detected", ] for log in expected_logs: self.assertIn(log, self.stderr.getvalue()) class TestShouldBringUpInterfaces: - @pytest.mark.parametrize('cfg_disable,args_local,expected', [ - (True, True, False), - (True, False, False), - (False, True, False), - (False, False, True), - ]) + @pytest.mark.parametrize( + "cfg_disable,args_local,expected", + [ + (True, True, False), + (True, False, False), + (False, True, False), + (False, False, True), + ], + ) def test_should_bring_up_interfaces( self, cfg_disable, args_local, expected ): init = mock.Mock() - init.cfg = {'disable_network_activation': cfg_disable} + init.cfg = {"disable_network_activation": cfg_disable} args = mock.Mock() args.local = args_local @@ -185,4 +237,5 @@ class TestShouldBringUpInterfaces: result = main._should_bring_up_interfaces(init, args) assert result == expected + # vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_query.py b/tests/unittests/cmd/test_query.py index b7d02d13..03a73bb5 100644 --- a/tests/unittests/cmd/test_query.py +++ b/tests/unittests/cmd/test_query.py @@ -4,19 +4,21 @@ import errno import gzip import json import os +from collections import namedtuple from io import BytesIO from textwrap import dedent import pytest -from collections import namedtuple from cloudinit.cmd import query from cloudinit.helpers import Paths from cloudinit.sources import ( - REDACT_SENSITIVE_VALUE, INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE) -from tests.unittests.helpers import mock - + INSTANCE_JSON_FILE, + INSTANCE_JSON_SENSITIVE_FILE, + REDACT_SENSITIVE_VALUE, +) from cloudinit.util import b64e, write_file +from tests.unittests.helpers import mock def _gzip_data(data): @@ -30,9 +32,10 @@ def _gzip_data(data): class TestQuery: args = namedtuple( - 'queryargs', - ('debug dump_all format instance_data list_keys user_data vendor_data' - ' varname')) + "queryargs", + "debug dump_all format instance_data list_keys user_data vendor_data" + " varname", + ) def _setup_paths(self, tmpdir, ud_val=None, vd_val=None): """Write userdata and vendordata into a tmpdir. @@ -41,153 +44,191 @@ class TestQuery: 4-tuple : (paths, run_dir_path, userdata_path, vendordata_path) """ if ud_val: - user_data = tmpdir.join('user-data') + user_data = tmpdir.join("user-data") write_file(user_data.strpath, ud_val) else: user_data = None if vd_val: - vendor_data = tmpdir.join('vendor-data') + vendor_data = tmpdir.join("vendor-data") write_file(vendor_data.strpath, vd_val) else: vendor_data = None - run_dir = tmpdir.join('run_dir') + run_dir = tmpdir.join("run_dir") run_dir.ensure_dir() - cloud_dir = tmpdir.join('cloud_dir') + cloud_dir = tmpdir.join("cloud_dir") cloud_dir.ensure_dir() return ( Paths( - {'cloud_dir': cloud_dir.strpath, 'run_dir': run_dir.strpath} + {"cloud_dir": cloud_dir.strpath, "run_dir": run_dir.strpath} ), run_dir, user_data, - vendor_data + vendor_data, ) def test_handle_args_error_on_missing_param(self, caplog, capsys): """Error when missing required parameters and print usage.""" args = self.args( - debug=False, dump_all=False, format=None, instance_data=None, - list_keys=False, user_data=None, vendor_data=None, varname=None) + debug=False, + dump_all=False, + format=None, + instance_data=None, + list_keys=False, + user_data=None, + vendor_data=None, + varname=None, + ) with mock.patch( "cloudinit.cmd.query.addLogHandlerCLI", return_value="" ) as m_cli_log: - assert 1 == query.handle_args('anyname', args) + assert 1 == query.handle_args("anyname", args) expected_error = ( - 'Expected one of the options: --all, --format, --list-keys' - ' or varname\n') + "Expected one of the options: --all, --format, --list-keys" + " or varname\n" + ) assert expected_error in caplog.text out, _err = capsys.readouterr() - assert 'usage: query' in out + assert "usage: query" in out assert 1 == m_cli_log.call_count @pytest.mark.parametrize( - "inst_data,varname,expected_error", ( + "inst_data,varname,expected_error", + ( ( '{"v1": {"key-2": "value-2"}}', - 'v1.absent_leaf', - "instance-data 'v1' has no 'absent_leaf'\n" + "v1.absent_leaf", + "instance-data 'v1' has no 'absent_leaf'\n", ), ( '{"v1": {"key-2": "value-2"}}', - 'absent_key', - "Undefined instance-data key 'absent_key'\n" + "absent_key", + "Undefined instance-data key 'absent_key'\n", ), - ) + ), ) def test_handle_args_error_on_invalid_vaname_paths( self, inst_data, varname, expected_error, caplog, tmpdir ): """Error when varname is not a valid instance-data variable path.""" - instance_data = tmpdir.join('instance-data') + instance_data = tmpdir.join("instance-data") instance_data.write(inst_data) args = self.args( - debug=False, dump_all=False, format=None, + debug=False, + dump_all=False, + format=None, instance_data=instance_data.strpath, - list_keys=False, user_data=None, vendor_data=None, varname=varname + list_keys=False, + user_data=None, + vendor_data=None, + varname=varname, ) paths, _, _, _ = self._setup_paths(tmpdir) - with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: m_paths.return_value = paths with mock.patch( "cloudinit.cmd.query.addLogHandlerCLI", return_value="" ): - with mock.patch('cloudinit.cmd.query.load_userdata') as m_lud: + with mock.patch("cloudinit.cmd.query.load_userdata") as m_lud: m_lud.return_value = "ud" - assert 1 == query.handle_args('anyname', args) + assert 1 == query.handle_args("anyname", args) assert expected_error in caplog.text def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir): """When instance_data file path does not exist, log an error.""" - absent_fn = tmpdir.join('absent') + absent_fn = tmpdir.join("absent") args = self.args( - debug=False, dump_all=True, format=None, + debug=False, + dump_all=True, + format=None, instance_data=absent_fn.strpath, - list_keys=False, user_data='ud', vendor_data='vd', varname=None) - assert 1 == query.handle_args('anyname', args) + list_keys=False, + user_data="ud", + vendor_data="vd", + varname=None, + ) + assert 1 == query.handle_args("anyname", args) - msg = 'Missing instance-data file: %s' % absent_fn + msg = "Missing instance-data file: %s" % absent_fn assert msg in caplog.text def test_handle_args_error_when_no_read_permission_instance_data( self, caplog, tmpdir ): """When instance_data file is unreadable, log an error.""" - noread_fn = tmpdir.join('unreadable') - noread_fn.write('thou shall not pass') + noread_fn = tmpdir.join("unreadable") + noread_fn.write("thou shall not pass") args = self.args( - debug=False, dump_all=True, format=None, + debug=False, + dump_all=True, + format=None, instance_data=noread_fn.strpath, - list_keys=False, user_data='ud', vendor_data='vd', varname=None) - with mock.patch('cloudinit.cmd.query.util.load_file') as m_load: - m_load.side_effect = OSError(errno.EACCES, 'Not allowed') - assert 1 == query.handle_args('anyname', args) + list_keys=False, + user_data="ud", + vendor_data="vd", + varname=None, + ) + with mock.patch("cloudinit.cmd.query.util.load_file") as m_load: + m_load.side_effect = OSError(errno.EACCES, "Not allowed") + assert 1 == query.handle_args("anyname", args) msg = "No read permission on '%s'. Try sudo" % noread_fn assert msg in caplog.text def test_handle_args_defaults_instance_data(self, caplog, tmpdir): """When no instance_data argument, default to configured run_dir.""" args = self.args( - debug=False, dump_all=True, format=None, instance_data=None, - list_keys=False, user_data=None, vendor_data=None, varname=None) + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=None, + vendor_data=None, + varname=None, + ) paths, run_dir, _, _ = self._setup_paths(tmpdir) - with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: m_paths.return_value = paths - assert 1 == query.handle_args('anyname', args) + assert 1 == query.handle_args("anyname", args) json_file = run_dir.join(INSTANCE_JSON_FILE) - msg = 'Missing instance-data file: %s' % json_file.strpath + msg = "Missing instance-data file: %s" % json_file.strpath assert msg in caplog.text def test_handle_args_root_fallsback_to_instance_data(self, caplog, tmpdir): """When no instance_data argument, root falls back to redacted json.""" args = self.args( - debug=False, dump_all=True, format=None, instance_data=None, - list_keys=False, user_data=None, vendor_data=None, varname=None) + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=None, + vendor_data=None, + varname=None, + ) paths, run_dir, _, _ = self._setup_paths(tmpdir) - with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: m_paths.return_value = paths - with mock.patch('os.getuid') as m_getuid: + with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 0 - assert 1 == query.handle_args('anyname', args) + assert 1 == query.handle_args("anyname", args) json_file = run_dir.join(INSTANCE_JSON_FILE) sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) - msg = ( - 'Missing root-readable %s. Using redacted %s instead.' % - ( - sensitive_file.strpath, json_file.strpath - ) + msg = "Missing root-readable %s. Using redacted %s instead." % ( + sensitive_file.strpath, + json_file.strpath, ) assert msg in caplog.text @pytest.mark.parametrize( - 'ud_src,ud_expected,vd_src,vd_expected', + "ud_src,ud_expected,vd_src,vd_expected", ( - ('hi mom', 'hi mom', 'hi pops', 'hi pops'), - ('ud'.encode('utf-8'), 'ud', 'vd'.encode('utf-8'), 'vd'), - (_gzip_data(b'ud'), 'ud', _gzip_data(b'vd'), 'vd'), - (_gzip_data('ud'.encode('utf-8')), 'ud', _gzip_data(b'vd'), 'vd'), - ) + ("hi mom", "hi mom", "hi pops", "hi pops"), + ("ud".encode("utf-8"), "ud", "vd".encode("utf-8"), "vd"), + (_gzip_data(b"ud"), "ud", _gzip_data(b"vd"), "vd"), + (_gzip_data("ud".encode("utf-8")), "ud", _gzip_data(b"vd"), "vd"), + ), ) def test_handle_args_root_processes_user_data( self, ud_src, ud_expected, vd_src, vd_expected, capsys, tmpdir @@ -199,23 +240,29 @@ class TestQuery: sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) sensitive_file.write('{"my-var": "it worked"}') args = self.args( - debug=False, dump_all=True, format=None, instance_data=None, - list_keys=False, user_data=user_data.strpath, - vendor_data=vendor_data.strpath, varname=None) - with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=user_data.strpath, + vendor_data=vendor_data.strpath, + varname=None, + ) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: m_paths.return_value = paths - with mock.patch('os.getuid') as m_getuid: + with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 0 - assert 0 == query.handle_args('anyname', args) + assert 0 == query.handle_args("anyname", args) out, _err = capsys.readouterr() cmd_output = json.loads(out) - assert "it worked" == cmd_output['my-var'] + assert "it worked" == cmd_output["my-var"] if ud_expected == "ci-b64:": ud_expected = "ci-b64:{}".format(b64e(ud_src)) if vd_expected == "ci-b64:": vd_expected = "ci-b64:{}".format(b64e(vd_src)) - assert ud_expected == cmd_output['userdata'] - assert vd_expected == cmd_output['vendordata'] + assert ud_expected == cmd_output["userdata"] + assert vd_expected == cmd_output["vendordata"] def test_handle_args_user_vendor_data_defaults_to_instance_link( self, capsys, tmpdir @@ -231,13 +278,19 @@ class TestQuery: write_file(vd_path, "instance_link_vd") args = self.args( - debug=False, dump_all=True, format=None, instance_data=None, - list_keys=False, user_data=None, - vendor_data=None, varname=None) - with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=None, + vendor_data=None, + varname=None, + ) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: m_paths.return_value = paths - with mock.patch('os.getuid', return_value=0): - assert 0 == query.handle_args('anyname', args) + with mock.patch("os.getuid", return_value=0): + assert 0 == query.handle_args("anyname", args) expected = ( '{\n "my-var": "it worked",\n ' '"userdata": "instance_link_ud",\n ' @@ -251,19 +304,25 @@ class TestQuery: ): """When no instance_data argument, root uses sensitive json.""" paths, run_dir, user_data, vendor_data = self._setup_paths( - tmpdir, ud_val='ud', vd_val='vd' + tmpdir, ud_val="ud", vd_val="vd" ) sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) sensitive_file.write('{"my-var": "it worked"}') args = self.args( - debug=False, dump_all=True, format=None, instance_data=None, - list_keys=False, user_data=user_data.strpath, - vendor_data=vendor_data.strpath, varname=None) - with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=user_data.strpath, + vendor_data=vendor_data.strpath, + varname=None, + ) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: m_paths.return_value = paths - with mock.patch('os.getuid') as m_getuid: + with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 0 - assert 0 == query.handle_args('anyname', args) + assert 0 == query.handle_args("anyname", args) expected = ( '{\n "my-var": "it worked",\n ' '"userdata": "ud",\n "vendordata": "vd"\n}\n' @@ -273,68 +332,85 @@ class TestQuery: def test_handle_args_dumps_all_instance_data(self, capsys, tmpdir): """When --all is specified query will dump all instance data vars.""" - instance_data = tmpdir.join('instance-data') + instance_data = tmpdir.join("instance-data") instance_data.write('{"my-var": "it worked"}') args = self.args( - debug=False, dump_all=True, format=None, - instance_data=instance_data.strpath, list_keys=False, - user_data='ud', vendor_data='vd', varname=None) - with mock.patch('os.getuid') as m_getuid: + debug=False, + dump_all=True, + format=None, + instance_data=instance_data.strpath, + list_keys=False, + user_data="ud", + vendor_data="vd", + varname=None, + ) + with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 100 - assert 0 == query.handle_args('anyname', args) + assert 0 == query.handle_args("anyname", args) expected = ( '{\n "my-var": "it worked",\n "userdata": "<%s> file:ud",\n' - ' "vendordata": "<%s> file:vd"\n}\n' % ( - REDACT_SENSITIVE_VALUE, REDACT_SENSITIVE_VALUE - ) + ' "vendordata": "<%s> file:vd"\n}\n' + % (REDACT_SENSITIVE_VALUE, REDACT_SENSITIVE_VALUE) ) out, _err = capsys.readouterr() assert expected == out def test_handle_args_returns_top_level_varname(self, capsys, tmpdir): """When the argument varname is passed, report its value.""" - instance_data = tmpdir.join('instance-data') + instance_data = tmpdir.join("instance-data") instance_data.write('{"my-var": "it worked"}') args = self.args( - debug=False, dump_all=True, format=None, - instance_data=instance_data.strpath, list_keys=False, - user_data='ud', vendor_data='vd', varname='my_var') - with mock.patch('os.getuid') as m_getuid: + debug=False, + dump_all=True, + format=None, + instance_data=instance_data.strpath, + list_keys=False, + user_data="ud", + vendor_data="vd", + varname="my_var", + ) + with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 100 - assert 0 == query.handle_args('anyname', args) + assert 0 == query.handle_args("anyname", args) out, _err = capsys.readouterr() - assert 'it worked\n' == out + assert "it worked\n" == out @pytest.mark.parametrize( - 'inst_data,varname,expected', + "inst_data,varname,expected", ( ( '{"v1": {"key-2": "value-2"}, "my-var": "it worked"}', - 'v1.key_2', - 'value-2\n' + "v1.key_2", + "value-2\n", ), # Assert no jinja underscore-delimited aliases are reported on CLI ( '{"v1": {"something-hyphenated": {"no.underscores":"x",' ' "no-alias": "y"}}, "my-var": "it worked"}', - 'v1.something_hyphenated', - '{\n "no-alias": "y",\n "no.underscores": "x"\n}\n' + "v1.something_hyphenated", + '{\n "no-alias": "y",\n "no.underscores": "x"\n}\n', ), - ) + ), ) def test_handle_args_returns_nested_varname( self, inst_data, varname, expected, capsys, tmpdir ): """If user_data file is a jinja template render instance-data vars.""" - instance_data = tmpdir.join('instance-data') + instance_data = tmpdir.join("instance-data") instance_data.write(inst_data) args = self.args( - debug=False, dump_all=False, format=None, - instance_data=instance_data.strpath, user_data='ud', - vendor_data='vd', list_keys=False, varname=varname) - with mock.patch('os.getuid') as m_getuid: + debug=False, + dump_all=False, + format=None, + instance_data=instance_data.strpath, + user_data="ud", + vendor_data="vd", + list_keys=False, + varname=varname, + ) + with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 100 - assert 0 == query.handle_args('anyname', args) + assert 0 == query.handle_args("anyname", args) out, _err = capsys.readouterr() assert expected == out @@ -342,11 +418,13 @@ class TestQuery: self, capsys, tmpdir ): """Any standardized vars under v# are promoted as top-level aliases.""" - instance_data = tmpdir.join('instance-data') + instance_data = tmpdir.join("instance-data") instance_data.write( '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},' - ' "top": "gun"}') - expected = dedent("""\ + ' "top": "gun"}' + ) + expected = dedent( + """\ { "top": "gun", "userdata": "<redacted for non-root user> file:ud", @@ -360,14 +438,21 @@ class TestQuery: "v2_2": "val2.2", "vendordata": "<redacted for non-root user> file:vd" } - """) + """ + ) args = self.args( - debug=False, dump_all=True, format=None, - instance_data=instance_data.strpath, user_data='ud', - vendor_data='vd', list_keys=False, varname=None) - with mock.patch('os.getuid') as m_getuid: + debug=False, + dump_all=True, + format=None, + instance_data=instance_data.strpath, + user_data="ud", + vendor_data="vd", + list_keys=False, + varname=None, + ) + with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 100 - assert 0 == query.handle_args('anyname', args) + assert 0 == query.handle_args("anyname", args) out, _err = capsys.readouterr() assert expected == out @@ -375,18 +460,25 @@ class TestQuery: self, capsys, tmpdir ): """Sort all top-level keys when only --list-keys provided.""" - instance_data = tmpdir.join('instance-data') + instance_data = tmpdir.join("instance-data") instance_data.write( '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},' - ' "top": "gun"}') - expected = 'top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n' + ' "top": "gun"}' + ) + expected = "top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n" args = self.args( - debug=False, dump_all=False, format=None, - instance_data=instance_data.strpath, list_keys=True, - user_data='ud', vendor_data='vd', varname=None) - with mock.patch('os.getuid') as m_getuid: + debug=False, + dump_all=False, + format=None, + instance_data=instance_data.strpath, + list_keys=True, + user_data="ud", + vendor_data="vd", + varname=None, + ) + with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 100 - assert 0 == query.handle_args('anyname', args) + assert 0 == query.handle_args("anyname", args) out, _err = capsys.readouterr() assert expected == out @@ -394,18 +486,25 @@ class TestQuery: self, capsys, tmpdir ): """Sort all nested keys of varname object when --list-keys provided.""" - instance_data = tmpdir.join('instance-data') + instance_data = tmpdir.join("instance-data") instance_data.write( - '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2":' + - ' {"v2_2": "val2.2"}, "top": "gun"}') - expected = 'v1_1\nv1_2\n' + '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2":' + + ' {"v2_2": "val2.2"}, "top": "gun"}' + ) + expected = "v1_1\nv1_2\n" args = self.args( - debug=False, dump_all=False, format=None, - instance_data=instance_data.strpath, list_keys=True, - user_data='ud', vendor_data='vd', varname='v1') - with mock.patch('os.getuid') as m_getuid: + debug=False, + dump_all=False, + format=None, + instance_data=instance_data.strpath, + list_keys=True, + user_data="ud", + vendor_data="vd", + varname="v1", + ) + with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 100 - assert 0 == query.handle_args('anyname', args) + assert 0 == query.handle_args("anyname", args) out, _err = capsys.readouterr() assert expected == out @@ -413,18 +512,26 @@ class TestQuery: self, caplog, tmpdir ): """Raise an error when --list-keys and varname specify a non-list.""" - instance_data = tmpdir.join('instance-data') + instance_data = tmpdir.join("instance-data") instance_data.write( - '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2": ' + - '{"v2_2": "val2.2"}, "top": "gun"}') + '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2": ' + + '{"v2_2": "val2.2"}, "top": "gun"}' + ) expected_error = "--list-keys provided but 'top' is not a dict" args = self.args( - debug=False, dump_all=False, format=None, - instance_data=instance_data.strpath, list_keys=True, - user_data='ud', vendor_data='vd', varname='top') - with mock.patch('os.getuid') as m_getuid: + debug=False, + dump_all=False, + format=None, + instance_data=instance_data.strpath, + list_keys=True, + user_data="ud", + vendor_data="vd", + varname="top", + ) + with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 100 - assert 1 == query.handle_args('anyname', args) + assert 1 == query.handle_args("anyname", args) assert expected_error in caplog.text + # vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_status.py b/tests/unittests/cmd/test_status.py index 49eae043..acd1fea5 100644 --- a/tests/unittests/cmd/test_status.py +++ b/tests/unittests/cmd/test_status.py @@ -1,26 +1,25 @@ # This file is part of cloud-init. See LICENSE file for license information. -from collections import namedtuple import os +from collections import namedtuple from io import StringIO from textwrap import dedent from cloudinit.atomic_helper import write_json from cloudinit.cmd import status from cloudinit.util import ensure_file -from tests.unittests.helpers import CiTestCase, wrap_and_call, mock +from tests.unittests.helpers import CiTestCase, mock, wrap_and_call -mypaths = namedtuple('MyPaths', 'run_dir') -myargs = namedtuple('MyArgs', 'long wait') +mypaths = namedtuple("MyPaths", "run_dir") +myargs = namedtuple("MyArgs", "long wait") class TestStatus(CiTestCase): - def setUp(self): super(TestStatus, self).setUp() self.new_root = self.tmp_dir() - self.status_file = self.tmp_path('status.json', self.new_root) - self.disable_file = self.tmp_path('cloudinit-disable', self.new_root) + self.status_file = self.tmp_path("status.json", self.new_root) + self.disable_file = self.tmp_path("cloudinit-disable", self.new_root) self.paths = mypaths(run_dir=self.new_root) class FakeInit(object): @@ -35,285 +34,419 @@ class TestStatus(CiTestCase): self.init_class = FakeInit def test__is_cloudinit_disabled_false_on_sysvinit(self): - '''When not in an environment using systemd, return False.''' + """When not in an environment using systemd, return False.""" ensure_file(self.disable_file) # Create the ignored disable file (is_disabled, reason) = wrap_and_call( - 'cloudinit.cmd.status', - {'uses_systemd': False, - 'get_cmdline': "root=/dev/my-root not-important"}, - status._is_cloudinit_disabled, self.disable_file, self.paths) + "cloudinit.cmd.status", + { + "uses_systemd": False, + "get_cmdline": "root=/dev/my-root not-important", + }, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) self.assertFalse( - is_disabled, 'expected enabled cloud-init on sysvinit') - self.assertEqual('Cloud-init enabled on sysvinit', reason) + is_disabled, "expected enabled cloud-init on sysvinit" + ) + self.assertEqual("Cloud-init enabled on sysvinit", reason) def test__is_cloudinit_disabled_true_on_disable_file(self): - '''When using systemd and disable_file is present return disabled.''' + """When using systemd and disable_file is present return disabled.""" ensure_file(self.disable_file) # Create observed disable file (is_disabled, reason) = wrap_and_call( - 'cloudinit.cmd.status', - {'uses_systemd': True, - 'get_cmdline': "root=/dev/my-root not-important"}, - status._is_cloudinit_disabled, self.disable_file, self.paths) - self.assertTrue(is_disabled, 'expected disabled cloud-init') + "cloudinit.cmd.status", + { + "uses_systemd": True, + "get_cmdline": "root=/dev/my-root not-important", + }, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertTrue(is_disabled, "expected disabled cloud-init") self.assertEqual( - 'Cloud-init disabled by {0}'.format(self.disable_file), reason) + "Cloud-init disabled by {0}".format(self.disable_file), reason + ) def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self): - '''Not disabled when using systemd and enabled via commandline.''' + """Not disabled when using systemd and enabled via commandline.""" ensure_file(self.disable_file) # Create ignored disable file (is_disabled, reason) = wrap_and_call( - 'cloudinit.cmd.status', - {'uses_systemd': True, - 'get_cmdline': 'something cloud-init=enabled else'}, - status._is_cloudinit_disabled, self.disable_file, self.paths) - self.assertFalse(is_disabled, 'expected enabled cloud-init') + "cloudinit.cmd.status", + { + "uses_systemd": True, + "get_cmdline": "something cloud-init=enabled else", + }, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertFalse(is_disabled, "expected enabled cloud-init") self.assertEqual( - 'Cloud-init enabled by kernel command line cloud-init=enabled', - reason) + "Cloud-init enabled by kernel command line cloud-init=enabled", + reason, + ) def test__is_cloudinit_disabled_true_on_kernel_cmdline(self): - '''When using systemd and disable_file is present return disabled.''' + """When using systemd and disable_file is present return disabled.""" (is_disabled, reason) = wrap_and_call( - 'cloudinit.cmd.status', - {'uses_systemd': True, - 'get_cmdline': 'something cloud-init=disabled else'}, - status._is_cloudinit_disabled, self.disable_file, self.paths) - self.assertTrue(is_disabled, 'expected disabled cloud-init') + "cloudinit.cmd.status", + { + "uses_systemd": True, + "get_cmdline": "something cloud-init=disabled else", + }, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertTrue(is_disabled, "expected disabled cloud-init") self.assertEqual( - 'Cloud-init disabled by kernel parameter cloud-init=disabled', - reason) + "Cloud-init disabled by kernel parameter cloud-init=disabled", + reason, + ) def test__is_cloudinit_disabled_true_when_generator_disables(self): - '''When cloud-init-generator doesn't write enabled file return True.''' - enabled_file = os.path.join(self.paths.run_dir, 'enabled') + """When cloud-init-generator doesn't write enabled file return True.""" + enabled_file = os.path.join(self.paths.run_dir, "enabled") self.assertFalse(os.path.exists(enabled_file)) (is_disabled, reason) = wrap_and_call( - 'cloudinit.cmd.status', - {'uses_systemd': True, - 'get_cmdline': 'something'}, - status._is_cloudinit_disabled, self.disable_file, self.paths) - self.assertTrue(is_disabled, 'expected disabled cloud-init') - self.assertEqual('Cloud-init disabled by cloud-init-generator', reason) + "cloudinit.cmd.status", + {"uses_systemd": True, "get_cmdline": "something"}, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertTrue(is_disabled, "expected disabled cloud-init") + self.assertEqual("Cloud-init disabled by cloud-init-generator", reason) def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self): - '''Report enabled when systemd generator creates the enabled file.''' - enabled_file = os.path.join(self.paths.run_dir, 'enabled') + """Report enabled when systemd generator creates the enabled file.""" + enabled_file = os.path.join(self.paths.run_dir, "enabled") ensure_file(enabled_file) (is_disabled, reason) = wrap_and_call( - 'cloudinit.cmd.status', - {'uses_systemd': True, - 'get_cmdline': 'something ignored'}, - status._is_cloudinit_disabled, self.disable_file, self.paths) - self.assertFalse(is_disabled, 'expected enabled cloud-init') + "cloudinit.cmd.status", + {"uses_systemd": True, "get_cmdline": "something ignored"}, + status._is_cloudinit_disabled, + self.disable_file, + self.paths, + ) + self.assertFalse(is_disabled, "expected enabled cloud-init") self.assertEqual( - 'Cloud-init enabled by systemd cloud-init-generator', reason) + "Cloud-init enabled by systemd cloud-init-generator", reason + ) def test_status_returns_not_run(self): - '''When status.json does not exist yet, return 'not run'.''' + """When status.json does not exist yet, return 'not run'.""" self.assertFalse( - os.path.exists(self.status_file), 'Unexpected status.json found') + os.path.exists(self.status_file), "Unexpected status.json found" + ) cmdargs = myargs(long=False, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) self.assertEqual(0, retcode) - self.assertEqual('status: not run\n', m_stdout.getvalue()) + self.assertEqual("status: not run\n", m_stdout.getvalue()) def test_status_returns_disabled_long_on_presence_of_disable_file(self): - '''When cloudinit is disabled, return disabled reason.''' + """When cloudinit is disabled, return disabled reason.""" checked_files = [] def fakeexists(filepath): checked_files.append(filepath) - status_file = os.path.join(self.paths.run_dir, 'status.json') + status_file = os.path.join(self.paths.run_dir, "status.json") return bool(not filepath == status_file) cmdargs = myargs(long=True, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'os.path.exists': {'side_effect': fakeexists}, - '_is_cloudinit_disabled': (True, 'disabled for some reason'), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) + "cloudinit.cmd.status", + { + "os.path.exists": {"side_effect": fakeexists}, + "_is_cloudinit_disabled": ( + True, + "disabled for some reason", + ), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) self.assertEqual(0, retcode) self.assertEqual( - [os.path.join(self.paths.run_dir, 'status.json')], - checked_files) - expected = dedent('''\ + [os.path.join(self.paths.run_dir, "status.json")], checked_files + ) + expected = dedent( + """\ status: disabled detail: disabled for some reason - ''') + """ + ) self.assertEqual(expected, m_stdout.getvalue()) def test_status_returns_running_on_no_results_json(self): - '''Report running when status.json exists but result.json does not.''' - result_file = self.tmp_path('result.json', self.new_root) + """Report running when status.json exists but result.json does not.""" + result_file = self.tmp_path("result.json", self.new_root) write_json(self.status_file, {}) self.assertFalse( - os.path.exists(result_file), 'Unexpected result.json found') + os.path.exists(result_file), "Unexpected result.json found" + ) cmdargs = myargs(long=False, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) self.assertEqual(0, retcode) - self.assertEqual('status: running\n', m_stdout.getvalue()) + self.assertEqual("status: running\n", m_stdout.getvalue()) def test_status_returns_running(self): - '''Report running when status exists with an unfinished stage.''' - ensure_file(self.tmp_path('result.json', self.new_root)) - write_json(self.status_file, - {'v1': {'init': {'start': 1, 'finished': None}}}) + """Report running when status exists with an unfinished stage.""" + ensure_file(self.tmp_path("result.json", self.new_root)) + write_json( + self.status_file, {"v1": {"init": {"start": 1, "finished": None}}} + ) cmdargs = myargs(long=False, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) self.assertEqual(0, retcode) - self.assertEqual('status: running\n', m_stdout.getvalue()) + self.assertEqual("status: running\n", m_stdout.getvalue()) def test_status_returns_done(self): - '''Report done results.json exists no stages are unfinished.''' - ensure_file(self.tmp_path('result.json', self.new_root)) + """Report done results.json exists no stages are unfinished.""" + ensure_file(self.tmp_path("result.json", self.new_root)) write_json( self.status_file, - {'v1': {'stage': None, # No current stage running - 'datasource': ( - 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]' - '[dsmode=net]'), - 'blah': {'finished': 123.456}, - 'init': {'errors': [], 'start': 124.567, - 'finished': 125.678}, - 'init-local': {'start': 123.45, 'finished': 123.46}}}) + { + "v1": { + "stage": None, # No current stage running + "datasource": ( + "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "blah": {"finished": 123.456}, + "init": { + "errors": [], + "start": 124.567, + "finished": 125.678, + }, + "init-local": {"start": 123.45, "finished": 123.46}, + } + }, + ) cmdargs = myargs(long=False, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) self.assertEqual(0, retcode) - self.assertEqual('status: done\n', m_stdout.getvalue()) + self.assertEqual("status: done\n", m_stdout.getvalue()) def test_status_returns_done_long(self): - '''Long format of done status includes datasource info.''' - ensure_file(self.tmp_path('result.json', self.new_root)) + """Long format of done status includes datasource info.""" + ensure_file(self.tmp_path("result.json", self.new_root)) write_json( self.status_file, - {'v1': {'stage': None, - 'datasource': ( - 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]' - '[dsmode=net]'), - 'init': {'start': 124.567, 'finished': 125.678}, - 'init-local': {'start': 123.45, 'finished': 123.46}}}) + { + "v1": { + "stage": None, + "datasource": ( + "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "init": {"start": 124.567, "finished": 125.678}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + }, + ) cmdargs = myargs(long=True, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) self.assertEqual(0, retcode) - expected = dedent('''\ + expected = dedent( + """\ status: done time: Thu, 01 Jan 1970 00:02:05 +0000 detail: DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net] - ''') + """ + ) self.assertEqual(expected, m_stdout.getvalue()) def test_status_on_errors(self): - '''Reports error when any stage has errors.''' + """Reports error when any stage has errors.""" write_json( self.status_file, - {'v1': {'stage': None, - 'blah': {'errors': [], 'finished': 123.456}, - 'init': {'errors': ['error1'], 'start': 124.567, - 'finished': 125.678}, - 'init-local': {'start': 123.45, 'finished': 123.46}}}) + { + "v1": { + "stage": None, + "blah": {"errors": [], "finished": 123.456}, + "init": { + "errors": ["error1"], + "start": 124.567, + "finished": 125.678, + }, + "init-local": {"start": 123.45, "finished": 123.46}, + } + }, + ) cmdargs = myargs(long=False, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) self.assertEqual(1, retcode) - self.assertEqual('status: error\n', m_stdout.getvalue()) + self.assertEqual("status: error\n", m_stdout.getvalue()) def test_status_on_errors_long(self): - '''Long format of error status includes all error messages.''' + """Long format of error status includes all error messages.""" write_json( self.status_file, - {'v1': {'stage': None, - 'datasource': ( - 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]' - '[dsmode=net]'), - 'init': {'errors': ['error1'], 'start': 124.567, - 'finished': 125.678}, - 'init-local': {'errors': ['error2', 'error3'], - 'start': 123.45, 'finished': 123.46}}}) + { + "v1": { + "stage": None, + "datasource": ( + "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "init": { + "errors": ["error1"], + "start": 124.567, + "finished": 125.678, + }, + "init-local": { + "errors": ["error2", "error3"], + "start": 123.45, + "finished": 123.46, + }, + } + }, + ) cmdargs = myargs(long=True, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) self.assertEqual(1, retcode) - expected = dedent('''\ + expected = dedent( + """\ status: error time: Thu, 01 Jan 1970 00:02:05 +0000 detail: error1 error2 error3 - ''') + """ + ) self.assertEqual(expected, m_stdout.getvalue()) def test_status_returns_running_long_format(self): - '''Long format reports the stage in which we are running.''' + """Long format reports the stage in which we are running.""" write_json( self.status_file, - {'v1': {'stage': 'init', - 'init': {'start': 124.456, 'finished': None}, - 'init-local': {'start': 123.45, 'finished': 123.46}}}) + { + "v1": { + "stage": "init", + "init": {"start": 124.456, "finished": None}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + }, + ) cmdargs = myargs(long=True, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) + "cloudinit.cmd.status", + { + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) self.assertEqual(0, retcode) - expected = dedent('''\ + expected = dedent( + """\ status: running time: Thu, 01 Jan 1970 00:02:04 +0000 detail: Running in stage: init - ''') + """ + ) self.assertEqual(expected, m_stdout.getvalue()) def test_status_wait_blocks_until_done(self): - '''Specifying wait will poll every 1/4 second until done state.''' + """Specifying wait will poll every 1/4 second until done state.""" running_json = { - 'v1': {'stage': 'init', - 'init': {'start': 124.456, 'finished': None}, - 'init-local': {'start': 123.45, 'finished': 123.46}}} + "v1": { + "stage": "init", + "init": {"start": 124.456, "finished": None}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + } done_json = { - 'v1': {'stage': None, - 'init': {'start': 124.456, 'finished': 125.678}, - 'init-local': {'start': 123.45, 'finished': 123.46}}} + "v1": { + "stage": None, + "init": {"start": 124.456, "finished": 125.678}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + } self.sleep_calls = 0 @@ -324,32 +457,46 @@ class TestStatus(CiTestCase): write_json(self.status_file, running_json) elif self.sleep_calls == 3: write_json(self.status_file, done_json) - result_file = self.tmp_path('result.json', self.new_root) + result_file = self.tmp_path("result.json", self.new_root) ensure_file(result_file) cmdargs = myargs(long=False, wait=True) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'sleep': {'side_effect': fake_sleep}, - '_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) + "cloudinit.cmd.status", + { + "sleep": {"side_effect": fake_sleep}, + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) self.assertEqual(0, retcode) self.assertEqual(4, self.sleep_calls) - self.assertEqual('....\nstatus: done\n', m_stdout.getvalue()) + self.assertEqual("....\nstatus: done\n", m_stdout.getvalue()) def test_status_wait_blocks_until_error(self): - '''Specifying wait will poll every 1/4 second until error state.''' + """Specifying wait will poll every 1/4 second until error state.""" running_json = { - 'v1': {'stage': 'init', - 'init': {'start': 124.456, 'finished': None}, - 'init-local': {'start': 123.45, 'finished': 123.46}}} + "v1": { + "stage": "init", + "init": {"start": 124.456, "finished": None}, + "init-local": {"start": 123.45, "finished": 123.46}, + } + } error_json = { - 'v1': {'stage': None, - 'init': {'errors': ['error1'], 'start': 124.456, - 'finished': 125.678}, - 'init-local': {'start': 123.45, 'finished': 123.46}}} + "v1": { + "stage": None, + "init": { + "errors": ["error1"], + "start": 124.456, + "finished": 125.678, + }, + "init-local": {"start": 123.45, "finished": 123.46}, + } + } self.sleep_calls = 0 @@ -362,30 +509,40 @@ class TestStatus(CiTestCase): write_json(self.status_file, error_json) cmdargs = myargs(long=False, wait=True) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'sleep': {'side_effect': fake_sleep}, - '_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) + "cloudinit.cmd.status", + { + "sleep": {"side_effect": fake_sleep}, + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.handle_status_args, + "ignored", + cmdargs, + ) self.assertEqual(1, retcode) self.assertEqual(4, self.sleep_calls) - self.assertEqual('....\nstatus: error\n', m_stdout.getvalue()) + self.assertEqual("....\nstatus: error\n", m_stdout.getvalue()) def test_status_main(self): - '''status.main can be run as a standalone script.''' - write_json(self.status_file, - {'v1': {'init': {'start': 1, 'finished': None}}}) + """status.main can be run as a standalone script.""" + write_json( + self.status_file, {"v1": {"init": {"start": 1, "finished": None}}} + ) with self.assertRaises(SystemExit) as context_manager: - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: wrap_and_call( - 'cloudinit.cmd.status', - {'sys.argv': {'new': ['status']}, - '_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.main) + "cloudinit.cmd.status", + { + "sys.argv": {"new": ["status"]}, + "_is_cloudinit_disabled": (False, ""), + "Init": {"side_effect": self.init_class}, + }, + status.main, + ) self.assertEqual(0, context_manager.exception.code) - self.assertEqual('status: running\n', m_stdout.getvalue()) + self.assertEqual("status: running\n", m_stdout.getvalue()) + # vi: ts=4 expandtab syntax=python |