summaryrefslogtreecommitdiff
path: root/cloudinit/cmd/tests/test_query.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/cmd/tests/test_query.py')
-rw-r--r--cloudinit/cmd/tests/test_query.py341
1 files changed, 0 insertions, 341 deletions
diff --git a/cloudinit/cmd/tests/test_query.py b/cloudinit/cmd/tests/test_query.py
deleted file mode 100644
index c258d321..00000000
--- a/cloudinit/cmd/tests/test_query.py
+++ /dev/null
@@ -1,341 +0,0 @@
-# This file is part of cloud-init. See LICENSE file for license information.
-
-import errno
-import gzip
-from io import BytesIO
-import json
-from textwrap import dedent
-
-import pytest
-
-from collections import namedtuple
-from cloudinit.cmd import query
-from cloudinit.helpers import Paths
-from cloudinit.sources import (
- REDACT_SENSITIVE_VALUE, INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE)
-from cloudinit.tests.helpers import mock
-
-from cloudinit.util import b64e, write_file
-
-
-def _gzip_data(data):
- with BytesIO() as iobuf:
- with gzip.GzipFile(mode="wb", fileobj=iobuf) as gzfp:
- gzfp.write(data)
- return iobuf.getvalue()
-
-
-@mock.patch("cloudinit.cmd.query.addLogHandlerCLI", lambda *args: "")
-class TestQuery:
-
- args = namedtuple(
- 'queryargs',
- ('debug dump_all format instance_data list_keys user_data vendor_data'
- ' varname'))
-
- def _setup_paths(self, tmpdir, ud_val=None, vd_val=None):
- """Write userdata and vendordata into a tmpdir.
-
- Return:
- 4-tuple : (paths, run_dir_path, userdata_path, vendordata_path)
- """
- if ud_val:
- user_data = tmpdir.join('user-data')
- write_file(user_data.strpath, ud_val)
- else:
- user_data = None
- if vd_val:
- vendor_data = tmpdir.join('vendor-data')
- write_file(vendor_data.strpath, vd_val)
- else:
- vendor_data = None
- run_dir = tmpdir.join('run_dir')
- run_dir.ensure_dir()
- return (
- Paths({'run_dir': run_dir.strpath}),
- run_dir,
- user_data,
- vendor_data
- )
-
- def test_handle_args_error_on_missing_param(self, caplog, capsys):
- """Error when missing required parameters and print usage."""
- args = self.args(
- debug=False, dump_all=False, format=None, instance_data=None,
- list_keys=False, user_data=None, vendor_data=None, varname=None)
- with mock.patch(
- "cloudinit.cmd.query.addLogHandlerCLI", return_value=""
- ) as m_cli_log:
- assert 1 == query.handle_args('anyname', args)
- expected_error = (
- 'Expected one of the options: --all, --format, --list-keys'
- ' or varname\n')
- assert expected_error in caplog.text
- out, _err = capsys.readouterr()
- assert 'usage: query' in out
- assert 1 == m_cli_log.call_count
-
- def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir):
- """When instance_data file path does not exist, log an error."""
- absent_fn = tmpdir.join('absent')
- args = self.args(
- debug=False, dump_all=True, format=None,
- instance_data=absent_fn.strpath,
- list_keys=False, user_data='ud', vendor_data='vd', varname=None)
- assert 1 == query.handle_args('anyname', args)
-
- msg = 'Missing instance-data file: %s' % absent_fn
- assert msg in caplog.text
-
- def test_handle_args_error_when_no_read_permission_instance_data(
- self, caplog, tmpdir
- ):
- """When instance_data file is unreadable, log an error."""
- noread_fn = tmpdir.join('unreadable')
- noread_fn.write('thou shall not pass')
- args = self.args(
- debug=False, dump_all=True, format=None,
- instance_data=noread_fn.strpath,
- list_keys=False, user_data='ud', vendor_data='vd', varname=None)
- with mock.patch('cloudinit.cmd.query.util.load_file') as m_load:
- m_load.side_effect = OSError(errno.EACCES, 'Not allowed')
- assert 1 == query.handle_args('anyname', args)
- msg = "No read permission on '%s'. Try sudo" % noread_fn
- assert msg in caplog.text
-
- def test_handle_args_defaults_instance_data(self, caplog, tmpdir):
- """When no instance_data argument, default to configured run_dir."""
- args = self.args(
- debug=False, dump_all=True, format=None, instance_data=None,
- list_keys=False, user_data=None, vendor_data=None, varname=None)
- paths, run_dir, _, _ = self._setup_paths(tmpdir)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
- m_paths.return_value = paths
- assert 1 == query.handle_args('anyname', args)
- json_file = run_dir.join(INSTANCE_JSON_FILE)
- msg = 'Missing instance-data file: %s' % json_file.strpath
- assert msg in caplog.text
-
- def test_handle_args_root_fallsback_to_instance_data(self, caplog, tmpdir):
- """When no instance_data argument, root falls back to redacted json."""
- args = self.args(
- debug=False, dump_all=True, format=None, instance_data=None,
- list_keys=False, user_data=None, vendor_data=None, varname=None)
- paths, run_dir, _, _ = self._setup_paths(tmpdir)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
- m_paths.return_value = paths
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 0
- assert 1 == query.handle_args('anyname', args)
- json_file = run_dir.join(INSTANCE_JSON_FILE)
- sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
- msg = (
- 'Missing root-readable %s. Using redacted %s instead.' %
- (
- sensitive_file.strpath, json_file.strpath
- )
- )
- assert msg in caplog.text
-
- @pytest.mark.parametrize(
- 'ud_src,ud_expected,vd_src,vd_expected',
- (
- ('hi mom', 'hi mom', 'hi pops', 'hi pops'),
- ('ud'.encode('utf-8'), 'ud', 'vd'.encode('utf-8'), 'vd'),
- (_gzip_data(b'ud'), 'ud', _gzip_data(b'vd'), 'vd'),
- (_gzip_data('ud'.encode('utf-8')), 'ud', _gzip_data(b'vd'), 'vd'),
- )
- )
- def test_handle_args_root_processes_user_data(
- self, ud_src, ud_expected, vd_src, vd_expected, capsys, tmpdir
- ):
- """Support reading multiple user-data file content types"""
- paths, run_dir, user_data, vendor_data = self._setup_paths(
- tmpdir, ud_val=ud_src, vd_val=vd_src
- )
- sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
- sensitive_file.write('{"my-var": "it worked"}')
- args = self.args(
- debug=False, dump_all=True, format=None, instance_data=None,
- list_keys=False, user_data=user_data.strpath,
- vendor_data=vendor_data.strpath, varname=None)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
- m_paths.return_value = paths
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 0
- assert 0 == query.handle_args('anyname', args)
- out, _err = capsys.readouterr()
- cmd_output = json.loads(out)
- assert "it worked" == cmd_output['my_var']
- if ud_expected == "ci-b64:":
- ud_expected = "ci-b64:{}".format(b64e(ud_src))
- if vd_expected == "ci-b64:":
- vd_expected = "ci-b64:{}".format(b64e(vd_src))
- assert ud_expected == cmd_output['userdata']
- assert vd_expected == cmd_output['vendordata']
-
- def test_handle_args_root_uses_instance_sensitive_data(
- self, capsys, tmpdir
- ):
- """When no instance_data argument, root uses sensitive json."""
- paths, run_dir, user_data, vendor_data = self._setup_paths(
- tmpdir, ud_val='ud', vd_val='vd'
- )
- sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
- sensitive_file.write('{"my-var": "it worked"}')
- args = self.args(
- debug=False, dump_all=True, format=None, instance_data=None,
- list_keys=False, user_data=user_data.strpath,
- vendor_data=vendor_data.strpath, varname=None)
- with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
- m_paths.return_value = paths
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 0
- assert 0 == query.handle_args('anyname', args)
- expected = (
- '{\n "my_var": "it worked",\n "userdata": "ud",\n '
- '"vendordata": "vd"\n}\n'
- )
- out, _err = capsys.readouterr()
- assert expected == out
-
- def test_handle_args_dumps_all_instance_data(self, capsys, tmpdir):
- """When --all is specified query will dump all instance data vars."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write('{"my-var": "it worked"}')
- args = self.args(
- debug=False, dump_all=True, format=None,
- instance_data=instance_data.strpath, list_keys=False,
- user_data='ud', vendor_data='vd', varname=None)
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
- expected = (
- '{\n "my_var": "it worked",\n "userdata": "<%s> file:ud",\n'
- ' "vendordata": "<%s> file:vd"\n}\n' % (
- REDACT_SENSITIVE_VALUE, REDACT_SENSITIVE_VALUE
- )
- )
- out, _err = capsys.readouterr()
- assert expected == out
-
- def test_handle_args_returns_top_level_varname(self, capsys, tmpdir):
- """When the argument varname is passed, report its value."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write('{"my-var": "it worked"}')
- args = self.args(
- debug=False, dump_all=True, format=None,
- instance_data=instance_data.strpath, list_keys=False,
- user_data='ud', vendor_data='vd', varname='my_var')
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
- out, _err = capsys.readouterr()
- assert 'it worked\n' == out
-
- def test_handle_args_returns_nested_varname(self, capsys, tmpdir):
- """If user_data file is a jinja template render instance-data vars."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write(
- '{"v1": {"key-2": "value-2"}, "my-var": "it worked"}'
- )
- args = self.args(
- debug=False, dump_all=False, format=None,
- instance_data=instance_data.strpath, user_data='ud',
- vendor_data='vd', list_keys=False, varname='v1.key_2')
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
- out, _err = capsys.readouterr()
- assert 'value-2\n' == out
-
- def test_handle_args_returns_standardized_vars_to_top_level_aliases(
- self, capsys, tmpdir
- ):
- """Any standardized vars under v# are promoted as top-level aliases."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write(
- '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},'
- ' "top": "gun"}')
- expected = dedent("""\
- {
- "top": "gun",
- "userdata": "<redacted for non-root user> file:ud",
- "v1": {
- "v1_1": "val1.1"
- },
- "v1_1": "val1.1",
- "v2": {
- "v2_2": "val2.2"
- },
- "v2_2": "val2.2",
- "vendordata": "<redacted for non-root user> file:vd"
- }
- """)
- args = self.args(
- debug=False, dump_all=True, format=None,
- instance_data=instance_data.strpath, user_data='ud',
- vendor_data='vd', list_keys=False, varname=None)
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
- out, _err = capsys.readouterr()
- assert expected == out
-
- def test_handle_args_list_keys_sorts_top_level_keys_when_no_varname(
- self, capsys, tmpdir
- ):
- """Sort all top-level keys when only --list-keys provided."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write(
- '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},'
- ' "top": "gun"}')
- expected = 'top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n'
- args = self.args(
- debug=False, dump_all=False, format=None,
- instance_data=instance_data.strpath, list_keys=True,
- user_data='ud', vendor_data='vd', varname=None)
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
- out, _err = capsys.readouterr()
- assert expected == out
-
- def test_handle_args_list_keys_sorts_nested_keys_when_varname(
- self, capsys, tmpdir
- ):
- """Sort all nested keys of varname object when --list-keys provided."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write(
- '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2":' +
- ' {"v2_2": "val2.2"}, "top": "gun"}')
- expected = 'v1_1\nv1_2\n'
- args = self.args(
- debug=False, dump_all=False, format=None,
- instance_data=instance_data.strpath, list_keys=True,
- user_data='ud', vendor_data='vd', varname='v1')
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- assert 0 == query.handle_args('anyname', args)
- out, _err = capsys.readouterr()
- assert expected == out
-
- def test_handle_args_list_keys_errors_when_varname_is_not_a_dict(
- self, caplog, tmpdir
- ):
- """Raise an error when --list-keys and varname specify a non-list."""
- instance_data = tmpdir.join('instance-data')
- instance_data.write(
- '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2": ' +
- '{"v2_2": "val2.2"}, "top": "gun"}')
- expected_error = "--list-keys provided but 'top' is not a dict"
- args = self.args(
- debug=False, dump_all=False, format=None,
- instance_data=instance_data.strpath, list_keys=True,
- user_data='ud', vendor_data='vd', varname='top')
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- assert 1 == query.handle_args('anyname', args)
- assert expected_error in caplog.text
-
-# vi: ts=4 expandtab