diff options
author | Scott Moser <smoser@brickies.net> | 2020-06-08 12:49:12 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-08 10:49:12 -0600 |
commit | 3c551f6ebc12f7729a2755c89b19b9000e27cc88 (patch) | |
tree | 0f7cd7ae6161791e7361e2bdffd38f414857f0c3 /tests/unittests/test_util.py | |
parent | 30aa1197c4c4d35d4ccf77d5d8854a40aa21219f (diff) | |
download | vyos-cloud-init-3c551f6ebc12f7729a2755c89b19b9000e27cc88.tar.gz vyos-cloud-init-3c551f6ebc12f7729a2755c89b19b9000e27cc88.zip |
Move subp into its own module. (#416)
This was painful, but it finishes a TODO from cloudinit/subp.py.
It moves the following from util to subp:
ProcessExecutionError
subp
which
target_path
I moved subp_blob_in_tempfile into cc_chef, which is its only caller.
That saved us from having to deal with it using write_file
and temp_utils from subp (which does not import any cloudinit things now).
It is arguable that 'target_path' could be moved to a 'path_utils' or
something, but in order to use it from subp and also from utils,
we had to get it out of utils.
Diffstat (limited to 'tests/unittests/test_util.py')
-rw-r--r-- | tests/unittests/test_util.py | 258 |
1 files changed, 22 insertions, 236 deletions
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 4b439267..737dda8b 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -1,25 +1,20 @@ # This file is part of cloud-init. See LICENSE file for license information. import io -import json import logging import os import re import shutil import stat -import sys import tempfile import yaml from unittest import mock +from cloudinit import subp from cloudinit import importer, util from cloudinit.tests import helpers -BASH = util.which('bash') -BOGUS_COMMAND = 'this-is-not-expected-to-be-a-program-name' - - class FakeSelinux(object): def __init__(self, match_what): @@ -385,7 +380,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): self.assertEqual(expected, util.parse_mount_info('/run/lock', lines)) @mock.patch('cloudinit.util.os') - @mock.patch('cloudinit.util.subp') + @mock.patch('cloudinit.subp.subp') def test_get_device_info_from_zpool(self, zpool_output, m_os): # mock /dev/zfs exists m_os.path.exists.return_value = True @@ -408,17 +403,17 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): self.assertIsNone(ret) @mock.patch('cloudinit.util.os') - @mock.patch('cloudinit.util.subp') + @mock.patch('cloudinit.subp.subp') def test_get_device_info_from_zpool_handles_no_zpool(self, m_sub, m_os): """Handle case where there is no zpool command""" # mock /dev/zfs exists m_os.path.exists.return_value = True - m_sub.side_effect = util.ProcessExecutionError("No zpool cmd") + m_sub.side_effect = subp.ProcessExecutionError("No zpool cmd") ret = util.get_device_info_from_zpool('vmzroot') self.assertIsNone(ret) @mock.patch('cloudinit.util.os') - @mock.patch('cloudinit.util.subp') + @mock.patch('cloudinit.subp.subp') def test_get_device_info_from_zpool_on_error(self, zpool_output, m_os): # mock /dev/zfs exists m_os.path.exists.return_value = True @@ -430,7 +425,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): ret = util.get_device_info_from_zpool('vmzroot') self.assertIsNone(ret) - @mock.patch('cloudinit.util.subp') + @mock.patch('cloudinit.subp.subp') def test_parse_mount_with_ext(self, mount_out): mount_out.return_value = ( helpers.readResource('mount_parse_ext.txt'), '') @@ -447,7 +442,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): ret = util.parse_mount('/not/existing/mount') self.assertIsNone(ret) - @mock.patch('cloudinit.util.subp') + @mock.patch('cloudinit.subp.subp') def test_parse_mount_with_zfs(self, mount_out): mount_out.return_value = ( helpers.readResource('mount_parse_zfs.txt'), '') @@ -513,13 +508,13 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase): """ def _dmidecode_subp(cmd): if cmd[-1] != key: - raise util.ProcessExecutionError() + raise subp.ProcessExecutionError() return (content, error) self.patched_funcs.enter_context( - mock.patch.object(util, 'which', lambda _: True)) + mock.patch("cloudinit.subp.which", side_effect=lambda _: True)) self.patched_funcs.enter_context( - mock.patch.object(util, 'subp', _dmidecode_subp)) + mock.patch("cloudinit.subp.subp", side_effect=_dmidecode_subp)) def patch_mapping(self, new_mapping): self.patched_funcs.enter_context( @@ -546,10 +541,12 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase): def test_dmidecode_not_used_on_arm(self): self.patch_mapping({}) + print("current =%s", subp) self._create_sysfs_parent_directory() dmi_val = 'from-dmidecode' dmi_name = 'use-dmidecode' self._configure_dmidecode_return(dmi_name, dmi_val) + print("now =%s", subp) expected = {'armel': None, 'aarch64': dmi_val, 'x86_64': dmi_val} found = {} @@ -560,6 +557,7 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase): for arch in expected: m_uname.return_value = ('x-sysname', 'x-nodename', 'x-release', 'x-version', arch) + print("now2 =%s", subp) found[arch] = util.read_dmi_data(dmi_name) self.assertEqual(expected, found) @@ -570,7 +568,7 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase): def test_none_returned_if_dmidecode_not_in_path(self): self.patched_funcs.enter_context( - mock.patch.object(util, 'which', lambda _: False)) + mock.patch.object(subp, 'which', lambda _: False)) self.patch_mapping({}) self.assertIsNone(util.read_dmi_data('expect-fail')) @@ -734,218 +732,6 @@ class TestReadSeeded(helpers.TestCase): self.assertEqual(found_ud, ud) -class TestSubp(helpers.CiTestCase): - allowed_subp = [BASH, 'cat', helpers.CiTestCase.SUBP_SHELL_TRUE, - BOGUS_COMMAND, sys.executable] - - stdin2err = [BASH, '-c', 'cat >&2'] - stdin2out = ['cat'] - utf8_invalid = b'ab\xaadef' - utf8_valid = b'start \xc3\xa9 end' - utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7' - printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--'] - - def printf_cmd(self, *args): - # bash's printf supports \xaa. So does /usr/bin/printf - # but by using bash, we remove dependency on another program. - return([BASH, '-c', 'printf "$@"', 'printf'] + list(args)) - - def test_subp_handles_bytestrings(self): - """subp can run a bytestring command if shell is True.""" - tmp_file = self.tmp_path('test.out') - cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file) - (out, _err) = util.subp(cmd.encode('utf-8'), shell=True) - self.assertEqual(u'', out) - self.assertEqual(u'', _err) - self.assertEqual('HI MOM\n', util.load_file(tmp_file)) - - def test_subp_handles_strings(self): - """subp can run a string command if shell is True.""" - tmp_file = self.tmp_path('test.out') - cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file) - (out, _err) = util.subp(cmd, shell=True) - self.assertEqual(u'', out) - self.assertEqual(u'', _err) - self.assertEqual('HI MOM\n', util.load_file(tmp_file)) - - def test_subp_handles_utf8(self): - # The given bytes contain utf-8 accented characters as seen in e.g. - # the "deja dup" package in Ubuntu. - cmd = self.printf_cmd(self.utf8_valid_2) - (out, _err) = util.subp(cmd, capture=True) - self.assertEqual(out, self.utf8_valid_2.decode('utf-8')) - - def test_subp_respects_decode_false(self): - (out, err) = util.subp(self.stdin2out, capture=True, decode=False, - data=self.utf8_valid) - self.assertTrue(isinstance(out, bytes)) - self.assertTrue(isinstance(err, bytes)) - self.assertEqual(out, self.utf8_valid) - - def test_subp_decode_ignore(self): - # this executes a string that writes invalid utf-8 to stdout - (out, _err) = util.subp(self.printf_cmd('abc\\xaadef'), - capture=True, decode='ignore') - self.assertEqual(out, 'abcdef') - - def test_subp_decode_strict_valid_utf8(self): - (out, _err) = util.subp(self.stdin2out, capture=True, - decode='strict', data=self.utf8_valid) - self.assertEqual(out, self.utf8_valid.decode('utf-8')) - - def test_subp_decode_invalid_utf8_replaces(self): - (out, _err) = util.subp(self.stdin2out, capture=True, - data=self.utf8_invalid) - expected = self.utf8_invalid.decode('utf-8', 'replace') - self.assertEqual(out, expected) - - def test_subp_decode_strict_raises(self): - args = [] - kwargs = {'args': self.stdin2out, 'capture': True, - 'decode': 'strict', 'data': self.utf8_invalid} - self.assertRaises(UnicodeDecodeError, util.subp, *args, **kwargs) - - def test_subp_capture_stderr(self): - data = b'hello world' - (out, err) = util.subp(self.stdin2err, capture=True, - decode=False, data=data, - update_env={'LC_ALL': 'C'}) - self.assertEqual(err, data) - self.assertEqual(out, b'') - - def test_subp_reads_env(self): - with mock.patch.dict("os.environ", values={'FOO': 'BAR'}): - out, _err = util.subp(self.printenv + ['FOO'], capture=True) - self.assertEqual('FOO=BAR', out.splitlines()[0]) - - def test_subp_env_and_update_env(self): - out, _err = util.subp( - self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True, - env={'FOO': 'BAR'}, - update_env={'HOME': '/myhome', 'K2': 'V2'}) - self.assertEqual( - ['FOO=BAR', 'HOME=/myhome', 'K1=', 'K2=V2'], out.splitlines()) - - def test_subp_update_env(self): - extra = {'FOO': 'BAR', 'HOME': '/root', 'K1': 'V1'} - with mock.patch.dict("os.environ", values=extra): - out, _err = util.subp( - self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True, - update_env={'HOME': '/myhome', 'K2': 'V2'}) - - self.assertEqual( - ['FOO=BAR', 'HOME=/myhome', 'K1=V1', 'K2=V2'], out.splitlines()) - - def test_subp_warn_missing_shebang(self): - """Warn on no #! in script""" - noshebang = self.tmp_path('noshebang') - util.write_file(noshebang, 'true\n') - - os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC) - with self.allow_subp([noshebang]): - self.assertRaisesRegex(util.ProcessExecutionError, - r'Missing #! in script\?', - util.subp, (noshebang,)) - - def test_subp_combined_stderr_stdout(self): - """Providing combine_capture as True redirects stderr to stdout.""" - data = b'hello world' - (out, err) = util.subp(self.stdin2err, capture=True, - combine_capture=True, decode=False, data=data) - self.assertEqual(b'', err) - self.assertEqual(data, out) - - def test_returns_none_if_no_capture(self): - (out, err) = util.subp(self.stdin2out, data=b'', capture=False) - self.assertIsNone(err) - self.assertIsNone(out) - - def test_exception_has_out_err_are_bytes_if_decode_false(self): - """Raised exc should have stderr, stdout as bytes if no decode.""" - with self.assertRaises(util.ProcessExecutionError) as cm: - util.subp([BOGUS_COMMAND], decode=False) - self.assertTrue(isinstance(cm.exception.stdout, bytes)) - self.assertTrue(isinstance(cm.exception.stderr, bytes)) - - def test_exception_has_out_err_are_bytes_if_decode_true(self): - """Raised exc should have stderr, stdout as string if no decode.""" - with self.assertRaises(util.ProcessExecutionError) as cm: - util.subp([BOGUS_COMMAND], decode=True) - self.assertTrue(isinstance(cm.exception.stdout, str)) - self.assertTrue(isinstance(cm.exception.stderr, str)) - - def test_bunch_of_slashes_in_path(self): - self.assertEqual("/target/my/path/", - util.target_path("/target/", "//my/path/")) - self.assertEqual("/target/my/path/", - util.target_path("/target/", "///my/path/")) - - def test_c_lang_can_take_utf8_args(self): - """Independent of system LC_CTYPE, args can contain utf-8 strings. - - When python starts up, its default encoding gets set based on - the value of LC_CTYPE. If no system locale is set, the default - encoding for both python2 and python3 in some paths will end up - being ascii. - - Attempts to use setlocale or patching (or changing) os.environ - in the current environment seem to not be effective. - - This test starts up a python with LC_CTYPE set to C so that - the default encoding will be set to ascii. In such an environment - Popen(['command', 'non-ascii-arg']) would cause a UnicodeDecodeError. - """ - python_prog = '\n'.join([ - 'import json, sys', - 'from cloudinit.util import subp', - 'data = sys.stdin.read()', - 'cmd = json.loads(data)', - 'subp(cmd, capture=False)', - '']) - cmd = [BASH, '-c', 'echo -n "$@"', '--', - self.utf8_valid.decode("utf-8")] - python_subp = [sys.executable, '-c', python_prog] - - out, _err = util.subp( - python_subp, update_env={'LC_CTYPE': 'C'}, - data=json.dumps(cmd).encode("utf-8"), - decode=False) - self.assertEqual(self.utf8_valid, out) - - def test_bogus_command_logs_status_messages(self): - """status_cb gets status messages logs on bogus commands provided.""" - logs = [] - - def status_cb(log): - logs.append(log) - - with self.assertRaises(util.ProcessExecutionError): - util.subp([BOGUS_COMMAND], status_cb=status_cb) - - expected = [ - 'Begin run command: {cmd}\n'.format(cmd=BOGUS_COMMAND), - 'ERROR: End run command: invalid command provided\n'] - self.assertEqual(expected, logs) - - def test_command_logs_exit_codes_to_status_cb(self): - """status_cb gets status messages containing command exit code.""" - logs = [] - - def status_cb(log): - logs.append(log) - - with self.assertRaises(util.ProcessExecutionError): - util.subp([BASH, '-c', 'exit 2'], status_cb=status_cb) - util.subp([BASH, '-c', 'exit 0'], status_cb=status_cb) - - expected = [ - 'Begin run command: %s -c exit 2\n' % BASH, - 'ERROR: End run command: exit(2)\n', - 'Begin run command: %s -c exit 0\n' % BASH, - 'End run command: exit(0)\n'] - self.assertEqual(expected, logs) - - class TestEncode(helpers.TestCase): """Test the encoding functions""" def test_decode_binary_plain_text_with_hex(self): @@ -966,7 +752,7 @@ class TestProcessExecutionError(helpers.TestCase): empty_description = 'Unexpected error while running command.' def test_pexec_error_indent_text(self): - error = util.ProcessExecutionError() + error = subp.ProcessExecutionError() msg = 'abc\ndef' formatted = 'abc\n{0}def'.format(' ' * 4) self.assertEqual(error._indent_text(msg, indent_level=4), formatted) @@ -976,10 +762,10 @@ class TestProcessExecutionError(helpers.TestCase): error._indent_text(msg.encode()), type(msg.encode())) def test_pexec_error_type(self): - self.assertIsInstance(util.ProcessExecutionError(), IOError) + self.assertIsInstance(subp.ProcessExecutionError(), IOError) def test_pexec_error_empty_msgs(self): - error = util.ProcessExecutionError() + error = subp.ProcessExecutionError() self.assertTrue(all(attr == self.empty_attr for attr in (error.stderr, error.stdout, error.reason))) self.assertEqual(error.description, self.empty_description) @@ -993,7 +779,7 @@ class TestProcessExecutionError(helpers.TestCase): stderr_msg = 'error error' cmd = 'test command' exit_code = 3 - error = util.ProcessExecutionError( + error = subp.ProcessExecutionError( stdout=stdout_msg, stderr=stderr_msg, exit_code=3, cmd=cmd) self.assertEqual(str(error), self.template.format( description=self.empty_description, stdout=stdout_msg, @@ -1004,7 +790,7 @@ class TestProcessExecutionError(helpers.TestCase): # make sure bytes is converted handled properly when formatting stdout_msg = 'multi\nline\noutput message'.encode() stderr_msg = 'multi\nline\nerror message\n\n\n' - error = util.ProcessExecutionError( + error = subp.ProcessExecutionError( stdout=stdout_msg, stderr=stderr_msg) self.assertEqual( str(error), @@ -1170,7 +956,7 @@ class TestGetProcEnv(helpers.TestCase): self.assertEqual(my_ppid, util.get_proc_ppid(my_pid)) -@mock.patch('cloudinit.util.subp') +@mock.patch('cloudinit.subp.subp') def test_find_devs_with_openbsd(m_subp): m_subp.return_value = ( 'cd0:,sd0:630d98d32b5d3759,sd1:,fd0:', '' @@ -1179,7 +965,7 @@ def test_find_devs_with_openbsd(m_subp): assert devlist == ['/dev/cd0a', '/dev/sd1i'] -@mock.patch('cloudinit.util.subp') +@mock.patch('cloudinit.subp.subp') def test_find_devs_with_openbsd_with_criteria(m_subp): m_subp.return_value = ( 'cd0:,sd0:630d98d32b5d3759,sd1:,fd0:', '' @@ -1209,7 +995,7 @@ def test_find_devs_with_freebsd(m_glob): assert devlist == ['/dev/msdosfs/EFISYS'] -@mock.patch("cloudinit.util.subp") +@mock.patch("cloudinit.subp.subp") def test_find_devs_with_netbsd(m_subp): side_effect_values = [ ("ld0 dk0 dk1 cd0", ""), |