diff options
Diffstat (limited to 'tests/unittests/test_util.py')
-rw-r--r-- | tests/unittests/test_util.py | 408 |
1 files changed, 166 insertions, 242 deletions
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 9ff17f52..fc557469 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -1,27 +1,21 @@ # This file is part of cloud-init. See LICENSE file for license information. -from __future__ import print_function - import io -import json import logging import os import re import shutil import stat -import sys import tempfile +import pytest import yaml from unittest import mock +from cloudinit import subp from cloudinit import importer, util from cloudinit.tests import helpers -BASH = util.which('bash') -BOGUS_COMMAND = 'this-is-not-expected-to-be-a-program-name' - - class FakeSelinux(object): def __init__(self, match_what): @@ -105,6 +99,17 @@ class TestWriteFile(helpers.TestCase): self.assertTrue(os.path.isdir(dirname)) self.assertTrue(os.path.isfile(path)) + def test_dir_is_not_created_if_ensure_dir_false(self): + """Verify directories are not created if ensure_dir_exists is False.""" + dirname = os.path.join(self.tmp, "subdir") + path = os.path.join(dirname, "NewFile.txt") + contents = "Hey there" + + with self.assertRaises(FileNotFoundError): + util.write_file(path, contents, ensure_dir_exists=False) + + self.assertFalse(os.path.isdir(dirname)) + def test_explicit_mode(self): """Verify explicit file mode works properly.""" path = os.path.join(self.tmp, "NewFile.txt") @@ -117,29 +122,29 @@ class TestWriteFile(helpers.TestCase): file_stat = os.stat(path) self.assertEqual(0o666, stat.S_IMODE(file_stat.st_mode)) - def test_copy_mode_no_existing(self): - """Verify that file is created with mode 0o644 if copy_mode + def test_preserve_mode_no_existing(self): + """Verify that file is created with mode 0o644 if preserve_mode is true and there is no prior existing file.""" path = os.path.join(self.tmp, "NewFile.txt") contents = "Hey there" - util.write_file(path, contents, copy_mode=True) + util.write_file(path, contents, preserve_mode=True) self.assertTrue(os.path.exists(path)) self.assertTrue(os.path.isfile(path)) file_stat = os.stat(path) self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode)) - def test_copy_mode_with_existing(self): + def test_preserve_mode_with_existing(self): """Verify that file is created using mode of existing file - if copy_mode is true.""" + if preserve_mode is true.""" path = os.path.join(self.tmp, "NewFile.txt") contents = "Hey there" open(path, 'w').close() os.chmod(path, 0o666) - util.write_file(path, contents, copy_mode=True) + util.write_file(path, contents, preserve_mode=True) self.assertTrue(os.path.exists(path)) self.assertTrue(os.path.isfile(path)) @@ -387,7 +392,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): self.assertEqual(expected, util.parse_mount_info('/run/lock', lines)) @mock.patch('cloudinit.util.os') - @mock.patch('cloudinit.util.subp') + @mock.patch('cloudinit.subp.subp') def test_get_device_info_from_zpool(self, zpool_output, m_os): # mock /dev/zfs exists m_os.path.exists.return_value = True @@ -410,17 +415,17 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): self.assertIsNone(ret) @mock.patch('cloudinit.util.os') - @mock.patch('cloudinit.util.subp') + @mock.patch('cloudinit.subp.subp') def test_get_device_info_from_zpool_handles_no_zpool(self, m_sub, m_os): """Handle case where there is no zpool command""" # mock /dev/zfs exists m_os.path.exists.return_value = True - m_sub.side_effect = util.ProcessExecutionError("No zpool cmd") + m_sub.side_effect = subp.ProcessExecutionError("No zpool cmd") ret = util.get_device_info_from_zpool('vmzroot') self.assertIsNone(ret) @mock.patch('cloudinit.util.os') - @mock.patch('cloudinit.util.subp') + @mock.patch('cloudinit.subp.subp') def test_get_device_info_from_zpool_on_error(self, zpool_output, m_os): # mock /dev/zfs exists m_os.path.exists.return_value = True @@ -432,7 +437,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): ret = util.get_device_info_from_zpool('vmzroot') self.assertIsNone(ret) - @mock.patch('cloudinit.util.subp') + @mock.patch('cloudinit.subp.subp') def test_parse_mount_with_ext(self, mount_out): mount_out.return_value = ( helpers.readResource('mount_parse_ext.txt'), '') @@ -449,7 +454,7 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): ret = util.parse_mount('/not/existing/mount') self.assertIsNone(ret) - @mock.patch('cloudinit.util.subp') + @mock.patch('cloudinit.subp.subp') def test_parse_mount_with_zfs(self, mount_out): mount_out.return_value = ( helpers.readResource('mount_parse_zfs.txt'), '') @@ -515,13 +520,13 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase): """ def _dmidecode_subp(cmd): if cmd[-1] != key: - raise util.ProcessExecutionError() + raise subp.ProcessExecutionError() return (content, error) self.patched_funcs.enter_context( - mock.patch.object(util, 'which', lambda _: True)) + mock.patch("cloudinit.subp.which", side_effect=lambda _: True)) self.patched_funcs.enter_context( - mock.patch.object(util, 'subp', _dmidecode_subp)) + mock.patch("cloudinit.subp.subp", side_effect=_dmidecode_subp)) def patch_mapping(self, new_mapping): self.patched_funcs.enter_context( @@ -548,10 +553,12 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase): def test_dmidecode_not_used_on_arm(self): self.patch_mapping({}) + print("current =%s", subp) self._create_sysfs_parent_directory() dmi_val = 'from-dmidecode' dmi_name = 'use-dmidecode' self._configure_dmidecode_return(dmi_name, dmi_val) + print("now =%s", subp) expected = {'armel': None, 'aarch64': dmi_val, 'x86_64': dmi_val} found = {} @@ -562,6 +569,7 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase): for arch in expected: m_uname.return_value = ('x-sysname', 'x-nodename', 'x-release', 'x-version', arch) + print("now2 =%s", subp) found[arch] = util.read_dmi_data(dmi_name) self.assertEqual(expected, found) @@ -572,7 +580,7 @@ class TestReadDMIData(helpers.FilesystemMockingTestCase): def test_none_returned_if_dmidecode_not_in_path(self): self.patched_funcs.enter_context( - mock.patch.object(util, 'which', lambda _: False)) + mock.patch.object(subp, 'which', lambda _: False)) self.patch_mapping({}) self.assertIsNone(util.read_dmi_data('expect-fail')) @@ -736,219 +744,6 @@ class TestReadSeeded(helpers.TestCase): self.assertEqual(found_ud, ud) -class TestSubp(helpers.CiTestCase): - with_logs = True - allowed_subp = [BASH, 'cat', helpers.CiTestCase.SUBP_SHELL_TRUE, - BOGUS_COMMAND, sys.executable] - - stdin2err = [BASH, '-c', 'cat >&2'] - stdin2out = ['cat'] - utf8_invalid = b'ab\xaadef' - utf8_valid = b'start \xc3\xa9 end' - utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7' - printenv = [BASH, '-c', 'for n in "$@"; do echo "$n=${!n}"; done', '--'] - - def printf_cmd(self, *args): - # bash's printf supports \xaa. So does /usr/bin/printf - # but by using bash, we remove dependency on another program. - return([BASH, '-c', 'printf "$@"', 'printf'] + list(args)) - - def test_subp_handles_bytestrings(self): - """subp can run a bytestring command if shell is True.""" - tmp_file = self.tmp_path('test.out') - cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file) - (out, _err) = util.subp(cmd.encode('utf-8'), shell=True) - self.assertEqual(u'', out) - self.assertEqual(u'', _err) - self.assertEqual('HI MOM\n', util.load_file(tmp_file)) - - def test_subp_handles_strings(self): - """subp can run a string command if shell is True.""" - tmp_file = self.tmp_path('test.out') - cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file) - (out, _err) = util.subp(cmd, shell=True) - self.assertEqual(u'', out) - self.assertEqual(u'', _err) - self.assertEqual('HI MOM\n', util.load_file(tmp_file)) - - def test_subp_handles_utf8(self): - # The given bytes contain utf-8 accented characters as seen in e.g. - # the "deja dup" package in Ubuntu. - cmd = self.printf_cmd(self.utf8_valid_2) - (out, _err) = util.subp(cmd, capture=True) - self.assertEqual(out, self.utf8_valid_2.decode('utf-8')) - - def test_subp_respects_decode_false(self): - (out, err) = util.subp(self.stdin2out, capture=True, decode=False, - data=self.utf8_valid) - self.assertTrue(isinstance(out, bytes)) - self.assertTrue(isinstance(err, bytes)) - self.assertEqual(out, self.utf8_valid) - - def test_subp_decode_ignore(self): - # this executes a string that writes invalid utf-8 to stdout - (out, _err) = util.subp(self.printf_cmd('abc\\xaadef'), - capture=True, decode='ignore') - self.assertEqual(out, 'abcdef') - - def test_subp_decode_strict_valid_utf8(self): - (out, _err) = util.subp(self.stdin2out, capture=True, - decode='strict', data=self.utf8_valid) - self.assertEqual(out, self.utf8_valid.decode('utf-8')) - - def test_subp_decode_invalid_utf8_replaces(self): - (out, _err) = util.subp(self.stdin2out, capture=True, - data=self.utf8_invalid) - expected = self.utf8_invalid.decode('utf-8', 'replace') - self.assertEqual(out, expected) - - def test_subp_decode_strict_raises(self): - args = [] - kwargs = {'args': self.stdin2out, 'capture': True, - 'decode': 'strict', 'data': self.utf8_invalid} - self.assertRaises(UnicodeDecodeError, util.subp, *args, **kwargs) - - def test_subp_capture_stderr(self): - data = b'hello world' - (out, err) = util.subp(self.stdin2err, capture=True, - decode=False, data=data, - update_env={'LC_ALL': 'C'}) - self.assertEqual(err, data) - self.assertEqual(out, b'') - - def test_subp_reads_env(self): - with mock.patch.dict("os.environ", values={'FOO': 'BAR'}): - out, _err = util.subp(self.printenv + ['FOO'], capture=True) - self.assertEqual('FOO=BAR', out.splitlines()[0]) - - def test_subp_env_and_update_env(self): - out, _err = util.subp( - self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True, - env={'FOO': 'BAR'}, - update_env={'HOME': '/myhome', 'K2': 'V2'}) - self.assertEqual( - ['FOO=BAR', 'HOME=/myhome', 'K1=', 'K2=V2'], out.splitlines()) - - def test_subp_update_env(self): - extra = {'FOO': 'BAR', 'HOME': '/root', 'K1': 'V1'} - with mock.patch.dict("os.environ", values=extra): - out, _err = util.subp( - self.printenv + ['FOO', 'HOME', 'K1', 'K2'], capture=True, - update_env={'HOME': '/myhome', 'K2': 'V2'}) - - self.assertEqual( - ['FOO=BAR', 'HOME=/myhome', 'K1=V1', 'K2=V2'], out.splitlines()) - - def test_subp_warn_missing_shebang(self): - """Warn on no #! in script""" - noshebang = self.tmp_path('noshebang') - util.write_file(noshebang, 'true\n') - - os.chmod(noshebang, os.stat(noshebang).st_mode | stat.S_IEXEC) - with self.allow_subp([noshebang]): - self.assertRaisesRegex(util.ProcessExecutionError, - r'Missing #! in script\?', - util.subp, (noshebang,)) - - def test_subp_combined_stderr_stdout(self): - """Providing combine_capture as True redirects stderr to stdout.""" - data = b'hello world' - (out, err) = util.subp(self.stdin2err, capture=True, - combine_capture=True, decode=False, data=data) - self.assertEqual(b'', err) - self.assertEqual(data, out) - - def test_returns_none_if_no_capture(self): - (out, err) = util.subp(self.stdin2out, data=b'', capture=False) - self.assertIsNone(err) - self.assertIsNone(out) - - def test_exception_has_out_err_are_bytes_if_decode_false(self): - """Raised exc should have stderr, stdout as bytes if no decode.""" - with self.assertRaises(util.ProcessExecutionError) as cm: - util.subp([BOGUS_COMMAND], decode=False) - self.assertTrue(isinstance(cm.exception.stdout, bytes)) - self.assertTrue(isinstance(cm.exception.stderr, bytes)) - - def test_exception_has_out_err_are_bytes_if_decode_true(self): - """Raised exc should have stderr, stdout as string if no decode.""" - with self.assertRaises(util.ProcessExecutionError) as cm: - util.subp([BOGUS_COMMAND], decode=True) - self.assertTrue(isinstance(cm.exception.stdout, str)) - self.assertTrue(isinstance(cm.exception.stderr, str)) - - def test_bunch_of_slashes_in_path(self): - self.assertEqual("/target/my/path/", - util.target_path("/target/", "//my/path/")) - self.assertEqual("/target/my/path/", - util.target_path("/target/", "///my/path/")) - - def test_c_lang_can_take_utf8_args(self): - """Independent of system LC_CTYPE, args can contain utf-8 strings. - - When python starts up, its default encoding gets set based on - the value of LC_CTYPE. If no system locale is set, the default - encoding for both python2 and python3 in some paths will end up - being ascii. - - Attempts to use setlocale or patching (or changing) os.environ - in the current environment seem to not be effective. - - This test starts up a python with LC_CTYPE set to C so that - the default encoding will be set to ascii. In such an environment - Popen(['command', 'non-ascii-arg']) would cause a UnicodeDecodeError. - """ - python_prog = '\n'.join([ - 'import json, sys', - 'from cloudinit.util import subp', - 'data = sys.stdin.read()', - 'cmd = json.loads(data)', - 'subp(cmd, capture=False)', - '']) - cmd = [BASH, '-c', 'echo -n "$@"', '--', - self.utf8_valid.decode("utf-8")] - python_subp = [sys.executable, '-c', python_prog] - - out, _err = util.subp( - python_subp, update_env={'LC_CTYPE': 'C'}, - data=json.dumps(cmd).encode("utf-8"), - decode=False) - self.assertEqual(self.utf8_valid, out) - - def test_bogus_command_logs_status_messages(self): - """status_cb gets status messages logs on bogus commands provided.""" - logs = [] - - def status_cb(log): - logs.append(log) - - with self.assertRaises(util.ProcessExecutionError): - util.subp([BOGUS_COMMAND], status_cb=status_cb) - - expected = [ - 'Begin run command: {cmd}\n'.format(cmd=BOGUS_COMMAND), - 'ERROR: End run command: invalid command provided\n'] - self.assertEqual(expected, logs) - - def test_command_logs_exit_codes_to_status_cb(self): - """status_cb gets status messages containing command exit code.""" - logs = [] - - def status_cb(log): - logs.append(log) - - with self.assertRaises(util.ProcessExecutionError): - util.subp([BASH, '-c', 'exit 2'], status_cb=status_cb) - util.subp([BASH, '-c', 'exit 0'], status_cb=status_cb) - - expected = [ - 'Begin run command: %s -c exit 2\n' % BASH, - 'ERROR: End run command: exit(2)\n', - 'Begin run command: %s -c exit 0\n' % BASH, - 'End run command: exit(0)\n'] - self.assertEqual(expected, logs) - - class TestEncode(helpers.TestCase): """Test the encoding functions""" def test_decode_binary_plain_text_with_hex(self): @@ -969,7 +764,7 @@ class TestProcessExecutionError(helpers.TestCase): empty_description = 'Unexpected error while running command.' def test_pexec_error_indent_text(self): - error = util.ProcessExecutionError() + error = subp.ProcessExecutionError() msg = 'abc\ndef' formatted = 'abc\n{0}def'.format(' ' * 4) self.assertEqual(error._indent_text(msg, indent_level=4), formatted) @@ -979,10 +774,10 @@ class TestProcessExecutionError(helpers.TestCase): error._indent_text(msg.encode()), type(msg.encode())) def test_pexec_error_type(self): - self.assertIsInstance(util.ProcessExecutionError(), IOError) + self.assertIsInstance(subp.ProcessExecutionError(), IOError) def test_pexec_error_empty_msgs(self): - error = util.ProcessExecutionError() + error = subp.ProcessExecutionError() self.assertTrue(all(attr == self.empty_attr for attr in (error.stderr, error.stdout, error.reason))) self.assertEqual(error.description, self.empty_description) @@ -996,7 +791,7 @@ class TestProcessExecutionError(helpers.TestCase): stderr_msg = 'error error' cmd = 'test command' exit_code = 3 - error = util.ProcessExecutionError( + error = subp.ProcessExecutionError( stdout=stdout_msg, stderr=stderr_msg, exit_code=3, cmd=cmd) self.assertEqual(str(error), self.template.format( description=self.empty_description, stdout=stdout_msg, @@ -1007,7 +802,7 @@ class TestProcessExecutionError(helpers.TestCase): # make sure bytes is converted handled properly when formatting stdout_msg = 'multi\nline\noutput message'.encode() stderr_msg = 'multi\nline\nerror message\n\n\n' - error = util.ProcessExecutionError( + error = subp.ProcessExecutionError( stdout=stdout_msg, stderr=stderr_msg) self.assertEqual( str(error), @@ -1172,4 +967,133 @@ class TestGetProcEnv(helpers.TestCase): my_ppid = os.getppid() self.assertEqual(my_ppid, util.get_proc_ppid(my_pid)) + +class TestKernelVersion(): + """test kernel version function""" + + params = [ + ('5.6.19-300.fc32.x86_64', (5, 6)), + ('4.15.0-101-generic', (4, 15)), + ('3.10.0-1062.12.1.vz7.131.10', (3, 10)), + ('4.18.0-144.el8.x86_64', (4, 18))] + + @mock.patch('os.uname') + @pytest.mark.parametrize("uname_release,expected", params) + def test_kernel_version(self, m_uname, uname_release, expected): + m_uname.return_value.release = uname_release + assert expected == util.kernel_version() + + +class TestFindDevs: + @mock.patch('cloudinit.subp.subp') + def test_find_devs_with(self, m_subp): + m_subp.return_value = ( + '/dev/sda1: UUID="some-uuid" TYPE="ext4" PARTUUID="some-partid"', + '' + ) + devlist = util.find_devs_with() + assert devlist == [ + '/dev/sda1: UUID="some-uuid" TYPE="ext4" PARTUUID="some-partid"'] + + devlist = util.find_devs_with("LABEL_FATBOOT=A_LABEL") + assert devlist == [ + '/dev/sda1: UUID="some-uuid" TYPE="ext4" PARTUUID="some-partid"'] + + @mock.patch('cloudinit.subp.subp') + def test_find_devs_with_openbsd(self, m_subp): + m_subp.return_value = ( + 'cd0:,sd0:630d98d32b5d3759,sd1:,fd0:', '' + ) + devlist = util.find_devs_with_openbsd() + assert devlist == ['/dev/cd0a', '/dev/sd1i'] + + @mock.patch('cloudinit.subp.subp') + def test_find_devs_with_openbsd_with_criteria(self, m_subp): + m_subp.return_value = ( + 'cd0:,sd0:630d98d32b5d3759,sd1:,fd0:', '' + ) + devlist = util.find_devs_with_openbsd(criteria="TYPE=iso9660") + assert devlist == ['/dev/cd0a'] + + # lp: #1841466 + devlist = util.find_devs_with_openbsd(criteria="LABEL_FATBOOT=A_LABEL") + assert devlist == ['/dev/cd0a', '/dev/sd1i'] + + @pytest.mark.parametrize( + 'criteria,expected_devlist', ( + (None, ['/dev/msdosfs/EFISYS', '/dev/iso9660/config-2']), + ('TYPE=iso9660', ['/dev/iso9660/config-2']), + ('TYPE=vfat', ['/dev/msdosfs/EFISYS']), + ('LABEL_FATBOOT=A_LABEL', []), # lp: #1841466 + ), + ) + @mock.patch('glob.glob') + def test_find_devs_with_freebsd(self, m_glob, criteria, expected_devlist): + def fake_glob(pattern): + msdos = ["/dev/msdosfs/EFISYS"] + iso9660 = ["/dev/iso9660/config-2"] + if pattern == "/dev/msdosfs/*": + return msdos + elif pattern == "/dev/iso9660/*": + return iso9660 + raise Exception + m_glob.side_effect = fake_glob + + devlist = util.find_devs_with_freebsd(criteria=criteria) + assert devlist == expected_devlist + + @pytest.mark.parametrize( + 'criteria,expected_devlist', ( + (None, ['/dev/ld0', '/dev/dk0', '/dev/dk1', '/dev/cd0']), + ('TYPE=iso9660', ['/dev/cd0']), + ('TYPE=vfat', ["/dev/ld0", "/dev/dk0", "/dev/dk1"]), + ('LABEL_FATBOOT=A_LABEL', # lp: #1841466 + ['/dev/ld0', '/dev/dk0', '/dev/dk1', '/dev/cd0']), + ) + ) + @mock.patch("cloudinit.subp.subp") + def test_find_devs_with_netbsd(self, m_subp, criteria, expected_devlist): + side_effect_values = [ + ("ld0 dk0 dk1 cd0", ""), + ( + ( + "mscdlabel: CDIOREADTOCHEADER: " + "Inappropriate ioctl for device\n" + "track (ctl=4) at sector 0\n" + "disklabel not written\n" + ), + "", + ), + ( + ( + "mscdlabel: CDIOREADTOCHEADER: " + "Inappropriate ioctl for device\n" + "track (ctl=4) at sector 0\n" + "disklabel not written\n" + ), + "", + ), + ( + ( + "mscdlabel: CDIOREADTOCHEADER: " + "Inappropriate ioctl for device\n" + "track (ctl=4) at sector 0\n" + "disklabel not written\n" + ), + "", + ), + ( + ( + "track (ctl=4) at sector 0\n" + 'ISO filesystem, label "config-2", ' + "creation time: 2020/03/31 17:29\n" + "adding as 'a'\n" + ), + "", + ), + ] + m_subp.side_effect = side_effect_values + devlist = util.find_devs_with_netbsd(criteria=criteria) + assert devlist == expected_devlist + # vi: ts=4 expandtab |