diff options
Diffstat (limited to 'tests/unittests/test_util.py')
-rw-r--r-- | tests/unittests/test_util.py | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 4a92e741..8685b8e2 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -8,7 +8,9 @@ import shutil import stat import tempfile +import json import six +import sys import yaml from cloudinit import importer, util @@ -364,6 +366,56 @@ class TestMountinfoParsing(helpers.ResourceUsingTestCase): expected = ('none', 'tmpfs', '/run/lock') self.assertEqual(expected, util.parse_mount_info('/run/lock', lines)) + @mock.patch('cloudinit.util.subp') + def test_get_device_info_from_zpool(self, zpool_output): + # mock subp command from util.get_mount_info_fs_on_zpool + zpool_output.return_value = ( + self.readResource('zpool_status_simple.txt'), '' + ) + # save function return values and do asserts + ret = util.get_device_info_from_zpool('vmzroot') + self.assertEqual('gpt/system', ret) + self.assertIsNotNone(ret) + + @mock.patch('cloudinit.util.subp') + def test_get_device_info_from_zpool_on_error(self, zpool_output): + # mock subp command from util.get_mount_info_fs_on_zpool + zpool_output.return_value = ( + self.readResource('zpool_status_simple.txt'), 'error' + ) + # save function return values and do asserts + ret = util.get_device_info_from_zpool('vmzroot') + self.assertIsNone(ret) + + @mock.patch('cloudinit.util.subp') + def test_parse_mount_with_ext(self, mount_out): + mount_out.return_value = (self.readResource('mount_parse_ext.txt'), '') + # this one is valid and exists in mount_parse_ext.txt + ret = util.parse_mount('/var') + self.assertEqual(('/dev/mapper/vg00-lv_var', 'ext4', '/var'), ret) + # another one that is valid and exists + ret = util.parse_mount('/') + self.assertEqual(('/dev/mapper/vg00-lv_root', 'ext4', '/'), ret) + # this one exists in mount_parse_ext.txt + ret = util.parse_mount('/sys/kernel/debug') + self.assertIsNone(ret) + # this one does not even exist in mount_parse_ext.txt + ret = util.parse_mount('/not/existing/mount') + self.assertIsNone(ret) + + @mock.patch('cloudinit.util.subp') + def test_parse_mount_with_zfs(self, mount_out): + mount_out.return_value = (self.readResource('mount_parse_zfs.txt'), '') + # this one is valid and exists in mount_parse_zfs.txt + ret = util.parse_mount('/var') + self.assertEqual(('vmzroot/ROOT/freebsd/var', 'zfs', '/var'), ret) + # this one is the root, valid and also exists in mount_parse_zfs.txt + ret = util.parse_mount('/') + self.assertEqual(('vmzroot/ROOT/freebsd', 'zfs', '/'), ret) + # this one does not even exist in mount_parse_ext.txt + ret = util.parse_mount('/not/existing/mount') + self.assertIsNone(ret) + class TestReadDMIData(helpers.FilesystemMockingTestCase): @@ -630,6 +682,24 @@ class TestSubp(helpers.CiTestCase): # but by using bash, we remove dependency on another program. return([BASH, '-c', 'printf "$@"', 'printf'] + list(args)) + def test_subp_handles_bytestrings(self): + """subp can run a bytestring command if shell is True.""" + tmp_file = self.tmp_path('test.out') + cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file) + (out, _err) = util.subp(cmd.encode('utf-8'), shell=True) + self.assertEqual(u'', out) + self.assertEqual(u'', _err) + self.assertEqual('HI MOM\n', util.load_file(tmp_file)) + + def test_subp_handles_strings(self): + """subp can run a string command if shell is True.""" + tmp_file = self.tmp_path('test.out') + cmd = 'echo HI MOM >> {tmp_file}'.format(tmp_file=tmp_file) + (out, _err) = util.subp(cmd, shell=True) + self.assertEqual(u'', out) + self.assertEqual(u'', _err) + self.assertEqual('HI MOM\n', util.load_file(tmp_file)) + def test_subp_handles_utf8(self): # The given bytes contain utf-8 accented characters as seen in e.g. # the "deja dup" package in Ubuntu. @@ -733,6 +803,71 @@ class TestSubp(helpers.CiTestCase): self.assertEqual("/target/my/path/", util.target_path("/target/", "///my/path/")) + def test_c_lang_can_take_utf8_args(self): + """Independent of system LC_CTYPE, args can contain utf-8 strings. + + When python starts up, its default encoding gets set based on + the value of LC_CTYPE. If no system locale is set, the default + encoding for both python2 and python3 in some paths will end up + being ascii. + + Attempts to use setlocale or patching (or changing) os.environ + in the current environment seem to not be effective. + + This test starts up a python with LC_CTYPE set to C so that + the default encoding will be set to ascii. In such an environment + Popen(['command', 'non-ascii-arg']) would cause a UnicodeDecodeError. + """ + python_prog = '\n'.join([ + 'import json, sys', + 'from cloudinit.util import subp', + 'data = sys.stdin.read()', + 'cmd = json.loads(data)', + 'subp(cmd, capture=False)', + '']) + cmd = [BASH, '-c', 'echo -n "$@"', '--', + self.utf8_valid.decode("utf-8")] + python_subp = [sys.executable, '-c', python_prog] + + out, _err = util.subp( + python_subp, update_env={'LC_CTYPE': 'C'}, + data=json.dumps(cmd).encode("utf-8"), + decode=False) + self.assertEqual(self.utf8_valid, out) + + def test_bogus_command_logs_status_messages(self): + """status_cb gets status messages logs on bogus commands provided.""" + logs = [] + + def status_cb(log): + logs.append(log) + + with self.assertRaises(util.ProcessExecutionError): + util.subp([self.bogus_command], status_cb=status_cb) + + expected = [ + 'Begin run command: {cmd}\n'.format(cmd=self.bogus_command), + 'ERROR: End run command: invalid command provided\n'] + self.assertEqual(expected, logs) + + def test_command_logs_exit_codes_to_status_cb(self): + """status_cb gets status messages containing command exit code.""" + logs = [] + + def status_cb(log): + logs.append(log) + + with self.assertRaises(util.ProcessExecutionError): + util.subp(['ls', '/I/dont/exist'], status_cb=status_cb) + util.subp(['ls'], status_cb=status_cb) + + expected = [ + 'Begin run command: ls /I/dont/exist\n', + 'ERROR: End run command: exit(2)\n', + 'Begin run command: ls\n', + 'End run command: exit(0)\n'] + self.assertEqual(expected, logs) + class TestEncode(helpers.TestCase): """Test the encoding functions""" |