author    zdc <zdc@users.noreply.github.com>  2020-09-15 21:35:20 +0300
committer GitHub <noreply@github.com>         2020-09-15 21:35:20 +0300
commit    76adf82b8a4dbcf636151d292175b7d1ac182fcf (patch)
tree      f57f3db085a724df237ffa64b589c6bb6dd3b28f /cloudinit/cmd
parent    1a790ee102fd405e5c3a20a17a69ba0c118ed874 (diff)
parent    7cd260b313267dc7123cb99a75d4555e24909cca (diff)
download  vyos-cloud-init-76adf82b8a4dbcf636151d292175b7d1ac182fcf.tar.gz
          vyos-cloud-init-76adf82b8a4dbcf636151d292175b7d1ac182fcf.zip
Merge pull request #18 from zdc/T2117-equuleus-20.3
T2117: Cloud-init updated to 20.3
Diffstat (limited to 'cloudinit/cmd')
-rw-r--r--  cloudinit/cmd/clean.py                     5
-rw-r--r--  cloudinit/cmd/devel/logs.py                4
-rwxr-xr-x  cloudinit/cmd/devel/make_mime.py         114
-rw-r--r--  cloudinit/cmd/devel/parser.py              5
-rwxr-xr-x  cloudinit/cmd/devel/render.py              5
-rw-r--r--  cloudinit/cmd/devel/tests/test_logs.py     3
-rw-r--r--  cloudinit/cmd/query.py                    45
-rw-r--r--  cloudinit/cmd/tests/test_clean.py          1
-rw-r--r--  cloudinit/cmd/tests/test_main.py           2
-rw-r--r--  cloudinit/cmd/tests/test_query.py        392
-rw-r--r--  cloudinit/cmd/tests/test_status.py         1
11 files changed, 394 insertions, 183 deletions
diff --git a/cloudinit/cmd/clean.py b/cloudinit/cmd/clean.py
index 30e49de0..928a8eea 100644
--- a/cloudinit/cmd/clean.py
+++ b/cloudinit/cmd/clean.py
@@ -10,9 +10,8 @@ import os
import sys
from cloudinit.stages import Init
-from cloudinit.util import (
- ProcessExecutionError, del_dir, del_file, get_config_logfiles,
- is_link, subp)
+from cloudinit.subp import (ProcessExecutionError, subp)
+from cloudinit.util import (del_dir, del_file, get_config_logfiles, is_link)
def error(msg):
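The clean.py hunk above is the first instance of a refactor that recurs throughout this merge: in cloud-init 20.3, subp and ProcessExecutionError moved out of cloudinit.util into a dedicated cloudinit.subp module. A minimal sketch of calling code written against the new layout (assuming a cloud-init 20.3 install; the helper function below is illustrative, not part of the diff):

```python
# Illustrative only: the 20.3-style import split used by the hunks in this
# merge. Older code imported subp/ProcessExecutionError from cloudinit.util.
from cloudinit.subp import ProcessExecutionError, subp
from cloudinit.util import del_file


def remove_seed_file(path):
    """Hypothetical helper: flush pending writes, then delete a seed file."""
    try:
        # subp runs the command list and raises ProcessExecutionError on a
        # non-zero exit status.
        subp(['sync'])
    except ProcessExecutionError:
        pass
    del_file(path)
```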
diff --git a/cloudinit/cmd/devel/logs.py b/cloudinit/cmd/devel/logs.py
index 4c086b51..51c61cca 100644
--- a/cloudinit/cmd/devel/logs.py
+++ b/cloudinit/cmd/devel/logs.py
@@ -12,8 +12,8 @@ import sys
from cloudinit.sources import INSTANCE_JSON_SENSITIVE_FILE
from cloudinit.temp_utils import tempdir
-from cloudinit.util import (
- ProcessExecutionError, chdir, copy, ensure_dir, subp, write_file)
+from cloudinit.subp import (ProcessExecutionError, subp)
+from cloudinit.util import (chdir, copy, ensure_dir, write_file)
CLOUDINIT_LOGS = ['/var/log/cloud-init.log', '/var/log/cloud-init-output.log']
diff --git a/cloudinit/cmd/devel/make_mime.py b/cloudinit/cmd/devel/make_mime.py
new file mode 100755
index 00000000..4e6a5778
--- /dev/null
+++ b/cloudinit/cmd/devel/make_mime.py
@@ -0,0 +1,114 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""Generate multi-part MIME messages for user-data."""
+
+import argparse
+import sys
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+
+from cloudinit import log
+from cloudinit.handlers import INCLUSION_TYPES_MAP
+from . import addLogHandlerCLI
+
+NAME = 'make-mime'
+LOG = log.getLogger(NAME)
+EPILOG = ("Example: make-mime -a config.yaml:cloud-config "
+ "-a script.sh:x-shellscript > user-data")
+
+
+def file_content_type(text):
+    """Parse "<file>:<content-type>" into (file object, filename, type)."""
+ try:
+ filename, content_type = text.split(":", 1)
+ return (open(filename, 'r'), filename, content_type.strip())
+ except ValueError as e:
+ raise argparse.ArgumentError(
+ text, "Invalid value for %r" % (text)
+ ) from e
+
+
+def get_parser(parser=None):
+    """Build or extend an arg parser for the make-mime utility.
+
+ @param parser: Optional existing ArgumentParser instance representing the
+ subcommand which will be extended to support the args of this utility.
+
+ @returns: ArgumentParser with proper argument configuration.
+ """
+ if not parser:
+ parser = argparse.ArgumentParser()
+ # update the parser's doc and add an epilog to show an example
+ parser.description = __doc__
+ parser.epilog = EPILOG
+ parser.add_argument("-a", "--attach", dest="files", type=file_content_type,
+ action='append', default=[],
+ metavar="<file>:<content-type>",
+ help=("attach the given file as the specified "
+ "content-type"))
+    parser.add_argument('-l', '--list-types', action='store_true',
+                        default=False,
+                        help='List supported cloud-init content types.')
+ parser.add_argument('-f', '--force', action='store_true',
+ default=False,
+ help='Ignore unknown content-type warnings')
+ return parser
+
+
+def get_content_types(strip_prefix=False):
+ """ Return a list of cloud-init supported content types. Optionally
+ strip out the leading 'text/' of the type if strip_prefix=True.
+ """
+ return sorted([ctype.replace("text/", "") if strip_prefix else ctype
+ for ctype in INCLUSION_TYPES_MAP.values()])
+
+
+def handle_args(name, args):
+ """Create a multi-part MIME archive for use as user-data. Optionally
+ print out the list of supported content types of cloud-init.
+
+    Also set up CLI log handlers to report to stderr, since this is a
+    development utility which should be run by a human on the CLI.
+
+ @return 0 on success, 1 on failure.
+ """
+ addLogHandlerCLI(LOG, log.DEBUG if args.debug else log.WARNING)
+ if args.list_types:
+ print("\n".join(get_content_types(strip_prefix=True)))
+ return 0
+
+ sub_messages = []
+ errors = []
+ for i, (fh, filename, format_type) in enumerate(args.files):
+ contents = fh.read()
+ sub_message = MIMEText(contents, format_type, sys.getdefaultencoding())
+ sub_message.add_header('Content-Disposition',
+ 'attachment; filename="%s"' % (filename))
+ content_type = sub_message.get_content_type().lower()
+ if content_type not in get_content_types():
+ level = "WARNING" if args.force else "ERROR"
+ msg = (level + ": content type %r for attachment %s "
+ "may be incorrect!") % (content_type, i + 1)
+ sys.stderr.write(msg + '\n')
+ errors.append(msg)
+ sub_messages.append(sub_message)
+ if len(errors) and not args.force:
+ sys.stderr.write("Invalid content-types, override with --force\n")
+ return 1
+ combined_message = MIMEMultipart()
+ for msg in sub_messages:
+ combined_message.attach(msg)
+ print(combined_message)
+ return 0
+
+
+def main():
+ args = get_parser().parse_args()
+ return(handle_args(NAME, args))
+
+
+if __name__ == '__main__':
+ sys.exit(main())
+
+
+# vi: ts=4 expandtab
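The new make_mime.py builds its archive with nothing beyond the standard library's email package. A standalone sketch of the same assembly, equivalent in spirit to the EPILOG example (make-mime -a config.yaml:cloud-config -a script.sh:x-shellscript > user-data); the attachment names and contents below are made-up examples:

```python
# Standalone sketch of the MIME assembly performed by handle_args above;
# attachment names and contents are hypothetical examples.
import sys
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

attachments = [
    ("config.yaml", "cloud-config", "#cloud-config\npackage_update: true\n"),
    ("script.sh", "x-shellscript", "#!/bin/sh\necho hello\n"),
]

combined = MIMEMultipart()
for filename, content_type, body in attachments:
    # Each attachment becomes a text/<content-type> part, mirroring the
    # "-a <file>:<content-type>" option parsed by file_content_type().
    part = MIMEText(body, content_type, sys.getdefaultencoding())
    part.add_header("Content-Disposition",
                    'attachment; filename="%s"' % filename)
    combined.attach(part)

# Printing the multipart message yields the combined user-data document.
print(combined)
```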
diff --git a/cloudinit/cmd/devel/parser.py b/cloudinit/cmd/devel/parser.py
index 99a234ce..1a3c46a4 100644
--- a/cloudinit/cmd/devel/parser.py
+++ b/cloudinit/cmd/devel/parser.py
@@ -9,6 +9,7 @@ from cloudinit.config import schema
from . import net_convert
from . import render
+from . import make_mime
def get_parser(parser=None):
@@ -25,7 +26,9 @@ def get_parser(parser=None):
(net_convert.NAME, net_convert.__doc__,
net_convert.get_parser, net_convert.handle_args),
(render.NAME, render.__doc__,
- render.get_parser, render.handle_args)
+ render.get_parser, render.handle_args),
+ (make_mime.NAME, make_mime.__doc__,
+ make_mime.get_parser, make_mime.handle_args),
]
for (subcmd, helpmsg, get_parser, handler) in subcmds:
parser = subparsers.add_parser(subcmd, help=helpmsg)
diff --git a/cloudinit/cmd/devel/render.py b/cloudinit/cmd/devel/render.py
index 1bc22406..1090aa16 100755
--- a/cloudinit/cmd/devel/render.py
+++ b/cloudinit/cmd/devel/render.py
@@ -57,8 +57,9 @@ def handle_args(name, args):
paths.run_dir, INSTANCE_JSON_SENSITIVE_FILE)
if not os.path.exists(instance_data_fn):
LOG.warning(
- 'Missing root-readable %s. Using redacted %s instead.',
- instance_data_fn, redacted_data_fn)
+ 'Missing root-readable %s. Using redacted %s instead.',
+ instance_data_fn, redacted_data_fn
+ )
instance_data_fn = redacted_data_fn
else:
instance_data_fn = redacted_data_fn
diff --git a/cloudinit/cmd/devel/tests/test_logs.py b/cloudinit/cmd/devel/tests/test_logs.py
index d2dfa8de..ddfd58e1 100644
--- a/cloudinit/cmd/devel/tests/test_logs.py
+++ b/cloudinit/cmd/devel/tests/test_logs.py
@@ -8,7 +8,8 @@ from cloudinit.cmd.devel import logs
from cloudinit.sources import INSTANCE_JSON_SENSITIVE_FILE
from cloudinit.tests.helpers import (
FilesystemMockingTestCase, mock, wrap_and_call)
-from cloudinit.util import ensure_dir, load_file, subp, write_file
+from cloudinit.subp import subp
+from cloudinit.util import ensure_dir, load_file, write_file
@mock.patch('cloudinit.cmd.devel.logs.os.getuid')
diff --git a/cloudinit/cmd/query.py b/cloudinit/cmd/query.py
index e3db8679..07db9552 100644
--- a/cloudinit/cmd/query.py
+++ b/cloudinit/cmd/query.py
@@ -1,6 +1,17 @@
# This file is part of cloud-init. See LICENSE file for license information.
-"""Query standardized instance metadata from the command line."""
+"""Query standardized instance metadata provided to machine, returning a JSON
+structure.
+
+Some instance-data values may be binary on some platforms, such as userdata and
+vendordata; query attempts to decompress and decode any such values as UTF-8.
+
+Any binary values in the instance metadata will be base64-encoded and prefixed
+with "ci-b64:" in the output. userdata and, where applicable, vendordata may
+be provided to the machine gzip-compressed (and therefore as binary data).
+query will attempt to decompress these to a string before emitting the JSON
+output; if this fails, they are treated as binary.
+"""
import argparse
from errno import EACCES
@@ -30,7 +41,7 @@ def get_parser(parser=None):
"""
if not parser:
parser = argparse.ArgumentParser(
- prog=NAME, description='Query cloud-init instance data')
+ prog=NAME, description=__doc__)
parser.add_argument(
'-d', '--debug', action='store_true', default=False,
help='Add verbose messages during template render')
@@ -52,8 +63,10 @@ def get_parser(parser=None):
' /var/lib/cloud/instance/vendor-data.txt'))
parser.add_argument(
'varname', type=str, nargs='?',
- help=('A dot-delimited instance data variable to query from'
- ' instance-data query. For example: v2.local_hostname'))
+ help=('A dot-delimited specific variable to query from'
+ ' instance-data. For example: v1.local_hostname. If the'
+ ' value is not JSON serializable, it will be base64-encoded and'
+ ' will contain the prefix "ci-b64:". '))
parser.add_argument(
'-a', '--all', action='store_true', default=False, dest='dump_all',
help='Dump all available instance-data')
@@ -65,6 +78,21 @@ def get_parser(parser=None):
return parser
+def load_userdata(ud_file_path):
+ """Attempt to return a string of user-data from ud_file_path
+
+ Attempt to decode or decompress if needed.
+ If unable to decode the content, raw bytes will be returned.
+
+ @returns: String of uncompressed userdata if possible, otherwise bytes.
+ """
+ bdata = util.load_file(ud_file_path, decode=False)
+ try:
+ return bdata.decode('utf-8')
+ except UnicodeDecodeError:
+ return util.decomp_gzip(bdata, quiet=False, decode=True)
+
+
def handle_args(name, args):
"""Handle calls to 'cloud-init query' as a subcommand."""
paths = None
@@ -90,8 +118,9 @@ def handle_args(name, args):
instance_data_fn = sensitive_data_fn
else:
LOG.warning(
- 'Missing root-readable %s. Using redacted %s instead.',
- sensitive_data_fn, redacted_data_fn)
+ 'Missing root-readable %s. Using redacted %s instead.',
+ sensitive_data_fn, redacted_data_fn
+ )
instance_data_fn = redacted_data_fn
else:
instance_data_fn = redacted_data_fn
@@ -120,8 +149,8 @@ def handle_args(name, args):
instance_data['vendordata'] = (
'<%s> file:%s' % (REDACT_SENSITIVE_VALUE, vendor_data_fn))
else:
- instance_data['userdata'] = util.load_file(user_data_fn)
- instance_data['vendordata'] = util.load_file(vendor_data_fn)
+ instance_data['userdata'] = load_userdata(user_data_fn)
+ instance_data['vendordata'] = load_userdata(vendor_data_fn)
if args.format:
payload = '## template: jinja\n{fmt}'.format(fmt=args.format)
rendered_payload = render_jinja_payload(
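The new load_userdata helper first tries a plain UTF-8 decode and only falls back to gzip decompression when that fails, which is what lets gzip-compressed user-data appear as text in the JSON output. A stdlib-only sketch of that fallback (the real code goes through util.load_file and util.decomp_gzip):

```python
# Stdlib-only sketch of the decode-or-decompress fallback in load_userdata;
# cloud-init itself uses util.load_file(decode=False) and util.decomp_gzip.
import gzip


def read_user_data(path):
    """Return user-data as text, decompressing gzip payloads when needed."""
    with open(path, "rb") as stream:
        bdata = stream.read()
    try:
        return bdata.decode("utf-8")
    except UnicodeDecodeError:
        # Binary content: assume it reached the instance gzip-compressed.
        return gzip.decompress(bdata).decode("utf-8")
```

The parametrized test_handle_args_root_processes_user_data below exercises exactly this round trip via its _gzip_data helper.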
diff --git a/cloudinit/cmd/tests/test_clean.py b/cloudinit/cmd/tests/test_clean.py
index 13a69aa1..a848a810 100644
--- a/cloudinit/cmd/tests/test_clean.py
+++ b/cloudinit/cmd/tests/test_clean.py
@@ -167,7 +167,6 @@ class TestClean(CiTestCase):
wrap_and_call(
'cloudinit.cmd.clean',
{'Init': {'side_effect': self.init_class},
- 'sys.exit': {'side_effect': self.sys_exit},
'sys.argv': {'new': ['clean', '--logs']}},
clean.main)
diff --git a/cloudinit/cmd/tests/test_main.py b/cloudinit/cmd/tests/test_main.py
index 384fddc6..585b3b0e 100644
--- a/cloudinit/cmd/tests/test_main.py
+++ b/cloudinit/cmd/tests/test_main.py
@@ -18,8 +18,6 @@ myargs = namedtuple('MyArgs', 'debug files force local reporter subcommand')
class TestMain(FilesystemMockingTestCase):
- with_logs = True
-
def setUp(self):
super(TestMain, self).setUp()
self.new_root = self.tmp_dir()
diff --git a/cloudinit/cmd/tests/test_query.py b/cloudinit/cmd/tests/test_query.py
index 6d36a4ea..c258d321 100644
--- a/cloudinit/cmd/tests/test_query.py
+++ b/cloudinit/cmd/tests/test_query.py
@@ -1,195 +1,260 @@
# This file is part of cloud-init. See LICENSE file for license information.
import errno
-from io import StringIO
+import gzip
+from io import BytesIO
+import json
from textwrap import dedent
-import os
+
+import pytest
from collections import namedtuple
from cloudinit.cmd import query
from cloudinit.helpers import Paths
from cloudinit.sources import (
REDACT_SENSITIVE_VALUE, INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE)
-from cloudinit.tests.helpers import CiTestCase, mock
-from cloudinit.util import ensure_dir, write_file
+from cloudinit.tests.helpers import mock
+
+from cloudinit.util import b64e, write_file
+
+def _gzip_data(data):
+ with BytesIO() as iobuf:
+ with gzip.GzipFile(mode="wb", fileobj=iobuf) as gzfp:
+ gzfp.write(data)
+ return iobuf.getvalue()
-class TestQuery(CiTestCase):
- with_logs = True
+@mock.patch("cloudinit.cmd.query.addLogHandlerCLI", lambda *args: "")
+class TestQuery:
args = namedtuple(
'queryargs',
('debug dump_all format instance_data list_keys user_data vendor_data'
' varname'))
- def setUp(self):
- super(TestQuery, self).setUp()
- self.tmp = self.tmp_dir()
- self.instance_data = self.tmp_path('instance-data', dir=self.tmp)
+ def _setup_paths(self, tmpdir, ud_val=None, vd_val=None):
+ """Write userdata and vendordata into a tmpdir.
- def test_handle_args_error_on_missing_param(self):
+ Return:
+ 4-tuple : (paths, run_dir_path, userdata_path, vendordata_path)
+ """
+ if ud_val:
+ user_data = tmpdir.join('user-data')
+ write_file(user_data.strpath, ud_val)
+ else:
+ user_data = None
+ if vd_val:
+ vendor_data = tmpdir.join('vendor-data')
+ write_file(vendor_data.strpath, vd_val)
+ else:
+ vendor_data = None
+ run_dir = tmpdir.join('run_dir')
+ run_dir.ensure_dir()
+ return (
+ Paths({'run_dir': run_dir.strpath}),
+ run_dir,
+ user_data,
+ vendor_data
+ )
+
+ def test_handle_args_error_on_missing_param(self, caplog, capsys):
"""Error when missing required parameters and print usage."""
args = self.args(
debug=False, dump_all=False, format=None, instance_data=None,
list_keys=False, user_data=None, vendor_data=None, varname=None)
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- self.assertEqual(1, query.handle_args('anyname', args))
+ with mock.patch(
+ "cloudinit.cmd.query.addLogHandlerCLI", return_value=""
+ ) as m_cli_log:
+ assert 1 == query.handle_args('anyname', args)
expected_error = (
- 'ERROR: Expected one of the options: --all, --format, --list-keys'
+ 'Expected one of the options: --all, --format, --list-keys'
' or varname\n')
- self.assertIn(expected_error, self.logs.getvalue())
- self.assertIn('usage: query', m_stdout.getvalue())
- self.assertIn(expected_error, m_stderr.getvalue())
+ assert expected_error in caplog.text
+ out, _err = capsys.readouterr()
+ assert 'usage: query' in out
+ assert 1 == m_cli_log.call_count
- def test_handle_args_error_on_missing_instance_data(self):
+ def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir):
"""When instance_data file path does not exist, log an error."""
- absent_fn = self.tmp_path('absent', dir=self.tmp)
+ absent_fn = tmpdir.join('absent')
args = self.args(
- debug=False, dump_all=True, format=None, instance_data=absent_fn,
+ debug=False, dump_all=True, format=None,
+ instance_data=absent_fn.strpath,
list_keys=False, user_data='ud', vendor_data='vd', varname=None)
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
- self.assertEqual(1, query.handle_args('anyname', args))
- self.assertIn(
- 'ERROR: Missing instance-data file: %s' % absent_fn,
- self.logs.getvalue())
- self.assertIn(
- 'ERROR: Missing instance-data file: %s' % absent_fn,
- m_stderr.getvalue())
+ assert 1 == query.handle_args('anyname', args)
- def test_handle_args_error_when_no_read_permission_instance_data(self):
+ msg = 'Missing instance-data file: %s' % absent_fn
+ assert msg in caplog.text
+
+ def test_handle_args_error_when_no_read_permission_instance_data(
+ self, caplog, tmpdir
+ ):
"""When instance_data file is unreadable, log an error."""
- noread_fn = self.tmp_path('unreadable', dir=self.tmp)
- write_file(noread_fn, 'thou shall not pass')
+ noread_fn = tmpdir.join('unreadable')
+ noread_fn.write('thou shall not pass')
args = self.args(
- debug=False, dump_all=True, format=None, instance_data=noread_fn,
+ debug=False, dump_all=True, format=None,
+ instance_data=noread_fn.strpath,
list_keys=False, user_data='ud', vendor_data='vd', varname=None)
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
- with mock.patch('cloudinit.cmd.query.util.load_file') as m_load:
- m_load.side_effect = OSError(errno.EACCES, 'Not allowed')
- self.assertEqual(1, query.handle_args('anyname', args))
- self.assertIn(
- "ERROR: No read permission on '%s'. Try sudo" % noread_fn,
- self.logs.getvalue())
- self.assertIn(
- "ERROR: No read permission on '%s'. Try sudo" % noread_fn,
- m_stderr.getvalue())
+ with mock.patch('cloudinit.cmd.query.util.load_file') as m_load:
+ m_load.side_effect = OSError(errno.EACCES, 'Not allowed')
+ assert 1 == query.handle_args('anyname', args)
+ msg = "No read permission on '%s'. Try sudo" % noread_fn
+ assert msg in caplog.text
- def test_handle_args_defaults_instance_data(self):
+ def test_handle_args_defaults_instance_data(self, caplog, tmpdir):
"""When no instance_data argument, default to configured run_dir."""
args = self.args(
debug=False, dump_all=True, format=None, instance_data=None,
list_keys=False, user_data=None, vendor_data=None, varname=None)
- run_dir = self.tmp_path('run_dir', dir=self.tmp)
- ensure_dir(run_dir)
- paths = Paths({'run_dir': run_dir})
- self.add_patch('cloudinit.cmd.query.read_cfg_paths', 'm_paths')
- self.m_paths.return_value = paths
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
- self.assertEqual(1, query.handle_args('anyname', args))
- json_file = os.path.join(run_dir, INSTANCE_JSON_FILE)
- self.assertIn(
- 'ERROR: Missing instance-data file: %s' % json_file,
- self.logs.getvalue())
- self.assertIn(
- 'ERROR: Missing instance-data file: %s' % json_file,
- m_stderr.getvalue())
+ paths, run_dir, _, _ = self._setup_paths(tmpdir)
+ with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
+ m_paths.return_value = paths
+ assert 1 == query.handle_args('anyname', args)
+ json_file = run_dir.join(INSTANCE_JSON_FILE)
+ msg = 'Missing instance-data file: %s' % json_file.strpath
+ assert msg in caplog.text
- def test_handle_args_root_fallsback_to_instance_data(self):
+ def test_handle_args_root_fallsback_to_instance_data(self, caplog, tmpdir):
"""When no instance_data argument, root falls back to redacted json."""
args = self.args(
debug=False, dump_all=True, format=None, instance_data=None,
list_keys=False, user_data=None, vendor_data=None, varname=None)
- run_dir = self.tmp_path('run_dir', dir=self.tmp)
- ensure_dir(run_dir)
- paths = Paths({'run_dir': run_dir})
- self.add_patch('cloudinit.cmd.query.read_cfg_paths', 'm_paths')
- self.m_paths.return_value = paths
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
+ paths, run_dir, _, _ = self._setup_paths(tmpdir)
+ with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
+ m_paths.return_value = paths
with mock.patch('os.getuid') as m_getuid:
m_getuid.return_value = 0
- self.assertEqual(1, query.handle_args('anyname', args))
- json_file = os.path.join(run_dir, INSTANCE_JSON_FILE)
- sensitive_file = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE)
- self.assertIn(
- 'WARNING: Missing root-readable %s. Using redacted %s instead.' % (
- sensitive_file, json_file),
- m_stderr.getvalue())
+ assert 1 == query.handle_args('anyname', args)
+ json_file = run_dir.join(INSTANCE_JSON_FILE)
+ sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
+ msg = (
+ 'Missing root-readable %s. Using redacted %s instead.' %
+ (
+ sensitive_file.strpath, json_file.strpath
+ )
+ )
+ assert msg in caplog.text
- def test_handle_args_root_uses_instance_sensitive_data(self):
- """When no instance_data argument, root uses semsitive json."""
- user_data = self.tmp_path('user-data', dir=self.tmp)
- vendor_data = self.tmp_path('vendor-data', dir=self.tmp)
- write_file(user_data, 'ud')
- write_file(vendor_data, 'vd')
- run_dir = self.tmp_path('run_dir', dir=self.tmp)
- sensitive_file = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE)
- write_file(sensitive_file, '{"my-var": "it worked"}')
- ensure_dir(run_dir)
- paths = Paths({'run_dir': run_dir})
- self.add_patch('cloudinit.cmd.query.read_cfg_paths', 'm_paths')
- self.m_paths.return_value = paths
+ @pytest.mark.parametrize(
+ 'ud_src,ud_expected,vd_src,vd_expected',
+ (
+ ('hi mom', 'hi mom', 'hi pops', 'hi pops'),
+ ('ud'.encode('utf-8'), 'ud', 'vd'.encode('utf-8'), 'vd'),
+ (_gzip_data(b'ud'), 'ud', _gzip_data(b'vd'), 'vd'),
+ (_gzip_data('ud'.encode('utf-8')), 'ud', _gzip_data(b'vd'), 'vd'),
+ )
+ )
+ def test_handle_args_root_processes_user_data(
+ self, ud_src, ud_expected, vd_src, vd_expected, capsys, tmpdir
+ ):
+ """Support reading multiple user-data file content types"""
+ paths, run_dir, user_data, vendor_data = self._setup_paths(
+ tmpdir, ud_val=ud_src, vd_val=vd_src
+ )
+ sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
+ sensitive_file.write('{"my-var": "it worked"}')
args = self.args(
debug=False, dump_all=True, format=None, instance_data=None,
- list_keys=False, user_data=vendor_data, vendor_data=vendor_data,
- varname=None)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
+ list_keys=False, user_data=user_data.strpath,
+ vendor_data=vendor_data.strpath, varname=None)
+ with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
+ m_paths.return_value = paths
with mock.patch('os.getuid') as m_getuid:
m_getuid.return_value = 0
- self.assertEqual(0, query.handle_args('anyname', args))
- self.assertEqual(
- '{\n "my_var": "it worked",\n "userdata": "vd",\n '
- '"vendordata": "vd"\n}\n', m_stdout.getvalue())
+ assert 0 == query.handle_args('anyname', args)
+ out, _err = capsys.readouterr()
+ cmd_output = json.loads(out)
+ assert "it worked" == cmd_output['my_var']
+ if ud_expected == "ci-b64:":
+ ud_expected = "ci-b64:{}".format(b64e(ud_src))
+ if vd_expected == "ci-b64:":
+ vd_expected = "ci-b64:{}".format(b64e(vd_src))
+ assert ud_expected == cmd_output['userdata']
+ assert vd_expected == cmd_output['vendordata']
- def test_handle_args_dumps_all_instance_data(self):
+ def test_handle_args_root_uses_instance_sensitive_data(
+ self, capsys, tmpdir
+ ):
+ """When no instance_data argument, root uses sensitive json."""
+ paths, run_dir, user_data, vendor_data = self._setup_paths(
+ tmpdir, ud_val='ud', vd_val='vd'
+ )
+ sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE)
+ sensitive_file.write('{"my-var": "it worked"}')
+ args = self.args(
+ debug=False, dump_all=True, format=None, instance_data=None,
+ list_keys=False, user_data=user_data.strpath,
+ vendor_data=vendor_data.strpath, varname=None)
+ with mock.patch('cloudinit.cmd.query.read_cfg_paths') as m_paths:
+ m_paths.return_value = paths
+ with mock.patch('os.getuid') as m_getuid:
+ m_getuid.return_value = 0
+ assert 0 == query.handle_args('anyname', args)
+ expected = (
+ '{\n "my_var": "it worked",\n "userdata": "ud",\n '
+ '"vendordata": "vd"\n}\n'
+ )
+ out, _err = capsys.readouterr()
+ assert expected == out
+
+ def test_handle_args_dumps_all_instance_data(self, capsys, tmpdir):
"""When --all is specified query will dump all instance data vars."""
- write_file(self.instance_data, '{"my-var": "it worked"}')
+ instance_data = tmpdir.join('instance-data')
+ instance_data.write('{"my-var": "it worked"}')
args = self.args(
debug=False, dump_all=True, format=None,
- instance_data=self.instance_data, list_keys=False,
+ instance_data=instance_data.strpath, list_keys=False,
user_data='ud', vendor_data='vd', varname=None)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- self.assertEqual(0, query.handle_args('anyname', args))
- self.assertEqual(
+ with mock.patch('os.getuid') as m_getuid:
+ m_getuid.return_value = 100
+ assert 0 == query.handle_args('anyname', args)
+ expected = (
'{\n "my_var": "it worked",\n "userdata": "<%s> file:ud",\n'
' "vendordata": "<%s> file:vd"\n}\n' % (
- REDACT_SENSITIVE_VALUE, REDACT_SENSITIVE_VALUE),
- m_stdout.getvalue())
+ REDACT_SENSITIVE_VALUE, REDACT_SENSITIVE_VALUE
+ )
+ )
+ out, _err = capsys.readouterr()
+ assert expected == out
- def test_handle_args_returns_top_level_varname(self):
+ def test_handle_args_returns_top_level_varname(self, capsys, tmpdir):
"""When the argument varname is passed, report its value."""
- write_file(self.instance_data, '{"my-var": "it worked"}')
+ instance_data = tmpdir.join('instance-data')
+ instance_data.write('{"my-var": "it worked"}')
args = self.args(
debug=False, dump_all=True, format=None,
- instance_data=self.instance_data, list_keys=False,
+ instance_data=instance_data.strpath, list_keys=False,
user_data='ud', vendor_data='vd', varname='my_var')
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- self.assertEqual(0, query.handle_args('anyname', args))
- self.assertEqual('it worked\n', m_stdout.getvalue())
+ with mock.patch('os.getuid') as m_getuid:
+ m_getuid.return_value = 100
+ assert 0 == query.handle_args('anyname', args)
+ out, _err = capsys.readouterr()
+ assert 'it worked\n' == out
- def test_handle_args_returns_nested_varname(self):
+ def test_handle_args_returns_nested_varname(self, capsys, tmpdir):
"""If user_data file is a jinja template render instance-data vars."""
- write_file(self.instance_data,
- '{"v1": {"key-2": "value-2"}, "my-var": "it worked"}')
+ instance_data = tmpdir.join('instance-data')
+ instance_data.write(
+ '{"v1": {"key-2": "value-2"}, "my-var": "it worked"}'
+ )
args = self.args(
debug=False, dump_all=False, format=None,
- instance_data=self.instance_data, user_data='ud', vendor_data='vd',
- list_keys=False, varname='v1.key_2')
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- self.assertEqual(0, query.handle_args('anyname', args))
- self.assertEqual('value-2\n', m_stdout.getvalue())
+ instance_data=instance_data.strpath, user_data='ud',
+ vendor_data='vd', list_keys=False, varname='v1.key_2')
+ with mock.patch('os.getuid') as m_getuid:
+ m_getuid.return_value = 100
+ assert 0 == query.handle_args('anyname', args)
+ out, _err = capsys.readouterr()
+ assert 'value-2\n' == out
- def test_handle_args_returns_standardized_vars_to_top_level_aliases(self):
+ def test_handle_args_returns_standardized_vars_to_top_level_aliases(
+ self, capsys, tmpdir
+ ):
"""Any standardized vars under v# are promoted as top-level aliases."""
- write_file(
- self.instance_data,
+ instance_data = tmpdir.join('instance-data')
+ instance_data.write(
'{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},'
' "top": "gun"}')
expected = dedent("""\
@@ -209,65 +274,68 @@ class TestQuery(CiTestCase):
""")
args = self.args(
debug=False, dump_all=True, format=None,
- instance_data=self.instance_data, user_data='ud', vendor_data='vd',
- list_keys=False, varname=None)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- self.assertEqual(0, query.handle_args('anyname', args))
- self.assertEqual(expected, m_stdout.getvalue())
+ instance_data=instance_data.strpath, user_data='ud',
+ vendor_data='vd', list_keys=False, varname=None)
+ with mock.patch('os.getuid') as m_getuid:
+ m_getuid.return_value = 100
+ assert 0 == query.handle_args('anyname', args)
+ out, _err = capsys.readouterr()
+ assert expected == out
- def test_handle_args_list_keys_sorts_top_level_keys_when_no_varname(self):
+ def test_handle_args_list_keys_sorts_top_level_keys_when_no_varname(
+ self, capsys, tmpdir
+ ):
"""Sort all top-level keys when only --list-keys provided."""
- write_file(
- self.instance_data,
+ instance_data = tmpdir.join('instance-data')
+ instance_data.write(
'{"v1": {"v1_1": "val1.1"}, "v2": {"v2_2": "val2.2"},'
' "top": "gun"}')
expected = 'top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n'
args = self.args(
debug=False, dump_all=False, format=None,
- instance_data=self.instance_data, list_keys=True, user_data='ud',
- vendor_data='vd', varname=None)
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- self.assertEqual(0, query.handle_args('anyname', args))
- self.assertEqual(expected, m_stdout.getvalue())
+ instance_data=instance_data.strpath, list_keys=True,
+ user_data='ud', vendor_data='vd', varname=None)
+ with mock.patch('os.getuid') as m_getuid:
+ m_getuid.return_value = 100
+ assert 0 == query.handle_args('anyname', args)
+ out, _err = capsys.readouterr()
+ assert expected == out
- def test_handle_args_list_keys_sorts_nested_keys_when_varname(self):
+ def test_handle_args_list_keys_sorts_nested_keys_when_varname(
+ self, capsys, tmpdir
+ ):
"""Sort all nested keys of varname object when --list-keys provided."""
- write_file(
- self.instance_data,
+ instance_data = tmpdir.join('instance-data')
+ instance_data.write(
'{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2":' +
' {"v2_2": "val2.2"}, "top": "gun"}')
expected = 'v1_1\nv1_2\n'
args = self.args(
debug=False, dump_all=False, format=None,
- instance_data=self.instance_data, list_keys=True,
+ instance_data=instance_data.strpath, list_keys=True,
user_data='ud', vendor_data='vd', varname='v1')
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- self.assertEqual(0, query.handle_args('anyname', args))
- self.assertEqual(expected, m_stdout.getvalue())
+ with mock.patch('os.getuid') as m_getuid:
+ m_getuid.return_value = 100
+ assert 0 == query.handle_args('anyname', args)
+ out, _err = capsys.readouterr()
+ assert expected == out
- def test_handle_args_list_keys_errors_when_varname_is_not_a_dict(self):
+ def test_handle_args_list_keys_errors_when_varname_is_not_a_dict(
+ self, caplog, tmpdir
+ ):
"""Raise an error when --list-keys and varname specify a non-list."""
- write_file(
- self.instance_data,
+ instance_data = tmpdir.join('instance-data')
+ instance_data.write(
'{"v1": {"v1_1": "val1.1", "v1_2": "val1.2"}, "v2": ' +
'{"v2_2": "val2.2"}, "top": "gun"}')
- expected_error = "ERROR: --list-keys provided but 'top' is not a dict"
+ expected_error = "--list-keys provided but 'top' is not a dict"
args = self.args(
debug=False, dump_all=False, format=None,
- instance_data=self.instance_data, list_keys=True, user_data='ud',
- vendor_data='vd', varname='top')
- with mock.patch('sys.stderr', new_callable=StringIO) as m_stderr:
- with mock.patch('sys.stdout', new_callable=StringIO) as m_stdout:
- with mock.patch('os.getuid') as m_getuid:
- m_getuid.return_value = 100
- self.assertEqual(1, query.handle_args('anyname', args))
- self.assertEqual('', m_stdout.getvalue())
- self.assertIn(expected_error, m_stderr.getvalue())
+ instance_data=instance_data.strpath, list_keys=True,
+ user_data='ud', vendor_data='vd', varname='top')
+ with mock.patch('os.getuid') as m_getuid:
+ m_getuid.return_value = 100
+ assert 1 == query.handle_args('anyname', args)
+ assert expected_error in caplog.text
# vi: ts=4 expandtab
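The rewritten test_query.py drops CiTestCase in favour of plain pytest fixtures: tmpdir for scratch files, capsys for stdout, and caplog for log assertions. A minimal sketch of that fixture pattern, independent of cloud-init (the test name and data are invented):

```python
# Minimal pytest-fixture sketch of the pattern adopted in test_query.py above;
# the test body is an invented example, not taken from the diff.
import logging


def test_warns_and_prints(tmpdir, capsys, caplog):
    data_file = tmpdir.join("instance-data")   # tmpdir: per-test scratch dir
    data_file.write('{"my-var": "it worked"}')
    logging.getLogger(__name__).warning(
        "Missing instance-data file: %s", data_file.strpath)
    print(data_file.read())
    out, _err = capsys.readouterr()            # capsys: captured stdout/stderr
    assert "it worked" in out
    assert "Missing instance-data file" in caplog.text  # caplog: log records
```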
diff --git a/cloudinit/cmd/tests/test_status.py b/cloudinit/cmd/tests/test_status.py
index 1ed10896..1c9eec37 100644
--- a/cloudinit/cmd/tests/test_status.py
+++ b/cloudinit/cmd/tests/test_status.py
@@ -382,7 +382,6 @@ class TestStatus(CiTestCase):
wrap_and_call(
'cloudinit.cmd.status',
{'sys.argv': {'new': ['status']},
- 'sys.exit': {'side_effect': self.sys_exit},
'_is_cloudinit_disabled': (False, ''),
'Init': {'side_effect': self.init_class}},
status.main)