summaryrefslogtreecommitdiff
path: root/cloudinit/cmd/tests
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/cmd/tests')
-rw-r--r--cloudinit/cmd/tests/__init__.py0
-rw-r--r--cloudinit/cmd/tests/test_clean.py178
-rw-r--r--cloudinit/cmd/tests/test_cloud_id.py127
-rw-r--r--cloudinit/cmd/tests/test_main.py188
-rw-r--r--cloudinit/cmd/tests/test_query.py392
-rw-r--r--cloudinit/cmd/tests/test_status.py391
6 files changed, 0 insertions, 1276 deletions
diff --git a/cloudinit/cmd/tests/__init__.py b/cloudinit/cmd/tests/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/cloudinit/cmd/tests/__init__.py
+++ /dev/null
diff --git a/cloudinit/cmd/tests/test_clean.py b/cloudinit/cmd/tests/test_clean.py
deleted file mode 100644
index a848a810..00000000
--- a/cloudinit/cmd/tests/test_clean.py
+++ /dev/null
@@ -1,178 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-from cloudinit.cmd import clean
-from cloudinit.util import ensure_dir, sym_link, write_file
-from cloudinit.tests.helpers import CiTestCase, wrap_and_call, mock
-from collections import namedtuple
-import os
-from io import StringIO
-
-mypaths = namedtuple('MyPaths', 'cloud_dir')
-
-
-class TestClean(CiTestCase):
-
- def setUp(self):
- super(TestClean, self).setUp()
- self.new_root = self.tmp_dir()
- self.artifact_dir = self.tmp_path('artifacts', self.new_root)
- self.log1 = self.tmp_path('cloud-init.log', self.new_root)
- self.log2 = self.tmp_path('cloud-init-output.log', self.new_root)
-
- class FakeInit(object):
- cfg = {'def_log_file': self.log1,
- 'output': {'all': '|tee -a {0}'.format(self.log2)}}
- # Ensure cloud_dir has a trailing slash, to match real behaviour
- paths = mypaths(cloud_dir='{}/'.format(self.artifact_dir))
-
- def __init__(self, ds_deps):
- pass
-
- def read_cfg(self):
- pass
-
- self.init_class = FakeInit
-
- def test_remove_artifacts_removes_logs(self):
- """remove_artifacts removes logs when remove_logs is True."""
- write_file(self.log1, 'cloud-init-log')
- write_file(self.log2, 'cloud-init-output-log')
-
- self.assertFalse(
- os.path.exists(self.artifact_dir), 'Unexpected artifacts dir')
- retcode = wrap_and_call(
- 'cloudinit.cmd.clean',
- {'Init': {'side_effect': self.init_class}},
- clean.remove_artifacts, remove_logs=True)
- self.assertFalse(os.path.exists(self.log1), 'Unexpected file')
- self.assertFalse(os.path.exists(self.log2), 'Unexpected file')
- self.assertEqual(0, retcode)
-
- def test_remove_artifacts_preserves_logs(self):
- """remove_artifacts leaves logs when remove_logs is False."""
- write_file(self.log1, 'cloud-init-log')
- write_file(self.log2, 'cloud-init-output-log')
-
- retcode = wrap_and_call(
- 'cloudinit.cmd.clean',
- {'Init': {'side_effect': self.init_class}},
- clean.remove_artifacts, remove_logs=False)
- self.assertTrue(os.path.exists(self.log1), 'Missing expected file')
- self.assertTrue(os.path.exists(self.log2), 'Missing expected file')
- self.assertEqual(0, retcode)
-
- def test_remove_artifacts_removes_unlinks_symlinks(self):
- """remove_artifacts cleans artifacts dir unlinking any symlinks."""
- dir1 = os.path.join(self.artifact_dir, 'dir1')
- ensure_dir(dir1)
- symlink = os.path.join(self.artifact_dir, 'mylink')
- sym_link(dir1, symlink)
-
- retcode = wrap_and_call(
- 'cloudinit.cmd.clean',
- {'Init': {'side_effect': self.init_class}},
- clean.remove_artifacts, remove_logs=False)
- self.assertEqual(0, retcode)
- for path in (dir1, symlink):
- self.assertFalse(
- os.path.exists(path),
- 'Unexpected {0} dir'.format(path))
-
- def test_remove_artifacts_removes_artifacts_skipping_seed(self):
- """remove_artifacts cleans artifacts dir with exception of seed dir."""
- dirs = [
- self.artifact_dir,
- os.path.join(self.artifact_dir, 'seed'),
- os.path.join(self.artifact_dir, 'dir1'),
- os.path.join(self.artifact_dir, 'dir2')]
- for _dir in dirs:
- ensure_dir(_dir)
-
- retcode = wrap_and_call(
- 'cloudinit.cmd.clean',
- {'Init': {'side_effect': self.init_class}},
- clean.remove_artifacts, remove_logs=False)
- self.assertEqual(0, retcode)
- for expected_dir in dirs[:2]:
- self.assertTrue(
- os.path.exists(expected_dir),
- 'Missing {0} dir'.format(expected_dir))
- for deleted_dir in dirs[2:]:
- self.assertFalse(
- os.path.exists(deleted_dir),
- 'Unexpected {0} dir'.format(deleted_dir))
-
- def test_remove_artifacts_removes_artifacts_removes_seed(self):
- """remove_artifacts removes seed dir when remove_seed is True."""
- dirs = [
- self.artifact_dir,
- os.path.join(self.artifact_dir, 'seed'),
- os.path.join(self.artifact_dir, 'dir1'),
- os.path.join(self.artifact_dir, 'dir2')]
- for _dir in dirs:
- ensure_dir(_dir)
-
- retcode = wrap_and_call(
- 'cloudinit.cmd.clean',
- {'Init': {'side_effect': self.init_class}},
- clean.remove_artifacts, remove_logs=False, remove_seed=True)
- self.assertEqual(0, retcode)
- self.assertTrue(
- os.path.exists(self.artifact_dir), 'Missing artifact dir')
- for deleted_dir in dirs[1:]:
- self.assertFalse(
- os.path.exists(deleted_dir),
- 'Unexpected {0} dir'.format(deleted_dir))
-
- def test_remove_artifacts_returns_one_on_errors(self):
- """remove_artifacts returns non-zero on failure and prints an error."""
- ensure_dir(self.artifact_dir)
- ensure_dir(os.path.join(self.artifact_dir, 'dir1'))
-
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
- retcode = wrap_and_call(
- 'cloudinit.cmd.clean',
- {'del_dir': {'side_effect': OSError('oops')},
- 'Init': {'side_effect': self.init_class}},
- clean.remove_artifacts, remove_logs=False)
- self.assertEqual(1, retcode)
- self.assertEqual(
- 'ERROR: Could not remove %s/dir1: oops\n' % self.artifact_dir,
- m_stderr.getvalue())
-
- def test_handle_clean_args_reboots(self):
- """handle_clean_args_reboots when reboot arg is provided."""
-
- called_cmds = []
-
- def fake_subp(cmd, capture):
- called_cmds.append((cmd, capture))
- return '', ''
-
- myargs = namedtuple('MyArgs', 'remove_logs remove_seed reboot')
- cmdargs = myargs(remove_logs=False, remove_seed=False, reboot=True)
- retcode = wrap_and_call(
- 'cloudinit.cmd.clean',
- {'subp': {'side_effect': fake_subp},
- 'Init': {'side_effect': self.init_class}},
- clean.handle_clean_args, name='does not matter', args=cmdargs)
- self.assertEqual(0, retcode)
- self.assertEqual(
- [(['shutdown', '-r', 'now'], False)], called_cmds)
-
- def test_status_main(self):
- '''clean.main can be run as a standalone script.'''
- write_file(self.log1, 'cloud-init-log')
- with self.assertRaises(SystemExit) as context_manager:
- wrap_and_call(
- 'cloudinit.cmd.clean',
- {'Init': {'side_effect': self.init_class},
- 'sys.argv': {'new': ['clean', '--logs']}},
- clean.main)
-
- self.assertEqual(0, context_manager.exception.code)
- self.assertFalse(
- os.path.exists(self.log1), 'Unexpected log {0}'.format(self.log1))
-
-
-# vi: ts=4 expandtab syntax=python
diff --git a/cloudinit/cmd/tests/test_cloud_id.py b/cloudinit/cmd/tests/test_cloud_id.py
deleted file mode 100644
index 3f3727fd..00000000
--- a/cloudinit/cmd/tests/test_cloud_id.py
+++ /dev/null
@@ -1,127 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-"""Tests for cloud-id command line utility."""
-
-from cloudinit import util
-from collections import namedtuple
-from io import StringIO
-
-from cloudinit.cmd import cloud_id
-
-from cloudinit.tests.helpers import CiTestCase, mock
-
-
-class TestCloudId(CiTestCase):
-
- args = namedtuple('cloudidargs', ('instance_data json long'))
-
- def setUp(self):
- super(TestCloudId, self).setUp()
- self.tmp = self.tmp_dir()
- self.instance_data = self.tmp_path('instance-data.json', dir=self.tmp)
-
- def test_cloud_id_arg_parser_defaults(self):
- """Validate the argument defaults when not provided by the end-user."""
- cmd = ['cloud-id']
- with mock.patch('sys.argv', cmd):
- args = cloud_id.get_parser().parse_args()
- self.assertEqual(
- '/run/cloud-init/instance-data.json',
- args.instance_data)
- self.assertEqual(False, args.long)
- self.assertEqual(False, args.json)
-
- def test_cloud_id_arg_parse_overrides(self):
- """Override argument defaults by specifying values for each param."""
- util.write_file(self.instance_data, '{}')
- cmd = ['cloud-id', '--instance-data', self.instance_data, '--long',
- '--json']
- with mock.patch('sys.argv', cmd):
- args = cloud_id.get_parser().parse_args()
- self.assertEqual(self.instance_data, args.instance_data)
- self.assertEqual(True, args.long)
- self.assertEqual(True, args.json)
-
- def test_cloud_id_missing_instance_data_json(self):
- """Exit error when the provided instance-data.json does not exist."""
- cmd = ['cloud-id', '--instance-data', self.instance_data]
- with mock.patch('sys.argv', cmd):
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
- with self.assertRaises(SystemExit) as context_manager:
- cloud_id.main()
- self.assertEqual(1, context_manager.exception.code)
- self.assertIn(
- "ERROR: File not found '%s'" % self.instance_data,
- m_stderr.getvalue())
-
- def test_cloud_id_non_json_instance_data(self):
- """Exit error when the provided instance-data.json is not json."""
- cmd = ['cloud-id', '--instance-data', self.instance_data]
- util.write_file(self.instance_data, '{')
- with mock.patch('sys.argv', cmd):
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
- with self.assertRaises(SystemExit) as context_manager:
- cloud_id.main()
- self.assertEqual(1, context_manager.exception.code)
- self.assertIn(
- "ERROR: File '%s' is not valid json." % self.instance_data,
- m_stderr.getvalue())
-
- def test_cloud_id_from_cloud_name_in_instance_data(self):
- """Report canonical cloud-id from cloud_name in instance-data."""
- util.write_file(
- self.instance_data,
- '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}')
- cmd = ['cloud-id', '--instance-data', self.instance_data]
- with mock.patch('sys.argv', cmd):
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- with self.assertRaises(SystemExit) as context_manager:
- cloud_id.main()
- self.assertEqual(0, context_manager.exception.code)
- self.assertEqual("mycloud\n", m_stdout.getvalue())
-
- def test_cloud_id_long_name_from_instance_data(self):
- """Report long cloud-id format from cloud_name and region."""
- util.write_file(
- self.instance_data,
- '{"v1": {"cloud_name": "mycloud", "region": "somereg"}}')
- cmd = ['cloud-id', '--instance-data', self.instance_data, '--long']
- with mock.patch('sys.argv', cmd):
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- with self.assertRaises(SystemExit) as context_manager:
- cloud_id.main()
- self.assertEqual(0, context_manager.exception.code)
- self.assertEqual("mycloud\tsomereg\n", m_stdout.getvalue())
-
- def test_cloud_id_lookup_from_instance_data_region(self):
- """Report discovered canonical cloud_id when region lookup matches."""
- util.write_file(
- self.instance_data,
- '{"v1": {"cloud_name": "aws", "region": "cn-north-1",'
- ' "platform": "ec2"}}')
- cmd = ['cloud-id', '--instance-data', self.instance_data, '--long']
- with mock.patch('sys.argv', cmd):
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- with self.assertRaises(SystemExit) as context_manager:
- cloud_id.main()
- self.assertEqual(0, context_manager.exception.code)
- self.assertEqual("aws-china\tcn-north-1\n", m_stdout.getvalue())
-
- def test_cloud_id_lookup_json_instance_data_adds_cloud_id_to_json(self):
- """Report v1 instance-data content with cloud_id when --json set."""
- util.write_file(
- self.instance_data,
- '{"v1": {"cloud_name": "unknown", "region": "dfw",'
- ' "platform": "openstack", "public_ssh_keys": []}}')
- expected = util.json_dumps({
- 'cloud_id': 'openstack', 'cloud_name': 'unknown',
- 'platform': 'openstack', 'public_ssh_keys': [], 'region': 'dfw'})
- cmd = ['cloud-id', '--instance-data', self.instance_data, '--json']
- with mock.patch('sys.argv', cmd):
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- with self.assertRaises(SystemExit) as context_manager:
- cloud_id.main()
- self.assertEqual(0, context_manager.exception.code)
- self.assertEqual(expected + '\n', m_stdout.getvalue())
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/cmd/tests/test_main.py b/cloudinit/cmd/tests/test_main.py
deleted file mode 100644
index 2e380848..00000000
--- a/cloudinit/cmd/tests/test_main.py
+++ /dev/null
@@ -1,188 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-from collections import namedtuple
-import copy
-import os
-from io import StringIO
-from unittest import mock
-
-import pytest
-
-from cloudinit.cmd import main
-from cloudinit import safeyaml
-from cloudinit.util import (
- ensure_dir, load_file, write_file)
-from cloudinit.tests.helpers import (
- FilesystemMockingTestCase, wrap_and_call)
-
-mypaths = namedtuple('MyPaths', 'run_dir')
-myargs = namedtuple('MyArgs', 'debug files force local reporter subcommand')
-
-
-class TestMain(FilesystemMockingTestCase):
- with_logs = True
- allowed_subp = False
-
- def setUp(self):
- super(TestMain, self).setUp()
- self.new_root = self.tmp_dir()
- self.cloud_dir = self.tmp_path('var/lib/cloud/', dir=self.new_root)
- os.makedirs(self.cloud_dir)
- self.replicateTestRoot('simple_ubuntu', self.new_root)
- self.cfg = {
- 'datasource_list': ['None'],
- 'runcmd': ['ls /etc'], # test ALL_DISTROS
- 'system_info': {'paths': {'cloud_dir': self.cloud_dir,
- 'run_dir': self.new_root}},
- 'write_files': [
- {
- 'path': '/etc/blah.ini',
- 'content': 'blah',
- 'permissions': 0o755,
- },
- ],
- 'cloud_init_modules': ['write-files', 'runcmd'],
- }
- cloud_cfg = safeyaml.dumps(self.cfg)
- ensure_dir(os.path.join(self.new_root, 'etc', 'cloud'))
- self.cloud_cfg_file = os.path.join(
- self.new_root, 'etc', 'cloud', 'cloud.cfg')
- write_file(self.cloud_cfg_file, cloud_cfg)
- self.patchOS(self.new_root)
- self.patchUtils(self.new_root)
- self.stderr = StringIO()
- self.patchStdoutAndStderr(stderr=self.stderr)
-
- def test_main_init_run_net_stops_on_file_no_net(self):
- """When no-net file is present, main_init does not process modules."""
- stop_file = os.path.join(self.cloud_dir, 'data', 'no-net') # stop file
- write_file(stop_file, '')
- cmdargs = myargs(
- debug=False, files=None, force=False, local=False, reporter=None,
- subcommand='init')
- (_item1, item2) = wrap_and_call(
- 'cloudinit.cmd.main',
- {'util.close_stdin': True,
- 'netinfo.debug_info': 'my net debug info',
- 'util.fixup_output': ('outfmt', 'errfmt')},
- main.main_init, 'init', cmdargs)
- # We should not run write_files module
- self.assertFalse(
- os.path.exists(os.path.join(self.new_root, 'etc/blah.ini')),
- 'Unexpected run of write_files module produced blah.ini')
- self.assertEqual([], item2)
- # Instancify is called
- instance_id_path = 'var/lib/cloud/data/instance-id'
- self.assertFalse(
- os.path.exists(os.path.join(self.new_root, instance_id_path)),
- 'Unexpected call to datasource.instancify produced instance-id')
- expected_logs = [
- "Exiting. stop file ['{stop_file}'] existed\n".format(
- stop_file=stop_file),
- 'my net debug info' # netinfo.debug_info
- ]
- for log in expected_logs:
- self.assertIn(log, self.stderr.getvalue())
-
- def test_main_init_run_net_runs_modules(self):
- """Modules like write_files are run in 'net' mode."""
- cmdargs = myargs(
- debug=False, files=None, force=False, local=False, reporter=None,
- subcommand='init')
- (_item1, item2) = wrap_and_call(
- 'cloudinit.cmd.main',
- {'util.close_stdin': True,
- 'netinfo.debug_info': 'my net debug info',
- 'util.fixup_output': ('outfmt', 'errfmt')},
- main.main_init, 'init', cmdargs)
- self.assertEqual([], item2)
- # Instancify is called
- instance_id_path = 'var/lib/cloud/data/instance-id'
- self.assertEqual(
- 'iid-datasource-none\n',
- os.path.join(load_file(
- os.path.join(self.new_root, instance_id_path))))
- # modules are run (including write_files)
- self.assertEqual(
- 'blah', load_file(os.path.join(self.new_root, 'etc/blah.ini')))
- expected_logs = [
- 'network config is disabled by fallback', # apply_network_config
- 'my net debug info', # netinfo.debug_info
- 'no previous run detected'
- ]
- for log in expected_logs:
- self.assertIn(log, self.stderr.getvalue())
-
- def test_main_init_run_net_calls_set_hostname_when_metadata_present(self):
- """When local-hostname metadata is present, call cc_set_hostname."""
- self.cfg['datasource'] = {
- 'None': {'metadata': {'local-hostname': 'md-hostname'}}}
- cloud_cfg = safeyaml.dumps(self.cfg)
- write_file(self.cloud_cfg_file, cloud_cfg)
- cmdargs = myargs(
- debug=False, files=None, force=False, local=False, reporter=None,
- subcommand='init')
-
- def set_hostname(name, cfg, cloud, log, args):
- self.assertEqual('set-hostname', name)
- updated_cfg = copy.deepcopy(self.cfg)
- updated_cfg.update(
- {'def_log_file': '/var/log/cloud-init.log',
- 'log_cfgs': [],
- 'syslog_fix_perms': [
- 'syslog:adm', 'root:adm', 'root:wheel', 'root:root'
- ],
- 'vendor_data': {'enabled': True, 'prefix': []},
- 'vendor_data2': {'enabled': True, 'prefix': []}})
- updated_cfg.pop('system_info')
-
- self.assertEqual(updated_cfg, cfg)
- self.assertEqual(main.LOG, log)
- self.assertIsNone(args)
-
- (_item1, item2) = wrap_and_call(
- 'cloudinit.cmd.main',
- {'util.close_stdin': True,
- 'netinfo.debug_info': 'my net debug info',
- 'cc_set_hostname.handle': {'side_effect': set_hostname},
- 'util.fixup_output': ('outfmt', 'errfmt')},
- main.main_init, 'init', cmdargs)
- self.assertEqual([], item2)
- # Instancify is called
- instance_id_path = 'var/lib/cloud/data/instance-id'
- self.assertEqual(
- 'iid-datasource-none\n',
- os.path.join(load_file(
- os.path.join(self.new_root, instance_id_path))))
- # modules are run (including write_files)
- self.assertEqual(
- 'blah', load_file(os.path.join(self.new_root, 'etc/blah.ini')))
- expected_logs = [
- 'network config is disabled by fallback', # apply_network_config
- 'my net debug info', # netinfo.debug_info
- 'no previous run detected'
- ]
- for log in expected_logs:
- self.assertIn(log, self.stderr.getvalue())
-
-
-class TestShouldBringUpInterfaces:
- @pytest.mark.parametrize('cfg_disable,args_local,expected', [
- (True, True, False),
- (True, False, False),
- (False, True, False),
- (False, False, True),
- ])
- def test_should_bring_up_interfaces(
- self, cfg_disable, args_local, expected
- ):
- init = mock.Mock()
- init.cfg = {'disable_network_activation': cfg_disable}
-
- args = mock.Mock()
- args.local = args_local
-
- result = main._should_bring_up_interfaces(init, args)
- assert result == expected
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/cmd/tests/test_query.py b/cloudinit/cmd/tests/test_query.py
deleted file mode 100644
index d96c3945..00000000
--- a/cloudinit/cmd/tests/test_query.py
+++ /dev/null
@@ -1,392 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import errno
-import gzip
-from io import BytesIO
-import json
-from textwrap import dedent
-
-import pytest
-
-from collections import namedtuple
-from cloudinit.cmd import query
-from cloudinit.helpers import Paths
-from cloudinit.sources import (
- REDACT_SENSITIVE_VALUE, INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE)
-from cloudinit.tests.helpers import mock
-
-from cloudinit.util import b64e, write_file
-
-
-def _gzip_data(data):
- with BytesIO() as iobuf:
- with gzip.GzipFile(mode="wb", fileobj=iobuf) as gzfp:
- gzfp.write(data)
- return iobuf.getvalue()
-
-
-@mock.patch("cloudinit.cmd.query.addLogHandlerCLI", lambda *args: "")
-class TestQuery:
-
- args = namedtuple(
- 'queryargs',
- ('debug dump_all format instance_data list_keys user_data vendor_data'
- ' varname'))
-
- def _setup_paths(self, tmpdir, ud_val=None, vd_val=None):
- """Write userdata and vendordata into a tmpdir.
-
- Return:
- 4-tuple : (paths, run_dir_path, userdata_path, vendordata_path)
- """
- if ud_val:
- user_data = tmpdir.join('user-data')
- write_file(user_data.strpath, ud_val)
- else:
- user_data = None
- if vd_val:
- vendor_data = tmpdir.join('vendor-data')
- write_file(vendor_data.strpath, vd_val)
- else:
- vendor_data = None
- run_dir = tmpdir.join('run_dir')
- run_dir.ensure_dir()
- return (
- Paths({'run_dir': run_dir.strpath}),
- run_dir,
- user_data,
- vendor_data
- )
-
- def test_handle_args_error_on_missing_param(self, caplog, capsys):
- """Error when missing required parameters and print usage."""
- args = self.args(
- debug=False, dump_all=False, format=None, instance_data=None,
- list_keys=False, user_data=None, vendor_data=None, varname=None)
- with mock.patch(
- "cloudinit.cmd.query.addLogHandlerCLI", return_value=""
- ) as m_cli_log:
- assert 1 == query.handle_args('anyname', args)
- expected_error = (
- 'Expected one of the options: --all, --format, --list-keys'
- ' or varname\n')
- assert expected_error in caplog.text
- out, _err = capsys.readouterr()
- assert 'usage: query' in out
- assert 1 == m_cli_log.call_count
-
- @pytest.mark.parametrize(
- "inst_data,varname,expected_error", (
- (
- '{"v1": {"key-2": "value-2"}}',
- 'v1.absent_leaf',
- "instance-data 'v1' has no 'absent_leaf'\n"
- ),
- (
- '{"v1": {"key-2": "value-2"}}',
- 'absent_key',
- "Undefined instance-data key 'absent_key'\n"
- ),
- )
- )
- def test_handle_args_error_on_invalid_vaname_paths(
- self, inst_data, varname, expected_error, caplog, tmpdir
- ):
- """Error when varname is not a valid instance-data variable path."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write(inst_data)
- args = self.args(
- debug=False, dump_all=False, format=None,
- instance_data=instance_data.strpath,
- list_keys=False, user_data=None, vendor_data=None, varname=varname
- )
- paths, _, _, _ = self._setup_paths(tmpdir)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
- m_paths.return_value = paths
- with mock.patch(
- "cloudinit.cmd.query.addLogHandlerCLI", return_value=""
- ):
- assert 1 == query.handle_args('anyname', args)
- assert expected_error in caplog.text
-
- def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir):
- """When instance_data file path does not exist, log an error."""
- absent_fn = tmpdir.join('absent')
- args = self.args(
- debug=False, dump_all=True, format=None,
- instance_data=absent_fn.strpath,
- list_keys=False, user_data='ud', vendor_data='vd', varname=None)
- assert 1 == query.handle_args('anyname', args)
-
- msg = 'Missing instance-data file: %s' % absent_fn
- assert msg in caplog.text
-
- def test_handle_args_error_when_no_read_permission_instance_data(
- self, caplog, tmpdir
- ):
- """When instance_data file is unreadable, log an error."""
- noread_fn = tmpdir.join('unreadable')
- noread_fn.write('thou shall not pass')
- args = self.args(
- debug=False, dump_all=True, format=None,
- instance_data=noread_fn.strpath,
- list_keys=False, user_data='ud', vendor_data='vd', varname=None)
- with mock.patch('cloudinit.cmd.query.util.load_file') as m_load:
- m_load.side_effect = OSError(errno.EACCES, 'Not allowed')
- assert 1 == query.handle_args('anyname', args)
- msg = "No read permission on '%s'. Try sudo" % noread_fn
- assert msg in caplog.text
-
- def test_handle_args_defaults_instance_data(self, caplog, tmpdir):
- """When no instance_data argument, default to configured run_dir."""
- args = self.args(
- debug=False, dump_all=True, format=None, instance_data=None,
- list_keys=False, user_data=None, vendor_data=None, varname=None)
- paths, run_dir, _, _ = self._setup_paths(tmpdir)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
- m_paths.return_value = paths
- assert 1 == query.handle_args('anyname', args)
- json_file = run_dir.join(INSTANCE_JSON_FILE)
- msg = 'Missing instance-data file: %s' % json_file.strpath
- assert msg in caplog.text
-
- def test_handle_args_root_fallsback_to_instance_data(self, caplog, tmpdir):
- """When no instance_data argument, root falls back to redacted json."""
- args = self.args(
- debug=False, dump_all=True, format=None, instance_data=None,
- list_keys=False, user_data=None, vendor_data=None, varname=None)
- paths, run_dir, _, _ = self._setup_paths(tmpdir)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
- m_paths.return_value = paths
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 0
- assert 1 == query.handle_args('anyname', args)
- json_file = run_dir.join(INSTANCE_JSON_FILE)
- sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
- msg = (
- 'Missing root-readable %s. Using redacted %s instead.' %
- (
- sensitive_file.strpath, json_file.strpath
- )
- )
- assert msg in caplog.text
-
- @pytest.mark.parametrize(
- 'ud_src,ud_expected,vd_src,vd_expected',
- (
- ('hi mom', 'hi mom', 'hi pops', 'hi pops'),
- ('ud'.encode('utf-8'), 'ud', 'vd'.encode('utf-8'), 'vd'),
- (_gzip_data(b'ud'), 'ud', _gzip_data(b'vd'), 'vd'),
- (_gzip_data('ud'.encode('utf-8')), 'ud', _gzip_data(b'vd'), 'vd'),
- )
- )
- def test_handle_args_root_processes_user_data(
- self, ud_src, ud_expected, vd_src, vd_expected, capsys, tmpdir
- ):
- """Support reading multiple user-data file content types"""
- paths, run_dir, user_data, vendor_data = self._setup_paths(
- tmpdir, ud_val=ud_src, vd_val=vd_src
- )
- sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
- sensitive_file.write('{"my-var": "it worked"}')
- args = self.args(
- debug=False, dump_all=True, format=None, instance_data=None,
- list_keys=False, user_data=user_data.strpath,
- vendor_data=vendor_data.strpath, varname=None)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
- m_paths.return_value = paths
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 0
- assert 0 == query.handle_args('anyname', args)
- out, _err = capsys.readouterr()
- cmd_output = json.loads(out)
- assert "it worked" == cmd_output['my-var']
- if ud_expected == "ci-b64:":
- ud_expected = "ci-b64:{}".format(b64e(ud_src))
- if vd_expected == "ci-b64:":
- vd_expected = "ci-b64:{}".format(b64e(vd_src))
- assert ud_expected == cmd_output['userdata']
- assert vd_expected == cmd_output['vendordata']
-
- def test_handle_args_root_uses_instance_sensitive_data(
- self, capsys, tmpdir
- ):
- """When no instance_data argument, root uses sensitive json."""
- paths, run_dir, user_data, vendor_data = self._setup_paths(
- tmpdir, ud_val='ud', vd_val='vd'
- )
- sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
- sensitive_file.write('{"my-var": "it worked"}')
- args = self.args(
- debug=False, dump_all=True, format=None, instance_data=None,
- list_keys=False, user_data=user_data.strpath,
- vendor_data=vendor_data.strpath, varname=None)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
- m_paths.return_value = paths
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 0
- assert 0 == query.handle_args('anyname', args)
- expected = (
- '{\n "my-var": "it worked",\n '
- '"userdata": "ud",\n "vendordata": "vd"\n}\n'
- )
- out, _err = capsys.readouterr()
- assert expected == out
-
- def test_handle_args_dumps_all_instance_data(self, capsys, tmpdir):
- """When --all is specified query will dump all instance data vars."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write('{"my-var": "it worked"}')
- args = self.args(
- debug=False, dump_all=True, format=None,
- instance_data=instance_data.strpath, list_keys=False,
- user_data='ud', vendor_data='vd', varname=None)
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
- expected = (
- '{\n "my-var": "it worked",\n "userdata": "<%s> file:ud",\n'
- ' "vendordata": "<%s> file:vd"\n}\n' % (
- REDACT_SENSITIVE_VALUE, REDACT_SENSITIVE_VALUE
- )
- )
- out, _err = capsys.readouterr()
- assert expected == out
-
- def test_handle_args_returns_top_level_varname(self, capsys, tmpdir):
- """When the argument varname is passed, report its value."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write('{"my-var": "it worked"}')
- args = self.args(
- debug=False, dump_all=True, format=None,
- instance_data=instance_data.strpath, list_keys=False,
- user_data='ud', vendor_data='vd', varname='my_var')
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
- out, _err = capsys.readouterr()
- assert 'it worked\n' == out
-
- @pytest.mark.parametrize(
- 'inst_data,varname,expected',
- (
- (
- '{"v1": {"key-2": "value-2"}, "my-var": "it worked"}',
- 'v1.key_2',
- 'value-2\n'
- ),
- # Assert no jinja underscore-delimited aliases are reported on CLI
- (
- '{"v1": {"something-hyphenated": {"no.underscores":"x",'
- ' "no-alias": "y"}}, "my-var": "it worked"}',
- 'v1.something_hyphenated',
- '{\n "no-alias": "y",\n "no.underscores": "x"\n}\n'
- ),
- )
- )
- def test_handle_args_returns_nested_varname(
- self, inst_data, varname, expected, capsys, tmpdir
- ):
- """If user_data file is a jinja template render instance-data vars."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write(inst_data)
- args = self.args(
- debug=False, dump_all=False, format=None,
- instance_data=instance_data.strpath, user_data='ud',
- vendor_data='vd', list_keys=False, varname=varname)
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
- out, _err = capsys.readouterr()
- assert expected == out
-
- def test_handle_args_returns_standardized_vars_to_top_level_aliases(
- self, capsys, tmpdir
- ):
- """Any standardized vars under v# are promoted as top-level aliases."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write(
- '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},'
- ' "top": "gun"}')
- expected = dedent("""\
- {
- "top": "gun",
- "userdata": "<redacted for non-root user> file:ud",
- "v1": {
- "v1_1": "val1.1"
- },
- "v1_1": "val1.1",
- "v2": {
- "v2_2": "val2.2"
- },
- "v2_2": "val2.2",
- "vendordata": "<redacted for non-root user> file:vd"
- }
- """)
- args = self.args(
- debug=False, dump_all=True, format=None,
- instance_data=instance_data.strpath, user_data='ud',
- vendor_data='vd', list_keys=False, varname=None)
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
- out, _err = capsys.readouterr()
- assert expected == out
-
- def test_handle_args_list_keys_sorts_top_level_keys_when_no_varname(
- self, capsys, tmpdir
- ):
- """Sort all top-level keys when only --list-keys provided."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write(
- '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},'
- ' "top": "gun"}')
- expected = 'top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n'
- args = self.args(
- debug=False, dump_all=False, format=None,
- instance_data=instance_data.strpath, list_keys=True,
- user_data='ud', vendor_data='vd', varname=None)
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
- out, _err = capsys.readouterr()
- assert expected == out
-
- def test_handle_args_list_keys_sorts_nested_keys_when_varname(
- self, capsys, tmpdir
- ):
- """Sort all nested keys of varname object when --list-keys provided."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write(
- '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2":' +
- ' {"v2_2": "val2.2"}, "top": "gun"}')
- expected = 'v1_1\nv1_2\n'
- args = self.args(
- debug=False, dump_all=False, format=None,
- instance_data=instance_data.strpath, list_keys=True,
- user_data='ud', vendor_data='vd', varname='v1')
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
- out, _err = capsys.readouterr()
- assert expected == out
-
- def test_handle_args_list_keys_errors_when_varname_is_not_a_dict(
- self, caplog, tmpdir
- ):
- """Raise an error when --list-keys and varname specify a non-list."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write(
- '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2": ' +
- '{"v2_2": "val2.2"}, "top": "gun"}')
- expected_error = "--list-keys provided but 'top' is not a dict"
- args = self.args(
- debug=False, dump_all=False, format=None,
- instance_data=instance_data.strpath, list_keys=True,
- user_data='ud', vendor_data='vd', varname='top')
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- assert 1 == query.handle_args('anyname', args)
- assert expected_error in caplog.text
-
-# vi: ts=4 expandtab
diff --git a/cloudinit/cmd/tests/test_status.py b/cloudinit/cmd/tests/test_status.py
deleted file mode 100644
index 1c9eec37..00000000
--- a/cloudinit/cmd/tests/test_status.py
+++ /dev/null
@@ -1,391 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-from collections import namedtuple
-import os
-from io import StringIO
-from textwrap import dedent
-
-from cloudinit.atomic_helper import write_json
-from cloudinit.cmd import status
-from cloudinit.util import ensure_file
-from cloudinit.tests.helpers import CiTestCase, wrap_and_call, mock
-
-mypaths = namedtuple('MyPaths', 'run_dir')
-myargs = namedtuple('MyArgs', 'long wait')
-
-
-class TestStatus(CiTestCase):
-
- def setUp(self):
- super(TestStatus, self).setUp()
- self.new_root = self.tmp_dir()
- self.status_file = self.tmp_path('status.json', self.new_root)
- self.disable_file = self.tmp_path('cloudinit-disable', self.new_root)
- self.paths = mypaths(run_dir=self.new_root)
-
- class FakeInit(object):
- paths = self.paths
-
- def __init__(self, ds_deps):
- pass
-
- def read_cfg(self):
- pass
-
- self.init_class = FakeInit
-
- def test__is_cloudinit_disabled_false_on_sysvinit(self):
- '''When not in an environment using systemd, return False.'''
- ensure_file(self.disable_file) # Create the ignored disable file
- (is_disabled, reason) = wrap_and_call(
- 'cloudinit.cmd.status',
- {'uses_systemd': False,
- 'get_cmdline': "root=/dev/my-root not-important"},
- status._is_cloudinit_disabled, self.disable_file, self.paths)
- self.assertFalse(
- is_disabled, 'expected enabled cloud-init on sysvinit')
- self.assertEqual('Cloud-init enabled on sysvinit', reason)
-
- def test__is_cloudinit_disabled_true_on_disable_file(self):
- '''When using systemd and disable_file is present return disabled.'''
- ensure_file(self.disable_file) # Create observed disable file
- (is_disabled, reason) = wrap_and_call(
- 'cloudinit.cmd.status',
- {'uses_systemd': True,
- 'get_cmdline': "root=/dev/my-root not-important"},
- status._is_cloudinit_disabled, self.disable_file, self.paths)
- self.assertTrue(is_disabled, 'expected disabled cloud-init')
- self.assertEqual(
- 'Cloud-init disabled by {0}'.format(self.disable_file), reason)
-
- def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self):
- '''Not disabled when using systemd and enabled via commandline.'''
- ensure_file(self.disable_file) # Create ignored disable file
- (is_disabled, reason) = wrap_and_call(
- 'cloudinit.cmd.status',
- {'uses_systemd': True,
- 'get_cmdline': 'something cloud-init=enabled else'},
- status._is_cloudinit_disabled, self.disable_file, self.paths)
- self.assertFalse(is_disabled, 'expected enabled cloud-init')
- self.assertEqual(
- 'Cloud-init enabled by kernel command line cloud-init=enabled',
- reason)
-
- def test__is_cloudinit_disabled_true_on_kernel_cmdline(self):
- '''When using systemd and disable_file is present return disabled.'''
- (is_disabled, reason) = wrap_and_call(
- 'cloudinit.cmd.status',
- {'uses_systemd': True,
- 'get_cmdline': 'something cloud-init=disabled else'},
- status._is_cloudinit_disabled, self.disable_file, self.paths)
- self.assertTrue(is_disabled, 'expected disabled cloud-init')
- self.assertEqual(
- 'Cloud-init disabled by kernel parameter cloud-init=disabled',
- reason)
-
- def test__is_cloudinit_disabled_true_when_generator_disables(self):
- '''When cloud-init-generator doesn't write enabled file return True.'''
- enabled_file = os.path.join(self.paths.run_dir, 'enabled')
- self.assertFalse(os.path.exists(enabled_file))
- (is_disabled, reason) = wrap_and_call(
- 'cloudinit.cmd.status',
- {'uses_systemd': True,
- 'get_cmdline': 'something'},
- status._is_cloudinit_disabled, self.disable_file, self.paths)
- self.assertTrue(is_disabled, 'expected disabled cloud-init')
- self.assertEqual('Cloud-init disabled by cloud-init-generator', reason)
-
- def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self):
- '''Report enabled when systemd generator creates the enabled file.'''
- enabled_file = os.path.join(self.paths.run_dir, 'enabled')
- ensure_file(enabled_file)
- (is_disabled, reason) = wrap_and_call(
- 'cloudinit.cmd.status',
- {'uses_systemd': True,
- 'get_cmdline': 'something ignored'},
- status._is_cloudinit_disabled, self.disable_file, self.paths)
- self.assertFalse(is_disabled, 'expected enabled cloud-init')
- self.assertEqual(
- 'Cloud-init enabled by systemd cloud-init-generator', reason)
-
- def test_status_returns_not_run(self):
- '''When status.json does not exist yet, return 'not run'.'''
- self.assertFalse(
- os.path.exists(self.status_file), 'Unexpected status.json found')
- cmdargs = myargs(long=False, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
- self.assertEqual(0, retcode)
- self.assertEqual('status: not run\n', m_stdout.getvalue())
-
- def test_status_returns_disabled_long_on_presence_of_disable_file(self):
- '''When cloudinit is disabled, return disabled reason.'''
-
- checked_files = []
-
- def fakeexists(filepath):
- checked_files.append(filepath)
- status_file = os.path.join(self.paths.run_dir, 'status.json')
- return bool(not filepath == status_file)
-
- cmdargs = myargs(long=True, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'os.path.exists': {'side_effect': fakeexists},
- '_is_cloudinit_disabled': (True, 'disabled for some reason'),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
- self.assertEqual(0, retcode)
- self.assertEqual(
- [os.path.join(self.paths.run_dir, 'status.json')],
- checked_files)
- expected = dedent('''\
- status: disabled
- detail:
- disabled for some reason
- ''')
- self.assertEqual(expected, m_stdout.getvalue())
-
- def test_status_returns_running_on_no_results_json(self):
- '''Report running when status.json exists but result.json does not.'''
- result_file = self.tmp_path('result.json', self.new_root)
- write_json(self.status_file, {})
- self.assertFalse(
- os.path.exists(result_file), 'Unexpected result.json found')
- cmdargs = myargs(long=False, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
- self.assertEqual(0, retcode)
- self.assertEqual('status: running\n', m_stdout.getvalue())
-
- def test_status_returns_running(self):
- '''Report running when status exists with an unfinished stage.'''
- ensure_file(self.tmp_path('result.json', self.new_root))
- write_json(self.status_file,
- {'v1': {'init': {'start': 1, 'finished': None}}})
- cmdargs = myargs(long=False, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
- self.assertEqual(0, retcode)
- self.assertEqual('status: running\n', m_stdout.getvalue())
-
- def test_status_returns_done(self):
- '''Report done results.json exists no stages are unfinished.'''
- ensure_file(self.tmp_path('result.json', self.new_root))
- write_json(
- self.status_file,
- {'v1': {'stage': None, # No current stage running
- 'datasource': (
- 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
- '[dsmode=net]'),
- 'blah': {'finished': 123.456},
- 'init': {'errors': [], 'start': 124.567,
- 'finished': 125.678},
- 'init-local': {'start': 123.45, 'finished': 123.46}}})
- cmdargs = myargs(long=False, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
- self.assertEqual(0, retcode)
- self.assertEqual('status: done\n', m_stdout.getvalue())
-
- def test_status_returns_done_long(self):
- '''Long format of done status includes datasource info.'''
- ensure_file(self.tmp_path('result.json', self.new_root))
- write_json(
- self.status_file,
- {'v1': {'stage': None,
- 'datasource': (
- 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
- '[dsmode=net]'),
- 'init': {'start': 124.567, 'finished': 125.678},
- 'init-local': {'start': 123.45, 'finished': 123.46}}})
- cmdargs = myargs(long=True, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
- self.assertEqual(0, retcode)
- expected = dedent('''\
- status: done
- time: Thu, 01 Jan 1970 00:02:05 +0000
- detail:
- DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net]
- ''')
- self.assertEqual(expected, m_stdout.getvalue())
-
- def test_status_on_errors(self):
- '''Reports error when any stage has errors.'''
- write_json(
- self.status_file,
- {'v1': {'stage': None,
- 'blah': {'errors': [], 'finished': 123.456},
- 'init': {'errors': ['error1'], 'start': 124.567,
- 'finished': 125.678},
- 'init-local': {'start': 123.45, 'finished': 123.46}}})
- cmdargs = myargs(long=False, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
- self.assertEqual(1, retcode)
- self.assertEqual('status: error\n', m_stdout.getvalue())
-
- def test_status_on_errors_long(self):
- '''Long format of error status includes all error messages.'''
- write_json(
- self.status_file,
- {'v1': {'stage': None,
- 'datasource': (
- 'DataSourceNoCloud [seed=/var/.../seed/nocloud-net]'
- '[dsmode=net]'),
- 'init': {'errors': ['error1'], 'start': 124.567,
- 'finished': 125.678},
- 'init-local': {'errors': ['error2', 'error3'],
- 'start': 123.45, 'finished': 123.46}}})
- cmdargs = myargs(long=True, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
- self.assertEqual(1, retcode)
- expected = dedent('''\
- status: error
- time: Thu, 01 Jan 1970 00:02:05 +0000
- detail:
- error1
- error2
- error3
- ''')
- self.assertEqual(expected, m_stdout.getvalue())
-
- def test_status_returns_running_long_format(self):
- '''Long format reports the stage in which we are running.'''
- write_json(
- self.status_file,
- {'v1': {'stage': 'init',
- 'init': {'start': 124.456, 'finished': None},
- 'init-local': {'start': 123.45, 'finished': 123.46}}})
- cmdargs = myargs(long=True, wait=False)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
- self.assertEqual(0, retcode)
- expected = dedent('''\
- status: running
- time: Thu, 01 Jan 1970 00:02:04 +0000
- detail:
- Running in stage: init
- ''')
- self.assertEqual(expected, m_stdout.getvalue())
-
- def test_status_wait_blocks_until_done(self):
- '''Specifying wait will poll every 1/4 second until done state.'''
- running_json = {
- 'v1': {'stage': 'init',
- 'init': {'start': 124.456, 'finished': None},
- 'init-local': {'start': 123.45, 'finished': 123.46}}}
- done_json = {
- 'v1': {'stage': None,
- 'init': {'start': 124.456, 'finished': 125.678},
- 'init-local': {'start': 123.45, 'finished': 123.46}}}
-
- self.sleep_calls = 0
-
- def fake_sleep(interval):
- self.assertEqual(0.25, interval)
- self.sleep_calls += 1
- if self.sleep_calls == 2:
- write_json(self.status_file, running_json)
- elif self.sleep_calls == 3:
- write_json(self.status_file, done_json)
- result_file = self.tmp_path('result.json', self.new_root)
- ensure_file(result_file)
-
- cmdargs = myargs(long=False, wait=True)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'sleep': {'side_effect': fake_sleep},
- '_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
- self.assertEqual(0, retcode)
- self.assertEqual(4, self.sleep_calls)
- self.assertEqual('....\nstatus: done\n', m_stdout.getvalue())
-
- def test_status_wait_blocks_until_error(self):
- '''Specifying wait will poll every 1/4 second until error state.'''
- running_json = {
- 'v1': {'stage': 'init',
- 'init': {'start': 124.456, 'finished': None},
- 'init-local': {'start': 123.45, 'finished': 123.46}}}
- error_json = {
- 'v1': {'stage': None,
- 'init': {'errors': ['error1'], 'start': 124.456,
- 'finished': 125.678},
- 'init-local': {'start': 123.45, 'finished': 123.46}}}
-
- self.sleep_calls = 0
-
- def fake_sleep(interval):
- self.assertEqual(0.25, interval)
- self.sleep_calls += 1
- if self.sleep_calls == 2:
- write_json(self.status_file, running_json)
- elif self.sleep_calls == 3:
- write_json(self.status_file, error_json)
-
- cmdargs = myargs(long=False, wait=True)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- retcode = wrap_and_call(
- 'cloudinit.cmd.status',
- {'sleep': {'side_effect': fake_sleep},
- '_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.handle_status_args, 'ignored', cmdargs)
- self.assertEqual(1, retcode)
- self.assertEqual(4, self.sleep_calls)
- self.assertEqual('....\nstatus: error\n', m_stdout.getvalue())
-
- def test_status_main(self):
- '''status.main can be run as a standalone script.'''
- write_json(self.status_file,
- {'v1': {'init': {'start': 1, 'finished': None}}})
- with self.assertRaises(SystemExit) as context_manager:
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- wrap_and_call(
- 'cloudinit.cmd.status',
- {'sys.argv': {'new': ['status']},
- '_is_cloudinit_disabled': (False, ''),
- 'Init': {'side_effect': self.init_class}},
- status.main)
- self.assertEqual(0, context_manager.exception.code)
- self.assertEqual('status: running\n', m_stdout.getvalue())
-
-# vi: ts=4 expandtab syntax=python