diff options
Diffstat (limited to 'tests/unittests/cmd/test_query.py')
-rw-r--r-- | tests/unittests/cmd/test_query.py | 537 |
1 files changed, 537 insertions, 0 deletions
diff --git a/tests/unittests/cmd/test_query.py b/tests/unittests/cmd/test_query.py new file mode 100644 index 00000000..03a73bb5 --- /dev/null +++ b/tests/unittests/cmd/test_query.py @@ -0,0 +1,537 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import errno +import gzip +import json +import os +from collections import namedtuple +from io import BytesIO +from textwrap import dedent + +import pytest + +from cloudinit.cmd import query +from cloudinit.helpers import Paths +from cloudinit.sources import ( + INSTANCE_JSON_FILE, + INSTANCE_JSON_SENSITIVE_FILE, + REDACT_SENSITIVE_VALUE, +) +from cloudinit.util import b64e, write_file +from tests.unittests.helpers import mock + + +def _gzip_data(data): + with BytesIO() as iobuf: + with gzip.GzipFile(mode="wb", fileobj=iobuf) as gzfp: + gzfp.write(data) + return iobuf.getvalue() + + +@mock.patch("cloudinit.cmd.query.addLogHandlerCLI", lambda *args: "") +class TestQuery: + + args = namedtuple( + "queryargs", + "debug dump_all format instance_data list_keys user_data vendor_data" + " varname", + ) + + def _setup_paths(self, tmpdir, ud_val=None, vd_val=None): + """Write userdata and vendordata into a tmpdir. + + Return: + 4-tuple : (paths, run_dir_path, userdata_path, vendordata_path) + """ + if ud_val: + user_data = tmpdir.join("user-data") + write_file(user_data.strpath, ud_val) + else: + user_data = None + if vd_val: + vendor_data = tmpdir.join("vendor-data") + write_file(vendor_data.strpath, vd_val) + else: + vendor_data = None + run_dir = tmpdir.join("run_dir") + run_dir.ensure_dir() + + cloud_dir = tmpdir.join("cloud_dir") + cloud_dir.ensure_dir() + + return ( + Paths( + {"cloud_dir": cloud_dir.strpath, "run_dir": run_dir.strpath} + ), + run_dir, + user_data, + vendor_data, + ) + + def test_handle_args_error_on_missing_param(self, caplog, capsys): + """Error when missing required parameters and print usage.""" + args = self.args( + debug=False, + dump_all=False, + format=None, + instance_data=None, + list_keys=False, + user_data=None, + vendor_data=None, + varname=None, + ) + with mock.patch( + "cloudinit.cmd.query.addLogHandlerCLI", return_value="" + ) as m_cli_log: + assert 1 == query.handle_args("anyname", args) + expected_error = ( + "Expected one of the options: --all, --format, --list-keys" + " or varname\n" + ) + assert expected_error in caplog.text + out, _err = capsys.readouterr() + assert "usage: query" in out + assert 1 == m_cli_log.call_count + + @pytest.mark.parametrize( + "inst_data,varname,expected_error", + ( + ( + '{"v1": {"key-2": "value-2"}}', + "v1.absent_leaf", + "instance-data 'v1' has no 'absent_leaf'\n", + ), + ( + '{"v1": {"key-2": "value-2"}}', + "absent_key", + "Undefined instance-data key 'absent_key'\n", + ), + ), + ) + def test_handle_args_error_on_invalid_vaname_paths( + self, inst_data, varname, expected_error, caplog, tmpdir + ): + """Error when varname is not a valid instance-data variable path.""" + instance_data = tmpdir.join("instance-data") + instance_data.write(inst_data) + args = self.args( + debug=False, + dump_all=False, + format=None, + instance_data=instance_data.strpath, + list_keys=False, + user_data=None, + vendor_data=None, + varname=varname, + ) + paths, _, _, _ = self._setup_paths(tmpdir) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + m_paths.return_value = paths + with mock.patch( + "cloudinit.cmd.query.addLogHandlerCLI", return_value="" + ): + with mock.patch("cloudinit.cmd.query.load_userdata") as m_lud: + m_lud.return_value = "ud" + assert 1 == query.handle_args("anyname", args) + assert expected_error in caplog.text + + def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir): + """When instance_data file path does not exist, log an error.""" + absent_fn = tmpdir.join("absent") + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=absent_fn.strpath, + list_keys=False, + user_data="ud", + vendor_data="vd", + varname=None, + ) + assert 1 == query.handle_args("anyname", args) + + msg = "Missing instance-data file: %s" % absent_fn + assert msg in caplog.text + + def test_handle_args_error_when_no_read_permission_instance_data( + self, caplog, tmpdir + ): + """When instance_data file is unreadable, log an error.""" + noread_fn = tmpdir.join("unreadable") + noread_fn.write("thou shall not pass") + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=noread_fn.strpath, + list_keys=False, + user_data="ud", + vendor_data="vd", + varname=None, + ) + with mock.patch("cloudinit.cmd.query.util.load_file") as m_load: + m_load.side_effect = OSError(errno.EACCES, "Not allowed") + assert 1 == query.handle_args("anyname", args) + msg = "No read permission on '%s'. Try sudo" % noread_fn + assert msg in caplog.text + + def test_handle_args_defaults_instance_data(self, caplog, tmpdir): + """When no instance_data argument, default to configured run_dir.""" + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=None, + vendor_data=None, + varname=None, + ) + paths, run_dir, _, _ = self._setup_paths(tmpdir) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + m_paths.return_value = paths + assert 1 == query.handle_args("anyname", args) + json_file = run_dir.join(INSTANCE_JSON_FILE) + msg = "Missing instance-data file: %s" % json_file.strpath + assert msg in caplog.text + + def test_handle_args_root_fallsback_to_instance_data(self, caplog, tmpdir): + """When no instance_data argument, root falls back to redacted json.""" + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=None, + vendor_data=None, + varname=None, + ) + paths, run_dir, _, _ = self._setup_paths(tmpdir) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + m_paths.return_value = paths + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 0 + assert 1 == query.handle_args("anyname", args) + json_file = run_dir.join(INSTANCE_JSON_FILE) + sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) + msg = "Missing root-readable %s. Using redacted %s instead." % ( + sensitive_file.strpath, + json_file.strpath, + ) + assert msg in caplog.text + + @pytest.mark.parametrize( + "ud_src,ud_expected,vd_src,vd_expected", + ( + ("hi mom", "hi mom", "hi pops", "hi pops"), + ("ud".encode("utf-8"), "ud", "vd".encode("utf-8"), "vd"), + (_gzip_data(b"ud"), "ud", _gzip_data(b"vd"), "vd"), + (_gzip_data("ud".encode("utf-8")), "ud", _gzip_data(b"vd"), "vd"), + ), + ) + def test_handle_args_root_processes_user_data( + self, ud_src, ud_expected, vd_src, vd_expected, capsys, tmpdir + ): + """Support reading multiple user-data file content types""" + paths, run_dir, user_data, vendor_data = self._setup_paths( + tmpdir, ud_val=ud_src, vd_val=vd_src + ) + sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) + sensitive_file.write('{"my-var": "it worked"}') + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=user_data.strpath, + vendor_data=vendor_data.strpath, + varname=None, + ) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + m_paths.return_value = paths + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 0 + assert 0 == query.handle_args("anyname", args) + out, _err = capsys.readouterr() + cmd_output = json.loads(out) + assert "it worked" == cmd_output["my-var"] + if ud_expected == "ci-b64:": + ud_expected = "ci-b64:{}".format(b64e(ud_src)) + if vd_expected == "ci-b64:": + vd_expected = "ci-b64:{}".format(b64e(vd_src)) + assert ud_expected == cmd_output["userdata"] + assert vd_expected == cmd_output["vendordata"] + + def test_handle_args_user_vendor_data_defaults_to_instance_link( + self, capsys, tmpdir + ): + """When no instance_data argument, root uses sensitive json.""" + paths, run_dir, _, _ = self._setup_paths(tmpdir) + sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) + sensitive_file.write('{"my-var": "it worked"}') + + ud_path = os.path.join(paths.instance_link, "user-data.txt") + write_file(ud_path, "instance_link_ud") + vd_path = os.path.join(paths.instance_link, "vendor-data.txt") + write_file(vd_path, "instance_link_vd") + + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=None, + vendor_data=None, + varname=None, + ) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + m_paths.return_value = paths + with mock.patch("os.getuid", return_value=0): + assert 0 == query.handle_args("anyname", args) + expected = ( + '{\n "my-var": "it worked",\n ' + '"userdata": "instance_link_ud",\n ' + '"vendordata": "instance_link_vd"\n}\n' + ) + out, _ = capsys.readouterr() + assert expected == out + + def test_handle_args_root_uses_instance_sensitive_data( + self, capsys, tmpdir + ): + """When no instance_data argument, root uses sensitive json.""" + paths, run_dir, user_data, vendor_data = self._setup_paths( + tmpdir, ud_val="ud", vd_val="vd" + ) + sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) + sensitive_file.write('{"my-var": "it worked"}') + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=user_data.strpath, + vendor_data=vendor_data.strpath, + varname=None, + ) + with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + m_paths.return_value = paths + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 0 + assert 0 == query.handle_args("anyname", args) + expected = ( + '{\n "my-var": "it worked",\n ' + '"userdata": "ud",\n "vendordata": "vd"\n}\n' + ) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_dumps_all_instance_data(self, capsys, tmpdir): + """When --all is specified query will dump all instance data vars.""" + instance_data = tmpdir.join("instance-data") + instance_data.write('{"my-var": "it worked"}') + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=instance_data.strpath, + list_keys=False, + user_data="ud", + vendor_data="vd", + varname=None, + ) + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args("anyname", args) + expected = ( + '{\n "my-var": "it worked",\n "userdata": "<%s> file:ud",\n' + ' "vendordata": "<%s> file:vd"\n}\n' + % (REDACT_SENSITIVE_VALUE, REDACT_SENSITIVE_VALUE) + ) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_returns_top_level_varname(self, capsys, tmpdir): + """When the argument varname is passed, report its value.""" + instance_data = tmpdir.join("instance-data") + instance_data.write('{"my-var": "it worked"}') + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=instance_data.strpath, + list_keys=False, + user_data="ud", + vendor_data="vd", + varname="my_var", + ) + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args("anyname", args) + out, _err = capsys.readouterr() + assert "it worked\n" == out + + @pytest.mark.parametrize( + "inst_data,varname,expected", + ( + ( + '{"v1": {"key-2": "value-2"}, "my-var": "it worked"}', + "v1.key_2", + "value-2\n", + ), + # Assert no jinja underscore-delimited aliases are reported on CLI + ( + '{"v1": {"something-hyphenated": {"no.underscores":"x",' + ' "no-alias": "y"}}, "my-var": "it worked"}', + "v1.something_hyphenated", + '{\n "no-alias": "y",\n "no.underscores": "x"\n}\n', + ), + ), + ) + def test_handle_args_returns_nested_varname( + self, inst_data, varname, expected, capsys, tmpdir + ): + """If user_data file is a jinja template render instance-data vars.""" + instance_data = tmpdir.join("instance-data") + instance_data.write(inst_data) + args = self.args( + debug=False, + dump_all=False, + format=None, + instance_data=instance_data.strpath, + user_data="ud", + vendor_data="vd", + list_keys=False, + varname=varname, + ) + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args("anyname", args) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_returns_standardized_vars_to_top_level_aliases( + self, capsys, tmpdir + ): + """Any standardized vars under v# are promoted as top-level aliases.""" + instance_data = tmpdir.join("instance-data") + instance_data.write( + '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},' + ' "top": "gun"}' + ) + expected = dedent( + """\ + { + "top": "gun", + "userdata": "<redacted for non-root user> file:ud", + "v1": { + "v1_1": "val1.1" + }, + "v1_1": "val1.1", + "v2": { + "v2_2": "val2.2" + }, + "v2_2": "val2.2", + "vendordata": "<redacted for non-root user> file:vd" + } + """ + ) + args = self.args( + debug=False, + dump_all=True, + format=None, + instance_data=instance_data.strpath, + user_data="ud", + vendor_data="vd", + list_keys=False, + varname=None, + ) + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args("anyname", args) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_list_keys_sorts_top_level_keys_when_no_varname( + self, capsys, tmpdir + ): + """Sort all top-level keys when only --list-keys provided.""" + instance_data = tmpdir.join("instance-data") + instance_data.write( + '{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},' + ' "top": "gun"}' + ) + expected = "top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n" + args = self.args( + debug=False, + dump_all=False, + format=None, + instance_data=instance_data.strpath, + list_keys=True, + user_data="ud", + vendor_data="vd", + varname=None, + ) + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args("anyname", args) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_list_keys_sorts_nested_keys_when_varname( + self, capsys, tmpdir + ): + """Sort all nested keys of varname object when --list-keys provided.""" + instance_data = tmpdir.join("instance-data") + instance_data.write( + '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2":' + + ' {"v2_2": "val2.2"}, "top": "gun"}' + ) + expected = "v1_1\nv1_2\n" + args = self.args( + debug=False, + dump_all=False, + format=None, + instance_data=instance_data.strpath, + list_keys=True, + user_data="ud", + vendor_data="vd", + varname="v1", + ) + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 100 + assert 0 == query.handle_args("anyname", args) + out, _err = capsys.readouterr() + assert expected == out + + def test_handle_args_list_keys_errors_when_varname_is_not_a_dict( + self, caplog, tmpdir + ): + """Raise an error when --list-keys and varname specify a non-list.""" + instance_data = tmpdir.join("instance-data") + instance_data.write( + '{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2": ' + + '{"v2_2": "val2.2"}, "top": "gun"}' + ) + expected_error = "--list-keys provided but 'top' is not a dict" + args = self.args( + debug=False, + dump_all=False, + format=None, + instance_data=instance_data.strpath, + list_keys=True, + user_data="ud", + vendor_data="vd", + varname="top", + ) + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 100 + assert 1 == query.handle_args("anyname", args) + assert expected_error in caplog.text + + +# vi: ts=4 expandtab |