# This file is part of cloud-init. See LICENSE file for license information. """Query standardized instance metadata provided to machine, returning a JSON structure. Some instance-data values may be binary on some platforms, such as userdata and vendordata. Attempt to decompress and decode UTF-8 any binary values. Any binary values in the instance metadata will be base64-encoded and prefixed with "ci-b64:" in the output. userdata and, where applicable, vendordata may be provided to the machine gzip-compressed (and therefore as binary data). query will attempt to decompress these to a string before emitting the JSON output; if this fails, they are treated as binary. """ import argparse from errno import EACCES import os import sys from cloudinit.handlers.jinja_template import ( convert_jinja_instance_data, get_jinja_variable_alias, render_jinja_payload ) from cloudinit.cmd.devel import addLogHandlerCLI, read_cfg_paths from cloudinit import log from cloudinit.sources import ( INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE, REDACT_SENSITIVE_VALUE) from cloudinit import util NAME = 'query' LOG = log.getLogger(NAME) def get_parser(parser=None): """Build or extend an arg parser for query utility. @param parser: Optional existing ArgumentParser instance representing the query subcommand which will be extended to support the args of this utility. @returns: ArgumentParser with proper argument configuration. """ if not parser: parser = argparse.ArgumentParser( prog=NAME, description=__doc__) parser.add_argument( '-d', '--debug', action='store_true', default=False, help='Add verbose messages during template render') parser.add_argument( '-i', '--instance-data', type=str, help=('Path to instance-data.json file. Default is /run/cloud-init/%s' % INSTANCE_JSON_FILE)) parser.add_argument( '-l', '--list-keys', action='store_true', default=False, help=('List query keys available at the provided instance-data' ' .')) parser.add_argument( '-u', '--user-data', type=str, help=('Path to user-data file. Default is' ' /var/lib/cloud/instance/user-data.txt')) parser.add_argument( '-v', '--vendor-data', type=str, help=('Path to vendor-data file. Default is' ' /var/lib/cloud/instance/vendor-data.txt')) parser.add_argument( 'varname', type=str, nargs='?', help=('A dot-delimited specific variable to query from' ' instance-data. For example: v1.local_hostname. If the' ' value is not JSON serializable, it will be base64-encoded and' ' will contain the prefix "ci-b64:". ')) parser.add_argument( '-a', '--all', action='store_true', default=False, dest='dump_all', help='Dump all available instance-data') parser.add_argument( '-f', '--format', type=str, dest='format', help=('Optionally specify a custom output format string. Any' ' instance-data variable can be specified between double-curly' ' braces. For example -f "{{ v2.cloud_name }}"')) return parser def load_userdata(ud_file_path): """Attempt to return a string of user-data from ud_file_path Attempt to decode or decompress if needed. If unable to decode the content, raw bytes will be returned. @returns: String of uncompressed userdata if possible, otherwise bytes. """ bdata = util.load_file(ud_file_path, decode=False) try: return bdata.decode('utf-8') except UnicodeDecodeError: return util.decomp_gzip(bdata, quiet=False, decode=True) def _read_instance_data(instance_data, user_data, vendor_data) -> dict: """Return a dict of merged instance-data, vendordata and userdata. The dict will contain supplemental userdata and vendordata keys sourced from default user-data and vendor-data files. Non-root users will have redacted INSTANCE_JSON_FILE content and redacted vendordata and userdata values. :raise: IOError/OSError on absence of instance-data.json file or invalid access perms. """ paths = None uid = os.getuid() if not all([instance_data, user_data, vendor_data]): paths = read_cfg_paths() if instance_data: instance_data_fn = instance_data else: redacted_data_fn = os.path.join(paths.run_dir, INSTANCE_JSON_FILE) if uid == 0: sensitive_data_fn = os.path.join( paths.run_dir, INSTANCE_JSON_SENSITIVE_FILE) if os.path.exists(sensitive_data_fn): instance_data_fn = sensitive_data_fn else: LOG.warning( 'Missing root-readable %s. Using redacted %s instead.', sensitive_data_fn, redacted_data_fn ) instance_data_fn = redacted_data_fn else: instance_data_fn = redacted_data_fn if user_data: user_data_fn = user_data else: user_data_fn = os.path.join(paths.instance_link, 'user-data.txt') if vendor_data: vendor_data_fn = vendor_data else: vendor_data_fn = os.path.join(paths.instance_link, 'vendor-data.txt') try: instance_json = util.load_file(instance_data_fn) except (IOError, OSError) as e: if e.errno == EACCES: LOG.error("No read permission on '%s'. Try sudo", instance_data_fn) else: LOG.error('Missing instance-data file: %s', instance_data_fn) raise instance_data = util.load_json(instance_json) if uid != 0: instance_data['userdata'] = ( '<%s> file:%s' % (REDACT_SENSITIVE_VALUE, user_data_fn)) instance_data['vendordata'] = ( '<%s> file:%s' % (REDACT_SENSITIVE_VALUE, vendor_data_fn)) else: instance_data['userdata'] = load_userdata(user_data_fn) instance_data['vendordata'] = load_userdata(vendor_data_fn) return instance_data def _find_instance_data_leaf_by_varname_path( jinja_vars_without_aliases: dict, jinja_vars_with_aliases: dict, varname: str, list_keys: bool ): """Return the value of the dot-delimited varname path in instance-data Split a dot-delimited jinja variable name path into components, walk the path components into the instance_data and look up a matching jinja variable name or cloud-init's underscore-delimited key aliases. :raises: ValueError when varname represents an invalid key name or path or if list-keys is provided by varname isn't a dict object. """ walked_key_path = "" response = jinja_vars_without_aliases for key_path_part in varname.split('.'): try: # Walk key path using complete aliases dict, yet response # should only contain jinja_without_aliases jinja_vars_with_aliases = jinja_vars_with_aliases[key_path_part] except KeyError as e: if walked_key_path: msg = "instance-data '{key_path}' has no '{leaf}'".format( leaf=key_path_part, key_path=walked_key_path ) else: msg = "Undefined instance-data key '{}'".format(varname) raise ValueError(msg) from e if key_path_part in response: response = response[key_path_part] else: # We are an underscore_delimited key alias for key in response: if get_jinja_variable_alias(key) == key_path_part: response = response[key] break if walked_key_path: walked_key_path += "." walked_key_path += key_path_part return response def handle_args(name, args): """Handle calls to 'cloud-init query' as a subcommand.""" addLogHandlerCLI(LOG, log.DEBUG if args.debug else log.WARNING) if not any([args.list_keys, args.varname, args.format, args.dump_all]): LOG.error( 'Expected one of the options: --all, --format,' ' --list-keys or varname') get_parser().print_help() return 1 try: instance_data = _read_instance_data( args.instance_data, args.user_data, args.vendor_data ) except (IOError, OSError): return 1 if args.format: payload = '## template: jinja\n{fmt}'.format(fmt=args.format) rendered_payload = render_jinja_payload( payload=payload, payload_fn='query commandline', instance_data=instance_data, debug=True if args.debug else False) if rendered_payload: print(rendered_payload) return 0 return 1 # If not rendering a structured format above, query output will be either: # - JSON dump of all instance-data/jinja variables # - JSON dump of a value at an dict path into the instance-data dict. # - a list of keys for a specific dict path into the instance-data dict. response = convert_jinja_instance_data(instance_data) if args.varname: jinja_vars_with_aliases = convert_jinja_instance_data( instance_data, include_key_aliases=True ) try: response = _find_instance_data_leaf_by_varname_path( jinja_vars_without_aliases=response, jinja_vars_with_aliases=jinja_vars_with_aliases, varname=args.varname, list_keys=args.list_keys ) except (KeyError, ValueError) as e: LOG.error(e) return 1 if args.list_keys: if not isinstance(response, dict): LOG.error( "--list-keys provided but '%s' is not a dict", args.varname ) return 1 response = '\n'.join(sorted(response.keys())) if not isinstance(response, str): response = util.json_dumps(response) print(response) return 0 def main(): """Tool to query specific instance-data values.""" parser = get_parser() sys.exit(handle_args(NAME, parser.parse_args())) if __name__ == '__main__': main() # vi: ts=4 expandtab