diff options
Diffstat (limited to 'cloudinit/cmd/tests')
-rw-r--r-- | cloudinit/cmd/tests/__init__.py | 0 | ||||
-rw-r--r-- | cloudinit/cmd/tests/test_clean.py | 178 | ||||
-rw-r--r-- | cloudinit/cmd/tests/test_cloud_id.py | 127 | ||||
-rw-r--r-- | cloudinit/cmd/tests/test_main.py | 188 | ||||
-rw-r--r-- | cloudinit/cmd/tests/test_query.py | 392 | ||||
-rw-r--r-- | cloudinit/cmd/tests/test_status.py | 391 |
6 files changed, 0 insertions, 1276 deletions
diff --git a/cloudinit/cmd/tests/__init__.py b/cloudinit/cmd/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/cloudinit/cmd/tests/__init__.py +++ /dev/null diff --git a/cloudinit/cmd/tests/test_clean.py b/cloudinit/cmd/tests/test_clean.py deleted file mode 100644 index a848a810..00000000 --- a/cloudinit/cmd/tests/test_clean.py +++ /dev/null @@ -1,178 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -from cloudinit.cmd import clean -from cloudinit.util import ensure_dir, sym_link, write_file -from cloudinit.tests.helpers import CiTestCase, wrap_and_call, mock -from collections import namedtuple -import os -from io import StringIO - -mypaths = namedtuple('MyPaths', 'cloud_dir') - - -class TestClean(CiTestCase): - - def setUp(self): - super(TestClean, self).setUp() - self.new_root = self.tmp_dir() - self.artifact_dir = self.tmp_path('artifacts', self.new_root) - self.log1 = self.tmp_path('cloud-init.log', self.new_root) - self.log2 = self.tmp_path('cloud-init-output.log', self.new_root) - - class FakeInit(object): - cfg = {'def_log_file': self.log1, - 'output': {'all': '|tee -a {0}'.format(self.log2)}} - # Ensure cloud_dir has a trailing slash, to match real behaviour - paths = mypaths(cloud_dir='{}/'.format(self.artifact_dir)) - - def __init__(self, ds_deps): - pass - - def read_cfg(self): - pass - - self.init_class = FakeInit - - def test_remove_artifacts_removes_logs(self): - """remove_artifacts removes logs when remove_logs is True.""" - write_file(self.log1, 'cloud-init-log') - write_file(self.log2, 'cloud-init-output-log') - - self.assertFalse( - os.path.exists(self.artifact_dir), 'Unexpected artifacts dir') - retcode = wrap_and_call( - 'cloudinit.cmd.clean', - {'Init': {'side_effect': self.init_class}}, - clean.remove_artifacts, remove_logs=True) - self.assertFalse(os.path.exists(self.log1), 'Unexpected file') - self.assertFalse(os.path.exists(self.log2), 'Unexpected file') - self.assertEqual(0, retcode) - - def test_remove_artifacts_preserves_logs(self): - """remove_artifacts leaves logs when remove_logs is False.""" - write_file(self.log1, 'cloud-init-log') - write_file(self.log2, 'cloud-init-output-log') - - retcode = wrap_and_call( - 'cloudinit.cmd.clean', - {'Init': {'side_effect': self.init_class}}, - clean.remove_artifacts, remove_logs=False) - self.assertTrue(os.path.exists(self.log1), 'Missing expected file') - self.assertTrue(os.path.exists(self.log2), 'Missing expected file') - self.assertEqual(0, retcode) - - def test_remove_artifacts_removes_unlinks_symlinks(self): - """remove_artifacts cleans artifacts dir unlinking any symlinks.""" - dir1 = os.path.join(self.artifact_dir, 'dir1') - ensure_dir(dir1) - symlink = os.path.join(self.artifact_dir, 'mylink') - sym_link(dir1, symlink) - - retcode = wrap_and_call( - 'cloudinit.cmd.clean', - {'Init': {'side_effect': self.init_class}}, - clean.remove_artifacts, remove_logs=False) - self.assertEqual(0, retcode) - for path in (dir1, symlink): - self.assertFalse( - os.path.exists(path), - 'Unexpected {0} dir'.format(path)) - - def test_remove_artifacts_removes_artifacts_skipping_seed(self): - """remove_artifacts cleans artifacts dir with exception of seed dir.""" - dirs = [ - self.artifact_dir, - os.path.join(self.artifact_dir, 'seed'), - os.path.join(self.artifact_dir, 'dir1'), - os.path.join(self.artifact_dir, 'dir2')] - for _dir in dirs: - ensure_dir(_dir) - - retcode = wrap_and_call( - 'cloudinit.cmd.clean', - {'Init': {'side_effect': self.init_class}}, - clean.remove_artifacts, remove_logs=False) - self.assertEqual(0, retcode) - for expected_dir in dirs[:2]: - self.assertTrue( - os.path.exists(expected_dir), - 'Missing {0} dir'.format(expected_dir)) - for deleted_dir in dirs[2:]: - self.assertFalse( - os.path.exists(deleted_dir), - 'Unexpected {0} dir'.format(deleted_dir)) - - def test_remove_artifacts_removes_artifacts_removes_seed(self): - """remove_artifacts removes seed dir when remove_seed is True.""" - dirs = [ - self.artifact_dir, - os.path.join(self.artifact_dir, 'seed'), - os.path.join(self.artifact_dir, 'dir1'), - os.path.join(self.artifact_dir, 'dir2')] - for _dir in dirs: - ensure_dir(_dir) - - retcode = wrap_and_call( - 'cloudinit.cmd.clean', - {'Init': {'side_effect': self.init_class}}, - clean.remove_artifacts, remove_logs=False, remove_seed=True) - self.assertEqual(0, retcode) - self.assertTrue( - os.path.exists(self.artifact_dir), 'Missing artifact dir') - for deleted_dir in dirs[1:]: - self.assertFalse( - os.path.exists(deleted_dir), - 'Unexpected {0} dir'.format(deleted_dir)) - - def test_remove_artifacts_returns_one_on_errors(self): - """remove_artifacts returns non-zero on failure and prints an error.""" - ensure_dir(self.artifact_dir) - ensure_dir(os.path.join(self.artifact_dir, 'dir1')) - - with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr: - retcode = wrap_and_call( - 'cloudinit.cmd.clean', - {'del_dir': {'side_effect': OSError('oops')}, - 'Init': {'side_effect': self.init_class}}, - clean.remove_artifacts, remove_logs=False) - self.assertEqual(1, retcode) - self.assertEqual( - 'ERROR: Could not remove %s/dir1: oops\n' % self.artifact_dir, - m_stderr.getvalue()) - - def test_handle_clean_args_reboots(self): - """handle_clean_args_reboots when reboot arg is provided.""" - - called_cmds = [] - - def fake_subp(cmd, capture): - called_cmds.append((cmd, capture)) - return '', '' - - myargs = namedtuple('MyArgs', 'remove_logs remove_seed reboot') - cmdargs = myargs(remove_logs=False, remove_seed=False, reboot=True) - retcode = wrap_and_call( - 'cloudinit.cmd.clean', - {'subp': {'side_effect': fake_subp}, - 'Init': {'side_effect': self.init_class}}, - clean.handle_clean_args, name='does not matter', args=cmdargs) - self.assertEqual(0, retcode) - self.assertEqual( - [(['shutdown', '-r', 'now'], False)], called_cmds) - - def test_status_main(self): - '''clean.main can be run as a standalone script.''' - write_file(self.log1, 'cloud-init-log') - with self.assertRaises(SystemExit) as context_manager: - wrap_and_call( - 'cloudinit.cmd.clean', - {'Init': {'side_effect': self.init_class}, - 'sys.argv': {'new': ['clean', '--logs']}}, - clean.main) - - self.assertEqual(0, context_manager.exception.code) - self.assertFalse( - os.path.exists(self.log1), 'Unexpected log {0}'.format(self.log1)) - - -# vi: ts=4 expandtab syntax=python diff --git a/cloudinit/cmd/tests/test_cloud_id.py b/cloudinit/cmd/tests/test_cloud_id.py deleted file mode 100644 index 3f3727fd..00000000 --- a/cloudinit/cmd/tests/test_cloud_id.py +++ /dev/null @@ -1,127 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -"""Tests for cloud-id command line utility.""" - -from cloudinit import util -from collections import namedtuple -from io import StringIO - -from cloudinit.cmd import cloud_id - -from cloudinit.tests.helpers import CiTestCase, mock - - -class TestCloudId(CiTestCase): - - args = namedtuple('cloudidargs', ('instance_data json long')) - - def setUp(self): - super(TestCloudId, self).setUp() - self.tmp = self.tmp_dir() - self.instance_data = self.tmp_path('instance-data.json', dir=self.tmp) - - def test_cloud_id_arg_parser_defaults(self): - """Validate the argument defaults when not provided by the end-user.""" - cmd = ['cloud-id'] - with mock.patch('sys.argv', cmd): - args = cloud_id.get_parser().parse_args() - self.assertEqual( - '/run/cloud-init/instance-data.json', - args.instance_data) - self.assertEqual(False, args.long) - self.assertEqual(False, args.json) - - def test_cloud_id_arg_parse_overrides(self): - """Override argument defaults by specifying values for each param.""" - util.write_file(self.instance_data, '{}') - cmd = ['cloud-id', '--instance-data', self.instance_data, '--long', - '--json'] - with mock.patch('sys.argv', cmd): - args = cloud_id.get_parser().parse_args() - self.assertEqual(self.instance_data, args.instance_data) - self.assertEqual(True, args.long) - self.assertEqual(True, args.json) - - def test_cloud_id_missing_instance_data_json(self): - """Exit error when the provided instance-data.json does not exist.""" - cmd = ['cloud-id', '--instance-data', self.instance_data] - with mock.patch('sys.argv', cmd): - with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr: - with self.assertRaises(SystemExit) as context_manager: - cloud_id.main() - self.assertEqual(1, context_manager.exception.code) - self.assertIn( - "ERROR: File not found '%s'" % self.instance_data, - m_stderr.getvalue()) - - def test_cloud_id_non_json_instance_data(self): - """Exit error when the provided instance-data.json is not json.""" - cmd = ['cloud-id', '--instance-data', self.instance_data] - util.write_file(self.instance_data, '{') - with mock.patch('sys.argv', cmd): - with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr: - with self.assertRaises(SystemExit) as context_manager: - cloud_id.main() - self.assertEqual(1, context_manager.exception.code) - self.assertIn( - "ERROR: File '%s' is not valid json." % self.instance_data, - m_stderr.getvalue()) - - def test_cloud_id_from_cloud_name_in_instance_data(self): - """Report canonical cloud-id from cloud_name in instance-data.""" - util.write_file( - self.instance_data, - '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}') - cmd = ['cloud-id', '--instance-data', self.instance_data] - with mock.patch('sys.argv', cmd): - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - with self.assertRaises(SystemExit) as context_manager: - cloud_id.main() - self.assertEqual(0, context_manager.exception.code) - self.assertEqual("mycloud\n", m_stdout.getvalue()) - - def test_cloud_id_long_name_from_instance_data(self): - """Report long cloud-id format from cloud_name and region.""" - util.write_file( - self.instance_data, - '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}') - cmd = ['cloud-id', '--instance-data', self.instance_data, '--long'] - with mock.patch('sys.argv', cmd): - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - with self.assertRaises(SystemExit) as context_manager: - cloud_id.main() - self.assertEqual(0, context_manager.exception.code) - self.assertEqual("mycloud\tsomereg\n", m_stdout.getvalue()) - - def test_cloud_id_lookup_from_instance_data_region(self): - """Report discovered canonical cloud_id when region lookup matches.""" - util.write_file( - self.instance_data, - '{"v1": {"cloud_name": "aws", "region": "cn-north-1",' - ' "platform": "ec2"}}') - cmd = ['cloud-id', '--instance-data', self.instance_data, '--long'] - with mock.patch('sys.argv', cmd): - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - with self.assertRaises(SystemExit) as context_manager: - cloud_id.main() - self.assertEqual(0, context_manager.exception.code) - self.assertEqual("aws-china\tcn-north-1\n", m_stdout.getvalue()) - - def test_cloud_id_lookup_json_instance_data_adds_cloud_id_to_json(self): - """Report v1 instance-data content with cloud_id when --json set.""" - util.write_file( - self.instance_data, - '{"v1": {"cloud_name": "unknown", "region": "dfw",' - ' "platform": "openstack", "public_ssh_keys": []}}') - expected = util.json_dumps({ - 'cloud_id': 'openstack', 'cloud_name': 'unknown', - 'platform': 'openstack', 'public_ssh_keys': [], 'region': 'dfw'}) - cmd = ['cloud-id', '--instance-data', self.instance_data, '--json'] - with mock.patch('sys.argv', cmd): - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - with self.assertRaises(SystemExit) as context_manager: - cloud_id.main() - self.assertEqual(0, context_manager.exception.code) - self.assertEqual(expected + '\n', m_stdout.getvalue()) - -# vi: ts=4 expandtab diff --git a/cloudinit/cmd/tests/test_main.py b/cloudinit/cmd/tests/test_main.py deleted file mode 100644 index 2e380848..00000000 --- a/cloudinit/cmd/tests/test_main.py +++ /dev/null @@ -1,188 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -from collections import namedtuple -import copy -import os -from io import StringIO -from unittest import mock - -import pytest - -from cloudinit.cmd import main -from cloudinit import safeyaml -from cloudinit.util import ( - ensure_dir, load_file, write_file) -from cloudinit.tests.helpers import ( - FilesystemMockingTestCase, wrap_and_call) - -mypaths = namedtuple('MyPaths', 'run_dir') -myargs = namedtuple('MyArgs', 'debug files force local reporter subcommand') - - -class TestMain(FilesystemMockingTestCase): - with_logs = True - allowed_subp = False - - def setUp(self): - super(TestMain, self).setUp() - self.new_root = self.tmp_dir() - self.cloud_dir = self.tmp_path('var/lib/cloud/', dir=self.new_root) - os.makedirs(self.cloud_dir) - self.replicateTestRoot('simple_ubuntu', self.new_root) - self.cfg = { - 'datasource_list': ['None'], - 'runcmd': ['ls /etc'], # test ALL_DISTROS - 'system_info': {'paths': {'cloud_dir': self.cloud_dir, - 'run_dir': self.new_root}}, - 'write_files': [ - { - 'path': '/etc/blah.ini', - 'content': 'blah', - 'permissions': 0o755, - }, - ], - 'cloud_init_modules': ['write-files', 'runcmd'], - } - cloud_cfg = safeyaml.dumps(self.cfg) - ensure_dir(os.path.join(self.new_root, 'etc', 'cloud')) - self.cloud_cfg_file = os.path.join( - self.new_root, 'etc', 'cloud', 'cloud.cfg') - write_file(self.cloud_cfg_file, cloud_cfg) - self.patchOS(self.new_root) - self.patchUtils(self.new_root) - self.stderr = StringIO() - self.patchStdoutAndStderr(stderr=self.stderr) - - def test_main_init_run_net_stops_on_file_no_net(self): - """When no-net file is present, main_init does not process modules.""" - stop_file = os.path.join(self.cloud_dir, 'data', 'no-net') # stop file - write_file(stop_file, '') - cmdargs = myargs( - debug=False, files=None, force=False, local=False, reporter=None, - subcommand='init') - (_item1, item2) = wrap_and_call( - 'cloudinit.cmd.main', - {'util.close_stdin': True, - 'netinfo.debug_info': 'my net debug info', - 'util.fixup_output': ('outfmt', 'errfmt')}, - main.main_init, 'init', cmdargs) - # We should not run write_files module - self.assertFalse( - os.path.exists(os.path.join(self.new_root, 'etc/blah.ini')), - 'Unexpected run of write_files module produced blah.ini') - self.assertEqual([], item2) - # Instancify is called - instance_id_path = 'var/lib/cloud/data/instance-id' - self.assertFalse( - os.path.exists(os.path.join(self.new_root, instance_id_path)), - 'Unexpected call to datasource.instancify produced instance-id') - expected_logs = [ - "Exiting. stop file ['{stop_file}'] existed\n".format( - stop_file=stop_file), - 'my net debug info' # netinfo.debug_info - ] - for log in expected_logs: - self.assertIn(log, self.stderr.getvalue()) - - def test_main_init_run_net_runs_modules(self): - """Modules like write_files are run in 'net' mode.""" - cmdargs = myargs( - debug=False, files=None, force=False, local=False, reporter=None, - subcommand='init') - (_item1, item2) = wrap_and_call( - 'cloudinit.cmd.main', - {'util.close_stdin': True, - 'netinfo.debug_info': 'my net debug info', - 'util.fixup_output': ('outfmt', 'errfmt')}, - main.main_init, 'init', cmdargs) - self.assertEqual([], item2) - # Instancify is called - instance_id_path = 'var/lib/cloud/data/instance-id' - self.assertEqual( - 'iid-datasource-none\n', - os.path.join(load_file( - os.path.join(self.new_root, instance_id_path)))) - # modules are run (including write_files) - self.assertEqual( - 'blah', load_file(os.path.join(self.new_root, 'etc/blah.ini'))) - expected_logs = [ - 'network config is disabled by fallback', # apply_network_config - 'my net debug info', # netinfo.debug_info - 'no previous run detected' - ] - for log in expected_logs: - self.assertIn(log, self.stderr.getvalue()) - - def test_main_init_run_net_calls_set_hostname_when_metadata_present(self): - """When local-hostname metadata is present, call cc_set_hostname.""" - self.cfg['datasource'] = { - 'None': {'metadata': {'local-hostname': 'md-hostname'}}} - cloud_cfg = safeyaml.dumps(self.cfg) - write_file(self.cloud_cfg_file, cloud_cfg) - cmdargs = myargs( - debug=False, files=None, force=False, local=False, reporter=None, - subcommand='init') - - def set_hostname(name, cfg, cloud, log, args): - self.assertEqual('set-hostname', name) - updated_cfg = copy.deepcopy(self.cfg) - updated_cfg.update( - {'def_log_file': '/var/log/cloud-init.log', - 'log_cfgs': [], - 'syslog_fix_perms': [ - 'syslog:adm', 'root:adm', 'root:wheel', 'root:root' - ], - 'vendor_data': {'enabled': True, 'prefix': []}, - 'vendor_data2': {'enabled': True, 'prefix': []}}) - updated_cfg.pop('system_info') - - self.assertEqual(updated_cfg, cfg) - self.assertEqual(main.LOG, log) - self.assertIsNone(args) - - (_item1, item2) = wrap_and_call( - 'cloudinit.cmd.main', - {'util.close_stdin': True, - 'netinfo.debug_info': 'my net debug info', - 'cc_set_hostname.handle': {'side_effect': set_hostname}, - 'util.fixup_output': ('outfmt', 'errfmt')}, - main.main_init, 'init', cmdargs) - self.assertEqual([], item2) - # Instancify is called - instance_id_path = 'var/lib/cloud/data/instance-id' - self.assertEqual( - 'iid-datasource-none\n', - os.path.join(load_file( - os.path.join(self.new_root, instance_id_path)))) - # modules are run (including write_files) - self.assertEqual( - 'blah', load_file(os.path.join(self.new_root, 'etc/blah.ini'))) - expected_logs = [ - 'network config is disabled by fallback', # apply_network_config - 'my net debug info', # netinfo.debug_info - 'no previous run detected' - ] - for log in expected_logs: - self.assertIn(log, self.stderr.getvalue()) - - -class TestShouldBringUpInterfaces: - @pytest.mark.parametrize('cfg_disable,args_local,expected', [ - (True, True, False), - (True, False, False), - (False, True, False), - (False, False, True), - ]) - def test_should_bring_up_interfaces( - self, cfg_disable, args_local, expected - ): - init = mock.Mock() - init.cfg = {'disable_network_activation': cfg_disable} - - args = mock.Mock() - args.local = args_local - - result = main._should_bring_up_interfaces(init, args) - assert result == expected - -# vi: ts=4 expandtab diff --git a/cloudinit/cmd/tests/test_query.py b/cloudinit/cmd/tests/test_query.py deleted file mode 100644 index d96c3945..00000000 --- a/cloudinit/cmd/tests/test_query.py +++ /dev/null @@ -1,392 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -import errno -import gzip -from io import BytesIO -import json -from textwrap import dedent - -import pytest - -from collections import namedtuple -from cloudinit.cmd import query -from cloudinit.helpers import Paths -from cloudinit.sources import ( - REDACT_SENSITIVE_VALUE, INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE) -from cloudinit.tests.helpers import mock - -from cloudinit.util import b64e, write_file - - -def _gzip_data(data): - with BytesIO() as iobuf: - with gzip.GzipFile(mode="wb", fileobj=iobuf) as gzfp: - gzfp.write(data) - return iobuf.getvalue() - - -@mock.patch("cloudinit.cmd.query.addLogHandlerCLI", lambda *args: "") -class TestQuery: - - args = namedtuple( - 'queryargs', - ('debug dump_all format instance_data list_keys user_data vendor_data' - ' varname')) - - def _setup_paths(self, tmpdir, ud_val=None, vd_val=None): - """Write userdata and vendordata into a tmpdir. - - Return: - 4-tuple : (paths, run_dir_path, userdata_path, vendordata_path) - """ - if ud_val: - user_data = tmpdir.join('user-data') - write_file(user_data.strpath, ud_val) - else: - user_data = None - if vd_val: - vendor_data = tmpdir.join('vendor-data') - write_file(vendor_data.strpath, vd_val) - else: - vendor_data = None - run_dir = tmpdir.join('run_dir') - run_dir.ensure_dir() - return ( - Paths({'run_dir': run_dir.strpath}), - run_dir, - user_data, - vendor_data - ) - - def test_handle_args_error_on_missing_param(self, caplog, capsys): - """Error when missing required parameters and print usage.""" - args = self.args( - debug=False, dump_all=False, format=None, instance_data=None, - list_keys=False, user_data=None, vendor_data=None, varname=None) - with mock.patch( - "cloudinit.cmd.query.addLogHandlerCLI", return_value="" - ) as m_cli_log: - assert 1 == query.handle_args('anyname', args) - expected_error = ( - 'Expected one of the options: --all, --format, --list-keys' - ' or varname\n') - assert expected_error in caplog.text - out, _err = capsys.readouterr() - assert 'usage: query' in out - assert 1 == m_cli_log.call_count - - @pytest.mark.parametrize( - "inst_data,varname,expected_error", ( - ( - '{"v1": {"key-2": "value-2"}}', - 'v1.absent_leaf', - "instance-data 'v1' has no 'absent_leaf'\n" - ), - ( - '{"v1": {"key-2": "value-2"}}', - 'absent_key', - "Undefined instance-data key 'absent_key'\n" - ), - ) - ) - def test_handle_args_error_on_invalid_vaname_paths( - self, inst_data, varname, expected_error, caplog, tmpdir - ): - """Error when varname is not a valid instance-data variable path.""" - instance_data = tmpdir.join('instance-data') - instance_data.write(inst_data) - args = self.args( - debug=False, dump_all=False, format=None, - instance_data=instance_data.strpath, - list_keys=False, user_data=None, vendor_data=None, varname=varname - ) - paths, _, _, _ = self._setup_paths(tmpdir) - with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: - m_paths.return_value = paths - with mock.patch( - "cloudinit.cmd.query.addLogHandlerCLI", return_value="" - ): - assert 1 == query.handle_args('anyname', args) - assert expected_error in caplog.text - - def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir): - """When instance_data file path does not exist, log an error.""" - absent_fn = tmpdir.join('absent') - args = self.args( - debug=False, dump_all=True, format=None, - instance_data=absent_fn.strpath, - list_keys=False, user_data='ud', vendor_data='vd', varname=None) - assert 1 == query.handle_args('anyname', args) - - msg = 'Missing instance-data file: %s' % absent_fn - assert msg in caplog.text - - def test_handle_args_error_when_no_read_permission_instance_data( - self, caplog, tmpdir - ): - """When instance_data file is unreadable, log an error.""" - noread_fn = tmpdir.join('unreadable') - noread_fn.write('thou shall not pass') - args = self.args( - debug=False, dump_all=True, format=None, - instance_data=noread_fn.strpath, - list_keys=False, user_data='ud', vendor_data='vd', varname=None) - with mock.patch('cloudinit.cmd.query.util.load_file') as m_load: - m_load.side_effect = OSError(errno.EACCES, 'Not allowed') - assert 1 == query.handle_args('anyname', args) - msg = "No read permission on '%s'. Try sudo" % noread_fn - assert msg in caplog.text - - def test_handle_args_defaults_instance_data(self, caplog, tmpdir): - """When no instance_data argument, default to configured run_dir.""" - args = self.args( - debug=False, dump_all=True, format=None, instance_data=None, - list_keys=False, user_data=None, vendor_data=None, varname=None) - paths, run_dir, _, _ = self._setup_paths(tmpdir) - with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: - m_paths.return_value = paths - assert 1 == query.handle_args('anyname', args) - json_file = run_dir.join(INSTANCE_JSON_FILE) - msg = 'Missing instance-data file: %s' % json_file.strpath - assert msg in caplog.text - - def test_handle_args_root_fallsback_to_instance_data(self, caplog, tmpdir): - """When no instance_data argument, root falls back to redacted json.""" - args = self.args( - debug=False, dump_all=True, format=None, instance_data=None, - list_keys=False, user_data=None, vendor_data=None, varname=None) - paths, run_dir, _, _ = self._setup_paths(tmpdir) - with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: - m_paths.return_value = paths - with mock.patch('os.getuid') as m_getuid: - m_getuid.return_value = 0 - assert 1 == query.handle_args('anyname', args) - json_file = run_dir.join(INSTANCE_JSON_FILE) - sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) - msg = ( - 'Missing root-readable %s. Using redacted %s instead.' % - ( - sensitive_file.strpath, json_file.strpath - ) - ) - assert msg in caplog.text - - @pytest.mark.parametrize( - 'ud_src,ud_expected,vd_src,vd_expected', - ( - ('hi mom', 'hi mom', 'hi pops', 'hi pops'), - ('ud'.encode('utf-8'), 'ud', 'vd'.encode('utf-8'), 'vd'), - (_gzip_data(b'ud'), 'ud', _gzip_data(b'vd'), 'vd'), - (_gzip_data('ud'.encode('utf-8')), 'ud', _gzip_data(b'vd'), 'vd'), - ) - ) - def test_handle_args_root_processes_user_data( - self, ud_src, ud_expected, vd_src, vd_expected, capsys, tmpdir - ): - """Support reading multiple user-data file content types""" - paths, run_dir, user_data, vendor_data = self._setup_paths( - tmpdir, ud_val=ud_src, vd_val=vd_src - ) - sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) - sensitive_file.write('{"my-var": "it worked"}') - args = self.args( - debug=False, dump_all=True, format=None, instance_data=None, - list_keys=False, user_data=user_data.strpath, - vendor_data=vendor_data.strpath, varname=None) - with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: - m_paths.return_value = paths - with mock.patch('os.getuid') as m_getuid: - m_getuid.return_value = 0 - assert 0 == query.handle_args('anyname', args) - out, _err = capsys.readouterr() - cmd_output = json.loads(out) - assert "it worked" == cmd_output['my-var'] - if ud_expected == "ci-b64:": - ud_expected = "ci-b64:{}".format(b64e(ud_src)) - if vd_expected == "ci-b64:": - vd_expected = "ci-b64:{}".format(b64e(vd_src)) - assert ud_expected == cmd_output['userdata'] - assert vd_expected == cmd_output['vendordata'] - - def test_handle_args_root_uses_instance_sensitive_data( - self, capsys, tmpdir - ): - """When no instance_data argument, root uses sensitive json.""" - paths, run_dir, user_data, vendor_data = self._setup_paths( - tmpdir, ud_val='ud', vd_val='vd' - ) - sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) - sensitive_file.write('{"my-var": "it worked"}') - args = self.args( - debug=False, dump_all=True, format=None, instance_data=None, - list_keys=False, user_data=user_data.strpath, - vendor_data=vendor_data.strpath, varname=None) - with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths: - m_paths.return_value = paths - with mock.patch('os.getuid') as m_getuid: - m_getuid.return_value = 0 - assert 0 == query.handle_args('anyname', args) - expected = ( - '{\n "my-var": "it worked",\n ' - '"userdata": "ud",\n "vendordata": "vd"\n}\n' - ) - out, _err = capsys.readouterr() - assert expected == out - - def test_handle_args_dumps_all_instance_data(self, capsys, tmpdir): - """When --all is specified query will dump all instance data vars.""" - instance_data = tmpdir.join('instance-data') - instance_data.write('{"my-var": "it worked"}') - args = self.args( - debug=False, dump_all=True, format=None, - instance_data=instance_data.strpath, list_keys=False, - user_data='ud', vendor_data='vd', varname=None) - with mock.patch('os.getuid') as m_getuid: - m_getuid.return_value = 100 - assert 0 == query.handle_args('anyname', args) - expected = ( - '{\n "my-var": "it worked",\n "userdata": "<%s> file:ud",\n' - ' "vendordata": "<%s> file:vd"\n}\n' % ( - REDACT_SENSITIVE_VALUE, REDACT_SENSITIVE_VALUE - ) - ) - out, _err = capsys.readouterr() - assert expected == out - - def test_handle_args_returns_top_level_varname(self, capsys, tmpdir): - """When the argument varname is passed, report its value.""" - instance_data = tmpdir.join('instance-data') - instance_data.write('{"my-var": "it worked"}') - args = self.args( - debug=False, dump_all=True, format=None, - instance_data=instance_data.strpath, list_keys=False, - user_data='ud', vendor_data='vd', varname='my_var') - with mock.patch('os.getuid') as m_getuid: - m_getuid.return_value = 100 - assert 0 == query.handle_args('anyname', args) - out, _err = capsys.readouterr() - assert 'it worked\n' == out - - @pytest.mark.parametrize( - 'inst_data,varname,expected', - ( - ( - '{"v1": {"key-2": "value-2"}, "my-var": "it worked"}', - 'v1.key_2', - 'value-2\n' - ), - # Assert no jinja underscore-delimited aliases are reported on CLI - ( - '{"v1": {"something-hyphenated": {"no.underscores":"x",' - ' "no-alias": "y"}}, "my-var": "it worked"}', - 'v1.something_hyphenated', - '{\n "no-alias": "y",\n "no.underscores": "x"\n}\n' - ), - ) - ) - def test_handle_args_returns_nested_varname( - self, inst_data, varname, expected, capsys, tmpdir - ): - """If user_data file is a jinja template render instance-data vars.""" - instance_data = tmpdir.join('instance-data') - instance_data.write(inst_data) - args = self.args( - debug=False, dump_all=False, format=None, - instance_data=instance_data.strpath, user_data='ud', - vendor_data='vd', list_keys=False, varname=varname) - with mock.patch('os.getuid') as m_getuid: - m_getuid.return_value = 100 - assert 0 == query.handle_args('anyname', args) - out, _err = capsys.readouterr() - assert expected == out - - def test_handle_args_returns_standardized_vars_to_top_level_aliases( - self, capsys, tmpdir - ): - """Any standardized vars under v# are promoted as top-level aliases.""" - instance_data = tmpdir.join('instance-data') - instance_data.write( - '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},' - ' "top": "gun"}') - expected = dedent("""\ - { - "top": "gun", - "userdata": "<redacted for non-root user> file:ud", - "v1": { - "v1_1": "val1.1" - }, - "v1_1": "val1.1", - "v2": { - "v2_2": "val2.2" - }, - "v2_2": "val2.2", - "vendordata": "<redacted for non-root user> file:vd" - } - """) - args = self.args( - debug=False, dump_all=True, format=None, - instance_data=instance_data.strpath, user_data='ud', - vendor_data='vd', list_keys=False, varname=None) - with mock.patch('os.getuid') as m_getuid: - m_getuid.return_value = 100 - assert 0 == query.handle_args('anyname', args) - out, _err = capsys.readouterr() - assert expected == out - - def test_handle_args_list_keys_sorts_top_level_keys_when_no_varname( - self, capsys, tmpdir - ): - """Sort all top-level keys when only --list-keys provided.""" - instance_data = tmpdir.join('instance-data') - instance_data.write( - '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},' - ' "top": "gun"}') - expected = 'top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n' - args = self.args( - debug=False, dump_all=False, format=None, - instance_data=instance_data.strpath, list_keys=True, - user_data='ud', vendor_data='vd', varname=None) - with mock.patch('os.getuid') as m_getuid: - m_getuid.return_value = 100 - assert 0 == query.handle_args('anyname', args) - out, _err = capsys.readouterr() - assert expected == out - - def test_handle_args_list_keys_sorts_nested_keys_when_varname( - self, capsys, tmpdir - ): - """Sort all nested keys of varname object when --list-keys provided.""" - instance_data = tmpdir.join('instance-data') - instance_data.write( - '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2":' + - ' {"v2_2": "val2.2"}, "top": "gun"}') - expected = 'v1_1\nv1_2\n' - args = self.args( - debug=False, dump_all=False, format=None, - instance_data=instance_data.strpath, list_keys=True, - user_data='ud', vendor_data='vd', varname='v1') - with mock.patch('os.getuid') as m_getuid: - m_getuid.return_value = 100 - assert 0 == query.handle_args('anyname', args) - out, _err = capsys.readouterr() - assert expected == out - - def test_handle_args_list_keys_errors_when_varname_is_not_a_dict( - self, caplog, tmpdir - ): - """Raise an error when --list-keys and varname specify a non-list.""" - instance_data = tmpdir.join('instance-data') - instance_data.write( - '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2": ' + - '{"v2_2": "val2.2"}, "top": "gun"}') - expected_error = "--list-keys provided but 'top' is not a dict" - args = self.args( - debug=False, dump_all=False, format=None, - instance_data=instance_data.strpath, list_keys=True, - user_data='ud', vendor_data='vd', varname='top') - with mock.patch('os.getuid') as m_getuid: - m_getuid.return_value = 100 - assert 1 == query.handle_args('anyname', args) - assert expected_error in caplog.text - -# vi: ts=4 expandtab diff --git a/cloudinit/cmd/tests/test_status.py b/cloudinit/cmd/tests/test_status.py deleted file mode 100644 index 1c9eec37..00000000 --- a/cloudinit/cmd/tests/test_status.py +++ /dev/null @@ -1,391 +0,0 @@ -# This file is part of cloud-init. See LICENSE file for license information. - -from collections import namedtuple -import os -from io import StringIO -from textwrap import dedent - -from cloudinit.atomic_helper import write_json -from cloudinit.cmd import status -from cloudinit.util import ensure_file -from cloudinit.tests.helpers import CiTestCase, wrap_and_call, mock - -mypaths = namedtuple('MyPaths', 'run_dir') -myargs = namedtuple('MyArgs', 'long wait') - - -class TestStatus(CiTestCase): - - def setUp(self): - super(TestStatus, self).setUp() - self.new_root = self.tmp_dir() - self.status_file = self.tmp_path('status.json', self.new_root) - self.disable_file = self.tmp_path('cloudinit-disable', self.new_root) - self.paths = mypaths(run_dir=self.new_root) - - class FakeInit(object): - paths = self.paths - - def __init__(self, ds_deps): - pass - - def read_cfg(self): - pass - - self.init_class = FakeInit - - def test__is_cloudinit_disabled_false_on_sysvinit(self): - '''When not in an environment using systemd, return False.''' - ensure_file(self.disable_file) # Create the ignored disable file - (is_disabled, reason) = wrap_and_call( - 'cloudinit.cmd.status', - {'uses_systemd': False, - 'get_cmdline': "root=/dev/my-root not-important"}, - status._is_cloudinit_disabled, self.disable_file, self.paths) - self.assertFalse( - is_disabled, 'expected enabled cloud-init on sysvinit') - self.assertEqual('Cloud-init enabled on sysvinit', reason) - - def test__is_cloudinit_disabled_true_on_disable_file(self): - '''When using systemd and disable_file is present return disabled.''' - ensure_file(self.disable_file) # Create observed disable file - (is_disabled, reason) = wrap_and_call( - 'cloudinit.cmd.status', - {'uses_systemd': True, - 'get_cmdline': "root=/dev/my-root not-important"}, - status._is_cloudinit_disabled, self.disable_file, self.paths) - self.assertTrue(is_disabled, 'expected disabled cloud-init') - self.assertEqual( - 'Cloud-init disabled by {0}'.format(self.disable_file), reason) - - def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self): - '''Not disabled when using systemd and enabled via commandline.''' - ensure_file(self.disable_file) # Create ignored disable file - (is_disabled, reason) = wrap_and_call( - 'cloudinit.cmd.status', - {'uses_systemd': True, - 'get_cmdline': 'something cloud-init=enabled else'}, - status._is_cloudinit_disabled, self.disable_file, self.paths) - self.assertFalse(is_disabled, 'expected enabled cloud-init') - self.assertEqual( - 'Cloud-init enabled by kernel command line cloud-init=enabled', - reason) - - def test__is_cloudinit_disabled_true_on_kernel_cmdline(self): - '''When using systemd and disable_file is present return disabled.''' - (is_disabled, reason) = wrap_and_call( - 'cloudinit.cmd.status', - {'uses_systemd': True, - 'get_cmdline': 'something cloud-init=disabled else'}, - status._is_cloudinit_disabled, self.disable_file, self.paths) - self.assertTrue(is_disabled, 'expected disabled cloud-init') - self.assertEqual( - 'Cloud-init disabled by kernel parameter cloud-init=disabled', - reason) - - def test__is_cloudinit_disabled_true_when_generator_disables(self): - '''When cloud-init-generator doesn't write enabled file return True.''' - enabled_file = os.path.join(self.paths.run_dir, 'enabled') - self.assertFalse(os.path.exists(enabled_file)) - (is_disabled, reason) = wrap_and_call( - 'cloudinit.cmd.status', - {'uses_systemd': True, - 'get_cmdline': 'something'}, - status._is_cloudinit_disabled, self.disable_file, self.paths) - self.assertTrue(is_disabled, 'expected disabled cloud-init') - self.assertEqual('Cloud-init disabled by cloud-init-generator', reason) - - def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self): - '''Report enabled when systemd generator creates the enabled file.''' - enabled_file = os.path.join(self.paths.run_dir, 'enabled') - ensure_file(enabled_file) - (is_disabled, reason) = wrap_and_call( - 'cloudinit.cmd.status', - {'uses_systemd': True, - 'get_cmdline': 'something ignored'}, - status._is_cloudinit_disabled, self.disable_file, self.paths) - self.assertFalse(is_disabled, 'expected enabled cloud-init') - self.assertEqual( - 'Cloud-init enabled by systemd cloud-init-generator', reason) - - def test_status_returns_not_run(self): - '''When status.json does not exist yet, return 'not run'.''' - self.assertFalse( - os.path.exists(self.status_file), 'Unexpected status.json found') - cmdargs = myargs(long=False, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) - self.assertEqual(0, retcode) - self.assertEqual('status: not run\n', m_stdout.getvalue()) - - def test_status_returns_disabled_long_on_presence_of_disable_file(self): - '''When cloudinit is disabled, return disabled reason.''' - - checked_files = [] - - def fakeexists(filepath): - checked_files.append(filepath) - status_file = os.path.join(self.paths.run_dir, 'status.json') - return bool(not filepath == status_file) - - cmdargs = myargs(long=True, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'os.path.exists': {'side_effect': fakeexists}, - '_is_cloudinit_disabled': (True, 'disabled for some reason'), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) - self.assertEqual(0, retcode) - self.assertEqual( - [os.path.join(self.paths.run_dir, 'status.json')], - checked_files) - expected = dedent('''\ - status: disabled - detail: - disabled for some reason - ''') - self.assertEqual(expected, m_stdout.getvalue()) - - def test_status_returns_running_on_no_results_json(self): - '''Report running when status.json exists but result.json does not.''' - result_file = self.tmp_path('result.json', self.new_root) - write_json(self.status_file, {}) - self.assertFalse( - os.path.exists(result_file), 'Unexpected result.json found') - cmdargs = myargs(long=False, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) - self.assertEqual(0, retcode) - self.assertEqual('status: running\n', m_stdout.getvalue()) - - def test_status_returns_running(self): - '''Report running when status exists with an unfinished stage.''' - ensure_file(self.tmp_path('result.json', self.new_root)) - write_json(self.status_file, - {'v1': {'init': {'start': 1, 'finished': None}}}) - cmdargs = myargs(long=False, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) - self.assertEqual(0, retcode) - self.assertEqual('status: running\n', m_stdout.getvalue()) - - def test_status_returns_done(self): - '''Report done results.json exists no stages are unfinished.''' - ensure_file(self.tmp_path('result.json', self.new_root)) - write_json( - self.status_file, - {'v1': {'stage': None, # No current stage running - 'datasource': ( - 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]' - '[dsmode=net]'), - 'blah': {'finished': 123.456}, - 'init': {'errors': [], 'start': 124.567, - 'finished': 125.678}, - 'init-local': {'start': 123.45, 'finished': 123.46}}}) - cmdargs = myargs(long=False, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) - self.assertEqual(0, retcode) - self.assertEqual('status: done\n', m_stdout.getvalue()) - - def test_status_returns_done_long(self): - '''Long format of done status includes datasource info.''' - ensure_file(self.tmp_path('result.json', self.new_root)) - write_json( - self.status_file, - {'v1': {'stage': None, - 'datasource': ( - 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]' - '[dsmode=net]'), - 'init': {'start': 124.567, 'finished': 125.678}, - 'init-local': {'start': 123.45, 'finished': 123.46}}}) - cmdargs = myargs(long=True, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) - self.assertEqual(0, retcode) - expected = dedent('''\ - status: done - time: Thu, 01 Jan 1970 00:02:05 +0000 - detail: - DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net] - ''') - self.assertEqual(expected, m_stdout.getvalue()) - - def test_status_on_errors(self): - '''Reports error when any stage has errors.''' - write_json( - self.status_file, - {'v1': {'stage': None, - 'blah': {'errors': [], 'finished': 123.456}, - 'init': {'errors': ['error1'], 'start': 124.567, - 'finished': 125.678}, - 'init-local': {'start': 123.45, 'finished': 123.46}}}) - cmdargs = myargs(long=False, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) - self.assertEqual(1, retcode) - self.assertEqual('status: error\n', m_stdout.getvalue()) - - def test_status_on_errors_long(self): - '''Long format of error status includes all error messages.''' - write_json( - self.status_file, - {'v1': {'stage': None, - 'datasource': ( - 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]' - '[dsmode=net]'), - 'init': {'errors': ['error1'], 'start': 124.567, - 'finished': 125.678}, - 'init-local': {'errors': ['error2', 'error3'], - 'start': 123.45, 'finished': 123.46}}}) - cmdargs = myargs(long=True, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) - self.assertEqual(1, retcode) - expected = dedent('''\ - status: error - time: Thu, 01 Jan 1970 00:02:05 +0000 - detail: - error1 - error2 - error3 - ''') - self.assertEqual(expected, m_stdout.getvalue()) - - def test_status_returns_running_long_format(self): - '''Long format reports the stage in which we are running.''' - write_json( - self.status_file, - {'v1': {'stage': 'init', - 'init': {'start': 124.456, 'finished': None}, - 'init-local': {'start': 123.45, 'finished': 123.46}}}) - cmdargs = myargs(long=True, wait=False) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) - self.assertEqual(0, retcode) - expected = dedent('''\ - status: running - time: Thu, 01 Jan 1970 00:02:04 +0000 - detail: - Running in stage: init - ''') - self.assertEqual(expected, m_stdout.getvalue()) - - def test_status_wait_blocks_until_done(self): - '''Specifying wait will poll every 1/4 second until done state.''' - running_json = { - 'v1': {'stage': 'init', - 'init': {'start': 124.456, 'finished': None}, - 'init-local': {'start': 123.45, 'finished': 123.46}}} - done_json = { - 'v1': {'stage': None, - 'init': {'start': 124.456, 'finished': 125.678}, - 'init-local': {'start': 123.45, 'finished': 123.46}}} - - self.sleep_calls = 0 - - def fake_sleep(interval): - self.assertEqual(0.25, interval) - self.sleep_calls += 1 - if self.sleep_calls == 2: - write_json(self.status_file, running_json) - elif self.sleep_calls == 3: - write_json(self.status_file, done_json) - result_file = self.tmp_path('result.json', self.new_root) - ensure_file(result_file) - - cmdargs = myargs(long=False, wait=True) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'sleep': {'side_effect': fake_sleep}, - '_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) - self.assertEqual(0, retcode) - self.assertEqual(4, self.sleep_calls) - self.assertEqual('....\nstatus: done\n', m_stdout.getvalue()) - - def test_status_wait_blocks_until_error(self): - '''Specifying wait will poll every 1/4 second until error state.''' - running_json = { - 'v1': {'stage': 'init', - 'init': {'start': 124.456, 'finished': None}, - 'init-local': {'start': 123.45, 'finished': 123.46}}} - error_json = { - 'v1': {'stage': None, - 'init': {'errors': ['error1'], 'start': 124.456, - 'finished': 125.678}, - 'init-local': {'start': 123.45, 'finished': 123.46}}} - - self.sleep_calls = 0 - - def fake_sleep(interval): - self.assertEqual(0.25, interval) - self.sleep_calls += 1 - if self.sleep_calls == 2: - write_json(self.status_file, running_json) - elif self.sleep_calls == 3: - write_json(self.status_file, error_json) - - cmdargs = myargs(long=False, wait=True) - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - 'cloudinit.cmd.status', - {'sleep': {'side_effect': fake_sleep}, - '_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.handle_status_args, 'ignored', cmdargs) - self.assertEqual(1, retcode) - self.assertEqual(4, self.sleep_calls) - self.assertEqual('....\nstatus: error\n', m_stdout.getvalue()) - - def test_status_main(self): - '''status.main can be run as a standalone script.''' - write_json(self.status_file, - {'v1': {'init': {'start': 1, 'finished': None}}}) - with self.assertRaises(SystemExit) as context_manager: - with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout: - wrap_and_call( - 'cloudinit.cmd.status', - {'sys.argv': {'new': ['status']}, - '_is_cloudinit_disabled': (False, ''), - 'Init': {'side_effect': self.init_class}}, - status.main) - self.assertEqual(0, context_manager.exception.code) - self.assertEqual('status: running\n', m_stdout.getvalue()) - -# vi: ts=4 expandtab syntax=python |