summaryrefslogtreecommitdiff
path: root/tests/unittests/test_util.py
diff options
context:
space:
mode:
authorScott Moser <smoser@brickies.net>2020-06-08 12:49:12 -0400
committerGitHub <noreply@github.com>2020-06-08 10:49:12 -0600
commit3c551f6ebc12f7729a2755c89b19b9000e27cc88 (patch)
tree0f7cd7ae6161791e7361e2bdffd38f414857f0c3 /tests/unittests/test_util.py
parent30aa1197c4c4d35d4ccf77d5d8854a40aa21219f (diff)
downloadvyos-cloud-init-3c551f6ebc12f7729a2755c89b19b9000e27cc88.tar.gz
vyos-cloud-init-3c551f6ebc12f7729a2755c89b19b9000e27cc88.zip
Move subp into its own module. (#416)
This was painful, but it finishes a TODO from cloudinit/subp.py. It moves the following from util to subp: ProcessExecutionError subp which target_path I moved subp_blob_in_tempfile into cc_chef, which is its only caller. That saved us from having to deal with it using write_file and temp_utils from subp (which does not import any cloudinit things now). It is arguable that 'target_path' could be moved to a 'path_utils' or something, but in order to use it from subp and also from utils, we had to get it out of utils.
Diffstat (limited to 'tests/unittests/test_util.py')
-rw-r--r--tests/unittests/test_util.py258
1 files changed, 22 insertions, 236 deletions
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 4b439267..737dda8b 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -1,25 +1,20 @@
# This file is part of cloud-init. See LICENSE file for license information.
import io
-import json
import logging
import os
import re
import shutil
import stat
-import sys
import tempfile
import yaml
from unittest import mock
+from cloudinit import subp
from cloudinit import importer, util
from cloudinit.tests import helpers
-BASH = util.which('bash')
-BOGUS_COMMAND = 'this-is-not-expected-to-be-a-program-name'
-
-
class FakeSelinux(object):
def __init__(self, match_what):
@@ -385,7 +380,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
self.assertEqual(expected, util.parse_mount_info('/run/lock', lines))
@mock.patch('cloudinit.util.os')
- @mock.patch('cloudinit.util.subp')
+ @mock.patch('cloudinit.subp.subp')
def test_get_device_info_from_zpool(self, zpool_output, m_os):
# mock /dev/zfs exists
m_os.path.exists.return_value = True
@@ -408,17 +403,17 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
self.assertIsNone(ret)
@mock.patch('cloudinit.util.os')
- @mock.patch('cloudinit.util.subp')
+ @mock.patch('cloudinit.subp.subp')
def test_get_device_info_from_zpool_handles_no_zpool(self, m_sub, m_os):
"""Handle case where there is no zpool command"""
# mock /dev/zfs exists
m_os.path.exists.return_value = True
- m_sub.side_effect = util.ProcessExecutionError("No zpool cmd")
+ m_sub.side_effect = subp.ProcessExecutionError("No zpool cmd")
ret = util.get_device_info_from_zpool('vmzroot')
self.assertIsNone(ret)
@mock.patch('cloudinit.util.os')
- @mock.patch('cloudinit.util.subp')
+ @mock.patch('cloudinit.subp.subp')
def test_get_device_info_from_zpool_on_error(self, zpool_output, m_os):
# mock /dev/zfs exists
m_os.path.exists.return_value = True
@@ -430,7 +425,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
ret = util.get_device_info_from_zpool('vmzroot')
self.assertIsNone(ret)
- @mock.patch('cloudinit.util.subp')
+ @mock.patch('cloudinit.subp.subp')
def test_parse_mount_with_ext(self, mount_out):
mount_out.return_value = (
helpers.readResource('mount_parse_ext.txt'), '')
@@ -447,7 +442,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase):
ret = util.parse_mount('/not/existing/mount')
self.assertIsNone(ret)
- @mock.patch('cloudinit.util.subp')
+ @mock.patch('cloudinit.subp.subp')
def test_parse_mount_with_zfs(self, mount_out):
mount_out.return_value = (
helpers.readResource('mount_parse_zfs.txt'), '')
@@ -513,13 +508,13 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):
"""
def _dmidecode_subp(cmd):
if cmd[-1] != key:
- raise util.ProcessExecutionError()
+ raise subp.ProcessExecutionError()
return (content, error)
self.patched_funcs.enter_context(
- mock.patch.object(util, 'which', lambda _: True))
+ mock.patch("cloudinit.subp.which", side_effect=lambda _: True))
self.patched_funcs.enter_context(
- mock.patch.object(util, 'subp', _dmidecode_subp))
+ mock.patch("cloudinit.subp.subp", side_effect=_dmidecode_subp))
def patch_mapping(self, new_mapping):
self.patched_funcs.enter_context(
@@ -546,10 +541,12 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):
def test_dmidecode_not_used_on_arm(self):
self.patch_mapping({})
+ print("current =%s", subp)
self._create_sysfs_parent_directory()
dmi_val = 'from-dmidecode'
dmi_name = 'use-dmidecode'
self._configure_dmidecode_return(dmi_name, dmi_val)
+ print("now =%s", subp)
expected = {'armel': None, 'aarch64': dmi_val, 'x86_64': dmi_val}
found = {}
@@ -560,6 +557,7 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):
for arch in expected:
m_uname.return_value = ('x-sysname', 'x-nodename',
'x-release', 'x-version', arch)
+ print("now2 =%s", subp)
found[arch] = util.read_dmi_data(dmi_name)
self.assertEqual(expected, found)
@@ -570,7 +568,7 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase):
def test_none_returned_if_dmidecode_not_in_path(self):
self.patched_funcs.enter_context(
- mock.patch.object(util, 'which', lambda _: False))
+ mock.patch.object(subp, 'which', lambda _: False))
self.patch_mapping({})
self.assertIsNone(util.read_dmi_data('expect-fail'))
@@ -734,218 +732,6 @@ class TestReadSeeded(helpers.TestCase):
self.assertEqual(found_ud, ud)
-class TestSubp(helpers.CiTestCase):
- allowed_subp = [BASH, 'cat', helpers.CiTestCase.SUBP_SHELL_TRUE,
- BOGUS_COMMAND, sys.executable]
-
- stdin2err = [BASH, '-c', 'cat >&2']
- stdin2out = ['cat']
- utf8_invalid = b'ab\xaadef'
- utf8_valid = b'start \xc3\xa9 end'
- utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7'
- printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--']
-
- def printf_cmd(self, *args):
- # bash's printf supports \xaa. So does /usr/bin/printf
- # but by using bash, we remove dependency on another program.
- return([BASH, '-c', 'printf "$@"', 'printf'] + list(args))
-
- def test_subp_handles_bytestrings(self):
- """subp can run a bytestring command if shell is True."""
- tmp_file = self.tmp_path('test.out')
- cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file)
- (out, _err) = util.subp(cmd.encode('utf-8'), shell=True)
- self.assertEqual(u'', out)
- self.assertEqual(u'', _err)
- self.assertEqual('HI MOM\n', util.load_file(tmp_file))
-
- def test_subp_handles_strings(self):
- """subp can run a string command if shell is True."""
- tmp_file = self.tmp_path('test.out')
- cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file)
- (out, _err) = util.subp(cmd, shell=True)
- self.assertEqual(u'', out)
- self.assertEqual(u'', _err)
- self.assertEqual('HI MOM\n', util.load_file(tmp_file))
-
- def test_subp_handles_utf8(self):
- # The given bytes contain utf-8 accented characters as seen in e.g.
- # the "deja dup" package in Ubuntu.
- cmd = self.printf_cmd(self.utf8_valid_2)
- (out, _err) = util.subp(cmd, capture=True)
- self.assertEqual(out, self.utf8_valid_2.decode('utf-8'))
-
- def test_subp_respects_decode_false(self):
- (out, err) = util.subp(self.stdin2out, capture=True, decode=False,
- data=self.utf8_valid)
- self.assertTrue(isinstance(out, bytes))
- self.assertTrue(isinstance(err, bytes))
- self.assertEqual(out, self.utf8_valid)
-
- def test_subp_decode_ignore(self):
- # this executes a string that writes invalid utf-8 to stdout
- (out, _err) = util.subp(self.printf_cmd('abc\\xaadef'),
- capture=True, decode='ignore')
- self.assertEqual(out, 'abcdef')
-
- def test_subp_decode_strict_valid_utf8(self):
- (out, _err) = util.subp(self.stdin2out, capture=True,
- decode='strict', data=self.utf8_valid)
- self.assertEqual(out, self.utf8_valid.decode('utf-8'))
-
- def test_subp_decode_invalid_utf8_replaces(self):
- (out, _err) = util.subp(self.stdin2out, capture=True,
- data=self.utf8_invalid)
- expected = self.utf8_invalid.decode('utf-8', 'replace')
- self.assertEqual(out, expected)
-
- def test_subp_decode_strict_raises(self):
- args = []
- kwargs = {'args': self.stdin2out, 'capture': True,
- 'decode': 'strict', 'data': self.utf8_invalid}
- self.assertRaises(UnicodeDecodeError, util.subp, *args, **kwargs)
-
- def test_subp_capture_stderr(self):
- data = b'hello world'
- (out, err) = util.subp(self.stdin2err, capture=True,
- decode=False, data=data,
- update_env={'LC_ALL': 'C'})
- self.assertEqual(err, data)
- self.assertEqual(out, b'')
-
- def test_subp_reads_env(self):
- with mock.patch.dict("os.environ", values={'FOO': 'BAR'}):
- out, _err = util.subp(self.printenv + ['FOO'], capture=True)
- self.assertEqual('FOO=BAR', out.splitlines()[0])
-
- def test_subp_env_and_update_env(self):
- out, _err = util.subp(
- self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
- env={'FOO': 'BAR'},
- update_env={'HOME': '/myhome', 'K2': 'V2'})
- self.assertEqual(
- ['FOO=BAR', 'HOME=/myhome', 'K1=', 'K2=V2'], out.splitlines())
-
- def test_subp_update_env(self):
- extra = {'FOO': 'BAR', 'HOME': '/root', 'K1': 'V1'}
- with mock.patch.dict("os.environ", values=extra):
- out, _err = util.subp(
- self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True,
- update_env={'HOME': '/myhome', 'K2': 'V2'})
-
- self.assertEqual(
- ['FOO=BAR', 'HOME=/myhome', 'K1=V1', 'K2=V2'], out.splitlines())
-
- def test_subp_warn_missing_shebang(self):
- """Warn on no #! in script"""
- noshebang = self.tmp_path('noshebang')
- util.write_file(noshebang, 'true\n')
-
- os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC)
- with self.allow_subp([noshebang]):
- self.assertRaisesRegex(util.ProcessExecutionError,
- r'Missing #! in script\?',
- util.subp, (noshebang,))
-
- def test_subp_combined_stderr_stdout(self):
- """Providing combine_capture as True redirects stderr to stdout."""
- data = b'hello world'
- (out, err) = util.subp(self.stdin2err, capture=True,
- combine_capture=True, decode=False, data=data)
- self.assertEqual(b'', err)
- self.assertEqual(data, out)
-
- def test_returns_none_if_no_capture(self):
- (out, err) = util.subp(self.stdin2out, data=b'', capture=False)
- self.assertIsNone(err)
- self.assertIsNone(out)
-
- def test_exception_has_out_err_are_bytes_if_decode_false(self):
- """Raised exc should have stderr, stdout as bytes if no decode."""
- with self.assertRaises(util.ProcessExecutionError) as cm:
- util.subp([BOGUS_COMMAND], decode=False)
- self.assertTrue(isinstance(cm.exception.stdout, bytes))
- self.assertTrue(isinstance(cm.exception.stderr, bytes))
-
- def test_exception_has_out_err_are_bytes_if_decode_true(self):
- """Raised exc should have stderr, stdout as string if no decode."""
- with self.assertRaises(util.ProcessExecutionError) as cm:
- util.subp([BOGUS_COMMAND], decode=True)
- self.assertTrue(isinstance(cm.exception.stdout, str))
- self.assertTrue(isinstance(cm.exception.stderr, str))
-
- def test_bunch_of_slashes_in_path(self):
- self.assertEqual("/target/my/path/",
- util.target_path("/target/", "//my/path/"))
- self.assertEqual("/target/my/path/",
- util.target_path("/target/", "///my/path/"))
-
- def test_c_lang_can_take_utf8_args(self):
- """Independent of system LC_CTYPE, args can contain utf-8 strings.
-
- When python starts up, its default encoding gets set based on
- the value of LC_CTYPE. If no system locale is set, the default
- encoding for both python2 and python3 in some paths will end up
- being ascii.
-
- Attempts to use setlocale or patching (or changing) os.environ
- in the current environment seem to not be effective.
-
- This test starts up a python with LC_CTYPE set to C so that
- the default encoding will be set to ascii. In such an environment
- Popen(['command', 'non-ascii-arg']) would cause a UnicodeDecodeError.
- """
- python_prog = '\n'.join([
- 'import json, sys',
- 'from cloudinit.util import subp',
- 'data = sys.stdin.read()',
- 'cmd = json.loads(data)',
- 'subp(cmd, capture=False)',
- ''])
- cmd = [BASH, '-c', 'echo -n "$@"', '--',
- self.utf8_valid.decode("utf-8")]
- python_subp = [sys.executable, '-c', python_prog]
-
- out, _err = util.subp(
- python_subp, update_env={'LC_CTYPE': 'C'},
- data=json.dumps(cmd).encode("utf-8"),
- decode=False)
- self.assertEqual(self.utf8_valid, out)
-
- def test_bogus_command_logs_status_messages(self):
- """status_cb gets status messages logs on bogus commands provided."""
- logs = []
-
- def status_cb(log):
- logs.append(log)
-
- with self.assertRaises(util.ProcessExecutionError):
- util.subp([BOGUS_COMMAND], status_cb=status_cb)
-
- expected = [
- 'Begin run command: {cmd}\n'.format(cmd=BOGUS_COMMAND),
- 'ERROR: End run command: invalid command provided\n']
- self.assertEqual(expected, logs)
-
- def test_command_logs_exit_codes_to_status_cb(self):
- """status_cb gets status messages containing command exit code."""
- logs = []
-
- def status_cb(log):
- logs.append(log)
-
- with self.assertRaises(util.ProcessExecutionError):
- util.subp([BASH, '-c', 'exit 2'], status_cb=status_cb)
- util.subp([BASH, '-c', 'exit 0'], status_cb=status_cb)
-
- expected = [
- 'Begin run command: %s -c exit 2\n' % BASH,
- 'ERROR: End run command: exit(2)\n',
- 'Begin run command: %s -c exit 0\n' % BASH,
- 'End run command: exit(0)\n']
- self.assertEqual(expected, logs)
-
-
class TestEncode(helpers.TestCase):
"""Test the encoding functions"""
def test_decode_binary_plain_text_with_hex(self):
@@ -966,7 +752,7 @@ class TestProcessExecutionError(helpers.TestCase):
empty_description = 'Unexpected error while running command.'
def test_pexec_error_indent_text(self):
- error = util.ProcessExecutionError()
+ error = subp.ProcessExecutionError()
msg = 'abc\ndef'
formatted = 'abc\n{0}def'.format(' ' * 4)
self.assertEqual(error._indent_text(msg, indent_level=4), formatted)
@@ -976,10 +762,10 @@ class TestProcessExecutionError(helpers.TestCase):
error._indent_text(msg.encode()), type(msg.encode()))
def test_pexec_error_type(self):
- self.assertIsInstance(util.ProcessExecutionError(), IOError)
+ self.assertIsInstance(subp.ProcessExecutionError(), IOError)
def test_pexec_error_empty_msgs(self):
- error = util.ProcessExecutionError()
+ error = subp.ProcessExecutionError()
self.assertTrue(all(attr == self.empty_attr for attr in
(error.stderr, error.stdout, error.reason)))
self.assertEqual(error.description, self.empty_description)
@@ -993,7 +779,7 @@ class TestProcessExecutionError(helpers.TestCase):
stderr_msg = 'error error'
cmd = 'test command'
exit_code = 3
- error = util.ProcessExecutionError(
+ error = subp.ProcessExecutionError(
stdout=stdout_msg, stderr=stderr_msg, exit_code=3, cmd=cmd)
self.assertEqual(str(error), self.template.format(
description=self.empty_description, stdout=stdout_msg,
@@ -1004,7 +790,7 @@ class TestProcessExecutionError(helpers.TestCase):
# make sure bytes is converted handled properly when formatting
stdout_msg = 'multi\nline\noutput message'.encode()
stderr_msg = 'multi\nline\nerror message\n\n\n'
- error = util.ProcessExecutionError(
+ error = subp.ProcessExecutionError(
stdout=stdout_msg, stderr=stderr_msg)
self.assertEqual(
str(error),
@@ -1170,7 +956,7 @@ class TestGetProcEnv(helpers.TestCase):
self.assertEqual(my_ppid, util.get_proc_ppid(my_pid))
-@mock.patch('cloudinit.util.subp')
+@mock.patch('cloudinit.subp.subp')
def test_find_devs_with_openbsd(m_subp):
m_subp.return_value = (
'cd0:,sd0:630d98d32b5d3759,sd1:,fd0:', ''
@@ -1179,7 +965,7 @@ def test_find_devs_with_openbsd(m_subp):
assert devlist == ['/dev/cd0a', '/dev/sd1i']
-@mock.patch('cloudinit.util.subp')
+@mock.patch('cloudinit.subp.subp')
def test_find_devs_with_openbsd_with_criteria(m_subp):
m_subp.return_value = (
'cd0:,sd0:630d98d32b5d3759,sd1:,fd0:', ''
@@ -1209,7 +995,7 @@ def test_find_devs_with_freebsd(m_glob):
assert devlist == ['/dev/msdosfs/EFISYS']
-@mock.patch("cloudinit.util.subp")
+@mock.patch("cloudinit.subp.subp")
def test_find_devs_with_netbsd(m_subp):
side_effect_values = [
("ld0 dk0 dk1 cd0", ""),