summaryrefslogtreecommitdiff
path: root/tests/unittests/test_subp.py
diff options
context:
space:
mode:
authorBrett Holman <bholman.devel@gmail.com>2021-12-03 13:11:46 -0700
committerGitHub <noreply@github.com>2021-12-03 13:11:46 -0700
commit039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51 (patch)
tree5f1b09486ccaf98ee8159de58d9a2a1ef0af5dc1 /tests/unittests/test_subp.py
parentffa6fc88249aa080aa31811a45569a45e567418a (diff)
downloadvyos-cloud-init-039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51.tar.gz
vyos-cloud-init-039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51.zip
Reorganize unit test locations under tests/unittests (#1126)
This attempts to standardize unit test file location under test/unittests/ such that any source file located at cloudinit/path/to/file.py may have a corresponding unit test file at test/unittests/path/to/test_file.py. Noteworthy Comments: ==================== Four different duplicate test files existed: test_{gpg,util,cc_mounts,cc_resolv_conf}.py Each of these duplicate file pairs has been merged together. This is a break in git history for these files. The test suite appears to have a dependency on test order. Changing test order causes some tests to fail. This should be rectified, but for now some tests have been modified in tests/unittests/config/test_set_passwords.py. A helper class name starts with "Test" which causes pytest to try executing it as a test case, which then throws warnings "due to Class having __init__()". Silence by changing the name of the class. # helpers.py is imported in many test files, import paths change cloudinit/tests/helpers.py -> tests/unittests/helpers.py # Move directories: cloudinit/distros/tests -> tests/unittests/distros cloudinit/cmd/devel/tests -> tests/unittests/cmd/devel cloudinit/cmd/tests -> tests/unittests/cmd/ cloudinit/sources/helpers/tests -> tests/unittests/sources/helpers cloudinit/sources/tests -> tests/unittests/sources cloudinit/net/tests -> tests/unittests/net cloudinit/config/tests -> tests/unittests/config cloudinit/analyze/tests/ -> tests/unittests/analyze/ # Standardize tests already in tests/unittests/ test_datasource -> sources test_distros -> distros test_vmware -> sources/vmware test_handler -> config # this contains cloudconfig module tests test_runs -> runs
Diffstat (limited to 'tests/unittests/test_subp.py')
-rw-r--r--tests/unittests/test_subp.py286
1 files changed, 286 insertions, 0 deletions
diff --git a/tests/unittests/test_subp.py b/tests/unittests/test_subp.py
new file mode 100644
index 00000000..ec513d01
--- /dev/null
+++ b/tests/unittests/test_subp.py
@@ -0,0 +1,286 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+"""Tests for cloudinit.subp utility functions"""
+
+import json
+import os
+import sys
+import stat
+
+from unittest import mock
+
+from cloudinit import subp, util
+from tests.unittests.helpers import CiTestCase
+
+
+BASH = subp.which('bash')
+BOGUS_COMMAND = 'this-is-not-expected-to-be-a-program-name'
+
+
+class TestPrependBaseCommands(CiTestCase):
+
+ with_logs = True
+
+ def test_prepend_base_command_errors_on_neither_string_nor_list(self):
+ """Raise an error for each command which is not a string or list."""
+ orig_commands = ['ls', 1, {'not': 'gonna work'}, ['basecmd', 'list']]
+ with self.assertRaises(TypeError) as context_manager:
+ subp.prepend_base_command(
+ base_command='basecmd', commands=orig_commands)
+ self.assertEqual(
+ "Invalid basecmd config. These commands are not a string or"
+ " list:\n1\n{'not': 'gonna work'}",
+ str(context_manager.exception))
+
+ def test_prepend_base_command_warns_on_non_base_string_commands(self):
+ """Warn on each non-base for commands of type string."""
+ orig_commands = [
+ 'ls', 'basecmd list', 'touch /blah', 'basecmd install x']
+ fixed_commands = subp.prepend_base_command(
+ base_command='basecmd', commands=orig_commands)
+ self.assertEqual(
+ 'WARNING: Non-basecmd commands in basecmd config:\n'
+ 'ls\ntouch /blah\n',
+ self.logs.getvalue())
+ self.assertEqual(orig_commands, fixed_commands)
+
+ def test_prepend_base_command_prepends_on_non_base_list_commands(self):
+ """Prepend 'basecmd' for each non-basecmd command of type list."""
+ orig_commands = [['ls'], ['basecmd', 'list'], ['basecmda', '/blah'],
+ ['basecmd', 'install', 'x']]
+ expected = [['basecmd', 'ls'], ['basecmd', 'list'],
+ ['basecmd', 'basecmda', '/blah'],
+ ['basecmd', 'install', 'x']]
+ fixed_commands = subp.prepend_base_command(
+ base_command='basecmd', commands=orig_commands)
+ self.assertEqual('', self.logs.getvalue())
+ self.assertEqual(expected, fixed_commands)
+
+ def test_prepend_base_command_removes_first_item_when_none(self):
+ """Remove the first element of a non-basecmd when it is None."""
+ orig_commands = [[None, 'ls'], ['basecmd', 'list'],
+ [None, 'touch', '/blah'],
+ ['basecmd', 'install', 'x']]
+ expected = [['ls'], ['basecmd', 'list'],
+ ['touch', '/blah'],
+ ['basecmd', 'install', 'x']]
+ fixed_commands = subp.prepend_base_command(
+ base_command='basecmd', commands=orig_commands)
+ self.assertEqual('', self.logs.getvalue())
+ self.assertEqual(expected, fixed_commands)
+
+
+class TestSubp(CiTestCase):
+ allowed_subp = [BASH, 'cat', CiTestCase.SUBP_SHELL_TRUE,
+ BOGUS_COMMAND, sys.executable]
+
+ stdin2err = [BASH, '-c', 'cat >&2']
+ stdin2out = ['cat']
+ utf8_invalid = b'ab\xaadef'
+ utf8_valid = b'start \xc3\xa9 end'
+ utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7'
+ printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--']
+
+ def printf_cmd(self, *args):
+ # bash's printf supports \xaa. So does /usr/bin/printf
+ # but by using bash, we remove dependency on another program.
+ return([BASH, '-c', 'printf "$@"', 'printf'] + list(args))
+
+ def test_subp_handles_bytestrings(self):
+ """subp can run a bytestring command if shell is True."""
+ tmp_file = self.tmp_path('test.out')
+ cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file)
+ (out, _err) = subp.subp(cmd.encode('utf-8'), shell=True)
+ self.assertEqual('', out)
+ self.assertEqual('', _err)
+ self.assertEqual('HI MOM\n', util.load_file(tmp_file))
+
+ def test_subp_handles_strings(self):
+ """subp can run a string command if shell is True."""
+ tmp_file = self.tmp_path('test.out')
+ cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file)
+ (out, _err) = subp.subp(cmd, shell=True)
+ self.assertEqual('', out)
+ self.assertEqual('', _err)
+ self.assertEqual('HI MOM\n', util.load_file(tmp_file))
+
+ def test_subp_handles_utf8(self):
+ # The given bytes contain utf-8 accented characters as seen in e.g.
+ # the "deja dup" package in Ubuntu.
+ cmd = self.printf_cmd(self.utf8_valid_2)
+ (out, _err) = subp.subp(cmd, capture=True)
+ self.assertEqual(out, self.utf8_valid_2.decode('utf-8'))
+
+ def test_subp_respects_decode_false(self):
+ (out, err) = subp.subp(self.stdin2out, capture=True, decode=False,
+ data=self.utf8_valid)
+ self.assertTrue(isinstance(out, bytes))
+ self.assertTrue(isinstance(err, bytes))
+ self.assertEqual(out, self.utf8_valid)
+
+ def test_subp_decode_ignore(self):
+ # this executes a string that writes invalid utf-8 to stdout
+ (out, _err) = subp.subp(self.printf_cmd('abc\\xaadef'),
+ capture=True, decode='ignore')
+ self.assertEqual(out, 'abcdef')
+
+ def test_subp_decode_strict_valid_utf8(self):
+ (out, _err) = subp.subp(self.stdin2out, capture=True,
+ decode='strict', data=self.utf8_valid)
+ self.assertEqual(out, self.utf8_valid.decode('utf-8'))
+
+ def test_subp_decode_invalid_utf8_replaces(self):
+ (out, _err) = subp.subp(self.stdin2out, capture=True,
+ data=self.utf8_invalid)
+ expected = self.utf8_invalid.decode('utf-8', 'replace')
+ self.assertEqual(out, expected)
+
+ def test_subp_decode_strict_raises(self):
+ args = []
+ kwargs = {'args': self.stdin2out, 'capture': True,
+ 'decode': 'strict', 'data': self.utf8_invalid}
+ self.assertRaises(UnicodeDecodeError, subp.subp, *args, **kwargs)
+
+ def test_subp_capture_stderr(self):
+ data = b'hello world'
+ (out, err) = subp.subp(self.stdin2err, capture=True,
+ decode=False, data=data,
+ update_env={'LC_ALL': 'C'})
+ self.assertEqual(err, data)
+ self.assertEqual(out, b'')
+
+ def test_subp_reads_env(self):
+ with mock.patch.dict("os.environ", values={'FOO': 'BAR'}):
+ out, _err = subp.subp(self.printenv + ['FOO'], capture=True)
+ self.assertEqual('FOO=BAR', out.splitlines()[0])
+
+ def test_subp_env_and_update_env(self):
+ out, _err = subp.subp(
+ self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
+ env={'FOO': 'BAR'},
+ update_env={'HOME': '/myhome', 'K2': 'V2'})
+ self.assertEqual(
+ ['FOO=BAR', 'HOME=/myhome', 'K1=', 'K2=V2'], out.splitlines())
+
+ def test_subp_update_env(self):
+ extra = {'FOO': 'BAR', 'HOME': '/root', 'K1': 'V1'}
+ with mock.patch.dict("os.environ", values=extra):
+ out, _err = subp.subp(
+ self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
+ update_env={'HOME': '/myhome', 'K2': 'V2'})
+
+ self.assertEqual(
+ ['FOO=BAR', 'HOME=/myhome', 'K1=V1', 'K2=V2'], out.splitlines())
+
+ def test_subp_warn_missing_shebang(self):
+ """Warn on no #! in script"""
+ noshebang = self.tmp_path('noshebang')
+ util.write_file(noshebang, 'true\n')
+
+ print("os is %s" % os)
+ os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC)
+ with self.allow_subp([noshebang]):
+ self.assertRaisesRegex(subp.ProcessExecutionError,
+ r'Missing #! in script\?',
+ subp.subp, (noshebang,))
+
+ def test_subp_combined_stderr_stdout(self):
+ """Providing combine_capture as True redirects stderr to stdout."""
+ data = b'hello world'
+ (out, err) = subp.subp(self.stdin2err, capture=True,
+ combine_capture=True, decode=False, data=data)
+ self.assertEqual(b'', err)
+ self.assertEqual(data, out)
+
+ def test_returns_none_if_no_capture(self):
+ (out, err) = subp.subp(self.stdin2out, data=b'', capture=False)
+ self.assertIsNone(err)
+ self.assertIsNone(out)
+
+ def test_exception_has_out_err_are_bytes_if_decode_false(self):
+ """Raised exc should have stderr, stdout as bytes if no decode."""
+ with self.assertRaises(subp.ProcessExecutionError) as cm:
+ subp.subp([BOGUS_COMMAND], decode=False)
+ self.assertTrue(isinstance(cm.exception.stdout, bytes))
+ self.assertTrue(isinstance(cm.exception.stderr, bytes))
+
+ def test_exception_has_out_err_are_bytes_if_decode_true(self):
+ """Raised exc should have stderr, stdout as string if no decode."""
+ with self.assertRaises(subp.ProcessExecutionError) as cm:
+ subp.subp([BOGUS_COMMAND], decode=True)
+ self.assertTrue(isinstance(cm.exception.stdout, str))
+ self.assertTrue(isinstance(cm.exception.stderr, str))
+
+ def test_bunch_of_slashes_in_path(self):
+ self.assertEqual("/target/my/path/",
+ subp.target_path("/target/", "//my/path/"))
+ self.assertEqual("/target/my/path/",
+ subp.target_path("/target/", "///my/path/"))
+
+ def test_c_lang_can_take_utf8_args(self):
+ """Independent of system LC_CTYPE, args can contain utf-8 strings.
+
+ When python starts up, its default encoding gets set based on
+ the value of LC_CTYPE. If no system locale is set, the default
+ encoding for both python2 and python3 in some paths will end up
+ being ascii.
+
+ Attempts to use setlocale or patching (or changing) os.environ
+ in the current environment seem to not be effective.
+
+ This test starts up a python with LC_CTYPE set to C so that
+ the default encoding will be set to ascii. In such an environment
+ Popen(['command', 'non-ascii-arg']) would cause a UnicodeDecodeError.
+ """
+ python_prog = '\n'.join([
+ 'import json, sys',
+ 'from cloudinit.subp import subp',
+ 'data = sys.stdin.read()',
+ 'cmd = json.loads(data)',
+ 'subp(cmd, capture=False)',
+ ''])
+ cmd = [BASH, '-c', 'echo -n "$@"', '--',
+ self.utf8_valid.decode("utf-8")]
+ python_subp = [sys.executable, '-c', python_prog]
+
+ out, _err = subp.subp(
+ python_subp, update_env={'LC_CTYPE': 'C'},
+ data=json.dumps(cmd).encode("utf-8"),
+ decode=False)
+ self.assertEqual(self.utf8_valid, out)
+
+ def test_bogus_command_logs_status_messages(self):
+ """status_cb gets status messages logs on bogus commands provided."""
+ logs = []
+
+ def status_cb(log):
+ logs.append(log)
+
+ with self.assertRaises(subp.ProcessExecutionError):
+ subp.subp([BOGUS_COMMAND], status_cb=status_cb)
+
+ expected = [
+ 'Begin run command: {cmd}\n'.format(cmd=BOGUS_COMMAND),
+ 'ERROR: End run command: invalid command provided\n']
+ self.assertEqual(expected, logs)
+
+ def test_command_logs_exit_codes_to_status_cb(self):
+ """status_cb gets status messages containing command exit code."""
+ logs = []
+
+ def status_cb(log):
+ logs.append(log)
+
+ with self.assertRaises(subp.ProcessExecutionError):
+ subp.subp([BASH, '-c', 'exit 2'], status_cb=status_cb)
+ subp.subp([BASH, '-c', 'exit 0'], status_cb=status_cb)
+
+ expected = [
+ 'Begin run command: %s -c exit 2\n' % BASH,
+ 'ERROR: End run command: exit(2)\n',
+ 'Begin run command: %s -c exit 0\n' % BASH,
+ 'End run command: exit(0)\n']
+ self.assertEqual(expected, logs)
+
+
+# vi: ts=4 expandtab